# Mastering LangChain Runnables: Modular AI Workflow Architecture


## Overview
This notebook explores LangChain's Runnable interface and shows how to build composable AI workflows with it. It covers sequential and parallel composition and compares the `invoke`, `batch`, and `stream` execution methods that every Runnable exposes.

## Key Features:
- Standardized AI component interface
- Flexible workflow composition
- Efficient processing strategies
- Modular AI pipeline design
- Advanced execution methods

## Runnable Composition Strategies:
- Sequential Runnable Chaining
- Parallel Runnable Execution
- Dynamic Workflow Construction
- Async and Sync Processing
- Stream-based Execution

## Technologies Used:
- LangChain
- Ollama LLM
- LCEL (LangChain Expression Language)
- Prompt Templates
- Runnable Interfaces
- Batch and Stream Processing

## Use Cases:
- Complex AI workflow design
- Modular AI pipeline construction
- Efficient data processing
- Flexible component integration
- Real-time AI interactions

## Comparative Runnable Execution Methods

| **Execution Method** | **Characteristics**    | **Use Case**                          | **Performance**     |
|---------------------|----------------------|---------------------------------------|---------------------|
| Invoke              | Single input processing | Individual task execution            | Low latency         |
| Batch               | Multiple input processing | Large dataset analysis               | High throughput     |
| Stream              | Incremental output generation | Real-time, progressive tasks         | Low memory overhead |

## Learning Objectives:
- Understand Runnable interface fundamentals
- Implement complex AI workflows
- Master composition techniques
- Explore different execution strategies
- Create modular and reusable AI components

## Key Challenges Addressed:
- Workflow complexity management
- Component interoperability
- Efficient AI task processing
- Standardized AI interaction patterns
- Scalable AI pipeline design

## Advanced Composition Techniques:
- RunnableSequence
- RunnableParallel
- Dynamic Runnable Creation
- Async Execution Support
- Streaming Capabilities



### Composition Primitives  
LangChain provides two main composition primitives for creating Runnables:  

#### 1. RunnableSequence  
- Allows chaining multiple Runnables sequentially  
- The output of one Runnable becomes the input for the next  
- Can be created using the pipe operator (`|`) or explicitly with `RunnableSequence`  

**Example:**  
```python
chain = runnable1 | runnable2  # Sequential execution
# Output of runnable1 feeds into runnable2
```
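
To make the pipe operator less magical, here is a minimal pure-Python sketch of the idea. The `Step` class is a hypothetical stand-in for a Runnable, not LangChain's implementation; it only shows how `|` can build a new object that runs the left side first and feeds its output to the right side.

```python
class Step:
    """Hypothetical stand-in for a Runnable: wraps a one-argument function."""

    def __init__(self, func):
        self.func = func

    def invoke(self, value):
        return self.func(value)

    def __or__(self, other):
        # `a | b` builds a new Step that runs a first, then feeds its output to b.
        return Step(lambda value: other.invoke(self.invoke(value)))


# Two toy steps standing in for runnable1 and runnable2.
strip_whitespace = Step(str.strip)
to_upper = Step(str.upper)

chain = strip_whitespace | to_upper
print(chain.invoke("  refund for order 12345  "))  # REFUND FOR ORDER 12345
```

Because `|` returns another `Step`, chains of any length compose the same way: `a | b | c` is simply `(a | b) | c`.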

#### 2. RunnableParallel  
- Runs multiple Runnables concurrently with the same input  
- Returns a dictionary with results from each Runnable  
- Runs the branches concurrently (a thread pool for synchronous calls), which can cut wall-clock time when the branches are I/O-bound, such as LLM requests  

**Example:**  
```python
parallel_chain = RunnableParallel({
    "key1": runnable1,
    "key2": runnable2
})
# Both runnables run concurrently with the same input
```
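
The fan-out behavior can be sketched with the standard library alone. `run_parallel` below is a hypothetical helper, not a LangChain API: it sends the same input to every branch on a thread pool and gathers the results under the branch keys, which is the shape of result `RunnableParallel` returns.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(branches, value):
    """Hypothetical helper: call every branch with the same input, concurrently."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {key: pool.submit(func, value) for key, func in branches.items()}
        # Collect results under the same keys the branches were registered with.
        return {key: future.result() for key, future in futures.items()}


result = run_parallel(
    {"key1": len, "key2": str.upper},
    "wire transfer",
)
print(result)  # {'key1': 13, 'key2': 'WIRE TRANSFER'}
```

Threads help most when each branch spends its time waiting on I/O, as an LLM request does; CPU-bound branches gain little from this approach.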

### Additional Composition Features  
- Automatic type coercion (functions become `RunnableLambda`, dictionaries become `RunnableParallel`)  
- Support for async and sync execution  
- Thread-pool concurrency for synchronous `batch` and `RunnableParallel` calls  
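
The coercion rule can be illustrated with a small pure-Python sketch. The `coerce` and `pipe` helpers are hypothetical and only mimic the idea: a callable is accepted as a step, and a dict becomes a step that sends its input to every value and returns a dict. In LangChain itself the coercion happens when such an object is composed with an existing Runnable, and the dict branches run concurrently; this sketch runs them one after another for simplicity.

```python
def coerce(thing):
    """Hypothetical coercion rule: turn plain Python objects into callable steps."""
    if callable(thing):
        # Functions are usable as steps directly.
        return thing
    if isinstance(thing, dict):
        branches = {key: coerce(value) for key, value in thing.items()}
        # A dict becomes a fan-out step: same input to every branch, dict out.
        return lambda value: {key: branch(value) for key, branch in branches.items()}
    raise TypeError(f"Cannot coerce {type(thing).__name__} into a step")


def pipe(*steps):
    """Compose steps left to right, coercing each one first."""
    coerced = [coerce(step) for step in steps]

    def run(value):
        for step in coerced:
            value = step(value)
        return value

    return run


chain = pipe(
    str.strip,
    {"original": lambda text: text, "length": len},
)
print(chain("  $49.99 subscription "))
# {'original': '$49.99 subscription', 'length': 19}
```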

## Reference
For more detailed information on Runnable interfaces:
- LangChain Documentation
- LCEL Composition Patterns



```python
# from dotenv import load_dotenv

# load_dotenv('../env')
```

```python
from langchain_ollama import ChatOllama

base_url = "http://localhost:11434"
model = 'llama3.2:1b'
llm = ChatOllama(
    base_url=base_url,
    model=model,
    temperature=0.5,
    num_predict=512
)
```

# Runnable Interface
Runnable is a standardized interface in LangChain that enables the creation of modular, composable, and reusable AI components. It allows chaining multiple operations, where the output of one step becomes the input for the next, facilitating streamlined workflows.

A chain is a sequence of Runnables connected together, enabling complex workflows by passing data through multiple processing steps. Chains can be constructed using primitives like `RunnableSequence` for sequential execution or `RunnableParallel` for concurrent execution.

```python
from langchain_core.prompts import PromptTemplate

prompt_template = PromptTemplate(
    input_variables=["topic", "number"],
    template="Please explain {topic} using exactly {number} detailed and distinct use cases."
)

# Piping the prompt into the model builds a RunnableSequence
chain = (
    prompt_template
    | llm
)
type(chain)
```

```
langchain_core.runnables.base.RunnableSequence
```

```python
chain
```

```
PromptTemplate(input_variables=['number', 'topic'], input_types={}, partial_variables={}, template='Please explain {topic} using exactly {number} detailed and distinct use cases.')
| ChatOllama(model='llama3.2:1b', num_predict=512, temperature=0.5, base_url='http://localhost:11434')
```
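
The first step of this chain only fills the `{topic}` and `{number}` placeholders, which use Python's format-string syntax. As a rough illustration, the hypothetical `render` helper below does the same substitution with `str.format`; it is a stand-in for the prompt step, not LangChain code.

```python
template = "Please explain {topic} using exactly {number} detailed and distinct use cases."


def render(template, **values):
    """Hypothetical stand-in for prompt formatting: fill the named placeholders."""
    return template.format(**values)


print(render(template, topic="langchain", number=2))
# Please explain langchain using exactly 2 detailed and distinct use cases.

# Leaving out a placeholder fails loudly instead of producing a half-filled prompt.
try:
    render(template, topic="langchain")
except KeyError as missing:
    print("missing variable:", missing)  # missing variable: 'number'
```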

```python
# Invoke the chain with input values
response = chain.invoke({
    "topic": "langchain",
    "number": 2
})

# Print the response content
print(response.content)
```

```
LangChain is a blockchain-based knowledge graph database that enables the creation of scalable, secure, and decentralized knowledge graphs. It allows users to store, query, and share knowledge in a structured and organized manner, making it an attractive solution for various applications.

Here are two detailed and distinct use cases for LangChain:

**Use Case 1: Knowledge Graph-based Content Recommendation**

In this scenario, LangChain is used as a knowledge graph database to build a content recommendation system. The system consists of a large corpus of text data, which includes information about entities (people, organizations, locations), relationships between them, and concepts related to those entities.

The LangChain database is populated with this text data, which serves as the foundation for creating a knowledge graph. The graph represents various entities, their relationships, and associated concepts, allowing users to query and retrieve relevant content based on specific cr
```


### Additional Examples of Runnable Interface Methods

The remaining cells in this section build a `transaction_chain` and call it through three Runnable methods, each wrapped in a small helper function:

- **Batch** (`batch_transaction_analysis`): passes a list of transactions to `batch`, which processes them concurrently and returns the results in input order.
- **Single invoke** (`single_transaction_invoke`): passes one transaction to `invoke` and gets one result back.
- **Streaming** (`stream_transaction_processing`): iterates over `stream` and prints output as it becomes available.


## Create a prompt template for transaction analysis

```python
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template(
    "Analyze the following payment transaction and provide insights: {transaction}"
)
```

## Define a function to process transaction details

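```python
def process_transaction_details(transaction):
    """
    Simulate additional transaction processing logic.

    In a real-world scenario, this might involve:
    - Fraud detection checks
    - Compliance verification
    - Additional metadata enrichment

    Note: placed after `llm` in a chain, `transaction` is the model's
    message object, so the f-string embeds its string form
    (`content='...'`), not just the generated text.
    """
    return f"Processed transaction: {transaction}"
```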

## Construct the processing chain
The processing chain is constructed by combining multiple Runnables sequentially. The `transaction_chain` is built using the following components:

1. **Prompt Template (`prompt`)**: This defines the structure of the input prompt for analyzing transactions. It ensures that the input transaction details are formatted appropriately for the LLM.

2. **LLM (`llm`)**: The language model processes the formatted prompt and generates insights or analysis based on the transaction details.

3. **Processing Function (`process_transaction_details`)**: This function post-processes the LLM's output. Here it only prefixes the result with a label; in a real system this step could run fraud detection, compliance checks, or metadata enrichment.

The chain is created using the pipe operator (`|`), which connects these components sequentially. The output of one component becomes the input for the next, and the plain Python function is coerced into a `RunnableLambda` automatically.
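
One detail worth seeing in isolation: a function placed directly after the model receives the model's message object, not a bare string. The sketch below uses a hypothetical `FakeMessage` dataclass in place of the real message type to show the difference between formatting the whole object and formatting only its `content` attribute.

```python
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for the chat model's message object."""
    content: str


def label_repr(message):
    # Formats the whole object, so its string form leaks into the result.
    return f"Processed transaction: {message}"


def label_text(message):
    # Formats only the generated text.
    return f"Processed transaction: {message.content}"


reply = FakeMessage(content="Refund looks routine.")
print(label_repr(reply))  # Processed transaction: FakeMessage(content='Refund looks routine.')
print(label_text(reply))  # Processed transaction: Refund looks routine.
```

The same effect is visible in the notebook's batch output, where each result begins with `content='...'`. Reading `.content` inside the function, or placing a string output parser before it, would pass on only the text.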


```python
transaction_chain = (
    prompt |  # Apply the prompt template
    llm |  # Generate analysis using LLM
    process_transaction_details  # Additional processing
)
```

```python
transaction_chain
```

```
ChatPromptTemplate(input_variables=['transaction'], input_types={}, partial_variables={}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['transaction'], input_types={}, partial_variables={}, template='Analyze the following payment transaction and provide insights: {transaction}'), additional_kwargs={})])
| ChatOllama(model='llama3.2:1b', num_predict=512, temperature=0.5, base_url='http://localhost:11434')
| RunnableLambda(process_transaction_details)
```

## Demonstration of different Runnable Interface methods

### 1. Batch Processing
The `batch_transaction_analysis` function shows how to process several transactions with the `batch` method of `transaction_chain`. `batch` accepts a list of inputs, runs them concurrently (using a thread pool by default), and returns the results in input order. The function defines three sample transactions, processes them in one batch, and prints the analysis for each.


```python
def batch_transaction_analysis():
    print("--- Batch Processing Transactions ---")
    transactions = [
        "Credit card payment of $500 from customer ABC",
        "International wire transfer of $10,000",
        "Recurring subscription payment of $49.99"
    ]

    # Batch process multiple transactions
    batch_results = transaction_chain.batch(transactions)

    for i, result in enumerate(batch_results, 1):
        print(f"Transaction {i} Analysis: {result}")
```
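
The sketch below shows why the loop can number results 1, 2, 3 with confidence: concurrent execution does not have to scramble the order. It uses only the standard library; `analyze` and `batch` are hypothetical stand-ins for one chain call and for the batch method, and the `max_concurrency` parameter name is chosen here for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def analyze(transaction):
    """Hypothetical stand-in for one chain call; sleeps to mimic model latency."""
    time.sleep(0.05)
    return f"analysis of: {transaction}"


def batch(inputs, max_concurrency=4):
    # Executor.map runs the calls concurrently but yields results in input order.
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        return list(pool.map(analyze, inputs))


transactions = [
    "Credit card payment of $500 from customer ABC",
    "International wire transfer of $10,000",
    "Recurring subscription payment of $49.99",
]
for i, result in enumerate(batch(transactions), 1):
    print(f"Transaction {i} Analysis: {result}")
```

With three inputs and four workers, the total time is close to one call's latency, not three.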

### 2. Single Invoke
The `single_transaction_invoke` function demonstrates the `invoke` method of `transaction_chain`, which processes one input and returns one result. The function defines a sample transaction, invokes the chain, and prints the analysis.


```python
def single_transaction_invoke():
    print("\n--- Single Transaction Invoke ---")
    transaction = "Refund processing for order #12345"

    # Invoke the chain for a single transaction
    result = transaction_chain.invoke(transaction)
    print("Transaction Analysis:", result)
```

### 3. Streaming Transaction Processing
The `stream_transaction_processing` function demonstrates the `stream` method of `transaction_chain`, which yields output chunks as they become available instead of waiting for the full result. The function defines a sample transaction and prints each chunk as it arrives. Because the last step of this chain is a plain function that needs the model's complete output, the chain is likely to emit the result as a single chunk; token-by-token streaming requires every step after the model to support streaming.

```python
def stream_transaction_processing():
    print("\n--- Streaming Transaction Processing ---")
    transaction = "High-value international payment of $250,000"

    print("Streaming analysis for transaction:")
    for chunk in transaction_chain.stream(transaction):
        print(chunk, end="", flush=True)
    print()  # New line after streaming
```
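
Streaming is easiest to reason about with generators. In the sketch below, `fake_model_stream` is a hypothetical stand-in for a model that yields text piece by piece. The second generator shows what happens when a plain function follows the model: the function needs the complete text, so the pieces are joined first and the consumer receives a single chunk.

```python
def fake_model_stream(prompt):
    """Hypothetical stand-in for a streaming model: yields text piece by piece."""
    for piece in ["High-value ", "payment ", "flagged ", "for review."]:
        yield piece


def stream_then_function(prompt, func):
    # A plain function needs its whole input, so the pieces are joined first
    # and the caller receives exactly one chunk.
    yield func("".join(fake_model_stream(prompt)))


def label(text):
    return f"Processed transaction: {text}"


transaction = "High-value international payment of $250,000"
print(list(fake_model_stream(transaction)))
# ['High-value ', 'payment ', 'flagged ', 'for review.']
print(list(stream_then_function(transaction, label)))
# ['Processed transaction: High-value payment flagged for review.']
```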

```python
if __name__ == "__main__":
    print("=== Batch Transaction Analysis ===")
    batch_transaction_analysis()

    print("\n=== Single Transaction Invoke ===")
    single_transaction_invoke()

    print("\n=== Streaming Transaction Processing ===")
    stream_transaction_processing()
```

```
=== Batch Transaction Analysis ===
--- Batch Processing Transactions ---
Transaction 1 Analysis: Processed transaction: content='I\'ll analyze the payment transaction for you.\n\n**Transaction Details:**\n\n* Payment Method: Credit Card\n* Payment Amount: $500\n* Customer Name: ABC (assuming this is the customer\'s name, but it could be a pseudonym or alias)\n* Date: [Insert Date]\n* Time: [Insert Time]\n\n**Insights:**\n\n1. **Payment Success:** The transaction was successful and the payment amount of $500 was processed successfully.\n2. **Customer Information:** The customer\'s information is publicly available (ABC), but it may not be accurate or up-to-date. It\'s essential to verify this information through additional means, such as contacting the customer directly or using credit card verification services.\n3. **Payment Method:** Credit cards are a common payment method, and they offer convenience and flexibility. However, they also come with associated risks, such as potential f
```

# Runnable Sequences

**Runnable Sequences**  
- A `RunnableSequence` is a pipeline that chains multiple operations, where the output of one step becomes the input for the next.  
- It is used to create workflows for tasks like text analysis, generation, or transformation.

**Activities in This Notebook**  
- **Model Initialization**: A `ChatOllama` model is initialized with parameters such as the base URL, model name, and temperature.  
- **Prompt Template Creation**: A `ChatPromptTemplate` is defined for analyzing payment transaction details.  
- **Runnable Sequence Definition**: A sequence is created to format the prompt, process it with the model, and parse the output.  
- **Transaction Analysis**: A function is implemented to analyze transactions and print risk assessments and fraud indicators.


```python
# from dotenv import load_dotenv

# load_dotenv('../env')
```

```python
from langchain_ollama import ChatOllama

base_url = "http://localhost:11434"
model = 'llama3.2:1b'
# Initialize the Llama model
llm = ChatOllama(
    base_url=base_url,
    model=model,
    temperature=0.8,
    num_predict=256
)
```

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Create a prompt template for financial transaction analysis
transaction_analysis_prompt = ChatPromptTemplate.from_template(
    """Analyze the following payment transaction details:
    - Transaction Amount: {amount}
    - Merchant: {merchant}
    - Transaction Type: {transaction_type}

    Provide a detailed risk assessment and potential fraud indicators."""
)

# Create a RunnableSequence to process the transaction
# (the | operator builds the sequence implicitly)
transaction_analysis_chain = (
    transaction_analysis_prompt  # First, format the prompt
    | llm  # Then, pass to the Llama model for analysis
    | StrOutputParser()  # Convert the model output to a string
)

# Example usage
def analyze_transaction(amount, merchant, transaction_type):
    result = transaction_analysis_chain.invoke({
        "amount": amount,
        "merchant": merchant,
        "transaction_type": transaction_type
    })
    return result

# Demonstrate the chain
transaction_result = analyze_transaction(
    amount="$1,250.75",
    merchant="TechCorp Online Store",
    transaction_type="E-commerce Purchase"
)

print("Transaction Analysis:")
print(transaction_result)
```


```
Transaction Analysis:
**Payment Transaction Details Analysis**

### Transaction Overview

* **Transaction Amount:** $1,250.75 (indicative of a significant purchase)
* **Merchant:** TechCorp Online Store (established reputation for legitimate business practices)

### Risk Assessment

Based on the provided transaction details, I have identified potential risk indicators that may indicate fraudulent activity:

**Risk Factors:**

1. **High Transaction Amount**: The $1,250.75 transaction amount is significantly higher than average e-commerce purchases.
2. **Merchant Reputation**: TechCorp Online Store has an established reputation for legitimate business practices. However, without further information on the merchant's financials or compliance history, it cannot be confirmed whether this is a genuine business operation.
3. **Lack of Addressed Information**: The transaction does not include any information about the customer's address, phone number, or other sensitive details that could prov
```

# RunnableParallel


`RunnableParallel` is a utility in LangChain that allows us to execute multiple tasks or chains in parallel. It is particularly useful when you need to perform independent operations simultaneously, such as validation, fraud detection, and compliance checks, as seen in this notebook.

### Tasks in This Notebook

1. **Validation**: A simple validation function will check the payment information to ensure the payment amount is valid.
2. **Fraud Detection**: Using a language model, the notebook will analyze payment transactions for potential fraud risks and provide a detailed assessment in JSON format.
3. **Compliance Check**: The notebook will perform a compliance check on payment transactions, listing any potential compliance concerns in a comma-separated format.
4. **Payment Processing Chain**: All the above tasks will be executed in parallel using `RunnableParallel`, and the results will be aggregated for further analysis.
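
As a rough sketch of the validation task on its own, assuming the payment arrives as a dict with an `amount` key: the hypothetical `validate_payment_strict` below checks that the amount is positive and also rejects a missing or non-numeric amount, which a bare `<= 0` comparison would turn into an exception.

```python
from numbers import Real


def validate_payment_strict(payment_info):
    """Hypothetical stricter check: also rejects missing or non-numeric amounts."""
    amount = payment_info.get("amount")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(amount, bool) or not isinstance(amount, Real):
        return "Validation Failed: Amount missing or not a number"
    if amount <= 0:
        return "Validation Failed: Invalid payment amount"
    return "Validation Passed"


print(validate_payment_strict({"amount": 5000}))    # Validation Passed
print(validate_payment_strict({"amount": -1}))      # Validation Failed: Invalid payment amount
print(validate_payment_strict({"amount": "5000"}))  # Validation Failed: Amount missing or not a number
```

Returning a status string instead of raising keeps the function easy to drop into a parallel step, where an exception in one branch would otherwise fail the whole call.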


```python
# from dotenv import load_dotenv

# load_dotenv('../env')
```

```python
from langchain_ollama import ChatOllama

base_url = "http://localhost:11434"
model = 'llama3.2:3b'
llm = ChatOllama(
    base_url=base_url,
    model=model,
    temperature=0.8,
    num_predict=256
)
```

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain.output_parsers import (
    CommaSeparatedListOutputParser,
    StructuredOutputParser,
    ResponseSchema
)

# Create output parsers
fraud_detection_parser = StructuredOutputParser.from_response_schemas([
    ResponseSchema(
        name="risk_level",
        description="Fraud risk level (low/medium/high)"
    ),
    ResponseSchema(
        name="reasoning",
        description="Brief explanation of fraud risk assessment"
    )
])

compliance_check_parser = CommaSeparatedListOutputParser()

# Modify prompts to include format instructions
fraud_detection_prompt = ChatPromptTemplate.from_template(
    "Analyze the payment transaction for fraud risk:\n"
    "Amount: ${amount}\n"
    "Merchant: {merchant}\n"
    "Card Type: {card_type}\n\n"
    "Provide a detailed fraud assessment in STRICT JSON format. Details need not be accurate. IMPORTANT: Ensure the output is a valid JSON object with EXACTLY these keys:\n"
    "{format_instructions}\n"
    "EXAMPLE FORMAT: {{\n"
    "  \"risk_level\": \"low\",\n"
    "  \"reasoning\": \"Detailed explanation here\"\n"
    "}}"
).partial(format_instructions=fraud_detection_parser.get_format_instructions())

compliance_check_prompt = ChatPromptTemplate.from_template(
    "Perform a compliance check on this payment transaction. "
    "Payment amount: ${amount}, Merchant: {merchant}, Country: {country}\n"
    "List any compliance concerns. Details need not be accurate. Respond in a comma-separated list:\n"
    "{format_instructions}"
).partial(format_instructions=compliance_check_parser.get_format_instructions())

# Validation function: a plain Python check that returns a status string
def validate_payment(payment_info):
    if payment_info['amount'] <= 0:
        return "Validation Failed: Invalid payment amount"
    return "Validation Passed"

# Payment processing chain using RunnableParallel
# Every branch receives the same payment_info dict
payment_processing_chain = RunnableParallel({
    # Validation step
    "validation": RunnableLambda(validate_payment),

    # Fraud detection using LLM with output parser
    "fraud_detection": fraud_detection_prompt | llm | fraud_detection_parser,

    # Compliance check using LLM with output parser
    "compliance_check": compliance_check_prompt | llm | compliance_check_parser,
})

# Example usage
payment_info = {
    "amount": 5000,
    "merchant": "Global Tech Solutions",
    "card_type": "Corporate Credit Card",
    "country": "United States"
}

# Invoke the chain
result = payment_processing_chain.invoke(payment_info)
print("Payment Processing Results:")
for key, value in result.items():
    print(f"{key.replace('_', ' ').title()}: {value}")
```


```
Payment Processing Results:
Validation: Validation Passed
Fraud Detection: {'risk_level': 'high', 'reasoning': 'Transaction amount exceeds 3x average daily spend for Global Tech Solutions ($1500). Card type Corporate Credit Card is used by high-value purchasing accounts. Additionally, IP address is flagged as suspicious due to recent spikes in malicious activity.'}
Compliance Check: ['US Bank Account', 'Anti-Money Laundering (AML) reporting required', 'High Transaction Value exceeds $5', '000 threshold', 'Global Tech Solutions listed as High Risk Merchant in Country Profile.', 'Note: Please note that this is a hypothetical compliance check and actual compliance concerns may vary depending on the specific payment processor', 'merchant', 'and transaction details.']
```
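
The compliance list above contains the fragments `'High Transaction Value exceeds $5'` and `'000 threshold'`. A plausible explanation, assuming the parser splits the model's text on every comma, is that the thousands separator in `$5,000` was treated as a list delimiter. The sketch below reproduces the effect with plain string handling; both functions are hypothetical stand-ins, and the regex variant is only a heuristic.

```python
import re


def split_on_commas(text):
    """Hypothetical stand-in for a parser that splits on every comma."""
    return [part.strip() for part in text.split(",")]


def split_keeping_numbers(text):
    """Heuristic variant: do not split when the comma is followed by three digits."""
    return [part.strip() for part in re.split(r",(?!\d{3})", text)]


raw = "AML reporting required, High Transaction Value exceeds $5,000 threshold"
print(split_on_commas(raw))
# ['AML reporting required', 'High Transaction Value exceeds $5', '000 threshold']
print(split_keeping_numbers(raw))
# ['AML reporting required', 'High Transaction Value exceeds $5,000 threshold']
```

A simpler alternative is to ask the model in the prompt to write amounts without thousands separators, so the list delimiter stays unambiguous.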
