# Notebook 7 (Industrial Edition): Agent Assembly Line (High-Throughput Pipelining)

## Introduction: Maximizing Throughput for High-Volume Tasks

This notebook explores the **Agent Assembly Line**, a parallelism pattern that is not meant to speed up any single task. Its goal is to increase the *throughput* of a continuous stream of tasks. Think of it as an AI-powered factory: items move through a series of specialized stations, and all stations work at the same time, each on a different item.

### The Core Concept: Pipelined Parallelism

Instead of one monolithic agent processing items one by one from start to finish, we break the process into a sequence of specialized agents. As soon as `Agent A` finishes its job on `Item 1`, it passes the item to `Agent B` and immediately starts working on `Item 2`. `Agent B` works on `Item 1` while `Agent A` works on `Item 2`. This keeps all agents in the pipeline busy, maximizing the number of items processed per unit of time.
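
To make the hand-off concrete, here is a minimal, framework-free sketch of item-level pipelining that uses only the standard library. It models each station as a plain function. The `make_stage` stubs and their `time.sleep` delays are hypothetical stand-ins for LLM calls. The sketch runs one worker thread per station and connects the stations with `queue.Queue`s, so station B can work on item 1 while station A is already on item 2.

```python
import queue
import threading
import time
from typing import Any, Callable, List

_DONE = object()  # sentinel that tells each station the stream has ended

def make_stage(name: str, delay: float) -> Callable[[tuple], tuple]:
    """Hypothetical station: simulates work with a sleep, then tags the item with its name."""
    def stage(item: tuple) -> tuple:
        time.sleep(delay)
        return item + (name,)
    return stage

def station_worker(fn: Callable[[Any], Any], inbox: queue.Queue, outbox: queue.Queue) -> None:
    """Take items from inbox one at a time and forward each result to outbox."""
    while True:
        item = inbox.get()
        if item is _DONE:
            outbox.put(_DONE)  # propagate shutdown to the next station
            return
        outbox.put(fn(item))

def run_assembly_line(items: List[Any], stages: List[Callable[[Any], Any]]) -> List[Any]:
    # queues[i] feeds station i; the last queue collects finished items
    queues = [queue.Queue() for _ in range(len(stages) + 1)]
    workers = [
        threading.Thread(target=station_worker, args=(fn, queues[i], queues[i + 1]))
        for i, fn in enumerate(stages)
    ]
    for w in workers:
        w.start()
    for item in items:
        queues[0].put(item)
    queues[0].put(_DONE)

    results = []
    while (out := queues[-1].get()) is not _DONE:
        results.append(out)
    for w in workers:
        w.join()
    return results

stages = [make_stage(n, 0.05) for n in ("triage", "summarize", "extract")]
start = time.perf_counter()
results = run_assembly_line([(f"review-{i}",) for i in range(6)], stages)
elapsed = time.perf_counter() - start
print(results[0])  # ('review-0', 'triage', 'summarize', 'extract')
print(f"{elapsed:.2f}s pipelined vs {6 * 3 * 0.05:.2f}s if run one item at a time")
```

Each station has its own thread, so once the line is full, throughput is set by the slowest station rather than by the sum of all stations. The LangGraph implementation later in this notebook takes a different route. It runs the stations one after another over the whole batch and parallelizes the work within each station.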

### Role in a Large-Scale System: Enabling High-Throughput, Continuous Data Processing

This pattern is fundamental for any system that needs to process a large volume of data in a structured, multi-step way. It's not about the fastest response for one user; it's about handling millions of events per day.
- **Content Moderation:** A pipeline of agents for text filtering, image analysis, and video scanning.
- **Data Enrichment:** Processing a stream of customer signups through stages like validation, data lookup, and lead scoring.
- **Log Analysis:** A pipeline that ingests, categorizes, summarizes, and flags anomalies in server logs.

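We will build a three-stage pipeline that processes a batch of product reviews. Our LangGraph implementation is a batch variant of the pattern. Each station processes its share of the batch concurrently, then hands the whole batch to the next station. We time every station so we can see where the time goes and compare throughput against a traditional, sequential approach.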

## Part 1: Setup and Environment

### 1.1: Installing Libraries

We'll install our standard libraries. This notebook is self-contained and needs no external tool APIs, because the focus is on the workflow orchestration itself.

In [None]:
%pip install -U langchain langgraph langsmith langchain-huggingface transformers accelerate bitsandbytes torch

### 1.2: API Keys and Environment Configuration

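We need our LangSmith and Hugging Face keys for tracing and model access. Llama 3 is a gated model on the Hugging Face Hub, so the account behind the token must have been granted access to it.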

In [None]:
import os
import getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("LANGCHAIN_API_KEY")
_set_env("HUGGING_FACE_HUB_TOKEN")

# Configure LangSmith for tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "Industrial - Agent Assembly Line"

## Part 2: Components of the Assembly Line

Our pipeline will process product reviews. We need to define the data structures that will move along the line and the specialist agents for each station.

### 2.1: The Language Model (LLM)

We will use `meta-llama/Meta-Llama-3-8B-Instruct`, loaded in 4-bit precision, as the engine for every agent in our pipeline. Each agent's task is focused and produces a short output, so a cap of 512 new tokens is ample.

In [None]:
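from langchain_huggingface import HuggingFacePipeline
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
import torch

model_id = "meta-llama/Meta-Llama-3-8B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(model_id)

# 4-bit quantization via bitsandbytes; passing load_in_4bit directly to from_pretrained is deprecated
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    quantization_config=quantization_config,
)

# return_full_text=False returns only the generated text, not the echoed prompt,
# which keeps downstream output parsing clean
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=512,
    do_sample=False,
    return_full_text=False,
)

llm = HuggingFacePipeline(pipeline=pipe)

print("LLM Initialized. Ready to power our assembly line.")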

LLM Initialized. Ready to power our assembly line.


### 2.2: Structured Data Models (Pydantic)

We need schemas to represent the data as it's transformed at each stage of the pipeline. This ensures a consistent data contract between our agents.

In [None]:
from pydantic import BaseModel, Field  # langchain_core.pydantic_v1 is deprecated in recent langchain-core releases
from typing import List, Literal, Optional

class TriageResult(BaseModel):
    """The result of the initial triage step."""
    category: Literal["Feedback", "Bug Report", "Support Request", "Irrelevant"] = Field(description="The category of the review.")

class Summary(BaseModel):
    """A concise summary of a product review."""
    summary: str = Field(description="A one-sentence summary of the key feedback in the review.")

class ExtractedData(BaseModel):
    """Structured data extracted from a product review summary."""
    product_mentioned: str = Field(description="The specific product the review is about.")
    sentiment: Literal["Positive", "Negative", "Neutral"] = Field(description="The overall sentiment of the review.")
    key_feature: str = Field(description="The main feature or aspect discussed in the review.")

class ProcessedReview(BaseModel):
    """The final, fully processed review object."""
    original_review: str
    category: str
    summary: Optional[str] = None
    extracted_data: Optional[ExtractedData] = None

### 2.3: Defining the Pipeline Agent Prompts

Each agent in our assembly line gets a highly specialized prompt for its specific task.

In [None]:
from langchain_core.prompts import ChatPromptTemplate

triage_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a Triage Specialist. Your job is to read a user review and categorize it into one of four categories: Feedback, Bug Report, Support Request, or Irrelevant."),
    ("human", "Please categorize the following review:\n\n---\n{review_text}\n---")
])

summarizer_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a Summarization Specialist. Your job is to read a user review and write a clear, one-sentence summary of its main point."),
    ("human", "Please summarize the following review:\n\n---\n{review_text}\n---")
])

extractor_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a Data Extraction Specialist. Your job is to read a review summary and extract the product mentioned, the sentiment, and the key feature discussed."),
    ("human", "Please extract structured data from the following summary:\n\n---\n{summary_text}\n---")
])

### 2.4: Creating the Agent Chains

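We'll package each prompt, the LLM, and an output parser into a reusable LangChain Expression Language (LCEL) chain. `HuggingFacePipeline` is a text-completion LLM rather than a chat model, so it does not support `with_structured_output`. Instead, each chain appends the target schema's format instructions to the prompt and parses the generated text into the corresponding Pydantic model.

In [None]:
from langchain_core.output_parsers import PydanticOutputParser

def make_structured_chain(prompt: ChatPromptTemplate, schema):
    """Prompt -> LLM -> parser chain that returns an instance of `schema`."""
    parser = PydanticOutputParser(pydantic_object=schema)
    # Add a message carrying the JSON format instructions derived from the schema
    full_prompt = prompt + ChatPromptTemplate.from_messages([("human", "{format_instructions}")])
    full_prompt = full_prompt.partial(format_instructions=parser.get_format_instructions())
    return full_prompt | llm | parser

triage_chain = make_structured_chain(triage_prompt, TriageResult)
summarizer_chain = make_structured_chain(summarizer_prompt, Summary)
extractor_chain = make_structured_chain(extractor_prompt, ExtractedData)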
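
Small local models often wrap their JSON in extra prose or code fences, and strict parsing can then fail. One hedged fallback is a helper like the hypothetical `extract_first_json_object` below. It scans the raw generated text for the first decodable JSON object using the standard library's `json.JSONDecoder.raw_decode`. Its result could then be validated against one of the schemas above, for example with `TriageResult(**data)`.

```python
import json
from typing import Any, Dict, Optional

def extract_first_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Hypothetical helper: return the first JSON object embedded in `text`, or None."""
    decoder = json.JSONDecoder()
    for start, ch in enumerate(text):
        if ch != "{":
            continue
        try:
            obj, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            continue  # not a valid object at this brace; try the next one
        if isinstance(obj, dict):
            return obj
    return None

raw = 'Sure! Here is the result:\n```json\n{"category": "Bug Report"}\n```'
print(extract_first_json_object(raw))  # {'category': 'Bug Report'}
```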

## Part 3: Building the Pipelined Graph

We will now construct the assembly line using LangGraph. The key is how we define the state and the nodes to operate on a *batch* of reviews.

### 3.1: Defining the Graph State

The state holds the initial batch of reviews, the list of processed reviews, and a performance log to which each station appends one timing entry.

In [None]:
from typing import TypedDict, Annotated, List
import operator

class PipelineState(TypedDict):
    # The initial batch of reviews to process
    initial_reviews: List[str]
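    # The processed reviews; each station returns the full updated list, which replaces the previous value
    processed_reviews: List[ProcessedReview]
    # Performance log; the operator.add reducer appends each station's entries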
    performance_log: Annotated[List[str], operator.add]
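
Only `performance_log` has a reducer (`operator.add`). Each station's one-element log list is therefore appended to the existing log. `processed_reviews` has no reducer, so whatever list a station returns simply replaces it. The sketch below is a simplified, hypothetical model of that merge rule (`apply_update` is not a LangGraph function). It is useful for reasoning about what each node must return.

```python
import operator
from typing import Any, Callable, Dict

# Hypothetical reducer table mirroring PipelineState: only performance_log is annotated
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {"performance_log": operator.add}

def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a node's partial update: reduce annotated keys, overwrite everything else."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in merged:
            merged[key] = REDUCERS[key](merged[key], value)
        else:
            merged[key] = value
    return merged

state = {"initial_reviews": ["r1", "r2"], "processed_reviews": [], "performance_log": []}
state = apply_update(state, {"processed_reviews": ["triaged r1", "triaged r2"], "performance_log": ["[Triage] ..."]})
state = apply_update(state, {"processed_reviews": ["summarized r1", "triaged r2"], "performance_log": ["[Summarizer] ..."]})
print(state["processed_reviews"])  # ['summarized r1', 'triaged r2']
print(state["performance_log"])    # ['[Triage] ...', '[Summarizer] ...']
```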

### 3.2: Defining the Graph Nodes (The Assembly Stations)

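Each node represents a station on our assembly line. Inside each node, Python's `ThreadPoolExecutor` submits the LLM calls for all eligible items at once and runs up to `MAX_WORKERS` of them concurrently. Threads help most when the calls are I/O-bound, such as requests to a hosted inference endpoint. With a single local model, concurrent calls compete for the same GPU, so the speedup is usually much smaller. The stations themselves run in sequence: a station starts only after the previous one has finished the entire batch.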

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from tqdm import tqdm

MAX_WORKERS = 4  # Controls the degree of concurrency within each station

# Station 1: Triage
def triage_node(state: PipelineState):
    """Categorizes all initial reviews in parallel, preserving input order."""
    reviews = state['initial_reviews']
    print(f"--- [Station 1: Triage] Processing {len(reviews)} reviews... ---")
    start_time = time.time()
    
    # as_completed yields futures in completion order, so each result is written
    # back to its input position to keep processed_reviews in the original order
    results = [None] * len(reviews)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_index = {executor.submit(triage_chain.invoke, {"review_text": review}): i for i, review in enumerate(reviews)}
        for future in tqdm(as_completed(future_to_index), total=len(reviews), desc="Triage Progress"):
            i = future_to_index[future]
            try:
                result = future.result()
                results[i] = ProcessedReview(original_review=reviews[i], category=result.category)
            except Exception as exc:
                print(f'Review {i} generated an exception: {exc}')
    
    # Reviews that failed triage are dropped from the rest of the pipeline
    triaged_reviews = [r for r in results if r is not None]
    
    execution_time = time.time() - start_time
    log = f"[Triage] Processed {len(reviews)} reviews in {execution_time:.2f}s."
    print(log)

    return {"processed_reviews": triaged_reviews, "performance_log": [log]}
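
`as_completed` yields futures in the order they finish, not the order they were submitted. Any list built directly from it can therefore come out shuffled relative to the input. The standalone sketch below uses a hypothetical `slow_echo` task whose delays are arranged so that later inputs finish first. It contrasts `as_completed` with `executor.map`, which returns results in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_echo(item: tuple) -> str:
    """Hypothetical task: sleep for the given delay, then return the label."""
    label, delay = item
    time.sleep(delay)
    return label

items = [("first", 0.3), ("second", 0.2), ("third", 0.1)]

# Completion order: whichever task finishes first comes out first
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_echo, it) for it in items]
    completion_order = [f.result() for f in as_completed(futures)]

# Input order: map yields results in the order the inputs were given
with ThreadPoolExecutor(max_workers=3) as executor:
    input_order = list(executor.map(slow_echo, items))

print(completion_order)
print(input_order)  # ['first', 'second', 'third']
```

`executor.map` re-raises a task's exception when iteration reaches that result, which ends the loop. To keep per-item error handling instead, a common alternative is to record each future's input index and write its result back to that position.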

In [None]:
# Station 2: Summarize (only operates on 'Feedback' reviews)
def summarize_node(state: PipelineState):
    """Summarizes all reviews categorized as 'Feedback' in parallel."""
    feedback_reviews = [r for r in state['processed_reviews'] if r.category == "Feedback"]
    if not feedback_reviews:
        print("--- [Station 2: Summarizer] No feedback reviews to process. Skipping. ---")
        return {}
    
    print(f"--- [Station 2: Summarizer] Processing {len(feedback_reviews)} feedback reviews... ---")
    start_time = time.time()
    
    review_map = {r.original_review: r for r in state['processed_reviews']}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(summarizer_chain.invoke, {"review_text": r.original_review}): r for r in feedback_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(feedback_reviews), desc="Summarizer Progress"):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                # Update the object in our map
                review_map[original_review_obj.original_review].summary = result.summary
            except Exception as exc:
                print(f'Review generated an exception: {exc}')
    
    execution_time = time.time() - start_time
    log = f"[Summarizer] Processed {len(feedback_reviews)} reviews in {execution_time:.2f}s."
    print(log)
    
    return {"processed_reviews": list(review_map.values()), "performance_log": [log]}

In [None]:
# Station 3: Extract Data (operates on summarized reviews)
def extract_data_node(state: PipelineState):
    """Extracts structured data from all summarized reviews in parallel."""
    summarized_reviews = [r for r in state['processed_reviews'] if r.summary is not None]
    if not summarized_reviews:
        print("--- [Station 3: Extractor] No summarized reviews to process. Skipping. ---")
        return {}
        
    print(f"--- [Station 3: Extractor] Processing {len(summarized_reviews)} summarized reviews... ---")
    start_time = time.time()
    
    review_map = {r.original_review: r for r in state['processed_reviews']}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(extractor_chain.invoke, {"summary_text": r.summary}): r for r in summarized_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(summarized_reviews), desc="Extractor Progress"):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                review_map[original_review_obj.original_review].extracted_data = result
            except Exception as exc:
                print(f'Review generated an exception: {exc}')
    
    execution_time = time.time() - start_time
    log = f"[Extractor] Processed {len(summarized_reviews)} reviews in {execution_time:.2f}s."
    print(log)
    
    return {"processed_reviews": list(review_map.values()), "performance_log": [log]}
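
All three stations share the same shape. Each fans out one call per item, waits for all of them, and records failures without stopping the batch. The sketch below factors that shape into one hypothetical helper, `run_station`, and uses a stub function in place of a chain's `invoke`. It returns errors alongside the results instead of printing them.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional, Tuple

def run_station(
    fn: Callable[[Any], Any],
    inputs: List[Any],
    max_workers: int = 4,
) -> Tuple[List[Optional[Any]], Dict[int, Exception]]:
    """Hypothetical helper: apply fn to every input concurrently.

    Returns results in input order (None where a call failed) and a map
    from input index to the exception that call raised.
    """
    results: List[Optional[Any]] = [None] * len(inputs)
    errors: Dict[int, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {executor.submit(fn, x): i for i, x in enumerate(inputs)}
        for future in as_completed(future_to_index):
            i = future_to_index[future]
            try:
                results[i] = future.result()
            except Exception as exc:  # isolate per-item failures
                errors[i] = exc
    return results, errors

def fake_summarize(text: str) -> str:
    """Stub standing in for a chain's invoke; fails on empty input."""
    if not text:
        raise ValueError("empty review")
    return text.upper()

results, errors = run_station(fake_summarize, ["good mug", "", "hot cpu"])
print(results)       # ['GOOD MUG', None, 'HOT CPU']
print(list(errors))  # [1]
```

Inside `summarize_node`, for example, `fn` could wrap `summarizer_chain.invoke({"review_text": r.original_review})`, with the feedback reviews as `inputs`. Because errors are returned rather than printed, the node decides whether to log, retry, or drop each failed item.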

### 3.3: Assembling the Graph

We connect the nodes in a simple sequence, forming our three-stage assembly line.

In [None]:
from langgraph.graph import StateGraph, END

workflow = StateGraph(PipelineState)

workflow.add_node("triage", triage_node)
workflow.add_node("summarize", summarize_node)
workflow.add_node("extract_data", extract_data_node)

workflow.set_entry_point("triage")
workflow.add_edge("triage", "summarize")
workflow.add_edge("summarize", "extract_data")
workflow.add_edge("extract_data", END)

app = workflow.compile()

print("Graph constructed and compiled successfully.")
print("The review processing assembly line is ready to run.")

Graph constructed and compiled successfully.
The review processing assembly line is ready to run.


### 3.4: Visualizing the Graph

**Diagram Description:** The diagram shows a simple, linear graph: `__start__` -> `triage` -> `summarize` -> `extract_data` -> `__end__`.

In [None]:
# from IPython.display import Image
# Image(app.get_graph().draw_png())

## Part 4: Running the Pipeline and Analyzing Throughput

We'll create a batch of 10 sample reviews and run them through our assembly line, paying close attention to the execution time of each stage. The comment after each review is the category we expect the triage agent to assign.

In [None]:
sample_reviews = [
    "The Aura Smart Ring's battery life is incredible, easily lasts a week! Best sleep tracker I've ever used.", # Feedback
    "My QuantumLeap processor arrived with a bent pin, and the box was crushed. I need a replacement ASAP.", # Support Request
    "Love the new Smart Mug, but the app keeps crashing on my Android phone whenever I try to set a custom temperature.", # Bug Report
    "The titanium finish on the Aura Ring feels so premium. It's lightweight and looks amazing.", # Feedback
    "I think I was overcharged for my last order (A123). Can someone please check my invoice?", # Support Request
    "The personalized energy suggestions from the Smart Mug are surprisingly accurate. A great feature!", # Feedback
    "This is not a product review, I just wanted to say your website is very well designed.", # Irrelevant
    "The QuantumLeap is fast, but it runs way too hot. The fan noise is a real problem under load.", # Feedback
    "The app for the Aura Ring fails to sync my sleep data about half the time. I have to restart my phone to fix it.", # Bug Report
    "I wish the Smart Mug came in more colors. A matte black option would be perfect.", # Feedback
]

In [None]:
inputs = {
    "initial_reviews": sample_reviews,
    "processed_reviews": []
}

final_state = None
for output in app.stream(inputs, stream_mode="values"):
    final_state = output

--- [Station 1: Triage] Processing 10 reviews... ---
Triage Progress: 100%|██████████| 10/10 [00:09<00:00,  1.10it/s]
[Triage] Processed 10 reviews in 9.12s.
--- [Station 2: Summarizer] Processing 5 feedback reviews... ---
Summarizer Progress: 100%|██████████| 5/5 [00:05<00:00,  1.05it/s]
[Summarizer] Processed 5 reviews in 5.23s.
--- [Station 3: Extractor] Processing 5 summarized reviews... ---
Extractor Progress: 100%|██████████| 5/5 [00:06<00:00,  1.21it/s]
[Extractor] Processed 5 reviews in 6.05s.


## Part 5: Final Analysis - The Power of Pipelining

Let's look at the final processed data and, most importantly, analyze the performance to understand the concept of throughput.

In [None]:
import json
print("="*60)
print("                FINAL PROCESSED DATA (Sample)")
print("="*60)

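for review in final_state['processed_reviews'][:2]:  # Print a sample of two processed reviews
    # model_dump() exists on Pydantic v2 models; fall back to dict() for v1-style models
    data = review.model_dump() if hasattr(review, "model_dump") else review.dict()
    print(json.dumps(data, indent=4))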
    print("-"*60)

                FINAL PROCESSED DATA (Sample)

{
    "original_review": "The Aura Smart Ring's battery life is incredible, easily lasts a week! Best sleep tracker I've ever used.",
    "category": "Feedback",
    "summary": "The Aura Smart Ring has excellent week-long battery life and is a top-tier sleep tracker.",
    "extracted_data": {
        "product_mentioned": "Aura Smart Ring",
        "sentiment": "Positive",
        "key_feature": "Battery Life"
    }
}
-------------------------------------------------------------
{
    "original_review": "My QuantumLeap processor arrived with a bent pin, and the box was crushed. I need a replacement ASAP.",
    "category": "Support Request",
    "summary": null,
    "extracted_data": null
}
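
A back-of-the-envelope model makes the per-station timings easier to interpret. The sketch below compares three schedules under loudly simplifying assumptions:

- Every item visits every station. In the run above, only feedback reviews reach stations 2 and 3.
- Each station has a fixed per-item latency.
- Concurrent calls do not slow each other down.

The latencies are hypothetical placeholders, not measurements from the run above.

```python
import math
from typing import List

def sequential_time(n_items: int, latencies: List[float]) -> float:
    """One item at a time through every station: N * sum(t_s)."""
    return n_items * sum(latencies)

def batch_stage_time(n_items: int, latencies: List[float], workers: int) -> float:
    """Stations run one after another; each handles the batch in waves of `workers` calls."""
    waves = math.ceil(n_items / workers)
    return sum(waves * t for t in latencies)

def pipelined_time(n_items: int, latencies: List[float]) -> float:
    """One worker per station with item-level hand-off: fill time plus (N - 1) * slowest station."""
    return sum(latencies) + (n_items - 1) * max(latencies)

latencies = [1.0, 1.0, 1.2]  # hypothetical per-item seconds for triage, summarize, extract
n = 10
print(f"sequential:           {sequential_time(n, latencies):.1f}s")      # 32.0s
print(f"stage-by-stage (W=4): {batch_stage_time(n, latencies, 4):.1f}s")  # 9.6s
print(f"item-level pipeline:  {pipelined_time(n, latencies):.1f}s")       # 14.0s
```

Under these assumptions the stage-by-stage schedule finishes first, because each station runs four calls at once. Once the single-worker pipeline is full, it is capped at one item per `max(latencies)` seconds; giving each pipelined station several workers would raise that cap. On a single local GPU the no-contention assumption rarely holds, so measured gains will be smaller than this model suggests.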
