## Scatter-Gather Pattern

This notebook demonstrates the **Scatter-Gather** Enterprise Integration Pattern (EIP) using Rustic AI agents. 

The Scatter-Gather pattern sends a message to multiple recipients and aggregates the responses back into a single message. This is useful for:
- Collecting data from multiple sources
- Running parallel analysis tasks  
- Distributed processing with consolidation

### Pattern Flow

**1→3→1 Message Flow:**

```
[ProbeAgent] --AnalysisRequest--> (analysis_requests) 
                    ↓ SCATTER PHASE
    [ScatterAgent] routes to 3 specialized agents:
    ├── StatisticsAgent: Computes mean, median, std dev
    ├── TrendAgent: Analyzes data trends and slopes  
    └── AnomalyAgent: Detects outliers using 2-sigma rule
                    ↓ GATHER PHASE
    [AggregatingAgent] collects all 3 responses by correlation_id
                    ↓ 
    [ReportAgent] generates comprehensive analysis report
```
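To make the 1→3→1 shape concrete outside Rustic AI, here is a minimal in-process sketch that uses only the standard library. The three task functions are hypothetical stand-ins for the real agents, a thread pool stands in for the guild's message bus, and the ReportAgent stage is omitted. It illustrates the pattern only; it is not how Rustic AI dispatches messages.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean, pstdev


# Hypothetical stand-ins for the three analysis agents. Each returns
# (analysis_type, result) so the gather step can key results by type.
def statistics_task(data):
    return "statistics", {"mean": mean(data)}


def trend_task(data):
    # Crude direction: compare the last value with the first.
    return "trends", {"rising": data[-1] > data[0]}


def anomaly_task(data):
    # Values more than two population standard deviations from the mean.
    mu, sd = mean(data), pstdev(data)
    return "anomalies", {"outliers": [x for x in data if abs(x - mu) > 2 * sd]}


def scatter_gather(correlation_id, data):
    tasks = [statistics_task, trend_task, anomaly_task]
    # Scatter: hand the same payload to every worker concurrently.
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        futures = [pool.submit(task, data) for task in tasks]
        # Gather: wait for all three responses and key them by analysis type.
        results = dict(future.result() for future in futures)
    # One consolidated message that still carries the original correlation ID.
    return {"correlation_id": correlation_id, "results": results}


report = scatter_gather("req-1", [100.0, 120.0, 95.0, 110.0, 130.0, 85.0, 140.0, 105.0])
print(report["results"]["statistics"])
```

The key design point carried over to the guild version is that the consolidated result keeps the correlation ID of the request that started the flow.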

### Key Components

- **ScatterAgent**: Routes analysis requests to multiple specialized agents
- **AggregatingAgent**: Collects the responses and correlates them by `correlation_id`
- **Collector Strategies**: Uses `DictCollector` to deduplicate by analysis type
- **Correlation Tracking**: Maintains request-response correlation across the flow

In [None]:
# Import required libraries and modules
import uuid

from rustic_ai.core.agents.eip.aggregating_agent import (
    AggregatedMessages,
    AggregatingAgent,
    AggregatorConf,
    CountingAggregator,
    DictCollector,
)
from rustic_ai.core.agents.eip.basic_wiring_agent import BasicWiringAgent
from rustic_ai.core.agents.testutils.probe_agent import ProbeAgent
from rustic_ai.core.guild.builders import AgentBuilder, GuildBuilder, RouteBuilder
from rustic_ai.core.utils.basic_class_utils import get_qualified_class_name
from rustic_ai.core.utils.jexpr import JExpr, JObj, JxScript

### Message Types

The message models for this scatter-gather pattern are defined in a helper module. There are:
- **Request messages**: For scattering to different analysis agents
- **Result messages**: For gathering responses back
- **Report message**: For the final comprehensive analysis

See [scatter_gather_models.py](./helpers/scatter_gather_models.py) for their definitions.
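The invariant that ties the flow together is that every sub-request and every result carries the originating `request_id` as its `correlation_id`, plus an `analysis_type`. The sketch below mirrors that shape with plain dataclasses. Field names come from how this notebook builds and transforms the messages; the class names `SubAnalysisRequest` and `SubAnalysisResult` and the `details` field are hypothetical, and the real helper classes may be defined differently.

```python
from dataclasses import dataclass
from typing import Dict, List

# Hypothetical mirrors of the helper models; the real classes may differ.


@dataclass
class AnalysisRequest:
    request_id: str
    dataset: List[float]
    query: str


@dataclass
class SubAnalysisRequest:
    # Shape this notebook gives StatisticsRequest, TrendRequest and AnomalyRequest.
    correlation_id: str  # copied from AnalysisRequest.request_id
    data: List[float]  # copied from AnalysisRequest.dataset
    analysis_type: str  # "statistics", "trends" or "anomalies"


@dataclass
class SubAnalysisResult:
    # Assumed: results echo correlation_id and analysis_type, which the
    # aggregator uses for correlation and de-duplication. `details` is invented.
    correlation_id: str
    analysis_type: str
    details: Dict[str, float]


request = AnalysisRequest("r-1", [1.0, 2.0, 3.0], "demo")
sub = SubAnalysisRequest(request.request_id, request.dataset, "statistics")
result = SubAnalysisResult(sub.correlation_id, sub.analysis_type, {"mean": 2.0})
```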

In [None]:
from helpers.scatter_gather_models import (
    AnalysisRequest,
    AnomalyRequest,
    AnomalyResult,
    ComprehensiveAnalysisReport,
    StatisticsRequest,
    StatisticsResult,
    TrendRequest,
    TrendResult,
)

### Analysis and Report Generation Agents

These are the specialized, domain-specific agents that each handle one part of the overall analysis task.

There are 3 Analysis Agents:
- Statistical Analysis
- Trend Analysis
- Anomaly Analysis

And one Reporting Agent.

Look at [scatter_gather_agents.py](./helpers/scatter_gather_agents.py) for details.
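For intuition about what each agent computes, here is a hypothetical pure-Python version of the three analyses: mean, median and standard deviation; a least-squares trend slope; and the 2-sigma outlier rule. These are illustrations, not the helper agents' code, which may use different definitions (for example sample rather than population standard deviation).

```python
import statistics


# Hypothetical re-implementations of the three analyses. The real agents may
# differ, e.g. sample instead of population standard deviation.
def compute_statistics(data):
    return {
        "mean": statistics.mean(data),
        "median": statistics.median(data),
        "std_dev": statistics.pstdev(data),  # population standard deviation
    }


def compute_trend(data):
    # Least-squares slope of the values against their positions 0..n-1.
    n = len(data)
    if n < 2:
        return {"slope": 0.0, "direction": "flat"}
    x_mean = (n - 1) / 2
    y_mean = statistics.mean(data)
    sxy = sum((i - x_mean) * (y - y_mean) for i, y in enumerate(data))
    sxx = sum((i - x_mean) ** 2 for i in range(n))
    slope = sxy / sxx
    direction = "increasing" if slope > 0 else "decreasing" if slope < 0 else "flat"
    return {"slope": slope, "direction": direction}


def detect_anomalies(data, sigmas=2.0):
    # 2-sigma rule: flag values farther than `sigmas` std devs from the mean.
    mu = statistics.mean(data)
    sd = statistics.pstdev(data)
    return {"anomalies": [x for x in data if abs(x - mu) > sigmas * sd]}


dataset = [100.0, 120.0, 95.0, 110.0, 130.0, 85.0, 140.0, 105.0]
print(compute_statistics(dataset))
print(compute_trend(dataset))
print(detect_anomalies(dataset))
```

Under these definitions, the test dataset used later in this notebook has a mean of 110.625, a median of 107.5, a positive slope of 62.5/42 ≈ 1.49, and no anomalies: the largest deviation (140, about 29.4 above the mean) stays below 2σ ≈ 34.4.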

In [None]:
from helpers.scatter_gather_agents import (
    AnomalyAgent,
    ReportAgent,
    StatisticsAgent,
    TrendAgent,
)

### Agent Specifications

Now let's create the agent specifications that will be used in our guild.

In [None]:
# Create analysis agent specifications
from rustic_ai.core.guild.dsl import JSONataPredicate

statistics_agent = (
    AgentBuilder(StatisticsAgent)
    .set_id("StatisticsAgent")
    .set_name("Statistics Analysis Agent")
    .set_description("Analyzes statistical properties of datasets")
    .add_additional_topic("statistics_analysis")
    .listen_to_default_topic(False)
    .build_spec()
)

trend_agent = (
    AgentBuilder(TrendAgent)
    .set_id("TrendAgent")
    .set_name("Trend Analysis Agent")
    .set_description("Analyzes trends in datasets")
    .add_additional_topic("trend_analysis")
    .listen_to_default_topic(False)
    .build_spec()
)

anomaly_agent = (
    AgentBuilder(AnomalyAgent)
    .set_id("AnomalyAgent")
    .set_name("Anomaly Detection Agent")
    .set_description("Detects anomalies in datasets")
    .add_additional_topic("anomaly_analysis")
    .listen_to_default_topic(False)
    .build_spec()
)

# Create scatter agent (for routing to multiple destinations)
scatter_filter = JxScript(JExpr("message").format == get_qualified_class_name(AnalysisRequest)).serialize()

scatter_agent = (
    AgentBuilder(BasicWiringAgent)
    .set_id("ScatterAgent")
    .set_name("Scatter Agent")
    .set_description("Routes requests to multiple analysis agents")
    .add_additional_topic("analysis_requests")
    .add_predicate(
        BasicWiringAgent.wire_message.__name__,
        JSONataPredicate(expression=scatter_filter),
    )
    .build_spec()
)
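The ScatterAgent listens on `analysis_requests`, and its JSONata predicate lets `wire_message` handle only messages whose `format` equals the fully qualified class name of `AnalysisRequest`. The sketch below reproduces that gate in plain Python. It assumes `get_qualified_class_name` returns `module.QualName`; `qualified_name`, `should_wire` and the message dicts are hypothetical.

```python
class AnalysisRequest:  # placeholder standing in for the helper model
    pass


def qualified_name(cls):
    # Assumed equivalent of get_qualified_class_name: "<module>.<qualname>".
    return f"{cls.__module__}.{cls.__qualname__}"


def should_wire(message, expected_format):
    # Mirrors the predicate: message.format == <qualified AnalysisRequest name>.
    return message.get("format") == expected_format


expected = qualified_name(AnalysisRequest)
incoming = [
    {"format": expected, "payload": {"request_id": "r1"}},
    {"format": "some.other.Message", "payload": {}},
]
# Only messages passing the gate would be fanned out by the scatter routes.
wired = [m for m in incoming if should_wire(m, expected)]
```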

In [None]:
# Create aggregating agent for the gather phase
from rustic_ai.core.agents.eip.aggregating_agent import CorrelationLocation

aggregator_agent = (
    AgentBuilder(AggregatingAgent)
    .set_id("ResultAggregator")
    .set_name("Result Aggregator")
    .set_description("Aggregates results from multiple analysis agents")
    .set_properties(
        AggregatorConf(
            correlation_location=CorrelationLocation.PAYLOAD,
            correlation_id_path="correlation_id",  # Read the correlation ID from the message payload
            collector=DictCollector(key_field="analysis_type"),  # Group by analysis type to avoid duplicates
            aggregator=CountingAggregator(count=3),  # Wait for exactly 3 results (statistics, trends, anomalies)
        )
    )
    .add_additional_topic("analysis_results")  # Subscribe to analysis results from all agents
    .listen_to_default_topic(False)  # Do not listen to default topic, only to specific analysis results
    .build_spec()
)

# Create report agent for final processing
report_agent = (
    AgentBuilder(ReportAgent)
    .set_id("ReportAgent")
    .set_name("Report Generator Agent")
    .set_description("Generates comprehensive reports from aggregated analysis results")
    .add_additional_topic("aggregated_results")  # Subscribe to aggregated results from AggregatingAgent
    .listen_to_default_topic(False)  # Do not listen to default topic, only to aggregated results
    .build_spec()
)
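The aggregator configuration above can be read as: bucket incoming results by correlation ID, keep one entry per `analysis_type`, and release the bucket once three entries are present. Below is a minimal pure-Python sketch of that behaviour. The class `CountingDictAggregator` is hypothetical, and it assumes `CountingAggregator` counts collected (de-duplicated) entries rather than raw arrivals, which this notebook does not state.

```python
from collections import defaultdict


class CountingDictAggregator:
    """Hypothetical stand-in for AggregatingAgent + DictCollector + CountingAggregator."""

    def __init__(self, key_field, count):
        self.key_field = key_field
        self.count = count
        # correlation_id -> {key_field value -> latest payload}
        self.pending = defaultdict(dict)

    def on_message(self, payload):
        cid = payload["correlation_id"]
        bucket = self.pending[cid]
        # Dict collection: a repeat with the same key replaces the earlier entry.
        bucket[payload[self.key_field]] = payload
        if len(bucket) >= self.count:
            # Complete: release the batch and forget this correlation ID.
            return {"correlation_id": cid, "messages": list(self.pending.pop(cid).values())}
        return None  # still waiting for more distinct results


agg = CountingDictAggregator(key_field="analysis_type", count=3)
```

Interleaved requests stay separate because each has its own bucket, and a duplicate result (for example a retried statistics response) does not count twice towards completion.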

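### Routing Rules - Scatter and Gather

The first three routing rules scatter the initial `AnalysisRequest` to the three analysis agents, transforming it into the request type each agent expects. The remaining rules route each agent's result to `analysis_results` for the aggregator, and route the aggregated batch to `aggregated_results` for the report agent.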
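Each of the three scatter transformers does the same field mapping: `request_id` becomes `correlation_id`, `dataset` becomes `data`, and a constant `analysis_type` is stamped per route. A plain-Python equivalent, as a sketch over dicts rather than the real message classes (`SCATTER_ROUTES` and `scatter` are hypothetical names), looks like this:

```python
# Destination topic -> analysis_type stamped by each scatter route.
SCATTER_ROUTES = {
    "statistics_analysis": "statistics",
    "trend_analysis": "trends",
    "anomaly_analysis": "anomalies",
}


def scatter(request):
    """Return (destination_topic, payload) pairs for one AnalysisRequest-like dict."""
    return [
        (
            topic,
            {
                "correlation_id": request["request_id"],  # JExpr("request_id")
                "data": request["dataset"],  # JExpr("dataset")
                "analysis_type": analysis_type,  # constant per route
            },
        )
        for topic, analysis_type in SCATTER_ROUTES.items()
    ]


out = scatter({"request_id": "r-42", "dataset": [1.0, 2.0], "query": "demo"})
```

Note that the `query` field of the original request is not forwarded by these transformers.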

In [None]:
# Statistics routing rule - transforms AnalysisRequest to StatisticsRequest
statistics_route = (
    RouteBuilder(scatter_agent)
    .filter_on_origin(origin_message_format=get_qualified_class_name(AnalysisRequest))
    .set_payload_transformer(
        StatisticsRequest,
        JxScript(
            JObj(
                {
                    "correlation_id": JExpr("request_id"),
                    "data": JExpr("dataset"),
                    "analysis_type": "statistics",
                }
            )
        ),
    )
    .set_destination_topics("statistics_analysis")
    .build()
)

# Trend routing rule - transforms AnalysisRequest to TrendRequest
trend_route = (
    RouteBuilder(scatter_agent)
    .filter_on_origin(origin_message_format=get_qualified_class_name(AnalysisRequest))
    .set_payload_transformer(
        TrendRequest,
        JxScript(
            JObj(
                {
                    "correlation_id": JExpr("request_id"),
                    "data": JExpr("dataset"),
                    "analysis_type": "trends",
                }
            )
        ),
    )
    .set_destination_topics("trend_analysis")
    .build()
)

# Anomaly routing rule - transforms AnalysisRequest to AnomalyRequest
anomaly_route = (
    RouteBuilder(scatter_agent)
    .filter_on_origin(origin_message_format=get_qualified_class_name(AnalysisRequest))
    .set_payload_transformer(
        AnomalyRequest,
        JxScript(
            JObj(
                {
                    "correlation_id": JExpr("request_id"),
                    "data": JExpr("dataset"),
                    "analysis_type": "anomalies",
                }
            )
        ),
    )
    .set_destination_topics("anomaly_analysis")
    .build()
)


# Define routes for the responses from the analysis agents
statistics_response_route = (
    RouteBuilder(statistics_agent)
    .on_message_format(StatisticsResult)
    .set_destination_topics("analysis_results")
    .build()
)

trend_response_route = (
    RouteBuilder(trend_agent)
    .on_message_format(TrendResult)
    .set_destination_topics("analysis_results")
    .build()
)

anomaly_response_route = (
    RouteBuilder(anomaly_agent)
    .on_message_format(AnomalyResult)
    .set_destination_topics("analysis_results")
    .build()
)

# Define route for the aggregated results
aggregated_results_route = (
    RouteBuilder(aggregator_agent)
    .on_message_format(AggregatedMessages)
    .set_destination_topics("aggregated_results")
    .build()
)

### Guild Creation

Now let's create the guild that orchestrates our scatter-gather pattern with all agents and routing rules.
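Because this flow is wired purely through topics, a quick sanity check before bootstrapping is that every route destination has at least one subscriber. The sketch below hand-copies the subscriptions and destinations from the specs above into plain Python structures; it does not read them from the Rustic AI specs, it ignores the ScatterAgent's default-topic subscription, and `orphan_topics` is a hypothetical helper.

```python
# Subscriptions and route destinations copied by hand from the specs above.
SUBSCRIPTIONS = {
    "ScatterAgent": {"analysis_requests"},
    "StatisticsAgent": {"statistics_analysis"},
    "TrendAgent": {"trend_analysis"},
    "AnomalyAgent": {"anomaly_analysis"},
    "ResultAggregator": {"analysis_results"},
    "ReportAgent": {"aggregated_results"},
}
ROUTE_DESTINATIONS = [
    "statistics_analysis", "trend_analysis", "anomaly_analysis",  # scatter
    "analysis_results", "analysis_results", "analysis_results",  # responses
    "aggregated_results",  # aggregated batch
]


def orphan_topics(subscriptions, destinations):
    # Topics that some route publishes to but no agent listens on.
    subscribed = set().union(*subscriptions.values())
    return sorted(set(destinations) - subscribed)
```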

In [None]:
# Build the scatter-gather guild with bootstrap for state management
import os

from rustic_ai.core.guild.metastore.database import Metastore

guild_builder = (
    GuildBuilder(
        guild_id="ScatterGatherGuild",
        guild_name="Scatter-Gather Pattern Demo",
        guild_description="Complete scatter-gather EIP demonstration with aggregation",
    )
    .add_agent_spec(scatter_agent)  # Scatter phase agent
    .add_agent_spec(statistics_agent)  # Analysis agents
    .add_agent_spec(trend_agent)
    .add_agent_spec(anomaly_agent)
    .add_agent_spec(aggregator_agent)  # Gather phase agent
    .add_agent_spec(report_agent)  # Final report generation
    .add_route(statistics_route)  # Scatter routing rules
    .add_route(trend_route)
    .add_route(anomaly_route)
    .add_route(statistics_response_route)  # Response routing rules
    .add_route(trend_response_route)
    .add_route(anomaly_response_route)
    .add_route(aggregated_results_route)  # Route for aggregated results
    # Optional: run the agents on a multithreaded execution engine instead
    # (requires importing MultiThreadedEngine):
    # .set_execution_engine(
    #     execution_engine_clz=MultiThreadedEngine.get_qualified_class_name()
    # )
)

# Bootstrap the guild so a GuildManager handles state management.
# A local SQLite file keeps the demo simple; delete any copy left over from a previous run.
db_file = "scatter_gather_demo.db"
db = f"sqlite:///{db_file}"

if os.path.exists(db_file):
    os.remove(db_file)

Metastore.initialize_engine(db)
Metastore.get_engine(db)
Metastore.create_db()
guild = guild_builder.bootstrap(metastore_database_url=db, organization_id="myorg")

### Probe Agent Setup

Let's create a probe agent to monitor the entire scatter-gather flow and test the pattern.

In [None]:
import time

# Give the bootstrapped agents a moment to start before attaching the probe
time.sleep(2)

# Create probe agent to monitor the entire flow
probe_agent = (
    AgentBuilder(ProbeAgent)
    .set_id("TestProbe")
    .set_name("Test Probe Agent")
    .set_description("Monitors the entire scatter-gather flow")
    .add_additional_topic("analysis_requests")  # Initial requests
    .add_additional_topic("statistics_analysis")  # Scattered requests
    .add_additional_topic("trend_analysis")
    .add_additional_topic("anomaly_analysis")
    .add_additional_topic("analysis_results")  # Individual analysis results
    .add_additional_topic("aggregated_results")  # Aggregated results
    .add_additional_topic("final_reports")  # Final comprehensive reports
    .build()
)

# Add probe agent to the bootstrapped guild
guild._add_local_agent(probe_agent)

In [None]:
from rustic_ai.core.agents.testutils.probe_agent import EssentialProbeAgent
from rustic_ai.core.guild.dsl import GuildTopics


system_probe = (
    AgentBuilder(EssentialProbeAgent)
    .set_id("SystemProbe")
    .set_name("System Probe Agent")
    .set_description("Monitors the system topic")
    .add_additional_topic(GuildTopics.SYSTEM_TOPIC)  # Subscribe to system messages
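    .build()
)

# Add the system probe to the guild so it actually receives system messages
guild._add_local_agent(system_probe)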

### Testing the Scatter-Gather Pattern

Now let's test our complete scatter-gather implementation with sample data.

In [None]:
# Create test data for analysis
test_request = AnalysisRequest(
    request_id=str(uuid.uuid4()), 
    dataset=[100.0, 120.0, 95.0, 110.0, 130.0, 85.0, 140.0, 105.0], 
    query="Comprehensive data analysis"
)

print(f"Sending AnalysisRequest with ID: {test_request.request_id}")
print(f"Dataset: {test_request.dataset}")

# Send through probe agent to trigger the scatter-gather flow
probe_agent.publish_with_guild_route(topic="analysis_requests", payload=test_request)

In [None]:
# Let's check the message history to see the complete flow
probe_agent.print_all_history()

In [None]:
probe_agent.get_messages()
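For a single request, the wiring implies a fixed number of messages per topic: one request, one scattered request per analysis topic, three results, and one aggregated batch. The sketch below checks a list of observed topic names against those counts. How you extract one topic string per captured message from the probe's `Message` objects is an assumption left to you, `check_flow` is a hypothetical helper, and with an asynchronous engine some messages may not have arrived yet when you check.

```python
from collections import Counter

# Expected message counts per topic for exactly one AnalysisRequest.
EXPECTED_COUNTS = {
    "analysis_requests": 1,
    "statistics_analysis": 1,
    "trend_analysis": 1,
    "anomaly_analysis": 1,
    "analysis_results": 3,
    "aggregated_results": 1,
}


def check_flow(observed_topics):
    """Return {topic: (observed, expected)} for every topic whose count is off."""
    counts = Counter(observed_topics)
    return {
        topic: (counts.get(topic, 0), expected)
        for topic, expected in EXPECTED_COUNTS.items()
        if counts.get(topic, 0) != expected
    }


complete_run = [
    "analysis_requests", "statistics_analysis", "trend_analysis", "anomaly_analysis",
    "analysis_results", "analysis_results", "analysis_results", "aggregated_results",
]
print(check_flow(complete_run))
```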

In [None]:
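# Read the guild's state-topic messages straight from the messaging backend.
# This reaches into private attributes and is meant for debugging only.
probe_agent._client._messaging.get_messages_for_topic_since(
    topic=GuildTopics.STATE_TOPIC,
    msg_id_since=0,
)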

In [None]:
guild

### Pattern Summary

This notebook demonstrated the **Scatter-Gather** Enterprise Integration Pattern using Rustic AI:

#### Key Features Implemented:
1. **Scatter Phase**: Single `AnalysisRequest` routed to 3 specialized analysis agents
2. **Parallel Processing**: Statistics, trend, and anomaly analyses are independent of one another, so they can run concurrently (depending on the execution engine)
3. **Gather Phase**: `AggregatingAgent` correlates and collects all responses
4. **Final Processing**: `ReportAgent` creates comprehensive analysis report

#### EIP Components Used:
- **BasicWiringAgent**: For message routing and scattering
- **AggregatingAgent**: For correlation-based message aggregation
- **DictCollector**: For deduplication by analysis type
- **CountingAggregator**: To wait for exactly 3 responses
- **Payload Transformers**: JxScript transformations for message adaptation

#### Message Flow:
```
1 Request → 3 Parallel Analyses → 1 Aggregated Result → 1 Final Report
```

This pattern suits distributed processing scenarios where you need to:
- Collect data from multiple sources
- Run parallel computations
- Aggregate results maintaining correlation
- Generate consolidated reports

The implementation showcases reusable EIP components that can be adapted for various scatter-gather scenarios.

In [None]:
from rustic_ai.core.agents.system.models import StopGuildRequest


# Ask the guild to stop via the system topic, then shut it down locally
probe_agent.publish_with_guild_route(
    topic=GuildTopics.SYSTEM_TOPIC, payload=StopGuildRequest(guild_id=guild.id)
)

guild.shutdown()