[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Hawksight-AI/semantica/blob/main/cookbook/use_cases/intelligence/02_Intelligence_Analysis_Orchestrator_Worker.ipynb)

# Intelligence Analysis Orchestrator-Worker - Parallel Processing

## Overview

This notebook demonstrates **intelligence analysis with the orchestrator-worker pattern**, focusing on **parallel processing**, **pipeline orchestration**, and **multi-source integration**. An orchestrator coordinates workers that process OSINT feeds, threat intelligence, and geospatial data in parallel.

### Key Features

- **Parallel Processing**: Coordinates parallel processing of multiple intelligence sources
- **Pipeline Orchestration**: Uses pipeline module for workflow management
- **Multi-Source Integration**: Integrates OSINT feeds, threat intelligence, and geospatial data
- **Orchestrator-Worker Pattern**: Demonstrates distributed processing architecture
- **Hybrid RAG**: Combines multiple intelligence sources for comprehensive analysis
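
To make the orchestrator-worker idea concrete, here is a minimal standard-library sketch. It does not use Semantica's `PipelineOrchestrator`. The `run_workers` helper and the three stand-in workers are hypothetical names chosen for illustration. The orchestrator submits every worker to a thread pool, collects results as they finish, and records failures per worker so that one bad source does not stop the others.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_workers(workers, max_workers=4):
    """Run each named worker callable in a thread pool and gather outcomes.

    `workers` maps a worker name to a zero-argument callable. Returns two
    dicts keyed by worker name: successful results and error messages.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # The orchestrator submits every task up front, then collects
        # outcomes in completion order.
        futures = {pool.submit(fn): name for name, fn in workers.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                # A failing worker is recorded, not re-raised.
                errors[name] = str(exc)
    return results, errors


def failing_worker():
    """Hypothetical worker that simulates an unreachable source."""
    raise RuntimeError("feed unavailable")


# Hypothetical workers standing in for the three source types.
workers = {
    "osint": lambda: ["OSINT: Entity A linked to Location X."],
    "threat": lambda: ["Threat Intel: Group Y operates in Region Z."],
    "geo": failing_worker,
}

results, errors = run_workers(workers)
print("succeeded:", sorted(results))
print("failed:", errors)
```

Threads suit this workload because feed ingestion is mostly network-bound. CPU-heavy workers would likely need a process pool instead.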

### Pipeline Architecture

1. **Phase 0**: Setup & Configuration
2. **Phase 1**: Multi-Source Intelligence Ingestion (OSINT, Threat Intel, Geospatial)
3. **Phase 2**: Parallel Processing Setup (Orchestrator-Worker)
4. **Phase 3**: Entity Extraction (Source, Entity, Event, Location, Timeframe)
5. **Phase 4**: Pipeline Orchestration
6. **Phase 5**: Multi-Source Correlation
7. **Phase 6**: Hybrid RAG Query System
8. **Phase 7**: Visualization & Export

---

## Installation


In [None]:
%pip install -qU semantica networkx matplotlib plotly pandas groq


---

## Phase 0: Setup & Configuration


In [None]:
import os
from semantica.core import Semantica, ConfigManager
from semantica.pipeline import PipelineOrchestrator

os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "your-key")

config_dict = {
    "project_name": "Intelligence_Analysis_Orchestrator_Worker",
    "extraction": {"provider": "groq", "model": "llama-3.1-8b-instant"},
    "knowledge_graph": {"backend": "networkx"}
}

config = ConfigManager().load_from_dict(config_dict)
core = Semantica(config=config)
orchestrator = PipelineOrchestrator()
print("Configured for intelligence analysis with orchestrator-worker pattern focus")


---

## Phase 1: Real Data Ingestion (Multiple RSS Feeds with Pipeline Orchestrator)

Ingest intelligence data from multiple RSS feeds using PipelineOrchestrator.
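
As a rough picture of what RSS ingestion produces, the sketch below parses an inline sample feed with the standard library and returns one record per `<item>`. It is not how `FeedIngestor` works internally. The `parse_rss` helper, the sample feed, and the record fields (`source`, `title`, `link`, `content`) are assumptions made for illustration.

```python
import xml.etree.ElementTree as ET

# Inline sample feed so the example runs without network access.
SAMPLE_RSS = """<rss version="2.0">
  <channel>
    <title>Example Alerts</title>
    <item>
      <title>Alert one</title>
      <link>https://example.org/alerts/1</link>
      <description>First sample alert.</description>
    </item>
    <item>
      <title>Alert two</title>
      <link>https://example.org/alerts/2</link>
      <description>Second sample alert.</description>
    </item>
  </channel>
</rss>
"""


def parse_rss(xml_text, source_name):
    """Turn RSS 2.0 XML into a list of simple document records."""
    root = ET.fromstring(xml_text)
    documents = []
    for item in root.iter("item"):
        documents.append(
            {
                "source": source_name,  # which feed the item came from
                "title": item.findtext("title", default="").strip(),
                "link": item.findtext("link", default="").strip(),
                "content": item.findtext("description", default="").strip(),
            }
        )
    return documents


docs = parse_rss(SAMPLE_RSS, source_name="osint")
for doc in docs:
    print(doc["source"], "|", doc["title"], "|", doc["link"])
```

Tagging each record with its feed name keeps provenance available for the later conflict-detection and correlation steps.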


In [None]:
from semantica.ingest import FeedIngestor, FileIngestor
import os

os.makedirs("data", exist_ok=True)

# Define multiple RSS feeds for parallel ingestion
intelligence_feeds = {
    "osint": "https://www.us-cert.gov/ncas/alerts.xml",  # OSINT feed
    "threat": "https://www.us-cert.gov/ncas/alerts.xml",  # Threat intel feed (placeholder: same URL as "osint"; replace with a distinct feed)
    # Add more feeds as needed
}

# Use PipelineOrchestrator for parallel ingestion
documents = []
for feed_name, feed_url in intelligence_feeds.items():
    try:
        feed_ingestor = FeedIngestor()
        # Add worker to orchestrator
        orchestrator.add_worker(f"{feed_name}_worker", feed_ingestor, source=feed_url)
    except Exception as e:
        print(f"Failed to add worker for {feed_name}: {e}")

# Execute parallel ingestion
try:
    results = orchestrator.execute_parallel()
    for result in results:
        documents.extend(result.get("documents", []))
    print(f"Ingested {len(documents)} documents from parallel feeds")
except Exception as e:
    print(f"Parallel ingestion failed: {e}")
    # Fallback: Sequential ingestion
    for feed_name, feed_url in intelligence_feeds.items():
        try:
            feed_ingestor = FeedIngestor()
            feed_documents = feed_ingestor.ingest(feed_url, method="rss")
            documents.extend(feed_documents)
            print(f"Ingested {len(feed_documents)} documents from {feed_name}")
        except Exception as e2:
            print(f"Feed ingestion failed for {feed_name}: {e2}")

# Fallback: Sample data
if not documents:
    osint_data = "OSINT: Public records show connection between Entity A and Location X."
    threat_data = "Threat Intel: Threat actor group Y operates in Region Z."
    geo_data = "Geospatial: Activity detected at coordinates 40.7128, -74.0060."
    with open("data/intelligence.txt", "w", encoding="utf-8") as f:
        f.write(f"{osint_data}\n{threat_data}\n{geo_data}")
    documents = FileIngestor().ingest("data/intelligence.txt")
    print(f"Ingested {len(documents)} documents from sample data")


---

## Phase 2: Text Normalization & Conflict Detection

Normalize the multi-source intelligence data, build the knowledge graph, and detect conflicts between sources.
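
The normalization step can be pictured with a small standard-library sketch: strip HTML tags, decode HTML entities, and collapse whitespace. This only approximates the `clean_html` and `remove_extra_whitespace` options used in the cell that follows. The `normalize_text` helper is hypothetical and is not `TextNormalizer`'s implementation.

```python
import html
import re

TAG_RE = re.compile(r"<[^>]+>")      # naive tag matcher, fine for simple feed markup
WHITESPACE_RE = re.compile(r"\s+")   # any run of whitespace, including newlines


def normalize_text(raw):
    """Strip tags, decode entities, and collapse whitespace in one string."""
    text = TAG_RE.sub(" ", raw)          # remove tags first so escaped "<" survives
    text = html.unescape(text)           # decode entities such as &amp;
    text = WHITESPACE_RE.sub(" ", text)  # collapse runs of whitespace
    return text.strip()


raw = "<p>Threat  actor <b>Group Y</b>\n operates in Region Z &amp; Region W.</p>"
print(normalize_text(raw))
```

A regex-based tag stripper is enough for short feed summaries. Full HTML pages would be better served by a real parser.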


In [None]:
from semantica.normalize import TextNormalizer
from semantica.conflicts import ConflictDetector

# Normalize multi-source intelligence data
normalizer = TextNormalizer()
normalized_documents = []
for doc in documents:
    normalized_text = normalizer.normalize(
        doc.content if hasattr(doc, 'content') else str(doc),
        clean_html=True,
        normalize_entities=True,
        remove_extra_whitespace=True
    )
    normalized_documents.append(normalized_text)

print(f"Normalized {len(normalized_documents)} documents")

# Build intelligence knowledge graph
result = core.build_knowledge_base(
    sources=normalized_documents,
    custom_entity_types=["Source", "Entity", "Event", "Location", "Timeframe"],
    graph=True
)

kg = result["knowledge_graph"]
entities = result["entities"]

# Detect conflicts from multiple intelligence sources
detector = ConflictDetector()
conflicts = detector.detect_conflicts(entities, kg.get("relationships", []))

print(f"Built intelligence KG with {len(kg.get('entities', []))} entities")
print(f"Detected {len(conflicts)} conflicts from multiple sources")
if conflicts:
    resolved = detector.resolve_conflicts(conflicts, strategy="highest_confidence")
    print(f"Resolved {len(resolved)} conflicts")
print("Focus: Parallel processing, pipeline orchestration, multi-source integration")
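
To illustrate what conflict detection and a highest-confidence resolution strategy could look like, here is a self-contained sketch. It does not reproduce `ConflictDetector`. The claim structure (`entity`, `attribute`, `value`, `source`, `confidence`) and both helper functions are assumptions made for this example.

```python
from collections import defaultdict


def detect_conflicts(claims):
    """Return groups of claims that disagree on the same entity attribute."""
    grouped = defaultdict(list)
    for claim in claims:
        grouped[(claim["entity"], claim["attribute"])].append(claim)
    # A conflict exists when one (entity, attribute) pair has more than one value.
    return {
        key: group
        for key, group in grouped.items()
        if len({c["value"] for c in group}) > 1
    }


def resolve_highest_confidence(conflicts):
    """Keep, for each conflict, the claim with the highest confidence."""
    return {
        key: max(group, key=lambda c: c["confidence"])
        for key, group in conflicts.items()
    }


# Hypothetical claims extracted from different intelligence sources.
claims = [
    {"entity": "Group Y", "attribute": "region", "value": "Region Z",
     "source": "threat", "confidence": 0.9},
    {"entity": "Group Y", "attribute": "region", "value": "Region Q",
     "source": "osint", "confidence": 0.6},
    {"entity": "Entity A", "attribute": "location", "value": "Location X",
     "source": "osint", "confidence": 0.8},
    {"entity": "Entity A", "attribute": "location", "value": "Location X",
     "source": "geo", "confidence": 0.7},
]

conflicts = detect_conflicts(claims)
resolved = resolve_highest_confidence(conflicts)
for key, winner in resolved.items():
    print(key, "->", winner["value"], f"(from {winner['source']})")
```

Two sources that agree, as with `Entity A` here, do not count as a conflict. Only differing values for the same attribute do.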


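---

## Phase 5: Multi-Source Correlation

Correlate sources with the entities and locations they report.


In [None]: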
# Correlate multi-source intelligence
from semantica.reasoning import GraphReasoner

reasoner = GraphReasoner(kg)
correlations = reasoner.find_correlations(
    source_types=["Source"],
    target_types=["Entity", "Location"]
)

print(f"Multi-source correlation: {len(correlations)} correlations found")
print("\n=== Pipeline Summary ===")
print(f"✓ Ingested {len(documents)} documents from multiple RSS feeds using PipelineOrchestrator")
print(f"✓ Normalized {len(normalized_documents)} documents")
print(f"✓ Detected and resolved {len(conflicts)} conflicts")
print("✓ This cookbook emphasizes parallel processing, PipelineOrchestrator, and multi-source integration")
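
One simple way to think about multi-source correlation is to find the entities or locations that more than one source reports. The sketch below shows that idea with plain Python. It is not the algorithm behind `GraphReasoner.find_correlations`. The `correlate_across_sources` helper and the sample mentions are hypothetical.

```python
from collections import defaultdict


def correlate_across_sources(mentions, min_sources=2):
    """Map each entity to the sources mentioning it, keeping shared ones only.

    `mentions` is an iterable of (source, entity) pairs.
    """
    sources_by_entity = defaultdict(set)
    for source, entity in mentions:
        sources_by_entity[entity].add(source)
    return {
        entity: sorted(sources)
        for entity, sources in sources_by_entity.items()
        if len(sources) >= min_sources
    }


# Hypothetical (source, entity) mentions drawn from the sample data.
mentions = [
    ("osint", "Entity A"),
    ("osint", "Location X"),
    ("geo", "Location X"),
    ("threat", "Group Y"),
    ("threat", "Location X"),
]

correlated = correlate_across_sources(mentions)
print(correlated)
```

Raising `min_sources` tightens the filter, which can help when many feeds repeat the same low-value items.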


---

## Phase 7: Visualization


In [None]:
from semantica.visualization import KGVisualizer

visualizer = KGVisualizer()
visualizer.visualize(kg, output_path="intelligence_analysis.html")

print("Intelligence analysis (orchestrator-worker) complete")
print("Emphasizes: Parallel processing, pipeline orchestration, multi-source integration")
