## Expert Knowledge Worker

### A question answering agent that is an expert knowledge worker
### To be used by employees of Insurellm, an Insurance Tech company
### The agent needs to be accurate and the solution should be low cost.

This project will use RAG (Retrieval Augmented Generation) to ensure our question/answering assistant has high accuracy.

In [7]:
# imports

import os
import glob
from dotenv import load_dotenv
import gradio as gr

In [30]:
# imports for langchain and Chroma and plotly

from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
import numpy as np
from sklearn.manifold import TSNE
import plotly.graph_objects as go
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain

In [31]:
# price is a factor for our company, so we're going to use a low cost model

MODEL = "gpt-4o-mini"
db_name = "vector_db"

In [32]:
# Load environment variables in a file called .env

load_dotenv()
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')

In [None]:
from typing import Union
import docx 
import docx2txt 
import PyPDF2
from langchain_community.document_loaders import (
    TextLoader,
    PyPDFLoader,
    Docx2txtLoader,
    CSVLoader
)

from typing import Optional, Union, Dict, Set
from pathlib import Path
from langchain.document_loaders import (
    TextLoader,
    PyPDFLoader,
    Docx2txtLoader,
    CSVLoader,
    UnstructuredMarkdownLoader,
    JSONLoader
)

class UnsupportedFileTypeError(Exception):
    """Custom exception for unsupported file types."""
    pass

def get_loader_class(
    file_path: Union[str, Path],
    strict_mode: bool = False
) -> Optional[Union[TextLoader, PyPDFLoader, Docx2txtLoader, CSVLoader, UnstructuredMarkdownLoader, JSONLoader]]:
    """
    Determine the appropriate document loader based on file extension.
    
    Args:
        file_path (Union[str, Path]): Path to the file that needs to be loaded
        strict_mode (bool): If True, raises an exception for unsupported file types
                          If False, returns None for unsupported types
    
    Returns:
        Optional[Union[...]]: Appropriate loader class for the file type, or None if unsupported
        
    Raises:
        UnsupportedFileTypeError: If strict_mode is True and file type is unsupported
        ValueError: If file_path is empty or invalid
    """
    if not file_path:
        raise ValueError("File path cannot be empty")
    
    # Convert to Path object for more robust path handling
    path = Path(file_path)
    
    # Get lowercase extension without the dot
    try:
        extension = path.suffix.lower().lstrip('.')
    except Exception as e:
        raise ValueError(f"Invalid file path format: {str(e)}")
    
    # Define skip extensions
    SKIP_EXTENSIONS: Set[str] = {
        'zip', 'exe', 'bin', 'ppt', 'pptx', 
        'jpg', 'jpeg', 'png', 'gif', 'bmp',  # Skip binary image formats
        'mp3', 'mp4', 'wav', 'avi',          # Skip media files
        'db', 'sqlite', 'sqlite3'            # Skip database files
    }
    
    # Define loader mapping
    LOADER_MAP: Dict[str, type] = {
        'pdf': PyPDFLoader,
        'txt': TextLoader,
        'text': TextLoader,
        'doc': TextLoader,         # Basic text handling for .doc
        'docx': Docx2txtLoader,
        'csv': CSVLoader,
        'md': UnstructuredMarkdownLoader,
        'json': JSONLoader
    }
    
    # Check if file type should be skipped
    if extension in SKIP_EXTENSIONS:
        if strict_mode:
            raise UnsupportedFileTypeError(
                f"File type '.{extension}' is not supported for loading"
            )
        return None
    
    # Get appropriate loader or default to TextLoader
    loader_class = LOADER_MAP.get(extension)
    
    if loader_class is None and strict_mode:
        raise UnsupportedFileTypeError(
            f"No specific loader found for '.{extension}' files"
        )
    
    return loader_class or TextLoader  # Default to TextLoader if no specific loader found

def get_file_metadata(file_path: Union[str, Path]) -> dict:
    """
    Get metadata about the file to be loaded.
    
    Args:
        file_path (Union[str, Path]): Path to the file
        
    Returns:
        dict: Dictionary containing file metadata
    """
    path = Path(file_path)
    return {
        'filename': path.name,
        'extension': path.suffix.lower().lstrip('.'),
        'size_bytes': path.stat().st_size if path.exists() else None,
        'absolute_path': str(path.absolute()),
        'exists': path.exists(),
        'is_file': path.is_file() if path.exists() else None
    }

def load_documents(base_path: str) -> list:
    """Load documents from the specified path with appropriate loaders."""
    documents = []
    text_loader_kwargs = {'encoding': 'utf-8'}
    
    if not os.path.exists(base_path):
        print(f"Directory not found: {base_path}")
        return documents
        
    for root, _, files in os.walk(base_path):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                # Get appropriate loader
                loader_class = get_loader_class(file_path)
                
                # Skip if no loader is available for this file type
                if loader_class is None:
                    print(f"⚠ Skipping unsupported file: {file_path}")
                    continue
                
                # Initialize loader with appropriate arguments
                if loader_class == TextLoader:
                    loader = loader_class(file_path, **text_loader_kwargs)
                elif loader_class == CSVLoader:
                    loader = loader_class(file_path, encoding='utf-8')
                else:
                    loader = loader_class(file_path)
                
                # Load document
                docs = loader.load()
                
                # Add metadata
                doc_type = os.path.basename(root)
                for doc in docs:
                    doc.metadata.update({
                        "doc_type": doc_type,
                        "source": file_path,
                        "file_type": file_path.split('.')[-1],
                        "file_name": file
                    })
                    documents.append(doc)
                
                print(f"✓ Successfully loaded: {file_path}")
                
            except Exception as e:
                print(f"✗ Error loading {file_path}: {str(e)}")
                continue
    
    return documents

# Usage
base_path = "WILP_ASSIGNMENT_WORK"
documents = load_documents(base_path)
print(f"\nTotal documents loaded: {len(documents)}")

# Print document statistics
if documents:
    file_types = {}
    for doc in documents:
        file_type = doc.metadata.get('file_type', 'unknown')
        file_types[file_type] = file_types.get(file_type, 0) + 1
    
    print("\nDocument breakdown by file type:")
    for file_type, count in file_types.items():
        print(f"- {file_type}: {count} documents")

# Please note:

In the next cell, we split the text into chunks.

2 students let me know that the next cell crashed their computer.  
They were able to fix it by changing the chunk_size from 1,000 to 2,000 and the chunk_overlap from 200 to 400.  
This shouldn't be required; but if it happens to you, please make that change!  
(Note that LangChain may give a warning about a chunk being larger than 1,000 - this can be safely ignored).

_With much thanks to Steven W and Nir P for this valuable contribution._

In [48]:
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = text_splitter.split_documents(documents)

In [49]:
len(chunks)

393

In [50]:
doc_types = set(chunk.metadata['doc_type'] for chunk in chunks)
print(f"Document types found: {', '.join(doc_types)}")

Document types found: uart_led_lab, WILP Final Dissertation, bitsfinalreportdocs, WILP_ASSIGNMENT_WORK, cpu_risc-master


## A sidenote on Embeddings, and "Auto-Encoding LLMs"

We will be mapping each chunk of text into a Vector that represents the meaning of the text, known as an embedding.

OpenAI offers a model to do this, which we will use by calling their API with some LangChain code.

This model is an example of an "Auto-Encoding LLM" which generates an output given a complete input.
It's different to all the other LLMs we've discussed today, which are known as "Auto-Regressive LLMs", and generate future tokens based only on past context.

Another example of an Auto-Encoding LLMs is BERT from Google. In addition to embedding, Auto-encoding LLMs are often used for classification.

### Sidenote

In week 8 we will return to RAG and vector embeddings, and we will use an open-source vector encoder so that the data never leaves our computer - that's an important consideration when building enterprise systems and the data needs to remain internal.

In [51]:
# Put the chunks of data into a Vector Store that associates a Vector Embedding with each chunk

embeddings = OpenAIEmbeddings()

# If you would rather use the free Vector Embeddings from HuggingFace sentence-transformers
# Then replace embeddings = OpenAIEmbeddings()
# with:
# from langchain.embeddings import HuggingFaceEmbeddings
# embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

In [52]:
# Check if a Chroma Datastore already exists - if so, delete the collection to start from scratch

if os.path.exists(db_name):
    Chroma(persist_directory=db_name, embedding_function=embeddings).delete_collection()

In [53]:
os.path.exists(db_name)

True

In [54]:
# Create our Chroma vectorstore!

vectorstore = Chroma.from_documents(documents=chunks, embedding=embeddings, persist_directory=db_name)
print(f"Vectorstore created with {vectorstore._collection.count()} documents")

Vectorstore created with 393 documents


In [55]:
# Get one vector and find how many dimensions it has

collection = vectorstore._collection
sample_embedding = collection.get(limit=1, include=["embeddings"])["embeddings"][0]
dimensions = len(sample_embedding)
print(f"The vectors have {dimensions:,} dimensions")

The vectors have 1,536 dimensions


In [56]:
len(sample_embedding)

1536

## Visualizing the Vector Store

Let's take a minute to look at the documents and their embedding vectors to see what's going on.

In [57]:
def get_color_for_doc_type(doc_type, color_mapping=None):
    if color_mapping is None:
        # Define default color mapping
        color_mapping = {
            'WILP_ASSIGNMENT_WORK': 'black',
            'cpu_risc-master': 'blue',
            'cpu_risc-master_default': 'green',
            'uart_led_lab': 'red',
            'WILP Final Dissertation': 'orange'
        }
    # Return a default color if doc_type not in mapping
    return color_mapping.get(doc_type, 'gray')  # 'gray' is the default color

# Get document types from metadata
doc_types = [metadata['doc_type'] for metadata in result['metadatas']]

# Map to colors with default handling
colors = [get_color_for_doc_type(t) for t in doc_types]

# Print unique document types to help debug
unique_doc_types = set(doc_types)
print("Found document types:", unique_doc_types)

# Custom color mapping
custom_colors = {
    'WILP_ASSIGNMENT_WORK': 'black',
    'cpu_risc-master': 'blue',
    'cpu_risc-master_default': 'green',
    'uart_led_lab': 'red',
    'WILP Final Dissertation': 'orange',
    'bitsfinalreportdocs': 'purple'  # Add new document type
}

colors = [get_color_for_doc_type(t, custom_colors) for t in doc_types]

NameError: name 'result' is not defined

In [58]:
# We humans find it easier to visalize things in 2D!
# Reduce the dimensionality of the vectors to 2D using t-SNE
# (t-distributed stochastic neighbor embedding)

tsne = TSNE(n_components=2, random_state=42)
reduced_vectors = tsne.fit_transform(vectors)

# Create the 2D scatter plot
fig = go.Figure(data=[go.Scatter(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    mode='markers',
    marker=dict(size=5, color=colors, opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

fig.update_layout(
    title='2D Chroma Vector Store Visualization',
    scene=dict(xaxis_title='x',yaxis_title='y'),
    width=800,
    height=600,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show()

NameError: name 'vectors' is not defined

In [59]:
# Let's try 3D!

tsne = TSNE(n_components=3, random_state=42)
reduced_vectors = tsne.fit_transform(vectors)

# Create the 3D scatter plot
fig = go.Figure(data=[go.Scatter3d(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    z=reduced_vectors[:, 2],
    mode='markers',
    marker=dict(size=5, color=colors, opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

fig.update_layout(
    title='3D Chroma Vector Store Visualization',
    scene=dict(xaxis_title='x', yaxis_title='y', zaxis_title='z'),
    width=900,
    height=700,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show()

NameError: name 'vectors' is not defined

In [61]:
# create a new Chat with OpenAI
llm = ChatOpenAI(temperature=0.7, model_name=MODEL)

# set up the conversation memory for the chat
memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)

# the retriever is an abstraction over the VectorStore that will be used during RAG
retriever = vectorstore.as_retriever()

# putting it together: set up the conversation chain with the GPT 4o-mini LLM, the vector store and memory
conversation_chain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever, memory=memory)

In [64]:
query = "Can you describe the HOST in DUT components in a few sentences"
result = conversation_chain.invoke({"question":query})
print(result["answer"])

The HOST in DUT components refers to the VIP (Verification Intellectual Property) used to generate host stimulus following the host protocol. It includes two types of VIPs: the PCIe VIP, which can generate various PCIe GEN3 data traffic over 4 lanes, and the SD VIP, which can generate data and control signals supported by the SD protocol. The HOST is essential for interacting with the DUT by simulating external conditions and stimuli.


In [65]:
# set up a new conversation memory for the chat
memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)

# putting it together: set up the conversation chain with the GPT 4o-mini LLM, the vector store and memory
conversation_chain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever, memory=memory)

In [66]:
# Wrapping in a function - note that history isn't used, as the memory is in the conversation_chain

def chat(message, history):
    result = conversation_chain.invoke({"question": message})
    return result["answer"]

In [67]:
# And in Gradio:

view = gr.ChatInterface(chat, type="messages").launch(inbrowser=True)

* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.
