## 📌 Running `Runnables` in Parallel in LangChain

LangChain allows executing multiple **`Runnable` components in parallel** using `RunnableParallel`. This is useful when performing **independent operations simultaneously** to improve efficiency.

---

## **🔹 How Does `RunnableParallel` Work?**
✅ Executes multiple `Runnable` components **at the same time**.  
✅ Each runnable receives the **same input** and returns **separate outputs**.  
✅ Returns a dictionary where keys match the provided runnable names.  


- https://python.langchain.com/docs/how_to/parallel/

- https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableParallel.html

---

## 📌 When to Use `RunnableParallel`?

| **Use Case** | **Why Parallel Execution?** |
|-------------|---------------------------|
| **Text processing (e.g., uppercase & word count)** | Runs independent operations at once |
| **LLM queries (summary & keyword extraction)** | Reduces API calls and improves efficiency |
| **API calls (e.g., stock prices & news summarization)** | Fetches and processes data simultaneously |
| **Multi-modal AI (e.g., text + image processing)** | Handles different data types in one workflow |

📌 **`RunnableParallel` speeds up workflows by running independent tasks in parallel!** 🚀😊


### **👀 Example 1: Running Simple Functions in Parallel**
#### **Scenario:** Duplicate the input text and count words at the same time.

In [None]:
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough, RunnableSequence

# Function 1: Convert text to uppercase
duplicate_runnable = RunnableLambda(lambda x: x["text"]*2)


# Function 2: Count words
word_count_runnable = RunnableLambda(lambda x: len(x["text"].split()))



# Create parallel execution
parallel_workflow = RunnableParallel(
                                    duplicate = duplicate_runnable, # these given names become the keys to the dictionary output
                                    word_count = word_count_runnable
                                    )


# Run the pipeline
output = parallel_workflow.invoke({"text": "Hello LangChain!"})
print(output)   # Output is a dictionary


{'duplicate': 'Hello LangChain!Hello LangChain!', 'word_count': 2}


### ☕️ Same Logic, Different Syntax

In [None]:
# give each step name a key in dictionary
parallel_workflow = RunnableParallel({
                                    'duplicate': duplicate_runnable, # these given names become the keys to the dictionary output
                                    'word_count': word_count_runnable
                                     })


# Run the pipeline
output = parallel_workflow.invoke({"text": "Hello LangChain!"})
print(output)   # Output is a dictionary


{'duplicate': 'Hello LangChain!Hello LangChain!', 'word_count': 2}


### **👀 Example 2: Running LLM Queries in Parallel**
**Scenario: Generate a text summary and extract keywords simultaneously.**

In [None]:
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

# Load LLM
llm = ChatOpenAI(model="gpt-3.5-turbo")

# Runnable 1: Summarize text
summary_prompt = ChatPromptTemplate.from_template("Summarize this: {text}")
summary_runnable = summary_prompt | llm



# Runnable 2: Extract keywords
keywords_prompt = ChatPromptTemplate.from_template("Extract keywords from: {text}")
keywords_runnable = keywords_prompt | llm



# Parallel execution
parallel_chain = RunnableParallel(
    summary = summary_runnable, # these given names can be used to extract different part of the final output
    keywords = keywords_runnable
)



# Run the pipeline
output = parallel_chain.invoke({"text": "LangChain makes AI development easy and modular."})
print(output)

{'summary': AIMessage(content='LangChain simplifies the development of AI by breaking it down into smaller, manageable modules, making the process easier for developers.', response_metadata={'token_usage': {'completion_tokens': 26, 'prompt_tokens': 21, 'total_tokens': 47, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-506c4332-c61c-432c-bd07-6285c3927a90-0'), 'keywords': AIMessage(content='LangChain, AI development, easy, modular', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 20, 'total_tokens': 30, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_t

In [None]:
print(output['summary'].content)

LangChain simplifies the development of AI by breaking it down into smaller, manageable modules, making the process easier for developers.


In [None]:
print(output['keywords'].content)

LangChain, AI development, easy, modular


### 🕵🏿 Compare `RunnableParallel` with `RunnableSequence`

In [None]:
sequential_chain = summary_runnable | keywords_runnable
output = sequential_chain.invoke({"text": "LangChain makes AI development easy and modular."})
print(output.content)

LangChain, AI development, manageable, interchangeable modules.


In [None]:
parallel_chain.get_graph().print_ascii()

            +---------------------------------+            
            | Parallel<summary,keywords>Input |            
            +---------------------------------+            
                   ***               ***                   
                ***                     ***                
              **                           **              
+--------------------+              +--------------------+ 
| ChatPromptTemplate |              | ChatPromptTemplate | 
+--------------------+              +--------------------+ 
           *                                   *           
           *                                   *           
           *                                   *           
    +------------+                      +------------+     
    | ChatOpenAI |                      | ChatOpenAI |     
    +------------+*                     +------------+     
                   ***               ***                   
                      ***         ***   

In [None]:
sequential_chain.get_graph().print_ascii()

    +-------------+    
    | PromptInput |    
    +-------------+    
           *           
           *           
           *           
+--------------------+ 
| ChatPromptTemplate | 
+--------------------+ 
           *           
           *           
           *           
    +------------+     
    | ChatOpenAI |     
    +------------+     
           *           
           *           
           *           
 +------------------+  
 | ChatOpenAIOutput |  
 +------------------+  
           *           
           *           
           *           
+--------------------+ 
| ChatPromptTemplate | 
+--------------------+ 
           *           
           *           
           *           
    +------------+     
    | ChatOpenAI |     
    +------------+     
           *           
           *           
           *           
 +------------------+  
 | ChatOpenAIOutput |  
 +------------------+  


### 🧵 Combining `RunnableSequence` and `RunnableParallel`

In [None]:
# read api_key from file
with open('../api_keys.txt', 'r') as file:
    api_key = file.read()


# Set loaded api_key as OPENAI_API_KEY environmental variable
import os
os.environ["OPENAI_API_KEY"] = api_key

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings



embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")

# Wrap OpenAIEmbeddings inside RunnableLambda
embedding_runnable = RunnableLambda(lambda x: embedding_function.embed_query(x["cleaned_text"]))


clean_text_runnable = RunnableLambda(lambda x: {"cleaned_text": x["topic"].strip().replace("\n", "")}) 


text_embeddings_chain = RunnableParallel(text = RunnablePassthrough(),
                                         embeddings = embedding_runnable)


# Combine sequential & parallel execution
workflow = clean_text_runnable | text_embeddings_chain

workflow.invoke({"topic": "   LangChain makes AI development easy and modular\n."})

{'text': {'cleaned_text': 'LangChain makes AI development easy and modular.'},
 'embeddings': [-0.016039820807190765,
  -0.02519965237678059,
  0.03828512604762319,
  0.022811890244669727,
  0.06745089021787166,
  -0.012309787185364799,
  -0.02362130271542358,
  0.013975824594704042,
  -0.026710551930230714,
  -0.007621950669125305,
  -0.007797322905460673,
  -0.03634253984310435,
  -0.011561082978223985,
  -2.1974234761266294e-05,
  -0.02557737819646572,
  0.0005644796330870312,
  -0.03545218798792031,
  0.007412852913175194,
  0.020410638836035565,
  0.024943339359031137,
  -0.005197091163930174,
  -0.00631677608683317,
  0.0196821694750023,
  0.01213441541469073,
  0.0022916440519628683,
  -0.023810163762620944,
  0.014313078859528882,
  0.027250160244066613,
  -0.04249405971967203,
  -0.017429309421350413,
  0.05191020430701131,
  -0.037313831082718574,
  0.022056440467944666,
  0.02673753234592251,
  -0.0036356030982272847,
  0.03601877485480281,
  0.006144776169628626,
  0.017213