In [None]:
# Install the LangChain integrations, sentence-transformers/transformers (embedding
# model and local LLM) and pypdf (used by the PDF loader) needed below.
# NOTE(review): add_to_faiss also needs the `faiss` package (e.g. faiss-cpu), which is
# not installed here — add it if the FAISS path is enabled.
!pip install langchain langchain-community langchain-chroma langchain-ollama langchain-huggingface sentence-transformers transformers pypdf

In [None]:
# Mount Google Drive at /content/gdrive. The relative "./gdrive/..." paths defined
# next assume the working directory is /content (Colab's default).
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Data and index locations, relative to the working directory (assumed /content on Colab).
CSV_PATH = "./gdrive/MyDrive/Agroknow/Incidents"  # directory of tab-separated incident .csv files
PDF_PATH = "./gdrive/MyDrive/Agroknow/PDFs"  # directory of source PDFs
CHROMA_PATH = "./gdrive/MyDrive/Agroknow/chroma"  # persistent Chroma store, kept on Drive
# NOTE(review): unlike the paths above this is not under the mounted Drive, so it is
# presumably lost when the Colab runtime resets — confirm this is intended.
FAISS_INDEX_PATH = "./faiss"

In [None]:
import os
from tqdm.notebook import tqdm
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed
from langchain.embeddings import SentenceTransformerEmbeddings
from langchain.schema import Document
from langchain_chroma import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain_ollama import OllamaLLM
from langchain_community.document_loaders.pdf import PyPDFDirectoryLoader, PyPDFLoader, UnstructuredPDFLoader
from langchain_huggingface import HuggingFacePipeline, HuggingFaceEmbeddings


def load_and_chunk_csv(CSV_PATH):
    """
    Load the incident files in ``CSV_PATH`` and turn every row into a Document.

    Each file whose name ends in ``.csv`` is read with ``sep='\\t'`` (the files
    are tab-separated despite the extension). Every row becomes one Document:
    ``page_content`` is a labelled text block (title, date, hazard, product,
    origin, year, summary) and ``metadata`` holds the same fields plus the
    ``source_file`` name. An empty or missing ``summary`` is filled from
    ``description``. Missing cells are rendered as empty strings instead of
    pandas' NaN, which would otherwise show up as the text "nan" in the
    embedded content and as float NaN in the metadata.

    Args:
        CSV_PATH (str): Path to the directory containing the CSV files.

    Returns:
        list[Document]: One Document per row. Files are processed in sorted
        filename order, so the result (and the positional IDs later assigned
        by ``calculate_chunk_ids``) is reproducible between runs. A file that
        fails to load is reported and skipped as a whole. An empty list is
        returned when ``CSV_PATH`` is not a directory.
    """
    print("🔍 Loading and chunking CSV data with Pandas...")
    chunks = []

    if not os.path.isdir(CSV_PATH):
        print(f"Error: {CSV_PATH} is not a directory")
        return chunks

    # os.listdir() returns entries in arbitrary order; sort so chunk order is stable.
    for filename in sorted(os.listdir(CSV_PATH)):
        if not filename.endswith('.csv'):
            continue
        file_path = os.path.join(CSV_PATH, filename)
        try:
            file_chunks = _csv_file_to_documents(file_path, filename)
        except Exception as e:
            # Best effort: report the unreadable file and keep loading the others.
            print(f"An unexpected error occurred with {filename}: {str(e)}")
            continue
        chunks.extend(file_chunks)

    print(f"✂️ Loaded and processed {len(chunks)} chunks from CSV files using Pandas")
    return chunks


def _cell_value(record, name):
    """Return ``record[name]``, or '' when the column is absent or the cell is NaN."""
    value = record.get(name, '')
    return '' if pd.isna(value) else value


def _csv_file_to_documents(file_path, filename):
    """Read one tab-separated incident file and return one Document per row."""
    df = pd.read_csv(file_path, sep='\t')

    # Prefer `summary`; fall back to `description` where summary is empty or absent.
    if 'summary' not in df.columns:
        df['summary'] = df['description'] if 'description' in df.columns else ''
    elif 'description' in df.columns:
        df['summary'] = df['summary'].fillna(df['description'])

    documents = []
    for record in df.to_dict('records'):
        title = _cell_value(record, 'title')
        date = _cell_value(record, 'date')
        hazard = _cell_value(record, 'hazard')
        summary = _cell_value(record, 'summary')
        product = _cell_value(record, 'product')
        origin = _cell_value(record, 'origin')
        # Assumes dates start with the year (e.g. "YYYY-MM-DD"); str() tolerates
        # non-string cells that pandas may have parsed as numbers.
        year = str(date).split('-')[0] if date != '' else ''

        content = (
            f"Title: {title}\n"
            f"Date: {date}\n"
            f"Hazard: {hazard}\n"
            f"Product: {product}\n"
            f"Origin: {origin}\n"
            f"Year: {year}\n"
            f"Summary: {summary}"
        )

        documents.append(
            Document(
                page_content=content,
                metadata={
                    "title": title,
                    "date": date,
                    "hazard": hazard,
                    "product": product,
                    "origin": origin,
                    "year": year,
                    "source_file": filename,  # used by calculate_chunk_ids
                },
            )
        )
    return documents


def load_and_split_pdfs(PDF_PATH):
    """
    Read every PDF in ``PDF_PATH`` and cut its text into overlapping chunks.

    The directory is loaded with ``PyPDFDirectoryLoader`` (which yields
    Documents carrying ``source``/``page`` metadata), then each loaded Document
    is split with a ``RecursiveCharacterTextSplitter`` (512 characters, 128
    overlap). Splitting goes one Document at a time so tqdm can report progress.

    Args:
        PDF_PATH (str): Directory containing the PDF files.

    Returns:
        list[Document]: The chunks, in loader order; an empty list (after an
        error message) when ``PDF_PATH`` is not a directory.
    """
    print("🔍 Loading PDF documents...")

    path_is_dir = os.path.isdir(PDF_PATH)
    if not path_is_dir:
        print(f"❌ Error: {PDF_PATH} is not a directory")
        return []

    loaded = PyPDFDirectoryLoader(PDF_PATH).load()
    loaded_count = len(loaded)
    print(f"📚 Loaded {loaded_count} PDF documents")

    splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=128)
    progress = tqdm(loaded, total=loaded_count, desc="🔄 Splitting PDFs")
    pieces = [piece for item in progress for piece in splitter.split_documents([item])]

    print(f"✅ Finished splitting PDFs into {len(pieces)} chunks")
    return pieces


def calculate_chunk_ids(chunks, prefix):
    """
    Give every chunk a stable ``metadata["id"]`` (in place) and return ``chunks``.

    The trailing number counts chunks *within one source* rather than across
    the whole list. Adding, removing or reordering other files therefore does
    not renumber chunks that did not change. ``add_to_chroma`` and
    ``add_to_faiss`` skip chunks whose ID is already stored, so stable IDs are
    what keep re-runs from inserting duplicates.

    * ``prefix == "pdf"``: ``pdf:{source}:{page}:{n}``, where ``n`` counts chunks
      with the same ``source`` and ``page`` metadata.
    * any other prefix (CSV): ``csv:{source_file}:{n}``, where ``n`` counts chunks
      from the same ``source_file`` (``"unknown"`` when absent).

    NOTE: a vector store filled with IDs from a different numbering scheme will
    not match these IDs; reset it once (``reset_chroma_database``) before
    re-adding.

    Args:
        chunks: Documents (anything with a ``metadata`` dict) in source order.
        prefix: ``"pdf"`` for PDF chunks; anything else is treated as CSV.

    Returns:
        The same ``chunks`` list, with IDs assigned.
    """
    # Counter keyed by the ID prefix string itself, so two chunks can never
    # end up with the same full ID even if different sources format alike.
    next_index = {}
    for chunk in chunks:
        if prefix == "pdf":
            source = chunk.metadata.get("source")
            page = chunk.metadata.get("page")
            base = f"pdf:{source}:{page}"
        else:  # CSV
            filename = chunk.metadata.get("source_file", "unknown")
            base = f"csv:{filename}"

        index = next_index.get(base, 0)
        next_index[base] = index + 1
        chunk.metadata["id"] = f"{base}:{index}"

    return chunks


def add_to_chroma(chunks: list[Document], embedding_model, batch_size: int = 256):
    """
    Add chunks to the persistent Chroma store at ``CHROMA_PATH``.

    Chunks whose ``metadata["id"]`` is already stored, or that repeat an ID
    seen earlier in ``chunks``, are skipped. The function can therefore be
    re-run after new source files are added.

    Args:
        chunks: Documents carrying ``metadata["id"]`` (see ``calculate_chunk_ids``).
        embedding_model: Embedding function Chroma uses for the new chunks.
        batch_size: Documents written per ``add_documents`` call. Writing in
            batches lets the embedding model encode many texts per call
            instead of one. Keep it below Chroma's maximum batch size.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
        KeyError: If a chunk has no ``"id"`` in its metadata.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    print("💾 Adding to Chroma...")
    db = Chroma(
        persist_directory=CHROMA_PATH,
        embedding_function=embedding_model
    )

    # include=[] asks Chroma for IDs only (no documents or embeddings).
    existing_ids = set(db.get(include=[])["ids"])
    print(f"📊 Number of existing documents in DB: {len(existing_ids)}")

    new_chunks = []
    for chunk in chunks:
        chunk_id = chunk.metadata["id"]
        if chunk_id in existing_ids:
            continue
        # Record the ID so a repeat later in `chunks` is skipped too: Chroma
        # rejects a single add call that contains the same ID twice.
        existing_ids.add(chunk_id)
        new_chunks.append(chunk)

    if not new_chunks:
        print("✅ No new documents to add")
        return

    print(f"👉 Adding {len(new_chunks)} new documents")
    # Context manager closes the progress bar even if an add fails midway.
    with tqdm(total=len(new_chunks), desc="Adding documents", unit="doc") as pbar:
        for start in range(0, len(new_chunks), batch_size):
            batch = new_chunks[start:start + batch_size]
            db.add_documents(batch, ids=[chunk.metadata["id"] for chunk in batch])
            pbar.update(len(batch))

    print("✅ Database updated and persisted")


def reset_chroma_database(embedding_model):
    """
    Delete the whole Chroma collection persisted at ``CHROMA_PATH``.

    All stored documents and their embeddings are removed.

    Args:
        embedding_model: Embedding function used to open the store.
    """
    print("🗑️ Clearing entire database")
    store = Chroma(
        embedding_function=embedding_model,
        persist_directory=CHROMA_PATH,
    )
    store.delete_collection()
    print("✅ Database reset completed")


def add_to_faiss(chunks: list[Document], embedding_model):
    """
    Add chunks to the FAISS index at ``FAISS_INDEX_PATH``, skipping duplicates.

    Chunks with blank ``page_content`` are ignored. A chunk counts as a
    duplicate when its ``metadata["id"]`` is already in the saved index or
    appeared earlier in ``chunks``. Chunks without an ``"id"`` are always
    added. Each new text is embedded exactly once, and those vectors are
    written to the index, which is then saved back to ``FAISS_INDEX_PATH``.

    Args:
        chunks (list[Document]): Documents to add.
        embedding_model: LangChain embeddings object. Its ``embed_documents``
            is called for the new texts, and it is attached to the index for
            later queries.

    Security:
        Loading an existing index unpickles the saved docstore
        (``allow_dangerous_deserialization=True``). That is only acceptable
        because this notebook writes the index itself; never point
        ``FAISS_INDEX_PATH`` at files from an untrusted source.
    """
    print("💾 Adding to FAISS...")

    valid_chunks = [chunk for chunk in chunks if chunk.page_content.strip()]
    if not valid_chunks:
        print("⚠️ No valid documents to process. Aborting.")
        return

    # save_local() writes index.faiss into the folder; check for that file rather
    # than the folder so an empty or partial folder is not treated as an index.
    if os.path.exists(os.path.join(FAISS_INDEX_PATH, "index.faiss")):
        print("📂 Loading existing FAISS index...")
        vectorstore = FAISS.load_local(
            FAISS_INDEX_PATH,
            embedding_model,
            allow_dangerous_deserialization=True,
        )
        # Docstore values are Document objects (not dicts): read `.metadata`.
        existing_ids = {
            doc.metadata["id"]
            for doc in vectorstore.docstore._dict.values()
            if "id" in doc.metadata
        }
        print(f"📊 Number of existing documents in DB: {len(existing_ids)}")
    else:
        print("🆕 Creating a new FAISS index...")
        vectorstore = None
        existing_ids = set()

    new_chunks = []
    for chunk in valid_chunks:
        chunk_id = chunk.metadata.get("id")
        if chunk_id is not None:
            if chunk_id in existing_ids:
                continue
            existing_ids.add(chunk_id)  # also drops repeats within `chunks`
        new_chunks.append(chunk)

    if not new_chunks:
        print("✅ No new documents to add")
        return

    if vectorstore is not None:
        print(f"👉 Adding {len(new_chunks)} new documents")

    new_texts = [chunk.page_content for chunk in new_chunks]
    new_metadatas = [chunk.metadata for chunk in new_chunks]
    embeddings = embedding_model.embed_documents(new_texts)
    if not embeddings or len(embeddings) != len(new_texts):
        print("⚠️ Embedding generation failed. Aborting.")
        return

    # Hand over the vectors computed above; from_texts/add_texts would embed
    # every text a second time.
    text_embeddings = list(zip(new_texts, embeddings))
    if vectorstore is None:
        vectorstore = FAISS.from_embeddings(text_embeddings, embedding_model, metadatas=new_metadatas)
        vectorstore.save_local(FAISS_INDEX_PATH)
        print("✅ New FAISS index created and saved.")
    else:
        vectorstore.add_embeddings(text_embeddings, metadatas=new_metadatas)
        vectorstore.save_local(FAISS_INDEX_PATH)
        print("✅ Database updated and saved")


In [None]:
# Build one Document per incident row and tag each with its stable "csv:..." id.
csv_chunks = calculate_chunk_ids(load_and_chunk_csv(CSV_PATH), "csv")

In [None]:
# Split the PDFs into text chunks and tag each with its stable "pdf:..." id.
pdf_chunks = calculate_chunk_ids(load_and_split_pdfs(PDF_PATH), "pdf")

In [None]:
# Merge both sources into one list (PDF chunks first, then CSV chunks).
all_chunks = [*pdf_chunks, *csv_chunks]
print(f"📚 Total chunks: {len(all_chunks)}")

In [None]:
# Spot-check one CSV chunk. Index 5000 only exists when more than 5000 chunks were
# loaded, so clamp to the last chunk instead of raising IndexError (which would
# stop a "Run all" before the indexing cells below).
if csv_chunks:
    print(csv_chunks[min(5000, len(csv_chunks) - 1)])
else:
    print("⚠️ No CSV chunks loaded")

In [None]:
# Spot-check one PDF chunk. Index 5000 only exists when more than 5000 chunks were
# loaded, so clamp to the last chunk instead of raising IndexError (which would
# stop a "Run all" before the indexing cells below).
if pdf_chunks:
    print(pdf_chunks[min(5000, len(pdf_chunks) - 1)])
else:
    print("⚠️ No PDF chunks loaded")

In [None]:
# Load the all-MiniLM-L12-v2 sentence-transformers model as the embedding function.
# HuggingFaceEmbeddings from langchain_huggingface (imported at the top of the file)
# is the maintained replacement for the deprecated SentenceTransformerEmbeddings
# alias. It wraps the same sentence-transformers model with the same default
# encode settings, so vectors already stored in Chroma stay compatible.
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L12-v2")

In [None]:
# Embed and store every chunk whose metadata["id"] is not already in the Chroma DB.
add_to_chroma(all_chunks, embedding_model)

In [None]:
# reset_chroma_database(embedding_model)

In [None]:
# add_to_faiss(csv_chunks, embedding_model)

In [None]:
# Prompt for the "stuff" RetrievalQA chain built in query_rag: {context} receives
# the retrieved chunks and {question} the user's query.
# NOTE(review): the role/instructions come after the context and there is no
# "say you don't know if the context lacks the answer" fallback; consider moving
# the instructions first and adding one to reduce hallucinated answers.
PROMPT_TEMPLATE = """
Answer the question based ONLY on the following context:

Context:
{context}

Instructions:
You are an assistant specializing in food safety and root cause analysis. Your expertise includes contamination incidents, recalls, and outbreaks.

Question: {question}
Answer:
"""

In [None]:
def query_rag(query_text, retriever, llm):
    """
    Answer ``query_text`` with a "stuff" RetrievalQA chain over ``retriever``.

    All documents returned by the retriever are inserted into
    ``PROMPT_TEMPLATE`` as ``{context}`` and sent to ``llm`` in a single call.
    The retrieved documents are printed for inspection.

    Args:
        query_text (str): The question to ask.
        retriever: LangChain retriever supplying the context documents.
        llm: LangChain language model that writes the answer.

    Returns:
        tuple[str, str]: The model's answer, and the newline-joined
        ``metadata["id"]`` of each source document ("Unknown Source" when a
        document has no id).
    """
    print("🔍 Querying the Chroma database...")

    chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        chain_type_kwargs={
            "prompt": PromptTemplate(
                input_variables=["context", "question"],
                template=PROMPT_TEMPLATE,
            )
        },
    )

    output = chain.invoke({"query": query_text})
    answer = output['result']
    retrieved = output['source_documents']
    print(retrieved)

    source_ids = (item.metadata.get("id", "Unknown Source") for item in retrieved)
    return answer, "\n".join(source_ids)

In [None]:
from transformers import pipeline

# Local Hugging Face text-generation pipeline used as the RAG answer model.
# return_full_text=False: return only the newly generated text, not the prompt.
# do_sample=True with temperature=0.2: sampled, but with low randomness.
# NOTE(review): DeepSeek-R1 distills usually emit a <think>...</think> reasoning
# section before the answer; with max_new_tokens=500 the final answer may be cut
# off — verify on real outputs.
pipe = pipeline(
    task="text-generation",
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    return_full_text=False,
    max_new_tokens=500,
    do_sample=True,
    temperature=0.2,
)

# Wrap the Hugging Face pipeline so LangChain chains can call it as an LLM.
llm = HuggingFacePipeline(pipeline=pipe)

In [None]:
# Reopen the persisted Chroma store. Queries are embedded with embedding_model,
# which must be the same model that was used when the documents were added.
db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_model)

# MMR (maximal marginal relevance) retriever: fetch the 50 most similar chunks,
# then keep 20 that balance relevance and diversity, so near-duplicate chunks
# do not crowd the LLM's context.
retriever = db.as_retriever(search_type="mmr", search_kwargs={"k": 20, "fetch_k": 50})

In [None]:
# Evaluation questions: two per food-safety case study, run through query_rag.
# The exact wording matters because each question is embedded for retrieval and
# sent to the LLM verbatim.
queries = [
    # Case #1: Lead Poisoning in Applesauce Products
    "What specific products were involved in the Lead Poisoning in Applesauce Products incidents?",
    "For the Lead Poisoning in Applesauce Products, was the cinnamon intentionally adulterated? / Was this economically motivated adulteration?",
    # Case #2: Salmonella spp. in Cantaloupe Melons, 2023
    "For the Salmonella spp. in Cantaloupe Melons, 2023 incidents, had there been any similar incidents with the same product or facility/brand in the past?",
    "For the Salmonella spp. in Cantaloupe Melons, 2023 incidents, what was the origin of the pathogen that caused the outbreak?",
    # Case #3: E. Coli in Raw Milk Cheese
    "For the E. Coli in Raw Milk Cheese incidents, what was the scale of this outbreak? (number of affected manufacturing plants, geographical spread, people hospitalized, etc.)",
    "For the E. Coli in Raw Milk Cheese incidents, is the problem (as in Root Cause) traced back to raw milk quality or absence of good manufacturing practices during production?",
    # Case #4: Listeria in Fresh Peaches, Plums & Nectarines
    "For the Listeria in Fresh Peaches, Plums & Nectarines incidents, why were the fruits recalled since they were past their expiration date?",
    "For the Listeria in Fresh Peaches, Plums & Nectarines incidents, how was the Listeria contamination initially detected?",
    # Case #5: Norovirus in Oysters
    "For the Norovirus in Oysters incidents, were the oysters processed post harvest?",
    "For the Norovirus in Oysters incidents, were there any recent environmental changes or events (for example, heavy rain) that could have contributed to the initial contamination of the oysters?",
    # Case #6: Salmonella in Fresh Diced Onions
    "For the Salmonella in Fresh Diced Onions incidents, were there any similar incidents in the past involving the same supplier or distributor? If yes, were these linked to diced vegetables?",
    "For the Salmonella in Fresh Diced Onions incidents, were the findings concerning the root cause analysis conflicting?",
    # Case #7: Salmonella in Alfalfa Sprouts
    "For the Salmonella in Alfalfa Sprouts incidents, how was the contamination initially detected?",
    "For the Salmonella in Alfalfa Sprouts incidents, were there findings of poor food safety and hygiene practices in the manufacturing facilities?",
    # Case #8: Salmonella in Raw Cookie Dough
    "For the Salmonella in Raw Cookie Dough incidents, what ingredients were used in this product and which one/ones were the source of contamination?",
    "For the Salmonella in Raw Cookie Dough incidents, were all food safety and hygiene practices met during production, handling and packaging of the cookie dough?",
    # Case #9: Listeria in Leafy Greens
    "For the Listeria in Leafy Greens incidents, was the strain identified either in the harvesting equipment, the manufacturing facilities or both?",
    "For the Listeria in Leafy Greens incidents, what prompted the FDA investigation?",
    # Case #10: Salmonella in Ground Beef
    "For the Salmonella in Ground Beef incidents, what is the most likely cause of contamination?",
    "What was the scale of the Salmonella in Ground Beef outbreak?",
    # Case #11: Listeria in Deli Meat and Cheese, 2023
    "For the Listeria in Deli Meat and Cheese, 2023 incidents, what is the most likely cause of contamination?",
    "What was the scale of the Listeria in Deli Meat and Cheese, 2023 outbreak?",
    # Case #12: Salmonella in Flour
    "For the Salmonella in Flour incidents, what is the most likely cause of contamination?",
    "What was the scale of the Salmonella in Flour outbreak?",
    # Case #13: Listeria in Enoki Mushrooms
    "For the Listeria in Enoki Mushrooms incidents, are enoki mushrooms subject to processing (heat treatment) by the consumer before consumption or are they consumed raw?",
    "For the Listeria in Enoki Mushrooms incidents, was the manufacturer involved in similar incidents in the past?",
    # Case #14: Salmonella in Cocoa
    "For the Salmonella in Cocoa incidents, what is the most likely cause of contamination?",
    "What was the scale of the Salmonella in Cocoa outbreak?",
    # Case #15: Cronobacter Sakazakii in Infant Formulae Powder
    "What prompted the Cronobacter Sakazakii in Infant Formulae Powder investigation?",
    "Was the Cronobacter Sakazakii in Infant Formulae Powder outbreak linked to poor manufacturing practices?",
]

In [None]:
# Smoke-test the RAG chain on one question before running the full list.
# query_rag prints the retrieved documents; response/sources are kept for inspection.
query_text="What specific products were involved in the Lead Poisoning in Applesauce Products incidents?"

response, sources = query_rag(query_text, retriever, llm)

In [None]:
# Run every evaluation question through the RAG chain and print a report for each.
for query_text in queries:
    response, sources = query_rag(query_text, retriever, llm)
    report_lines = (
        "########################################",
        "==========Query==========",
        query_text,
        "==========Response==========",
        response,
        "==========Sources==========",
        sources,
    )
    for line in report_lines:
        print(line)
    print()