# LAB 3: LANGCHAIN ORCHESTRATION - COMPLETE GUIDE

**Course:** Advanced Prompt Engineering Training  
**Session:** Session 3 - RAG & Advanced Retrieval (Day 2)  
**Duration:** 120 minutes (2 hours)  
**Type:** LangChain-Based Orchestration Workshop

## LAB OVERVIEW

This comprehensive lab teaches you to build **production-grade orchestrated AI workflows using LangChain**. You'll progress through four interconnected modules:

1. **Sequential Chains (LCEL)** - Linear multi-step workflows with LangChain Expression Language
2. **Parallel Processing (RunnableParallel)** - Concurrent execution for speed
3. **Conditional Workflows (RunnableBranch)** - Dynamic routing based on data
4. **Production Orchestration** - Error handling, retries, monitoring with LangChain

**Scenario:** You're building an AI-powered loan processing system for a bank. Starting with simple sequential workflows, you'll evolve to a production-ready system handling thousands of applications daily.

### Step 1: Import Libraries

In [None]:
# Lab 3: LangChain Orchestration
# Advanced Prompt Engineering Training - Session 3

import os
import json
import time
import re
from datetime import datetime
from typing import List, Dict, Tuple, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum

# LangChain imports
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import (
    RunnablePassthrough,
    RunnableParallel,
    RunnableLambda,
    RunnableBranch,
    Runnable
)
from langchain_core.messages import HumanMessage, SystemMessage

import pandas as pd
import numpy as np

from dotenv import load_dotenv

load_dotenv(override=True)

print("✓ Libraries imported successfully")
import langchain_core
print(f"✓ langchain-core version: {langchain_core.__version__}")

### Step 2: Configure LangChain Models

In [None]:
# Initialize LangChain ChatOpenAI models
GPT4_MODEL = os.environ.get("MODEL_NAME", "gpt-4o")
GPT35_MODEL = os.environ.get("FAST_MODEL_NAME", "gpt-3.5-turbo")

# Create model instances
llm_gpt4 = ChatOpenAI(
    model=GPT4_MODEL,
    temperature=0,
    max_tokens=2000,
    top_p=1.0  # passed directly: ChatOpenAI exposes top_p as its own parameter
)

llm_gpt35 = ChatOpenAI(
    model=GPT35_MODEL,
    temperature=0,
    max_tokens=2000,
    top_p=1.0  # passed directly: ChatOpenAI exposes top_p as its own parameter
)

# Model with retry configuration
llm_gpt4_with_retry = ChatOpenAI(
    model=GPT4_MODEL,
    temperature=0,
    max_tokens=2000,
    max_retries=3,
    request_timeout=30
)

print(f"✓ LangChain ChatOpenAI configured")
print(f"✓ Primary model: {GPT4_MODEL}")
print(f"✓ Fast model: {GPT35_MODEL}")
print(f"✓ Retry-enabled model ready")

### Step 3: Create Helper Functions and Data Classes

In [None]:
@dataclass
class ChainMetrics:
    """Track metrics for chain execution"""
    chain_name: str
    start_time: float
    end_time: float = 0.0
    output: Any = None
    error: Optional[str] = None
    
    @property
    def duration(self) -> float:
        return self.end_time - self.start_time if self.end_time > 0 else 0.0
    
    def complete(self, output: Any = None, error: Optional[str] = None):
        self.end_time = time.time()
        self.output = output
        self.error = error


def print_chain_output(output: Any, title: str = "Chain Output", metrics: Optional[ChainMetrics] = None):
    """Pretty print chain output with optional metrics"""
    print(f"\n{'='*80}")
    print(f"{title}")
    print(f"{'='*80}")
    
    if metrics:
        print(f"Duration: {metrics.duration:.2f}s")
        print(f"")
    
    if isinstance(output, dict):
        print(json.dumps(output, indent=2))
    else:
        print(output)
    
    print(f"{'='*80}\n")


# Test the setup
test_chain = llm_gpt35 | StrOutputParser()
test_output = test_chain.invoke("Say 'LangChain setup complete!' if you receive this.")
print_chain_output(test_output, "Setup Test")

### Step 4: Load Sample Data

In [None]:
# Sample loan application data for the lab
SAMPLE_LOAN_APPLICATION = """
LOAN APPLICATION #LA-2024-5821

Applicant Information:
- Name: Sarah Johnson
- Age: 34
- Employment: Senior Software Engineer at TechCorp
- Annual Income: $145,000
- Employment Duration: 6 years
- Credit Score: 750

Loan Details:
- Loan Type: Home Mortgage
- Loan Amount: $485,000
- Property Value: $620,000
- Down Payment: $135,000 (22%)
- Loan Term: 30 years

Financial Information:
- Monthly Gross Income: $12,083
- Existing Debts: 
  * Car Loan: $425/month (12 months remaining)
  * Student Loan: $280/month (24 months remaining)
  * Credit Card: $150/month average
- Total Monthly Debt: $855

Property Information:
- Property Address: 245 Oak Street, San Francisco, CA
- Property Type: Single-family home
- Year Built: 2018
- Square Footage: 2,100 sq ft
- Condition: Excellent

Supporting Documents:
- 2 years tax returns: Provided
- 3 months pay stubs: Provided
- Bank statements: Provided
- Employment verification: Verified
- Property appraisal: $620,000
"""

# Additional test applications
SMALL_LOAN_APP = """
LOAN APPLICATION #LA-2024-5822
Applicant: Mike Chen
Loan Type: Personal Loan
Amount: $15,000
Income: $65,000
Credit Score: 720
Employment: 3 years
"""

LARGE_LOAN_APP = """
LOAN APPLICATION #LA-2024-5823
Applicant: Jennifer Martinez
Loan Type: Commercial Real Estate
Amount: $2,500,000
Business Revenue: $8,000,000/year
Credit Score: 780
Business Age: 12 years
Collateral: Commercial property valued at $3,200,000
"""

print("✓ Sample data loaded")
# split() yields ["LOAN", "APPLICATION", "#LA-2024-5821", ...], so the ID is at index 2
print(f"✓ Main application: {SAMPLE_LOAN_APPLICATION.split()[2]}")
print(f"✓ Test applications: 2 additional scenarios ready")

---

## PART 1: SEQUENTIAL CHAINS WITH LCEL (Lab 3.1)

**Duration:** 30 minutes  
**Objective:** Build linear multi-step workflows using LangChain Expression Language

### Theory: LCEL Sequential Pattern

**Concept:** Chain components together using the pipe operator `|`

```python
chain = prompt | llm | output_parser
result = chain.invoke(input)
```

**Advantages over manual implementation:**
- Composable and reusable
- Built-in streaming support
- Retries and fallbacks available through `.with_retry()` and `.with_fallbacks()`
- Better debugging and visualization
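
To make the pipe operator less magical, here is a minimal sketch of how `|` composition can work in plain Python. The `Step` class and the toy steps are hypothetical stand-ins written for illustration. They are not LangChain's `Runnable` implementation, which layers batching, streaming, async, and config propagation on top of the same idea.

```python
from typing import Any, Callable


class Step:
    """Hypothetical stand-in for a Runnable: wraps a function and supports `|`."""

    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # `a | b` builds a new step that feeds a's output into b.
        return Step(lambda value: other.invoke(self.invoke(value)))

    def invoke(self, value: Any) -> Any:
        return self.fn(value)


# Three toy steps that mimic prompt -> model -> parser.
prompt = Step(lambda topic: f"Summarize: {topic}")
fake_llm = Step(lambda text: {"content": text.upper()})
parser = Step(lambda message: message["content"])

chain = prompt | fake_llm | parser
result = chain.invoke("loan application")
print(result)
```

Each `|` returns a new composed object, which is why a chain can itself be reused as a step inside a larger chain.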

### Challenge 1.1: Basic LCEL Sequential Chain (10 minutes)

**Scenario:** Process a loan application through 3 steps using LCEL:
1. Extract key information
2. Calculate financial ratios
3. Generate summary

In [None]:
# Step 1: Create extraction chain with JSON output
extraction_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a data extraction specialist. Always return valid JSON."),
    ("user", """Extract the following information from this loan application and return as JSON:
    
Required fields:
- applicant_name
- loan_amount (number only)
- annual_income (number only)
- credit_score (number only)
- monthly_debt (total monthly debt payments, number only)
- loan_type
- property_value (if applicable, number only)
- down_payment (if applicable, number only)

Loan Application:
{application}

Return ONLY valid JSON, no other text.""")
])

# Create the extraction chain
extraction_chain = extraction_prompt | llm_gpt35 | JsonOutputParser()

# Test extraction
print("Testing extraction chain...")
extraction_result = extraction_chain.invoke({"application": SAMPLE_LOAN_APPLICATION})
print_chain_output(extraction_result, "Step 1: Extraction Result")
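
Because later steps do arithmetic on these fields, it can help to validate the parsed JSON before passing it on. The sketch below is one possible check and not part of the lab's required code. `validate_extraction`, `to_number`, and `REQUIRED_NUMERIC_FIELDS` are hypothetical names, and the coercion rules (stripping `$` and commas) are assumptions about how a model might deviate from "number only".

```python
from typing import Any, Dict

# Fields the ratio step needs as numbers (assumed from the extraction prompt).
REQUIRED_NUMERIC_FIELDS = ("loan_amount", "annual_income", "credit_score", "monthly_debt")


def to_number(value: Any) -> float:
    """Coerce values such as "$145,000" or 145000 to a float."""
    if isinstance(value, bool):
        raise ValueError(f"not a number: {value!r}")
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        return float(value.replace("$", "").replace(",", "").strip())
    raise ValueError(f"not a number: {value!r}")


def validate_extraction(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy with required numeric fields coerced; raise if any is missing."""
    cleaned = dict(data)
    for name in REQUIRED_NUMERIC_FIELDS:
        if cleaned.get(name) is None:
            raise ValueError(f"missing required field: {name}")
        cleaned[name] = to_number(cleaned[name])
    return cleaned


# Hand-written example input, not real model output.
example = {
    "applicant_name": "Sarah Johnson",
    "loan_amount": "$485,000",
    "annual_income": 145000,
    "credit_score": 750,
    "monthly_debt": "855",
}
validated = validate_extraction(example)
print(validated["loan_amount"], validated["monthly_debt"])
```

In a chain, a function like this could be wrapped in `RunnableLambda` and placed directly after `JsonOutputParser()` so that malformed extractions fail early.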

In [None]:
# Step 2: Create ratio calculation chain
ratio_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a financial analyst. Always return valid JSON with accurate calculations."),
    ("user", """Given this extracted loan application data, calculate these financial metrics:
    
Extracted Data:
{extracted_data}

Calculate:
1. DTI (Debt-to-Income Ratio): (monthly_debt / monthly_income) × 100
   - monthly_income = annual_income / 12
   - Express as percentage

2. LTV (Loan-to-Value Ratio): (loan_amount / property_value) × 100
   - If property_value exists
   - Express as percentage

3. Estimated Monthly Payment: Rough estimate for 30-year mortgage at 7% interest
   - Use formula: P = L[c(1 + c)^n]/[(1 + c)^n - 1]
   - Where L = loan amount, c = monthly interest rate, n = number of payments

Return as JSON with:
{{
    "dti_ratio": <percentage>,
    "dti_assessment": "<Good/Fair/Poor based on: <28% = Good, 28-36% = Fair, >36% = Poor>",
    "ltv_ratio": <percentage or null>,
    "ltv_assessment": "<Good/Fair/Poor based on: <80% = Good, 80-90% = Fair, >90% = Poor>",
    "estimated_monthly_payment": <amount>,
    "affordability_check": "<Can afford based on 28% housing ratio rule>"
}}

Return ONLY valid JSON.""")
])

ratio_chain = ratio_prompt | llm_gpt35 | JsonOutputParser()

# Test ratio calculation
print("Testing ratio calculation chain...")
ratio_result = ratio_chain.invoke({"extracted_data": json.dumps(extraction_result)})
print_chain_output(ratio_result, "Step 2: Ratio Calculation Result")
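
Language models can slip on arithmetic, so it is worth cross-checking the ratios deterministically. The sketch below applies the same formulas as the prompt to the figures in the sample application. `monthly_payment` is a hypothetical helper name, and the 7% rate and 30-year term are the prompt's own assumptions.

```python
def monthly_payment(loan_amount: float, annual_rate: float, years: int) -> float:
    """Fixed-rate amortization: P = L[c(1 + c)^n] / [(1 + c)^n - 1]."""
    c = annual_rate / 12          # monthly interest rate
    n = years * 12                # number of monthly payments
    growth = (1 + c) ** n
    return loan_amount * (c * growth) / (growth - 1)


# Values taken from the sample application (Sarah Johnson).
annual_income = 145_000
monthly_debt = 855
loan_amount = 485_000
property_value = 620_000

monthly_income = annual_income / 12
dti_ratio = monthly_debt / monthly_income * 100
ltv_ratio = loan_amount / property_value * 100
payment = monthly_payment(loan_amount, annual_rate=0.07, years=30)
housing_ratio = payment / monthly_income * 100

print(f"DTI: {dti_ratio:.2f}%")
print(f"LTV: {ltv_ratio:.2f}%")
print(f"Estimated payment: ${payment:,.2f}")
print(f"Housing ratio: {housing_ratio:.1f}%")
```

For the sample application this gives a DTI of about 7.1%, an LTV of about 78.2%, and a payment of roughly $3,227 per month, which is just under the 28% housing ratio. Comparing these values with `ratio_result` is a quick way to spot a model miscalculation.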

In [None]:
# Step 3: Create summary generation chain
summary_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a senior loan officer writing executive summaries for the credit committee."),
    ("user", """Create an executive summary for this loan application.
    
Applicant Data:
{extracted_data}

Financial Analysis:
{ratios}

Write a concise executive summary (3-4 paragraphs) covering:

1. Applicant Overview
   - Name, income, credit score
   - Employment stability

2. Financial Strength
   - DTI and LTV ratios
   - Overall financial health

3. Risk Assessment
   - Key strengths
   - Potential concerns

4. Recommendation
   - APPROVE: Strong application, low risk
   - REVIEW: Acceptable but needs senior review
   - DENY: High risk, does not meet criteria

Be professional and concise. Focus on facts and ratios.""")
])

summary_chain = summary_prompt | llm_gpt4 | StrOutputParser()

# Test summary generation
print("Testing summary generation chain...")
summary_result = summary_chain.invoke({
    "extracted_data": json.dumps(extraction_result),
    "ratios": json.dumps(ratio_result)
})
print_chain_output(summary_result, "Step 3: Executive Summary")

In [None]:
# Now compose them into a complete sequential chain using LCEL
def format_for_ratios(data):
    """Helper function to format extraction output for ratio chain"""
    return {"extracted_data": json.dumps(data)}

def format_for_summary(data):
    """Helper function to format both outputs for summary chain"""
    return {
        "extracted_data": json.dumps(data.get("extraction")),
        "ratios": json.dumps(data.get("ratios"))
    }

# Complete sequential chain with LCEL.
# Each assign() adds one key to the running state dict; the helper functions
# above reshape that state into the inputs each sub-chain expects.
complete_sequential_chain = (
    {"application": RunnablePassthrough()}
    | extraction_chain
    | RunnableLambda(lambda x: {"extraction": x})
    | RunnablePassthrough.assign(
        ratios=RunnableLambda(lambda x: format_for_ratios(x["extraction"])) | ratio_chain
    )
    | RunnablePassthrough.assign(
        summary=RunnableLambda(format_for_summary) | summary_chain
    )
)

# Execute the complete chain
print("\n" + "="*80)
print("EXECUTING COMPLETE SEQUENTIAL CHAIN WITH LCEL")
print("="*80 + "\n")

start_time = time.time()
final_result = complete_sequential_chain.invoke(SAMPLE_LOAN_APPLICATION)
total_time = time.time() - start_time

print("\n" + "="*80)
print("SEQUENTIAL CHAIN COMPLETE")
print("="*80)
print(f"Total Time: {total_time:.2f}s")
print(f"Steps Executed: 3 (extraction → ratios → summary)")
print("="*80)

print("\n" + "="*80)
print("FINAL EXECUTIVE SUMMARY")
print("="*80)
print(final_result["summary"])
print("="*80)

### Challenge 1.2: Enhanced Sequential Chain with State Tracking (10 minutes)

**Objective:** Add comprehensive logging using LangChain's callback system

In [None]:
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult

class LoanProcessingCallbackHandler(BaseCallbackHandler):
    """Custom callback handler to track chain execution"""
    
    def __init__(self):
        self.logs = []
        self.total_tokens = 0
        self.step_count = 0
    
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs) -> None:
        self.step_count += 1
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        log_msg = f"[{timestamp}] Step {self.step_count}: LLM call started"
        self.logs.append(log_msg)
        print(log_msg)
    
    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        if response.llm_output and "token_usage" in response.llm_output:
            tokens = response.llm_output["token_usage"].get("total_tokens", 0)
            self.total_tokens += tokens
            log_msg = f"[{timestamp}] ✓ Step {self.step_count} completed ({tokens} tokens)"
        else:
            log_msg = f"[{timestamp}] ✓ Step {self.step_count} completed"
        self.logs.append(log_msg)
        print(log_msg)
    
    def on_llm_error(self, error: Exception, **kwargs) -> None:
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        log_msg = f"[{timestamp}] ✗ Step {self.step_count} failed: {str(error)}"
        self.logs.append(log_msg)
        print(log_msg)
    
    def get_summary(self) -> Dict[str, Any]:
        return {
            "total_steps": self.step_count,
            "total_tokens": self.total_tokens,
            "logs": self.logs
        }

# Execute chain with callback handler
print("\n" + "="*80)
print("STATEFUL SEQUENTIAL WORKFLOW WITH CALLBACKS")
print("="*80 + "\n")

callback_handler = LoanProcessingCallbackHandler()

start_time = time.time()
result_with_tracking = complete_sequential_chain.invoke(
    SAMPLE_LOAN_APPLICATION,
    config={"callbacks": [callback_handler]}
)
total_time = time.time() - start_time

# Display summary
summary = callback_handler.get_summary()
print("\n" + "="*80)
print("PROCESSING SUMMARY")
print("="*80)
print(f"Total Steps: {summary['total_steps']}")
print(f"Total Tokens: {summary['total_tokens']}")
print(f"Total Time: {total_time:.2f}s")
print("\nProcessing Log:")
for log in summary['logs']:
    print(f"  {log}")
print("="*80)

---

## PART 2: PARALLEL PROCESSING WITH RUNNABLEPARALLEL (Lab 3.2)

**Duration:** 30 minutes  
**Objective:** Execute independent tasks concurrently using RunnableParallel

### Theory: RunnableParallel Pattern

**Concept:** Execute multiple chains concurrently and collect results

```python
parallel_chain = RunnableParallel(
    task_a=chain_a,
    task_b=chain_b,
    task_c=chain_c
)
results = parallel_chain.invoke(input)  # All run concurrently!
```

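**Expected speedup:** up to ~3x for 3 parallel tasks of similar latency. Wall-clock time is bounded by the slowest task rather than the sum of all three, and provider rate limits can reduce the gain.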
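
The timing behaviour can be seen without any API calls. The sketch below uses the standard library's `ThreadPoolExecutor` with `time.sleep` as a stand-in for network latency; the latency values and the `run_check` helper are made up for illustration. It models the general idea of running I/O-bound branches on threads and is not a reproduction of `RunnableParallel` internals.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Simulated I/O-bound checks; the sleep durations are made-up stand-ins for API latency.
SIMULATED_LATENCY = {"credit_bureau": 0.3, "employment": 0.2, "identity": 0.1}


def run_check(name: str) -> str:
    time.sleep(SIMULATED_LATENCY[name])
    return f"{name}: VERIFIED"


# Sequential: total time is roughly the sum of the latencies.
start = time.perf_counter()
sequential_results = [run_check(name) for name in SIMULATED_LATENCY]
sequential_time = time.perf_counter() - start

# Parallel: total time is roughly the slowest single latency.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    parallel_results = list(pool.map(run_check, SIMULATED_LATENCY))
parallel_time = time.perf_counter() - start

print(f"Sequential: {sequential_time:.2f}s")
print(f"Parallel:   {parallel_time:.2f}s")
```

Because the simulated latencies differ, the speedup here is closer to 2x than 3x, which is the usual situation when one branch is slower than the others.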

### Challenge 2.1: Parallel Credit Checks with RunnableParallel (15 minutes)

**Scenario:** Verify loan application across 3 independent sources:
1. Credit bureau check
2. Employment verification
3. Identity verification

All run in parallel with RunnableParallel!

In [None]:
# Define verification prompts
credit_bureau_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a credit bureau API returning verification results."),
    ("user", """Perform credit bureau verification for this applicant:
    
Applicant Data:
{applicant_data}

Verify and return JSON with:
{{
    "credit_score": <number>,
    "credit_score_valid": <true/false>,
    "payment_history": "<Excellent/Good/Fair/Poor>",
    "outstanding_debts": <total amount>,
    "bankruptcy_history": <true/false>,
    "verification_status": "VERIFIED" or "FAILED",
    "notes": "<any concerns or all clear>"
}}

Simulate realistic credit bureau response. Use the credit score from applicant_data.""")
])

employment_verification_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an employment verification service API."),
    ("user", """Verify employment for this applicant:
    
Applicant Data:
{applicant_data}

Return JSON with:
{{
    "employer": "<company name>",
    "position": "<job title>",
    "employment_verified": <true/false>,
    "tenure_years": <number>,
    "annual_income_verified": <number>,
    "income_matches_stated": <true/false>,
    "employment_stable": <true/false>,
    "verification_status": "VERIFIED" or "FAILED",
    "notes": "<any discrepancies or all clear>"
}}

Use data from applicant_data. Simulate realistic employment verification.""")
])

identity_verification_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an identity verification service API."),
    ("user", """Verify identity for this applicant:
    
Applicant Data:
{applicant_data}

Return JSON with:
{{
    "identity_verified": <true/false>,
    "name_matches": <true/false>,
    "address_verified": <true/false>,
    "fraud_flags": <number of flags, 0 = none>,
    "risk_level": "<LOW/MEDIUM/HIGH>",
    "verification_status": "VERIFIED" or "FAILED",
    "notes": "<any red flags or all clear>"
}}

Simulate realistic identity verification check.""")
])

# Create individual verification chains
credit_check_chain = credit_bureau_prompt | llm_gpt35 | JsonOutputParser()
employment_check_chain = employment_verification_prompt | llm_gpt35 | JsonOutputParser()
identity_check_chain = identity_verification_prompt | llm_gpt35 | JsonOutputParser()

print("✓ Verification chains created")

In [None]:
# Create parallel verification chain using RunnableParallel
parallel_verification_chain = RunnableParallel(
    credit_bureau=credit_check_chain,
    employment=employment_check_chain,
    identity=identity_check_chain
)

# Test data
test_applicant = {
    "name": "Sarah Johnson",
    "annual_income": 145000,
    "credit_score": 750,
    "employer": "TechCorp",
    "position": "Senior Software Engineer",
    "employment_years": 6,
    "address": "245 Oak Street, San Francisco, CA"
}

# Execute parallel verifications
print("\n" + "="*80)
print("PARALLEL VERIFICATION CHECKS")
print("="*80)
print("Starting parallel execution...\n")

start_time = time.time()
verification_results = parallel_verification_chain.invoke({
    "applicant_data": json.dumps(test_applicant, indent=2)
})
wall_clock_time = time.time() - start_time

print("\n" + "="*80)
print("PARALLEL VERIFICATION COMPLETE")
print("="*80)
print(f"Wall Clock Time: {wall_clock_time:.2f}s")
print("\nNote: Run sequentially, the three checks would take roughly the sum of their individual latencies.")
print("="*80)

# Display results
print("\n" + "="*80)
print("VERIFICATION RESULTS SUMMARY")
print("="*80)

for check_name, result in verification_results.items():
    print(f"\n{check_name.replace('_', ' ').title()}:")
    for key, value in result.items():
        print(f"  {key}: {value}")

print("="*80)

### Challenge 2.2: Hybrid Sequential + Parallel Chain (15 minutes)

**Objective:** Combine sequential and parallel patterns in one chain

```
Step 1: Extract data (sequential)
    ↓
Step 2: Fork into parallel checks → Join
    ↓
Step 3: Synthesize results (sequential)
```
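
The fork and join in the diagram can be sketched in plain Python before wiring it up with LCEL. Everything in this block is a hypothetical stand-in: `extract`, `CHECKS`, `run_checks`, and `synthesize` replace the LLM-backed steps with trivial functions so the control flow is easy to follow.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def extract(application: str) -> Dict[str, Any]:
    # Step 1 (sequential): stand-in for the LLM extraction step.
    return {"name": application.strip().title()}


# Step 2 (parallel): independent checks that all read the same extracted data.
CHECKS: Dict[str, Callable[[Dict[str, Any]], str]] = {
    "credit_bureau": lambda data: f"credit ok for {data['name']}",
    "employment": lambda data: f"employment ok for {data['name']}",
    "identity": lambda data: f"identity ok for {data['name']}",
}


def run_checks(extracted: Dict[str, Any]) -> Dict[str, str]:
    with ThreadPoolExecutor(max_workers=len(CHECKS)) as pool:
        futures = {name: pool.submit(check, extracted) for name, check in CHECKS.items()}
        # Join: wait for every branch before moving on.
        return {name: future.result() for name, future in futures.items()}


def synthesize(state: Dict[str, Any]) -> str:
    # Step 3 (sequential): needs the output of every parallel branch.
    passed = all("ok" in result for result in state["verifications"].values())
    return "APPROVE" if passed else "MANUAL_REVIEW"


# The state dict grows by one key per stage, much like chained assign() calls.
state: Dict[str, Any] = {"extracted_data": extract("sarah johnson")}
state["verifications"] = run_checks(state["extracted_data"])
state["final_decision"] = synthesize(state)
print(state["final_decision"])
```

The key design point is that step 3 cannot start until the join completes, so the parallel stage costs about as much as its slowest branch.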

In [None]:
# Create the synthesis chain
synthesis_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a senior credit officer making final loan decisions."),
    ("user", """Based on the following loan application data and verification results, 
provide a final credit decision.

Applicant Data:
{applicant_data}

Verification Results:
- Credit Bureau: {credit_bureau}
- Employment: {employment}
- Identity: {identity}

Provide final decision in this format:

DECISION: [APPROVE/DENY/MANUAL_REVIEW]

CONFIDENCE: [HIGH/MEDIUM/LOW]

KEY FACTORS:
- [List 3-5 key factors influencing this decision]

REASONING:
[2-3 sentences explaining the decision]

CONDITIONS (if applicable):
[Any conditions for approval, or N/A]""")
])

synthesis_chain = synthesis_prompt | llm_gpt4 | StrOutputParser()

# Build hybrid chain: Sequential → Parallel → Sequential
hybrid_loan_processing_chain = (
    # Step 1: Extract applicant data (sequential)
    {"application": RunnablePassthrough()}
    | extraction_prompt
    | llm_gpt35
    | JsonOutputParser()
    | RunnableLambda(lambda x: {"extracted_data": x})
    # Step 2: Parallel verifications
    | RunnablePassthrough.assign(
        verifications=RunnableLambda(
            lambda x: parallel_verification_chain.invoke({
                "applicant_data": json.dumps(x["extracted_data"])
            })
        )
    )
    # Step 3: Synthesize final decision (sequential)
    | RunnablePassthrough.assign(
        final_decision=lambda x: synthesis_chain.invoke({
            "applicant_data": json.dumps(x["extracted_data"], indent=2),
            "credit_bureau": json.dumps(x["verifications"]["credit_bureau"], indent=2),
            "employment": json.dumps(x["verifications"]["employment"], indent=2),
            "identity": json.dumps(x["verifications"]["identity"], indent=2)
        })
    )
)

# Execute hybrid workflow
print("\n" + "="*80)
print("HYBRID WORKFLOW: Sequential + Parallel + Sequential")
print("="*80)

# The chain runs all three stages in a single invoke(), so it is timed as a whole.
print("\nRunning extraction → parallel verification → synthesis...")

overall_start = time.time()
hybrid_result = hybrid_loan_processing_chain.invoke(SAMPLE_LOAN_APPLICATION)
total_time = time.time() - overall_start

print("\n" + "="*80)
print("HYBRID WORKFLOW COMPLETE")
print("="*80)
print(f"Total Wall Clock Time: {total_time:.2f}s")
print(f"\nWorkflow Pattern: Sequential → Parallel (3 tasks) → Sequential")
print(f"Advantage: Parallel processing where possible, maintaining logical flow")
print("="*80)

# Display final decision
print("\n" + "="*80)
print("FINAL CREDIT DECISION")
print("="*80)
print(hybrid_result["final_decision"])
print("="*80)

---

## PART 3: CONDITIONAL WORKFLOWS WITH RUNNABLEBRANCH (Lab 3.3)

**Duration:** 30 minutes  
**Objective:** Build dynamic workflows that route based on data using RunnableBranch

### Theory: RunnableBranch Pattern

**Concept:** Route inputs to different chains based on conditions

```python
branch = RunnableBranch(
    (condition_1, chain_1),
    (condition_2, chain_2),
    default_chain  # fallback
)
```

**When to use:**
- Different inputs need different handling
- Want to optimize processing
- Clear decision criteria exist
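
Conditions are checked in order and the first one that returns true wins, with the default handling everything else. The sketch below mimics that behaviour in plain Python. `make_branch` is a hypothetical helper, not LangChain code, and the thresholds are illustrative.

```python
from typing import Any, Callable, Dict, List, Tuple

Condition = Callable[[Dict[str, Any]], bool]
Handler = Callable[[Dict[str, Any]], str]


def make_branch(branches: List[Tuple[Condition, Handler]], default: Handler) -> Handler:
    """Return a router that runs the handler of the first condition that is true."""

    def route(data: Dict[str, Any]) -> str:
        for condition, handler in branches:
            if condition(data):
                return handler(data)
        return default(data)

    return route


router = make_branch(
    [
        (lambda d: d["loan_amount"] < 100_000, lambda d: "fast-track"),
        (lambda d: d["loan_amount"] >= 500_000, lambda d: "enhanced due diligence"),
    ],
    default=lambda d: "standard",
)

for amount in (15_000, 485_000, 2_500_000):
    print(amount, "->", router({"loan_amount": amount}))
```

Since evaluation stops at the first match, overlapping conditions should be listed from most specific to least specific.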

### Challenge 3.1: Risk-Based Routing with RunnableBranch (20 minutes)

**Scenario:** Route loans to different approval workflows based on amount:
- **Small loans** (<$100K): Fast-track (2 steps)
- **Medium loans** ($100K up to, but not including, $500K): Standard (4 steps)
- **Large loans** ($500K and above): Enhanced due diligence (6 steps)

In [None]:
# First, extract loan amount for routing decision
loan_amount_extraction_prompt = ChatPromptTemplate.from_messages([
    ("system", "You extract loan amounts."),
    ("user", """Extract only the loan amount from this application.
Return as JSON: {{"loan_amount": <number>}}

Application:
{application}""")
])

loan_amount_chain = loan_amount_extraction_prompt | llm_gpt35 | JsonOutputParser()

# Define threshold constants
SMALL_LOAN_THRESHOLD = 100000
LARGE_LOAN_THRESHOLD = 500000

In [None]:
# Small loan fast-track chain
small_loan_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a fast-track credit assessor for small loans."),
    ("user", """Perform quick credit assessment for small loan:

Application:
{application}

Provide quick decision based on:
- Credit score (must be >650)
- Income stability
- Basic debt check

Return as text with:
DECISION: [APPROVE/DENY/NEEDS_REVIEW]
REASON: [brief explanation]

Keep it concise - this is fast-track processing.""")
])

small_loan_chain = small_loan_prompt | llm_gpt35 | StrOutputParser()

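# Medium loan standard chain (reuses the Part 1 chains).
# Like the small and large chains, it expects {"application": <text>} as input,
# so it starts at the prompt instead of wrapping the input in another dict.
medium_loan_chain = (
    extraction_prompt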
    | llm_gpt35
    | JsonOutputParser()
    | RunnableLambda(lambda x: {"extracted_data": x})
    | RunnablePassthrough.assign(
        ratios=lambda x: ratio_chain.invoke({"extracted_data": json.dumps(x["extracted_data"])})
    )
    | RunnablePassthrough.assign(
        summary=lambda x: summary_chain.invoke({
            "extracted_data": json.dumps(x["extracted_data"]),
            "ratios": json.dumps(x["ratios"])
        })
    )
    | RunnableLambda(lambda x: x["summary"])
)

# Large loan enhanced due diligence chain
large_loan_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a senior financial analyst for large commercial loans."),
    ("user", """Perform comprehensive enhanced due diligence for large loan (>$500,000):

Application:
{application}

Provide comprehensive analysis including:
1. Complete financial analysis (DTI, LTV, DSCR, cash flow)
2. Collateral assessment
3. Market risk factors
4. Stress testing scenarios (best/base/stress cases)
5. Default probability estimate
6. Senior review requirements
7. Credit committee decision framework

This requires SENIOR REVIEW + COMMITTEE APPROVAL.

Be thorough - this is a large loan requiring enhanced due diligence.""")
])

large_loan_chain = large_loan_prompt | llm_gpt4 | StrOutputParser()

print("✓ All workflow-specific chains created")

In [None]:
# Create routing conditions
def is_small_loan(data: Dict) -> bool:
    """Check if loan amount is less than $100K"""
    loan_amount = data.get("loan_info", {}).get("loan_amount", 0)
    return loan_amount < SMALL_LOAN_THRESHOLD

def is_large_loan(data: Dict) -> bool:
    """Check if loan amount is $500K or more"""
    loan_amount = data.get("loan_info", {}).get("loan_amount", 0)
    return loan_amount >= LARGE_LOAN_THRESHOLD

# Extract application for routing
def extract_application(data: Dict) -> Dict:
    """Extract the application text for downstream chains"""
    return {"application": data.get("application", "")}

# Create the routing branch
loan_routing_branch = RunnableBranch(
    (
        is_small_loan,
        RunnableLambda(extract_application) | small_loan_chain
    ),
    (
        is_large_loan,
        RunnableLambda(extract_application) | large_loan_chain
    ),
    # Default: medium loan
    RunnableLambda(extract_application) | medium_loan_chain
)

# Complete conditional routing chain
conditional_routing_chain = (
    {"application": RunnablePassthrough()}
    | RunnablePassthrough.assign(
        loan_info=lambda x: loan_amount_chain.invoke({"application": x["application"]})
    )
    | RunnablePassthrough.assign(
        result=loan_routing_branch
    )
)

print("✓ Conditional routing chain with RunnableBranch created")

In [None]:
# Test the conditional router with different loan sizes
print("\n" + "="*80)
print("TESTING CONDITIONAL ROUTING WITH RUNNABLEBRANCH")
print("="*80)

test_cases = [
    ("Small Loan", SMALL_LOAN_APP),
    ("Medium Loan", SAMPLE_LOAN_APPLICATION),
    ("Large Loan", LARGE_LOAN_APP)
]

results = []

for test_name, application in test_cases:
    print(f"\n### TEST: {test_name.upper()} ###")
    start_time = time.time()
    
    result = conditional_routing_chain.invoke(application)
    
    processing_time = time.time() - start_time
    loan_amount = result["loan_info"]["loan_amount"]
    
    # Determine category
    if loan_amount < SMALL_LOAN_THRESHOLD:
        category = "SMALL (Fast-Track)"
    elif loan_amount < LARGE_LOAN_THRESHOLD:
        category = "MEDIUM (Standard)"
    else:
        category = "LARGE (Enhanced DD)"
    
    print(f"Loan Amount: ${loan_amount:,}")
    print(f"Category: {category}")
    print(f"Processing Time: {processing_time:.2f}s")
    print(f"\nResult Preview:")
    print(str(result["result"])[:300] + "...")
    
    results.append({
        "name": test_name,
        "amount": loan_amount,
        "category": category,
        "time": processing_time
    })

# Summary comparison
print("\n" + "="*80)
print("ROUTING EFFICIENCY COMPARISON")
print("="*80)
for r in results:
    print(f"{r['name']:12} | ${r['amount']:>12,} | {r['category']:20} | {r['time']:.2f}s")
print("\nConditional routing optimizes processing based on loan size and risk!")
print("="*80)

---

## PART 4: PRODUCTION ORCHESTRATION WITH LANGCHAIN (Lab 3.4)

**Duration:** 30 minutes  
**Objective:** Add production-grade features using LangChain's built-in capabilities

### Production Features in LangChain:

- ✓ **Automatic Retries** - Built into ChatOpenAI with `max_retries`
- ✓ **Error Handling** - `try`/`except` at chain level
- ✓ **Callbacks** - Comprehensive logging and monitoring
- ✓ **Streaming** - Real-time output
- ✓ **Fallbacks** - Use `with_fallbacks()` method
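
Retries and fallbacks combine in a fixed order: the primary is retried until its attempts are exhausted, and only then does the fallback run. The sketch below shows that control flow in plain Python. `call_with_retries` and `call_with_fallbacks` are hypothetical helpers written for illustration, not how LangChain or the OpenAI client implements these features.

```python
import time
from typing import Callable, Optional, Sequence, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], max_retries: int = 3, base_delay: float = 0.01) -> T:
    """Call fn, retrying on any exception with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")


def call_with_fallbacks(candidates: Sequence[Callable[[], T]], max_retries: int = 3) -> T:
    """Try each candidate in order; move on only after its retries are exhausted."""
    last_error: Optional[Exception] = None
    for candidate in candidates:
        try:
            return call_with_retries(candidate, max_retries=max_retries)
        except Exception as error:
            last_error = error
    raise RuntimeError("all candidates failed") from last_error


attempts = {"primary": 0}


def flaky_primary() -> str:
    # Simulated primary model that always times out.
    attempts["primary"] += 1
    raise TimeoutError("primary model timed out")


def fallback_model() -> str:
    return "summary from fallback model"


answer = call_with_fallbacks([flaky_primary, fallback_model], max_retries=2)
print(answer, "| primary attempts:", attempts["primary"])
```

With `max_retries=2` the primary is attempted three times before the fallback is used, so worst-case latency grows with both the retry count and the number of fallbacks.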

### Challenge 4.1: Production-Ready Chain with Error Handling (15 minutes)

In [None]:
from langchain_core.runnables import RunnableConfig
from typing import Iterator

class ProductionCallbackHandler(BaseCallbackHandler):
    """Production-grade callback handler with comprehensive logging"""
    
    def __init__(self, workflow_name: str):
        self.workflow_name = workflow_name
        self.logs = []
        self.errors = []
        self.total_tokens = 0
        self.step_count = 0
        self.start_time = time.time()
    
    def log(self, message: str, level: str = "INFO"):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        log_entry = f"[{timestamp}] [{level}] {message}"
        self.logs.append(log_entry)
        print(log_entry)
    
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs) -> None:
        # Nested runnables fire this event too; log only the top-level run.
        if kwargs.get("parent_run_id") is None:
            self.log(f"Chain started: {self.workflow_name}", "INFO")
    
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> None:
        if kwargs.get("parent_run_id") is None:
            duration = time.time() - self.start_time
            self.log(f"Chain completed in {duration:.2f}s", "SUCCESS")
    
    def on_chain_error(self, error: Exception, **kwargs) -> None:
        self.errors.append(str(error))
        self.log(f"Chain error: {str(error)}", "ERROR")
    
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs) -> None:
        self.step_count += 1
        self.log(f"LLM call {self.step_count} started", "INFO")
    
    def on_llm_end(self, response: LLMResult, **kwargs) -> None:
        if response.llm_output and "token_usage" in response.llm_output:
            tokens = response.llm_output["token_usage"].get("total_tokens", 0)
            self.total_tokens += tokens
            self.log(f"LLM call {self.step_count} completed ({tokens} tokens)", "SUCCESS")
    
    def on_llm_error(self, error: Exception, **kwargs) -> None:
        self.errors.append(f"LLM error at step {self.step_count}: {str(error)}")
        self.log(f"LLM error: {str(error)}", "ERROR")
    
    def on_retry(self, retry_state, **kwargs) -> None:
        self.log(f"Retrying after error...", "WARNING")
    
    def get_metrics(self) -> Dict[str, Any]:
        return {
            "workflow_name": self.workflow_name,
            "total_duration": time.time() - self.start_time,
            "total_steps": self.step_count,
            "total_tokens": self.total_tokens,
            "error_count": len(self.errors),
            "logs": self.logs,
            "errors": self.errors
        }

print("✓ Production callback handler created")

In [None]:
# Create a production chain with retry and fallback
# Primary model with retries
primary_llm = ChatOpenAI(
    model=GPT4_MODEL,
    temperature=0,
    max_retries=3,
    request_timeout=30
)

# Fallback to faster model if primary fails
fallback_llm = ChatOpenAI(
    model=GPT35_MODEL,
    temperature=0,
    max_retries=2
)

# Create chain with fallback
production_llm = primary_llm.with_fallbacks([fallback_llm])

# Production extraction chain
production_extraction_chain = extraction_prompt | production_llm | JsonOutputParser()

# Production complete workflow.
# Each assigned step is composed as a runnable (instead of calling .invoke()
# inside a lambda) so the run config, including callbacks, is passed down to it.
production_workflow_chain = (
    {"application": RunnablePassthrough()}
    | extraction_prompt
    | production_llm
    | JsonOutputParser()
    | RunnableLambda(lambda x: {"extraction": x})
    | RunnablePassthrough.assign(
        ratios=(
            RunnableLambda(lambda x: {"extracted_data": json.dumps(x["extraction"])})
            | ratio_prompt
            | production_llm
            | JsonOutputParser()
        )
    )
    | RunnablePassthrough.assign(
        summary=(
            RunnableLambda(lambda x: {
                "extracted_data": json.dumps(x["extraction"]),
                "ratios": json.dumps(x["ratios"]),
            })
            | summary_prompt
            | production_llm
            | StrOutputParser()
        )
    )
)

print("✓ Production workflow with retry and fallback created")
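
To make the failure handling concrete, here is a minimal plain-Python sketch of the control flow that `max_retries` and `.with_fallbacks()` are meant to provide: try the primary model a bounded number of times, then move on to the next candidate. It is an analogue for illustration only, not LangChain's implementation. The names `call_with_retries`, `call_with_fallbacks`, and the two stub models are hypothetical, and the sketch assumes that `max_retries` means "retries after the first attempt".

```python
from typing import Callable, List, Optional, Tuple


def call_with_retries(fn: Callable[[str], str], prompt: str, max_retries: int) -> str:
    """One initial attempt plus up to max_retries retries (assumed semantics)."""
    last_error: Optional[Exception] = None
    for _ in range(max_retries + 1):
        try:
            return fn(prompt)
        except Exception as exc:  # a real system would catch narrower error types
            last_error = exc
    assert last_error is not None
    raise last_error


def call_with_fallbacks(candidates: List[Tuple[Callable[[str], str], int]], prompt: str) -> str:
    """Try each (model, max_retries) pair in order until one succeeds."""
    errors: List[Exception] = []
    for fn, max_retries in candidates:
        try:
            return call_with_retries(fn, prompt, max_retries)
        except Exception as exc:
            errors.append(exc)
    raise RuntimeError(f"All {len(candidates)} candidates failed: {errors}")


# Hypothetical stand-ins for the primary and fallback models.
attempts = {"primary": 0, "fallback": 0}


def primary_stub(prompt: str) -> str:
    attempts["primary"] += 1
    raise TimeoutError("primary model timed out")


def fallback_stub(prompt: str) -> str:
    attempts["fallback"] += 1
    return f"fallback answer for: {prompt}"


answer = call_with_fallbacks([(primary_stub, 3), (fallback_stub, 2)], "extract fields")
print(answer)
print(attempts)
```

Note that the failing primary is attempted several times before the fallback is used, so a generous retry count on the primary adds latency to every degraded request.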

In [None]:
# Execute production workflow with monitoring
print("\n" + "="*80)
print("PRODUCTION WORKFLOW EXECUTION")
print("="*80 + "\n")

callback_handler = ProductionCallbackHandler("Loan Processing Pipeline")

try:
    result = production_workflow_chain.invoke(
        SAMPLE_LOAN_APPLICATION,
        config={"callbacks": [callback_handler]}
    )
    
    # Display metrics
    metrics = callback_handler.get_metrics()
    
    print("\n" + "="*80)
    print("PRODUCTION METRICS DASHBOARD")
    print("="*80)
    print(f"Workflow: {metrics['workflow_name']}")
    # invoke() returned, so the workflow itself succeeded. Any errors the handler
    # recorded were recovered by a retry or by the fallback model.
    recovered = metrics['error_count']
    status = "✓ SUCCESS" if recovered == 0 else f"✓ SUCCESS ({recovered} recovered error(s))"
    print(f"Status: {status}")
    print(f"Duration: {metrics['total_duration']:.2f}s")
    print(f"Total Steps: {metrics['total_steps']}")
    print(f"Total Tokens: {metrics['total_tokens']}")
    print(f"Errors: {metrics['error_count']}")
    print("\nExecution Log:")
    for log in metrics['logs'][-10:]:  # Show last 10 log entries
        print(f"  {log}")
    print("="*80)
    
    print("\n" + "="*80)
    print("FINAL SUMMARY")
    print("="*80)
    print(result["summary"])
    print("="*80)

except Exception as e:
    print(f"\n✗ Workflow failed: {str(e)}")
    metrics = callback_handler.get_metrics()
    print(f"\nErrors encountered:")
    for error in metrics['errors']:
        print(f"  - {error}")

### Challenge 4.2: Streaming Support (15 minutes)

**Objective:** Add real-time streaming so that output appears chunk by chunk as the model generates it.

In [None]:
# Create a streaming chain for the final summary
streaming_summary_chain = summary_prompt | llm_gpt4 | StrOutputParser()

print("\n" + "="*80)
print("STREAMING EXECUTION DEMO")
print("="*80)
print("\nGenerating executive summary with real-time streaming...\n")
print("-" * 80)

# First, get the required data
extraction_data = extraction_chain.invoke({"application": SAMPLE_LOAN_APPLICATION})
ratio_data = ratio_chain.invoke({"extracted_data": json.dumps(extraction_data)})

# Stream the summary generation
for chunk in streaming_summary_chain.stream({
    "extracted_data": json.dumps(extraction_data),
    "ratios": json.dumps(ratio_data)
}):
    print(chunk, end="", flush=True)

print("\n" + "-" * 80)
print("\n✓ Streaming complete!")
print("\nNote: In a production web app, each chunk would be sent to the client immediately.")
print("="*80)
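
As a rough illustration of the note above, the sketch below wraps an iterator of text chunks in Server-Sent Events frames, one common way to push chunks to a browser as they arrive. It is an assumption-laden example rather than part of the lab's code: `to_sse_events`, the `{"text": ...}` payload shape, and the closing `done` event are hypothetical choices, and `fake_stream` stands in for the iterator returned by `streaming_summary_chain.stream(...)`.

```python
import json
from typing import Iterable, Iterator


def to_sse_events(chunks: Iterable[str]) -> Iterator[str]:
    """Wrap each non-empty text chunk in a Server-Sent Events frame."""
    for chunk in chunks:
        if not chunk:
            continue  # streams can emit empty chunks; skip them
        # JSON-encode the payload so newlines inside a chunk cannot break the frame.
        yield f"data: {json.dumps({'text': chunk})}\n\n"
    # Hypothetical end-of-stream marker so the client knows when to stop listening.
    yield "event: done\ndata: {}\n\n"


# Stand-in for streaming_summary_chain.stream({...}).
fake_stream = iter(["The applicant ", "", "shows strong ", "cash flow."])

frames = list(to_sse_events(fake_stream))
for frame in frames:
    print(frame, end="")
```

Because `to_sse_events` is a generator, each frame is produced as soon as its chunk arrives, so a web framework that accepts an iterator as a response body can forward frames without buffering the whole summary.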

---

## CAPSTONE: ENTERPRISE LOAN PROCESSOR WITH LANGCHAIN

**Objective:** Build a complete production system that combines all the LangChain patterns from this lab.

In [None]:
class EnterpriseLoanProcessor:
    """
    Complete production loan processing system using LangChain:
    - Conditional routing (RunnableBranch)
    - Parallel processing (RunnableParallel)
    - Sequential workflows (LCEL)
    - Error handling (retry + fallback)
    - Monitoring (callbacks)
    """
    
    def __init__(self):
        # Initialize the conditional routing chain we built earlier
        self.routing_chain = conditional_routing_chain
        
        # Statistics
        self.stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'total_time': 0.0,
            'total_tokens': 0
        }
    
    def process_application(self, application: str, app_id: str) -> Dict[str, Any]:
        """
        Process single loan application through complete LangChain system
        """
        print(f"\n{'='*80}")
        print(f"ENTERPRISE PROCESSOR - Application {app_id}")
        print(f"{'='*80}")
        
        callback = ProductionCallbackHandler(f"Application {app_id}")
        
        try:
            start_time = time.time()
            
            result = self.routing_chain.invoke(
                application,
                config={"callbacks": [callback]}
            )
            
            processing_time = time.time() - start_time
            metrics = callback.get_metrics()
            
            # Update statistics
            self.stats['total_processed'] += 1
            self.stats['successful'] += 1
            self.stats['total_time'] += processing_time
            self.stats['total_tokens'] += metrics['total_tokens']
            
            loan_amount = result['loan_info']['loan_amount']
            if loan_amount < SMALL_LOAN_THRESHOLD:
                workflow = "Fast-Track"
            elif loan_amount < LARGE_LOAN_THRESHOLD:
                workflow = "Standard"
            else:
                workflow = "Enhanced DD"
            
            print(f"\n✓ Processing complete")
            print(f"  Loan Amount: ${loan_amount:,}")
            print(f"  Workflow: {workflow}")
            print(f"  Time: {processing_time:.2f}s")
            print(f"  Tokens: {metrics['total_tokens']}")
            
            return {
                'app_id': app_id,
                'success': True,
                'loan_amount': loan_amount,
                'workflow': workflow,
                'processing_time': processing_time,
                'result': result['result'],
                'metrics': metrics
            }
            
        except Exception as e:
            self.stats['total_processed'] += 1
            self.stats['failed'] += 1
            
            print(f"\n✗ Processing failed: {str(e)}")
            
            return {
                'app_id': app_id,
                'success': False,
                'error': str(e)
            }
    
    def get_statistics(self) -> Dict[str, Any]:
        """Return processing statistics"""
        # Only successful runs add to total_time, so average over those runs.
        avg_time = (
            self.stats['total_time'] / self.stats['successful']
            if self.stats['successful'] > 0 else 0
        )
        
        success_rate = (
            self.stats['successful'] / self.stats['total_processed'] * 100
            if self.stats['total_processed'] > 0 else 0
        )
        
        return {
            'total_processed': self.stats['total_processed'],
            'successful': self.stats['successful'],
            'failed': self.stats['failed'],
            'success_rate': success_rate,
            'average_processing_time': avg_time,
            'total_tokens': self.stats['total_tokens']
        }

print("✓ Enterprise Loan Processor class created")
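
The workflow label in `process_application` depends on two threshold comparisons, and the boundary cases are easy to get wrong. The sketch below isolates that logic in a small function so it can be checked without calling a model. `classify_workflow` is a hypothetical helper, and the two example thresholds are placeholder values chosen for illustration, not the lab's actual `SMALL_LOAN_THRESHOLD` and `LARGE_LOAN_THRESHOLD`.

```python
def classify_workflow(loan_amount: float, small_threshold: float, large_threshold: float) -> str:
    """Map a loan amount to a workflow label using half-open ranges."""
    if small_threshold >= large_threshold:
        raise ValueError("small_threshold must be below large_threshold")
    if loan_amount < small_threshold:
        return "Fast-Track"
    if loan_amount < large_threshold:
        return "Standard"
    return "Enhanced DD"


# Placeholder thresholds for illustration only.
EXAMPLE_SMALL = 50_000
EXAMPLE_LARGE = 500_000

# Probe both sides of each boundary.
amounts = [49_999, 50_000, 499_999, 500_000]
labels = [classify_workflow(a, EXAMPLE_SMALL, EXAMPLE_LARGE) for a in amounts]
for amount, label in zip(amounts, labels):
    print(f"${amount:,} -> {label}")
```

An amount exactly equal to a threshold falls into the higher tier, which matches the strict `<` comparisons used in the class.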

In [None]:
# Initialize and test enterprise processor
processor = EnterpriseLoanProcessor()

# Process multiple applications
applications = [
    (SMALL_LOAN_APP, "LA-2024-001"),
    (SAMPLE_LOAN_APPLICATION, "LA-2024-002"),
    (LARGE_LOAN_APP, "LA-2024-003")
]

print("\n" + "="*80)
print("ENTERPRISE BATCH PROCESSING")
print("="*80)

results = []
for app, app_id in applications:
    result = processor.process_application(app, app_id)
    results.append(result)
    time.sleep(0.5)  # Brief pause between applications

# Display final statistics
print("\n" + "="*80)
print("ENTERPRISE PROCESSING STATISTICS")
print("="*80)
stats = processor.get_statistics()
print(f"Total Processed: {stats['total_processed']}")
print(f"Successful: {stats['successful']}")
print(f"Failed: {stats['failed']}")
print(f"Success Rate: {stats['success_rate']:.1f}%")
print(f"Average Processing Time: {stats['average_processing_time']:.2f}s")
print(f"Total Tokens Used: {stats['total_tokens']}")
print("="*80)
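
Beyond the overall totals, the `results` list can be broken down per workflow. The following is a minimal sketch of such a report, assuming each result dictionary has the shape returned by `process_application` (`app_id`, `success`, and for successful runs `workflow` and `processing_time`). `summarize_by_workflow` and the sample records are hypothetical and exist only to make the example runnable without API calls.

```python
from collections import defaultdict
from typing import Any, Dict, List


def summarize_by_workflow(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Group successful results by workflow and list the IDs of failed ones."""
    buckets: Dict[str, Dict[str, float]] = defaultdict(lambda: {"count": 0, "total_time": 0.0})
    failed_ids: List[str] = []
    for r in results:
        if not r["success"]:
            failed_ids.append(r["app_id"])
            continue
        bucket = buckets[r["workflow"]]
        bucket["count"] += 1
        bucket["total_time"] += r["processing_time"]
    by_workflow = {
        name: {"count": b["count"], "avg_time": b["total_time"] / b["count"]}
        for name, b in buckets.items()
    }
    return {"by_workflow": by_workflow, "failed_ids": failed_ids}


# Made-up records that mimic the shape of process_application's return value.
sample_results = [
    {"app_id": "A-1", "success": True, "workflow": "Fast-Track", "processing_time": 2.0},
    {"app_id": "A-2", "success": True, "workflow": "Fast-Track", "processing_time": 4.0},
    {"app_id": "A-3", "success": True, "workflow": "Enhanced DD", "processing_time": 9.0},
    {"app_id": "A-4", "success": False, "error": "timeout"},
]

report = summarize_by_workflow(sample_results)
print(report)
```

In the lab itself, passing `results` in place of `sample_results` would give the same breakdown for the three processed applications.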