Cell 1: Set up the environment


Set up environment variables and suppress warnings. Account for the deprecation of specific LLM models by selecting the appropriate model based on the current date.



In [None]:
# Import necessary modules
import os
from dotenv import load_dotenv, find_dotenv

# Load environment variables from a .env file
_ = load_dotenv(find_dotenv())  # Load .env file

# Suppress all warnings (this includes, but is not limited to, deprecation warnings)
import warnings
warnings.filterwarnings('ignore')

# Handle model deprecation based on current date
import datetime

# Get the current date
current_date = datetime.datetime.now().date()

# Define a target date after which a new model should be used
target_date = datetime.date(2024, 6, 12)

# Conditionally set the LLM model based on the current date
if current_date > target_date:
    llm_model = "gpt-3.5-turbo"
else:
    llm_model = "gpt-3.5-turbo-0301"
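
The date check above can be wrapped in a small function so the boundary behaviour is easy to verify. This is a minimal sketch; `select_model` is a hypothetical helper name, and the cutoff and model names are the ones used in the cell.

```python
import datetime

def select_model(today, cutoff=datetime.date(2024, 6, 12)):
    """Return the model name to use on the given date (hypothetical helper)."""
    # Strictly after the cutoff: use the current alias; otherwise the older snapshot
    return "gpt-3.5-turbo" if today > cutoff else "gpt-3.5-turbo-0301"

# The cutoff day itself still selects the older snapshot
print(select_model(datetime.date(2024, 6, 12)))
# The day after the cutoff selects the newer alias
print(select_model(datetime.date(2024, 6, 13)))
```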


Cell 2: Load the product catalog from CSV


Load the product catalog from a CSV file using CSVLoader, which reads the file and converts it into a format that LangChain can process.



In [None]:
# Import necessary classes for document loading and vector search
from langchain.document_loaders import CSVLoader
from langchain.vectorstores import DocArrayInMemorySearch

# Define the path to the product catalog file
file = 'OutdoorClothingCatalog_1000.csv'

# Load the CSV file into a document format
loader = CSVLoader(file_path=file)

# Load the documents into a variable
docs = loader.load()

# Display the first document to understand its structure
docs[0]


Cell 3: Create embeddings for documents

Create embeddings for the documents using OpenAI embeddings, which will be used to perform similarity searches over the documents.



In [None]:
# Import OpenAI embeddings class
from langchain.embeddings import OpenAIEmbeddings

# Initialize embeddings using OpenAI's API
embeddings = OpenAIEmbeddings()

# Embed a query as an example
embed = embeddings.embed_query("Hi my name is Harrison")

# Check the length of the embedding and display the first 5 values
print(len(embed))
print(embed[:5])


Cell 4: Create a document search database


Use DocArrayInMemorySearch to create a database from the documents and embeddings. This allows performing similarity-based searches on the documents.

In [None]:
# Use DocArrayInMemorySearch to create an in-memory search database with documents and embeddings
db = DocArrayInMemorySearch.from_documents(docs, embeddings)

# Perform a similarity search based on a query related to sunblocking shirts
query = "Please suggest a shirt with sunblocking"
docs = db.similarity_search(query)

# Check the number of matching documents
# (a notebook only echoes the last expression in a cell, so print this one)
print(len(docs))

# Display the first document in the results
docs[0]
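
To make the idea of a similarity search concrete, here is a toy sketch that ranks hand-written 3-dimensional vectors by cosine similarity. It assumes cosine similarity as the metric; the vectors, names, and the `top_k` helper are invented for illustration and are not real embeddings or LangChain APIs.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, doc_vecs, k=2):
    """Return the names of the k vectors most similar to the query."""
    scored = sorted(
        ((cosine_similarity(query_vec, vec), name) for name, vec in doc_vecs.items()),
        reverse=True,
    )
    return [name for _, name in scored[:k]]

# Hypothetical toy "embeddings": the first axis loosely stands for sun protection
toy_vectors = {
    "sun shirt": [0.9, 0.1, 0.0],
    "rain jacket": [0.1, 0.9, 0.0],
    "sun hat": [0.8, 0.0, 0.2],
}
print(top_k([1.0, 0.0, 0.0], toy_vectors))
```

A vector store does the same ranking over the real embedding vectors, usually with an optimized index rather than a full sort.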


Cell 5: Set up LLM-based Q&A over documents

Answer a question by passing the retrieved documents directly to a chat model: create a retriever, join the retrieved documents into a single string, and send that string to the LLM together with the question.

In [None]:
# Convert the document database to a retriever to enable querying
retriever = db.as_retriever()

# Initialize the language model
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(temperature=0.0, model=llm_model)

# Combine the content of all retrieved documents
qdocs = "".join(doc.page_content for doc in docs)

# Use the LLM to answer the question based on the retrieved documents
response = llm.call_as_llm(f"{qdocs} Question: Please list all your shirts with sun protection in a table in markdown and summarize each one.")

# Display the response in Markdown format
from IPython.display import display, Markdown
display(Markdown(response))


Cell 6: Use RetrievalQA for question answering

Set up RetrievalQA using the "stuff" chain type, where the chain retrieves relevant documents and uses the LLM to answer queries based on them.

In [None]:
# Import the RetrievalQA class
from langchain.chains import RetrievalQA

# Set up the RetrievalQA chain using "stuff" as the chain type
qa_stuff = RetrievalQA.from_chain_type(
    llm=llm, 
    chain_type="stuff", 
    retriever=retriever, 
    verbose=True
)

# Define the query for shirts with sun protection
query = "Please list all your shirts with sun protection in a table in markdown and summarize each one."

# Run the query and display the response in markdown format
response = qa_stuff.run(query)
display(Markdown(response))
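
The "stuff" strategy simply places every retrieved document into one prompt. The sketch below shows the idea with plain strings; `build_stuff_prompt` and its wording are assumptions for illustration, not the prompt template LangChain actually uses.

```python
def build_stuff_prompt(documents, question):
    """Concatenate all documents into one context block followed by the question."""
    context = "\n\n".join(documents)
    return (
        "Use the following pieces of context to answer the question.\n\n"
        f"{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )

# Hypothetical catalog snippets standing in for retrieved documents
retrieved = [
    "name: Sun Shield Shirt. UPF 50+ rated.",
    "name: Tropical Plaid Shirt. UPF 50+ rated.",
]
print(build_stuff_prompt(retrieved, "Which shirts offer sun protection?"))
```

Because everything goes into a single call, this approach is limited by the model's context window; the other chain types exist to work around that limit.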


Cell 7: VectorstoreIndexCreator to index and query documents


Use VectorstoreIndexCreator to create a vector search index over the documents, and then query it for relevant information.



In [None]:
# Import VectorstoreIndexCreator and necessary classes
from langchain.indexes import VectorstoreIndexCreator
from langchain.llms import OpenAI

# Create an index using VectorstoreIndexCreator
index = VectorstoreIndexCreator(
    vectorstore_cls=DocArrayInMemorySearch
).from_loaders([loader])

# Define the query for the index
query = "Please list all your shirts with sun protection in a table in markdown and summarize each one."

# Set up the LLM replacement model (due to model deprecation)
llm_replacement_model = OpenAI(temperature=0, model='gpt-3.5-turbo-instruct')

# Query the index and display the response
response = index.query(query, llm=llm_replacement_model)
display(Markdown(response))


Suggestions for further improvements:
a. Extend the question-answering functionality to handle multi-step queries across different product categories.
b. Add caching for embeddings to avoid recalculating them every time a query is made.


Cell 8: Extend Q&A for Multi-Step Queries across Different Product Categories

Extend the functionality to run the same question across several product categories (e.g., shirts, pants, jackets) in a single call, issuing one query per category.

In [None]:
# Define a multi-step query function for different product categories
def multi_step_query(categories, retriever, llm):
    responses = {}

    # Build the chain from the arguments instead of relying on the global qa_stuff
    qa = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)

    # Iterate through each product category and run a query
    for category in categories:
        query = f"Please list all your {category} with sun protection in a table in markdown and summarize each one."
        response = qa.run(query)
        responses[category] = response

    return responses

# Example categories to query for
categories = ["shirts", "pants", "jackets"]

# Run the multi-step query function
responses = multi_step_query(categories, retriever, llm)

# Display the results for each category in markdown format
from IPython.display import display, Markdown

for category, response in responses.items():
    print(f"Category: {category}")
    display(Markdown(response))


Cell 9: Add Caching for Embeddings to Avoid Recalculating

Implement a caching mechanism for embeddings to avoid recalculating them every time a query is made, improving efficiency.



In [None]:
# Import necessary modules for caching
import os
import pickle

# Define the file path for cached embeddings
cache_file = "embeddings_cache.pkl"

# Function to load embeddings from cache if available
def load_embeddings_cache():
    if os.path.exists(cache_file):
        with open(cache_file, 'rb') as f:
            cached_db = pickle.load(f)
        print("Loaded embeddings from cache.")
        return cached_db
    return None

# Function to save embeddings to cache
def save_embeddings_cache(db):
    with open(cache_file, 'wb') as f:
        pickle.dump(db, f)
    print("Embeddings saved to cache.")

# Reload the full catalog: `docs` was overwritten with search results in Cell 4
docs = loader.load()

# Load cached embeddings if they exist
cached_db = load_embeddings_cache()

# If no cached embeddings, calculate and cache them
if cached_db is None:
    db = DocArrayInMemorySearch.from_documents(docs, embeddings)
    save_embeddings_cache(db)
else:
    db = cached_db

# Continue with similarity search and Q&A process using cached or newly created embeddings.
# Keep the hits in `results` so that `docs` still holds the source documents.
query = "Please suggest a shirt with sunblocking"
results = db.similarity_search(query)

# Display the first matching document
results[0]


Suggestions for further improvements:
a. Add logging to track how many queries are made across different categories.
b. Implement a more sophisticated cache invalidation strategy to handle updated data.







Cell 10: Add Logging to Track Queries Across Different Categories

Log every query with a timestamp, its product category, and the query text, so the number of queries per category can be counted from the log file.



In [None]:
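# Import the logging module
# (no datetime import is needed: the log format adds timestamps, and
# `from datetime import datetime` would shadow the module imported in Cell 1)
import logging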

# Configure logging to write to a file with timestamps
logging.basicConfig(filename='query_log.log', 
                    level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Function to log query details
def log_query(category, query):
    logging.info(f"Query made for category: {category} | Query: {query}")

# Modify the multi-step query function to include logging
def multi_step_query_with_logging(categories, retriever, llm):
    responses = {}

    # Build the chain from the arguments instead of relying on the global qa_stuff
    qa = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)

    # Iterate through each product category and run a query
    for category in categories:
        query = f"Please list all your {category} with sun protection in a table in markdown and summarize each one."

        # Log the query before executing it
        log_query(category, query)

        # Run the query and store the response
        response = qa.run(query)
        responses[category] = response

    return responses

# Example categories to query for
categories = ["shirts", "pants", "jackets"]

# Run the multi-step query with logging
responses = multi_step_query_with_logging(categories, retriever, llm)

# Display the results for each category in markdown format
from IPython.display import display, Markdown

for category, response in responses.items():
    print(f"Category: {category}")
    display(Markdown(response))
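
The log file records each query, but a running count per category needs its own bookkeeping. One possible approach, sketched here with a hypothetical `record_query` helper, is a `collections.Counter` that is updated alongside each log call.

```python
from collections import Counter

# Running tally of queries per category (module-level for simplicity)
query_counts = Counter()

def record_query(category):
    """Increment and return the number of queries seen for this category."""
    query_counts[category] += 1
    return query_counts[category]

# Simulate three queries, two of them for the same category
for category in ["shirts", "pants", "shirts"]:
    record_query(category)

print(dict(query_counts))
```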


Cell 11: Implement a More Sophisticated Cache Invalidation Strategy

In [None]:
# Import os to check file modification times and pickle to read and write the cache
import os
import pickle

# Define the cache file path and the document source file path
cache_file = "embeddings_cache.pkl"
document_file = "OutdoorClothingCatalog_1000.csv"

# Function to check if cache needs invalidation (i.e., if the document has been modified since cache was created)
def is_cache_valid(cache_file, document_file):
    # Check if cache file exists
    if not os.path.exists(cache_file):
        return False  # Cache is not valid if it doesn't exist

    # Get the modification time of the document and cache files
    document_mtime = os.path.getmtime(document_file)
    cache_mtime = os.path.getmtime(cache_file)

    # Cache is invalid if the document has been modified after the cache was created
    return cache_mtime > document_mtime

# Function to load cache if valid, or regenerate cache if invalid
def load_or_regenerate_cache(docs, embeddings):
    if is_cache_valid(cache_file, document_file):
        print("Cache is valid. Loading embeddings from cache.")
        with open(cache_file, 'rb') as f:
            cached_db = pickle.load(f)
        return cached_db
    else:
        print("Cache is invalid or missing. Regenerating embeddings.")
        db = DocArrayInMemorySearch.from_documents(docs, embeddings)
        save_embeddings_cache(db)  # Save new cache after regenerating
        return db

# Function to save embeddings to cache
def save_embeddings_cache(db):
    with open(cache_file, 'wb') as f:
        pickle.dump(db, f)
    print("Embeddings saved to cache.")

# Load documents and embeddings, either from cache or regenerate if invalid
db = load_or_regenerate_cache(docs, embeddings)

# Continue with similarity search and Q&A process using the cache or newly created embeddings.
# Keep the hits in `results` so that `docs` still holds the source documents.
query = "Please suggest a shirt with sunblocking"
results = db.similarity_search(query)

# Display the first matching document
results[0]
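
The modification-time comparison can be tried in isolation with temporary files. This sketch uses `os.utime` to set the timestamps explicitly; `cache_is_fresh` is a hypothetical stand-in for the validity check above, and the file names are placeholders.

```python
import os
import tempfile

def cache_is_fresh(cache_path, source_path):
    """A cache is fresh only if it exists and is newer than its source file."""
    if not os.path.exists(cache_path):
        return False
    return os.path.getmtime(cache_path) > os.path.getmtime(source_path)

with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "catalog.csv")
    cache = os.path.join(tmp, "cache.pkl")
    for path in (source, cache):
        with open(path, "w") as f:
            f.write("x")

    # Set (atime, mtime) by hand: the cache is written after the source
    os.utime(source, (1_000_000, 1_000_000))
    os.utime(cache, (2_000_000, 2_000_000))
    fresh_before = cache_is_fresh(cache, source)

    # Simulate a later edit to the source file
    os.utime(source, (3_000_000, 3_000_000))
    fresh_after = cache_is_fresh(cache, source)

    # A cache file that does not exist is never fresh
    missing = cache_is_fresh(os.path.join(tmp, "nope.pkl"), source)

print(fresh_before, fresh_after, missing)
```

Modification times are a cheap signal, but they miss edits that preserve the timestamp; hashing the source file's contents is a stricter alternative.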


Suggestions for further improvements:
a. Add error handling to ensure robustness in case of missing files or corrupted caches.
b. Implement periodic cache expiration to refresh embeddings periodically, even if the source document hasn't changed.

Cell 12: Add Error Handling for Missing Files or Corrupted Caches

Add error handling to manage situations where files (such as cache or source document) are missing or the cache is corrupted, ensuring the program continues to function correctly.



In [None]:
import os
import pickle

# Function to load cache with error handling for missing or corrupted files
def load_embeddings_cache_with_error_handling():
    try:
        if os.path.exists(cache_file):
            with open(cache_file, 'rb') as f:
                cached_db = pickle.load(f)
            print("Loaded embeddings from cache.")
            return cached_db
        else:
            print("Cache file not found.")
            return None
    except (pickle.UnpicklingError, EOFError):
        print("Cache file is corrupted. Regenerating embeddings.")
        return None

# Function to save embeddings to cache with error handling
def save_embeddings_cache_with_error_handling(db):
    try:
        with open(cache_file, 'wb') as f:
            pickle.dump(db, f)
        print("Embeddings saved to cache.")
    except Exception as e:
        print(f"Failed to save embeddings to cache: {e}")

# Function to regenerate embeddings if cache is missing or invalid
def load_or_regenerate_cache_with_error_handling(docs, embeddings):
    try:
        if is_cache_valid(cache_file, document_file):
            print("Cache is valid. Loading embeddings from cache.")
            return load_embeddings_cache_with_error_handling()
        else:
            print("Cache is invalid or missing. Regenerating embeddings.")
            db = DocArrayInMemorySearch.from_documents(docs, embeddings)
            save_embeddings_cache_with_error_handling(db)  # Save new cache
            return db
    except Exception as e:
        print(f"Error during cache handling: {e}")
        return None

# Load documents and embeddings with error handling
db = load_or_regenerate_cache_with_error_handling(docs, embeddings)

# If db is None, skip the search instead of crashing
if db is not None:
    # Continue with similarity search and Q&A process using the cache or newly created embeddings.
    # Keep the hits in `results` so that `docs` still holds the source documents.
    query = "Please suggest a shirt with sunblocking"
    results = db.similarity_search(query)

    # Display the first matching document
    # (a bare expression inside an if block is not echoed by the notebook)
    display(results[0])
else:
    print("Unable to proceed due to cache or document issues.")


Cell 13: Implement Periodic Cache Expiration


Implement periodic cache expiration to refresh embeddings periodically, even if the source document hasn't changed, ensuring the cache stays updated.



In [None]:
import time

# Define the cache expiration time in seconds (e.g., 7 days = 7 * 24 * 60 * 60)
CACHE_EXPIRATION_TIME = 7 * 24 * 60 * 60  # 7 days

# Function to check if the cache is expired based on last modification time
def is_cache_expired(cache_file):
    if os.path.exists(cache_file):
        cache_mtime = os.path.getmtime(cache_file)
        current_time = time.time()
        # Check if cache is older than expiration time
        return (current_time - cache_mtime) > CACHE_EXPIRATION_TIME
    return True  # If cache doesn't exist, consider it expired

# Function to check cache validity, combining document modification and expiration logic
def is_cache_valid_with_expiration(cache_file, document_file):
    # Check if cache file exists and is not expired
    if is_cache_expired(cache_file):
        print("Cache is expired. Regenerating embeddings.")
        return False

    # Also ensure document has not been modified since cache was created
    document_mtime = os.path.getmtime(document_file)
    cache_mtime = os.path.getmtime(cache_file)

    return cache_mtime > document_mtime

# Regenerate cache if it's expired or invalid
def load_or_regenerate_cache_with_expiration(docs, embeddings):
    if is_cache_valid_with_expiration(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings.")
        db = DocArrayInMemorySearch.from_documents(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Load documents and embeddings with expiration handling
db = load_or_regenerate_cache_with_expiration(docs, embeddings)

# If db is None, skip the search instead of crashing
if db is not None:
    # Continue with similarity search and Q&A process using the cache or newly created embeddings.
    # Keep the hits in `results` so that `docs` still holds the source documents.
    query = "Please suggest a shirt with sunblocking"
    results = db.similarity_search(query)

    # Display the first matching document
    # (a bare expression inside an if block is not echoed by the notebook)
    display(results[0])
else:
    print("Unable to proceed due to cache or document issues.")
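
Time-based expiration is easier to test when the current time is passed in rather than read from the clock. The following sketch assumes a hypothetical `is_expired` helper with an injectable `now` argument; the seven-day limit matches the constant above.

```python
import os
import tempfile

DAY = 24 * 60 * 60
MAX_AGE_SECONDS = 7 * DAY

def is_expired(path, now, max_age=MAX_AGE_SECONDS):
    """Treat a missing file, or one older than max_age seconds, as expired."""
    if not os.path.exists(path):
        return True
    return (now - os.path.getmtime(path)) > max_age

# Create an empty placeholder cache file and pin its modification time
with tempfile.NamedTemporaryFile(delete=False) as f:
    cache_path = f.name
written_at = 1_000_000
os.utime(cache_path, (written_at, written_at))

after_six_days = is_expired(cache_path, now=written_at + 6 * DAY)
after_eight_days = is_expired(cache_path, now=written_at + 8 * DAY)

os.remove(cache_path)
after_removal = is_expired(cache_path, now=written_at)

print(after_six_days, after_eight_days, after_removal)
```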


Suggestions for further improvements:
a. Implement notifications or warnings when the cache is regenerated due to expiration.
b. Add parallel processing for embedding generation to speed up the regeneration process if the document is large.

Cell 14: Implement Notifications or Warnings When Cache is Regenerated Due to Expiration

Add notifications (or warnings) to alert when the cache is expired and regenerated, providing clear feedback to the user.

In [None]:
import warnings

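# Function to notify when the cache is being regenerated due to expiration
def notify_cache_regeneration(reason):
    # Cell 1 installs a blanket 'ignore' filter, so re-enable warnings locally
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        warnings.warn(f"Cache is being regenerated due to {reason}.", UserWarning)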

# Function to check cache validity with expiration and notify if it's invalid
def is_cache_valid_with_notification(cache_file, document_file):
    # Check if cache file exists and is not expired
    if is_cache_expired(cache_file):
        notify_cache_regeneration("expiration")
        return False

    # Ensure document has not been modified since cache was created
    document_mtime = os.path.getmtime(document_file)
    cache_mtime = os.path.getmtime(cache_file)

    if document_mtime > cache_mtime:
        notify_cache_regeneration("document modification")
        return False

    return True

# Regenerate cache if it's expired or invalid with notification
def load_or_regenerate_cache_with_notification(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings.")
        db = DocArrayInMemorySearch.from_documents(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Load documents and embeddings with notification handling
db = load_or_regenerate_cache_with_notification(docs, embeddings)

# If db is None, skip the search instead of crashing
if db is not None:
    # Continue with similarity search and Q&A process using the cache or newly created embeddings.
    # Keep the hits in `results` so that `docs` still holds the source documents.
    query = "Please suggest a shirt with sunblocking"
    results = db.similarity_search(query)

    # Display the first matching document
    # (a bare expression inside an if block is not echoed by the notebook)
    display(results[0])
else:
    print("Unable to proceed due to cache or document issues.")
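
A blanket `warnings.filterwarnings('ignore')`, as used in the setup cell, also silences `UserWarning`, so a warning-based notification needs to override the filter locally. The sketch below shows one way to do that; `notify` is a hypothetical helper, and the outer `catch_warnings(record=True)` block exists only to capture what is emitted.

```python
import warnings

def notify(reason):
    """Emit a UserWarning even when a global 'ignore' filter is installed."""
    with warnings.catch_warnings():
        # Filters changed here are restored when the block exits
        warnings.simplefilter("always")
        warnings.warn(f"Cache is being regenerated due to {reason}.", UserWarning)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("ignore")         # same blanket filter as the setup cell
    warnings.warn("dropped", UserWarning)   # suppressed by the filter
    notify("expiration")                    # still gets through

for w in caught:
    print(w.category.__name__, w.message)
```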


Cell 15: Add Parallel Processing for Embedding Generation

Implement parallel processing using Python's concurrent.futures module to speed up the embedding generation process for large documents.

In [None]:
import concurrent.futures

# Function to generate embeddings in parallel for each document
def generate_embeddings_in_parallel(docs, embeddings):
    def generate_single_embedding(doc):
        return embeddings.embed_query(doc.page_content)

    # Use ThreadPoolExecutor for parallel processing
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Map the documents to embedding generation, parallelizing the process
        results = list(executor.map(generate_single_embedding, docs))
    
    return results

# Function to regenerate the document search database with parallel embedding generation
def regenerate_database_with_parallel_processing(docs, embeddings):
    print("Generating embeddings in parallel...")
    
    # Generate embeddings in parallel for all documents
    embedded_docs = generate_embeddings_in_parallel(docs, embeddings)
    
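    # Create a document search database using the embeddings.
    # Caution: from_embeddings is a FAISS constructor and may not exist on
    # DocArrayInMemorySearch; verify it against the installed LangChain version.
    db = DocArrayInMemorySearch.from_embeddings(embedded_docs, docs)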
    
    return db

# Function to load or regenerate cache with parallel processing for large documents
def load_or_regenerate_cache_with_parallel_processing(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings with parallel processing.")
        db = regenerate_database_with_parallel_processing(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Load documents and embeddings with parallel processing and notifications
db = load_or_regenerate_cache_with_parallel_processing(docs, embeddings)

# If db is None, skip the search instead of crashing
if db is not None:
    # Continue with similarity search and Q&A process using the cache or newly created embeddings.
    # Keep the hits in `results` so that `docs` still holds the source documents.
    query = "Please suggest a shirt with sunblocking"
    results = db.similarity_search(query)

    # Display the first matching document
    # (a bare expression inside an if block is not echoed by the notebook)
    display(results[0])
else:
    print("Unable to proceed due to cache or document issues.")


Suggestions for further improvements:
a. Add a progress bar to track the embedding generation process for large datasets.
b. Implement a fallback to sequential processing if parallel processing fails due to resource constraints.

Cell 16: Add a Progress Bar to Track Embedding Generation for Large Datasets


Integrate a progress bar to track the progress of embedding generation using tqdm, which is helpful for monitoring the process on large datasets.



In [None]:
# Install tqdm if not already installed
# !pip install tqdm

from tqdm import tqdm
import concurrent.futures

# Function to generate embeddings in parallel with progress tracking
def generate_embeddings_with_progress(docs, embeddings):
    def generate_single_embedding(doc):
        return embeddings.embed_query(doc.page_content)

    # Initialize tqdm progress bar
    with tqdm(total=len(docs), desc="Generating embeddings") as pbar:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Submit one task per document and remember each task's position
            future_to_index = {
                executor.submit(generate_single_embedding, doc): i
                for i, doc in enumerate(docs)
            }
            # as_completed yields futures in completion order, so store each
            # result by index to keep the embeddings aligned with docs
            results = [None] * len(docs)
            for future in concurrent.futures.as_completed(future_to_index):
                results[future_to_index[future]] = future.result()
                pbar.update(1)  # Update progress bar for each completed task
    
    return results

# Function to regenerate document search database with progress bar
def regenerate_database_with_progress(docs, embeddings):
    print("Generating embeddings in parallel with progress tracking...")
    
    # Generate embeddings with progress bar
    embedded_docs = generate_embeddings_with_progress(docs, embeddings)
    
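    # Create document search database using the embeddings.
    # Caution: from_embeddings is a FAISS constructor and may not exist on
    # DocArrayInMemorySearch; verify it against the installed LangChain version.
    db = DocArrayInMemorySearch.from_embeddings(embedded_docs, docs)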
    
    return db

# Regenerate cache with progress bar
def load_or_regenerate_cache_with_progress(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings with progress tracking.")
        db = regenerate_database_with_progress(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Load documents and embeddings with progress tracking
db = load_or_regenerate_cache_with_progress(docs, embeddings)

# If db is None, skip the search instead of crashing
if db is not None:
    # Continue with similarity search and Q&A process using the cache or newly created embeddings.
    # Keep the hits in `results` so that `docs` still holds the source documents.
    query = "Please suggest a shirt with sunblocking"
    results = db.similarity_search(query)

    # Display the first matching document
    # (a bare expression inside an if block is not echoed by the notebook)
    display(results[0])
else:
    print("Unable to proceed due to cache or document issues.")
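
`concurrent.futures.as_completed` yields futures in the order they finish, not the order they were submitted, so results have to be placed back by index if they must line up with the input. The sketch below demonstrates this with a stand-in `fake_embed` function (an assumption; it is not an embedding API) that sleeps for a random time to shuffle completion order.

```python
import concurrent.futures
import random
import time

def fake_embed(text):
    """Stand-in for an embedding call: random delay, then a 1-element vector."""
    time.sleep(random.uniform(0, 0.01))
    return [float(len(text))]

def embed_in_order(texts, embed_fn, max_workers=4):
    """Run embed_fn over texts in threads and return results in input order."""
    results = [None] * len(texts)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the position of its input text
        future_to_index = {executor.submit(embed_fn, t): i for i, t in enumerate(texts)}
        for future in concurrent.futures.as_completed(future_to_index):
            results[future_to_index[future]] = future.result()
    return results

texts = ["a", "bb", "ccc", "dddd"]
vectors = embed_in_order(texts, fake_embed)
print(vectors)
```

`executor.map` also preserves input order; the index map is only needed when progress should be reported as each task finishes.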


Cell 17: Implement a Fallback to Sequential Processing if Parallel Processing Fails


Add error handling to switch from parallel processing to sequential processing if parallel processing fails due to resource constraints.



In [None]:
# Function to generate embeddings sequentially as a fallback
def generate_embeddings_sequentially(docs, embeddings):
    print("Parallel processing failed. Falling back to sequential embedding generation...")
    
    # Sequential embedding generation with progress bar
    embedded_docs = []
    for doc in tqdm(docs, desc="Generating embeddings sequentially"):
        embedded_docs.append(embeddings.embed_query(doc.page_content))
    
    return embedded_docs

# Function to generate embeddings with fallback to sequential processing
def generate_embeddings_with_fallback(docs, embeddings):
    try:
        # Try generating embeddings in parallel
        return generate_embeddings_with_progress(docs, embeddings)
    except (RuntimeError, MemoryError, OSError):
        # On a resource failure (e.g. "can't start new thread"), fall back to sequential processing
        return generate_embeddings_sequentially(docs, embeddings)

# Function to regenerate document search database with fallback mechanism
def regenerate_database_with_fallback(docs, embeddings):
    print("Generating embeddings with fallback in case of failure...")
    
    # Generate embeddings with fallback mechanism
    embedded_docs = generate_embeddings_with_fallback(docs, embeddings)
    
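    # Create document search database using the embeddings.
    # Caution: from_embeddings is a FAISS constructor and may not exist on
    # DocArrayInMemorySearch; verify it against the installed LangChain version.
    db = DocArrayInMemorySearch.from_embeddings(embedded_docs, docs)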
    
    return db

# Regenerate cache with fallback to sequential processing
def load_or_regenerate_cache_with_fallback(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings with fallback mechanism.")
        db = regenerate_database_with_fallback(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Load documents and embeddings with fallback mechanism
db = load_or_regenerate_cache_with_fallback(docs, embeddings)

# If db is None, skip the search instead of crashing
if db is not None:
    # Continue with similarity search and Q&A process using the cache or newly created embeddings.
    # Keep the hits in `results` so that `docs` still holds the source documents.
    query = "Please suggest a shirt with sunblocking"
    results = db.similarity_search(query)

    # Display the first matching document
    # (a bare expression inside an if block is not echoed by the notebook)
    display(results[0])
else:
    print("Unable to proceed due to cache or document issues.")
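
The fallback pattern itself does not depend on any embedding library. Here is a minimal sketch with stand-in functions: `broken_parallel` always raises, as a thread pool might when it cannot start a thread, and `sequential` does the work one item at a time. All three function names are hypothetical.

```python
def embed_with_fallback(texts, parallel_fn, sequential_fn):
    """Try the parallel path first; on a resource error, use the sequential path."""
    try:
        return parallel_fn(texts), "parallel"
    except (RuntimeError, MemoryError, OSError) as exc:
        print(f"Parallel path failed ({exc}); falling back to sequential.")
        return sequential_fn(texts), "sequential"

def broken_parallel(texts):
    # Simulate the error raised when no more threads can be started
    raise RuntimeError("can't start new thread")

def sequential(texts):
    # Stand-in for a real embedding call
    return [[float(len(t))] for t in texts]

vectors, mode = embed_with_fallback(["a", "bb"], broken_parallel, sequential)
print(mode, vectors)
```

Catching only resource-related exceptions keeps unrelated failures, such as an invalid API key, from being retried pointlessly on the sequential path.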


Suggestions for further improvements:
a. Add detailed logging for parallel vs. sequential processing to analyze performance trade-offs.
b. Implement asynchronous handling to allow the embedding process to continue without blocking other operations in the application.

Cell 18: Add Detailed Logging for Parallel vs. Sequential Processing to Analyze Performance Trade-offs

Log detailed information about the performance (time taken) for parallel and sequential processing, allowing analysis of trade-offs between both methods.

In [None]:
import time
import logging

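# Configure a dedicated logger for processing times.
# A second logging.basicConfig call is a no-op once the root logger has handlers
# (Cell 10 already configured it), so attach a file handler to a named logger instead.
perf_logger = logging.getLogger("processing_time")
perf_logger.setLevel(logging.INFO)
if not perf_logger.handlers:
    perf_handler = logging.FileHandler('processing_time.log')
    perf_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    perf_logger.addHandler(perf_handler)

# Function to log the processing time for parallel or sequential processing
def log_processing_time(method, start_time, end_time):
    elapsed_time = end_time - start_time
    perf_logger.info(f"Processing method: {method} | Time taken: {elapsed_time:.2f} seconds")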

# Function to generate embeddings in parallel with logging
def generate_embeddings_with_logging(docs, embeddings):
    start_time = time.time()  # Record start time
    try:
        # Try generating embeddings in parallel with progress
        embedded_docs = generate_embeddings_with_progress(docs, embeddings)
        end_time = time.time()  # Record end time
        log_processing_time("Parallel Processing", start_time, end_time)
        return embedded_docs
    except (RuntimeError, MemoryError, OSError):
        # On a resource failure, log and fall back to sequential processing
        end_time = time.time()  # Record end time for parallel failure
        log_processing_time("Parallel Processing (Failed)", start_time, end_time)

        # Switch to sequential processing
        start_time = time.time()  # Record start time for sequential
        embedded_docs = generate_embeddings_sequentially(docs, embeddings)
        end_time = time.time()  # Record end time for sequential
        log_processing_time("Sequential Processing", start_time, end_time)
        return embedded_docs

# Function to regenerate document search database with logging
def regenerate_database_with_logging(docs, embeddings):
    print("Generating embeddings with detailed logging...")
    
    # Generate embeddings with fallback and logging
    embedded_docs = generate_embeddings_with_logging(docs, embeddings)
    
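    # Create document search database using the embeddings.
    # Caution: from_embeddings is a FAISS constructor and may not exist on
    # DocArrayInMemorySearch; verify it against the installed LangChain version.
    db = DocArrayInMemorySearch.from_embeddings(embedded_docs, docs)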
    
    return db

# Regenerate cache with logging for performance analysis
def load_or_regenerate_cache_with_logging(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings with detailed logging.")
        db = regenerate_database_with_logging(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Load documents and embeddings with logging
db = load_or_regenerate_cache_with_logging(docs, embeddings)

# If db is None, skip the search instead of crashing
if db is not None:
    # Continue with similarity search and Q&A process using the cache or newly created embeddings.
    # Keep the hits in `results` so that `docs` still holds the source documents.
    query = "Please suggest a shirt with sunblocking"
    results = db.similarity_search(query)

    # Display the first matching document
    # (a bare expression inside an if block is not echoed by the notebook)
    display(results[0])
else:
    print("Unable to proceed due to cache or document issues.")
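
Paired start and end timestamps can also be wrapped in a context manager, which records the elapsed time even if the timed block raises. This is a sketch under that assumption: `timed` and the `timings` dictionary are hypothetical names, and `time.perf_counter` is used because it is monotonic and intended for measuring intervals.

```python
import time
from contextlib import contextmanager

# Elapsed seconds per label, filled in by the context manager
timings = {}

@contextmanager
def timed(label):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start

# Stand-in workload in place of embedding generation
with timed("sequential"):
    total = sum(i * i for i in range(10_000))

print(f"sequential took {timings['sequential']:.4f} seconds")
```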


Cell 19: Implement Asynchronous Handling to Allow Non-blocking Embedding Generation

Use asynchronous programming with asyncio to allow embedding generation to run in the background without blocking other operations.



In [None]:
import asyncio

# Define an asynchronous function to generate embeddings in parallel
async def generate_embeddings_async(docs, embeddings):
    # Inside a coroutine, get_running_loop() is the recommended way to obtain the loop
    loop = asyncio.get_running_loop()
    futures = []
    
    for doc in docs:
        # Use loop.run_in_executor to run embedding generation asynchronously
        futures.append(loop.run_in_executor(None, embeddings.embed_query, doc.page_content))
    
    # Asynchronously gather all results
    embedded_docs = await asyncio.gather(*futures)
    
    return embedded_docs

# Function to regenerate document search database asynchronously
async def regenerate_database_async(docs, embeddings):
    print("Generating embeddings asynchronously...")
    
    # Generate embeddings asynchronously
    embedded_docs = await generate_embeddings_async(docs, embeddings)
    
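    # Create a document search database using the embeddings.
    # Caution: from_embeddings is a FAISS constructor and may not exist on
    # DocArrayInMemorySearch; verify it against the installed LangChain version.
    db = DocArrayInMemorySearch.from_embeddings(embedded_docs, docs)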
    
    return db

# Function to load or regenerate cache asynchronously
async def load_or_regenerate_cache_async(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings asynchronously.")
        db = await regenerate_database_async(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Main function to handle asynchronous embedding generation
async def main():
    db = await load_or_regenerate_cache_async(docs, embeddings)

    # Proceed only if a database was loaded or built
    if db is not None:
        # Continue with similarity search and Q&A process using the cache or newly created embeddings
        query = "Please suggest a shirt with sunblocking"
        # Assigning to `docs` here would make it local and raise UnboundLocalError on the first line
        results = db.similarity_search(query)

        # Display the first matching document
        print(results[0])
    else:
        print("Unable to proceed due to cache or document issues.")

# Run the asynchronous main function.
# Jupyter already runs an event loop, so await the coroutine directly;
# in a standalone script, use asyncio.run(main()) instead.
await main()
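
The executor pattern used in this cell can be tried without any API calls. The following is a minimal sketch, assuming a hypothetical blocking function fake_embed that stands in for embeddings.embed_query; it is not part of LangChain. It shows that asyncio.gather returns results in submission order, so each vector stays aligned with its input text.

```python
import asyncio
import time

def fake_embed(text):
    """Hypothetical stand-in for a blocking embedding call."""
    time.sleep(0.05)  # simulate network latency
    return [float(len(text))]

async def embed_all(texts):
    loop = asyncio.get_running_loop()
    # Each blocking call runs in the default thread pool
    futures = [loop.run_in_executor(None, fake_embed, t) for t in texts]
    # gather returns results in the order the futures were passed in
    return await asyncio.gather(*futures)

texts = ["a", "bb", "ccc", "dddd"]
vectors = asyncio.run(embed_all(texts))
print(vectors)  # → [[1.0], [2.0], [3.0], [4.0]]
```

This runs as a standalone script. Inside a notebook cell, replace asyncio.run(embed_all(texts)) with await embed_all(texts), because the notebook already has a running event loop.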


Suggestions for further improvements:
a. Combine asynchronous handling with real-time progress tracking to monitor embedding generation without blocking the application.
b. Add retry mechanisms for failed asynchronous tasks to ensure reliable embedding generation even when facing network or resource issues.

Cell 20: Combine Asynchronous Handling with Real-Time Progress Tracking

Use tqdm to track the progress of embedding generation in real-time while running the task asynchronously, ensuring the process doesn't block other operations.

In [None]:
import asyncio
from tqdm.asyncio import tqdm_asyncio

# Define an asynchronous function to generate embeddings concurrently with real-time progress tracking
async def generate_embeddings_async_with_progress(docs, embeddings):
    loop = asyncio.get_running_loop()

    # Run each blocking embed_query call in the default thread pool
    futures = [
        loop.run_in_executor(None, embeddings.embed_query, doc.page_content)
        for doc in docs
    ]

    # tqdm_asyncio.gather shows progress and returns results in the same order as docs.
    # (as_completed yields in completion order, which would pair embeddings with the wrong documents.)
    embedded_docs = await tqdm_asyncio.gather(*futures, desc="Generating embeddings asynchronously")

    return embedded_docs

# Function to regenerate document search database asynchronously with progress tracking
async def regenerate_database_async_with_progress(docs, embeddings):
    print("Generating embeddings asynchronously with progress tracking...")
    
    # Generate embeddings asynchronously with progress tracking
    embedded_docs = await generate_embeddings_async_with_progress(docs, embeddings)
    
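    # Create document search database using the embeddings.
    # NOTE: from_embeddings is assumed to be available on DocArrayInMemorySearch; verify it in your LangChain version.
    db = DocArrayInMemorySearch.from_embeddings(embedded_docs, docs)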
    
    return db

# Function to load or regenerate cache asynchronously with progress tracking
async def load_or_regenerate_cache_async_with_progress(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings asynchronously with progress tracking.")
        db = await regenerate_database_async_with_progress(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Main function to handle asynchronous embedding generation with progress tracking
async def main_with_progress():
    db = await load_or_regenerate_cache_async_with_progress(docs, embeddings)

    # Proceed only if a database was loaded or built
    if db is not None:
        # Continue with similarity search and Q&A process using the cache or newly created embeddings
        query = "Please suggest a shirt with sunblocking"
        # Assigning to `docs` here would make it local and raise UnboundLocalError on the first line
        results = db.similarity_search(query)

        # Display the first matching document
        print(results[0])
    else:
        print("Unable to proceed due to cache or document issues.")

# Run the asynchronous main function with progress tracking.
# Jupyter already runs an event loop, so await the coroutine directly;
# in a standalone script, use asyncio.run(main_with_progress()) instead.
await main_with_progress()
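
Progress reporting and result ordering pull in different directions: tasks finish in arbitrary order, but each embedding must end up next to its own document. The sketch below illustrates one way to reconcile the two using only the standard library. The coroutine fake_embed is a hypothetical stand-in for an embedding call, and the plain print is a placeholder for a tqdm progress bar.

```python
import asyncio
import random

async def fake_embed(index, text):
    """Hypothetical stand-in for an embedding call; returns its index with the vector."""
    await asyncio.sleep(random.uniform(0.01, 0.05))  # tasks finish in random order
    return index, [float(len(text))]

async def embed_with_progress(texts):
    tasks = [asyncio.create_task(fake_embed(i, t)) for i, t in enumerate(texts)]
    vectors = [None] * len(texts)
    done = 0
    # as_completed yields awaitables in completion order, not submission order
    for finished in asyncio.as_completed(tasks):
        index, vector = await finished
        vectors[index] = vector  # put the result back at its original position
        done += 1
        print(f"{done}/{len(texts)} embeddings done")
    return vectors

texts = ["sun shirt", "rain jacket", "hat"]
vectors = asyncio.run(embed_with_progress(texts))
print(vectors)  # → [[9.0], [11.0], [3.0]]
```

Carrying the index through each task is what keeps the output aligned with the input regardless of which task finishes first.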


Cell 21: Add Retry Mechanisms for Failed Asynchronous Tasks

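Add a retry mechanism using tenacity to automatically retry failed embedding generation tasks. Each embedding call is attempted up to three times with a two-second wait between attempts, which makes the process more tolerant of transient network or resource issues.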



In [None]:
# Install tenacity for retry mechanisms
# !pip install tenacity

import asyncio
from tenacity import retry, stop_after_attempt, wait_fixed
from tqdm.asyncio import tqdm_asyncio

# Retry a failed embedding call up to 3 attempts in total, waiting 2 seconds between attempts
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def generate_single_embedding_with_retry(doc, embeddings):
    loop = asyncio.get_running_loop()
    # Run the blocking embed_query call in the default thread pool
    return await loop.run_in_executor(None, embeddings.embed_query, doc.page_content)

# Define an asynchronous function to generate embeddings with retry mechanisms
async def generate_embeddings_async_with_retries(docs, embeddings):
    # Create one retrying coroutine per document
    coroutines = [generate_single_embedding_with_retry(doc, embeddings) for doc in docs]

    # tqdm_asyncio.gather shows progress and returns results in the same order as docs.
    # (as_completed yields in completion order, which would pair embeddings with the wrong documents.)
    embedded_docs = await tqdm_asyncio.gather(*coroutines, desc="Generating embeddings with retries")

    return embedded_docs

# Function to regenerate document search database asynchronously with retry mechanism
async def regenerate_database_async_with_retries(docs, embeddings):
    print("Generating embeddings asynchronously with retry mechanism...")
    
    # Generate embeddings asynchronously with retries
    embedded_docs = await generate_embeddings_async_with_retries(docs, embeddings)
    
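    # Create document search database using the embeddings.
    # NOTE: from_embeddings is assumed to be available on DocArrayInMemorySearch; verify it in your LangChain version.
    db = DocArrayInMemorySearch.from_embeddings(embedded_docs, docs)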
    
    return db

# Function to load or regenerate cache asynchronously with retries
async def load_or_regenerate_cache_async_with_retries(docs, embeddings):
    if is_cache_valid_with_notification(cache_file, document_file):
        print("Cache is valid and not expired. Loading embeddings from cache.")
        return load_embeddings_cache_with_error_handling()
    else:
        print("Cache is invalid or expired. Regenerating embeddings asynchronously with retry mechanism.")
        db = await regenerate_database_async_with_retries(docs, embeddings)
        save_embeddings_cache_with_error_handling(db)  # Save new cache
        return db

# Main function to handle asynchronous embedding generation with retry mechanism
async def main_with_retries():
    db = await load_or_regenerate_cache_async_with_retries(docs, embeddings)

    # Proceed only if a database was loaded or built
    if db is not None:
        # Continue with similarity search and Q&A process using the cache or newly created embeddings
        query = "Please suggest a shirt with sunblocking"
        # Assigning to `docs` here would make it local and raise UnboundLocalError on the first line
        results = db.similarity_search(query)

        # Display the first matching document
        print(results[0])
    else:
        print("Unable to proceed due to cache or document issues.")

# Run the asynchronous main function with retry mechanism.
# Jupyter already runs an event loop, so await the coroutine directly;
# in a standalone script, use asyncio.run(main_with_retries()) instead.
await main_with_retries()


Suggestions for further improvements:
a. Implement more advanced retry policies (e.g., exponential backoff) to handle different types of failures more gracefully.
b. Introduce concurrent task management to limit the number of parallel tasks to avoid overwhelming system resources in low-memory environments.
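
Both suggestions above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the notebook's implementation: call_with_backoff, embed_all, and flaky_embed are hypothetical names, the delays are shortened for demonstration, and tenacity is deliberately left out so the example has no dependencies. An asyncio.Semaphore caps how many embedding calls run at once, and the wait doubles after each failed attempt.

```python
import asyncio

async def call_with_backoff(func, *args, attempts=4, base_delay=0.01):
    """Retry an async callable, doubling the wait after each failure."""
    for attempt in range(attempts):
        try:
            return await func(*args)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)

async def embed_all(texts, embed, max_concurrency=2):
    # At most max_concurrency embedding calls are in flight at any time
    semaphore = asyncio.Semaphore(max_concurrency)

    async def worker(text):
        async with semaphore:
            return await call_with_backoff(embed, text)

    # gather keeps the results in the same order as texts
    return await asyncio.gather(*(worker(t) for t in texts))

# Hypothetical flaky embedder: fails the first time it sees each text
seen = set()

async def flaky_embed(text):
    if text not in seen:
        seen.add(text)
        raise ConnectionError("simulated transient failure")
    return [float(len(text))]

vectors = asyncio.run(embed_all(["a", "bb", "ccc"], flaky_embed))
print(vectors)  # → [[1.0], [2.0], [3.0]]
```

In practice the base delay would be closer to a second, and the except clause would be narrowed to the error types that are worth retrying, such as timeouts or rate-limit responses.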






