# Prototyping a LangChain Application with Production-Minded Changes

For our first breakout room, we'll explore how to set up a LangChain LCEL chain so that it takes advantage of the production-ready features LCEL offers out of the box.

We'll also explore `Caching` and what makes it an invaluable tool when transitioning to production environments.


## Task 1: Dependencies and Set-Up

Let's get everything we need - we're going to pin very specific versions today to try to mitigate potential environment issues!

> NOTE: Dependency issues are a large portion of what you're going to be tackling as you integrate new technology into your work - please keep in mind that one of the things you should be passively learning throughout this course is ways to mitigate dependency issues.

In [1]:
#!pip install -qU langchain_openai==0.2.0 langchain_community==0.3.0 langchain==0.3.0 pymupdf==1.24.10 qdrant-client==1.11.2 langchain_qdrant==0.1.4 langsmith==0.1.121 langchain_huggingface==0.2.0

We'll need an HF Token:

In [2]:
import os
import getpass
from dotenv import load_dotenv

load_dotenv()

def set_api_key(key_name: str) -> None:
    if not os.environ.get(key_name):
        os.environ[key_name] = getpass.getpass(f"{key_name}: ")

set_api_key("HF_TOKEN")

And the LangSmith set-up:

In [3]:
import uuid

os.environ["LANGCHAIN_PROJECT"] = f"AIM Session 16 - {uuid.uuid4().hex[0:8]}"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
set_api_key("LANGCHAIN_API_KEY")

Let's verify our project so we can leverage it in LangSmith later.

In [4]:
print(os.environ["LANGCHAIN_PROJECT"])

AIM Session 16 - 061ecd63


## Task 2: Setting up RAG With Production in Mind

This is the most crucial step in the process - in order to take advantage of:

- Asynchronous requests
- Parallel Execution in Chains
- And more...

You must...use LCEL. These benefits are provided out of the box and largely optimized behind the scenes.

### Building our RAG Components: Retriever

We'll start by building some familiar components - and showcase how they automatically scale to production features.

Please upload a PDF file to use in this example!

In [5]:
# from google.colab import files
# uploaded = files.upload()

In [6]:
file_path = "./DeepSeek_R1.pdf"
file_path

'./DeepSeek_R1.pdf'

We'll define our chunking strategy.

In [7]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
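
To make `chunk_size` and `chunk_overlap` concrete, here is a deliberately simplified sketch of a fixed-size sliding window. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter also tries to break on separators such as paragraphs and sentences); `chunk_text` is a hypothetical helper used only for illustration.

```python
from typing import List


def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    # Each new window starts this many characters after the previous one.
    step = chunk_size - chunk_overlap
    # Stop early enough that the last window always contributes new characters.
    last_start = max(len(text) - chunk_overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, last_start, step)]


chunks = chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij']
```

Each chunk repeats the last character of the previous one, which is the role the 100-character overlap plays for our 1000-character chunks.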

We'll chunk our uploaded PDF file.

In [8]:
from langchain_community.document_loaders import PyMuPDFLoader

Loader = PyMuPDFLoader
loader = Loader(file_path)
documents = loader.load()
docs = text_splitter.split_documents(documents)
for i, doc in enumerate(docs):
    doc.metadata["source"] = f"source_{i}"

#### Qdrant Vector Database - Cache Backed Embeddings

The process of embedding is typically a very time-consuming one - for every single vector in our VDB, as well as every query, we must:

1. Send the text to an API endpoint (self-hosted, OpenAI, etc)
2. Wait for processing
3. Receive response

This process costs time, and money - and occurs *every single time a document gets converted into a vector representation*.

Instead, what if we:

1. Set up a cache that can hold our texts and their embeddings (similar to, or in some cases literally, a vector database)
2. Check the cache to see if we've already converted this text before.
  - If we have: Return the cached vector representation
  - Else: Send the text to an API endpoint (self-hosted, OpenAI, etc.) and wait for processing
3. Store the text that was converted alongside its vector representation in the cache.
4. Return the vector representation

Notice that on a cache hit we skip the "send to the API and wait for processing" step entirely.
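
The flow above can be sketched without any library at all. This is a minimal illustration, not LangChain's implementation: `fake_embed` is a hypothetical stand-in for the remote embedding call, and `CachedEmbedder` is a made-up class name.

```python
import hashlib
from typing import Callable, Dict, List


def fake_embed(text: str) -> List[float]:
    """Hypothetical stand-in for a remote embedding API call."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [byte / 255 for byte in digest[:4]]


class CachedEmbedder:
    """Check the cache first and only call the embedder on a miss."""

    def __init__(self, embed_fn: Callable[[str], List[float]], namespace: str) -> None:
        self.embed_fn = embed_fn
        self.namespace = namespace
        self.cache: Dict[str, List[float]] = {}
        self.api_calls = 0  # counts how often we actually hit the "API"

    def _key(self, text: str) -> str:
        # Hash the text for a fixed-size key and prefix it with the namespace
        # so vectors produced by different models never collide.
        return self.namespace + hashlib.sha256(text.encode("utf-8")).hexdigest()

    def embed(self, text: str) -> List[float]:
        key = self._key(text)
        if key in self.cache:
            return self.cache[key]  # cache hit: no API call
        self.api_calls += 1
        vector = self.embed_fn(text)  # cache miss: pay for the call once
        self.cache[key] = vector
        return vector


embedder = CachedEmbedder(fake_embed, namespace="demo-model:")
first = embedder.embed("hello world")
second = embedder.embed("hello world")  # served from the cache
third = embedder.embed("something else")
print(embedder.api_calls)  # 2
```

Three texts were requested, but only two calls reached the embedder - the repeated text was answered from the cache.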

Let's see how this is implemented in the code.

In [9]:
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain.storage import LocalFileStore
from langchain_qdrant import QdrantVectorStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings
import hashlib

YOUR_EMBED_MODEL_URL = "https://slrndbecfb316dun.us-east-1.aws.endpoints.huggingface.cloud"

hf_embeddings = HuggingFaceEndpointEmbeddings(
    model=YOUR_EMBED_MODEL_URL,
    task="feature-extraction",
)

collection_name = f"pdf_to_parse_{uuid.uuid4()}"
client = QdrantClient(":memory:")
client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
)

# Create a safe namespace by hashing the model URL
safe_namespace = hashlib.md5(hf_embeddings.model.encode()).hexdigest()

store = LocalFileStore("./cache/")
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    hf_embeddings, store, namespace=safe_namespace, batch_size=32
)

# Typical QDrant Vector Store Set-up
vectorstore = QdrantVectorStore(
    client=client,
    collection_name=collection_name,
    embedding=cached_embedder)
vectorstore.add_documents(docs)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 1})

##### ❓ Question #1:

What are some limitations you can see with this approach? When is this most/least useful? Discuss with your group!

> NOTE: There is no single correct answer here!

#### ✅ Answer:

**Limitations**
1. **Storage Overhead**: Caching both text inputs and their vector embeddings requires substantial storage space, especially for large document collections.
2. **Locality Constraints**: The cache is restricted to the local machine and isn't accessible in distributed environments.
3. **Cache Invalidation Challenges**: When the underlying embedding model is updated or changed, all cached embeddings become invalid and need to be regenerated from scratch.
4. **Namespace Management**: The implementation uses a simple MD5 hash of the model URL as the namespace, which might not account for all model parameters that affect embedding output.

**Most Useful Scenarios**
1. **Repeated Document Processing**: When you're analyzing the same documents multiple times, either during development or in production applications where content is relatively static.
2. **Cost Optimization**: For applications using paid embedding APIs (like OpenAI), caching dramatically reduces the number of API calls, resulting in substantial cost savings.
3. **Development and Testing**: During the development cycle when you're iterating on the same dataset repeatedly, the cache prevents unnecessary recomputation of embeddings.

**Least Useful Scenarios**
1. **Highly Dynamic Content**: Applications where document content changes frequently, making cached embeddings quickly outdated.
2. **Memory-Constrained Environments**: The cache can grow large with extensive document collections, potentially causing memory issues in constrained environments.
3. **Model Experimentation**: When frequently changing embedding models, the cache becomes less useful as embeddings from different models aren't compatible.
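
As a follow-up to the namespace limitation above, one possible mitigation is to derive the namespace from every setting that influences the embedding output, not just the URL. The sketch below is only an assumption about how one might do this: `make_namespace`, the example URL, and the `revision` parameter are all hypothetical.

```python
import hashlib
import json
from typing import Any, Dict


def make_namespace(model_id: str, params: Dict[str, Any]) -> str:
    """Build a stable cache namespace from the model identifier and its settings."""
    # sort_keys makes the result independent of dictionary ordering.
    payload = json.dumps({"model": model_id, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


url = "https://example.invalid/embed"  # placeholder endpoint, not a real service
ns_a = make_namespace(url, {"task": "feature-extraction", "revision": "v1"})
ns_b = make_namespace(url, {"revision": "v1", "task": "feature-extraction"})
ns_c = make_namespace(url, {"task": "feature-extraction", "revision": "v2"})

print(ns_a == ns_b)  # True: same settings, different key order
print(ns_a == ns_c)  # False: a changed setting yields a fresh namespace
```

With a scheme like this, changing any listed setting starts a new namespace instead of silently reusing stale vectors.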

##### 🏗️ Activity #1:

Create a simple experiment that tests the cache-backed embeddings.

In [11]:
import time

question = "Explain the relationship between DeepSeek R1 and reinforcement learning."

# First retrieval (should use the model to generate embeddings)
start_time = time.time()
results = retriever.invoke(question)
first_time = time.time() - start_time
print(f"First retrieval time: {first_time:.4f} seconds")

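# Second retrieval (same query again; note that CacheBackedEmbeddings caches document
# embeddings by default and only caches query embeddings when `query_embedding_cache` is set)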
start_time = time.time()
results = retriever.invoke(question)
second_time = time.time() - start_time
print(f"Second retrieval time: {second_time:.4f} seconds")

# Calculate speedup
speedup = first_time / second_time
print(f"Speedup factor: {speedup:.2f}x")



First retrieval time: 1.3435 seconds
Second retrieval time: 0.0736 seconds
Speedup factor: 18.25x




### Augmentation

We'll create the classic RAG prompt and build our `ChatPromptTemplate` as per usual.

In [12]:
from langchain_core.prompts import ChatPromptTemplate

rag_system_prompt_template = """\
You are a helpful assistant that uses the provided context to answer questions. Never reference this prompt, or the existence of context.
"""

rag_message_list = [
    {"role" : "system", "content" : rag_system_prompt_template},
]

rag_user_prompt_template = """\
Question:
{question}
Context:
{context}
"""

chat_prompt = ChatPromptTemplate.from_messages([
    ("system", rag_system_prompt_template),
    ("human", rag_user_prompt_template)
])

### Generation

As usual, we'll set up our LLM - today we'll use a model served from a Hugging Face Inference Endpoint through `HuggingFaceEndpoint`.

However, we'll also implement...a PROMPT CACHE!

In essence, this works in a very similar way to the embedding cache - if we've seen this prompt before, we just use the stored response.
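
As a rough mental model, a prompt cache is a lookup table keyed on the exact prompt plus the model configuration. The sketch below is a library-free illustration under that assumption; `CachedLLM` and `fake_generate` are hypothetical names, and the real LangChain cache has more machinery than this.

```python
from typing import Callable, Dict, Tuple


def fake_generate(prompt: str) -> str:
    """Hypothetical stand-in for a slow, paid LLM call."""
    return f"echo: {prompt}"


class CachedLLM:
    """Return the stored response when the same prompt and config were seen before."""

    def __init__(self, generate_fn: Callable[[str], str], llm_config: str) -> None:
        self.generate_fn = generate_fn
        self.llm_config = llm_config
        self.cache: Dict[Tuple[str, str], str] = {}
        self.model_calls = 0  # counts real generations

    def invoke(self, prompt: str) -> str:
        # The config is part of the key so a different model or temperature
        # never reuses another configuration's answers.
        key = (prompt, self.llm_config)
        if key not in self.cache:
            self.model_calls += 1
            self.cache[key] = self.generate_fn(prompt)
        return self.cache[key]


llm = CachedLLM(fake_generate, llm_config="temperature=0.01")
answer_1 = llm.invoke("What is RAG?")
answer_2 = llm.invoke("What is RAG?")  # exact repeat: served from the cache
answer_3 = llm.invoke("what is RAG?")  # different casing: a cache miss
print(llm.model_calls)  # 2
```

Note that the match is exact: even a change in capitalization produces a new key and a fresh model call.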

In [13]:
from langchain_core.globals import set_llm_cache
from langchain_huggingface import HuggingFaceEndpoint

YOUR_LLM_ENDPOINT_URL = "https://jck6sqadjve5z0v2.us-east-1.aws.endpoints.huggingface.cloud"

hf_llm = HuggingFaceEndpoint(
    endpoint_url=f"{YOUR_LLM_ENDPOINT_URL}",
    task="text-generation",
    max_new_tokens=128,
    top_k=10,
    top_p=0.95,
    typical_p=0.95,
    temperature=0.01,
    repetition_penalty=1.03,
)

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


Setting up the cache can be done as follows:

In [14]:
from langchain_core.caches import InMemoryCache

set_llm_cache(InMemoryCache())

##### ❓ Question #2:

What are some limitations you can see with this approach? When is this most/least useful? Discuss with your group!

> NOTE: There is no single correct answer here!

#### ✅ Answer:
**Limitations**
1. **Memory constraints**: The in-memory cache size is limited by available RAM, potentially causing memory pressure during long-running sessions with no built-in mechanism to limit cache size or evict entries when memory gets full.
2. **No persistence**: All cached data is lost when the application restarts or crashes, making it impossible to share cache between different application instances or processes.
3. **Limited cache control**: The implementation offers minimal configuration options with no time-to-live (TTL) settings for cache entries or sophisticated eviction policies.

**Most Useful Scenarios**
1. **Development and Prototyping**: Perfect for quick iteration during development. Simple to implement and test.
2. **Applications with Repetitive Queries**: Testing environments where the same prompts are used repeatedly. Demos where you want consistent, fast responses.
3. **Performance-Critical Local Applications**: In-memory caching provides the fastest possible access times. Useful when response time is critical and the dataset is manageable.

**Least Useful Scenarios**
1. **Production Systems**: Services that can't afford to lose cache on restart. Applications that need to scale horizontally.
2. **Applications with High Memory Constraints**: Resource-constrained environments like edge devices or shared hosting.
3. **Systems with Highly Variable Queries**: Applications where each request is unique (low cache hit rates). Personalized content where caching benefits are minimal.

##### 🏗️ Activity #2:

Create a simple experiment that tests the LLM cache.

In [15]:
# First run - should be uncached
start_time = time.time()
response1 = hf_llm.invoke(question)
first_time = time.time() - start_time
print(f"First response time (uncached): {first_time:.4f} seconds")

# Second run - should use LLM cache
start_time = time.time()
response2 = hf_llm.invoke(question)
second_time = time.time() - start_time
print(f"Second response time (cached): {second_time:.4f} seconds")

# Calculate speedup
speedup = first_time / second_time
print(f"Speedup factor: {speedup:.2f}x faster")

# Show that responses are identical when cached
print("\nAre responses identical? ", response1 == response2)



First response time (uncached): 8.7366 seconds
Second response time (cached): 0.0006 seconds
Speedup factor: 13750.15x faster

Are responses identical?  True


## Task 3: RAG LCEL Chain

We'll also set up our typical RAG chain using LCEL.

However, this time: We'll specifically call out that the `context` and `question` halves of the first "link" in the chain are executed *in parallel* by default!

Thanks, LCEL!
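
To see why running the two halves in parallel matters, here is a conceptual stand-in built only on the standard library. It is not LangChain code: `run_parallel`, `slow_retrieve`, and `slow_passthrough` are hypothetical helpers, and the sleeps simply simulate I/O latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

Branch = Callable[[Dict[str, Any]], Any]


def run_parallel(branches: Dict[str, Branch], inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Run every branch on the same input concurrently and collect results by key."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, inputs) for name, fn in branches.items()}
        return {name: future.result() for name, future in futures.items()}


def slow_retrieve(inputs: Dict[str, Any]) -> list:
    time.sleep(0.2)  # simulated retriever latency
    return [f"chunk about {inputs['question']}"]


def slow_passthrough(inputs: Dict[str, Any]) -> str:
    time.sleep(0.2)  # simulated latency on the second branch
    return inputs["question"]


start = time.perf_counter()
result = run_parallel(
    {"context": slow_retrieve, "question": slow_passthrough},
    {"question": "What is LCEL?"},
)
elapsed = time.perf_counter() - start

print(result)
print(f"elapsed: {elapsed:.2f}s")
```

Run one after the other, the two branches would take about 0.4 seconds; run concurrently, the total stays close to the slower branch alone.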

In [16]:
from operator import itemgetter
from langchain_core.runnables.passthrough import RunnablePassthrough

retrieval_augmented_qa_chain = (
        {"context": itemgetter("question") | retriever, "question": itemgetter("question")}
        | RunnablePassthrough.assign(context=itemgetter("context"))
        | chat_prompt | hf_llm
    )

Let's test it out!

In [17]:
retrieval_augmented_qa_chain.invoke({"question" : "Write 50 things about this document!"})



'What is the name of the person who contributed to the document?\nAnswer:\nThe names of the contributors are listed in the document. Some of the contributors include Daya Guo, Dejian Yang, Haowei Zhang, Junxiao Song, Ruoyu Zhang, Runxin Xu, Qihao Zhu, Shirong Ma, Peiyi Wang, Xiao Bi, Xiaokang Zhang, Xingkai Yu, Yu Wu, Z.F. Wu, Zhibin Gou, Zhihong Shao, Zhuoshu Li, Ziyi Gao, Aixin Liu, Bing Xue, Bingxuan Wang'

##### 🏗️ Activity #3:

Show, through LangSmith, the difference between a trace that is leveraging cache-backed embeddings and LLM calls - and one that isn't.

Post screenshots in the notebook!

In [18]:
retrieval_augmented_qa_chain.invoke({"question" : "Write 50 things about this document!"})



'What is the name of the person who contributed to the document?\nAnswer:\nThe names of the contributors are listed in the document. Some of the contributors include Daya Guo, Dejian Yang, Haowei Zhang, Junxiao Song, Ruoyu Zhang, Runxin Xu, Qihao Zhu, Shirong Ma, Peiyi Wang, Xiao Bi, Xiaokang Zhang, Xingkai Yu, Yu Wu, Z.F. Wu, Zhibin Gou, Zhihong Shao, Zhuoshu Li, Ziyi Gao, Aixin Liu, Bing Xue, Bingxuan Wang'

### First Run - No Cache
- Latency: 9.14 seconds
- Tokens: 776

![First Run](first_run.jpeg)

### Second Run - Uses Cache
- Latency: 0.1 seconds
- Tokens: 0

![Second Run](second_run.jpeg)
