[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Hawksight-AI/semantica/blob/main/cookbook/advanced/07_Pipeline_Orchestration.ipynb)

# Pipeline Orchestration

## Overview

Build complex pipelines, execute them, handle failures, enable parallel processing, and monitor execution.


**Documentation**: [API Reference](https://semantica.readthedocs.io/reference/pipeline/)

## Installation

Install Semantica from PyPI:

```bash
pip install semantica
# Or with all optional dependencies:
pip install "semantica[all]"
```

## Workflow: Build Pipelines → Execute → Handle Failures → Parallel Processing → Monitor


In [None]:
from semantica.pipeline import (
    PipelineBuilder,
    ExecutionEngine,
    FailureHandler,
    ParallelismManager,
    RetryPolicy,
    RetryStrategy
)
from semantica.ingest import FileIngestor
from semantica.parse import DocumentParser
from semantica.semantic_extract import NERExtractor
from semantica.kg import GraphBuilder
import time


## Step 1: Build Complex Pipelines


In [None]:
# Pipeline assembly starts from an empty builder.
builder = PipelineBuilder()

# One shared instance of each component; the step handlers defined later in
# this cell refer to these module-level names.
file_ingestor, document_parser = FileIngestor(), DocumentParser()
ner_extractor, graph_builder = NERExtractor(), GraphBuilder()

# Define handlers for each pipeline step
def ingest_handler(data, **config):
    """Ingest the first path listed under ``data["files"]``, if any.

    Only the first entry is ingested (as an example). The ingested object is
    stored under the ``"file"`` key of a new dict. When ``"files"`` is missing
    or empty, the input dict itself is returned unchanged.

    ``config`` is accepted but not used here.
    """
    paths = data.get("files", [])
    if not paths:
        # Nothing to ingest: hand the payload through untouched.
        return data

    ingested = file_ingestor.ingest_file(paths[0], read_content=True)
    enriched = dict(data)
    enriched["file"] = ingested
    return enriched

def parse_handler(data, **config):
    """Fill ``data["text"]`` from the ingested file when one is available.

    If ``data["file"]`` is truthy and exposes a truthy ``path`` attribute, the
    document at that path is parsed. When the parser returns a dict, its
    ``"text"`` entry is used, falling back to the text already in ``data``
    if that entry is missing or empty. Otherwise the input dict itself is
    returned unchanged, so inline text passes straight through.

    ``config`` is accepted but not used here.
    """
    ingested = data.get("file")
    source_path = getattr(ingested, "path", None) if ingested else None
    if not source_path:
        return data

    parsed = document_parser.parse_document(source_path)
    parsed_text = None
    if isinstance(parsed, dict):
        parsed_text = parsed.get("text")

    updated = dict(data)
    updated["text"] = parsed_text or data.get("text")
    return updated

def extract_handler(data, **config):
    """Run entity extraction over ``data["text"]`` and attach plain-dict entities.

    Each extracted entity is normalised to ``{"id", "name", "type"}`` (built
    from its position, ``text`` and ``label`` attributes) so the graph-building
    step receives simple dicts. The result is a new dict containing everything
    in ``data`` plus an ``"entities"`` list.

    When there is no text to analyse (key missing, ``None`` or empty string),
    the extractor is not called and ``"entities"`` is an empty list.

    ``config`` is accepted but not used here.
    """
    # ``data.get("text", "")`` still yields None when the key is present but
    # holds None (parse_handler can store that), so normalise explicitly.
    text = data.get("text") or ""
    if not text:
        return {**data, "entities": []}

    entities = ner_extractor.extract_entities(text)
    # Normalize to dict list for graph builder
    entity_dicts = [
        {"id": f"e{i}", "name": e.text, "type": e.label}
        for i, e in enumerate(entities)
    ]
    return {**data, "entities": entity_dicts}

def build_graph_handler(data, **config):
    """Build a graph from ``data["entities"]`` and attach it under ``"graph"``.

    The entity list (empty when the key is absent) is wrapped in an
    ``{"entities": ...}`` payload and passed to ``graph_builder.build``.
    Returns a new dict with everything in ``data`` plus the ``"graph"`` entry.

    ``config`` is accepted but not used here.
    """
    payload = {"entities": data.get("entities", [])}
    built = graph_builder.build(payload)

    out = dict(data)
    out["graph"] = built
    return out

# Linear four-stage pipeline. Each entry is (step name, add_step keyword
# arguments); the step type passed to add_step is the same string as the name,
# and every stage after the first depends on the stage before it.
step_specs = [
    ("ingest", {"handler": ingest_handler}),
    ("parse", {"dependencies": ["ingest"], "handler": parse_handler}),
    ("extract", {"dependencies": ["parse"], "handler": extract_handler}),
    ("build_graph", {"dependencies": ["extract"], "handler": build_graph_handler}),
]

# Thread the return value of each add_step call into the next, exactly as a
# fluent chain would, then finalise.
chained = builder
for step_name, step_options in step_specs:
    chained = chained.add_step(step_name, step_name, **step_options)
pipeline = chained.build()


## Step 2: Execute Pipeline


In [None]:
# A single engine instance; later cells reuse it for re-execution and progress.
engine = ExecutionEngine()

# Inline text only: the "files" list is empty, so no file is ingested.
input_data = dict(
    text="Alice works at Tech Corp. Bob is a friend of Alice.",
    files=[],
)

start_time = time.time()
result = engine.execute_pipeline(pipeline, input_data)

# Prefer the duration reported in the result metrics; fall back to the
# wall-clock time measured around the call.
wall_clock_elapsed = time.time() - start_time
execution_time = result.metrics.get("execution_time", wall_clock_elapsed)


## Step 3: Handle Failures


In [None]:
# Retry policy for the 'extract' step type: at most 3 retries, exponential
# strategy, backoff factor 2.0.
extract_retry_policy = RetryPolicy(
    strategy=RetryStrategy.EXPONENTIAL,
    backoff_factor=2.0,
    max_retries=3,
)
engine.failure_handler.set_retry_policy("extract", extract_retry_policy)

# Run the pipeline again now that the policy is registered on the engine.
result = engine.execute_pipeline(pipeline, input_data)
print("Pipeline executed with retry policy configured")


## Step 4: Parallel Processing


In [None]:
# Worker count shared by the manager and by every parallel call below, so the
# two cannot drift apart.
MAX_PARALLEL_WORKERS = 4
parallelism = ParallelismManager(max_workers=MAX_PARALLEL_WORKERS)

# Identify groups of steps that can run in parallel
groups = parallelism.identify_parallelizable_steps(pipeline)

# Execute every parallelizable group in turn, collecting the results of all
# groups into one flat list, and time the whole pass.
start_time = time.time()
parallel_results = []
for group in groups:
    group_results = parallelism.execute_pipeline_steps_parallel(
        group, input_data, max_workers=MAX_PARALLEL_WORKERS
    )
    parallel_results.extend(group_results)
parallel_time = time.time() - start_time


## Step 5: Monitor Pipeline Execution


In [None]:
# Metrics of the most recent run, plus the engine's progress record for this
# pipeline (looked up by pipeline name).
metrics = result.metrics
progress = engine.get_progress(pipeline.name)

# Each metric falls back to 0 when the key is absent.
duration_seconds = metrics.get("execution_time", 0)
print(f"Duration: {duration_seconds:.2f} seconds")

steps_executed = metrics.get("steps_executed", 0)
print(f"Steps Executed: {steps_executed}")

steps_failed = metrics.get("steps_failed", 0)
print(f"Steps Failed: {steps_failed}")

progress_percentage = progress.get("progress_percentage", 0)
progress_status = progress.get("status")
print(f"Progress: {progress_percentage:.1f}% (status: {progress_status})")


## Summary

Pipeline orchestration workflow:
- Complex Pipeline Built
- Pipeline Executed
- Failure Handling Configured
- Parallel Processing Enabled
- Execution Monitored (Metrics and Progress)
