# Distributed State Synchronization

This notebook implements a mechanism for synchronizing state across distributed agents. We'll use a simple key-value store as our shared state and implement a basic conflict resolution strategy.

## References:
1. DeCandia, G., et al. (2007). Dynamo: Amazon's highly available key-value store. ACM SIGOPS Operating Systems Review, 41(6), 205-220.
2. Lamport, L. (1978). Time, clocks, and the ordering of events in a distributed system. Communications of the ACM, 21(7), 558-565.
3. Shapiro, M., et al. (2011). Conflict-free replicated data types. In Symposium on Self-Stabilizing Systems (pp. 386-400).

In [None]:
import asyncio
import random
import time
from typing import Dict, List, Tuple

print("Distributed State Synchronization Simulation")

## Shared State

We'll start by defining a `SharedState` class that represents our key-value store. Each value will be associated with a version number to help with conflict resolution.

In [None]:
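from typing import Any  # typing.Any, not the builtin function `any`


class SharedState:
    """Versioned key-value store; each key maps to (version, value)."""

    def __init__(self):
        self.data: Dict[str, Tuple[int, Any]] = {}

    def update(self, key: str, value: Any, version: int) -> bool:
        """Apply the write only if it is strictly newer than the stored version."""
        if key not in self.data or self.data[key][0] < version:
            self.data[key] = (version, value)
            return True
        return False  # Stale or same-version write: keep the stored value

    def get(self, key: str) -> Tuple[int, Any]:
        """Return (version, value), or (0, None) if the key was never written."""
        return self.data.get(key, (0, None))

    def __str__(self):
        # Show values only; versions are omitted for readability
        return str({k: v for k, (_, v) in self.data.items()})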
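
To make the acceptance rule concrete, here is a minimal standalone sketch that applies the same comparison to a plain dictionary. The `apply_update` helper and the `Store` alias are illustrative names, not part of the notebook's classes.

```python
from typing import Any, Dict, Tuple

Store = Dict[str, Tuple[int, Any]]  # key -> (version, value)


def apply_update(store: Store, key: str, value: Any, version: int) -> bool:
    """Hypothetical helper: accept a write only if its version is strictly newer."""
    if key not in store or store[key][0] < version:
        store[key] = (version, value)
        return True
    return False


store: Store = {}
results = [
    apply_update(store, "A", 10, 1),  # first write for the key: accepted
    apply_update(store, "A", 20, 2),  # higher version: accepted
    apply_update(store, "A", 15, 1),  # stale version: rejected
    apply_update(store, "A", 99, 2),  # same version, different value: rejected
]
print(results)  # [True, True, False, False]
print(store)    # {'A': (2, 20)}
```

The last call is the interesting case: a write that carries the same version as the stored one is dropped, so two replicas that each hold a different value at version 2 would both keep their own.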

## State Agent

Now, let's create a `StateAgent` class that represents an individual agent in our distributed system. Each agent will have its own copy of the shared state and methods for propagating updates.

In [None]:
from typing import Any  # typing.Any, not the builtin function `any`


class StateAgent:
    """An agent that holds its own replica of the shared state."""

    def __init__(self, agent_id: int):
        self.id = agent_id
        self.state = SharedState()
        self.peers: List[int] = []  # IDs of all other agents

    async def update_local_state(self, key: str, value: Any):
        # Bump the version this replica currently knows for the key
        version, _ = self.state.get(key)
        new_version = version + 1
        if self.state.update(key, value, new_version):
            print(f"Agent {self.id} updated local state: {key} = {value} (v{new_version})")
            await self.propagate_update(key, value, new_version)

    async def propagate_update(self, key: str, value: Any, version: int):
        for peer_id in self.peers:
            await self.send_update(peer_id, key, value, version)

    async def send_update(self, to_id: int, key: str, value: Any, version: int):
        # Placeholder for real network I/O: this method only logs the send and
        # waits. Delivery to peers happens in NetworkSimulator.simulate_network.
        print(f"Agent {self.id} sending update to Agent {to_id}: {key} = {value} (v{version})")
        await asyncio.sleep(0.1)  # Simulate network delay

    async def receive_update(self, from_id: int, key: str, value: Any, version: int):
        # Accept the update only if it is newer than the local copy
        if self.state.update(key, value, version):
            print(f"Agent {self.id} received update from Agent {from_id}: {key} = {value} (v{version})")
            await self.propagate_update(key, value, version)

    async def run(self):
        # Write a random value to a random key every 1 to 3 seconds
        while True:
            key = random.choice(['A', 'B', 'C'])
            value = random.randint(1, 100)
            await self.update_local_state(key, value)
            await asyncio.sleep(random.uniform(1, 3))  # Random delay between updates
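
In the class above, `send_update` only logs and sleeps. As a rough sketch of what in-process delivery could look like, the example below gives the receiving side an `asyncio.Queue` as a mailbox. The `sender`, `receiver`, and `demo` coroutines and the `None` sentinel are assumptions made for illustration; they are not part of the notebook's design.

```python
import asyncio
from typing import Any, Dict, List, Tuple

Store = Dict[str, Tuple[int, Any]]  # key -> (version, value)


async def sender(agent_id: int, inbox: asyncio.Queue,
                 updates: List[Tuple[str, Any, int]]) -> None:
    """Hypothetical sender: push (from_id, key, value, version) messages."""
    for key, value, version in updates:
        await asyncio.sleep(0.01)  # simulated network delay
        await inbox.put((agent_id, key, value, version))
    await inbox.put(None)  # sentinel: no more messages


async def receiver(inbox: asyncio.Queue, store: Store) -> None:
    """Hypothetical receiver: apply each message with the versioned rule."""
    while True:
        message = await inbox.get()
        if message is None:
            break
        _, key, value, version = message
        if key not in store or store[key][0] < version:
            store[key] = (version, value)


async def demo() -> Store:
    inbox: asyncio.Queue = asyncio.Queue()
    store: Store = {}
    await asyncio.gather(
        sender(0, inbox, [("A", 10, 1), ("A", 20, 2), ("B", 7, 1)]),
        receiver(inbox, store),
    )
    return store


# As a script; inside a notebook cell, use `final_store = await demo()` instead.
final_store = asyncio.run(demo())
print(final_store)  # {'A': (2, 20), 'B': (1, 7)}
```

A queue per agent keeps delivery explicit and ordered per sender, which makes it easier to add delays, drops, or reordering later.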

## Network Simulator

Let's create a `NetworkSimulator` class to manage our agents and simulate the network communication between them.

In [None]:
class NetworkSimulator:
    """Owns the agents and moves updates between them."""

    def __init__(self):
        self.agents: Dict[int, StateAgent] = {}

    def add_agent(self, agent: StateAgent):
        self.agents[agent.id] = agent

    def setup_peers(self):
        # Fully connected topology: every agent is a peer of every other agent
        for agent in self.agents.values():
            agent.peers = [peer_id for peer_id in self.agents if peer_id != agent.id]

    async def simulate_network(self):
        # Every 0.5 s, copy one random key from a random sender to one of its peers
        while True:
            sender = random.choice(list(self.agents.values()))
            receiver = self.agents[random.choice(sender.peers)]
            key = random.choice(['A', 'B', 'C'])
            version, value = sender.state.get(key)
            if version > 0:  # Skip keys the sender has never written
                await receiver.receive_update(sender.id, key, value, version)
            await asyncio.sleep(0.5)  # Simulate network interval

    async def run_simulation(self, duration: int):
        agent_tasks = [asyncio.create_task(agent.run()) for agent in self.agents.values()]
        network_task = asyncio.create_task(self.simulate_network())

        await asyncio.sleep(duration)

        # Stop all background tasks and wait for the cancellations to finish
        for task in agent_tasks:
            task.cancel()
        network_task.cancel()

        await asyncio.gather(*agent_tasks, network_task, return_exceptions=True)

        print("Simulation completed.")
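
The shutdown sequence in `run_simulation` (start tasks, sleep, cancel, then gather) is a general asyncio pattern. The sketch below isolates it with a trivial `worker` coroutine, which is a hypothetical stand-in for `agent.run()`, so the effect of `return_exceptions=True` is visible.

```python
import asyncio
from typing import List, Tuple


async def worker(name: str, log: List[str]) -> None:
    """Hypothetical endless task standing in for an agent loop."""
    try:
        while True:
            log.append(name)
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append(f"{name} cancelled")
        raise  # re-raise so the task is marked as cancelled


async def run_for(duration: float) -> Tuple[List[str], list]:
    log: List[str] = []
    tasks = [asyncio.create_task(worker(f"w{i}", log)) for i in range(2)]
    await asyncio.sleep(duration)
    for task in tasks:
        task.cancel()
    # With return_exceptions=True the cancellations come back as result
    # values, so gather itself does not raise.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return log, results


# As a script; inside a notebook cell, use `await run_for(0.05)` instead.
log, results = asyncio.run(run_for(0.05))
print([type(r).__name__ for r in results])
```

Gathering after cancelling matters: without it, the tasks may still be unwinding when the caller moves on.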

## Running the Simulation

Now let's set up our network and run a simulation.

In [None]:
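async def main() -> NetworkSimulator:
    simulator = NetworkSimulator()

    # Create and add agents
    for i in range(5):
        simulator.add_agent(StateAgent(i))

    # Setup peer relationships
    simulator.setup_peers()

    # Run simulation for 20 seconds
    await simulator.run_simulation(20)

    # Print final state for each agent
    for agent in simulator.agents.values():
        print(f"Agent {agent.id} final state: {agent.state}")

    # Return the simulator so later cells can inspect it
    return simulator

# Run the simulation. Jupyter already runs an event loop, so await the
# coroutine directly; in a standalone script use asyncio.run(main()) instead.
simulator = await main()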

## Analysis and Observations

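The function below checks whether all agents ended the run with identical values and reports the highest and lowest version number each key reached across agents. It takes the `NetworkSimulator` instance from the run as its argument.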

In [None]:
def analyze_synchronization(simulator: NetworkSimulator):
    # Check if all agents have the same final state
    states = [str(agent.state) for agent in simulator.agents.values()]
    if len(set(states)) == 1:
        print("All agents have synchronized to the same state.")
        print(f"Final state: {states[0]}")
    else:
        print("Agents have not fully synchronized. Final states:")
        for agent in simulator.agents.values():
            print(f"Agent {agent.id}: {agent.state}")
    
    # Analyze versions for each key
    for key in ['A', 'B', 'C']:
        versions = [agent.state.get(key)[0] for agent in simulator.agents.values()]
        print(f"Key {key} - Max version: {max(versions)}, Min version: {min(versions)}")

# Run analysis
analyze_synchronization(simulator)

## Conclusion

In this notebook, we implemented a basic distributed state synchronization system. Key points:

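1. We used a simple key-value store with a version number per key as the shared state.
2. Agents announce their local updates to peers; in this simulation the actual delivery is performed by `NetworkSimulator`, which copies one key at a time between randomly chosen agents.
3. Conflicts are resolved by version number: a replica accepts a write only if its version is strictly higher than the one it holds. This is a "highest version wins" rule rather than true "last writer wins", because versions are per-key counters and not timestamps.
4. The repeated pairwise exchanges move replicas toward a common state, but convergence is not guaranteed. Two agents that write the same key concurrently can produce the same version number with different values, and neither replica will then accept the other's write.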
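
Version numbers alone cannot order two writes that carry the same version. One common way to get a deterministic winner, sketched here as an assumption and not as the notebook's method, is to compare `(version, agent_id)` pairs, in the spirit of Lamport timestamps with a process ID as tie-breaker. `TieBreakingState` and `Stamp` are hypothetical names.

```python
from typing import Any, Dict, Tuple

Stamp = Tuple[int, int]  # (version, agent_id); tuples compare lexicographically


class TieBreakingState:
    """Hypothetical variant of the store that breaks version ties by agent ID."""

    def __init__(self) -> None:
        self.data: Dict[str, Tuple[Stamp, Any]] = {}

    def update(self, key: str, value: Any, stamp: Stamp) -> bool:
        current = self.data.get(key)
        if current is None or current[0] < stamp:
            self.data[key] = (stamp, value)
            return True
        return False


# Two agents write key "A" concurrently, both at version 1.
write_from_0 = ("A", 10, (1, 0))
write_from_3 = ("A", 42, (1, 3))

# The two replicas see the writes in opposite orders.
replica_x, replica_y = TieBreakingState(), TieBreakingState()
for write in (write_from_0, write_from_3):
    replica_x.update(*write)
for write in (write_from_3, write_from_0):
    replica_y.update(*write)

print(replica_x.data)  # {'A': ((1, 3), 42)}
print(replica_x.data == replica_y.data)  # True
```

The choice of winner is arbitrary (the higher agent ID), but it is the same on every replica, which is what convergence requires.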

Areas for potential improvement and expansion:
1. Implement more sophisticated conflict resolution strategies (e.g., vector clocks, CRDTs).
2. Add support for concurrent updates and partial replication.
3. Implement gossip protocols for more efficient state propagation.
4. Introduce network partitions and reconciliation mechanisms.
5. Add security features like access control and encrypted state transfers.
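
As a starting point for the first item, the following is a minimal sketch of vector clock comparison. It assumes a clock is a plain dictionary from agent ID to counter; the function names `increment`, `merge`, and `compare` are illustrative choices, not an established API.

```python
from typing import Dict

VectorClock = Dict[int, int]  # agent_id -> number of events seen from that agent


def increment(clock: VectorClock, agent_id: int) -> VectorClock:
    """Return a copy of the clock with the given agent's counter advanced."""
    updated = dict(clock)
    updated[agent_id] = updated.get(agent_id, 0) + 1
    return updated


def merge(a: VectorClock, b: VectorClock) -> VectorClock:
    """Element-wise maximum of two clocks."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


def compare(a: VectorClock, b: VectorClock) -> str:
    """Classify a relative to b: 'equal', 'before', 'after', or 'concurrent'."""
    keys = a.keys() | b.keys()
    a_le_b = all(a.get(k, 0) <= b.get(k, 0) for k in keys)
    b_le_a = all(b.get(k, 0) <= a.get(k, 0) for k in keys)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


base = increment({}, 0)     # agent 0 writes: {0: 1}
left = increment(base, 0)   # agent 0 writes again: {0: 2}
right = increment(base, 1)  # agent 1 writes on top of base: {0: 1, 1: 1}

print(compare(base, left))                 # before
print(compare(left, right))                # concurrent
print(compare(left, merge(left, right)))   # before
```

Unlike a single counter, the clock reports `concurrent` for writes that neither side has seen, so the application can detect the conflict instead of silently dropping one value.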

This implementation serves as a foundation for exploring distributed state management and can be integrated with other components, such as the compositional learning system and distributed communication system we developed earlier.