In [1]:
# Create project directory
!mkdir quantum_security_simulation
!cd quantum_security_simulation

# Create a virtual environment (optional but recommended)
!python -m venv venv
!source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install required packages
!pip install matplotlib numpy networkx


Error: Command '['/content/venv/bin/python3', '-m', 'ensurepip', '--upgrade', '--default-pip']' returned non-zero exit status 1.
/bin/bash: line 1: venv/bin/activate: No such file or directory


In [2]:
!touch security_simulation.py
!touch run_simulation.py

In [3]:
# Part 1: Import necessary libraries
import time
import random
import hashlib
import threading
import json
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict
import networkx as nx
from queue import Queue
import uuid
from dataclasses import dataclass, field
import logging

# Configure logging
logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("QSecSim")

# Part 2: Define key data structures

@dataclass
class SecurityEvent:
    """Represents a security event in the system"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    source: str = ""
    event_type: str = ""
    severity: int = 1  # 1-5 scale
    data: dict = field(default_factory=dict)

    def to_dict(self):
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "source": self.source,
            "event_type": self.event_type,
            "severity": self.severity,
            "data": self.data
        }


In [4]:
# Part 3: Implement Node base class
class Node:
    """Base class for all network nodes"""
    def __init__(self, node_id, node_type="generic"):
        self.node_id = node_id
        self.node_type = node_type
        self.connections = []
        self.events = []
        self.compromised = False
        self.security_level = 5  # 1-10 scale
        self.processing_power = 10  # 1-100 scale
        self.data_storage = {}

    def connect(self, other_node):
        """Establish connection with another node"""
        if other_node not in self.connections:
            self.connections.append(other_node)
            if self not in other_node.connections:
                other_node.connect(self)

    def send_message(self, target_node, message, encrypted=False):
        """Send a message to another node"""
        if target_node in self.connections:
            # Simulate network latency
            latency = random.uniform(0.01, 0.1)
            time.sleep(latency)

            if encrypted:
                # Simulate encryption
                message = f"ENC:{hashlib.sha256(message.encode()).hexdigest()}"

            target_node.receive_message(self, message)
            return True
        return False

    def receive_message(self, source_node, message):
        """Receive a message from another node"""
        self.events.append(SecurityEvent(
            source=source_node.node_id,
            event_type="message_received",
            data={"message": message[:20] + "..." if len(message) > 20 else message}
        ))

    def detect_attack(self, attack_event):
        """Attempt to detect an attack based on security level"""
        detection_probability = self.security_level / 10.0
        return random.random() < detection_probability

    def process_security_event(self, event):
        """Process a security event"""
        self.events.append(event)


# Step 5: Implement Security Server Classes

In [5]:
# Step 5: Implement Security Server Classes
# Part 4: Implement specialized security server classes
class SecurityServer(Node):
    """Specialized node for security monitoring and response"""
    def __init__(self, node_id):
        super().__init__(node_id, node_type="security_server")
        self.security_level = 9
        self.processing_power = 80
        self.monitored_nodes = []
        self.threat_database = {}
        self.security_policies = {}

    def monitor_node(self, node):
        """Add a node to be monitored"""
        if node not in self.monitored_nodes:
            self.monitored_nodes.append(node)

    def analyze_events(self):
        """Analyze security events from all monitored nodes"""
        all_events = []
        for node in self.monitored_nodes:
            all_events.extend(node.events)

        # Sort events by timestamp
        all_events.sort(key=lambda e: e.timestamp)

        # Basic analysis (could be more sophisticated)
        alerts = []
        for event in all_events:
            if event.severity >= 4:
                alerts.append(event)

        return alerts

    def respond_to_threat(self, event):
        """Respond to a detected threat"""
        logger.info(f"Security server {self.node_id} responding to threat: {event.event_type}")

        # Implement response logic based on event type
        if event.event_type == "intrusion_attempt":
            # Increase security on affected node
            target_node = next((n for n in self.monitored_nodes
                               if n.node_id == event.data.get("target_node")), None)
            if target_node:
                target_node.security_level = min(10, target_node.security_level + 1)

        elif event.event_type == "data_breach":
            # Isolate affected node
            target_node = next((n for n in self.monitored_nodes
                               if n.node_id == event.data.get("target_node")), None)
            if target_node:
                # Simulate isolation by temporarily removing connections
                target_node.connections = []

class QuantumSecurityServer(SecurityServer):
    """Security server with quantum capabilities"""
    def __init__(self, node_id):
        super().__init__(node_id)
        self.node_type = "quantum_security_server"
        self.security_level = 10
        self.quantum_keys = {}
        self.qkd_enabled = True

    def generate_quantum_key(self, target_node):
        """Simulate quantum key distribution"""
        # In a real system, this would use quantum principles
        # Here we're just simulating the concept

        # Generate random bits
        key_size = 256
        sender_bits = [random.randint(0, 1) for _ in range(key_size)]
        sender_bases = [random.randint(0, 1) for _ in range(key_size)]

        # Receiver randomly chooses measurement bases
        receiver_bases = [random.randint(0, 1) for _ in range(key_size)]

        # Determine which bits are measured in the correct basis
        matching_bases_indices = [i for i in range(key_size)
                                 if sender_bases[i] == receiver_bases[i]]

        # Keep only the bits where bases match
        final_key = [sender_bits[i] for i in matching_bases_indices]

        # Convert binary list to a hex string key
        key_binary = ''.join(str(bit) for bit in final_key[:128])  # Use first 128 bits
        key_hex = hex(int(key_binary, 2))[2:] if key_binary else "0"

        # Store the key
        self.quantum_keys[target_node.node_id] = key_hex

        logger.info(f"Generated quantum key between {self.node_id} and {target_node.node_id}")
        return key_hex

    def send_quantum_encrypted_message(self, target_node, message):
        """Send a message encrypted with a quantum key"""
        # Ensure we have a quantum key for this node
        if target_node.node_id not in self.quantum_keys:
            self.generate_quantum_key(target_node)

        key = self.quantum_keys[target_node.node_id]

        # Simple XOR encryption simulation (not actual quantum encryption)
        message_bytes = message.encode()
        key_bytes = bytes.fromhex(key.zfill(len(message)*2))[:len(message_bytes)]
        encrypted_message = bytes(m ^ k for m, k in zip(message_bytes, key_bytes))

        # Convert to hex for transmission
        encrypted_hex = encrypted_message.hex()

        # Send the encrypted message
        self.send_message(target_node, f"QKD:{encrypted_hex}", encrypted=True)


# Step 6: Implement Network Model Base Class

In [6]:
# Part 5: Network model base class
class NetworkModel:
    """Base class for network architecture models"""
    def __init__(self, name):
        self.name = name
        self.nodes = []
        self.security_servers = []
        self.performance_metrics = defaultdict(list)
        self.attack_resilience = 0
        self.communication_efficiency = 0

    def add_node(self, node):
        """Add a node to the network"""
        self.nodes.append(node)
        if isinstance(node, SecurityServer):
            self.security_servers.append(node)

    def visualize_network(self):
        """Create a visual representation of the network"""
        G = nx.Graph()

        # Add nodes
        for node in self.nodes:
            G.add_node(node.node_id,
                      node_type=node.node_type,
                      compromised=node.compromised)

        # Add edges
        for node in self.nodes:
            for conn in node.connections:
                G.add_edge(node.node_id, conn.node_id)

        # Create plot
        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(G)

        # Color nodes by type
        node_colors = []
        for node in G.nodes:
            node_type = G.nodes[node]['node_type']
            if 'security_server' in node_type:
                node_colors.append('red')
            elif G.nodes[node]['compromised']:
                node_colors.append('black')
            else:
                node_colors.append('blue')

        nx.draw(G, pos, with_labels=True, node_color=node_colors,
               node_size=700, font_size=8, font_color='white')

        plt.title(f"Network Architecture: {self.name}")
        plt.savefig(f"{self.name.replace(' ', '_')}_network.png")
        plt.close()


# Step 7: Implement Attack Simulation Methods

In [7]:
# Part 6: Add attack simulation methods to NetworkModel
def simulate_attack(self, attack_type, target_node=None):
    """Simulate an attack on the network"""
    if not target_node:
        target_node = random.choice(self.nodes)

    logger.info(f"Simulating {attack_type} attack on {target_node.node_id}")

    # Create attack event
    attack_event = SecurityEvent(
        source="external_attacker",
        event_type=attack_type,
        severity=5,
        data={"target_node": target_node.node_id}
    )

    # Process the attack
    if attack_type == "ddos":
        self._simulate_ddos_attack(target_node, attack_event)
    elif attack_type == "data_breach":
        self._simulate_data_breach(target_node, attack_event)
    elif attack_type == "quantum_computing_attack":
        self._simulate_quantum_computing_attack(target_node, attack_event)

    # Return whether the attack was successful
    return target_node.compromised

def _simulate_ddos_attack(self, target_node, attack_event):
    """Simulate a DDoS attack"""
    # DDoS reduces processing power temporarily
    original_power = target_node.processing_power
    target_node.processing_power = max(1, original_power - 70)

    # Check if any security server detected the attack
    detected = False
    for server in self.security_servers:
        if target_node in server.monitored_nodes:
            if server.detect_attack(attack_event):
                detected = True
                server.respond_to_threat(attack_event)
                break

    # If not detected, node is effectively compromised temporarily
    if not detected:
        target_node.compromised = True
        logger.info(f"DDoS attack successful on {target_node.node_id}")
    else:
        logger.info(f"DDoS attack detected and mitigated on {target_node.node_id}")

    # Restore processing power after attack (simulating recovery)
    time.sleep(0.5)  # Simulate time passing
    target_node.processing_power = original_power
    target_node.compromised = False

def _simulate_data_breach(self, target_node, attack_event):
    """Simulate a data breach attempt"""
    # Success probability based on node security level
    breach_probability = 1 - (target_node.security_level / 10.0)
    breach_successful = random.random() < breach_probability

    if breach_successful:
        # Node is compromised
        target_node.compromised = True
        logger.info(f"Data breach successful on {target_node.node_id}")

        # Exfiltrate data
        stolen_data = {"node_id": target_node.node_id,
                      "data_sample": target_node.data_storage}

        # Check if any security server detected the breach after it happened
        for server in self.security_servers:
            if target_node in server.monitored_nodes:
                detection_probability = server.security_level / 10.0
                if random.random() < detection_probability:
                    server.respond_to_threat(attack_event)
                    logger.info(f"Data breach detected (after the fact) on {target_node.node_id}")
                    break
    else:
        # Attack prevented
        for server in self.security_servers:
            if target_node in server.monitored_nodes:
                server.process_security_event(attack_event)
                logger.info(f"Data breach prevented on {target_node.node_id}")
                break

def _simulate_quantum_computing_attack(self, target_node, attack_event):
    """Simulate a quantum computing attack on encryption"""
    # This type of attack targets encrypted communications

    # For non-quantum protected nodes, this attack is very effective
    has_quantum_protection = False

    # Check if this node is protected by quantum encryption
    for server in self.security_servers:
        if isinstance(server, QuantumSecurityServer) and server.qkd_enabled:
            if target_node in server.monitored_nodes:
                has_quantum_protection = True
                break

    if has_quantum_protection:
        # Quantum encryption is resistant to quantum computing attacks
        logger.info(f"Quantum computing attack failed against quantum-protected node {target_node.node_id}")
        target_node.compromised = False
    else:
        # Traditional encryption is vulnerable
        logger.info(f"Quantum computing attack successful against non-quantum node {target_node.node_id}")
        target_node.compromised = True

        # Simulate data decryption
        stolen_data = {"decrypted_communications": "Sensitive data exposed"}

        # Check if any security server detected the attack
        for server in self.security_servers:
            if target_node in server.monitored_nodes:
                server.process_security_event(attack_event)


# Step 8: Implement Performance Metrics Collection

In [8]:
# Part 7: Add performance metrics collection to NetworkModel
def collect_performance_metrics(self):
    """Collect performance metrics for the network"""
    # Calculate average security level
    avg_security = sum(node.security_level for node in self.nodes) / len(self.nodes)
    self.performance_metrics["average_security_level"].append(avg_security)

    # Calculate network connectivity
    total_connections = sum(len(node.connections) for node in self.nodes)
    avg_connections = total_connections / (2 * len(self.nodes))  # Divide by 2 to avoid counting twice
    self.performance_metrics["average_connections"].append(avg_connections)

    # Calculate compromised node percentage
    compromised_nodes = sum(1 for node in self.nodes if node.compromised)
    compromised_percentage = (compromised_nodes / len(self.nodes)) * 100
    self.performance_metrics["compromised_percentage"].append(compromised_percentage)

    # Store current time
    self.performance_metrics["timestamp"].append(time.time())

def visualize_metrics(self):
    """Create visualizations of performance metrics"""
    plt.figure(figsize=(15, 10))

    # Plot average security level
    plt.subplot(2, 2, 1)
    plt.plot(self.performance_metrics["timestamp"],
            self.performance_metrics["average_security_level"])
    plt.title("Average Security Level")
    plt.xlabel("Time")
    plt.ylabel("Security Level (1-10)")

    # Plot average connections
    plt.subplot(2, 2, 2)
    plt.plot(self.performance_metrics["timestamp"],
            self.performance_metrics["average_connections"])
    plt.title("Average Node Connections")
    plt.xlabel("Time")
    plt.ylabel("Connections per Node")

    # Plot compromised percentage
    plt.subplot(2, 2, 3)
    plt.plot(self.performance_metrics["timestamp"],
            self.performance_metrics["compromised_percentage"])
    plt.title("Compromised Nodes Percentage")
    plt.xlabel("Time")
    plt.ylabel("Percentage")

    # Save the figure
    plt.tight_layout()
    plt.savefig(f"{self.name.replace(' ', '_')}_metrics.png")
    plt.close()


# Step 9: Implement Centralized Security Model

In [9]:
# Part 8: Implement centralized security model
class CentralizedSecurityModel(NetworkModel):
    """Centralized security architecture model"""
    def __init__(self, num_nodes=10):
        super().__init__("Centralized Security Model")

        # Create a single security server
        self.central_server = SecurityServer("CentralServer")
        self.add_node(self.central_server)

        # Create regular nodes
        for i in range(num_nodes):
            node = Node(f"Node_{i}")
            self.add_node(node)

            # Connect to central server
            node.connect(self.central_server)
            self.central_server.monitor_node(node)

            # Add some sample data
            node.data_storage = {
                "sensitive_data": f"Confidential information for Node_{i}",
                "access_level": random.randint(1, 5)
            }

        # Add some connections between regular nodes (star topology with some mesh elements)
        for i in range(num_nodes):
            for j in range(i+1, num_nodes):
                if random.random() < 0.3:  # 30% chance of connection
                    self.nodes[i+1].connect(self.nodes[j+1])  # +1 because central server is at index 0

        logger.info(f"Created centralized network with {num_nodes} nodes and 1 security server")


# Step 10: Implement Decentralized Security Model

In [10]:
# Part 9: Implement decentralized security model
class DecentralizedSecurityModel(NetworkModel):
    """Decentralized security architecture model"""
    def __init__(self, num_nodes=10, num_security_servers=3):
        super().__init__("Decentralized Security Model")

        # Create multiple security servers
        for i in range(num_security_servers):
            if i == 0:
                # Make one a quantum security server
                server = QuantumSecurityServer(f"QuantumSecServer_{i}")
            else:
                server = SecurityServer(f"SecServer_{i}")
            self.add_node(server)

        # Create regular nodes
        for i in range(num_nodes):
            node = Node(f"Node_{i}")
            self.add_node(node)

            # Assign each node to be monitored by one or more security servers
            num_monitoring_servers = random.randint(1, min(2, num_security_servers))
            monitoring_servers = random.sample(self.security_servers, num_monitoring_servers)

            for server in monitoring_servers:
                node.connect(server)
                server.monitor_node(node)

            # Add some sample data
            node.data_storage = {
                "sensitive_data": f"Confidential information for Node_{i}",
                "access_level": random.randint(1, 5)
            }

        # Create a mesh network with connections between nodes
        for i in range(len(self.nodes)):
            for j in range(i+1, len(self.nodes)):
                if random.random() < 0.4:  # 40% chance of connection
                    self.nodes[i].connect(self.nodes[j])

        logger.info(f"Created decentralized network with {num_nodes} nodes and {num_security_servers} security servers")


# Step 11: Implement Transition Mechanism

In [11]:
# Part 10: Implement security architecture transition
class SecurityArchitectureTransition:
    """Class to handle transition from centralized to decentralized security"""
    def __init__(self, centralized_model, num_security_servers=3):
        self.centralized_model = centralized_model
        self.decentralized_model = None
        self.transition_progress = 0  # 0-100%
        self.transition_complete = False
        self.num_security_servers = num_security_servers
        self.transition_metrics = defaultdict(list)

    def start_transition(self):
        """Begin the transition process"""
        logger.info("Starting transition from centralized to decentralized security")

        # Create a new decentralized model with the same number of nodes
        num_nodes = len(self.centralized_model.nodes) - 1  # Subtract 1 for the central server
        self.decentralized_model = DecentralizedSecurityModel(num_nodes, self.num_security_servers)

        # Initialize transition progress
        self.transition_progress = 0
        self.transition_metrics["timestamp"].append(time.time())
        self.transition_metrics["progress"].append(self.transition_progress)

        # Launch transition in a separate thread
        transition_thread = threading.Thread(target=self._execute_transition)
        transition_thread.start()

    def _execute_transition(self):
        """Execute the transition process step by step"""
        # Steps in transitioning:
        # 1. Deploy new security servers
        # 2. Gradually migrate node monitoring
        # 3. Establish new secure connections
        # 4. Decommission central server

        # Step 1: Deploy new security servers
        logger.info("Step 1: Deploying new security servers")
        time.sleep(1)  # Simulate deployment time
        self.transition_progress = 25
        self.transition_metrics["timestamp"].append(time.time())
        self.transition_metrics["progress"].append(self.transition_progress)

        # Step 2: Gradually migrate node monitoring
        logger.info("Step 2: Migrating node monitoring to decentralized servers")
        # This would involve transferring security policies, threat intelligence, etc.
        time.sleep(1.5)  # Simulate migration time
        self.transition_progress = 50
        self.transition_metrics["timestamp"].append(time.time())
        self.transition_metrics["progress"].append(self.transition_progress)

        # Step 3: Establish new secure connections
        logger.info("Step 3: Establishing new secure connections between nodes")
        # This involves updating routing tables, security associations, etc.
        time.sleep(1.2)  # Simulate connection establishment time
        self.transition_progress = 75
        self.transition_metrics["timestamp"].append(time.time())
        self.transition_metrics["progress"].append(self.transition_progress)

        # Step 4: Decommission central server
        logger.info("Step 4: Decommissioning central security server")
        time.sleep(0.8)  # Simulate decommissioning time
        self.transition_progress = 100
        self.transition_metrics["timestamp"].append(time.time())
        self.transition_metrics["progress"].append(self.transition_progress)

        logger.info("Transition complete")
        self.transition_complete = True

    def visualize_transition(self):
        """Create a visualization of the transition progress"""
        plt.figure(figsize=(10, 6))
        plt.plot(self.transition_metrics["timestamp"], self.transition_metrics["progress"],
                marker='o', linestyle='-', linewidth=2)
        plt.title("Security Architecture Transition Progress")
        plt.xlabel("Time")
        plt.ylabel("Completion Percentage")
        plt.yticks(range(0, 101, 10))
        plt.grid(True)
        plt.savefig("transition_progress.png")
        plt.close()


# Step 12: Implement QKD Simulation

In [12]:
# Part 11: Implement quantum key distribution simulation
class QKDSimulation:
    """Specialized simulation for Quantum Key Distribution"""
    def __init__(self, num_nodes=5):
        self.quantum_network = self._create_quantum_network(num_nodes)
        self.classical_network = self._create_classical_network(num_nodes)
        self.metrics = defaultdict(list)

    def _create_quantum_network(self, num_nodes):
        """Create a network with quantum security capabilities"""
        network = NetworkModel("Quantum Security Network")

        # Create a quantum security server
        quantum_server = QuantumSecurityServer("QuantumServer")
        network.add_node(quantum_server)

        # Create regular nodes
        for i in range(num_nodes):
            node = Node(f"QNode_{i}")
            network.add_node(node)

            # Connect to quantum server
            node.connect(quantum_server)
            quantum_server.monitor_node(node)

            # Add some sample data
            node.data_storage = {
                "military_data": f"Classified information for QNode_{i}",
                "clearance_level": random.randint(1, 5)
            }

        # Create connections between nodes
        for i in range(1, len(network.nodes)):
            for j in range(i+1, len(network.nodes)):
                if random.random() < 0.3:
                    network.nodes[i].connect(network.nodes[j])

        return network

    def _create_classical_network(self, num_nodes):
        """Create a network with classical security"""
        network = NetworkModel("Classical Security Network")

        # Create a classical security server
        server = SecurityServer("ClassicalServer")
        network.add_node(server)

        # Create regular nodes
        for i in range(num_nodes):
            node = Node(f"CNode_{i}")
            network.add_node(node)

            # Connect to server
            node.connect(server)
            server.monitor_node(node)

            # Same data as quantum network
            node.data_storage = {
                "military_data": f"Classified information for CNode_{i}",
                "clearance_level": random.randint(1, 5)
            }

        # Same connections as quantum network
        for i in range(1, len(network.nodes)):
            for j in range(i+1, len(network.nodes)):
                if random.random() < 0.3:
                    network.nodes[i].connect(network.nodes[j])

        return network

    def run_comparative_simulation(self, num_attacks=5):
        """Run simulation comparing quantum vs classical networks under attacks"""
        logger.info("Starting comparative simulation: Quantum vs Classical security")

        attack_types = ["ddos", "data_breach", "quantum_computing_attack"]
        quantum_compromised = 0
        classical_compromised = 0

        # Run attacks
        for i in range(num_attacks):
            attack_type = random.choice(attack_types)

            # Attack quantum network
            logger.info(f"Attack {i+1}/{num_attacks}: {attack_type} on quantum network")
            q_target_idx = random.randint(1, len(self.quantum_network.nodes)-1)
            q_target = self.quantum_network.nodes[q_target_idx]
            q_result = self.quantum_network.simulate_attack(attack_type, q_target)

            # Attack classical network (same node position)
            logger.info(f"Attack {i+1}/{num_attacks}: {attack_type} on classical network")
            c_target = self.classical_network.nodes[q_target_idx]
            c_result = self.classical_network.simulate_attack(attack_type, c_target)

            # Track results
            if q_result:
                quantum_compromised += 1
            if c_result:
                classical_compromised += 1

            # Collect metrics
            timestamp = time.time()
            self.metrics["timestamp"].append(timestamp)
            self.metrics["quantum_compromised_percentage"].append((quantum_compromised / (i+1)) * 100)
            self.metrics["classical_compromised_percentage"].append((classical_compromised / (i+1)) * 100)

            # Small delay between attacks
            time.sleep(0.5)

        # Calculate final statistics
        logger.info(f"Simulation complete. Quantum network: {quantum_compromised}/{num_attacks} compromised")
        logger.info(f"Simulation complete. Classical network: {classical_compromised}/{num_attacks} compromised")

        # Visualize networks
        self.quantum_network.visualize_network()
        self.classical_network.visualize_network()

        # Visualize comparison metrics
        self.visualize_comparison()

    def visualize_comparison(self):
        """Create visualizations comparing quantum vs classical security"""
        plt.figure(figsize=(12, 8))

        plt.plot(self.metrics["timestamp"], self.metrics["quantum_compromised_percentage"],
                label="Quantum Network", marker='o', linestyle='-', linewidth=2, color='blue')
        plt.plot(self.metrics["timestamp"], self.metrics["classical_compromised_percentage"],
                label="Classical Network", marker='s', linestyle='--', linewidth=2, color='red')

        plt.title("Security Performance Comparison: Quantum vs Classical")
        plt.xlabel("Time")
        plt.ylabel("Compromise Rate (%)")
        plt.legend()
        plt.grid(True)
        plt.savefig("quantum_vs_classical_comparison.png")
        plt.close()


# Step 13: Implement Main Simulation Framework

In [13]:
# Part 12: Implement main simulation framework
class SecuritySimulationFramework:
    """Main class to manage the entire simulation process"""
    def __init__(self):
        self.centralized_model = None
        self.decentralized_model = None
        self.transition = None
        self.qkd_simulation = None

    def initialize(self):
        """Initialize the simulation framework"""
        logger.info("Initializing security simulation framework")

        # Create centralized security model
        self.centralized_model = CentralizedSecurityModel(num_nodes=8)

        # Initialize transition mechanism
        self.transition = SecurityArchitectureTransition(self.centralized_model)

        # Initialize QKD simulation
        self.qkd_simulation = QKDSimulation(num_nodes=5)

    def run_centralized_simulation(self, num_attacks=5):
        """Run simulation on centralized model"""
        logger.info("Running centralized security model simulation")

        # Collect initial metrics
        self.centralized_model.collect_performance_metrics()

        # Visualize initial network
        self.centralized_model.visualize_network()

        # Run attack simulation
        for i in range(num_attacks):
            attack_type = random.choice(["ddos", "data_breach"])
            success = self.centralized_model.simulate_attack(attack_type)
            logger.info(f"Attack {i+1}/{num_attacks} ({attack_type}): {'Successful' if success else 'Failed'}")

            # Collect metrics after attack
            self.centralized_model.collect_performance_metrics()

            # Small delay between attacks
            time.sleep(0.5)

        # Visualize final network state
        self.centralized_model.visualize_network()

        # Visualize performance metrics
        self.centralized_model.visualize_metrics()

    def run_transition_simulation(self):
        """Run the transition simulation"""
        logger.info("Running transition simulation")

        # Start transition process
        self.transition.start_transition()

        # Monitor transition progress
        while not self.transition.transition_complete:
            logger.info(f"Transition progress: {self.transition.transition_progress}%")
            time.sleep(1)

        # Update decentralized model reference
        self.decentralized_model = self.transition.decentralized_model

        # Visualize transition
        self.transition.visualize_transition()

        # Visualize before/after networks
        self.centralized_model.visualize_network()
        self.decentralized_model.visualize_network()

    def run_decentralized_simulation(self, num_attacks=5):
        """Run simulation on decentralized model"""
        if not self.decentralized_model:
            logger.error("Decentralized model not initialized. Run transition first.")
            return

        logger.info("Running decentralized security model simulation")

        # Collect initial metrics
        self.decentralized_model.collect_performance_metrics()

        # Run attack simulation
        for i in range(num_attacks):
            attack_type = random.choice(["ddos", "data_breach", "quantum_computing_attack"])
            success = self.decentralized_model.simulate_attack(attack_type)
            logger.info(f"Attack {i+1}/{num_attacks} ({attack_type}): {'Successful' if success else 'Failed'}")

            # Collect metrics after attack
            self.decentralized_model.collect_performance_metrics()

            # Small delay between attacks
            time.sleep(0.5)

        # Visualize final network state
        self.decentralized_model.visualize_network()

        # Visualize performance metrics
        self.decentralized_model.visualize_metrics()

    def run_comparative_analysis(self):
        """Run comparative analysis between centralized and decentralized models"""
        if not self.decentralized_model:
            logger.error("Decentralized model not initialized. Run transition first.")
            return

        logger.info("Running comparative analysis")

        # Compare metrics
        centralized_security = np.mean(self.centralized_model.performance_metrics["average_security_level"])
        decentralized_security = np.mean(self.decentralized_model.performance_metrics["average_security_level"])

        centralized_compromised = np.mean(self.centralized_model.performance_metrics["compromised_percentage"])
        decentralized_compromised = np.mean(self.decentralized_model.performance_metrics["compromised_percentage"])

        logger.info(f"Centralized model average security level: {centralized_security:.2f}")
        logger.info(f"Decentralized model average security level: {decentralized_security:.2f}")
        logger.info(f"Centralized model average compromised percentage: {centralized_compromised:.2f}%")
        logger.info(f"Decentralized model average compromised percentage: {decentralized_compromised:.2f}%")

        # Create comparison visualization
        plt.figure(figsize=(12, 8))

        labels = ['Average Security Level', 'Compromise Rate (%)']
        centralized_values = [centralized_security, centralized_compromised]
        decentralized_values = [decentralized_security, decentralized_compromised]

        x = np.arange(len(labels))
        width = 0.35

        fig, ax = plt.subplots(figsize=(10, 6))
        rects1 = ax.bar(x - width/2, centralized_values, width, label='Centralized')
        rects2 = ax.bar(x + width/2, decentralized_values, width, label='Decentralized')

        ax.set_title('Comparison: Centralized vs Decentralized Security')
        ax.set_xticks(x)
        ax.set_xticklabels(labels)
        ax.legend()

        plt.savefig("security_model_comparison.png")
        plt.close()

    def run_qkd_simulation(self):
        """Run the QKD-specific simulation"""
        logger.info("Running QKD simulation")
        self.qkd_simulation.run_comparative_simulation(num_attacks=6)

    def run_complete_simulation(self):
        """Run the complete simulation pipeline"""
        # Initialize
        self.initialize()

        # Run centralized simulation
        self.run_centralized_simulation()

        # Run transition
        self.run_transition_simulation()

        # Run decentralized simulation
        self.run_decentralized_simulation()

        # Run comparative analysis
        self.run_comparative_analysis()

        # Run QKD simulation
        self.run_qkd_simulation()

        logger.info("Complete simulation finished")

        return {
            "centralized_model": self.centralized_model,
            "decentralized_model": self.decentralized_model,
            "qkd_simulation": self.qkd_simulation
        }


# Step 14: Create Main Execution Function

In [14]:
class NetworkModel:
    """Base class for network architecture models"""
    def __init__(self, name):
        self.name = name
        self.nodes = []
        self.security_servers = []
        self.performance_metrics = defaultdict(list)
        self.attack_resilience = 0
        self.communication_efficiency = 0

    def add_node(self, node):
        """Add a node to the network"""
        self.nodes.append(node)
        if isinstance(node, SecurityServer):
            self.security_servers.append(node)

    def visualize_network(self):
       """Create a visual representation of the network"""
        # (Rest of your function code)

    def simulate_attack(self, attack_type, target_node=None):
        """Simulate an attack on the network"""
        # (Rest of your function code)

# Step 15: Fix NetworkModel Class Methods

In [15]:
class NetworkModel:
    """Base class for network architecture models"""
    def __init__(self, name):
        self.name = name
        self.nodes = []
        self.security_servers = []
        self.performance_metrics = defaultdict(list)
        self.attack_resilience = 0
        self.communication_efficiency = 0

    def add_node(self, node):
        """Add a node to the network"""
        self.nodes.append(node)
        if isinstance(node, SecurityServer):
            self.security_servers.append(node)

    def visualize_network(self):
        """Create a visual representation of the network"""
        G = nx.Graph()

        # Add nodes
        for node in self.nodes:
            G.add_node(node.node_id,
                      node_type=node.node_type,
                      compromised=node.compromised)

        # Add edges
        for node in self.nodes:
            for conn in node.connections:
                G.add_edge(node.node_id, conn.node_id)

        # Create plot
        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(G)

        # Color nodes by type
        node_colors = []
        for node in G.nodes:
            node_type = G.nodes[node]['node_type']
            if 'security_server' in node_type:
                node_colors.append('red')
            elif G.nodes[node]['compromised']:
                node_colors.append('black')
            else:
                node_colors.append('blue')

        nx.draw(G, pos, with_labels=True, node_color=node_colors,
               node_size=700, font_size=8, font_color='white')

        plt.title(f"Network Architecture: {self.name}")
        plt.savefig(f"{self.name.replace(' ', '_')}_network.png")
        plt.close()

    def simulate_attack(self, attack_type, target_node=None):
        """Simulate an attack on the network"""
        if not target_node:
            target_node = random.choice(self.nodes)

        logger.info(f"Simulating {attack_type} attack on {target_node.node_id}")

        # Create attack event
        attack_event = SecurityEvent(
            source="external_attacker",
            event_type=attack_type,
            severity=5,
            data={"target_node": target_node.node_id}
        )

        # Process the attack
        if attack_type == "ddos":
            self._simulate_ddos_attack(target_node, attack_event)
        elif attack_type == "data_breach":
            self._simulate_data_breach(target_node, attack_event)
        elif attack_type == "quantum_computing_attack":
            self._simulate_quantum_computing_attack(target_node, attack_event)

        # Return whether the attack was successful
        return target_node.compromised

    def _simulate_ddos_attack(self, target_node, attack_event):
        """Simulate a DDoS attack"""
        # DDoS reduces processing power temporarily
        original_power = target_node.processing_power
        target_node.processing_power = max(1, original_power - 70)

        # Check if any security server detected the attack
        detected = False
        for server in self.security_servers:
            if target_node in server.monitored_nodes:
                if server.detect_attack(attack_event):
                    detected = True
                    server.respond_to_threat(attack_event)
                    break

        # If not detected, node is effectively compromised temporarily
        if not detected:
            target_node.compromised = True
            logger.info(f"DDoS attack successful on {target_node.node_id}")
        else:
            logger.info(f"DDoS attack detected and mitigated on {target_node.node_id}")

        # Restore processing power after attack (simulating recovery)
        time.sleep(0.5)  # Simulate time passing
        target_node.processing_power = original_power
        target_node.compromised = False

    def _simulate_data_breach(self, target_node, attack_event):
        """Simulate a data breach attempt"""
        # Success probability based on node security level
        breach_probability = 1 - (target_node.security_level / 10.0)
        breach_successful = random.random() < breach_probability

        if breach_successful:
            # Node is compromised
            target_node.compromised = True
            logger.info(f"Data breach successful on {target_node.node_id}")

            # Exfiltrate data
            stolen_data = {"node_id": target_node.node_id,
                          "data_sample": target_node.data_storage}

            # Check if any security server detected the breach after it happened
            for server in self.security_servers:
                if target_node in server.monitored_nodes:
                    detection_probability = server.security_level / 10.0
                    if random.random() < detection_probability:
                        server.respond_to_threat(attack_event)
                        logger.info(f"Data breach detected (after the fact) on {target_node.node_id}")
                        break
        else:
            # Attack prevented
            for server in self.security_servers:
                if target_node in server.monitored_nodes:
                    server.process_security_event(attack_event)
                    logger.info(f"Data breach prevented on {target_node.node_id}")
                    break

    def _simulate_quantum_computing_attack(self, target_node, attack_event):
        """Simulate a quantum computing attack on encryption"""
        # This type of attack targets encrypted communications

        # For non-quantum protected nodes, this attack is very effective
        has_quantum_protection = False

        # Check if this node is protected by quantum encryption
        for server in self.security_servers:
            if isinstance(server, QuantumSecurityServer) and server.qkd_enabled:
                if target_node in server.monitored_nodes:
                    has_quantum_protection = True
                    break

        if has_quantum_protection:
            # Quantum encryption is resistant to quantum computing attacks
            logger.info(f"Quantum computing attack failed against quantum-protected node {target_node.node_id}")
            target_node.compromised = False
        else:
            # Traditional encryption is vulnerable
            logger.info(f"Quantum computing attack successful against non-quantum node {target_node.node_id}")
            target_node.compromised = True

            # Simulate data decryption
            stolen_data = {"decrypted_communications": "Sensitive data exposed"}

            # Check if any security server detected the attack
            for server in self.security_servers:
                if target_node in server.monitored_nodes:
                    server.process_security_event(attack_event)

    def collect_performance_metrics(self):
        """Collect performance metrics for the network"""
        # Calculate average security level
        avg_security = sum(node.security_level for node in self.nodes) / len(self.nodes)
        self.performance_metrics["average_security_level"].append(avg_security)

        # Calculate network connectivity
        total_connections = sum(len(node.connections) for node in self.nodes)
        avg_connections = total_connections / (2 * len(self.nodes))  # Divide by 2 to avoid counting twice
        self.performance_metrics["average_connections"].append(avg_connections)

        # Calculate compromised node percentage
        compromised_nodes = sum(1 for node in self.nodes if node.compromised)
        compromised_percentage = (compromised_nodes / len(self.nodes)) * 100
        self.performance_metrics["compromised_percentage"].append(compromised_percentage)

        # Store current time
        self.performance_metrics["timestamp"].append(time.time())

    def visualize_metrics(self):
        """Create visualizations of performance metrics"""
        plt.figure(figsize=(15, 10))

        # Plot average security level
        plt.subplot(2, 2, 1)
        plt.plot(self.performance_metrics["timestamp"],
                self.performance_metrics["average_security_level"])
        plt.title("Average Security Level")
        plt.xlabel("Time")
        plt.ylabel("Security Level (1-10)")

        # Plot average connections
        plt.subplot(2, 2, 2)
        plt.plot(self.performance_metrics["timestamp"],
                self.performance_metrics["average_connections"])
        plt.title("Average Node Connections")
        plt.xlabel("Time")
        plt.ylabel("Connections per Node")

        # Plot compromised percentage
        plt.subplot(2, 2, 3)
        plt.plot(self.performance_metrics["timestamp"],
                self.performance_metrics["compromised_percentage"])
        plt.title("Compromised Nodes Percentage")
        plt.xlabel("Time")
        plt.ylabel("Percentage")

        # Save the figure
        plt.tight_layout()
        plt.savefig(f"{self.name.replace(' ', '_')}_metrics.png")
        plt.close()


In [16]:
!python run_simulation.py

In [17]:
class IoTDevice(Node):
    def __init__(self, node_id):
        super().__init__(node_id, node_type="iot_device")
        self.security_level = 3  # IoT devices often have lower security
        self.processing_power = 2


In [18]:
def _simulate_man_in_the_middle_attack(self, target_node, attack_event):
    # Implementation of MITM attack simulation
    pass


In [19]:
class MeshSecurityModel(NetworkModel):
    # Implementation of a fully meshed security model
    pass


In [20]:
!pip install qiskit # install the qiskit library

Collecting qiskit
  Downloading qiskit-2.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting rustworkx>=0.15.0 (from qiskit)
  Downloading rustworkx-0.16.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting dill>=0.3 (from qiskit)
  Downloading dill-0.4.0-py3-none-any.whl.metadata (10 kB)
Collecting stevedore>=3.0.0 (from qiskit)
  Downloading stevedore-5.4.1-py3-none-any.whl.metadata (2.3 kB)
Collecting symengine<0.14,>=0.11 (from qiskit)
  Downloading symengine-0.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.2 kB)
Collecting pbr>=2.0.0 (from stevedore>=3.0.0->qiskit)
  Downloading pbr-6.1.1-py2.py3-none-any.whl.metadata (3.4 kB)
Downloading qiskit-2.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.5/6.5 MB[0m [31m18.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.4.0-py3-none-any.whl (119 k

In [21]:
! mkdir -p research/military_networks
! touch research/military_networks/topology_characteristics.md


In [22]:
# Add to security_simulation.py
class CommandControlNode(Node):
    """Represents a military command and control node"""
    def __init__(self, node_id):
        super().__init__(node_id, node_type="command_control")
        self.security_level = 9
        self.processing_power = 50
        self.classification_level = "TOP_SECRET"
        self.redundancy_systems = True

class TacticalFieldNode(Node):
    """Represents a tactical field deployment node"""
    def __init__(self, node_id):
        super().__init__(node_id, node_type="tactical_field")
        self.security_level = 7
        self.processing_power = 25
        self.classification_level = "SECRET"
        self.environmental_tolerance = 8  # 1-10 scale for harsh environments
        self.battery_life = 24  # hours


In [23]:
# Add to security_simulation.py
class MilitaryNetworkModel(NetworkModel):
    """Military network model with command hierarchy"""
    def __init__(self, num_command_nodes=2, num_tactical_nodes=8):
        super().__init__("Military Network Model")

        # Create command and control nodes
        for i in range(num_command_nodes):
            node = CommandControlNode(f"Command_{i}")
            self.add_node(node)

        # Create quantum security server
        qkd_server = QuantumSecurityServer("MilQKD_Server")
        self.add_node(qkd_server)

        # Connect command nodes to QKD server
        for i in range(num_command_nodes):
            self.nodes[i].connect(qkd_server)
            qkd_server.monitor_node(self.nodes[i])

        # Create tactical field nodes
        for i in range(num_tactical_nodes):
            node = TacticalFieldNode(f"Tactical_{i}")
            self.add_node(node)

            # Connect to command nodes (hierarchical structure)
            command_node = self.nodes[i % num_command_nodes]
            node.connect(command_node)

            # Connect some tactical nodes to QKD server (priority nodes)
            if i < num_tactical_nodes / 2:
                node.connect(qkd_server)
                qkd_server.monitor_node(node)

        logger.info(f"Created military network with {num_command_nodes} command nodes and {num_tactical_nodes} tactical nodes")


In [24]:
# Add realistic latency calculation to Node class in security_simulation.py
def send_message(self, target_node, message, encrypted=False):
    """Send a message to another node with realistic latency"""
    if target_node in self.connections:
        # Calculate realistic latency based on node types and distance
        base_latency = 0.05  # 50ms base latency

        # Add variable latency based on node types
        if self.node_type == "tactical_field" or target_node.node_type == "tactical_field":
            # Field communications often have higher latency
            base_latency += random.uniform(0.1, 0.3)  # 100-300ms additional

        # Simulate packet loss in tactical environments
        if self.node_type == "tactical_field" and random.random() < 0.05:  # 5% packet loss
            logger.warning(f"Packet loss in transmission from {self.node_id} to {target_node.node_id}")
            return False

        # Simulate transmission
        time.sleep(base_latency)

        if encrypted:
            # More sophisticated encryption simulation
            encryption_type = "AES-256-GCM" if not hasattr(self, "quantum_keys") else "QKD"
            message = f"{encryption_type}:{hashlib.sha256(message.encode()).hexdigest()}"

        target_node.receive_message(self, message)
        return True
    return False



In [25]:
# Create a new file qkd_protocols.py
import random
import numpy as np
import logging

logger = logging.getLogger("QKDProtocols")

class QKDProtocol:
    """Base class for QKD protocols"""
    def __init__(self, name, error_rate=0.05):
        self.name = name
        self.error_rate = error_rate  # Quantum channel error rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0

    def generate_key(self, key_length):
        """Abstract method for key generation"""
        raise NotImplementedError("Each QKD protocol must implement this method")

    def calculate_efficiency(self):
        """Calculate protocol efficiency"""
        if self.bits_transmitted == 0:
            return 0
        return self.key_bits_generated / self.bits_transmitted


In [26]:
# Add to qkd_protocols.py
class BB84Protocol(QKDProtocol):
    """Implementation of BB84 QKD protocol"""
    def __init__(self, error_rate=0.05):
        super().__init__("BB84", error_rate)

    def generate_key(self, key_length):
        """Generate a key using BB84 protocol"""
        # This is a simplified simulation of BB84
        total_bits_needed = key_length * 4  # We need extra bits due to basis mismatch and error checking

        # Step 1: Alice generates random bits and bases
        alice_bits = [random.randint(0, 1) for _ in range(total_bits_needed)]
        alice_bases = [random.randint(0, 1) for _ in range(total_bits_needed)]  # 0=rectilinear, 1=diagonal

        # Step 2: Alice prepares and sends qubits, Bob measures
        bob_bases = [random.randint(0, 1) for _ in range(total_bits_needed)]
        bob_results = []

        for i in range(total_bits_needed):
            # If bases match, Bob gets correct bit (except for channel errors)
            if bob_bases[i] == alice_bases[i]:
                if random.random() > self.error_rate:
                    bob_results.append(alice_bits[i])
                else:
                    bob_results.append(1 - alice_bits[i])  # Flipped bit due to error
            else:
                # If bases don't match, Bob gets random result
                bob_results.append(random.randint(0, 1))

        # Step 3: Basis reconciliation (Alice and Bob announce bases)
        matching_indices = [i for i in range(total_bits_needed) if alice_bases[i] == bob_bases[i]]

        # Step 4: Error estimation and privacy amplification
        # For simplicity, we'll use a subset of the bits for error checking
        sample_size = min(len(matching_indices) // 4, 100)
        test_indices = random.sample(matching_indices, sample_size)
        remaining_indices = [i for i in matching_indices if i not in test_indices]

        # Check error rate in test bits
        errors = sum(alice_bits[i] != bob_results[i] for i in test_indices)
        measured_error_rate = errors / sample_size if sample_size > 0 else 0

        logger.info(f"BB84 test sample size: {sample_size}, measured error rate: {measured_error_rate:.4f}")

        # If error rate is too high, abort
        if measured_error_rate > 0.15:  # Typical threshold
            logger.warning("BB84 aborted: error rate too high, possible eavesdropping")
            return None

        # Step 5: Generate final key from remaining bits
        final_key = [alice_bits[i] for i in remaining_indices[:key_length]]

        # Update statistics
        self.bits_transmitted = total_bits_needed
        self.key_bits_generated = len(final_key)

        # Convert to hex string
        key_binary = ''.join(str(bit) for bit in final_key)
        key_hex = hex(int(key_binary, 2))[2:] if key_binary else "0"

        return key_hex


In [27]:
# Add to qkd_protocols.py
class BB84Protocol(QKDProtocol):
    """Implementation of BB84 QKD protocol"""
    def __init__(self, error_rate=0.05):
        super().__init__("BB84", error_rate)

    def generate_key(self, key_length):
        """Generate a key using BB84 protocol"""
        # This is a simplified simulation of BB84
        total_bits_needed = key_length * 4  # We need extra bits due to basis mismatch and error checking

        # Step 1: Alice generates random bits and bases
        alice_bits = [random.randint(0, 1) for _ in range(total_bits_needed)]
        alice_bases = [random.randint(0, 1) for _ in range(total_bits_needed)]  # 0=rectilinear, 1=diagonal

        # Step 2: Alice prepares and sends qubits, Bob measures
        bob_bases = [random.randint(0, 1) for _ in range(total_bits_needed)]
        bob_results = []

        for i in range(total_bits_needed):
            # If bases match, Bob gets correct bit (except for channel errors)
            if bob_bases[i] == alice_bases[i]:
                if random.random() > self.error_rate:
                    bob_results.append(alice_bits[i])
                else:
                    bob_results.append(1 - alice_bits[i])  # Flipped bit due to error
            else:
                # If bases don't match, Bob gets random result
                bob_results.append(random.randint(0, 1))

        # Step 3: Basis reconciliation (Alice and Bob announce bases)
        matching_indices = [i for i in range(total_bits_needed) if alice_bases[i] == bob_bases[i]]

        # Step 4: Error estimation and privacy amplification
        # For simplicity, we'll use a subset of the bits for error checking
        sample_size = min(len(matching_indices) // 4, 100)
        test_indices = random.sample(matching_indices, sample_size)
        remaining_indices = [i for i in matching_indices if i not in test_indices]

        # Check error rate in test bits
        errors = sum(alice_bits[i] != bob_results[i] for i in test_indices)
        measured_error_rate = errors / sample_size if sample_size > 0 else 0

        logger.info(f"BB84 test sample size: {sample_size}, measured error rate: {measured_error_rate:.4f}")

        # If error rate is too high, abort
        if measured_error_rate > 0.15:  # Typical threshold
            logger.warning("BB84 aborted: error rate too high, possible eavesdropping")
            return None

        # Step 5: Generate final key from remaining bits
        final_key = [alice_bits[i] for i in remaining_indices[:key_length]]

        # Update statistics
        self.bits_transmitted = total_bits_needed
        self.key_bits_generated = len(final_key)

        # Convert to hex string
        key_binary = ''.join(str(bit) for bit in final_key)
        key_hex = hex(int(key_binary, 2))[2:] if key_binary else "0"

        return key_hex


In [28]:
# Add to qkd_protocols.py
class E91Protocol(QKDProtocol):
    """Implementation of E91 (Ekert91) QKD protocol using entanglement"""
    def __init__(self, error_rate=0.05):
        super().__init__("E91", error_rate)

    def generate_key(self, key_length):
        """Generate a key using E91 protocol"""
        # This is a simplified simulation of E91
        total_pairs_needed = key_length * 5  # We need extra pairs for Bell test and basis mismatch

        # Step 1: Generate entangled pairs
        # For each pair, we'll simulate Bell states measurements

        # Step 2: Alice and Bob choose random measurement bases
        # Alice chooses from 3 angles (0, pi/4, pi/2)
        # Bob chooses from 3 angles (0, pi/4, -pi/4)
        alice_angles = [random.choice([0, 1, 2]) for _ in range(total_pairs_needed)]
        bob_angles = [random.choice([0, 1, 2]) for _ in range(total_pairs_needed)]

        # Step 3: Measure entangled pairs
        alice_results = []
        bob_results = []

        for i in range(total_pairs_needed):
            # Calculate probability of correlated results based on angles
            if alice_angles[i] == bob_angles[i]:
                # Same angle -> perfect correlation except for errors
                if random.random() > self.error_rate:
                    bit = random.randint(0, 1)
                    alice_results.append(bit)
                    bob_results.append(bit)
                else:
                    # Error case
                    bit = random.randint(0, 1)
                    alice_results.append(bit)
                    bob_results.append(1 - bit)
            else:
                # Different angles -> calculate quantum correlation
                alice_bit = random.randint(0, 1)
                alice_results.append(alice_bit)

                # Simplified quantum correlation
                angle_diff = abs(alice_angles[i] - bob_angles[i])
                if angle_diff == 1:  # 45 degree difference
                    corr_prob = 0.85  # ~cos^2(pi/4)
                else:  # 90 degree difference
                    corr_prob = 0.5   # ~cos^2(pi/2)

                if random.random() < corr_prob:
                    bob_results.append(alice_bit)
                else:
                    bob_results.append(1 - alice_bit)

        # Step 4: Announce measurement bases
        # Keep bits where both used compatible bases for key generation
        key_indices = [i for i in range(total_pairs_needed)
                      if (alice_angles[i] == bob_angles[i])]

        # Step 5: Use some measurements for Bell test (CHSH inequality)
        bell_test_indices = [i for i in range(total_pairs_needed)
                           if i not in key_indices][:min(200, total_pairs_needed//3)]

        # Simplified Bell test (in real E91, this would check for quantum correlations)
        bell_parameter = random.uniform(2.5, 2.8)  # Simulate a quantum value > 2
        logger.info(f"E91 Bell parameter: {bell_parameter} (Classical limit: 2.0)")

        if bell_parameter <= 2.0:
            logger.warning("E91 aborted: Bell test failed, possible eavesdropping")
            return None

        # Step 6: Generate final key from remaining bits
        remaining_key_indices = [i for i in key_indices if i not in bell_test_indices]
        final_key = [alice_results[i] for i in remaining_key_indices[:key_length]]

        # Update statistics
        self.bits_transmitted = total_pairs_needed
        self.key_bits_generated = len(final_key)

        # Convert to hex string
        key_binary = ''.join(str(bit) for bit in final_key)
        key_hex = hex(int(key_binary, 2))[2:] if key_binary else "0"

        return key_hex


In [29]:
# Create a new file qkd_protocols.py
import random
import numpy as np
import logging

logger = logging.getLogger("QKDProtocols")

class QKDProtocol:
    """Base class for QKD protocols"""
    def __init__(self, name, error_rate=0.05):
        self.name = name
        self.error_rate = error_rate  # Quantum channel error rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0

    def generate_key(self, key_length):
        """Abstract method for key generation"""
        raise NotImplementedError("Each QKD protocol must implement this method")

    def calculate_efficiency(self):
        """Calculate protocol efficiency"""
        if self.bits_transmitted == 0:
            return 0
        return self.key_bits_generated / self.bits_transmitted

class BB84Protocol(QKDProtocol):
    """Implementation of BB84 QKD protocol"""
    def __init__(self, error_rate=0.05):
        super().__init__("BB84", error_rate)

    def generate_key(self, key_length):
        """Generate a key using BB84 protocol"""
        # This is a simplified simulation of BB84
        total_bits_needed = key_length * 4  # We need extra bits due to basis mismatch and error checking

        # Step 1: Alice generates random bits and bases
        alice_bits = [random.randint(0, 1) for _ in range(total_bits_needed)]
        alice_bases = [random.randint(0, 1) for _ in range(total_bits_needed)]  # 0=rectilinear, 1=diagonal

        # Step 2: Alice prepares and sends qubits, Bob measures
        bob_bases = [random.randint(0, 1) for _ in range(total_bits_needed)]
        bob_results = []

        for i in range(total_bits_needed):
            # If bases match, Bob gets correct bit (except for channel errors)
            if bob_bases[i] == alice_bases[i]:
                if random.random() > self.error_rate:
                    bob_results.append(alice_bits[i])
                else:
                    bob_results.append(1 - alice_bits[i])  # Flipped bit due to error
            else:
                # If bases don't match, Bob gets random result
                bob_results.append(random.randint(0, 1))

        # Step 3: Basis reconciliation (Alice and Bob announce bases)
        matching_indices = [i for i in range(total_bits_needed) if alice_bases[i] == bob_bases[i]]

        # Step 4: Error estimation and privacy amplification
        # For simplicity, we'll use a subset of the bits for error checking
        sample_size = min(len(matching_indices) // 4, 100)
        test_indices = random.sample(matching_indices, sample_size)
        remaining_indices = [i for i in matching_indices if i not in test_indices]

        # Check error rate in test bits
        errors = sum(alice_bits[i] != bob_results[i] for i in test_indices)
        measured_error_rate = errors / sample_size if sample_size > 0 else 0

        logger.info(f"BB84 test sample size: {sample_size}, measured error rate: {measured_error_rate:.4f}")

        # If error rate is too high, abort
        if measured_error_rate > 0.15:  # Typical threshold
            logger.warning("BB84 aborted: error rate too high, possible eavesdropping")
            return None

        # Step 5: Generate final key from remaining bits
        final_key = [alice_bits[i] for i in remaining_indices[:key_length]]

        # Update statistics
        self.bits_transmitted = total_bits_needed
        self.key_bits_generated = len(final_key)

        # Convert to hex string
        key_binary = ''.join(str(bit) for bit in final_key)
        key_hex = hex(int(key_binary, 2))[2:] if key_binary else "0"

        return key_hex

class E91Protocol(QKDProtocol):
    """Implementation of E91 (Ekert91) QKD protocol using entanglement"""
    def __init__(self, error_rate=0.05):
        super().__init__("E91", error_rate)

    def generate_key(self, key_length):
        """Generate a key using E91 protocol"""
        # This is a simplified simulation of E91
        total_pairs_needed = key_length * 5  # We need extra pairs for Bell test and basis mismatch

        # Step 1: Generate entangled pairs
        # For each pair, we'll simulate Bell states measurements

        # Step 2: Alice and Bob choose random measurement bases
        # Alice chooses from 3 angles (0, pi/4, pi/2)
        # Bob chooses from 3 angles (0, pi/4, -pi/4)
        alice_angles = [random.choice([0, 1, 2]) for _ in range(total_pairs_needed)]
        bob_angles = [random.choice([0, 1, 2]) for _ in range(total_pairs_needed)]

        # Step 3: Measure entangled pairs
        alice_results = []
        bob_results = []

        for i in range(total_pairs_needed):
            # Calculate probability of correlated results based on angles
            if alice_angles[i] == bob_angles[i]:
                # Same angle -> perfect correlation except for errors
                if random.random() > self.error_rate:
                    bit = random.randint(0, 1)
                    alice_results.append(bit)
                    bob_results.append(bit)
                else:
                    # Error case
                    bit = random.randint(0, 1)
                    alice_results.append(bit)
                    bob_results.append(1 - bit)
            else:
                # Different angles -> calculate quantum correlation
                alice_bit = random.randint(0, 1)
                alice_results.append(alice_bit)

                # Simplified quantum correlation
                angle_diff = abs(alice_angles[i] - bob_angles[i])
                if angle_diff == 1:  # 45 degree difference
                    corr_prob = 0.85  # ~cos^2(pi/4)
                else:  # 90 degree difference
                    corr_prob = 0.5   # ~cos^2(pi/2)

                if random.random() < corr_prob:
                    bob_results.append(alice_bit)
                else:
                    bob_results.append(1 - alice_bit)

        # Step 4: Announce measurement bases
        # Keep bits where both used compatible bases for key generation
        key_indices = [i for i in range(total_pairs_needed)
                      if (alice_angles[i] == bob_angles[i])]

        # Step 5: Use some measurements for Bell test (CHSH inequality)
        bell_test_indices = [i for i in range(total_pairs_needed)
                           if i not in key_indices][:min(200, total_pairs_needed//3)]

        # Simplified Bell test (in real E91, this would check for quantum correlations)
        bell_parameter = random.uniform(2.5, 2.8)  # Simulate a quantum value > 2
        logger.info(f"E91 Bell parameter: {bell_parameter} (Classical limit: 2.0)")

        if bell_parameter <= 2.0:
            logger.warning("E91 aborted: Bell test failed, possible eavesdropping")
            return None

        # Step 6: Generate final key from remaining bits
        remaining_key_indices = [i for i in key_indices if i not in bell_test_indices]
        final_key = [alice_results[i] for i in remaining_key_indices[:key_length]]

        # Update statistics
        self.bits_transmitted = total_pairs_needed
        self.key_bits_generated = len(final_key)

        # Convert to hex string
        key_binary = ''.join(str(bit) for bit in final_key)
        key_hex = hex(int(key_binary, 2))[2:] if key_binary else "0"

        return key_hex

# Step 2.4: Update QuantumSecurityServer to Use Different Protocols

In [34]:
# Add this cell before your QuantumSecurityServer implementation

# --------------------------
# Quantum Protocol Implementations
# --------------------------

import random
import hashlib
import logging

logger = logging.getLogger("QKDProtocols")

class BB84Protocol:
    """Implementation of BB84 QKD protocol"""
    def __init__(self, error_rate=0.05):
        self.name = "BB84"
        self.error_rate = error_rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0

    def generate_key(self, key_length):
        """Generate a key using BB84 protocol"""
        key_size = key_length * 8  # Convert bytes to bits
        sender_bits = [random.randint(0, 1) for _ in range(key_size)]
        sender_bases = [random.randint(0, 1) for _ in range(key_size)]

        receiver_bases = [random.randint(0, 1) for _ in range(key_size)]

        matching_indices = [i for i in range(key_size)
                          if sender_bases[i] == receiver_bases[i]]

        final_key = [sender_bits[i] for i in matching_indices[:key_length]]

        key_hex = hex(int(''.join(map(str, final_key)), 2))[2:]
        return key_hex

class E91Protocol:
    """Implementation of E91 protocol using entanglement"""
    def __init__(self, error_rate=0.05):
        self.name = "E91"
        self.error_rate = error_rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0

    def generate_key(self, key_length):
        """Generate a key using E91 protocol"""
        key_size = key_length * 8
        alice_angles = [random.choice([0, 1, 2]) for _ in range(key_size)]
        bob_angles = [random.choice([0, 1, 2]) for _ in range(key_size)]

        key_indices = [i for i in range(key_size)
                      if alice_angles[i] == bob_angles[i]]

        final_key = [random.randint(0, 1) for _ in key_indices[:key_length]]
        key_hex = hex(int(''.join(map(str, final_key)), 2))[2:]
        return key_hex

# Now update your QuantumSecurityServer class
class QuantumSecurityServer(SecurityServer):
    def __init__(self, node_id, protocol_name="BB84"):
        super().__init__(node_id)
        self.node_type = "quantum_security_server"
        self.security_level = 10
        self.quantum_keys = {}
        self.qkd_enabled = True

        # Initialize QKD protocol
        if protocol_name == "BB84":
            self.protocol = BB84Protocol()
        elif protocol_name == "E91":
            self.protocol = E91Protocol()
        else:
            logger.warning(f"Unknown protocol {protocol_name}, defaulting to BB84")
            self.protocol = BB84Protocol()

        logger.info(f"Quantum security server initialized with {self.protocol.name} protocol")

    def generate_quantum_key(self, target_node, key_size=256):
        """Generate a quantum key using the selected protocol"""
        logger.info(f"Generating {self.protocol.name} key between {self.node_id} and {target_node.node_id}")
        key_hex = self.protocol.generate_key(key_size)

        if key_hex:
            self.quantum_keys[target_node.node_id] = key_hex
            return key_hex
        return None


In [35]:
# Initialize server with BB84 protocol
q_server = QuantumSecurityServer("MilitaryQKD", protocol_name="BB84")

# Generate keys
target_node = Node("FieldUnit_01")
key = q_server.generate_quantum_key(target_node, 32)  # 256-bit key


In [36]:
# Create a new file quantum_channel.py
import numpy as np
import random
import math

class QuantumChannel:
    """Models the physical quantum channel and its imperfections"""
    def __init__(self, channel_type="fiber", distance=10, loss_db_per_km=0.2):
        self.channel_type = channel_type  # "fiber" or "free_space"
        self.distance = distance  # in kilometers
        self.loss_db_per_km = loss_db_per_km  # typical fiber loss

        # Free space parameters (if applicable)
        self.weather_condition = "clear"  # clear, rain, fog, etc.
        self.turbulence_strength = 0.0  # 0.0 to 1.0

        # Calculate total channel loss
        self.calculate_loss()

    def calculate_loss(self):
        """Calculate total channel loss in dB"""
        if self.channel_type == "fiber":
            self.total_loss_db = self.distance * self.loss_db_per_km
        else:  # free_space
            # Base loss calculation for free space (simplified)
            self.total_loss_db = 20 * math.log10(self.distance) + 20  # Basic free space path loss

            # Add weather conditions
            if self.weather_condition == "rain":
                self.total_loss_db += 5
            elif self.weather_condition == "fog":
                self.total_loss_db += 10

            # Add turbulence effects
            self.total_loss_db += 15 * self.turbulence_strength

        # Convert to linear scale
        self.transmission_rate = 10 ** (-self.total_loss_db / 10)

        return self.total_loss_db

    def transmit_photon(self):
        """Simulate transmission of a single photon"""
        # Return True if photon arrived, False if lost
        return random.random() < self.transmission_rate

    def transmit_qubit(self, error_rate=0.01):
        """Simulate transmission of a qubit with possible state flip"""
        # First check if photon is lost
        if not self.transmit_photon():
            return None  # Photon lost

        # If photon arrives, check for state flip
        if random.random() < error_rate:
            return "flipped"  # State flipped

        return "preserved"  # State preserved

    def update_weather(self, condition):
        """Update weather condition for free space channel"""
        self.weather_condition = condition
        self.calculate_loss()

    def update_turbulence(self, strength):
        """Update turbulence strength for free space channel"""
        self.turbulence_strength = max(0.0, min(1.0, strength))
        self.calculate_loss()

    def get_status(self):
        """Get current channel status"""
        return {
            "type": self.channel_type,
            "distance": self.distance,
            "loss_db": self.total_loss_db,
            "transmission_rate": self.transmission_rate,
            "weather": self.weather_condition if self.channel_type == "free_space" else "N/A",
            "turbulence": self.turbulence_strength if self.channel_type == "free_space" else 0.0
        }


In [39]:
# Update the QKDProtocol class in qkd_protocols.py
import sys
import os

# Get the absolute path of the current directory
current_dir = os.path.dirname(os.path.abspath(__file__))

# Add the current directory to the Python path
sys.path.append(current_dir)

from quantum_channel import QuantumChannel  # Now this import should work
class QKDProtocol:
    """Base class for QKD protocols"""
    def __init__(self, name, error_rate=0.05):
        self.name = name
        self.error_rate = error_rate  # Quantum channel error rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0
        self.channel = None  # Will be set when generating keys

    def set_quantum_channel(self, channel_type="fiber", distance=10):
        """Set up the quantum channel for this protocol"""
        self.channel = QuantumChannel(channel_type, distance)
        return self.channel

    def generate_key(self, key_length):
        """Abstract method for key generation"""
        raise NotImplementedError("Each QKD protocol must implement this method")

    def calculate_efficiency(self):
        """Calculate protocol efficiency"""
        if self.bits_transmitted == 0:
            return 0
        return self.key_bits_generated / self.bits_transmitted


NameError: name '__file__' is not defined

In [42]:
!touch quantum_channel.py # create the quantum_channel.py file in /content/

In [43]:
# Update the QKDProtocol class in qkd_protocols.py
import sys
import os
import importlib.util

!touch quantum_channel.py # create the quantum_channel.py file in /content/

# Try to get __file__ if defined
try:
    current_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # If __file__ is not defined, assume we are in the same directory
    current_dir = os.getcwd()

# Add the current directory to the Python path
sys.path.append(current_dir)

# Import QuantumChannel using importlib
spec = importlib.util.spec_from_file_location("quantum_channel", os.path.join(current_dir, "quantum_channel.py"))
quantum_channel = importlib.util.module_from_spec(spec)
spec.loader.exec_module(quantum_channel)
QuantumChannel = quantum_channel.QuantumChannel  # Now this import should work

class QKDProtocol:
    """Base class for QKD protocols"""
    def __init__(self, name, error_rate=0.05):
        self.name = name
        self.error_rate = error_rate  # Quantum channel error rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0
        self.channel = None  # Will be set when generating keys

    def set_quantum_channel(self, channel_type="fiber", distance=10):
        """Set up the quantum channel for this protocol"""
        self.channel = QuantumChannel(channel_type, distance)
        return self.channel

    def generate_key(self, key_length):
        """Abstract method for key generation"""
        raise NotImplementedError("Each QKD protocol must implement this method")

    def calculate_efficiency(self):
        """Calculate protocol efficiency"""
        if self.bits_transmitted == 0:
            return 0
        return self.key_bits_generated / self.bits_transmitted

AttributeError: module 'quantum_channel' has no attribute 'QuantumChannel'

In [44]:
# Update the QKDProtocol class in qkd_protocols.py
import sys
import os
import importlib.util

!touch quantum_channel.py # create the quantum_channel.py file in /content/

# Try to get __file__ if defined
try:
    current_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # If __file__ is not defined, assume we are in the same directory
    current_dir = os.getcwd()

# Add the current directory to the Python path
sys.path.append(current_dir)

# Import QuantumChannel using importlib
spec = importlib.util.spec_from_file_location("quantum_channel", os.path.join(current_dir, "quantum_channel.py"))
quantum_channel = importlib.util.module_from_spec(spec)
spec.loader.exec_module(quantum_channel)

# Access QuantumChannel class after execution
QuantumChannel = quantum_channel.QuantumChannel  # Now this import should work

class QKDProtocol:
    """Base class for QKD protocols"""
    def __init__(self, name, error_rate=0.05):
        self.name = name
        self.error_rate = error_rate  # Quantum channel error rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0
        self.channel = None  # Will be set when generating keys

    def set_quantum_channel(self, channel_type="fiber", distance=10):
        """Set up the quantum channel for this protocol"""
        self.channel = QuantumChannel(channel_type, distance)
        return self.channel

    def generate_key(self, key_length):
        """Abstract method for key generation"""
        raise NotImplementedError("Each QKD protocol must implement this method")

    def calculate_efficiency(self):
        """Calculate protocol efficiency"""
        if self.bits_transmitted == 0:
            return 0
        return self.key_bits_generated / self.bits_transmitted

AttributeError: module 'quantum_channel' has no attribute 'QuantumChannel'

In [48]:
# Update the QKDProtocol class in qkd_protocols.py
import sys
import os
import importlib.util

!touch quantum_channel.py # create the quantum_channel.py file in /content/

# Try to get __file__ if defined
try:
    current_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # If __file__ is not defined, assume we are in the same directory
    current_dir = os.getcwd()

# Add the current directory to the Python path
sys.path.append(current_dir)

# Import QuantumChannel using importlib
spec = importlib.util.spec_from_file_location("quantum_channel", os.path.join(current_dir, "quantum_channel.py"))
quantum_channel = importlib.util.module_from_spec(spec)
spec.loader.exec_module(quantum_channel)

# Access QuantumChannel class after execution
# The previous line loads the module.  To access the QuantumChannel class
# from the module, use getattr.
QuantumChannel = getattr(quantum_channel, 'QuantumChannel')  # Now this import should work

class QKDProtocol:
    """Base class for QKD protocols"""
    def __init__(self, name, error_rate=0.05):
        self.name = name
        self.error_rate = error_rate  # Quantum channel error rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0
        self.channel = None  # Will be set when generating keys

    def set_quantum_channel(self, channel_type="fiber", distance=10):
        """Set up the quantum channel for this protocol"""
        self.channel = QuantumChannel(channel_type, distance)
        return self.channel

    def generate_key(self, key_length):
        """Abstract method for key generation"""
        raise NotImplementedError("Each QKD protocol must implement this method")

    def calculate_efficiency(self):
        """Calculate protocol efficiency"""
        if self.bits_transmitted == 0:
            return 0
        return self.key_bits_generated / self.bits_transmitted

AttributeError: module 'quantum_channel' has no attribute 'QuantumChannel'

In [50]:
# Update the QKDProtocol class in qkd_protocols.py
import sys
import os
import importlib.util

!touch quantum_channel.py # create the quantum_channel.py file in /content/

# Try to get __file__ if defined
try:
    current_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # If __file__ is not defined, assume we are in the same directory
    current_dir = os.getcwd()

# Add the current directory to the Python path
sys.path.append(current_dir)

# Import QuantumChannel using importlib
spec = importlib.util.spec_from_file_location("quantum_channel", os.path.join(current_dir, "quantum_channel.py"))
quantum_channel = importlib.util.module_from_spec(spec)
spec.loader.exec_module(quantum_channel)

# Access QuantumChannel class after execution
QuantumChannel = getattr(quantum_channel, 'QuantumChannel')  # Now this import should work

class QKDProtocol:
    """Base class for QKD protocols"""
    def __init__(self, name, error_rate=0.05):
        self.name = name
        self.error_rate = error_rate  # Quantum channel error rate
        self.bits_transmitted = 0
        self.key_bits_generated = 0
        self.channel = None  # Will be set when generating keys

    def set_quantum_channel(self, channel_type="fiber", distance=10):
        """Set up the quantum channel for this protocol"""
        self.channel = QuantumChannel(channel_type, distance)
        return self.channel

    def generate_key(self, key_length):
        """Abstract method for key generation"""
        raise NotImplementedError("Each QKD protocol must implement this method")

    def calculate_efficiency(self):
        """Calculate protocol efficiency"""
        if self.bits_transmitted == 0:
            return 0
        return self.key_bits_generated / self.bits_transmitted

AttributeError: module 'quantum_channel' has no attribute 'QuantumChannel'

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [46]:
# quantum_channel.py
class QuantumChannel:
    def __init__(self, channel_type, distance):
        self.channel_type = channel_type
        self.distance = distance
        print(f"Created QuantumChannel: type={channel_type}, distance={distance}")