### RAG pipeline - Data Ingestion to Vector DB Pipeline

In [23]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pathlib import Path

In [24]:
### Read all pdf inside the directory
def process_all_pdfs_in_directory(directory_path):
    all_documents = []
    pdf_dir = Path(directory_path)

    # Find all PDF files in the directory
    pdf_files = list(pdf_dir.glob("**/*.pdf"))

    print (f"\nFound {len(pdf_files)} PDF files in the directory '{directory_path}':")

    for pdf_file in pdf_files:
        print(f"- {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()

            # Add source information to metadata
            for doc in documents:
                doc.metadata["source"] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'
            all_documents.extend(documents)
            print(f"  Loaded {len(documents)} pages from {pdf_file.name}")
   
        except Exception as e:
            print(f" Error: {e}")
    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

all_docs = process_all_pdfs_in_directory("../data/files")  # Update with your directory path


Found 3 PDF files in the directory '../data/files':
- business model canvas v1.pdf
  Loaded 3 pages from business model canvas v1.pdf
- Projeto de intervenção.pdf
  Loaded 18 pages from Projeto de intervenção.pdf
- Ricardo Miguel.pdf
  Loaded 1 pages from Ricardo Miguel.pdf

Total documents loaded: 22


In [25]:
### Text splitting get into chunks
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Split documents into smaller chunks."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    split_docs = text_splitter.split_documents(documents)
    print(f"\nTotal documents after splitting: {len(split_docs)}")

    if split_docs:
        print(f"\nExample chunk:")
        print(f"Content: {split_docs[0].page_content[:200]}...")
        print(f"Metadata: {split_docs[0].metadata}")
    return split_docs


In [26]:
chunks = split_documents(all_docs)


Total documents after splitting: 40

Example chunk:
Content: B u sin ess M od el
C an vas 1.0...
Metadata: {'producer': 'Canva', 'creator': 'Canva', 'creationdate': '2025-09-22T01:32:10+00:00', 'title': 'Helix Fusion Bistro', 'moddate': '2025-09-22T01:32:09+00:00', 'keywords': 'DAGzd4j7FzY,BACibNMnYqs,0', 'author': 'Ricardo Barros', 'source': 'business model canvas v1.pdf', 'total_pages': 3, 'page': 0, 'page_label': '1', 'file_type': 'pdf'}


### Embedding and vectorStoreDB 

In [27]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity


In [28]:
class EmbeddingManager:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.mode_name = model_name
        self.model = None
        self._load_model()
    def _load_model(self):
        """Load the embedding model."""
        try:
            self.model = SentenceTransformer(self.mode_name)
            print(f"Loaded embedding model: {self.mode_name}")
            print(f"Model loaded sucessfully. Embedding dimension {self.model.get_sentence_embedding_dimension()}.")
        except Exception as e:
            print(f"Error loading model {self.mode_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for a list of texts."""
        if not self.model:
            raise ValueError("Model not loaded.")
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        return embeddings

embedding_manager = EmbeddingManager()
embedding_manager

Loaded embedding model: all-MiniLM-L6-v2
Model loaded sucessfully. Embedding dimension 384.


<__main__.EmbeddingManager at 0x148ea01a0>

### Vector Store

In [None]:
class VectorStore:
    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize ChromaDB client and collection."""
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)
            self.collection = self.client.get_or_create_collection(name=self.collection_name, metadata={"description": "PDF Documents Collection"})
            print(f"ChromaDB initialized with collection: {self.collection_name}")
        except Exception as e:
            print(f"Error initializing ChromaDB: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        
        if len(documents) != len(embeddings):
            raise ValueError("The number of documents and embeddings must match.")
        """Add documents and their embeddings to the collection."""
        print(f"Adding {len(documents)} documents to the collection '{self.collection_name}'...")
        ids = []
        metadatas = []
        documents_texts = []
        embeddings_list = [] 

        for i, (doc,embedding) in enumerate(zip(documents, embeddings)):
            # Generate a unique ID for each document
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)
            
            # Prepare metadata
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            # Document content
            documents_texts.append(doc.page_content)

            # Embedding
            embeddings_list.append(embedding.tolist())

        # Add to collection
        try: 
            self.collection.add(
                ids=ids,
                metadatas=metadatas,
                documents=documents_texts,
                embeddings=embeddings_list
            )
            print(f"Successfully added {len(documents)} documents to the collection.")
            print(f"Collection now has {self.collection.count()} documents.")
            
        except Exception as e:
            print(f"Error initializing ChromaDB: {e}")
            raise
    
vector_store=VectorStore()
vector_store

ChromaDB initialized with collection: pdf_documents


<__main__.VectorStore at 0x1696dabd0>

In [30]:
## convert the text to embeddings
texts = [doc.page_content for doc in chunks]

## generate embeddings
embeddings = embedding_manager.generate_embeddings(texts)

## store in vector db
vector_store.add_documents(chunks, embeddings)

Adding 40 documents to the collection 'pdf_documents'...
Successfully added 40 documents to the collection.
Collection now has 166 documents.


### Retriever Pipeline From VectorStore

In [31]:
class RAGRetriever:
    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """
        Initialize the retriever with a vector store and embedding manager.
        
        Args:
            vector_store (VectorStore): The vector store instance.
            embedding_manager (EmbeddingManager): The embedding manager instance.
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents for a given query.

        Args:
            query (str): The input query string.
            top_k (int): The number of top documents to retrieve.
            score_threshold (float): Minimum similarity score to consider a document relevant.

        Returns:
            List[Dict[str, Any]]: A list of retrieved documents with metadata and scores.
        
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score Threshold: {score_threshold}")

        # Generate query embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        # search in the vector store
        try: 
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k,
            )
            retrieved_docs = []

            if results['documents'] and results['documents'][0]:
                documents=results['documents'][0]
                metadatas=results['metadatas'][0]
                distances=results['distances'][0]
                ids=results['ids'][0]
                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # convert distance to similarity score (ChormaDB uses cosine distance)
                    similarity_score = 1 - distance
                    if similarity_score >= score_threshold: 
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'distance': distance,
                            'rank': i + 1
                        })
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")
            return retrieved_docs
        
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

rag_retriever = RAGRetriever(vector_store, embedding_manager)
    


In [32]:
rag_retriever.retrieve("Manutenção de computadores")

Retrieving documents for query: 'Manutenção de computadores'
Top K: 5, Score Threshold: 0.0
Retrieved 5 documents (after filtering)


[{'id': 'doc_31845d84_0',
  'content': 'Projeto  de  intervenção  \n \nManutenção  e  Reparação  de  Computadores  \n   \n                       \n1',
  'metadata': {'doc_index': 0,
   'creationdate': '',
   'page_label': '1',
   'title': 'Projeto de intervenção',
   'total_pages': 18,
   'content_length': 103,
   'page': 0,
   'producer': 'Skia/PDF m140 Google Docs Renderer',
   'file_type': 'pdf',
   'source': 'Projeto de intervenção.pdf',
   'creator': 'PyPDF'},
  'distance': 0.4315428137779236,
  'rank': 1},
 {'id': 'doc_5992a92d_4',
  'content': 'Projeto  de  intervenção  \n \nManutenção  e  Reparação  de  Computadores  \n   \n                       \n1',
  'metadata': {'producer': 'Skia/PDF m140 Google Docs Renderer',
   'page_label': '1',
   'creationdate': '',
   'content_length': 103,
   'source': 'Projeto de intervenção.pdf',
   'page': 0,
   'title': 'Projeto de intervenção',
   'creator': 'PyPDF',
   'total_pages': 18,
   'file_type': 'pdf',
   'doc_index': 4},
  'dista

### Integration vectordb context pipeline with llm output

In [None]:
### Simple RAG with Groq LLM

from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()

## init the grop llm
groq_api_key = os.getenv("GROQ_API_KEY")
llm = ChatGroq(groq_api_key=groq_api_key, model_name="llama-3.1-8b-instant", temperature=0.1, max_tokens=1024)
print(os.getenv("GROQ_API_KEY"))

## simple rag function: retrieve context + generate response
def rag_simple(query, retriever, llm, top_k=5):
    ## retriver context
    results = retriever.retrieve(query,top_k=top_k)
    context = "\n\n".join([doc['content'] for doc in results]) if results else ""
    if not context:
        return "No relevant content"
    
    ## generate answer using groq
    prompt=f"""Use the following context to answer the question concisely.
        Context:
        {context}

        Question: {query}

        Answer: """

    response = llm.invoke([prompt.format(context=context, query=query)])
    return response.content

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


In [45]:
answer = rag_simple("Quem é Ricardo Barros quais sao a experiencias", rag_retriever, llm)
print(answer)

Retrieving documents for query: 'Quem é Ricardo Barros quais sao a experiencias'
Top K: 5, Score Threshold: 0.0
Retrieved 5 documents (after filtering)
Não há informações disponíveis sobre as experiências de Ricardo Barros no contexto fornecido. No entanto, posso sugerir que ele é um engenheiro com experiência em formação e desenvolvimento de habilidades, pois está listado como formador em um índice que inclui temas como motivação, gestão de tempo e aprendizagem.
