# Chapter 3 - Parallelization 
For this example, we will create a simple LangChain chain that takes a user query and simultaneously performs three independent "tasks" (summarizing the topic, generating related questions, and identifying key terms, each simulated by a simple chain) before combining their results in a final synthesis step.

In [1]:
import os
import asyncio
from langchain_openai import ChatOpenAI # Or import your preferred model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough


In [None]:
# --- Configuration ---
# Credentials are read from the environment (shell export, .env loader, or a
# secrets manager). Do not paste a real key into the notebook: it would be saved
# with the file and its outputs.
#
# The model cell below uses ChatOpenAI, so OPENAI_API_KEY is the variable that
# matters by default. Change the name to "GOOGLE_API_KEY" if you switch to Gemini.
#
# Nothing is written to os.environ here on purpose: assigning a placeholder
# string would silently overwrite a key that is already configured correctly.
REQUIRED_API_KEY_VAR = "OPENAI_API_KEY"

if not os.environ.get(REQUIRED_API_KEY_VAR):
    print(
        f"Warning: {REQUIRED_API_KEY_VAR} is not set. "
        "Set it in your environment before running the cells below."
    )

In [2]:
## --- Initialize the language model ---
# Use the appropriate class and model name for your provider (e.g., ChatGoogleGenerativeAI(model="gemini-pro"))
# Setting temperature to control creativity (0.7 is a common balance)


def model_label(model) -> str:
    """Return a printable identifier for a chat model instance.

    ChatOpenAI exposes the identifier as ``model_name``; other integrations may
    expose it as ``model`` instead, so both are tried before falling back to the
    class name. This keeps the status message from failing after a provider swap.
    """
    return (
        getattr(model, "model_name", None)
        or getattr(model, "model", None)
        or type(model).__name__
    )


try:
    # Example for OpenAI
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
    # Example for Google (uncomment and replace if using Google)
    # from langchain_google_genai import ChatGoogleGenerativeAI
    # llm = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.7)
except Exception as e:
    print(f"Error initializing language model: {e}")
    print("Please ensure your API key is set correctly and the model name is valid.")
    llm = None # Set llm to None if initialization fails
else:
    # Reported outside the try block so that a problem while printing the label
    # can never discard a model that was constructed successfully.
    print(f"Language model initialized: {model_label(llm)}")

Language model initialized: gpt-4o-mini


In [3]:
# --- Define Independent Tasks (Simulated Workflows) ---
# Each task is a self-contained chain that does not depend on the others,
# which is what makes them suitable for parallel execution.
# In a real scenario, these might be calls to different APIs, tools, or separate chains.


def _topic_prompt(system_instruction):
    """Build a two-message chat prompt: the given system text plus a user "{topic}" turn."""
    return ChatPromptTemplate.from_messages(
        [("system", system_instruction), ("user", "{topic}")]
    )


# Task 1: Summarize the topic
summarize_prompt = _topic_prompt("Summarize the following topic concisely:")
summarize_chain = summarize_prompt | llm | StrOutputParser()

# Task 2: Generate related questions
questions_prompt = _topic_prompt(
    "Generate three interesting questions about the following topic:"
)
questions_chain = questions_prompt | llm | StrOutputParser()

# Task 3: Identify key terms
terms_prompt = _topic_prompt(
    "Identify 5-10 key terms from the following topic, separated by commas:"
)
terms_chain = terms_prompt | llm | StrOutputParser()

In [4]:
# --- Combine Tasks for Parallel Execution using RunnableParallel ---
# RunnableParallel runs several runnables in parallel on one shared input.
# Each keyword argument below names an output key and supplies the runnable
# that produces the value for that key.
# The result is a dictionary holding every runnable's output under its key.

# The shared input here is the user's original query (the topic), so all three
# task chains receive the same topic.
parallel_tasks = RunnableParallel(
    summary=summarize_chain,
    questions=questions_chain,
    key_terms=terms_chain,
)


In [5]:
# --- Helper that wraps the raw topic in a dictionary ---
# Only the "topic" key is produced here; this function adds nothing else.
def combine_inputs_and_parallel_results(inputs):
    """Return a one-entry dict that stores ``inputs`` under the "topic" key."""
    return dict(topic=inputs)


In [6]:
# --- Define a Final Synthesis Step ---
# The synthesis prompt consumes the three parallel outputs ({summary},
# {questions}, {key_terms}) together with the original {topic}, so it can only
# be filled in after the parallel tasks have produced their results.
synthesis_system_template = """Based on the following information about a topic:

   Summary: {summary}

   Related Questions: {questions}

   Key Terms: {key_terms}

   Synthesize a comprehensive answer that includes the summary, lists the related questions, and mentions the key terms."""

synthesis_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", synthesis_system_template),
        # The original topic is repeated in the user turn for context
        ("user", "Original topic: {topic}"),
    ]
)

In [7]:
# --- Build the Full Chain ---
# Stage 1 (gather): keep the original topic and run the parallel tasks on it.
# Stage 2 (flatten): copy each parallel result up to a top-level key so the
#                    synthesis prompt can reference it by name.
# Stage 3 (synthesize): fill the synthesis prompt, call the model, parse to text.
gather_stage = RunnableParallel(
    topic=RunnablePassthrough(),       # Pass through the original topic
    parallel_results=parallel_tasks,   # Run all parallel tasks
)

flatten_stage = RunnablePassthrough.assign(
    summary=lambda gathered: gathered["parallel_results"]["summary"],
    questions=lambda gathered: gathered["parallel_results"]["questions"],
    key_terms=lambda gathered: gathered["parallel_results"]["key_terms"],
)

full_parallel_chain = (
    gather_stage
    | flatten_stage
    | synthesis_prompt
    | llm
    | StrOutputParser()
)

In [8]:
# --- Run the Chain ---
async def run_parallel_example(topic: str):
    """Invoke ``full_parallel_chain`` on ``topic`` and print the outcome.

    Prints a notice and does nothing else when ``llm`` is falsy. Any exception
    raised while running the chain or printing the result is caught and
    reported with a message instead of propagating. Returns None in all cases.
    """
    if llm:
        print(f"\n--- Running Parallel LangChain Example for Topic: '{topic}' ---")
        try:
            # Asynchronous invocation of the whole pipeline
            final_text = await full_parallel_chain.ainvoke(topic)
            print("\n--- Final Response ---")
            print(final_text)
        except Exception as chain_error:
            print(f"\nAn error occurred during chain execution: {chain_error}")
    else:
        print("LLM not initialized. Cannot run example.")



In [10]:
# Run the example with a test topic
# pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()

# Then modify the running code to use await
async def main():
    test_topic = "Artificial Intelligence"
    await run_parallel_example(test_topic)
    
    test_topic_2 = "Renewable Energy"
    await run_parallel_example(test_topic_2)

# Run the async function
await main()


--- Running Parallel LangChain Example for Topic: 'Artificial Intelligence' ---

--- Final Response ---
**Summary**: Artificial Intelligence (AI) refers to the simulation of human intelligence in machines that are programmed to think and learn. It encompasses various technologies, including machine learning, natural language processing, and robotics, and is used in applications ranging from virtual assistants to autonomous vehicles. AI aims to enhance efficiency, automate tasks, and provide insights from data, but it also raises ethical concerns regarding privacy, bias, and job displacement.

**Related Questions**:
1. How has the integration of artificial intelligence into various industries evolved since 2023, and what are the most significant advancements that have emerged in the last few years?
2. In what ways are ethical considerations surrounding artificial intelligence, including bias and accountability, being addressed by researchers and policymakers as we move further into the