# v1

In [1]:
from typing import TypedDict, List, Optional, Annotated, Set, Dict, Any
import operator
import uuid
import random # For random topic assignment in examples

from langgraph.graph import StateGraph, END

from IPython.display import Image, display


ModuleNotFoundError: No module named 'langgraph'

In [6]:

# --- 0. Configuration for Entities ---
NUM_PUBLISHERS = 3
NUM_SUBSCRIBERS = 4
NUM_TOPICS = 5 # Total number of unique topics in the system

def format_id(prefix: str, num: int, total_items: int) -> str:
    """Helper to format IDs like P_01, T_02 etc."""
    padding = 2 if total_items >= 10 else 1
    return f"{prefix}_{num:0{padding}d}"

# Generate names for topics, publishers, subscribers
TOPIC_NAMES = [format_id("T", i+1, NUM_TOPICS) for i in range(NUM_TOPICS)]
PUBLISHER_IDS = [format_id("P", i+1, NUM_PUBLISHERS) for i in range(NUM_PUBLISHERS)]
SUBSCRIBER_IDS = [format_id("S", i+1, NUM_SUBSCRIBERS) for i in range(NUM_SUBSCRIBERS)]

# Subscriber Interests: Define which topics each subscriber is interested in
# Example: S_01 interested in T_01, T_03; S_02 interested in T_02, T_04, etc.
SUBSCRIBER_INTERESTS: Dict[str, List[str]] = {}
for i, sub_id in enumerate(SUBSCRIBER_IDS):
    # Simple assignment: each subscriber gets 2-3 random topics
    num_interests = random.randint(2, 3)
    SUBSCRIBER_INTERESTS[sub_id] = random.sample(TOPIC_NAMES, k=min(num_interests, NUM_TOPICS))

print("--- System Configuration ---")
print(f"Publisher IDs: {PUBLISHER_IDS}")
print(f"Subscriber IDs: {SUBSCRIBER_IDS}")
print(f"Topic Names: {TOPIC_NAMES}")
print("Subscriber Interests:")
for sub_id, topics in SUBSCRIBER_INTERESTS.items():
    print(f"  {sub_id}: {topics}")
print("--------------------------")


# --- 1. Define the State for Batch Processing (remains largely the same structure) ---
class EventSummary(TypedDict):
    event_id: str # M_uuid
    original_payload_source: str # e.g., P_01 (from initial_batch_of_events.source_type)
    original_payload_details: Dict[str, Any]
    publisher_identity_in_graph: Optional[str] # p_01 (node name)
    published_message_content: Optional[str]
    published_message_topics: List[str] # List of T_xx
    matched_subscribers: List[str] # List of S_xx
    completed_subscribers: Set[str] # Set of S_xx
    subscriber_logs: Dict[str, List[str]] # Key: S_xx

class BatchPubSubState(TypedDict):
    initial_batch_of_events: List[Dict[str, Any]] # Each dict has "source_type": "P_xx", "details": {...}
    current_event_index: int
    
    current_event_id: Optional[str] # M_uuid
    current_event_source_publisher_id: Optional[str] # P_01 (from the event payload)
    current_event_details: Optional[Dict[str, Any]]
    
    # Fields populated during single event processing
    graph_publisher_node_name: Optional[str] # e.g., p_01 (the node that ran)
    published_message_content: Optional[str]
    published_message_topics: List[str]
    
    subscribers_matched_for_event: List[str] 
    subscribers_completed_for_event: Set[str] 
    subscriber_logs: Annotated[Dict[str, List[str]], operator.ior] 

    processed_event_summaries: Annotated[List[EventSummary], operator.add]

# --- Helper to initialize/reset logs for an event ---
def get_initial_subscriber_logs_for_event() -> Dict[str, List[str]]:
    return {sub_id: [] for sub_id in SUBSCRIBER_IDS}

# --- Utility to reset state fields FOR A SINGLE EVENT ---
def reset_fields_for_new_event_in_batch() -> Dict:
    return {
        "graph_publisher_node_name": None,
        "published_message_content": None,
        "published_message_topics": [],
        "subscribers_matched_for_event": [],
        "subscribers_completed_for_event": set(),
        "subscriber_logs": get_initial_subscriber_logs_for_event(),
        "current_event_id": None,
        "current_event_source_publisher_id": None,
        "current_event_details": None,
    }

# --- 2. Batch Control Nodes (structurally same, naming updated in logs) ---
def batch_initializer_node(state: BatchPubSubState) -> Dict:
    print("\n--- Batch Initializer Node ---")
    batch_len = len(state.get("initial_batch_of_events", []))
    print(f"Initializing batch processing for {batch_len} events.")
    return {"current_event_index": 0, "processed_event_summaries": []}

def select_next_event_from_batch_node(state: BatchPubSubState) -> Dict:
    print("\n--- Selecting Next Event from Batch Node ---")
    idx = state["current_event_index"]
    batch = state.get("initial_batch_of_events", [])
    
    update_dict = reset_fields_for_new_event_in_batch()

    if idx >= len(batch):
        print(f"select_next_event: Index {idx} out of bounds for batch size {len(batch)}. No event selected.")
        return update_dict 
    
    current_event_raw_payload = batch[idx]
    event_id = f"M_{str(uuid.uuid4())}" # Generate message ID
    source_publisher_id = current_event_raw_payload.get("source_type") # Should be "P_xx"
    details = current_event_raw_payload.get("details", {})
    
    print(f"Processing event {idx + 1}/{len(batch)}: ID {event_id}, Source Publisher {source_publisher_id}, Details: {details}")
    
    update_dict.update({
        "current_event_id": event_id,
        "current_event_source_publisher_id": source_publisher_id,
        "current_event_details": details,
    })
    return update_dict

def aggregate_event_result_node(state: BatchPubSubState) -> Dict:
    print("\n--- Aggregating Event Result Node ---")
    event_summary: EventSummary = {
        "event_id": state.get("current_event_id", "ERROR_NO_ID"),
        "original_payload_source": state.get("current_event_source_publisher_id", "ERROR_NO_SOURCE"),
        "original_payload_details": state.get("current_event_details", {}),
        "publisher_identity_in_graph": state.get("graph_publisher_node_name"),
        "published_message_content": state.get("published_message_content"),
        "published_message_topics": list(state.get("published_message_topics", [])),
        "matched_subscribers": list(state.get("subscribers_matched_for_event", [])),
        "completed_subscribers": set(state.get("subscribers_completed_for_event", set())),
        "subscriber_logs": dict(state.get("subscriber_logs", {}))
    }
    next_index = state["current_event_index"] + 1
    print(f"Finished processing Message ID {event_summary['event_id']}. Aggregated summary. Next index: {next_index}.")
    return {
        "processed_event_summaries": [event_summary],
        "current_event_index": next_index
    }

def batch_finalization_node(state: BatchPubSubState) -> Dict:
    print("\n--- Batch Finalization Node ---")
    summaries = state.get("processed_event_summaries", [])
    print(f"Batch processing complete. Total events processed: {len(summaries)}")
    for i, summary in enumerate(summaries):
        print(f"\n  Summary for Batch Event {i+1} (Message ID: {summary['event_id']}):")
        print(f"    Original Source: {summary['original_payload_source']}")
        print(f"    Processed by Graph Publisher Node: {summary['publisher_identity_in_graph']}")
        print(f"    Message Content: '{summary['published_message_content']}'")
        print(f"    Published Topics: {summary['published_message_topics']}")
        print(f"    Matched Subscribers: {summary['matched_subscribers']}")
    print("--------------------------------------")
    return {}

# --- 3. Generic Publisher Node Factory ---
def create_publisher_node(publisher_graph_node_name: str, actual_publisher_id: str):
    """
    Creates a publisher node function.
    - publisher_graph_node_name: e.g., "p_01" (how it's named in the graph)
    - actual_publisher_id: e.g., "P_01" (the logical ID of the publisher)
    """
    def publisher_node_logic(state: BatchPubSubState) -> Dict:
        message_id = state["current_event_id"]
        details = state["current_event_details"]
        
        print(f"\n--- Publisher Node: {publisher_graph_node_name} (for {actual_publisher_id}, Message ID: {message_id}) ---")
        
        # Generic message content generation
        content = f"Message from {actual_publisher_id} regarding {details.get('data', 'generic_event')}."
        
        # Generic topic assignment (e.g., publisher P_01 tends to publish on T_01, T_02)
        # This is a simple example; could be more complex
        num_topics_to_publish = random.randint(1, 2)
        published_topics = random.sample(TOPIC_NAMES, k=min(num_topics_to_publish, NUM_TOPICS))
        
        # Ensure the publisher's "primary" topic (if any) is included, e.g., P_01 always includes T_01
        primary_topic_index = int(actual_publisher_id.split('_')[1]) - 1
        if primary_topic_index < len(TOPIC_NAMES):
            primary_topic = TOPIC_NAMES[primary_topic_index]
            if primary_topic not in published_topics:
                if len(published_topics) < num_topics_to_publish:
                    published_topics.append(primary_topic)
                elif published_topics: # Replace one if full
                    published_topics[0] = primary_topic

        print(f"Node {publisher_graph_node_name} publishing: '{content}' with Topics: {published_topics}")
        
        return {
            "graph_publisher_node_name": publisher_graph_node_name,
            "published_message_content": content,
            "published_message_topics": published_topics,
        }
    return publisher_node_logic

# --- 4. Publisher Router Node (maps P_xx to p_xx) ---
def route_to_correct_publisher_node(state: BatchPubSubState) -> str:
    print("\n--- Publisher Router (Selecting graph node for current event) ---")
    source_publisher_id = state["current_event_source_publisher_id"] # This is "P_xx"
    message_id = state["current_event_id"]

    if not source_publisher_id or source_publisher_id not in PUBLISHER_IDS:
        print(f"Warning: Unknown source_publisher_id '{source_publisher_id}' for Message ID {message_id}. Defaulting.")
        # Default to the first publisher's graph node name
        default_node_name = format_id("p", 1, NUM_PUBLISHERS)
        return default_node_name

    # Convert "P_01" to "p_01"
    publisher_graph_node_name = source_publisher_id.lower() 
    print(f"Routing Message ID {message_id} (from {source_publisher_id}) to graph publisher node: {publisher_graph_node_name}")
    return publisher_graph_node_name

# --- 5. Subscriber Logic (Router and Generic Subscriber Node Factory) ---
def subscriber_router_node(state: BatchPubSubState) -> Dict:
    message_id = state["current_event_id"]
    print(f"\n--- Subscriber Router Node (Message ID: {message_id}) ---")
    
    published_topics = state.get("published_message_topics", [])
    if not published_topics:
        print(f"Error for Message ID {message_id}: No topics found.")
        return {"subscribers_matched_for_event": []}

    matched_subscriber_ids = [] # Stores "S_01", "S_02"
    for sub_id, interests in SUBSCRIBER_INTERESTS.items():
        if any(topic in published_topics for topic in interests):
            matched_subscriber_ids.append(sub_id)
            
    print(f"Message ID {message_id} - Topics: {published_topics}, Matched Subscribers: {matched_subscriber_ids}")
    return {"subscribers_matched_for_event": matched_subscriber_ids}

def create_subscriber_node(subscriber_graph_node_name: str, actual_subscriber_id: str):
    """
    - subscriber_graph_node_name: "s_01"
    - actual_subscriber_id: "S_01"
    """
    def subscriber_node_logic(state: BatchPubSubState) -> Dict:
        message_id = state['current_event_id']
        print(f"\n--- Subscriber Node: {subscriber_graph_node_name} (for {actual_subscriber_id}, Message ID: {message_id}) ---")
        
        message_content = state["published_message_content"]
        msg_topics = state["published_message_topics"]
        publisher_node = state.get("graph_publisher_node_name", "Unknown Publisher Node")
        
        log_entry = (f"{actual_subscriber_id} (via node {subscriber_graph_node_name}): Processed '{message_content}' "
                     f"(from {publisher_node}, Topics: {msg_topics})")
        print(log_entry)
        
        # subscriber_logs is for the current event (Message ID)
        # operator.ior merges this dict into the existing logs for this event
        current_event_log_update = { actual_subscriber_id: [log_entry] } 
        
        updated_completed_set = set(state.get("subscribers_completed_for_event", set()))
        updated_completed_set.add(actual_subscriber_id) # Track completion by S_xx ID
        
        return {
            "subscriber_logs": current_event_log_update,
            "subscribers_completed_for_event": updated_completed_set
        }
    return subscriber_node_logic

# --- 6. Conditional Logic for Dispatch (structurally same) ---
def check_if_event_selected_for_processing(state: BatchPubSubState) -> str:
    if state.get("current_event_id") is not None: # current_event_id is set if payload is selected
        print("check_if_event_selected: Event selected. Routing to publisher_router_hub.")
        return "publisher_router_hub"
    else:
        print("check_if_event_selected: No event selected. Batch processing will be finalized.")
        return "batch_finalize"

def route_to_next_subscriber_or_aggregate(state: BatchPubSubState) -> str:
    message_id = state['current_event_id']
    matched_subscriber_ids = state.get("subscribers_matched_for_event", []) # List of S_xx
    completed_subscriber_ids = state.get("subscribers_completed_for_event", set()) # Set of S_xx
    
    if not matched_subscriber_ids:
        print(f"Sub Dispatch (Msg ID {message_id}): No subscribers matched. Routing to aggregate_result.")
        return "aggregate_result"

    for sub_id_to_run in matched_subscriber_ids: # sub_id_to_run is "S_xx"
        if sub_id_to_run not in completed_subscriber_ids:
            subscriber_graph_node_name = sub_id_to_run.lower() # Convert "S_01" to "s_01"
            print(f"Sub Dispatch (Msg ID {message_id}): Routing to subscriber node: {subscriber_graph_node_name} (for {sub_id_to_run})")
            return subscriber_graph_node_name # Return "s_xx"
            
    print(f"Sub Dispatch (Msg ID {message_id}): All matched subscribers processed. Routing to aggregate_result.")
    return "aggregate_result"

def route_after_event_aggregation(state: BatchPubSubState) -> str:
    idx = state["current_event_index"] 
    batch_size = len(state.get("initial_batch_of_events", []))
    if idx < batch_size:
        print(f"Batch Route: More events in batch (next index {idx}). Routing to select_next_event.")
        return "select_next_event"
    else:
        print(f"Batch Route: All {batch_size} events processed. Routing to batch_finalize.")
        return "batch_finalize"

# --- 7. Define the Graph Workflow ---
workflow = StateGraph(BatchPubSubState)

# Add batch control nodes
workflow.add_node("batch_initialize", batch_initializer_node)
workflow.add_node("select_next_event", select_next_event_from_batch_node)
workflow.add_node("aggregate_result", aggregate_event_result_node)
workflow.add_node("batch_finalize", batch_finalization_node)

# Publisher Router Hub node
def publisher_router_hub_node(state: BatchPubSubState) -> Dict:
    print("\n--- Publisher Router Hub (Triggering publisher selection) ---")
    return {}
workflow.add_node("publisher_router_hub", publisher_router_hub_node)

# Dynamically add publisher nodes ("p_01", "p_02", ...)
publisher_graph_node_names = []
for i, pub_id in enumerate(PUBLISHER_IDS): # pub_id is "P_01"
    node_name = pub_id.lower() # "p_01"
    publisher_graph_node_names.append(node_name)
    workflow.add_node(node_name, create_publisher_node(node_name, pub_id))

# Add subscriber router
workflow.add_node("subscriber_router", subscriber_router_node)

# Dynamically add subscriber nodes ("s_01", "s_02", ...)
subscriber_graph_node_names = []
for i, sub_id in enumerate(SUBSCRIBER_IDS): # sub_id is "S_01"
    node_name = sub_id.lower() # "s_01"
    subscriber_graph_node_names.append(node_name)
    workflow.add_node(node_name, create_subscriber_node(node_name, sub_id))

# --- Define Edges and Control Flow ---
workflow.set_entry_point("batch_initialize")
workflow.add_edge("batch_initialize", "select_next_event")

workflow.add_conditional_edges(
    "select_next_event",
    check_if_event_selected_for_processing,
    {"publisher_router_hub": "publisher_router_hub", "batch_finalize": "batch_finalize"}
)

# Conditional routing from hub to specific publisher graph nodes ("p_xx")
publisher_routing_map = {name: name for name in publisher_graph_node_names}
workflow.add_conditional_edges(
    "publisher_router_hub",
    route_to_correct_publisher_node, # Returns "p_xx"
    publisher_routing_map
)

# All publisher graph nodes ("p_xx") route to the common subscriber_router
for pub_node_name in publisher_graph_node_names:
    workflow.add_edge(pub_node_name, "subscriber_router")

# Conditional routing from subscriber_router to specific subscriber graph nodes ("s_xx")
subscriber_routing_map = {name: name for name in subscriber_graph_node_names}
subscriber_routing_map["aggregate_result"] = "aggregate_result" # Add the aggregate case
workflow.add_conditional_edges(
    "subscriber_router",
    route_to_next_subscriber_or_aggregate, # Returns "s_xx" or "aggregate_result"
    subscriber_routing_map
)

# After each subscriber graph node ("s_xx") finishes, loop back to subscriber_router
for sub_node_name in subscriber_graph_node_names:
    workflow.add_edge(sub_node_name, "subscriber_router")

workflow.add_conditional_edges(
    "aggregate_result",
    route_after_event_aggregation,
    {"select_next_event": "select_next_event", "batch_finalize": "batch_finalize"}
)
workflow.add_edge("batch_finalize", END)

# --- 8. Compile the Graph ---
app = workflow.compile()

# --- 9. Run the Graph with a Batch of Events ---
def run_batch_through_pubsub(batch_of_event_payloads: List[Dict[str, Any]]):
    print(f"\n\n<<<<< STARTING NEW BATCH OF {len(batch_of_event_payloads)} EVENTS >>>>>")
    initial_state_for_batch = {"initial_batch_of_events": batch_of_event_payloads}
    final_graph_state = None
    for step_output in app.stream(initial_state_for_batch, {"recursion_limit": 200*(NUM_SUBSCRIBERS+1)}): # Adjust recursion limit based on entities
        node_name = list(step_output.keys())[0]
        final_graph_state = step_output[node_name]
    print("\n--- Final State After Batch Processing ---")
    if final_graph_state:
        print(f"Total events processed and summarized: {len(final_graph_state.get('processed_event_summaries', []))}")
    else: print("No final state captured.")
    print(f"<<<<< FINISHED BATCH PROCESSING >>>>>")



--- System Configuration ---
Publisher IDs: ['P_1', 'P_2', 'P_3']
Subscriber IDs: ['S_1', 'S_2', 'S_3', 'S_4']
Topic Names: ['T_1', 'T_2', 'T_3', 'T_4', 'T_5']
Subscriber Interests:
  S_1: ['T_5', 'T_3', 'T_2']
  S_2: ['T_3', 'T_2', 'T_1']
  S_3: ['T_2', 'T_3', 'T_5']
  S_4: ['T_3', 'T_1']
--------------------------


In [7]:
print("\n\n--- TESTING WITH EMPTY BATCH ---")
run_batch_through_pubsub([])




--- TESTING WITH EMPTY BATCH ---


<<<<< STARTING NEW BATCH OF 0 EVENTS >>>>>

--- Batch Initializer Node ---
Initializing batch processing for 0 events.

--- Selecting Next Event from Batch Node ---
select_next_event: Index 0 out of bounds for batch size 0. No event selected.
check_if_event_selected: No event selected. Batch processing will be finalized.

--- Batch Finalization Node ---
Batch processing complete. Total events processed: 0
--------------------------------------

--- Final State After Batch Processing ---
No final state captured.
<<<<< FINISHED BATCH PROCESSING >>>>>


In [8]:
print("\n\n--- TESTING WITH SINGLE EVENT BATCH ---")
run_batch_through_pubsub([
    {"source_type": PUBLISHER_IDS[0], "details": {"data": "single test event"}}
])



--- TESTING WITH SINGLE EVENT BATCH ---


<<<<< STARTING NEW BATCH OF 1 EVENTS >>>>>

--- Batch Initializer Node ---
Initializing batch processing for 1 events.

--- Selecting Next Event from Batch Node ---
Processing event 1/1: ID M_078112e3-aab8-4115-9ab9-c993a1d07330, Source Publisher P_1, Details: {'data': 'single test event'}
check_if_event_selected: Event selected. Routing to publisher_router_hub.

--- Publisher Router Hub (Triggering publisher selection) ---

--- Publisher Router (Selecting graph node for current event) ---
Routing Message ID M_078112e3-aab8-4115-9ab9-c993a1d07330 (from P_1) to graph publisher node: p_1

--- Publisher Node: p_1 (for P_1, Message ID: M_078112e3-aab8-4115-9ab9-c993a1d07330) ---
Node p_1 publishing: 'Message from P_1 regarding single test event.' with Topics: ['T_1', 'T_4']

--- Subscriber Router Node (Message ID: M_078112e3-aab8-4115-9ab9-c993a1d07330) ---
Message ID M_078112e3-aab8-4115-9ab9-c993a1d07330 - Topics: ['T_1', 'T_4'], Matched Subscr

KeyError: 'subscribers_completed_for_event'

In [5]:
# Example Batch of Events:
# Using the generated PUBLISHER_IDS
example_batch_events = [
    {"source_type": PUBLISHER_IDS[0], "details": {"data": "critical system alert", "severity": "high"}},
    {"source_type": PUBLISHER_IDS[1], "details": {"data": "user login detected", "user": "admin"}},
    {"source_type": PUBLISHER_IDS[0], "details": {"data": "database backup completed"}}, # P_01 again
    {"source_type": PUBLISHER_IDS[2 % NUM_PUBLISHERS], "details": {"data": "new software update available"}},
    {"source_type": PUBLISHER_IDS[1 % NUM_PUBLISHERS], "details": {"data": "minor sensor fluctuation", "value": 0.5}},
]
run_batch_through_pubsub(example_batch_events)





<<<<< STARTING NEW BATCH OF 5 EVENTS >>>>>

--- Batch Initializer Node ---
Initializing batch processing for 5 events.

--- Selecting Next Event from Batch Node ---
Processing event 1/5: ID M_a9a89170-2b21-4dce-945c-e86e6638fa50, Source Publisher P_1, Details: {'data': 'critical system alert', 'severity': 'high'}
check_if_event_selected: Event selected. Routing to publisher_router_hub.

--- Publisher Router Hub (Triggering publisher selection) ---

--- Publisher Router (Selecting graph node for current event) ---
Routing Message ID M_a9a89170-2b21-4dce-945c-e86e6638fa50 (from P_1) to graph publisher node: p_1

--- Publisher Node: p_1 (for P_1, Message ID: M_a9a89170-2b21-4dce-945c-e86e6638fa50) ---
Node p_1 publishing: 'Message from P_1 regarding critical system alert.' with Topics: ['T_1']

--- Subscriber Router Node (Message ID: M_a9a89170-2b21-4dce-945c-e86e6638fa50) ---
Message ID M_a9a89170-2b21-4dce-945c-e86e6638fa50 - Topics: ['T_1'], Matched Subscribers: ['S_1', 'S_2']
Sub Di