In [22]:
from langchain_google_genai import ChatGoogleGenerativeAI
from dotenv import load_dotenv, find_dotenv
import os

# Load variables from the .env file located by find_dotenv(); override=True is
# passed so the file's values are applied over the current environment.
load_dotenv(find_dotenv(), override=True)

# Chat model shared by the cells below. The API key comes from the
# GOOGLE_API_KEY environment variable.
model = ChatGoogleGenerativeAI(
    google_api_key=os.getenv("GOOGLE_API_KEY"),
    model="gemini-2.0-flash-001",
    temperature=0,
    max_retries=2,
    max_tokens=None,
    timeout=None,
)

In [27]:
# Stream the reply: echo each chunk's text as it arrives ("|" marks chunk
# boundaries) and keep every chunk object for inspection in later cells.
chunks = []
for chunk in model.stream(
    "Write a poem about Naruto Uzumaki's personality in 10 sentences."
):
    print(chunk.content, end="|", flush=True)
    chunks.append(chunk)

A| sun|beam boy with hair of gold,
A spirit bright, a story told.
He laughs| aloud, a boisterous sound,
Though loneliness has him surrounded.
He|'s stubborn, brash, and never yields,
On dreams of Hokage, his heart is sealed.
A prankster's grin, a ramen| bowl,
He fights for friends, to make them whole.
With boundless hope, he lights the way,
And turns the darkest night to day.|

In [32]:
# Display the first chunk object collected into `chunks` by the streaming loop.
chunks[0]

AIMessageChunk(content='A', additional_kwargs={}, response_metadata={'safety_ratings': []}, id='run-049f5b16-c99d-4f19-88ad-043e3609b0c6', usage_metadata={'input_tokens': 18, 'output_tokens': 0, 'total_tokens': 18, 'input_token_details': {'cache_read': 0}})

In [30]:
# Show every collected chunk's text, each followed by an empty line so the
# boundaries between chunks are easy to see.
for chunk in chunks:
    print(chunk.content, end="\n\n")

A

 sun

beam boy with hair of gold,
A spirit bright, a story told.
He laughs

 aloud, a boisterous sound,
Though loneliness has him surrounded.
He

's stubborn, brash, and never yields,
On dreams of Hokage, his heart is sealed.
A prankster's grin, a ramen

 bowl,
He fights for friends, to make them whole.
With boundless hope, he lights the way,
And turns the darkest night to day.



# Chains

In [40]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    "tell me a joke about {topic}"
])
parser = StrOutputParser()
chain= prompt | model | parser
async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)
    print()

Why|
 did the parrot|
 get sent to his room?

Because he kept squawking swear words!|


# Working with input streams

In [42]:
# stream json
from langchain_core.output_parsers import JsonOutputParser
chain = (
    model | JsonOutputParser()
)
async for chunk in chain.astream(
    "output a list of 5 random chracters from Naruto and their villages in json format"
    'Use a dict with an outer key of "Naruto Series" which contains a list of characters '
    "Each character should have the key 'name' and 'village'"   
):
    print(chunk, flush=True)

{}
{'Naruto Series': [{}]}
{'Naruto Series': [{'name': 'Gaara', 'village': 'Sunagakure'}]}
{'Naruto Series': [{'name': 'Gaara', 'village': 'Sunagakure'}, {'name': 'Shino Aburame', 'village': 'Konohagakure'}, {}]}
{'Naruto Series': [{'name': 'Gaara', 'village': 'Sunagakure'}, {'name': 'Shino Aburame', 'village': 'Konohagakure'}, {'name': 'Kisame Hoshigaki', 'village': 'Kirigakure'}, {'name': ''}]}
{'Naruto Series': [{'name': 'Gaara', 'village': 'Sunagakure'}, {'name': 'Shino Aburame', 'village': 'Konohagakure'}, {'name': 'Kisame Hoshigaki', 'village': 'Kirigakure'}, {'name': 'Temari', 'village': 'Sunagakure'}, {'name': 'Choji Akimichi', 'village': 'Konohagakure'}]}


In [66]:
from langchain_core.output_parsers import JsonOutputParser

# a function that operates on finalized inputs rather than on an input_stream
def _extract_villages(inputs):
    """ A function that does not operates on input streams and break stream """
    """
        Args:
    """
    if not isinstance(inputs, dict):
        return ""
    if "Naruto Series" not in inputs:
        return ""
    characters = inputs["Naruto Series"]
    if not isinstance(characters, list):
        return ""
    
    village_names = [
        character.get('village') for character in characters if isinstance(character, dict) 
    ]
    character_names = [
        character.get('name') for character in characters if isinstance(character, dict)
    ]
    return village_names, character_names

chain = model | JsonOutputParser() | _extract_villages

async for chunk in chain.astream(
    "output a list of 5 random chracters from Naruto and their villages in json format"
    'Use a dict with an outer key of "Naruto Series" which contains a list of characters '
    "Each character should have the key 'name' and 'village'" 
):
    print(chunk[0], end="|", flush=True)

['Sunagakure', 'Konohagakure', 'Kirigakure', 'Sunagakure', 'Konohagakure']|

## Generator Functions
Let's fix the streaming by using a generator function that operates on the **input stream** and yields results as the input arrives.


In [71]:
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser

async def _extract_villages_streaming(input_stream):
    """ A function that operates on input streams"""
    character_names_so_far = set()
    village_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue
        if "Naruto Series" not in input:
            continue
        characters = input["Naruto Series"]

        if not isinstance(characters, list):
            continue

        for character in characters:
            if not isinstance(character, dict):
                continue
            village_name = character.get('village')
            character_name = character.get('name')
            # if not village_name:
            #     continue
            # if village_name not in village_names_so_far:
            #     yield village_name
            #     village_names_so_far.add(village_name)

            if not character_name or  not village_name:
                continue
            if character_name not in character_names_so_far or village_name not in village_names_so_far:
                yield {
                    "name": character_name,
                    "village": village_name
                }
                character_names_so_far.add(character_name)
                village_names_so_far.add(village_name)

chain = model | JsonOutputParser() | _extract_villages_streaming

async for chunk in chain.astream(
    "output a list of 5 random characters from Naruto and their villages in json format"
    'Use a dict with an outer key of "Naruto Series" which contains a list of characters '
    "Each character should have the key 'name' and 'village'" 
):
    print(chunk.get('name') , "-", chunk.get('village'), end="|\n", flush=True)


Temari - Sunagakure|
Kiba Inuzuka - Konohagakure|
Choji Akimichi - Konohagakure|
Sasori - Sunagakure|
Kisame Hoshigaki - Kirigakure|


# Non-streaming components

Some built-in components, such as retrievers, do not support streaming. What happens if we try to stream them anyway?

In [82]:
from langchain_mongodb import MongoDBAtlasVectorSearch
from pymongo import MongoClient

from dotenv import load_dotenv, find_dotenv


# Imported here because this cell is the first one that needs an embedding
# model; without it the cell fails with NameError on a fresh kernel.
from langchain_google_genai import GoogleGenerativeAIEmbeddings

load_dotenv(find_dotenv(), override=True)

# Connection string for the Atlas cluster, read from the environment.
MONGODB_ATLAS_CLUSTER_URI = os.getenv("MONGODB_ATLAS_CLUSTER_URI")
client = MongoClient(MONGODB_ATLAS_CLUSTER_URI)

DB_NAME = "RAG-Chatbot-Cluster"
COLLECTION_NAME = "RAG-Chatbot-Collection"
ATLAS_VECTOR_SEARCH_INDEX_NAME = "RAG-Chatbot-Index"

MONGODB_COLLECTION = client[DB_NAME][COLLECTION_NAME]

# The vector store takes the embedding model at construction time, so define
# it before use rather than relying on a later cell having been run.
embeddings = GoogleGenerativeAIEmbeddings(
    model="models/text-embedding-004",
    google_api_key=os.getenv("GOOGLE_API_KEY")
)

vector_store = MongoDBAtlasVectorSearch(
    collection=MONGODB_COLLECTION,
    embedding=embeddings,
    index_name=ATLAS_VECTOR_SEARCH_INDEX_NAME,
    relevance_score_fn="cosine"
)

# Create a vector search index on the collection.
# NOTE(review): dimensions=768 presumably matches the embedding model's
# output size; confirm if the model is changed.
vector_store.create_vector_search_index(dimensions=768)
print(f"[INFO] Created a vector search index on the collection '{COLLECTION_NAME}' in the database '{DB_NAME}'!")

[INFO] Created a vector search index on the collection 'RAG-Chatbot-Collection' in the database 'RAG-Chatbot-Cluster'!


In [95]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_core.documents import Document
from langchain_mongodb import MongoDBAtlasVectorSearch
from pymongo import MongoClient
from uuid import uuid4
from dotenv import load_dotenv, find_dotenv


load_dotenv(find_dotenv(), override=True)

# Embedding model; the API key is read from the GOOGLE_API_KEY variable.
embeddings = GoogleGenerativeAIEmbeddings(
    model="models/text-embedding-004",
    google_api_key=os.getenv("GOOGLE_API_KEY")
)

# NOTE(review): `template` is not referenced by the cells visible here; the
# prompt below carries its own copy of the text.
template = """Answer the question based only on the following context: {context}

Question: {question}
"""

# from_messages expects a sequence of messages. A bare ("human", "...") tuple
# is itself a two-item sequence of strings and would be read as two separate
# messages ("human" and the template text), so wrap the pair in a list.
prompt = ChatPromptTemplate.from_messages(
    [
        ("human", "Answer the question based only on the following context: {context}\n\nQuestion: {question}"),
    ]
)
# Sample documents to index; each gets its own metadata dict with the same
# source label.
documents = [
    Document(page_content=text, metadata={"source": "Naruto Series"})
    for text in (
        "Naruto Uzumaki is a genin from the Hidden Leaf village. He is known for his 'never-listening' attitude of him.",
        "Sasuke Uchiha is a prodigy from Uchiha clan. But he was lost in the darkk because of his hatred towards his brother.",
    )
]

# One freshly generated random id per document.
ids = [str(uuid4()) for _ in documents]

vector_store.add_documents(documents=documents, ids=ids)

# Retriever configured with k=1 and a score threshold of 0.2.
retriever = vector_store.as_retriever(
    search_kwargs={"k": 1, "score_threshold": 0.2},
    search_type="similarity_score_threshold",
)

# Gather everything the retriever's stream() produces, then display it.
chunks = list(retriever.stream("Who is Naruto Uzumaki?"))
chunks

[[Document(id='6f10eb13-af01-4229-85a1-6ddfcaedea2e', metadata={'_id': '6f10eb13-af01-4229-85a1-6ddfcaedea2e', 'source': 'Naruto Series'}, page_content="Naruto Uzumaki is a genin from the Hidden Leaf village. He is known for his 'never-listening' attitude of him.")]]

In [96]:
# Retrieval chain: build the prompt inputs from the incoming question, then
# run prompt -> model -> string parser.
retrieval_chain = (
    {
        # Retriever output is supplied as the prompt's {context}; the step is
        # configured with run_name="Docs".
        "context": retriever.with_config(run_name="Docs"),
        # Supplies the prompt's {question} via RunnablePassthrough.
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)


In [97]:
# Stream the chain's answer; "|" marks where one chunk ends and the next begins.
for chunk in retrieval_chain.stream("Who is Naruto Uzumaki?"):
    print(chunk, end="|", flush=True)

Nar|uto Uzum|aki is a genin from the Hidden Leaf village. He is known for his| 'never-listening' attitude of him.|