In [1]:
from langchain_core.runnables import RunnablePassthrough,RunnableLambda, Runnable, RunnableParallel,RunnableConfig,RunnableGenerator
from langchain_core.messages import AIMessage
from dotenv import load_dotenv,find_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate,SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.output_parsers import JsonOutputParser,StrOutputParser
from operator import itemgetter
from langchain.embeddings import SentenceTransformerEmbeddings
import json
from langchain_community.vectorstores import FAISS,Chroma
from operator import itemgetter
import time
import grandalf
from typing import Iterator,List,AsyncIterator
from langchain_core.runnables import ConfigurableField
from langchain.runnables.hub import HubRunnable

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load API keys (Google / OpenAI) from the course's .env file into the process environment.
# The path is a raw string so the Windows backslashes are not read as escape sequences
# ("\L", "\M" and "\." are invalid escapes and trigger a SyntaxWarning on Python 3.12+).
# NOTE(review): this is a machine-specific absolute path; adjust it when running elsewhere.
load_dotenv(find_dotenv(r"D:\LLM Courses\Master Langchain Udemy\.env"))

True

In [3]:
# Chat model clients used by the chains below (credentials come from the loaded .env).
llmGemini = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
)
llmGPT = ChatOpenAI(
    model="gpt-3.5-turbo",
)

<H3>JSON Output Parser</H3>

In [31]:
# Prompt asking the model for a JSON object keyed by "countries".
# `{countries}` is the only template variable; the doubled braces are literal
# braces in the rendered prompt.
prompt = ChatPromptTemplate.from_template(
    template="""
        output a list of the countries {countries} and their populations in JSON format.
        Use a dict with an outer key of "countries" which contains a list of countries.
        Each country should have the key `name`,`population`and `GDP Per Capita` in the following format
        {{'countries':[{{'name':'Country A'}},{{'name':'Country B'}}]  }}
        GDP Per Capita should be in Dollars
        """
)

In [32]:
# Pipeline: render the prompt -> call Gemini -> parse the reply as JSON.
chain = (
    prompt
    | llmGemini
    | JsonOutputParser()
)

In [33]:
# Run the chain once and display the fully parsed result.
chain.invoke({"countries": ["Israel", "Iran", "India"]})

{'countries': [{'name': 'Israel',
   'population': 9500000,
   'GDP Per Capita': 45000},
  {'name': 'Iran', 'population': 88000000, 'GDP Per Capita': 8000},
  {'name': 'India', 'population': 1400000000, 'GDP Per Capita': 2200}]}

In [35]:
# Stream synchronously: each chunk is the JSON object parsed so far, so later
# chunks repeat and extend the earlier ones.
for partial in chain.stream({"countries": ["Israel", "Iran", "India"]}):
    print(partial, flush=True)

{'countries': [{'name': ''}]}
{'countries': [{'name': 'Israel', 'population': 9500000}]}
{'countries': [{'name': 'Israel', 'population': 9500000, 'GDP Per Capita': 45000}, {'name': 'Iran', 'population': 8}]}
{'countries': [{'name': 'Israel', 'population': 9500000, 'GDP Per Capita': 45000}, {'name': 'Iran', 'population': 85000000, 'GDP Per Capita': 8000}, {'name': ''}]}
{'countries': [{'name': 'Israel', 'population': 9500000, 'GDP Per Capita': 45000}, {'name': 'Iran', 'population': 85000000, 'GDP Per Capita': 8000}, {'name': 'India', 'population': 1400000000, 'GDP Per Capita': 2200}]}


In [39]:
async for text in chain.astream(input={"countries":["Israel","Iran","India"]}):
    print(text,flush=True)

{'countries': [{'name': ''}]}
{'countries': [{'name': 'Israel', 'population': 9500000}]}
{'countries': [{'name': 'Israel', 'population': 9500000, 'GDP Per Capita': 45000}, {'name': 'Iran', 'population': 85000}]}
{'countries': [{'name': 'Israel', 'population': 9500000, 'GDP Per Capita': 45000}, {'name': 'Iran', 'population': 85000000, 'GDP Per Capita': 8000}, {'name': 'India'}]}
{'countries': [{'name': 'Israel', 'population': 9500000, 'GDP Per Capita': 45000}, {'name': 'Iran', 'population': 85000000, 'GDP Per Capita': 8000}, {'name': 'India', 'population': 1400000000, 'GDP Per Capita': 2200}]}


In [47]:
def extractCountryAndPopulationStream(inputStream):
    """Collect the unique, fully populated country records from one parsed payload.

    Despite the name, this is a plain function: it handles a single, already
    parsed payload rather than a stream of chunks.

    Args:
        inputStream: The parsed JSON payload. Expected to be a dict whose
            "countries" key holds a list of per-country dicts.

    Returns:
        A list of ``(name, population, GDP Per Capita)`` tuples in first-seen
        order, without duplicates. Entries that are not dicts, or that miss
        any of the three keys, are skipped.
        Returns ``""`` (kept for backward compatibility) when the payload is
        not a dict, has no "countries" key, or that key is not a list.
    """
    if not isinstance(inputStream, dict):
        return ""
    if "countries" not in inputStream:
        return ""
    countries = inputStream["countries"]
    if not isinstance(countries, list):
        return ""

    seen = set()
    streamingResults = []
    for country in countries:
        # Model output is untrusted: a list entry may be None, a string, etc.
        # Calling .get() on those would raise AttributeError, so skip them.
        if not isinstance(country, dict):
            continue
        record = (
            country.get("name"),
            country.get("population"),
            country.get("GDP Per Capita"),
        )
        if any(value is None for value in record):
            continue
        if record not in seen:
            seen.add(record)
            streamingResults.append(record)
    return streamingResults

In [51]:
# Append the plain (non-generator) extractor as a final step via RunnableLambda.
chain = (
    prompt
    | llmGemini
    | JsonOutputParser()
    | RunnableLambda(extractCountryAndPopulationStream)
)

In [52]:
# Stream the RunnableLambda-terminated chain and print every chunk it emits.
for chunk in chain.stream({"countries": ["Israel", "Iran", "India"]}):
    print(chunk, flush=True)

[('Israel', 9500000, 45000), ('Iran', 85000000, 8000), ('India', 1400000000, 2200)]


<h3>Fixing streaming with an async generator function that yields (name, population, GDP per capita) tuples</h3>

In [53]:
async def extractCountryAndPopulation(inputStream):
    """Yield each finished country record once from a stream of cumulative JSON snapshots.

    Each incoming snapshot is expected to be the whole payload parsed so far,
    so the trailing country of a snapshot may still be incomplete: its last
    value can be a truncated number (e.g. ``8`` on the way to ``88000000``).
    To avoid emitting such partial values, a country is only yielded once a
    later country has started in the list, or once the stream has ended.

    Args:
        inputStream: Async iterator of parsed snapshots. Snapshots that are
            not dicts, lack a "countries" key, or whose "countries" value is
            not a list are ignored.

    Yields:
        ``(name, population, GDP Per Capita)`` tuples, each at most once.
        Entries that are not dicts or miss any of the three keys are skipped.
    """

    def _as_record(country):
        """Return the record tuple for a complete country dict, else None."""
        if not isinstance(country, dict):
            return None
        record = (
            country.get("name"),
            country.get("population"),
            country.get("GDP Per Capita"),
        )
        return None if any(value is None for value in record) else record

    seen = set()
    latest = []  # country list of the most recent valid snapshot
    async for inputs in inputStream:
        if not isinstance(inputs, dict):
            continue
        if "countries" not in inputs:
            continue
        countries = inputs["countries"]
        if not isinstance(countries, list):
            continue
        latest = countries
        # Everything before the trailing entry is closed JSON and therefore final.
        for country in countries[:-1]:
            record = _as_record(country)
            if record is not None and record not in seen:
                seen.add(record)
                yield record

    # Stream finished: the trailing entry of the last snapshot is now final too.
    for country in latest[-1:]:
        record = _as_record(country)
        if record is not None and record not in seen:
            seen.add(record)
            yield record

In [54]:
# Same pipeline, with the async generator wrapped in RunnableGenerator as the last step.
chain = (
    prompt
    | llmGemini
    | JsonOutputParser()
    | RunnableGenerator(extractCountryAndPopulation)
)

In [57]:
async for text in chain.astream(input={"countries":["Israel","Iran","India"]}):
    print(text,flush=True)

('Israel', 9500000, 45000)
('Iran', 88000000, 8000)
('India', 1400000000, 2000)


<h3>Astream Events</h3>

In [58]:
# Back to the plain three-step pipeline for the event-stream demo.
chain = (
    prompt
    | llmGemini
    | JsonOutputParser()
)

In [59]:
async for event in chain.astream_events(version="v2",input={"countries":["Israel","Iran","India"]}):
    print(event['event'])

on_chain_start
on_prompt_start
on_prompt_end
on_chat_model_start
on_chat_model_stream
on_parser_start
on_chat_model_stream
on_parser_stream
on_chain_stream
on_chat_model_stream
on_parser_stream
on_chain_stream
on_chat_model_stream
on_parser_stream
on_chain_stream
on_chat_model_stream
on_parser_stream
on_chain_stream
on_chat_model_stream
on_parser_stream
on_chain_stream
on_chat_model_end
on_parser_end
on_chain_end


<h3> Filtering Events </h3>

In [60]:
# Give the parser step an explicit run name so its events can be selected by name.
namedParser = JsonOutputParser().with_config({"run_name": "outputParser"})
chain = prompt | llmGemini | namedParser

In [61]:
async for event in chain.astream_events(
    version="v2",
    input={"countries":["Japan","USA","Germany"]},
    include_names=["outputParser"]
):
    print(event['event'])

on_parser_start
on_parser_stream
on_parser_stream
on_parser_stream
on_parser_stream
on_parser_end


In [62]:
async for event in chain.astream_events(
    version="v2",
    input={"countries":["Japan","USA","Germany"]},
    include_types=["chat_model"]
):
    print(event['event'])

on_chat_model_start
on_chat_model_stream
on_chat_model_stream
on_chat_model_stream
on_chat_model_stream
on_chat_model_stream
on_chat_model_stream
on_chat_model_end
