# Graph-Based Modeling and Analysis of Distributed Publish-Subscribe Systems

## A Comprehensive Six-Step Methodology

---

This notebook demonstrates a comprehensive methodology for analyzing distributed publish-subscribe (pub-sub) systems using graph-based modeling. The approach enables:

- **Predictive Analysis**: Identify critical components before failures occur
- **Structural Insights**: Detect anti-patterns and architectural vulnerabilities
- **Validation**: Correlate predictions with simulation outcomes
- **Visualization**: Multi-layer graph representations for complex systems

### Target Metrics

| Metric | Target | Description |
|--------|--------|-------------|
| Spearman Correlation | ≥ 0.7 | Correlation between predicted criticality and actual failure impact |
| F1 Score | ≥ 0.9 | Harmonic mean of precision and recall for critical component identification |
| Precision | ≥ 0.9 | Correctly identified critical components / Total identified |
| Recall | ≥ 0.85 | Correctly identified critical components / Actual critical components |
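
The precision, recall, and F1 definitions in the table can be computed directly from two sets of component IDs. Below is a minimal sketch. It assumes the predicted set comes from the analysis and the actual set comes from simulation. The function name `classification_metrics` and the example IDs are hypothetical.

```python
def classification_metrics(predicted: set, actual: set) -> dict:
    """Precision, recall, and F1 for a predicted set of critical components."""
    true_positives = len(predicted & actual)
    precision = true_positives / len(predicted) if predicted else 0.0
    recall = true_positives / len(actual) if actual else 0.0
    if precision + recall == 0:
        f1 = 0.0
    else:
        f1 = 2 * precision * recall / (precision + recall)
    return {'precision': precision, 'recall': recall, 'f1_score': f1}


# Hypothetical example: IDs flagged by the analysis vs. IDs whose failure
# had the largest impact in simulation
predicted = {'B1', 'T1', 'N3', 'A7'}
actual = {'B1', 'T1', 'N3', 'A2', 'A9'}
metrics = classification_metrics(predicted, actual)

# Compare against the target thresholds from the table above
TARGETS = {'precision': 0.9, 'recall': 0.85, 'f1_score': 0.9}
targets_met = {name: metrics[name] >= threshold for name, threshold in TARGETS.items()}
print(metrics)
print(targets_met)
```

Here 3 of the 4 predictions are correct (precision 0.75), and 3 of the 5 actual critical components are found (recall 0.6). None of the targets is met.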

### Composite Criticality Formula

$$C_{score}(v) = \alpha \cdot C_B^{norm}(v) + \beta \cdot AP(v) + \gamma \cdot I(v)$$

Where:
- $C_B^{norm}(v)$ = Normalized betweenness centrality
- $AP(v)$ = Articulation point indicator (1 if node is an articulation point, 0 otherwise)
- $I(v)$ = Impact score based on reachability loss
- $\alpha, \beta, \gamma$ = Tunable weights (default: 0.4, 0.3, 0.3)
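
Here is a worked instance of the formula with the default weights. The weights sum to 1. Each term lies in [0, 1], because betweenness is normalized, AP(v) is 0 or 1, and I(v) is assumed to be a fraction. The composite score therefore also lies in [0, 1]. The helper `composite_score` and the input values are hypothetical.

```python
def composite_score(bc_norm: float, is_articulation_point: bool, impact: float,
                    alpha: float = 0.4, beta: float = 0.3, gamma: float = 0.3) -> float:
    """C_score(v) = alpha * C_B^norm(v) + beta * AP(v) + gamma * I(v)."""
    ap = 1.0 if is_articulation_point else 0.0  # AP(v) indicator
    return alpha * bc_norm + beta * ap + gamma * impact


# Hypothetical broker that bridges most traffic and is an articulation point
score = composite_score(bc_norm=0.9, is_articulation_point=True, impact=0.6)
print(round(score, 2))  # 0.4*0.9 + 0.3*1 + 0.3*0.6 = 0.36 + 0.30 + 0.18 → 0.84
```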

## Table of Contents

1. [Setup and Dependencies](#1.-Setup-and-Dependencies)
2. [Step 1: Graph Data Generation](#Step-1:-Graph-Data-Generation)
3. [Step 2: Neo4j Database Import](#Step-2:-Neo4j-Database-Import)
4. [Step 3: Graph Analysis](#Step-3:-Graph-Analysis)
5. [Step 4: Simulation and Validation](#Step-4:-Simulation-and-Validation)
6. [Step 5: Visualization](#Step-5:-Visualization)
7. [Complete Pipeline Execution](#Complete-Pipeline-Execution)
8. [Results Interpretation](#Results-Interpretation)

---

## 1. Setup and Dependencies

First, let's import the required libraries and define our configuration classes.

In [None]:
# Install required packages (uncomment if needed)
# !pip install networkx scipy neo4j matplotlib --quiet

import asyncio
import json
import math
import random
import statistics
import time
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
import warnings
warnings.filterwarnings('ignore')

# Core dependencies
import networkx as nx

# Optional dependencies
try:
    from scipy import stats as scipy_stats
    SCIPY_AVAILABLE = True
except ImportError:
    SCIPY_AVAILABLE = False
    print("⚠️ scipy not available - using fallback correlation calculation")

try:
    from neo4j import GraphDatabase
    NEO4J_AVAILABLE = True
except ImportError:
    NEO4J_AVAILABLE = False
    print("⚠️ neo4j driver not available - Neo4j import will be skipped")

print(f"✅ NetworkX version: {nx.__version__}")
print("✅ Python async support: enabled")
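
The setup cell announces a fallback correlation for when scipy is missing, but that fallback is not part of this section. Below is one possible stdlib version. It assumes Spearman's rho is computed as the Pearson correlation of average ranks, which is the same tie handling `scipy.stats.spearmanr` uses. The names `average_ranks` and `spearman_fallback` are hypothetical.

```python
import math


def average_ranks(values):
    """Rank values 1..n, giving tied values the mean of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        # Extend j across a run of tied values
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        mean_rank = (i + j) / 2 + 1  # positions i..j are 0-based
        for k in range(i, j + 1):
            ranks[order[k]] = mean_rank
        i = j + 1
    return ranks


def spearman_fallback(x, y):
    """Spearman's rho as the Pearson correlation of average ranks."""
    if len(x) != len(y) or len(x) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    rx, ry = average_ranks(x), average_ranks(y)
    mean_x, mean_y = sum(rx) / len(rx), sum(ry) / len(ry)
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(rx, ry))
    var_x = sum((a - mean_x) ** 2 for a in rx)
    var_y = sum((b - mean_y) ** 2 for b in ry)
    if var_x == 0 or var_y == 0:
        return float('nan')  # undefined when one input is constant
    return cov / math.sqrt(var_x * var_y)


predicted = [0.84, 0.62, 0.40, 0.15, 0.05]  # hypothetical criticality scores
observed = [0.90, 0.70, 0.20, 0.30, 0.00]   # hypothetical simulated impact
print(round(spearman_fallback(predicted, observed), 3))  # → 0.9
```

With five components and no ties, this matches the closed form 1 - 6·Σd² / (n(n² - 1)) = 1 - 12/120 = 0.9.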

### Configuration Classes

We define enumerations and data classes to configure our analysis pipeline.

In [None]:
class Scenario(Enum):
    """Domain scenarios for pub-sub system generation"""
    GENERIC = "generic"
    IOT_SMART_CITY = "iot_smart_city"
    FINANCIAL_TRADING = "financial_trading"
    HEALTHCARE = "healthcare"
    ECOMMERCE = "ecommerce"


class CriticalityLevel(Enum):
    """Criticality classification levels based on composite score"""
    CRITICAL = "CRITICAL"   # Score >= 0.8
    HIGH = "HIGH"           # Score >= 0.6
    MEDIUM = "MEDIUM"       # Score >= 0.4
    LOW = "LOW"             # Score >= 0.2
    MINIMAL = "MINIMAL"     # Score < 0.2


# Target validation thresholds from research methodology
TARGET_SPEARMAN_CORRELATION = 0.7
TARGET_F1_SCORE = 0.9
TARGET_PRECISION = 0.9
TARGET_RECALL = 0.85

# Default criticality scoring weights
DEFAULT_ALPHA = 0.4  # Betweenness centrality weight
DEFAULT_BETA = 0.3   # Articulation point weight
DEFAULT_GAMMA = 0.3  # Impact score weight

print("Configuration classes defined successfully!")
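
The thresholds in the `CriticalityLevel` comments can be applied as a top-down cascade. Below is a minimal sketch that returns the level name as a string. The `classify_score` helper is hypothetical and is not used elsewhere in the notebook.

```python
# Thresholds taken from the CriticalityLevel comments, checked from the top down
LEVEL_THRESHOLDS = [
    (0.8, 'CRITICAL'),
    (0.6, 'HIGH'),
    (0.4, 'MEDIUM'),
    (0.2, 'LOW'),
]


def classify_score(score: float) -> str:
    """Map a composite score in [0, 1] to a criticality level name."""
    for threshold, level in LEVEL_THRESHOLDS:
        if score >= threshold:
            return level
    return 'MINIMAL'  # everything below 0.2


for s in (0.84, 0.6, 0.39, 0.05):
    print(s, classify_score(s))
```

Each `CriticalityLevel` member's value equals its name. `CriticalityLevel(classify_score(s))` would therefore convert the result to the enum.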

In [None]:
@dataclass
class GraphConfig:
    """Configuration for graph generation"""
    scale: str = 'medium'
    scenario: Scenario = Scenario.IOT_SMART_CITY
    num_nodes: int = 10
    num_applications: int = 30
    num_topics: int = 20
    num_brokers: int = 3
    edge_density: float = 0.4
    antipatterns: List[str] = field(default_factory=list)
    seed: int = 42


@dataclass
class CriticalityScore:
    """Composite criticality score for a component"""
    component_id: str
    component_type: str
    betweenness_centrality: float
    is_articulation_point: bool
    impact_score: float
    composite_score: float
    criticality_level: CriticalityLevel


@dataclass
class ValidationResult:
    """Validation results comparing predictions to simulation outcomes"""
    precision: float
    recall: float
    f1_score: float
    spearman_correlation: float
    targets_met: Dict[str, bool]


print("Data classes defined successfully!")

---

## Step 1: Graph Data Generation

The first step involves generating realistic pub-sub system topologies. Our generator supports:

- **Multiple Scales**: Presets from tiny (3 infrastructure nodes, 8 applications) to xlarge (50 nodes, 150 applications)
- **Domain Scenarios**: IoT, Financial Trading, Healthcare, eCommerce
- **QoS Profiles**: Different reliability/latency requirements per domain
- **Anti-pattern Injection**: Single Points of Failure (`spof`) and God Topics (`god_topic`); Tight Coupling is not injected by the generator below
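
`PubSubGraphGenerator.SCALES` defines these presets. However, `generate()` reads the component counts from the `GraphConfig` fields, so the `scale` string only ends up in the metadata. Below is a minimal sketch of a hypothetical `counts_for_scale` helper that turns a preset into `GraphConfig` keyword arguments. The dictionary is a copy of `SCALES`.

```python
# Copy of the presets in PubSubGraphGenerator.SCALES
SCALES = {
    'tiny': {'nodes': 3, 'apps': 8, 'topics': 5, 'brokers': 1},
    'small': {'nodes': 5, 'apps': 15, 'topics': 10, 'brokers': 2},
    'medium': {'nodes': 10, 'apps': 30, 'topics': 20, 'brokers': 3},
    'large': {'nodes': 25, 'apps': 80, 'topics': 50, 'brokers': 5},
    'xlarge': {'nodes': 50, 'apps': 150, 'topics': 100, 'brokers': 8},
}


def counts_for_scale(scale: str) -> dict:
    """Translate a scale preset into GraphConfig-style keyword arguments (hypothetical helper)."""
    if scale not in SCALES:
        raise ValueError(f"unknown scale {scale!r}; expected one of {sorted(SCALES)}")
    preset = SCALES[scale]
    return {
        'scale': scale,
        'num_nodes': preset['nodes'],
        'num_applications': preset['apps'],
        'num_topics': preset['topics'],
        'num_brokers': preset['brokers'],
    }


print(counts_for_scale('small'))
```

In the notebook, this could be used as `GraphConfig(scenario=Scenario.FINANCIAL_TRADING, **counts_for_scale('large'))`.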

### Multi-Layer Graph Model

The system is modeled as a multi-layer graph:

```
┌─────────────────────────────────────────┐
│           Application Layer             │
│   (Producers, Consumers, Prosumers)     │
└─────────────────────┬───────────────────┘
                      │ PUBLISHES/SUBSCRIBES
┌─────────────────────▼───────────────────┐
│              Topic Layer                │
│    (Message channels with QoS)          │
└─────────────────────┬───────────────────┘
                      │ ROUTES
┌─────────────────────▼───────────────────┐
│             Broker Layer                │
│      (Message routing nodes)            │
└─────────────────────┬───────────────────┘
                      │ RUNS_ON
┌─────────────────────▼───────────────────┐
│         Infrastructure Layer            │
│   (Physical/virtual compute nodes)      │
└─────────────────────────────────────────┘
```
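
In the generated data, each relationship list should connect specific layers. Note that the diagram draws `RUNS_ON` between the broker and infrastructure layers. The generator below instead emits `RUNS_ON` edges from applications to nodes and does not place brokers on nodes. A hypothetical `check_layering` helper, sketched against the generator's dictionary format, flags edges whose endpoints fall in unexpected layers.

```python
# Expected (source layer, target layer) per relationship list, matching
# what the generator's relationship methods produce
EDGE_SCHEMA = {
    'publishes_to': ('applications', 'topics'),
    'subscribes_to': ('applications', 'topics'),
    'routes': ('topics', 'brokers'),
    'runs_on': ('applications', 'nodes'),
}


def check_layering(graph: dict) -> list:
    """Return (relationship, edge) pairs whose endpoints are in the wrong layer."""
    ids_by_layer = {layer: {c['id'] for c in graph.get(layer, [])}
                    for layer in ('nodes', 'brokers', 'topics', 'applications')}
    problems = []
    for rel_type, (src_layer, dst_layer) in EDGE_SCHEMA.items():
        for edge in graph.get('relationships', {}).get(rel_type, []):
            if edge['from'] not in ids_by_layer[src_layer] or edge['to'] not in ids_by_layer[dst_layer]:
                problems.append((rel_type, edge))
    return problems


toy = {
    'nodes': [{'id': 'N1'}], 'brokers': [{'id': 'B1'}],
    'topics': [{'id': 'T1'}], 'applications': [{'id': 'A1'}, {'id': 'A2'}],
    'relationships': {
        'publishes_to': [{'from': 'A1', 'to': 'T1'}],
        'subscribes_to': [{'from': 'A2', 'to': 'T1'}],
        'routes': [{'from': 'T1', 'to': 'B1'}],
        # The second edge places a broker on a node, which the generator never does
        'runs_on': [{'from': 'A1', 'to': 'N1'}, {'from': 'B1', 'to': 'N1'}],
    },
}
print(check_layering(toy))  # → [('runs_on', {'from': 'B1', 'to': 'N1'})]
```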

In [None]:
class PubSubGraphGenerator:
    """
    Generates realistic pub-sub system graphs for different domain scenarios.
    
    The generator creates a multi-layer graph model representing:
    - Infrastructure nodes (servers, edge devices)
    - Message brokers
    - Topics with QoS profiles
    - Applications (producers, consumers, prosumers)
    - Relationships between all components
    """
    
    # Scale presets defining system size
    SCALES = {
        'tiny': {'nodes': 3, 'apps': 8, 'topics': 5, 'brokers': 1},
        'small': {'nodes': 5, 'apps': 15, 'topics': 10, 'brokers': 2},
        'medium': {'nodes': 10, 'apps': 30, 'topics': 20, 'brokers': 3},
        'large': {'nodes': 25, 'apps': 80, 'topics': 50, 'brokers': 5},
        'xlarge': {'nodes': 50, 'apps': 150, 'topics': 100, 'brokers': 8}
    }
    
    # Domain-specific application types
    APP_TYPES = {
        Scenario.GENERIC: ['ServiceA', 'ServiceB', 'Processor', 'Handler', 'Monitor'],
        Scenario.IOT_SMART_CITY: [
            'TrafficSensor', 'ParkingSensor', 'AirQualityMonitor', 'EmergencyDispatcher',
            'LightingController', 'WasteManager', 'WeatherStation', 'TransitTracker'
        ],
        Scenario.FINANCIAL_TRADING: [
            'MarketDataFeed', 'OrderProcessor', 'RiskEngine', 'TradeExecutor',
            'PositionTracker', 'ComplianceMonitor', 'MatchingEngine', 'PricingService'
        ],
        Scenario.HEALTHCARE: [
            'VitalSignsMonitor', 'PatientTracker', 'AlertDispatcher', 'MedicationManager',
            'LabResultsProcessor', 'ImagingService', 'BillingService', 'AppointmentScheduler'
        ],
        Scenario.ECOMMERCE: [
            'OrderService', 'InventoryManager', 'PaymentProcessor', 'ShippingCalculator',
            'RecommendationEngine', 'CartService', 'NotificationService', 'FraudDetector'
        ]
    }
    
    # Domain-specific topic patterns
    TOPIC_PATTERNS = {
        Scenario.GENERIC: ['events', 'data', 'commands', 'status', 'metrics'],
        Scenario.IOT_SMART_CITY: [
            'traffic/flow', 'traffic/congestion', 'parking/availability', 'air_quality/readings',
            'emergency/alerts', 'lighting/status', 'weather/current', 'transit/location'
        ],
        Scenario.FINANCIAL_TRADING: [
            'market/prices', 'market/quotes', 'orders/new', 'orders/filled',
            'trades/executed', 'risk/alerts', 'positions/updates', 'compliance/events'
        ],
        Scenario.HEALTHCARE: [
            'patient/vitals', 'patient/alerts', 'lab/results', 'imaging/completed',
            'medication/administered', 'appointments/scheduled', 'billing/claims'
        ],
        Scenario.ECOMMERCE: [
            'orders/created', 'inventory/updates', 'payments/processed', 'shipping/tracking',
            'recommendations/generated', 'notifications/sent', 'fraud/detected'
        ]
    }
    
    # QoS profiles by criticality level
    QOS_PROFILES = {
        'CRITICAL': {
            'reliability': 'RELIABLE',
            'durability': 'TRANSIENT_LOCAL',
            'deadline_ms': 10,
            'latency_budget_ms': 5
        },
        'HIGH': {
            'reliability': 'RELIABLE',
            'durability': 'VOLATILE',
            'deadline_ms': 50,
            'latency_budget_ms': 25
        },
        'MEDIUM': {
            'reliability': 'RELIABLE',
            'durability': 'VOLATILE',
            'deadline_ms': 100,
            'latency_budget_ms': 50
        },
        'LOW': {
            'reliability': 'BEST_EFFORT',
            'durability': 'VOLATILE',
            'deadline_ms': 500,
            'latency_budget_ms': 200
        }
    }
    
    def __init__(self, config: GraphConfig):
        self.config = config
        random.seed(config.seed)
    
    def generate(self) -> Dict[str, Any]:
        """Generate complete pub-sub system graph"""
        print(f"üîÑ Generating {self.config.scenario.value} scenario...")
        
        graph = {
            'metadata': {
                'scenario': self.config.scenario.value,
                'scale': self.config.scale,
                'generated_at': datetime.now().isoformat(),
                'seed': self.config.seed
            },
            'nodes': [],
            'brokers': [],
            'topics': [],
            'applications': [],
            'relationships': {
                'publishes_to': [],
                'subscribes_to': [],
                'runs_on': [],
                'routes': []
            }
        }
        
        # Generate all components
        self._generate_nodes(graph)
        self._generate_brokers(graph)
        self._generate_topics(graph)
        self._generate_applications(graph)
        
        # Generate relationships
        self._generate_runs_on(graph)
        self._generate_routes(graph)
        self._generate_pub_sub_relationships(graph)
        
        # Inject anti-patterns if specified
        if self.config.antipatterns:
            self._inject_antipatterns(graph)
        
        # Ensure connectivity
        self._ensure_connectivity(graph)
        
        return graph
    
    def _generate_nodes(self, graph: Dict):
        """Generate infrastructure nodes"""
        node_types = ['edge_gateway', 'fog_server', 'cloud_server', 'edge_device']
        
        for i in range(1, self.config.num_nodes + 1):
            node_type = node_types[(i - 1) % len(node_types)]
            zone = f'zone-{(i - 1) % 3 + 1}'
            
            graph['nodes'].append({
                'id': f'N{i}',
                'name': f'{node_type.replace("_", " ").title()} {i}',
                'type': node_type,
                'zone': zone,
                'cpu_capacity': random.choice([4, 8, 16, 32]),
                'memory_gb': random.choice([8, 16, 32, 64])
            })
    
    def _generate_brokers(self, graph: Dict):
        """Generate message brokers"""
        for i in range(1, self.config.num_brokers + 1):
            graph['brokers'].append({
                'id': f'B{i}',
                'name': f'Broker-{i}',
                'zone': f'zone-{(i - 1) % 3 + 1}',
                'max_connections': random.choice([1000, 5000, 10000]),
                'protocol': random.choice(['DDS', 'MQTT', 'AMQP'])
            })
    
    def _generate_topics(self, graph: Dict):
        """Generate topics with QoS profiles"""
        patterns = self.TOPIC_PATTERNS.get(self.config.scenario, self.TOPIC_PATTERNS[Scenario.GENERIC])
        qos_levels = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
        
        for i in range(1, self.config.num_topics + 1):
            pattern = patterns[(i - 1) % len(patterns)]
            qos_level = qos_levels[(i - 1) % len(qos_levels)]
            
            # Adjust QoS based on scenario
            if self.config.scenario == Scenario.FINANCIAL_TRADING:
                if 'market' in pattern or 'order' in pattern:
                    qos_level = 'CRITICAL'
            elif self.config.scenario == Scenario.HEALTHCARE:
                if 'vital' in pattern or 'alert' in pattern:
                    qos_level = 'CRITICAL'
            
            graph['topics'].append({
                'id': f'T{i}',
                'name': f'{pattern}_{i}',
                'qos': self.QOS_PROFILES[qos_level].copy(),
                'qos_level': qos_level,
                'message_rate_hz': self._get_message_rate(pattern),
                'avg_message_size_bytes': self._get_message_size(pattern)
            })
    
    def _generate_applications(self, graph: Dict):
        """Generate applications"""
        app_types_list = self.APP_TYPES.get(self.config.scenario, self.APP_TYPES[Scenario.GENERIC])
        criticality_levels = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
        role_types = ['PRODUCER', 'CONSUMER', 'PROSUMER']
        
        for i in range(1, self.config.num_applications + 1):
            app_type = app_types_list[(i - 1) % len(app_types_list)]
            criticality = criticality_levels[(i - 1) % len(criticality_levels)]
            role = role_types[(i - 1) % len(role_types)]
            
            graph['applications'].append({
                'id': f'A{i}',
                'name': f'{app_type}_{i}',
                'type': role,
                'criticality': criticality,
                'replicas': 1 if criticality == 'LOW' else random.choice([2, 3])
            })
    
    def _generate_runs_on(self, graph: Dict):
        """Generate application to node relationships"""
        for app in graph['applications']:
            node = random.choice(graph['nodes'])
            graph['relationships']['runs_on'].append({
                'from': app['id'],
                'to': node['id']
            })
    
    def _generate_routes(self, graph: Dict):
        """Generate topic to broker routing"""
        for topic in graph['topics']:
            num_brokers = random.randint(1, min(2, len(graph['brokers'])))
            selected_brokers = random.sample(graph['brokers'], num_brokers)
            for broker in selected_brokers:
                graph['relationships']['routes'].append({
                    'from': topic['id'],
                    'to': broker['id']
                })
    
    def _generate_pub_sub_relationships(self, graph: Dict):
        """Generate publish/subscribe relationships"""
        for app in graph['applications']:
            app_type = app['type']
            
            if app_type == 'PRODUCER':
                num_publish, num_subscribe = random.randint(1, 3), 0
            elif app_type == 'CONSUMER':
                num_publish, num_subscribe = 0, random.randint(1, 4)
            else:
                num_publish, num_subscribe = random.randint(1, 2), random.randint(1, 3)
            
            if num_publish > 0:
                for topic in random.sample(graph['topics'], min(num_publish, len(graph['topics']))):
                    graph['relationships']['publishes_to'].append({'from': app['id'], 'to': topic['id']})
            
            if num_subscribe > 0:
                for topic in random.sample(graph['topics'], min(num_subscribe, len(graph['topics']))):
                    graph['relationships']['subscribes_to'].append({'from': app['id'], 'to': topic['id']})
    
    def _inject_antipatterns(self, graph: Dict):
        """Inject specified anti-patterns for analysis validation"""
        for pattern in self.config.antipatterns:
            if pattern == 'spof':
                # Single Point of Failure: Route all topics through one broker
                if len(graph['brokers']) > 1:
                    spof_broker = graph['brokers'][0]
                    for route in graph['relationships']['routes']:
                        route['to'] = spof_broker['id']
                    print(f"  ⚠️ Injected SPOF: {spof_broker['id']}")
            
            elif pattern == 'god_topic':
                # God Topic: One topic has excessive subscribers
                if graph['topics']:
                    god_topic = graph['topics'][0]
                    for app in graph['applications']:
                        if random.random() < 0.7:
                            graph['relationships']['subscribes_to'].append({
                                'from': app['id'], 'to': god_topic['id']
                            })
                    print(f"  ⚠️ Injected God Topic: {god_topic['id']}")
    
    def _ensure_connectivity(self, graph: Dict):
        """Ensure every application publishes or subscribes to at least one topic"""
        for app in graph['applications']:
            has_pub = any(r['from'] == app['id'] for r in graph['relationships']['publishes_to'])
            has_sub = any(r['from'] == app['id'] for r in graph['relationships']['subscribes_to'])
            if not has_pub and not has_sub:
                topic = random.choice(graph['topics'])
                graph['relationships']['subscribes_to'].append({'from': app['id'], 'to': topic['id']})
    
    def _get_message_rate(self, pattern: str) -> float:
        """Get realistic message rate based on scenario"""
        if self.config.scenario == Scenario.FINANCIAL_TRADING:
            return random.choice([100, 500, 1000]) if 'market' in pattern else random.choice([10, 50, 100])
        elif self.config.scenario == Scenario.HEALTHCARE:
            return random.choice([10, 20, 50]) if 'vital' in pattern else random.choice([1, 5, 10])
        return random.choice([1, 10, 50, 100])
    
    def _get_message_size(self, pattern: str) -> int:
        """Get realistic message size based on scenario"""
        if self.config.scenario == Scenario.FINANCIAL_TRADING:
            return random.choice([64, 128, 256])
        elif self.config.scenario == Scenario.IOT_SMART_CITY:
            return random.choice([32, 64, 128])
        return random.choice([128, 256, 512, 1024])


print("✅ PubSubGraphGenerator class defined!")
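
The `spof` injection rewrites every route to the first broker. As a side effect, the remaining brokers lose their only relationships and become isolated nodes. A topic that was previously routed through two brokers also ends up with two identical route entries, which a `nx.DiGraph` collapses into one edge. Two hypothetical helpers, `unrouted_brokers` and `duplicate_edges`, make those side effects visible. The toy input mimics the generator's dictionary format.

```python
def unrouted_brokers(graph: dict) -> list:
    """Return IDs of brokers that no topic routes through."""
    routed = {r['to'] for r in graph['relationships'].get('routes', [])}
    return [b['id'] for b in graph['brokers'] if b['id'] not in routed]


def duplicate_edges(edges: list) -> int:
    """Count repeated (from, to) pairs in a relationship list."""
    pairs = [(e['from'], e['to']) for e in edges]
    return len(pairs) - len(set(pairs))


# Toy data after SPOF injection: every route now points at B1
toy = {
    'brokers': [{'id': 'B1'}, {'id': 'B2'}, {'id': 'B3'}],
    'relationships': {'routes': [
        {'from': 'T1', 'to': 'B1'},
        {'from': 'T2', 'to': 'B1'},
        {'from': 'T2', 'to': 'B1'},  # T2 was routed via two brokers before the rewrite
    ]},
}

print(unrouted_brokers(toy))                            # → ['B2', 'B3']
print(duplicate_edges(toy['relationships']['routes']))  # → 1
```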

### Generate a Sample Graph

Let's generate an IoT Smart City scenario to demonstrate the pipeline.

In [None]:
# Configure the graph generation
config = GraphConfig(
    scale='medium',
    scenario=Scenario.IOT_SMART_CITY,
    num_nodes=10,
    num_applications=30,
    num_topics=20,
    num_brokers=3,
    antipatterns=['spof'],  # Inject a Single Point of Failure
    seed=42
)

# Generate the graph
generator = PubSubGraphGenerator(config)
graph_data = generator.generate()

# Display summary
print(f"\nüìä Generated Graph Summary:")
print(f"   ‚Ä¢ Infrastructure Nodes: {len(graph_data['nodes'])}")
print(f"   ‚Ä¢ Message Brokers: {len(graph_data['brokers'])}")
print(f"   ‚Ä¢ Topics: {len(graph_data['topics'])}")
print(f"   ‚Ä¢ Applications: {len(graph_data['applications'])}")
print(f"   ‚Ä¢ Publish Relationships: {len(graph_data['relationships']['publishes_to'])}")
print(f"   ‚Ä¢ Subscribe Relationships: {len(graph_data['relationships']['subscribes_to'])}")
print(f"   ‚Ä¢ Routing Relationships: {len(graph_data['relationships']['routes'])}")

In [None]:
# Examine sample components
print("\nüìã Sample Components:\n")

print("Infrastructure Node:")
print(json.dumps(graph_data['nodes'][0], indent=2))

print("\nTopic with QoS:")
print(json.dumps(graph_data['topics'][0], indent=2))

print("\nApplication:")
print(json.dumps(graph_data['applications'][0], indent=2))

---

## Step 2: Neo4j Database Import

For persistent storage and advanced graph queries, we can import the generated graph into Neo4j.

**Note**: This step requires a running Neo4j instance. If unavailable, the pipeline continues with in-memory analysis.

### Neo4j Schema

```cypher
// Node types
(:Node {id, name, type, zone, cpu_capacity, memory_gb})
(:Broker {id, name, zone, max_connections, protocol})
(:Topic {id, name, qos_level, message_rate_hz})
(:Application {id, name, type, criticality, replicas})

// Relationships
(Application)-[:RUNS_ON]->(Node)
(Application)-[:PUBLISHES]->(Topic)
(Application)-[:SUBSCRIBES]->(Topic)
(Topic)-[:ROUTED_BY]->(Broker)
```

In [None]:
class Neo4jImporter:
    """
    Imports graph data into Neo4j database for persistent storage and advanced queries.
    """
    
    def __init__(self, uri: str, user: str, password: str, database: str = 'neo4j'):
        self.uri = uri
        self.user = user
        self.password = password
        self.database = database
        self.driver = None
    
    def connect(self) -> bool:
        """Establish connection to Neo4j"""
        if not NEO4J_AVAILABLE:
            print("⚠️ Neo4j driver not available")
            return False
        
        try:
            self.driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
            with self.driver.session(database=self.database) as session:
                session.run("RETURN 1")
            print("✅ Connected to Neo4j")
            return True
        except Exception as e:
            print(f"❌ Failed to connect: {e}")
            return False
    
    def close(self):
        """Close database connection"""
        if self.driver:
            self.driver.close()
    
    def clear_database(self):
        """Clear all data from database"""
        if self.driver:
            with self.driver.session(database=self.database) as session:
                session.run("MATCH (n) DETACH DELETE n")
            print("üóëÔ∏è Database cleared")
    
    def create_schema(self):
        """Create database schema with constraints and indexes"""
        if not self.driver:
            return
        
        constraints = [
            "CREATE CONSTRAINT app_id IF NOT EXISTS FOR (a:Application) REQUIRE a.id IS UNIQUE",
            "CREATE CONSTRAINT topic_id IF NOT EXISTS FOR (t:Topic) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT broker_id IF NOT EXISTS FOR (b:Broker) REQUIRE b.id IS UNIQUE",
            "CREATE CONSTRAINT node_id IF NOT EXISTS FOR (n:Node) REQUIRE n.id IS UNIQUE"
        ]
        
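        with self.driver.session(database=self.database) as session:
            for constraint in constraints:
                try:
                    session.run(constraint)
                except Exception as e:
                    # The CREATE CONSTRAINT ... FOR ... REQUIRE syntax needs Neo4j 4.4+;
                    # report failures instead of silently ignoring them
                    print(f"⚠️ Could not create constraint: {e}")
        print("📐 Schema created")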
    
    def import_graph(self, graph_data: Dict):
        """Import complete graph data"""
        if not self.driver:
            print("⚠️ Not connected, skipping import")
            return
        
        print("üì• Importing graph data...")
        
        with self.driver.session(database=self.database) as session:
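            # Import infrastructure nodes with the properties listed in the schema
            for node in graph_data.get('nodes', []):
                session.run(
                    "MERGE (n:Node {id: $id}) SET n.name = $name, n.type = $type, "
                    "n.zone = $zone, n.cpu_capacity = $cpu, n.memory_gb = $mem",
                    id=node['id'], name=node['name'], type=node.get('type', 'generic'),
                    zone=node.get('zone'), cpu=node.get('cpu_capacity'), mem=node.get('memory_gb')
                )
            
            # Import brokers
            for broker in graph_data.get('brokers', []):
                session.run(
                    "MERGE (b:Broker {id: $id}) SET b.name = $name, b.zone = $zone, "
                    "b.max_connections = $max_conn, b.protocol = $protocol",
                    id=broker['id'], name=broker['name'], zone=broker.get('zone'),
                    max_conn=broker.get('max_connections'), protocol=broker.get('protocol')
                )
            
            # Import topics (the nested QoS dict is skipped: Neo4j properties cannot hold maps)
            for topic in graph_data.get('topics', []):
                session.run(
                    "MERGE (t:Topic {id: $id}) SET t.name = $name, t.qos_level = $qos, "
                    "t.message_rate_hz = $rate",
                    id=topic['id'], name=topic['name'], qos=topic.get('qos_level', 'MEDIUM'),
                    rate=topic.get('message_rate_hz')
                )
            
            # Import applications
            for app in graph_data.get('applications', []):
                session.run(
                    "MERGE (a:Application {id: $id}) SET a.name = $name, a.type = $type, "
                    "a.criticality = $crit, a.replicas = $replicas",
                    id=app['id'], name=app['name'], type=app.get('type'),
                    crit=app.get('criticality', 'MEDIUM'), replicas=app.get('replicas')
                )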
            
            # Import relationships
            for rel in graph_data.get('relationships', {}).get('runs_on', []):
                session.run(
                    "MATCH (a:Application {id: $from}), (n:Node {id: $to}) MERGE (a)-[:RUNS_ON]->(n)",
                    **rel
                )
            
            for rel in graph_data.get('relationships', {}).get('publishes_to', []):
                session.run(
                    "MATCH (a:Application {id: $from}), (t:Topic {id: $to}) MERGE (a)-[:PUBLISHES]->(t)",
                    **rel
                )
            
            for rel in graph_data.get('relationships', {}).get('subscribes_to', []):
                session.run(
                    "MATCH (a:Application {id: $from}), (t:Topic {id: $to}) MERGE (a)-[:SUBSCRIBES]->(t)",
                    **rel
                )
            
            for rel in graph_data.get('relationships', {}).get('routes', []):
                session.run(
                    "MATCH (t:Topic {id: $from}), (b:Broker {id: $to}) MERGE (t)-[:ROUTED_BY]->(b)",
                    **rel
                )
        
        print("✅ Graph imported successfully")


print("✅ Neo4jImporter class defined!")
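
The importer issues one `session.run` call per component and per relationship. This keeps the code simple but costs one round trip per row. A common alternative is to send a list parameter and expand it server-side with Cypher's `UNWIND`. The sketch below only builds (query, rows) pairs and does not touch a database. `REL_SPECS` and `build_unwind_batches` are hypothetical names, and the batch size of 500 is an arbitrary assumption.

```python
# Relationship list -> (source label, target label, Cypher relationship type)
REL_SPECS = {
    'runs_on': ('Application', 'Node', 'RUNS_ON'),
    'publishes_to': ('Application', 'Topic', 'PUBLISHES'),
    'subscribes_to': ('Application', 'Topic', 'SUBSCRIBES'),
    'routes': ('Topic', 'Broker', 'ROUTED_BY'),
}


def build_unwind_batches(relationships: dict, batch_size: int = 500) -> list:
    """Return (query, rows) pairs; each query MERGEs one batch of edges via UNWIND."""
    batches = []
    for rel_name, (src_label, dst_label, rel_type) in REL_SPECS.items():
        query = (
            "UNWIND $rows AS row "
            f"MATCH (s:{src_label} {{id: row.src}}), (d:{dst_label} {{id: row.dst}}) "
            f"MERGE (s)-[:{rel_type}]->(d)"
        )
        rows = [{'src': r['from'], 'dst': r['to']} for r in relationships.get(rel_name, [])]
        # Split rows into fixed-size chunks so each transaction stays small
        for start in range(0, len(rows), batch_size):
            batches.append((query, rows[start:start + batch_size]))
    return batches


rels = {'publishes_to': [{'from': 'A1', 'to': 'T1'}, {'from': 'A2', 'to': 'T1'}],
        'routes': [{'from': 'T1', 'to': 'B1'}]}
for query, rows in build_unwind_batches(rels, batch_size=1):
    print(len(rows), query)
```

Each pair could then be executed inside an open session with `session.run(query, rows=rows)`, because the driver's `run` accepts query parameters as keyword arguments.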

In [None]:
# Neo4j import (optional - uncomment and configure if Neo4j is available)

# neo4j_importer = Neo4jImporter(
#     uri='bolt://localhost:7687',
#     user='neo4j',
#     password='password'
# )

# if neo4j_importer.connect():
#     neo4j_importer.clear_database()
#     neo4j_importer.create_schema()
#     neo4j_importer.import_graph(graph_data)
#     neo4j_importer.close()

print("ℹ️ Neo4j import step (configure connection settings to enable)")

---

## Step 3: Graph Analysis

This is the core analytical step where we apply graph theory algorithms to identify critical components.

### Analysis Components

1. **Centrality Metrics**
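   - Betweenness Centrality: How often a node lies on shortest paths between other nodes (identifies bridging components)
   - In/Out Degree: Counts of incoming and outgoing edges
   - PageRank: Importance based on incoming connections
   - Closeness Centrality: Inverse average shortest-path distance to a node (NetworkX uses incoming distances on directed graphs)

2. **Structural Analysis**
   - Articulation Points: Nodes whose removal increases the number of connected components in the undirected view of the graph
   - Bridges: Edges whose removal increases the number of connected components in the undirected view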
   - Cycle Detection: Identifies circular dependencies

3. **Composite Criticality Score**
   $$C_{score}(v) = \alpha \cdot C_B^{norm}(v) + \beta \cdot AP(v) + \gamma \cdot I(v)$$

4. **Anti-Pattern Detection**
   - SPOF (Single Points of Failure)
   - God Topics (excessive connectivity)
   - Circular Dependencies

In [None]:
class GraphAnalyzer:
    """
    Comprehensive graph analysis for pub-sub systems.
    
    Implements the composite criticality scoring formula:
    C_score(v) = α·C_B^norm(v) + β·AP(v) + γ·I(v)
    """
    
    def __init__(self, alpha: float = DEFAULT_ALPHA, 
                 beta: float = DEFAULT_BETA, 
                 gamma: float = DEFAULT_GAMMA):
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
    
    def build_networkx_graph(self, graph_data: Dict) -> nx.DiGraph:
        """Convert graph data to NetworkX directed graph"""
        G = nx.DiGraph()
        
        # Add nodes by type
        for node in graph_data.get('nodes', []):
            G.add_node(node['id'], type='Node', name=node.get('name', node['id']),
                      node_type=node.get('type', 'generic'))
        
        for broker in graph_data.get('brokers', []):
            G.add_node(broker['id'], type='Broker', name=broker.get('name', broker['id']))
        
        for topic in graph_data.get('topics', []):
            G.add_node(topic['id'], type='Topic', name=topic.get('name', topic['id']),
                      qos_level=topic.get('qos_level', 'MEDIUM'))
        
        for app in graph_data.get('applications', []):
            G.add_node(app['id'], type='Application', name=app.get('name', app['id']),
                      app_type=app.get('type', 'PROSUMER'),
                      criticality=app.get('criticality', 'MEDIUM'))
        
        # Add edges
        relationships = graph_data.get('relationships', {})
        for rel in relationships.get('runs_on', []):
            G.add_edge(rel['from'], rel['to'], type='RUNS_ON')
        for rel in relationships.get('publishes_to', []):
            G.add_edge(rel['from'], rel['to'], type='PUBLISHES')
        for rel in relationships.get('subscribes_to', []):
            G.add_edge(rel['from'], rel['to'], type='SUBSCRIBES')
        for rel in relationships.get('routes', []):
            G.add_edge(rel['from'], rel['to'], type='ROUTES')
        
        return G
    
    def analyze(self, G: nx.DiGraph) -> Dict[str, Any]:
        """Run comprehensive analysis on the graph"""
        print("üîç Running comprehensive analysis...")
        
        results = {
            'graph_summary': self._get_graph_summary(G),
            'centrality_metrics': self._calculate_centrality(G),
            'structural_analysis': self._analyze_structure(G),
            'layer_analysis': self._analyze_layers(G),
            'anti_patterns': self._detect_anti_patterns(G),
            'criticality_scores': {}
        }
        
        # Calculate composite criticality scores
        results['criticality_scores'] = self._calculate_criticality_scores(
            G, results['centrality_metrics'], results['structural_analysis']
        )
        
        return results
    
    def _get_graph_summary(self, G: nx.DiGraph) -> Dict:
        """Get summary statistics"""
        node_types = defaultdict(int)
        for node, data in G.nodes(data=True):
            node_types[data.get('type', 'Unknown')] += 1
        
        return {
            'total_nodes': G.number_of_nodes(),
            'total_edges': G.number_of_edges(),
            'node_types': dict(node_types),
            'density': nx.density(G),
            'is_connected': nx.is_weakly_connected(G),
            'num_components': nx.number_weakly_connected_components(G)
        }
    
    def _calculate_centrality(self, G: nx.DiGraph) -> Dict:
        """Calculate various centrality metrics"""
        return {
            'betweenness': nx.betweenness_centrality(G),
            'in_degree': dict(G.in_degree()),
            'out_degree': dict(G.out_degree()),
            'pagerank': nx.pagerank(G) if G.number_of_nodes() > 0 else {},
            'closeness': nx.closeness_centrality(G)
        }
    
    def _analyze_structure(self, G: nx.DiGraph) -> Dict:
        """Analyze structural properties on the undirected view of the graph"""
        G_undirected = G.to_undirected()
        
        # Articulation points (SPOFs) and bridges: both NetworkX functions handle
        # disconnected graphs component by component, so no connectivity check is needed
        articulation_points = set(nx.articulation_points(G_undirected))
        bridges = set(nx.bridges(G_undirected))
        
        # Detect cycles, stopping after 100: enumerating every simple cycle
        # can take exponential time on dense graphs
        cycles = []
        for cycle in nx.simple_cycles(G):
            cycles.append(cycle)
            if len(cycles) >= 100:
                break
        
        return {
            'articulation_points': articulation_points,
            'num_articulation_points': len(articulation_points),
            'bridges': bridges,
            'num_bridges': len(bridges),
            'has_cycles': len(cycles) > 0,
            'num_cycles': len(cycles)  # capped at 100
        }
    
    def _analyze_layers(self, G: nx.DiGraph) -> Dict:
        """Analyze graph by layer"""
        layers = {'Application': [], 'Topic': [], 'Broker': [], 'Node': []}
        
        for node, data in G.nodes(data=True):
            node_type = data.get('type', 'Unknown')
            if node_type in layers:
                layers[node_type].append(node)
        
        layer_stats = {}
        for layer, nodes in layers.items():
            if nodes:
                subgraph = G.subgraph(nodes)
                layer_stats[layer] = {
                    'count': len(nodes),
                    'edges': subgraph.number_of_edges(),
                    'density': nx.density(subgraph) if len(nodes) > 1 else 0
                }
        
        return layer_stats
    
    def _detect_anti_patterns(self, G: nx.DiGraph) -> Dict:
        """Detect common anti-patterns"""
        anti_patterns = {
            'spof_candidates': [],
            'god_topics': [],
            'isolated_components': [],
            'circular_dependencies': []
        }
        
        # SPOF: High betweenness centrality
        betweenness = nx.betweenness_centrality(G)
        for node, bc in betweenness.items():
            if bc > 0.3:
                anti_patterns['spof_candidates'].append({'node': node, 'betweenness': bc})
        
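        # God Topics: excessive connections. In-degree counts every edge pointing at
        # the topic, which includes both PUBLISHES and SUBSCRIBES edges
        for node, data in G.nodes(data=True):
            if data.get('type') == 'Topic':
                in_edges = G.in_degree(node)
                if in_edges > 10:
                    anti_patterns['god_topics'].append({'topic': node, 'connections': in_edges})
        
        # Circular dependencies: keep the first 10 cycles without enumerating all of them
        for cycle in nx.simple_cycles(G):
            anti_patterns['circular_dependencies'].append(cycle)
            if len(anti_patterns['circular_dependencies']) >= 10:
                break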
        
        return anti_patterns
    
    def _calculate_criticality_scores(self, G: nx.DiGraph, 
                                      centrality: Dict, 
                                      structural: Dict) -> Dict[str, CriticalityScore]:
        """
        Calculate composite criticality scores using:
        C_score(v) = α·C_B^norm(v) + β·AP(v) + γ·I(v)
        """
        scores = {}
        
        betweenness = centrality['betweenness']
        articulation_points = structural['articulation_points']
        
        # Normalize betweenness
        max_bc = max(betweenness.values()) if betweenness else 1
        min_bc = min(betweenness.values()) if betweenness else 0
        
        for node in G.nodes():
            node_data = G.nodes[node]
            node_type = node_data.get('type', 'Unknown')
            
            # Normalized betweenness centrality
            bc_norm = (betweenness.get(node, 0) - min_bc) / (max_bc - min_bc) if max_bc > min_bc else 0
            
            # Articulation point indicator
            ap = 1.0 if node in articulation_points else 0.0
            
            # Impact score (based on reachability loss)
            impact = self._calculate_impact_score(G, node)
            
            # Composite score: C = α·BC + β·AP + γ·I
            composite = self.alpha * bc_norm + self.beta * ap + self.gamma * impact
            
            # Determine criticality level
            if composite >= 0.8:
                level = CriticalityLevel.CRITICAL
            elif composite >= 0.6:
                level = CriticalityLevel.HIGH
            elif composite >= 0.4:
                level = CriticalityLevel.MEDIUM
            elif composite >= 0.2:
                level = CriticalityLevel.LOW
            else:
                level = CriticalityLevel.MINIMAL
            
            scores[node] = CriticalityScore(
                component_id=node,
                component_type=node_type,
                betweenness_centrality=bc_norm,
                is_articulation_point=node in articulation_points,
                impact_score=impact,
                composite_score=composite,
                criticality_level=level
            )
        
        return scores
    
    def _calculate_impact_score(self, G: nx.DiGraph, node: str) -> float:
        """
        Calculate impact score based on reachability loss when `node` is removed.
        Reachability is approximated by weak connectivity: an ordered pair of nodes
        counts as connected if both lie in the same weakly connected component.
        """
        if G.number_of_nodes() <= 1:
            return 0.0
        
        def connected_pairs(graph) -> int:
            return sum(len(c) * (len(c) - 1) for c in nx.weakly_connected_components(graph))
        
        # Baseline from G's actual components, so a graph that is already
        # disconnected is not counted as lost reachability
        original_pairs = connected_pairs(G)
        if original_pairs == 0:
            return 0.0
        
        # Read-only subgraph view without the node (avoids copying G for every node)
        G_removed = G.subgraph([v for v in G if v != node])
        reachability_loss = 1 - connected_pairs(G_removed) / original_pairs
        return min(1.0, reachability_loss)


print("✅ GraphAnalyzer class defined!")

In [None]:
# Run analysis on our generated graph
analyzer = GraphAnalyzer(alpha=0.4, beta=0.3, gamma=0.3)

# Build NetworkX graph
G = analyzer.build_networkx_graph(graph_data)

# Run comprehensive analysis
analysis_results = analyzer.analyze(G)

# Display results
print("\n📊 Analysis Results:")
print("\nGraph Summary:")
for key, value in analysis_results['graph_summary'].items():
    print(f"   • {key}: {value}")

In [None]:
# Display layer analysis
print("\n📊 Layer Analysis:")
for layer, stats in analysis_results['layer_analysis'].items():
    print(f"   {layer}: {stats['count']} nodes, {stats['edges']} edges, density={stats['density']:.4f}")

In [None]:
# Display structural analysis
structural = analysis_results['structural_analysis']
print("\n🔧 Structural Analysis:")
print(f"   • Articulation Points (SPOFs): {structural['num_articulation_points']}")
print(f"   • Bridges: {structural['num_bridges']}")
print(f"   • Has Cycles: {structural['has_cycles']}")
print(f"   • Number of Cycles (capped at 100): {structural['num_cycles']}")

In [None]:
# Display top 10 critical components
criticality_scores = analysis_results['criticality_scores']
sorted_scores = sorted(criticality_scores.items(), key=lambda x: x[1].composite_score, reverse=True)[:10]

print("\n⚠️ Top 10 Critical Components:")
print("-" * 70)
print(f"{'Rank':<6}{'Component':<12}{'Type':<15}{'Score':<10}{'Level':<12}{'Is AP'}")
print("-" * 70)

for i, (node_id, score) in enumerate(sorted_scores, 1):
    print(f"{i:<6}{node_id:<12}{score.component_type:<15}{score.composite_score:<10.3f}{score.criticality_level.value:<12}{score.is_articulation_point}")

In [None]:
# Count criticality level distribution
level_counts = defaultdict(int)
for score in criticality_scores.values():
    level_counts[score.criticality_level.value] += 1

print("\n📈 Criticality Distribution:")
for level in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'MINIMAL']:
    count = level_counts.get(level, 0)
    filled = min(count, 20)  # cap the bar at 20 characters
    bar = '█' * filled + '░' * (20 - filled)
    print(f"   {level:<10}: {bar} {count}")

In [None]:
# Display anti-patterns detected
anti_patterns = analysis_results['anti_patterns']
print("\n🐛 Anti-Patterns Detected:")
print(f"   • SPOF Candidates: {len(anti_patterns['spof_candidates'])}")
print(f"   • God Topics: {len(anti_patterns['god_topics'])}")
print(f"   • Circular Dependencies: {len(anti_patterns['circular_dependencies'])}")

if anti_patterns['spof_candidates']:
    print("\n   SPOF Details:")
    for spof in anti_patterns['spof_candidates'][:5]:
        print(f"      → {spof['node']}: betweenness={spof['betweenness']:.3f}")

---

## Step 4: Simulation and Validation

This step validates our analysis predictions through simulation:

1. **Baseline Simulation**: Run normal traffic to establish baseline metrics
2. **Failure Injection**: Simulate component failures
3. **Impact Measurement**: Measure actual impact on system performance
4. **Validation**: Compare predictions to actual outcomes
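The four steps can be illustrated on a toy topology without the asynchronous engine. The sketch below is a deterministic simplification, not the notebook's `SimulationEngine`: it assumes each (publisher, topic, subscriber) triple is one intended delivery, and that a delivery succeeds only if all three components are alive. The topology and the names `PUBLISHES`, `SUBSCRIBES`, `intended_deliveries`, `delivery_ratio`, and `measure_impact` are hypothetical.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical toy topology (not produced by PubSubGraphGenerator):
# application -> topics it publishes to / subscribes to
PUBLISHES: Dict[str, List[str]] = {"A1": ["T1"], "A2": ["T1", "T2"]}
SUBSCRIBES: Dict[str, List[str]] = {"A3": ["T1"], "A4": ["T1", "T2"], "A5": ["T2"]}


def intended_deliveries() -> List[Tuple[str, str, str]]:
    """All (publisher, topic, subscriber) triples the system is expected to deliver."""
    triples = []
    for pub, pub_topics in PUBLISHES.items():
        for sub, sub_topics in SUBSCRIBES.items():
            for topic in sorted(set(pub_topics) & set(sub_topics)):
                triples.append((pub, topic, sub))
    return triples


def delivery_ratio(failed: Set[str]) -> float:
    """Fraction of intended deliveries whose publisher, topic and subscriber all survive."""
    triples = intended_deliveries()
    if not triples:
        return 1.0
    delivered = sum(1 for triple in triples if failed.isdisjoint(triple))
    return delivered / len(triples)


def measure_impact() -> Dict[str, float]:
    """Steps 1-3: baseline run, single-component failure injection, impact measurement."""
    baseline = delivery_ratio(set())  # step 1: no failures
    components = set(PUBLISHES) | set(SUBSCRIBES)
    for topics in list(PUBLISHES.values()) + list(SUBSCRIBES.values()):
        components.update(topics)
    # Steps 2-3: fail each component alone and record the drop in delivery ratio
    return {c: baseline - delivery_ratio({c}) for c in sorted(components)}


impact = measure_impact()
ranking = sorted(impact, key=impact.get, reverse=True)
print(ranking[:2], round(impact["T1"], 3))
```

Here `A2` and `T1` tie for the largest measured impact (each removes 4 of the 6 intended deliveries). Step 4 would compare this measured ranking with the predicted criticality ranking.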

### Validation Metrics

- **Precision**: True Positives / (True Positives + False Positives)
- **Recall**: True Positives / (True Positives + False Negatives)
- **F1 Score**: 2 × (Precision × Recall) / (Precision + Recall)
- **Spearman Correlation**: Rank correlation between predicted and actual impact
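A minimal worked example of the set-based definitions, using hypothetical component IDs. It follows the same edge-case conventions as `ValidationEngine` (precision is 0 when nothing is predicted critical, recall is 1 when nothing is actually critical); the helper name `precision_recall_f1` is an assumption for illustration.

```python
from typing import Set, Tuple


def precision_recall_f1(predicted: Set[str], actual: Set[str]) -> Tuple[float, float, float]:
    """Set-based precision, recall and F1 for critical-component identification."""
    true_positives = len(predicted & actual)
    precision = true_positives / len(predicted) if predicted else 0.0
    # Same convention as ValidationEngine: recall is 1.0 when nothing is actually critical
    recall = true_positives / len(actual) if actual else 1.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom > 0 else 0.0
    return precision, recall, f1


# Hypothetical component IDs: 4 predicted critical, 5 actually critical, 3 in common
predicted = {"B1", "T3", "A7", "N2"}
actual = {"B1", "T3", "A7", "A9", "T5"}
p, r, f1 = precision_recall_f1(predicted, actual)
print(f"precision={p:.3f} recall={r:.3f} f1={f1:.3f}")
```

With 3 true positives, precision is 3/4 = 0.75 and recall is 3/5 = 0.6, so F1 = 2(0.75)(0.6)/1.35 ≈ 0.667, well short of the 0.9 F1 target.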

In [None]:
class SimulationEngine:
    """
    Lightweight time-stepped simulation for pub-sub systems.
    
    Features:
    - Baseline traffic simulation
    - Failure injection
    - Cascading failure propagation
    - Performance impact measurement
    """
    
    def __init__(self, G: nx.DiGraph, graph_data: Dict):
        self.G = G
        self.graph_data = graph_data
        
        # Simulation state
        self.messages_sent = 0
        self.messages_delivered = 0
        self.messages_dropped = 0
        self.total_latency_ms = 0.0
        self.active_failures = set()
    
    async def run_baseline_simulation(self, duration: int = 10) -> Dict:
        """Run baseline simulation without failures"""
        print(f"🔄 Running baseline simulation ({duration}s)...")
        self._reset_stats()
        await self._simulate_traffic(duration)
        return self._get_stats()
    
    async def run_failure_simulation(self, 
                                     duration: int = 60,
                                     failure_time: int = 30,
                                     failure_components: List[str] = None,
                                     enable_cascading: bool = True) -> Dict:
        """Run simulation with failure injection"""
        print(f"💥 Running failure simulation ({duration}s, failure at {failure_time}s)...")
        self._reset_stats()
        
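        # Pre-failure phase
        await self._simulate_traffic(failure_time)
        pre_failure_stats = self._get_stats()
        
        # Reset counters so the post-failure stats cover only the post-failure window
        # (no failures are active yet, so clearing active_failures here is harmless)
        self._reset_stats()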
        
        # Inject failures
        if failure_components:
            print(f"   Injecting failures: {failure_components}")
            for comp in failure_components:
                self.active_failures.add(comp)
                if enable_cascading:
                    cascaded = self._propagate_failure(comp)
                    self.active_failures.update(cascaded)
                    if cascaded:
                        print(f"   Cascaded to: {cascaded}")
        
        # Post-failure phase
        await self._simulate_traffic(duration - failure_time)
        post_failure_stats = self._get_stats()
        
        # Calculate impact
        impact = self._calculate_failure_impact(pre_failure_stats, post_failure_stats)
        
        return {
            'pre_failure': pre_failure_stats,
            'post_failure': post_failure_stats,
            'failed_components': list(self.active_failures),
            'impact': impact
        }
    
    async def _simulate_traffic(self, duration: int):
        """Simulate message traffic (time-compressed: 10 steps per simulated second)"""
        topics = self.graph_data.get('topics', [])
        apps = self.graph_data.get('applications', [])
        rels = self.graph_data['relationships']
        
        # Precompute lookups once instead of rescanning the relationship lists per message
        publishers = [a for a in apps if a.get('type') in ['PRODUCER', 'PROSUMER']]
        topics_by_publisher = defaultdict(list)
        for r in rels.get('publishes_to', []):
            topics_by_publisher[r['from']].append(r['to'])
        subscribers_by_topic = defaultdict(list)
        for r in rels.get('subscribes_to', []):
            subscribers_by_topic[r['to']].append(r['from'])
        if not publishers:
            return
        
        total_rate = sum(t.get('message_rate_hz', 10) for t in topics)
        messages_per_second = min(total_rate, 1000)
        sim_steps = duration * 10
        
        for step in range(sim_steps):
            for _ in range(int(messages_per_second / 10)):
                publisher = random.choice(publishers)
                pub_topics = topics_by_publisher.get(publisher['id'])
                if not pub_topics:
                    continue
                topic_id = random.choice(pub_topics)
                subscribers = subscribers_by_topic.get(topic_id, [])
                
                # A failed publisher or topic loses every delivery this message would have made
                if publisher['id'] in self.active_failures or topic_id in self.active_failures:
                    self.messages_dropped += len(subscribers)
                    continue
                
                self.messages_sent += 1
                for subscriber_id in subscribers:
                    if subscriber_id in self.active_failures:
                        self.messages_dropped += 1
                    else:
                        self.messages_delivered += 1
                        base_latency = random.uniform(1, 20)
                        if self.active_failures:
                            base_latency *= 1.5  # degraded-mode latency penalty
                        self.total_latency_ms += base_latency
            
            if step % 100 == 0:
                await asyncio.sleep(0.001)  # periodically yield to the event loop
    
    def _propagate_failure(self, component: str) -> Set[str]:
        """Propagate a failure one hop: each direct successor in G fails with probability 0.5"""
        cascaded = set()
        if component in self.G:
            for successor in self.G.successors(component):
                if random.random() < 0.5:
                    cascaded.add(successor)
        return cascaded
    
    def _reset_stats(self):
        """Reset simulation statistics"""
        self.messages_sent = 0
        self.messages_delivered = 0
        self.messages_dropped = 0
        self.total_latency_ms = 0.0
        self.active_failures.clear()
    
    def _get_stats(self) -> Dict:
        """Get current statistics"""
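        # Delivery rate over per-subscriber delivery attempts; dividing by messages_sent
        # would exceed 100% whenever a topic has more than one subscriber
        attempts = self.messages_delivered + self.messages_dropped
        delivery_rate = self.messages_delivered / attempts if attempts > 0 else 0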
        avg_latency = self.total_latency_ms / self.messages_delivered if self.messages_delivered > 0 else 0
        
        return {
            'messages_sent': self.messages_sent,
            'messages_delivered': self.messages_delivered,
            'messages_dropped': self.messages_dropped,
            'delivery_rate': delivery_rate,
            'avg_latency_ms': avg_latency
        }
    
    def _calculate_failure_impact(self, pre: Dict, post: Dict) -> Dict:
        """Calculate impact of failures"""
        latency_increase = post['avg_latency_ms'] - pre['avg_latency_ms']
        delivery_decrease = pre['delivery_rate'] - post['delivery_rate']
        
        return {
            'latency_increase_ms': latency_increase,
            'latency_increase_pct': (latency_increase / pre['avg_latency_ms'] * 100) if pre['avg_latency_ms'] > 0 else 0,
            'delivery_rate_decrease': delivery_decrease,
            'messages_lost': post['messages_dropped'] - pre['messages_dropped'],
            'affected_components': len(self.active_failures)
        }


print("✅ SimulationEngine class defined!")

In [None]:
class ValidationEngine:
    """
    Validates analysis predictions against simulation outcomes.
    
    Computes:
    - Precision, Recall, F1 Score
    - Spearman rank correlation
    """
    
    def validate(self, analysis_results: Dict, simulation_results: Dict) -> ValidationResult:
        """Validate analysis predictions against simulation"""
        print("✅ Validating analysis results...")
        
        criticality_scores = analysis_results.get('criticality_scores', {})
        
        # Predicted critical: HIGH or CRITICAL from analysis
        predicted_critical = {
            node_id for node_id, score in criticality_scores.items()
            if score.criticality_level in [CriticalityLevel.CRITICAL, CriticalityLevel.HIGH]
        }
        
        # Actual critical: components that failed in the simulation (injected or
        # cascaded) plus components whose impact score exceeds 0.3. Note that
        # impact_score comes from the static analysis, not from the simulation.
        failed_components = set(simulation_results.get('failed_components', []))
        actual_critical = failed_components.copy()
        
        for node_id, score in criticality_scores.items():
            if score.impact_score > 0.3:
                actual_critical.add(node_id)
        
        # Calculate precision and recall
        true_positives = len(predicted_critical & actual_critical)
        precision = true_positives / len(predicted_critical) if predicted_critical else 0.0
        recall = true_positives / len(actual_critical) if actual_critical else 1.0
        
        # F1 Score
        f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0
        
        # Spearman correlation
        spearman = self._calculate_spearman(criticality_scores, simulation_results)
        
        # Check targets
        targets_met = {
            'precision': precision >= TARGET_PRECISION,
            'recall': recall >= TARGET_RECALL,
            'f1_score': f1 >= TARGET_F1_SCORE,
            'spearman': spearman >= TARGET_SPEARMAN_CORRELATION
        }
        
        return ValidationResult(
            precision=precision,
            recall=recall,
            f1_score=f1,
            spearman_correlation=spearman,
            targets_met=targets_met
        )
    
    def _calculate_spearman(self, criticality_scores: Dict, simulation_results: Dict) -> float:
        """Calculate Spearman rank correlation"""
        nodes = list(criticality_scores.keys())
        predicted = [criticality_scores[n].composite_score for n in nodes]
        
        failed = set(simulation_results.get('failed_components', []))
        actual = [1.0 if n in failed else criticality_scores[n].impact_score for n in nodes]
        
        if len(predicted) < 3:
            return 0.0
        
        if SCIPY_AVAILABLE:
            try:
                correlation, _ = scipy_stats.spearmanr(predicted, actual)
                return correlation if not math.isnan(correlation) else 0.0
            except Exception:
                pass
        
        # Fallback: simple rank correlation
        return self._simple_spearman(predicted, actual)
    
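    def _simple_spearman(self, x: List[float], y: List[float]) -> float:
        """Spearman correlation: Pearson correlation of tie-averaged ranks"""
        n = len(x)
        if n < 2:
            return 0.0
        
        # Compute 1-based ranks, giving tied values the average of their positions
        def rank(vals):
            order = sorted(range(n), key=lambda i: vals[i])
            ranks = [0.0] * n
            i = 0
            while i < n:
                j = i
                while j + 1 < n and vals[order[j + 1]] == vals[order[i]]:
                    j += 1
                for k in range(i, j + 1):
                    ranks[order[k]] = (i + j) / 2 + 1
                i = j + 1
            return ranks
        
        # The 1 - 6*sum(d^2)/(n(n^2-1)) shortcut is exact only without ties, and
        # composite/impact scores tie often, so use Pearson correlation on the ranks
        rx, ry = rank(x), rank(y)
        mean_x, mean_y = sum(rx) / n, sum(ry) / n
        cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(rx, ry))
        var_x = sum((a - mean_x) ** 2 for a in rx)
        var_y = sum((b - mean_y) ** 2 for b in ry)
        if var_x == 0 or var_y == 0:
            return 0.0  # a constant input has no defined rank correlation
        return cov / math.sqrt(var_x * var_y)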


print("✅ ValidationEngine class defined!")

In [None]:
# Run simulation and validation

# Select failure target (highest criticality component)
sorted_scores = sorted(criticality_scores.items(), key=lambda x: x[1].composite_score, reverse=True)
failure_target = sorted_scores[0][0] if sorted_scores else None

if failure_target is not None:
    top_score = sorted_scores[0][1]
    print(f"\n🎯 Selected failure target: {failure_target}")
    print(f"   Criticality Score: {top_score.composite_score:.3f}")
    print(f"   Type: {top_score.component_type}")
else:
    print("\n⚠️ No failure target: the graph has no components")

In [None]:
# Create simulation engine and run simulations
simulation = SimulationEngine(G, graph_data)

# Run baseline simulation
baseline_results = await simulation.run_baseline_simulation(duration=10)

print(f"\n📊 Baseline Results:")
print(f"   • Messages Sent: {baseline_results['messages_sent']:,}")
print(f"   • Messages Delivered: {baseline_results['messages_delivered']:,}")
print(f"   • Delivery Rate: {baseline_results['delivery_rate']:.1%}")
print(f"   • Avg Latency: {baseline_results['avg_latency_ms']:.2f}ms")

In [None]:
# Run failure simulation
failure_results = await simulation.run_failure_simulation(
    duration=60,
    failure_time=30,
    failure_components=[failure_target] if failure_target else [],
    enable_cascading=True
)

print("\n💥 Failure Simulation Results:")
print("\n   Pre-Failure:")
print(f"      • Delivery Rate: {failure_results['pre_failure']['delivery_rate']:.1%}")
print(f"      • Avg Latency: {failure_results['pre_failure']['avg_latency_ms']:.2f}ms")

print("\n   Post-Failure:")
print(f"      • Delivery Rate: {failure_results['post_failure']['delivery_rate']:.1%}")
print(f"      • Avg Latency: {failure_results['post_failure']['avg_latency_ms']:.2f}ms")

print("\n   Impact:")
impact = failure_results['impact']
print(f"      • Latency Change: {impact['latency_increase_pct']:+.1f}%")
print(f"      • Components Affected: {impact['affected_components']}")
print(f"      • Failed Components: {failure_results['failed_components']}")

In [None]:
# Validate predictions
validator = ValidationEngine()
validation_result = validator.validate(analysis_results, failure_results)

print("\n✅ Validation Results:")
print("-" * 60)
print(f"{'Metric':<25}{'Value':<15}{'Target':<15}{'Status'}")
print("-" * 60)

metrics = [
    ('Precision', validation_result.precision, TARGET_PRECISION, 'precision'),
    ('Recall', validation_result.recall, TARGET_RECALL, 'recall'),
    ('F1 Score', validation_result.f1_score, TARGET_F1_SCORE, 'f1_score'),
    ('Spearman Correlation', validation_result.spearman_correlation, TARGET_SPEARMAN_CORRELATION, 'spearman')
]

for name, value, target, key in metrics:
    status = "✅ Met" if validation_result.targets_met[key] else "❌ Not Met"
    print(f"{name:<25}{value:<15.3f}≥{target:<14}{status}")

---

## Step 5: Visualization

The final step generates comprehensive visualizations:

1. **Interactive Graph**: Vis.js-based interactive network visualization
2. **Multi-Layer View**: Separation of Application, Topic, Broker, Infrastructure layers
3. **Dashboard**: Metrics and validation results summary
4. **Report**: Markdown documentation of findings
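For the multi-layer view, one simple option is a layered layout that gives each layer its own horizontal band. The sketch below assumes the top-to-bottom order Application, Topic, Broker, Node and the `type` node attribute set by `build_networkx_graph`; `layered_positions` is a hypothetical helper, not part of the notebook. It returns a plain `{node: (x, y)}` dict, the same shape `nx.draw` accepts as its `pos` argument.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Assumed top-to-bottom layer order; 'Node' is the infrastructure layer
LAYER_ORDER = ["Application", "Topic", "Broker", "Node"]


def layered_positions(node_types: Dict[str, str], width: float = 1.0) -> Dict[str, Tuple[float, float]]:
    """Give each layer a horizontal band (y) and spread its members evenly along x."""
    by_layer: Dict[str, List[str]] = defaultdict(list)
    for node, layer in node_types.items():
        by_layer[layer].append(node)

    # Known layers first, then any unexpected types as extra bands at the bottom
    layers = LAYER_ORDER + sorted(set(by_layer) - set(LAYER_ORDER))
    pos: Dict[str, Tuple[float, float]] = {}
    for depth, layer in enumerate(layers):
        members = sorted(by_layer.get(layer, []))
        y = float(len(layers) - 1 - depth)  # first layer gets the largest y (top)
        for i, node in enumerate(members):
            x = width * (i + 1) / (len(members) + 1)  # evenly spaced, away from the edges
            pos[node] = (x, y)
    return pos
```

In the notebook this could replace `nx.spring_layout`, e.g. `pos = layered_positions({n: d.get('type', 'Unknown') for n, d in G.nodes(data=True)})`. Because unknown types get their own bands, every node receives a position and `nx.draw` will not fail on a missing key.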

In [None]:
# For notebook display, let's create a simple matplotlib visualization
try:
    import matplotlib.pyplot as plt
    import matplotlib.patches as mpatches
    
    # Create figure with subplots
    fig, axes = plt.subplots(1, 2, figsize=(16, 7))
    
    # Color nodes by type
    type_colors = {
        'Application': '#3498db',
        'Topic': '#2ecc71',
        'Broker': '#e74c3c',
        'Node': '#9b59b6'
    }
    
    node_colors = [type_colors.get(G.nodes[n].get('type', 'Unknown'), '#95a5a6') for n in G.nodes()]
    
    # Left plot: By component type
    pos = nx.spring_layout(G, k=2, iterations=50, seed=42)
    nx.draw(G, pos, ax=axes[0], node_color=node_colors, node_size=300, 
            with_labels=True, font_size=8, arrows=True, alpha=0.8)
    axes[0].set_title('Graph by Component Type', fontsize=14, fontweight='bold')
    
    # Legend for left plot
    patches = [mpatches.Patch(color=color, label=label) for label, color in type_colors.items()]
    axes[0].legend(handles=patches, loc='upper left')
    
    # Right plot: By criticality
    crit_colors = {
        'CRITICAL': '#e74c3c',
        'HIGH': '#e67e22',
        'MEDIUM': '#f1c40f',
        'LOW': '#27ae60',
        'MINIMAL': '#95a5a6'
    }
    
    node_crit_colors = []
    for n in G.nodes():
        score = criticality_scores.get(n)
        if score:
            node_crit_colors.append(crit_colors.get(score.criticality_level.value, '#95a5a6'))
        else:
            node_crit_colors.append('#95a5a6')
    
    nx.draw(G, pos, ax=axes[1], node_color=node_crit_colors, node_size=300,
            with_labels=True, font_size=8, arrows=True, alpha=0.8)
    axes[1].set_title('Graph by Criticality Level', fontsize=14, fontweight='bold')
    
    # Legend for right plot
    patches = [mpatches.Patch(color=color, label=label) for label, color in crit_colors.items()]
    axes[1].legend(handles=patches, loc='upper left')
    
    plt.tight_layout()
    plt.show()
    
    print("\n✅ Visualization generated!")
    
except ImportError:
    print("⚠️ matplotlib not available - skipping visualization")

In [None]:
# Create criticality distribution bar chart
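try:
    import matplotlib.pyplot as plt  # imported here too, so this cell does not depend on the previous one
    
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))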
    
    # Criticality distribution
    levels = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'MINIMAL']
    counts = [level_counts.get(l, 0) for l in levels]
    colors = ['#e74c3c', '#e67e22', '#f1c40f', '#27ae60', '#95a5a6']
    
    axes[0].bar(levels, counts, color=colors)
    axes[0].set_xlabel('Criticality Level')
    axes[0].set_ylabel('Number of Components')
    axes[0].set_title('Criticality Distribution', fontweight='bold')
    
    # Validation metrics
    metrics_names = ['Precision', 'Recall', 'F1 Score', 'Spearman']
    metrics_values = [
        validation_result.precision,
        validation_result.recall,
        validation_result.f1_score,
        validation_result.spearman_correlation
    ]
    targets = [TARGET_PRECISION, TARGET_RECALL, TARGET_F1_SCORE, TARGET_SPEARMAN_CORRELATION]
    
    x = range(len(metrics_names))
    width = 0.35
    
    bars1 = axes[1].bar([i - width/2 for i in x], metrics_values, width, label='Actual', color='#3498db')
    bars2 = axes[1].bar([i + width/2 for i in x], targets, width, label='Target', color='#e74c3c', alpha=0.5)
    
    axes[1].set_xlabel('Metric')
    axes[1].set_ylabel('Value')
    axes[1].set_title('Validation Metrics vs Targets', fontweight='bold')
    axes[1].set_xticks(x)
    axes[1].set_xticklabels(metrics_names)
    axes[1].legend()
    axes[1].set_ylim(0, 1.2)
    
    plt.tight_layout()
    plt.show()
    
except ImportError:
    print("⚠️ matplotlib not available")

---

## Complete Pipeline Execution

Here's how to run the entire pipeline with a single function call.
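This notebook relies on IPython's support for top-level `await`. In a plain `.py` script, `await` outside a function is a syntax error, so the pipeline coroutine has to be driven by `asyncio.run`. The sketch below uses a hypothetical stand-in coroutine, `demo_pipeline`; in a script you would pass `run_complete_pipeline(...)` instead, assuming the notebook's classes are importable there.

```python
import asyncio
from typing import Dict


async def demo_pipeline(seed: int = 42) -> Dict[str, int]:
    """Hypothetical stand-in for run_complete_pipeline; any coroutine is driven the same way."""
    await asyncio.sleep(0)  # yield to the event loop once, as the simulation loop does periodically
    return {"seed": seed}


def main() -> Dict[str, int]:
    # asyncio.run creates a new event loop, runs the coroutine to completion, and closes the loop.
    # It raises RuntimeError if called while a loop is already running (e.g. inside Jupyter).
    return asyncio.run(demo_pipeline(seed=7))


if __name__ == "__main__":
    print(main())
```

Inside Jupyter, keep using `await run_complete_pipeline(...)` directly, since the notebook already runs an event loop.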

In [None]:
async def run_complete_pipeline(scenario: Scenario = Scenario.IOT_SMART_CITY,
                                 scale: str = 'medium',
                                 seed: int = 42,
                                 antipatterns: List[str] = None) -> Dict:
    """
    Run the complete end-to-end analysis pipeline.
    
    Steps:
    1. Generate graph data
    2. Build NetworkX graph
    3. Run comprehensive analysis
    4. Simulate traffic and failures
    5. Validate predictions
    
    Returns:
        Dictionary with all results
    """
    print("="*70)
    print("  GRAPH-BASED MODELING AND ANALYSIS PIPELINE")
    print("="*70)
    print(f"\nScenario: {scenario.value}")
    print(f"Scale: {scale}")
    print(f"Seed: {seed}\n")
    
    results = {}
    start_time = time.time()
    
    # Step 1: Generate
    print("\n" + "-"*50)
    print("STEP 1: GENERATE GRAPH DATA")
    print("-"*50)
    
    scale_params = PubSubGraphGenerator.SCALES.get(scale, PubSubGraphGenerator.SCALES['medium'])
    config = GraphConfig(
        scale=scale,
        scenario=scenario,
        num_nodes=scale_params['nodes'],
        num_applications=scale_params['apps'],
        num_topics=scale_params['topics'],
        num_brokers=scale_params['brokers'],
        antipatterns=antipatterns or ['spof'],
        seed=seed
    )
    
    generator = PubSubGraphGenerator(config)
    graph_data = generator.generate()
    results['graph_data'] = graph_data
    
    print(f"✅ Generated: {len(graph_data['nodes'])} nodes, {len(graph_data['applications'])} apps, "
          f"{len(graph_data['topics'])} topics, {len(graph_data['brokers'])} brokers")
    
    # Step 2 & 3: Analyze
    print("\n" + "-"*50)
    print("STEP 2-3: BUILD GRAPH AND ANALYZE")
    print("-"*50)
    
    analyzer = GraphAnalyzer(alpha=0.4, beta=0.3, gamma=0.3)
    G = analyzer.build_networkx_graph(graph_data)
    analysis_results = analyzer.analyze(G)
    results['analysis'] = analysis_results
    results['graph'] = G
    
    print(f"✅ Graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
    print(f"✅ Articulation Points: {analysis_results['structural_analysis']['num_articulation_points']}")
    
    # Count criticality
    levels = defaultdict(int)
    for score in analysis_results['criticality_scores'].values():
        levels[score.criticality_level.value] += 1
    print(f"✅ Criticality: CRITICAL={levels['CRITICAL']}, HIGH={levels['HIGH']}, "
          f"MEDIUM={levels['MEDIUM']}, LOW={levels['LOW']}")
    
    # Step 4: Simulate and Validate
    print("\n" + "-"*50)
    print("STEP 4: SIMULATE AND VALIDATE")
    print("-"*50)
    
    # Select failure target
    sorted_scores = sorted(
        analysis_results['criticality_scores'].items(),
        key=lambda x: x[1].composite_score,
        reverse=True
    )
    failure_target = sorted_scores[0][0] if sorted_scores else None
    
    simulation = SimulationEngine(G, graph_data)
    baseline = await simulation.run_baseline_simulation(10)
    failure_results = await simulation.run_failure_simulation(
        duration=60,
        failure_time=30,
        failure_components=[failure_target] if failure_target else []
    )
    results['simulation'] = failure_results
    
    print(f"✅ Baseline: {baseline['delivery_rate']:.1%} delivery, {baseline['avg_latency_ms']:.2f}ms latency")
    print(f"✅ Post-failure: {failure_results['post_failure']['delivery_rate']:.1%} delivery")
    
    validator = ValidationEngine()
    validation = validator.validate(analysis_results, failure_results)
    results['validation'] = validation
    
    # Summary
    print("\n" + "-"*50)
    print("STEP 5: RESULTS SUMMARY")
    print("-"*50)
    
    elapsed = time.time() - start_time
    targets_met = sum(validation.targets_met.values())
    
    print(f"\n✅ Pipeline completed in {elapsed:.2f}s")
    print("\nValidation Results:")
    print(f"   Precision:  {validation.precision:.3f} {'✅' if validation.targets_met['precision'] else '❌'}")
    print(f"   Recall:     {validation.recall:.3f} {'✅' if validation.targets_met['recall'] else '❌'}")
    print(f"   F1 Score:   {validation.f1_score:.3f} {'✅' if validation.targets_met['f1_score'] else '❌'}")
    print(f"   Spearman:   {validation.spearman_correlation:.3f} {'✅' if validation.targets_met['spearman'] else '❌'}")
    print(f"\nTargets Met: {targets_met}/4")
    
    return results


print("✅ Pipeline function defined!")

In [None]:
# Run the complete pipeline for Financial Trading scenario
results = await run_complete_pipeline(
    scenario=Scenario.FINANCIAL_TRADING,
    scale='medium',
    seed=42,
    antipatterns=['spof']
)

---

## Results Interpretation

### Key Findings

The graph-based analysis methodology provides:

1. **Proactive Identification**: Critical components are identified before failures occur through topological analysis.

2. **Multi-Metric Approach**: The composite criticality score combines:
   - Betweenness centrality (information flow importance)
   - Articulation point status (structural importance)
   - Impact score (failure consequence)

3. **Validation Framework**: Predictions are validated against simulation outcomes using standard metrics (Precision, Recall, F1, Spearman).

4. **Anti-Pattern Detection**: Common architectural issues, such as single points of failure, god topics, and circular dependencies, are detected automatically.
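To make items 2 and 3 concrete, the sketch below computes $C_{score}(v) = \alpha \cdot C_B^{norm}(v) + \beta \cdot AP(v) + \gamma \cdot I(v)$ from precomputed per-component inputs and then scores the prediction with precision, recall, F1, and Spearman's rank correlation. It is a minimal, stdlib-only stand-in, not the notebook's `ValidationEngine`. Assumptions: betweenness is normalized by dividing by its maximum, the top-k scored components form the "predicted critical" set, and the ground-truth set and observed impacts would come from failure simulation. The function names and toy numbers are hypothetical.

```python
from typing import Dict, List, Sequence, Set, Tuple


def composite_scores(
    betweenness: Dict[str, float],
    ap_nodes: Set[str],
    impact: Dict[str, float],
    alpha: float = 0.4,
    beta: float = 0.3,
    gamma: float = 0.3,
) -> Dict[str, float]:
    """C_score(v) = alpha * C_B_norm(v) + beta * AP(v) + gamma * I(v)."""
    # Assumption: normalize betweenness by dividing by the maximum value.
    max_b = max(betweenness.values(), default=0.0) or 1.0
    return {
        v: alpha * (b / max_b)
        + beta * (1.0 if v in ap_nodes else 0.0)
        + gamma * impact.get(v, 0.0)
        for v, b in betweenness.items()
    }


def precision_recall_f1(predicted: Set[str], actual: Set[str]) -> Tuple[float, float, float]:
    tp = len(predicted & actual)  # correctly identified critical components
    precision = tp / len(predicted) if predicted else 0.0
    recall = tp / len(actual) if actual else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


def _average_ranks(values: Sequence[float]) -> List[float]:
    # 1-based ranks; tied values share the mean of their positions.
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        for k in range(i, j + 1):
            ranks[order[k]] = (i + j) / 2 + 1
        i = j + 1
    return ranks


def spearman(x: Sequence[float], y: Sequence[float]) -> float:
    # Spearman's rho is the Pearson correlation of the two rank vectors.
    rx, ry = _average_ranks(x), _average_ranks(y)
    mx, my = sum(rx) / len(rx), sum(ry) / len(ry)
    cov = sum((a - mx) * (b - my) for a, b in zip(rx, ry))
    var_x = sum((a - mx) ** 2 for a in rx)
    var_y = sum((b - my) ** 2 for b in ry)
    return cov / (var_x * var_y) ** 0.5 if var_x and var_y else 0.0


# Toy inputs, made up for illustration (not notebook output).
betweenness = {"broker": 0.8, "topicA": 0.4, "app1": 0.0, "app2": 0.1}
aps = {"broker"}
impact = {"broker": 0.9, "topicA": 0.5, "app1": 0.05, "app2": 0.1}
observed_impact = {"broker": 0.95, "topicA": 0.4, "app1": 0.0, "app2": 0.2}

scores = composite_scores(betweenness, aps, impact)
ranked = sorted(scores, key=scores.get, reverse=True)
predicted = set(ranked[:2])        # assumption: top-k (k=2) are "predicted critical"
actual = {"broker", "topicA"}      # assumption: ground truth from failure simulation

p, r, f1 = precision_recall_f1(predicted, actual)
nodes = sorted(scores)
rho = spearman([scores[n] for n in nodes], [observed_impact[n] for n in nodes])
print(f"P={p:.2f} R={r:.2f} F1={f1:.2f} rho={rho:.2f}")
```

Spearman is used here rather than Pearson because the claim being tested is about ordering: a component ranked more critical should cause more damage when it fails, regardless of the exact score scale.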

### Recommendations

Based on the analysis:

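1. **Replicate High-Criticality Components**: Components classified at the CRITICAL or HIGH criticality level should be given redundancy.

2. **Address SPOFs**: Articulation points are single points of failure, because removing one splits the graph into disconnected parts.

3. **Review God Topics**: Topics with an unusually large number of connections concentrate traffic and dependencies in one place, which often signals a design issue; consider splitting them.

4. **Break Circular Dependencies**: Cycles let a single failure propagate back through every component in the loop, which can lead to cascading failures.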
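The three structural checks behind these recommendations can be illustrated without the framework. The sketch below is a hypothetical, stdlib-only stand-in for the notebook's anti-pattern detector, not its actual implementation. It finds articulation points by brute force (remove each node and recount connected components in the undirected view of the graph, which is O(V·(V+E)) and only suitable for small graphs; NetworkX's `articulation_points` or Tarjan's algorithm scale better), flags topics whose degree exceeds a hypothetical `max_degree` threshold, and reports one directed cycle found by depth-first search. All function names and the example topology are assumptions.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple

Edge = Tuple[str, str]


def _undirected(edges: Iterable[Edge]) -> Dict[str, Set[str]]:
    adj: Dict[str, Set[str]] = defaultdict(set)
    for u, v in edges:
        adj[u].add(v)
        adj[v].add(u)
    return adj


def _count_components(adj: Dict[str, Set[str]], removed: Optional[str] = None) -> int:
    # Iterative DFS; the removed node is pre-marked as seen so it is skipped.
    seen = {removed} if removed is not None else set()
    count = 0
    for start in adj:
        if start in seen:
            continue
        count += 1
        seen.add(start)
        stack = [start]
        while stack:
            for nb in adj[stack.pop()]:
                if nb not in seen:
                    seen.add(nb)
                    stack.append(nb)
    return count


def find_articulation_points(edges: Iterable[Edge]) -> List[str]:
    # Brute force: v is an articulation point if removing it increases the component count.
    adj = _undirected(edges)
    base = _count_components(adj)
    return sorted(v for v in adj if _count_components(adj, removed=v) > base)


def find_god_topics(edges: Iterable[Edge], topics: Set[str], max_degree: int) -> List[str]:
    # Hypothetical rule: a topic is a "god topic" if it has more than max_degree neighbours.
    adj = _undirected(edges)
    return sorted(t for t in topics if len(adj.get(t, ())) > max_degree)


def find_cycle(edges: Iterable[Edge]) -> Optional[List[str]]:
    # Directed DFS with three colours; reaching a GRAY node means a back edge, i.e. a cycle.
    succ: Dict[str, List[str]] = defaultdict(list)
    for u, v in edges:
        succ[u].append(v)
    WHITE, GRAY, BLACK = 0, 1, 2
    color: Dict[str, int] = defaultdict(int)  # missing nodes default to WHITE
    parent: Dict[str, str] = {}

    def dfs(u: str) -> Optional[List[str]]:
        color[u] = GRAY
        for v in succ[u]:
            if color[v] == GRAY:  # back edge u -> v closes the cycle v -> ... -> u -> v
                cycle = [u]
                while cycle[-1] != v:
                    cycle.append(parent[cycle[-1]])
                return cycle[::-1]
            if color[v] == WHITE:
                parent[v] = u
                found = dfs(v)
                if found:
                    return found
        color[u] = BLACK
        return None

    for node in list(succ):
        if color[node] == WHITE:
            found = dfs(node)
            if found:
                return found
    return None


# Hypothetical topology: one publisher, one broker, two topics, four subscribers.
topology = [
    ("pub1", "broker"), ("broker", "topicA"), ("broker", "topicB"),
    ("topicA", "sub1"), ("topicA", "sub2"), ("topicA", "sub3"), ("topicB", "sub4"),
]
print(find_articulation_points(topology))                         # ['broker', 'topicA', 'topicB']
print(find_god_topics(topology, {"topicA", "topicB"}, max_degree=3))  # ['topicA']
print(find_cycle(topology))                                       # None
print(find_cycle([("svcA", "svcB"), ("svcB", "svcC"), ("svcC", "svcA")]))  # ['svcA', 'svcB', 'svcC']
```

Articulation points are computed on the undirected view because the concept is defined for undirected connectivity, while cycle detection keeps edge direction, since a circular dependency only exists when the arrows close the loop.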

In [None]:
# Display top recommendations based on analysis
print("üìã RECOMMENDATIONS")
print("="*70)

# Get analysis results
analysis = results['analysis']
criticality = analysis['criticality_scores']
anti_patterns = analysis['anti_patterns']

recommendations = []

# Check for critical components
critical_components = [n for n, s in criticality.items() if s.criticality_level == CriticalityLevel.CRITICAL]
if critical_components:
    recommendations.append(f"üî¥ {len(critical_components)} CRITICAL components require immediate attention: {critical_components[:5]}")

# Check for SPOFs
if anti_patterns['spof_candidates']:
    recommendations.append(f"⚠️ {len(anti_patterns['spof_candidates'])} potential SPOFs detected - consider adding redundancy")

# Check for god topics
if anti_patterns['god_topics']:
    recommendations.append(f"üì¢ {len(anti_patterns['god_topics'])} 'God Topics' detected - consider splitting high-traffic topics")

# Check validation results
validation = results['validation']
if not validation.targets_met['spearman']:
    recommendations.append("üìä Spearman correlation below target - consider tuning Œ±, Œ≤, Œ≥ weights")

if not recommendations:
    recommendations.append("✅ No major issues detected - system architecture appears robust")

for i, rec in enumerate(recommendations, 1):
    print(f"\n{i}. {rec}")

---

## Conclusion

This notebook demonstrates the complete methodology for **Graph-Based Modeling and Analysis of Distributed Publish-Subscribe Systems**. The approach enables:

- **Predictive Analysis** of critical components before failures
- **Quantification** of typically qualitative architectural attributes
- **Validation** through simulation-based testing
- **Visualization** for stakeholder communication

The methodology is applicable across multiple domains:
- IoT/Smart City deployments
- Financial trading platforms
- Healthcare monitoring systems
- E-commerce infrastructure

### Next Steps

1. **Tune Parameters**: Adjust the α, β, γ weights for your specific domain
2. **Extend Analysis**: Add domain-specific metrics
3. **Integrate with Neo4j**: Enable persistent graph storage and advanced queries
4. **Deploy Monitoring**: Integrate with production monitoring systems
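Parameter tuning (step 1 above) can be approached as a small grid search. The sketch below assumes that, for each component, you already have the three normalized inputs to $C_{score}$ (normalized betweenness, articulation-point indicator, impact score) and an observed impact from failure simulation; the toy values are made up. It enumerates α, β, γ on a 0.1 grid constrained to α + β + γ = 1 and keeps the combination with the highest Spearman correlation. The sum-to-one constraint, the grid resolution, and the `tune_weights` helper are assumptions, not part of the framework.

```python
from typing import Dict, List, Sequence, Tuple

# (normalized betweenness, articulation-point indicator, impact score)
Metrics = Tuple[float, float, float]


def _average_ranks(values: Sequence[float]) -> List[float]:
    # 1-based ranks; tied values share the mean of their positions.
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        for k in range(i, j + 1):
            ranks[order[k]] = (i + j) / 2 + 1
        i = j + 1
    return ranks


def spearman(x: Sequence[float], y: Sequence[float]) -> float:
    # Spearman's rho is the Pearson correlation of the two rank vectors.
    rx, ry = _average_ranks(x), _average_ranks(y)
    mx, my = sum(rx) / len(rx), sum(ry) / len(ry)
    cov = sum((a - mx) * (b - my) for a, b in zip(rx, ry))
    var_x = sum((a - mx) ** 2 for a in rx)
    var_y = sum((b - my) ** 2 for b in ry)
    return cov / (var_x * var_y) ** 0.5 if var_x and var_y else 0.0


def tune_weights(
    metrics: Dict[str, Metrics], observed: Dict[str, float], steps: int = 10
) -> Tuple[float, Tuple[float, float, float]]:
    """Grid-search (alpha, beta, gamma) summing to 1, maximizing Spearman's rho."""
    nodes = sorted(metrics)
    y = [observed[n] for n in nodes]
    best_rho, best_w = float("-inf"), (0.0, 0.0, 0.0)
    for i in range(steps + 1):
        for j in range(steps + 1 - i):
            # Integer grid avoids floating-point drift; gamma takes the remainder.
            w = (i / steps, j / steps, (steps - i - j) / steps)
            x = [w[0] * b + w[1] * ap + w[2] * imp for b, ap, imp in (metrics[n] for n in nodes)]
            rho = spearman(x, y)
            if rho > best_rho:  # strict '>' keeps the first weights found on ties
                best_rho, best_w = rho, w
    return best_rho, best_w


# Made-up inputs for illustration only.
metrics = {
    "broker": (1.0, 1.0, 0.9),
    "topicA": (0.5, 1.0, 0.5),
    "topicB": (0.2, 0.0, 0.3),
    "app1": (0.0, 0.0, 0.05),
    "app2": (0.1, 0.0, 0.2),
}
observed = {"broker": 0.95, "topicA": 0.6, "topicB": 0.1, "app1": 0.0, "app2": 0.25}

rho, (alpha, beta, gamma) = tune_weights(metrics, observed)
print(f"best rho={rho:.3f} with alpha={alpha}, beta={beta}, gamma={gamma}")
```

Because the weights are fit to the same simulation runs they are then judged against, the resulting correlation is optimistic; checking the chosen weights on a different scenario or random seed is a sensible safeguard.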

---

*Generated by Software-as-a-Graph Research Framework*