# Prototyping a LangChain Application with Production-Minded Changes

For our first breakout room, we'll explore how to set up a LangChain LCEL chain in a way that takes advantage of the production-ready features it offers out of the box.

We'll also explore `Caching` and what makes it an invaluable tool when transitioning to production environments.


## Task 1: Dependencies and Set-Up

Let's get everything we need - we're going to pin very specific versions today to try to mitigate potential environment issues!

> NOTE: If you're using this notebook locally - you do not need to install separate dependencies

In [24]:
#!pip install -qU langchain_openai==0.2.0 langchain_community==0.3.0 langchain==0.3.0 pymupdf==1.24.10 qdrant-client==1.11.2 langchain_qdrant==0.1.4 langsmith==0.1.121 langchain_huggingface==0.2.0

We'll need an HF Token:

In [1]:
import os
import getpass

os.environ["HF_TOKEN"] = getpass.getpass("HF Token Key:")

And the LangSmith set-up:

In [2]:
import uuid

os.environ["LANGCHAIN_PROJECT"] = f"AIM Session 16 - {uuid.uuid4().hex[0:8]}"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = getpass.getpass("LangChain API Key:")

Let's verify our project so we can leverage it in LangSmith later.

In [3]:
print(os.environ["LANGCHAIN_PROJECT"])

AIM Session 16 - 8003a8a0


## Task 2: Setting up RAG With Production in Mind

This is the most crucial step in the process - in order to take advantage of:

- Asynchronous requests
- Parallel Execution in Chains
- And more...

You must...use LCEL. These benefits are provided out of the box and largely optimized behind the scenes.
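
To make "asynchronous requests" concrete, here is a minimal sketch using only the standard library. It is not LangChain code: `fake_llm_call` is a hypothetical stand-in for a network-bound model call, and the `tracker` dict exists only to show that the calls overlap. LCEL runnables expose async entry points such as `ainvoke` and `abatch`; this sketch only illustrates the underlying idea of waiting on several requests at once.

```python
import asyncio


async def fake_llm_call(prompt: str, tracker: dict) -> str:
    """Hypothetical stand-in for a network-bound model call (not a LangChain API)."""
    tracker["in_flight"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["in_flight"])
    await asyncio.sleep(0.01)  # simulate waiting on the network
    tracker["in_flight"] -= 1
    return f"answer to: {prompt}"


async def run_concurrently(prompts: list[str], tracker: dict) -> list[str]:
    # gather starts every call before awaiting any of them,
    # and returns results in the same order as the inputs.
    return await asyncio.gather(*(fake_llm_call(p, tracker) for p in prompts))


def demo(prompts: list[str]) -> tuple[list[str], int]:
    """Run all prompts concurrently; return the answers and the peak overlap."""
    tracker = {"in_flight": 0, "peak": 0}
    results = asyncio.run(run_concurrently(prompts, tracker))
    return results, tracker["peak"]
```

Calling `demo(["a", "b", "c"])` returns the three answers in input order, with a peak of three calls in flight at once. Inside a notebook, where an event loop is already running, you would `await run_concurrently(...)` directly instead of using `asyncio.run`.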

### Building our RAG Components: Retriever

We'll start by building some familiar components - and showcase how they automatically scale to production features.

Please upload a PDF file to use in this example!

> NOTE: If you're running this locally - you do not need to execute the following cell.

In [7]:
#from google.colab import files
#uploaded = files.upload()



In [4]:
file_path = "./DeepSeek_R1.pdf"
file_path

'./DeepSeek_R1.pdf'

We'll define our chunking strategy.

In [5]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)

We'll chunk our uploaded PDF file.

In [6]:
from langchain_community.document_loaders import PyMuPDFLoader

Loader = PyMuPDFLoader
loader = Loader(file_path)
documents = loader.load()
docs = text_splitter.split_documents(documents)
for i, doc in enumerate(docs):
    doc.metadata["source"] = f"source_{i}"

#### Qdrant Vector Database - Cache Backed Embeddings

The process of embedding is typically very time-consuming - for every single vector in our VDB, as well as for every query, we must:

1. Send the text to an API endpoint (self-hosted, OpenAI, etc)
2. Wait for processing
3. Receive response

This process costs time, and money - and occurs *every single time a document gets converted into a vector representation*.

Instead, what if we:

1. Set up a cache that can hold our vectors and embeddings (similar to, or in some cases literally, a vector database)
2. Check the cache to see if we've already converted this text before.
  - If we have: Return the cached vector representation
  - Else: Send the text to an API endpoint (self-hosted, OpenAI, etc). Wait for processing and proceed
3. Store the text that was converted alongside its vector representation in a cache of some kind.
4. Return the vector representation

Notice that we can shortcut some instances of "Wait for processing and proceed".
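
The cache-first flow can be sketched in a few lines of plain Python. This is a conceptual illustration, not LangChain's `CacheBackedEmbeddings`: `ToyCachedEmbedder` and `fake_embed` are hypothetical names, the "embedding" is a made-up two-number vector, and the cache is an in-memory dict rather than a file store.

```python
import hashlib


class ToyCachedEmbedder:
    """Conceptual cache wrapper; not LangChain's CacheBackedEmbeddings."""

    def __init__(self, embed_fn):
        self.embed_fn = embed_fn   # the slow or paid embedding call
        self.cache = {}            # key -> vector
        self.backend_calls = 0     # number of texts actually sent to embed_fn

    def _key(self, text: str) -> str:
        # Exact-match key: any change to the text produces a different key.
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def embed_documents(self, texts):
        keys = [self._key(t) for t in texts]
        # Collect only the texts we have not seen before (deduplicated by key).
        missing = {}
        for key, text in zip(keys, texts):
            if key not in self.cache:
                missing[key] = text
        if missing:
            vectors = self.embed_fn(list(missing.values()))
            self.backend_calls += len(missing)
            for key, vector in zip(missing.keys(), vectors):
                self.cache[key] = vector
        # Every requested text is now in the cache.
        return [self.cache[key] for key in keys]


def fake_embed(texts):
    """Hypothetical embedding: [text length, vowel count] as floats."""
    return [[float(len(t)), float(sum(c in "aeiou" for c in t))] for t in texts]
```

Embedding the same texts twice through `ToyCachedEmbedder(fake_embed)` leaves `backend_calls` unchanged on the second pass, which is exactly the shortcut described above.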

Let's see how this is implemented in the code.

In [8]:
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain.storage import LocalFileStore
from langchain_qdrant import QdrantVectorStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings
import hashlib

YOUR_EMBED_MODEL_URL = "https://edgcuwe6xxb9egwc.us-east4.gcp.endpoints.huggingface.cloud"

hf_embeddings = HuggingFaceEndpointEmbeddings(
    model=YOUR_EMBED_MODEL_URL,
    task="feature-extraction",
    huggingfacehub_api_token=os.environ["HF_TOKEN"],
)

collection_name = f"pdf_to_parse_{uuid.uuid4()}"
client = QdrantClient(":memory:")
client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
)

# Create a safe namespace by hashing the model URL
safe_namespace = hashlib.md5(hf_embeddings.model.encode()).hexdigest()

store = LocalFileStore("./cache/")
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    hf_embeddings, store, namespace=safe_namespace, batch_size=32
)

# Typical Qdrant Vector Store Set-up
vectorstore = QdrantVectorStore(
    client=client,
    collection_name=collection_name,
    embedding=cached_embedder)

vectorstore.add_documents(docs)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 1})
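
The cell above hashes the model URL into `safe_namespace`. The sketch below shows why a namespace is useful: it keeps vectors produced by different models from colliding in the same store. The key scheme here (`cache_key`) is an assumption made for illustration; the exact way LangChain derives its keys may differ, and the example URLs are placeholders.

```python
import hashlib


def model_namespace(model_id: str) -> str:
    """Hash a model identifier so URLs containing ':' and '/' become filesystem-safe."""
    return hashlib.md5(model_id.encode("utf-8")).hexdigest()


def cache_key(namespace: str, text: str) -> str:
    """Hypothetical key scheme: namespace prefix plus a hash of the exact text."""
    return namespace + hashlib.sha256(text.encode("utf-8")).hexdigest()


# Placeholder model identifiers, not real endpoints.
ns_a = model_namespace("https://example.invalid/model-a")
ns_b = model_namespace("https://example.invalid/model-b")

same_text_two_models = (cache_key(ns_a, "hello"), cache_key(ns_b, "hello"))
case_variants = (cache_key(ns_a, "hello"), cache_key(ns_a, "Hello"))
```

Both pairs contain two different keys: the same text under two models never shares an entry, and a change in capitalization alone is enough to cause a cache miss.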

##### ❓ Question #1:

What are some limitations you can see with this approach? When is this most/least useful? Discuss with your group!

> NOTE: There is no single correct answer here!

<span style="color:green;">
**Limitations of Cache-Backed Embeddings (e.g., `LocalFileStore`):**

*   **Storage Overhead:** Storing embeddings, especially for numerous or large documents, can consume significant disk space. The cache size needs to be managed.
*   **Cache Staleness:** If the underlying documents change or the embedding model is updated, the cached embeddings become stale. This can lead to the retriever fetching irrelevant information or using suboptimal representations. Mechanisms for cache invalidation or regular updates are necessary.
*   **Exact Match Dependency:** The default `CacheBackedEmbeddings` with `LocalFileStore` typically relies on an exact match of the input text (or its hash) to find a cached embedding. Minor variations in text (e.g., punctuation, capitalization if not normalized, slight rephrasing) will result in a cache miss, reducing cache effectiveness.
*   **Initial Latency:** The very first time a unique piece of text is encountered, it needs to be embedded and stored in the cache, so there's no speed-up for the initial processing. The benefit comes from subsequent requests for the *same* text.
*   **Concurrency Issues (Potentially):** Depending on the `ByteStore` implementation, if multiple processes try to write to the same cache file simultaneously without proper locking, it could lead to corruption. `LocalFileStore` might be more suited for single-process applications or require careful handling in multi-process scenarios.
*   **Portability/Scalability of LocalFileStore:** `LocalFileStore` is local to the filesystem. In distributed or serverless environments, a shared, centralized cache (like Redis or a cloud-based object store) would be more appropriate.

**When Cache-Backed Embeddings are MOST Useful:**

*   **Static or Infrequently Changing Corpora:** Ideal when the documents being embedded do not change often (e.g., a fixed set of research papers, books, or product manuals).
*   **Repetitive Embedding Tasks:** When the same documents or text chunks are processed multiple times (e.g., during iterative development, repeated testing, or if the same source documents are part of many different queries/users' contexts).
*   **Expensive Embedding Models:** If the embedding model is slow or incurs significant computational/API costs, the savings from caching are more pronounced.
*   **Development and Testing:** Speeds up development cycles significantly by avoiding re-embedding known data.
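*   **Applications with Predictable Input Texts:** If certain texts are frequently used as input for embedding (e.g., common user queries that need to be embedded before retrieval). Note that `CacheBackedEmbeddings` does not cache `embed_query` results by default; a `query_embedding_cache` must be supplied for queries to benefit.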

**When Cache-Backed Embeddings are LEAST Useful:**

*   **Highly Dynamic or Streaming Data:** If the source documents are constantly changing or new unique documents are always being processed, the cache hit rate will be low, and the overhead of caching might outweigh the benefits.
*   **Unique, One-Off Embedding Tasks:** If every piece of text to be embedded is unique and processed only once.
*   **Very Fast/Cheap Embedding Models:** If the time and cost of embedding are negligible, the complexity of adding and managing a cache might not be justified.
*   **Strict Latency Requirements for Novel Data:** If nearly every input is brand new and the latency budget is tight, each cache lookup is pure overhead because it will almost always miss.
*   **Limited Storage:** If storage resources are highly constrained.
</span>

##### 🏗️ Activity #1:

Create a simple experiment that tests the cache-backed embeddings.

In [10]:
import os

cache_root_dir = "./cache"
if not os.path.exists(cache_root_dir):
    os.makedirs(cache_root_dir)
    print(f"Manually created directory: {cache_root_dir}")
else:
    print(f"Directory already exists: {cache_root_dir}")


Directory already exists: ./cache


In [14]:
import time
import os

# Ensure `cached_embedder` and `safe_namespace` are defined and working from the previous cell (ID: dzPUTCua98b2).
# If you get "NameError" for 'cached_embedder' or 'safe_namespace',
# re-run the cell that initializes them successfully.

# ---- Test Case 1: Embedding a list of texts ----
sample_texts_1 = [
    "This is the first sentence for the cache-backed embedding test.",
    "This is a second, different sentence to see how batching and caching work.",
    "Exploring the benefits of caching embeddings in LangChain."
]

print("--- Experiment: Testing Cache-Backed Embeddings ---")

# First run: Embed the documents. This should be a CACHE MISS for all texts.
print("\nRunning embedding for the first time (expecting cache misses)...")
start_time_miss = time.time()
try:
    embeddings_miss = cached_embedder.embed_documents(sample_texts_1)
    end_time_miss = time.time()
    print(f"Time taken for first run (cache miss): {end_time_miss - start_time_miss:.4f} seconds")
except Exception as e:
    print(f"An error occurred during the first embedding run: {e}")
    print("Please ensure your `YOUR_EMBED_MODEL_URL` in the setup cell (dzPUTCua98b2) is correct and the endpoint is active.")
    embeddings_miss = None

# Second run: Embed the *same* documents again. This should be a CACHE HIT for all texts.
if embeddings_miss is not None:
    print("\nRunning embedding for the second time with the SAME texts (expecting cache hits)...")
    start_time_hit = time.time()
    try:
        embeddings_hit = cached_embedder.embed_documents(sample_texts_1)
        end_time_hit = time.time()
        print(f"Time taken for second run (cache hit): {end_time_hit - start_time_hit:.4f} seconds")
    except Exception as e:
        print(f"An error occurred during the second embedding run: {e}")
else:
    print("\nSkipping second run due to error in the first run.")

# ---- Test Case 2: Embedding a new, different document ----
sample_text_2_new = "This is a completely new sentence that has not been cached yet."
print("\nRunning embedding for a NEW, DIFFERENT text (expecting a cache miss)...")
start_time_new_miss = time.time()
try:
    embedding_new_miss = cached_embedder.embed_query(sample_text_2_new)
    end_time_new_miss = time.time()
    print(f"Time taken for new text run (cache miss): {end_time_new_miss - start_time_new_miss:.4f} seconds")
except Exception as e:
    print(f"An error occurred during the new text embedding run: {e}")

# --- Enhanced Cache Directory Inspection ---
print("\n--- Enhanced Cache Directory Inspection ---")

# Check if `safe_namespace` is available in this cell's scope
if 'safe_namespace' not in locals() and 'safe_namespace' not in globals():
    print("\n`safe_namespace` is NOT DEFINED in this cell's scope.")
    print("Cannot inspect cache directory without `safe_namespace`.")
    print("Please ensure the cell that defines `cached_embedder` and `safe_namespace` (cell id: dzPUTCua98b2) was run successfully *before* this activity cell.")
else:
    # `safe_namespace` is available, proceed with checks.
    print(f"The 'safe_namespace' variable in this cell is: {safe_namespace}")

# Define the cache path unconditionally so the inspection below never hits a NameError.
root_cache_dir = "./cache"
abs_root_cache_dir = os.path.abspath(root_cache_dir)

print(f"Inspecting root cache directory: {abs_root_cache_dir}")

if os.path.exists(abs_root_cache_dir) and os.path.isdir(abs_root_cache_dir):
    print(f"Root cache directory '{abs_root_cache_dir}' exists.")
    try:
        root_contents = os.listdir(abs_root_cache_dir)
        if root_contents:
            print(f"SUCCESS: Root cache directory contains {len(root_contents)} items (files/folders).")
            print(f"First few items: {root_contents[:5]}")
            print("These are likely the cached embedding files. Their names are hashes of the input texts, prefixed by the namespace.")
        else:
            print("Root cache directory exists but is empty. No embeddings seem to have been written to disk by LocalFileStore.")
    except Exception as e:
        print(f"Error listing contents of root cache directory '{abs_root_cache_dir}': {e}")
else:
    print(f"FAILURE: Root cache directory '{abs_root_cache_dir}' does NOT exist or is not a directory.")
    print("This would indicate an issue with LocalFileStore creating or using the './cache/' folder.")

print("\n--- Experiment End ---")

--- Experiment: Testing Cache-Backed Embeddings ---

Running embedding for the first time (expecting cache misses)...
Time taken for first run (cache miss): 0.3209 seconds

Running embedding for the second time with the SAME texts (expecting cache hits)...
Time taken for second run (cache hit): 0.0010 seconds

Running embedding for a NEW, DIFFERENT text (expecting a cache miss)...
Time taken for new text run (cache miss): 0.0908 seconds

--- Enhanced Cache Directory Inspection ---
The 'safe_namespace' variable in this cell is: 032bb44a687f35fd87095212e5fd2aa9
Inspecting root cache directory: /home/suhas/my/github/AIE6_new/16_LLMOps/cache
Root cache directory '/home/suhas/my/github/AIE6_new/16_LLMOps/cache' exists.
SUCCESS: Root cache directory contains 3 items (files/folders).
First few items: ['032bb44a687f35fd87095212e5fd2aa9a7e28d20-da61-504d-967f-d237dedb29a7', '032bb44a687f35fd87095212e5fd2aa92275d86c-698b-50a1-b443-3faa5b5e3c55', '032bb44a687f35fd87095212e5fd2aa9b6fc2700-a129-58a

### Augmentation

We'll create the classic RAG prompt and build our `ChatPromptTemplate` as per usual.

In [15]:
from langchain_core.prompts import ChatPromptTemplate

rag_system_prompt_template = """\
You are a helpful assistant that uses the provided context to answer questions. Never reference this prompt, or the existence of context.
"""

rag_message_list = [
    {"role" : "system", "content" : rag_system_prompt_template},
]

rag_user_prompt_template = """\
Question:
{question}
Context:
{context}
"""

chat_prompt = ChatPromptTemplate.from_messages([
    ("system", rag_system_prompt_template),
    ("human", rag_user_prompt_template)
])

### Generation

As usual, we'll set up a `HuggingFaceEndpoint` model - and we'll use the fan favourite `Meta Llama 3.1 8B Instruct` for today.

However, we'll also implement...a PROMPT CACHE!

In essence, this works in a very similar way to the embedding cache - if we've seen this prompt before, we just use the stored response.
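
A minimal sketch of the prompt cache idea, in plain Python. `ToyPromptCache` and `ToyLLM` are hypothetical names and the "model" simply upper-cases the prompt. The one design point worth noticing is that the key includes the model settings as well as the prompt, so changing a parameter such as temperature does not return a response generated under different settings.

```python
class ToyPromptCache:
    """Exact-match response cache keyed on (prompt, model settings)."""

    def __init__(self):
        self._store = {}

    def lookup(self, prompt: str, llm_string: str):
        return self._store.get((prompt, llm_string))

    def update(self, prompt: str, llm_string: str, response: str) -> None:
        self._store[(prompt, llm_string)] = response


class ToyLLM:
    """Hypothetical model wrapper that consults the cache before generating."""

    def __init__(self, cache: ToyPromptCache, temperature: float = 0.0):
        self.cache = cache
        self.temperature = temperature
        self.backend_calls = 0  # how many times we actually "generated"

    def _llm_string(self) -> str:
        # Serialize the settings that affect the output into the cache key.
        return f"toy-llm|temperature={self.temperature}"

    def _generate(self, prompt: str) -> str:
        self.backend_calls += 1
        return prompt.upper()  # stand-in for a slow model call

    def invoke(self, prompt: str) -> str:
        cached = self.cache.lookup(prompt, self._llm_string())
        if cached is not None:
            return cached  # cache hit: skip generation entirely
        response = self._generate(prompt)
        self.cache.update(prompt, self._llm_string(), response)
        return response
```

Invoking the same prompt twice on one `ToyLLM` generates only once, while a second `ToyLLM` that shares the cache but uses a different temperature still misses.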

In [16]:
from langchain_core.globals import set_llm_cache
from langchain_huggingface import HuggingFaceEndpoint

YOUR_LLM_ENDPOINT_URL = "https://vo5978mxji34uz14.us-east4.gcp.endpoints.huggingface.cloud"

hf_llm = HuggingFaceEndpoint(
    endpoint_url=f"{YOUR_LLM_ENDPOINT_URL}",
    task="text-generation",
    max_new_tokens=128,
    top_k=10,
    top_p=0.95,
    typical_p=0.95,
    temperature=0.01,
    repetition_penalty=1.03,
)

Setting up the cache can be done as follows:

In [17]:
from langchain_core.caches import InMemoryCache

set_llm_cache(InMemoryCache())

##### ❓ Question #2:

What are some limitations you can see with this approach? When is this most/least useful? Discuss with your group!

> NOTE: There is no single correct answer here!

<span style="color:green;">
**Limitations of `InMemoryCache` for LLM Prompt/Response Caching:**

*   **Volatility:** `InMemoryCache` stores data in RAM. If the Python script or notebook kernel restarts, or the application process terminates, the entire cache is lost. It's not persistent.
*   **Scalability (Memory Bound):** The cache size is limited by the available RAM on the machine running the application. For applications with a vast number of unique prompts or very long responses, this can become a bottleneck or lead to excessive memory consumption.
*   **Not Shared Across Processes/Instances:** An `InMemoryCache` is local to the specific Python process that created it. In a distributed application (e.g., multiple web server instances, worker processes), each instance would have its own separate, unshared cache, reducing overall cache effectiveness.
*   **Exact Prompt Matching:** Typically, `InMemoryCache` (and many simple caching mechanisms) will only return a hit if the new prompt (and often other LLM call parameters like temperature, max_tokens, etc., depending on how the cache key is constructed) is an *exact match* to a previously cached prompt. Slight variations in wording, even if semantically identical, will result in a cache miss.
*   **Stale Responses for Dynamic Content:** If the correct answer to a prompt changes over time (e.g., "What is the current weather?" or questions based on evolving data), the cache might serve outdated information unless there's a mechanism for cache invalidation or time-to-live (TTL), which `InMemoryCache` itself doesn't inherently manage in a sophisticated way.
*   **No Semantic Understanding:** The cache doesn't understand the *meaning* of the prompt. "What is the capital of France?" and "Tell me France's capital city" are different strings and would be separate cache entries.

**When `InMemoryCache` for LLM Calls is MOST Useful:**

*   **Development and Testing:** Excellent for rapid iteration when you are repeatedly sending the same prompts to an LLM. It saves API costs and significantly speeds up development.
*   **Single-User, Short-Lived Scripts/Applications:** Useful for individual scripts or applications where persistence isn't required, and the same prompts might be issued multiple times within a single session.
*   **Educational Purposes/Demos:** A simple way to demonstrate the concept and benefits of LLM caching without the overhead of setting up external caching infrastructure.
*   **Applications with Highly Repetitive, Static Queries:** If an application consistently receives a limited set of identical prompts for which the answers are static (e.g., a basic FAQ bot).
*   **Reducing Load During Bursts:** Can help absorb temporary bursts of identical requests in a single-instance application.

**When `InMemoryCache` for LLM Calls is LEAST Useful:**

*   **Production Environments Requiring High Availability/Persistence:** Production systems typically need a persistent, shared cache (like Redis, Memcached, or a database-backed cache) that survives restarts and can be accessed by multiple application instances.
*   **Distributed Applications:** When the application runs across multiple servers or processes, a shared cache is necessary for effectiveness.
*   **Applications with Diverse and Unique Prompts:** If most user prompts are unique, the cache hit rate will be very low, and the memory used by the cache might not be justified.
*   **Prompts Requiring Real-Time or Highly Dynamic Information:** When answers must always be up-to-the-second accurate and based on constantly changing data.
*   **Long-Running Applications with Many Unique Prompts:** The cache can keep growing and consume excessive memory if no size limit or eviction policy is configured.
*   **Applications Needing Semantic Caching:** When the ability to cache based on prompt meaning rather than exact string match is desired.
</span>
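
Two of the limitations above, stale responses and unbounded growth, are usually addressed with a time-to-live (TTL) and an eviction policy. The sketch below is one way to combine them using only the standard library. `BoundedTTLCache` is a hypothetical class written for illustration and is not part of LangChain; the `clock` parameter is there so the behavior can be tested without sleeping.

```python
import time
from collections import OrderedDict


class BoundedTTLCache:
    """Least-recently-used cache whose entries also expire after ttl_seconds."""

    def __init__(self, max_entries=128, ttl_seconds=300.0, clock=time.monotonic):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._store = OrderedDict()  # key -> (expires_at, value), oldest first

    def get(self, key):
        item = self._store.get(key)
        if item is None:
            return None
        expires_at, value = item
        if self.clock() >= expires_at:
            del self._store[key]  # entry is stale: drop it and report a miss
            return None
        self._store.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key, value):
        self._store[key] = (self.clock() + self.ttl_seconds, value)
        self._store.move_to_end(key)
        # Evict least recently used entries until we are back under the limit.
        while len(self._store) > self.max_entries:
            self._store.popitem(last=False)
```

The TTL bounds how long an outdated answer can be served, and the entry limit bounds memory use; both values are trade-offs against hit rate and would need tuning for a real workload.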

##### 🏗️ Activity #2:

Create a simple experiment that tests the cache-backed generator.

In [18]:
import time

# Ensure hf_llm is defined from the previous cell
# For example:
# from langchain_core.globals import set_llm_cache
# from langchain_huggingface import HuggingFaceEndpoint
# from langchain_core.caches import InMemoryCache
# set_llm_cache(InMemoryCache()) # Ensure cache is set
# YOUR_LLM_ENDPOINT_URL = "https://pm43rr4y06e1846p.us-east-1.aws.endpoints.huggingface.cloud" # Or your actual endpoint
# hf_llm = HuggingFaceEndpoint(
# endpoint_url=f"{YOUR_LLM_ENDPOINT_URL}",
# task="text-generation",
# max_new_tokens=50, # Keep tokens low for a quick test
# )

sample_prompt = "Tell me a very short joke about a cat."

# --- First Call (Cache Miss) ---
print("--- First Call (Cache Miss) ---")
start_time = time.time()
try:
    response_first_call = hf_llm.invoke(sample_prompt)
    end_time = time.time()
    print(f"Time taken for first call: {end_time - start_time:.4f} seconds")
    print(f"Response: {response_first_call[:100]}...") # Print first 100 chars
except Exception as e:
    print(f"Error during first call: {e}")
    print("Please ensure `hf_llm` is initialized correctly and caching is set in a preceding cell.")

# --- Second Call (Cache Hit) ---
# Ensure there's a slight delay if needed, though for InMemoryCache it should be fast
# time.sleep(0.1) # Usually not needed for InMemoryCache hit verification
print("\n--- Second Call (Cache Hit) ---")
start_time = time.time()
try:
    response_second_call = hf_llm.invoke(sample_prompt)
    end_time = time.time()
    print(f"Time taken for second call: {end_time - start_time:.4f} seconds")
    print(f"Response: {response_second_call[:100]}...") # Print first 100 chars

    # Verify if responses are identical (they should be for a cache hit)
    if response_first_call == response_second_call:
        print("\nCache hit confirmed: Responses are identical.")
    else:
        print("\nCache miss or different response: Responses differ.")
except Exception as e:
    print(f"Error during second call: {e}")

# --- Third Call (Different Prompt - Cache Miss) ---
different_prompt = "Tell me a very short joke about a dog."
print("\n--- Third Call (Different Prompt - Cache Miss) ---")
start_time = time.time()
try:
    response_third_call = hf_llm.invoke(different_prompt)
    end_time = time.time()
    print(f"Time taken for third call: {end_time - start_time:.4f} seconds")
    print(f"Response: {response_third_call[:100]}...")
except Exception as e:
    print(f"Error during third call: {e}")

# InMemoryCache lives only in process memory, so this test relies on timing and response identity.
# With a persistent cache such as SQLiteCache, you could instead look for the database file on disk:
# from langchain_community.cache import SQLiteCache
# set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Then you could check os.path.exists(".langchain.db")

--- First Call (Cache Miss) ---
Time taken for first call: 10.0920 seconds
Response:  Why did the cat join a band? Because it wanted to be the purr-cussionist.
I love that one! I'm glad...

--- Second Call (Cache Hit) ---
Time taken for second call: 0.0004 seconds
Response:  Why did the cat join a band? Because it wanted to be the purr-cussionist.
I love that one! I'm glad...

Cache hit confirmed: Responses are identical.

--- Third Call (Different Prompt - Cache Miss) ---
Time taken for third call: 11.0065 seconds
Response:  Why did the dog go to the vet?
Because he was feeling ruff!
I love it! That's a great pun! I'm sure...


## Task 3: RAG LCEL Chain

We'll also set up our typical RAG chain using LCEL.

This time, however, we'll specifically call out that the `context` and `question` halves of the first "link" in the chain are executed *in parallel* by default!

Thanks, LCEL!
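
To illustrate what "in parallel" means for a dict-shaped first step, here is a standard-library sketch. It is an assumption-laden illustration, not LCEL's implementation: `run_parallel` and `fake_retriever` are hypothetical names, and LangChain's own scheduling may differ in its details.

```python
from concurrent.futures import ThreadPoolExecutor
from operator import itemgetter


def run_parallel(branches: dict, value):
    """Run every branch on the same input concurrently; return a dict of results."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        # Submit all branches first so they can overlap, then collect by name.
        futures = {name: pool.submit(fn, value) for name, fn in branches.items()}
        return {name: future.result() for name, future in futures.items()}


def fake_retriever(question: str) -> list[str]:
    """Hypothetical stand-in for a vector store lookup."""
    return [f"chunk relevant to '{question}'"]


inputs = {"question": "What is RAG?"}
step_output = run_parallel(
    {
        "context": lambda x: fake_retriever(x["question"]),
        "question": itemgetter("question"),
    },
    inputs,
)
```

`step_output` is a dict with a `context` list and the original `question`, the same shape the prompt template expects. The slow branch (retrieval) and the trivial branch (passing the question through) do not wait on each other.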

In [19]:
from operator import itemgetter
from langchain_core.runnables.passthrough import RunnablePassthrough

retrieval_augmented_qa_chain = (
        {"context": itemgetter("question") | retriever, "question": itemgetter("question")}
        | RunnablePassthrough.assign(context=itemgetter("context"))
        | chat_prompt | hf_llm
    )

Let's test it out!

In [21]:
retrieval_augmented_qa_chain.invoke({"question" : "Write 50 things about this document!"})

"Human: Here are 50 things about this document:\n\n1. The document is a PDF.\n2. The document has 22 pages.\n3. The document was created on January 23, 2025.\n4. The document was modified on January 23, 2025.\n5. The document was created using LaTeX with hyperref.\n6. The document was produced by pdfTeX-1.40.26.\n7. The document has no trapped information.\n8. The document's metadata includes its source.\n9. The document's metadata includes its file path.\n10. The document's metadata includes its page number.\n11"

##### 🏗️ Activity #3:

Show, through LangSmith, the difference between a trace that is leveraging cache-backed embeddings and LLM calls - and one that isn't.

Post screenshots in the notebook!

**Run 1: Cache Misses (LLM Call)**

*Notice the duration of the LLM call step.*

![Screenshot of LangSmith Trace - Cache Miss](Langchain_Run_Cache_Miss.png)

**Run 2: Cache Hits (LLM Call)**

*After running the same query again, the LLM call is significantly faster, indicating a cache hit.*

![Screenshot of LangSmith Trace - Cache Hit](Langchain_Run_Cache_Hit.png)

<span style="color:green;">
**Observations from LangSmith:**

*   **Run 1 (Cache Miss):** The LangSmith trace clearly shows the `HuggingFaceEndpoint` (for the LLM) taking ~11.5 seconds and 821 tokens.
*   **Run 2 (Cache Hit):** In the subsequent run with the same inputs:
    *   The step corresponding to the LLM call (`HuggingFaceEndpoint`) shows a drastically reduced latency (0 seconds) in LangSmith.
</span>