# Agentic Retrieval-Augmented Generation (RAG) with Local Llama 2 & ChromaDB

## Overview
This notebook implements an **agentic Retrieval-Augmented Generation (RAG) pipeline**. It transcribes audio data (for example, from an Omi streaming device), stores both the audio and its transcription, and then uses the transcription with a local **AI Studio** model and **ChromaDB** for question answering. Before generating a response, the system decides whether additional context is needed.

### Key Features:
- **Audio Transcription Workflow** for processing data from devices like Omi.
- **Storage of Audio and Transcriptions** for AI processing.
- **Llama 2 Model** for high-quality text generation.
- **ChromaDB Vector Store** for efficient semantic search on transcriptions.
- **Dynamic Context Retrieval** to improve answer accuracy.
- **Two Answering Modes**:
  - With RAG (Retrieves relevant document content before responding).
  - Without RAG (Directly generates responses).
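
The two answering modes listed above can be sketched with plain callables. This is a minimal illustration, not the notebook's implementation: `answer`, `fake_generate`, and `fake_retrieve` are hypothetical names, and the prompt wording is an assumption.

```python
from typing import Callable


def answer(query: str,
           generate: Callable[[str], str],
           retrieve: Callable[[str], str],
           use_rag: bool) -> str:
    """Answer a query either with retrieved context (RAG) or directly."""
    if use_rag:
        # With RAG: fetch relevant chunks first and put them in the prompt
        context = retrieve(query)
        prompt = (
            "Use the context below to answer the question.\n\n"
            f"Context:\n{context}\n\n"
            f"Question: {query}\nAnswer:"
        )
    else:
        # Without RAG: send the question to the model as-is
        prompt = f"Question: {query}\nAnswer:"
    return generate(prompt)


# Stand-ins so the sketch runs without a model or a vector database
def fake_generate(prompt: str) -> str:
    return f"[model reply to a prompt of {len(prompt)} characters]"


def fake_retrieve(query: str) -> str:
    return f"Transcription: example chunk about {query}"


print(answer("vocal techniques", fake_generate, fake_retrieve, use_rag=True))
print(answer("vocal techniques", fake_generate, fake_retrieve, use_rag=False))
```

In the real pipeline, `generate` would wrap the local model and `retrieve` would wrap the vector search over the transcription chunks.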

In [None]:
# Prefer pysqlite3 (if installed) over the system sqlite3, since ChromaDB needs a recent SQLite
import sys
import os

# Clear any previously imported sqlite3/pysqlite3 modules so the swap below takes effect
if "pysqlite3" in sys.modules:
    del sys.modules["pysqlite3"]
if "sqlite3" in sys.modules:
    del sys.modules["sqlite3"]

try:
    import pysqlite3
    sys.modules["sqlite3"] = pysqlite3
    sys.modules["pysqlite3"] = pysqlite3
    print("Using pysqlite3 as sqlite3 backend.")
except ImportError:
    print("pysqlite3 not found, using default sqlite3 module.")

import sqlite3
print("Loaded sqlite3 version:", sqlite3.sqlite_version)
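
A small check like the following can make the purpose of the swap above explicit. It is a sketch under an assumption: the minimum version `(3, 35, 0)` reflects what ChromaDB has documented as its SQLite requirement, so verify it against the ChromaDB version you install. The helper name `sqlite_is_new_enough` is hypothetical.

```python
import sqlite3

# Assumption: minimum SQLite version ChromaDB accepts; check the ChromaDB docs
MIN_SQLITE = (3, 35, 0)


def sqlite_is_new_enough(version_info=sqlite3.sqlite_version_info,
                         minimum=MIN_SQLITE) -> bool:
    """Return True if the given SQLite version tuple meets the minimum."""
    return tuple(version_info) >= tuple(minimum)


if sqlite_is_new_enough():
    print(f"SQLite {sqlite3.sqlite_version} meets the assumed minimum.")
else:
    print(f"SQLite {sqlite3.sqlite_version} is older than the assumed minimum; "
          "installing pysqlite3-binary may help.")
```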

In [None]:
# Upgrade pip first, then install the required packages (skip if already installed)
%pip install -q --upgrade pip
%pip install -q -r requirements.txt

## 🔧 Step 1: Model Setup

We will set up **Llama 2 (7B)** for text generation. If the model is not found locally, it will be downloaded from Hugging Face.

In [None]:
%pip install -q huggingface-hub

import os
from huggingface_hub import hf_hub_download

MODEL_FILENAME = "llama-2-7b-chat.Q4_K_M.gguf"
MODEL_DIR = "model"
EXPECTED_PATH = os.path.join(MODEL_DIR, MODEL_FILENAME)

# Ensure model directory exists
os.makedirs(MODEL_DIR, exist_ok=True)

# Check if model already exists
if os.path.exists(EXPECTED_PATH):
    print(f"Model already exists at: {EXPECTED_PATH}")
    model_path = EXPECTED_PATH
else:
    print("Model not found locally. Downloading Llama 2 model...")
    
    # Download the model
    model_path = hf_hub_download(
        repo_id="TheBloke/Llama-2-7B-Chat-GGUF",
        filename=MODEL_FILENAME,
        local_dir=MODEL_DIR
    )
    print(f"Model downloaded to: {model_path}")

print(f"Using model at: {model_path}")

In [None]:
%pip install -q llama-cpp-python
# Check if the model file exists
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Model file not found at {model_path}")

# Import the Llama class from llama_cpp
from llama_cpp import Llama

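# Initialize the model from the local path with partial GPU offload.
# Sampling settings (temperature, top_p, max_tokens) are not constructor
# arguments of llama_cpp.Llama; they are passed when generating text.
llm = Llama(
    model_path=model_path,
    n_ctx=4096,
    verbose=False,
    n_gpu_layers=30,  # Offload up to 30 layers to the GPU (needs a GPU-enabled build)
    n_batch=512,      # Number of prompt tokens processed per batch
    use_mlock=True,   # Lock memory to prevent swapping
    use_mmap=True     # Memory-map the model file for faster loading
)

# Sampling settings to pass at generation time, e.g. llm(prompt, **GENERATION_KWARGS)
GENERATION_KWARGS = {"temperature": 0.25, "max_tokens": 2000, "top_p": 1.0}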
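
Llama 2 chat models are trained on a specific instruction format, so prompts usually work better when wrapped accordingly. The helper below is a minimal sketch: `build_llama2_chat_prompt` and the default system prompt are hypothetical, and the leading `<s>` token is left out on the assumption that the tokenizer adds it.

```python
def build_llama2_chat_prompt(user_message: str,
                             system_prompt: str = "You are a helpful assistant.") -> str:
    """Wrap a single-turn message in the Llama 2 chat instruction format."""
    return (
        f"[INST] <<SYS>>\n{system_prompt}\n<</SYS>>\n\n"
        f"{user_message.strip()} [/INST]"
    )


prompt = build_llama2_chat_prompt("Summarize the transcription in one sentence.")
print(prompt)

# With a loaded llama_cpp.Llama instance, sampling settings go on the call, e.g.:
#   reply = llm(prompt, max_tokens=256, temperature=0.25)["choices"][0]["text"]
```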

## 📄 Step 2: Loading, Transcribing, and Storing Audio Data

This step outlines the process for loading audio data (e.g., from an Omi streaming device), transcribing it, and preparing it for storage and further processing. Both the raw audio and its transcription are valuable assets.


In [None]:
# --- Load the Audio File Document and Audio Collection System ---

# Install whisper if not already installed
%pip install -q openai-whisper

# Install ffmpeg-python bindings if not already installed
%pip install -q ffmpeg-python

import shutil
import sys
import subprocess

# Check if ffmpeg is available and working
try:
    subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
    print("ffmpeg found and working.")
except FileNotFoundError:
    raise FileNotFoundError("ffmpeg not found. Please install ffmpeg and ensure it's in your PATH.")
except subprocess.CalledProcessError as e:
    print(f"Error running ffmpeg: {e}")
    raise RuntimeError("ffmpeg is not working correctly.") from e

import whisper

# No need to manually set ffmpeg_dir if ffmpeg is installed system-wide

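# Define the audio file path
AUDIO_PATH = "./data/tester.mp3"
# Check that the file exists before transcribing
if not os.path.exists(AUDIO_PATH):
    raise FileNotFoundError(f"Audio file not found at {AUDIO_PATH}")
print(f"Loading audio from: {AUDIO_PATH}")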

# Define the audio collection system
AUDIO_COLLECTION_SYSTEM = "MyAudioSystem"

# Load and transcribe the audio file using whisper
model = whisper.load_model("base")
result = model.transcribe(AUDIO_PATH)
text_content = result["text"]

# For compatibility with the rest of your code, wrap the text in a document-like object
class AudioDocument:
    def __init__(self, text):
        self.page_content = text
        self.metadata = {}  # Optionally add metadata if needed
    def getPageText(self):
        return self.page_content

documents = [AudioDocument(text_content)]

# Placeholder helpers: replace these with real extraction and storage logic
def extractRelevantInfo(text):
    # Placeholder: just return the text itself
    return text

def createAudioFile(system, info):
    # Placeholder: just return a (system, info) tuple for demonstration
    return (system, info)

# Build one placeholder audio record per transcribed document
audio_files = []
for document in documents:
    extracted_info = extractRelevantInfo(document.getPageText())
    audio_files.append(createAudioFile(AUDIO_COLLECTION_SYSTEM, extracted_info))

print(f"Successfully loaded {len(documents)} document(s) from the audio and created {len(audio_files)} audio record(s).")
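
The helpers above are placeholders, so nothing is written to disk yet. One way to keep a transcription next to its audio is a JSON sidecar file, sketched below. The function name `store_transcription`, the output directory, and the record fields are all assumptions made for illustration.

```python
import hashlib
import json
from pathlib import Path


def store_transcription(audio_path: str, text: str,
                        out_dir: str = "./data/transcripts") -> Path:
    """Write the transcription to a JSON file named after the audio file."""
    audio = Path(audio_path)
    record = {
        "audio_path": str(audio),
        # A content hash ties the transcript to the exact audio bytes, if the file exists
        "audio_sha256": (hashlib.sha256(audio.read_bytes()).hexdigest()
                         if audio.exists() else None),
        "text": text,
    }
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    target = out / f"{audio.stem}.json"
    target.write_text(json.dumps(record, ensure_ascii=False, indent=2),
                      encoding="utf-8")
    return target


# Example call (commented out so the sketch has no side effects):
# store_transcription("./data/tester.mp3", "transcribed text goes here")
```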




In [None]:
# --- Audio Event Detection (Sound Tagging) ---

%pip install -q torchaudio
%pip install -q panns-inference

import torch
import torchaudio
from panns_inference import AudioTagging

# Initialize the PANNs audio tagging model
panns_model = AudioTagging(checkpoint_path=None, device='cpu')  # Uses default Cnn14 weights

# Load and preprocess audio
waveform, sr = torchaudio.load(AUDIO_PATH)
if sr != 32000:
    waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=32000)(waveform)
    sr = 32000

# PANNs expects mono audio
if waveform.shape[0] > 1:
    waveform = torch.mean(waveform, dim=0, keepdim=True)


# --- Load the full AudioSet class labels for PANNs output mapping ---
# Download the official label file from: https://github.com/qiuqiangkong/panns-inference/blob/master/panns_inference/labels/audioset_labels.txt
# The code below reads class names from the `display_name` column of LABELS_CSV_PATH;
# if that file is missing, it falls back to printing class indices instead of names.

import os
import csv

LABELS_CSV_PATH = "class_labels_indices.csv"
labels = None
if os.path.exists(LABELS_CSV_PATH):
    with open(LABELS_CSV_PATH, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        labels = [row['display_name'] for row in reader]
else:
    print("WARNING: AudioSet class label CSV not found. Will print indices instead of class names.")
    labels = None

with torch.no_grad():
    output = panns_model.inference(waveform)
    clipwise_output = output[0]
    topk = torch.topk(torch.tensor(clipwise_output[0]), 3)
    if labels and max(topk.indices.tolist()) < len(labels):
        sound_tags = [labels[i] for i in topk.indices.tolist()]
    else:
        sound_tags = [f"Class index {i}" for i in topk.indices.tolist()]

print("Detected sound types:", sound_tags)

## ✂️ Step 3: Chunking Audio Transcriptions for RAG

The transcribed text from the audio data is split into **small overlapping chunks** (approximately **500 characters**). These chunks are then used for embedding and storage in ChromaDB to enable semantic search for the RAG pipeline.
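
To make the idea of overlapping chunks concrete, here is a minimal pure-Python sketch. It cuts at fixed character offsets and ignores word boundaries, so it only illustrates the size/overlap arithmetic; `chunk_text` is a hypothetical helper, not part of LangChain.

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size chunks that share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - overlap  # how far each new chunk starts after the previous one
    # Stop once the remaining tail is already covered by the previous chunk
    last_start = max(len(text) - overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, last_start, step)]


sample = "".join(str(i % 10) for i in range(1200))
chunks = chunk_text(sample)
print([len(c) for c in chunks])
```

Each chunk starts 450 characters after the previous one, so the last 50 characters of one chunk are repeated at the start of the next.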


In [None]:
# --- Split the Audio Content into Manageable Chunks with Sound Tags ---
%pip install -q langchain

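# RecursiveCharacterTextSplitter falls back to smaller separators (newlines, spaces),
# so a transcript without paragraph breaks is still split into ~500-character chunks.
# CharacterTextSplitter splits only on "\n\n" by default and may return one oversized chunk.
from langchain.text_splitter import RecursiveCharacterTextSplitter

CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)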


# Split the transcription into chunks
docs = text_splitter.split_documents(documents)

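# Append the sound tags to each chunk's text (so they are embedded with it)
# and also record them in the chunk's metadata
for doc in docs:
    doc.page_content = f"Transcription: {doc.page_content}\nSound tags: {', '.join(sound_tags)}"
    doc.metadata["sound_tags"] = ", ".join(sound_tags)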

## 🔍 Step 4: Initializing the Embedding Model

To convert text into numerical representations for efficient similarity search, we use **all-MiniLM-L6-v2** from `sentence-transformers`.

In [None]:
# --- Initialize the Embedding Model ---
%pip install -q sentence-transformers
from sentence_transformers import SentenceTransformer

# Define the embedding model name
MODEL_NAME = "all-MiniLM-L6-v2"

# Load the embedding model
embedding_model = SentenceTransformer(MODEL_NAME)

print(f"Successfully loaded embedding model: {MODEL_NAME}")

## 🧠 Step 5: Computing Embeddings for Document Chunks

Each chunk is converted into a **vector representation** using our embedding model. This allows us to perform **semantic similarity searches** later.

In [None]:
# --- Compute Embeddings for Each Text Chunk ---

# Extract text content from each chunk
doc_texts = [doc.page_content for doc in docs]

# Compute embeddings for the extracted text chunks
document_embeddings = embedding_model.encode(doc_texts, convert_to_numpy=True)

# Display the result
print("Successfully computed embeddings for each text chunk.")
print(f"Embeddings Shape: {document_embeddings.shape}")
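
The similarity search that these embeddings enable can be illustrated with cosine similarity on small hand-made vectors. This is only a sketch of the idea: ChromaDB runs its own nearest-neighbour search, and its configured distance metric may differ. The names `cosine_similarity` and `rank_by_similarity` are hypothetical.

```python
import math


def cosine_similarity(a, b) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat zero vectors as similar to nothing
    return dot / (norm_a * norm_b)


def rank_by_similarity(query_vec, doc_vecs, top_k: int = 5) -> list[int]:
    """Return indices of the top_k vectors most similar to the query."""
    scored = [(cosine_similarity(query_vec, vec), i) for i, vec in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: -pair[0])  # highest similarity first
    return [i for _, i in scored[:top_k]]


toy_docs = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
print(rank_by_similarity([1.0, 0.0], toy_docs, top_k=2))
```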

## 🗄️ Step 6: Storing Audio Transcription Embeddings in ChromaDB

We initialize **ChromaDB**, a high-performance **vector database**, and store our computed embeddings to enable efficient retrieval of relevant text chunks.

In [None]:
# --- Initialize and Populate the Chroma Vector Database ---

# Define Chroma database path and collection name
CHROMA_DB_PATH = "./chroma_db"
COLLECTION_NAME = "document_embeddings"

# Initialize Chroma client
import chromadb
chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
collection = chroma_client.get_or_create_collection(name=COLLECTION_NAME)

# Add all document embeddings to the Chroma collection in one batch.
# upsert() overwrites existing IDs, so re-running this cell refreshes the stored chunks.
collection.upsert(
    ids=[str(i) for i in range(len(doc_texts))],  # Chroma requires string IDs
    embeddings=document_embeddings.tolist(),
    metadatas=[{"text": text} for text in doc_texts]
)

print("Successfully populated Chroma database with document embeddings.")

## 🔎 Step 7: Implementing Vector Search Tool

To retrieve relevant text passages from the database, we define a **vector search function** that finds the most relevant chunks based on a user query.

In [None]:
def vector_search_tool(query: str) -> str:
    """
    Searches the Chroma database for relevant text chunks based on the query.
    Computes the query embedding, retrieves the top 5 most relevant text chunks,
    and returns them as a formatted string.
    """
    query_embedding = embedding_model.encode(query, convert_to_numpy=True).tolist()
    TOP_K = 5
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=TOP_K
    )
    retrieved_chunks = [metadata["text"] for metadata in results["metadatas"][0]]
    return "\n\n".join(retrieved_chunks)


## 🤖 Step 8: Context Need Assessment

Instead of always retrieving context, we determine if the query **requires external document context** before generating a response. This creates an agentic workflow that makes autonomous decisions to complete the task at hand.
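
The decision step described above could look like the sketch below: the model is asked a yes/no question, and the reply is parsed conservatively. The prompt wording, the name `needs_context`, and the `generate` callable (any function that maps a prompt to the model's text reply) are assumptions for illustration.

```python
from typing import Callable

ASSESSMENT_TEMPLATE = (
    "Decide whether answering the question below requires looking up the "
    "stored audio transcriptions. Reply with a single word: YES or NO.\n\n"
    "Question: {query}\nDecision:"
)


def needs_context(query: str, generate: Callable[[str], str]) -> bool:
    """Ask the model whether retrieval is needed; default to True if unclear."""
    reply = generate(ASSESSMENT_TEMPLATE.format(query=query)).strip().upper()
    first_word = reply.split()[0].strip(".,!:;") if reply else ""
    # Only an explicit NO skips retrieval; anything ambiguous falls back to RAG
    return first_word != "NO"


# Stand-in model replies so the sketch runs without a loaded model
print(needs_context("What did the speaker say about breathing?", lambda p: "YES"))
print(needs_context("What is 2 + 2?", lambda p: "No, this is general knowledge."))
```

Defaulting to retrieval when the reply is unclear trades some extra latency for a lower risk of answering without the relevant transcription.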

In [None]:
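def find_relevant_audio_segments(query: str, segments=None):
    """Return Whisper segments whose text appears in the chunks most similar to the query."""
    # Default to the segments from the Whisper transcription computed in Step 2
    if segments is None:
        segments = result["segments"]

    # Retrieve top relevant chunks
    results = collection.query(
        query_embeddings=[embedding_model.encode(query, convert_to_numpy=True).tolist()],
        n_results=5
    )
    # Get the matched text chunks
    matched_texts = [metadata["text"] for metadata in results["metadatas"][0]]

    # Keep each segment at most once, even if it appears in several overlapping chunks
    relevant_segments = []
    for segment in segments:
        segment_text = segment["text"].strip()
        if segment_text and any(segment_text in text for text in matched_texts):
            relevant_segments.append(segment)
    return relevant_segments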

# Example usage:
relevant_segments = find_relevant_audio_segments("vocal techniques")
for seg in relevant_segments:
    print(f"Start: {seg['start']}s, End: {seg['end']}s, Text: {seg['text']}")
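
Matched segments are often adjacent, so it can be convenient to merge them into a few continuous time ranges before playing back or cutting the audio. The sketch below assumes segments are dicts with `start` and `end` in seconds, as in the example above; `merge_segments`, `format_timestamp`, and the one-second gap threshold are hypothetical choices.

```python
def merge_segments(segments, max_gap: float = 1.0) -> list[tuple[float, float]]:
    """Merge segments separated by at most max_gap seconds into time ranges."""
    ranges: list[list[float]] = []
    for seg in sorted(segments, key=lambda s: s["start"]):
        if ranges and seg["start"] - ranges[-1][1] <= max_gap:
            # Close enough to the previous range: extend it
            ranges[-1][1] = max(ranges[-1][1], seg["end"])
        else:
            ranges.append([seg["start"], seg["end"]])
    return [(start, end) for start, end in ranges]


def format_timestamp(seconds: float) -> str:
    """Format a number of seconds as MM:SS (fractions are dropped)."""
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes:02d}:{secs:02d}"


example_segments = [
    {"start": 0.0, "end": 2.0},
    {"start": 2.5, "end": 4.0},
    {"start": 70.0, "end": 75.4},
]
for start, end in merge_segments(example_segments):
    print(f"{format_timestamp(start)} to {format_timestamp(end)}")
```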


## 💡 Step 9: Answer Generation with Agentic RAG

