## RAG pipeline:

#### 1- Data ingestion pipeline

In [2]:
import os
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

Reading the pdf files inside the folder:

In [5]:
def process_all_pdfs(pdf_directory):
    all_documents = []
    pdf_dir_path=Path(pdf_directory)

    pdf_files = list(pdf_dir_path.glob("**/*.pdf"))

    print(f"number of pdf files found: {len(pdf_files)} files")
    for pdf_file in pdf_files:
        print(f"processing the file:{pdf_file.name}")
        try:
            loader=PyPDFLoader(str(pdf_file))
            documents=loader.load()

            #adding source info to metadata:
            for document in documents:
                document.metadata['source_file'] = pdf_file.name
                document.metadata['file_type']='pdf'

            all_documents.extend(documents)
            print(f"loaded {len(documents)} pages")
        except Exception as e:
            print(f"error loading files:{e}")
    print(f"Total of loaded documents:{len(all_documents)}")
    return all_documents

# process all pdf files inside data folder
all_pdf_docs=process_all_pdfs("../data")

number of pdf files found: 3 files
processing the file:bgDataFundamentals.pdf
loaded 20 pages
processing the file:kafka.pdf
loaded 50 pages
processing the file:rapport_de_stage.pdf
loaded 26 pages
Total of loaded documents:96


In [6]:
all_pdf_docs

[Document(metadata={'producer': 'Microsoft® PowerPoint® 2016', 'creator': 'Microsoft® PowerPoint® 2016', 'creationdate': '2025-02-16T12:01:23+01:00', 'title': 'PowerPoint Presentation', 'author': 'elmarouani', 'moddate': '2025-02-16T12:01:23+01:00', 'source': '..\\data\\pdfs\\bgDataFundamentals.pdf', 'total_pages': 20, 'page': 0, 'page_label': '1', 'source_file': 'bgDataFundamentals.pdf', 'file_type': 'pdf'}, page_content='Mohamed El Marouani\nTDIA 2\nLes fondements du Big Data'),
 Document(metadata={'producer': 'Microsoft® PowerPoint® 2016', 'creator': 'Microsoft® PowerPoint® 2016', 'creationdate': '2025-02-16T12:01:23+01:00', 'title': 'PowerPoint Presentation', 'author': 'elmarouani', 'moddate': '2025-02-16T12:01:23+01:00', 'source': '..\\data\\pdfs\\bgDataFundamentals.pdf', 'total_pages': 20, 'page': 1, 'page_label': '2', 'source_file': 'bgDataFundamentals.pdf', 'file_type': 'pdf'}, page_content='2\n1. Qu’est ce que les données?\n2. Types des données\n3. Impact des données\n4. Carac

##### step2: chunking using text splitter

In [7]:
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    text_splitter= RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )
    split_docs=text_splitter.split_documents(documents)
    print(f"splitted {len(documents)} into {len(split_docs)} chunks")
    return split_docs

In [8]:
chunks= split_documents(all_pdf_docs)
chunks

splitted 96 into 125 chunks


[Document(metadata={'producer': 'Microsoft® PowerPoint® 2016', 'creator': 'Microsoft® PowerPoint® 2016', 'creationdate': '2025-02-16T12:01:23+01:00', 'title': 'PowerPoint Presentation', 'author': 'elmarouani', 'moddate': '2025-02-16T12:01:23+01:00', 'source': '..\\data\\pdfs\\bgDataFundamentals.pdf', 'total_pages': 20, 'page': 0, 'page_label': '1', 'source_file': 'bgDataFundamentals.pdf', 'file_type': 'pdf'}, page_content='Mohamed El Marouani\nTDIA 2\nLes fondements du Big Data'),
 Document(metadata={'producer': 'Microsoft® PowerPoint® 2016', 'creator': 'Microsoft® PowerPoint® 2016', 'creationdate': '2025-02-16T12:01:23+01:00', 'title': 'PowerPoint Presentation', 'author': 'elmarouani', 'moddate': '2025-02-16T12:01:23+01:00', 'source': '..\\data\\pdfs\\bgDataFundamentals.pdf', 'total_pages': 20, 'page': 1, 'page_label': '2', 'source_file': 'bgDataFundamentals.pdf', 'file_type': 'pdf'}, page_content='2\n1. Qu’est ce que les données?\n2. Types des données\n3. Impact des données\n4. Carac

#### step3: embedding:

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class EmbeddingManager:
    def __init__(self, model_name: str="all-MiniLM-L6-v2"):
        self.model_name=model_name
        self.model=None
        self._load_model()
    def _load_model(self):
        try:
            print(f"loading the embedding model:{self.model_name}")
            self.model=SentenceTransformer(self.model_name)
            print(f"the model: {self.model_name} is loaded successfuly, embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"error loading the model: {self.model_name}: {e}")
            raise
    def generate_embeddings(self, texts: List[str])->np.ndarray:
        """
        this function, takes a list of text strings to embed
        and returns the numpy array of embeddings with shape(len(texts), embedding_dimention(384 in this case))
        """
        if not self.model:
            raise ValueError("model not loaded")
        print(f"generating embeddings for {len(texts)} texts")
        embeddings= self.model.encode(texts, show_progress_bar=True)
        print(f"generated embeddings with shape: {embeddings.shape}")
        return embeddings
    
embeddingManager=EmbeddingManager()
embeddingManager

loading the embedding model:all-MiniLM-L6-v2
the model: all-MiniLM-L6-v2 is loaded successfuly, embedding dimension: 384


<__main__.EmbeddingManager at 0x24378372ba0>

#### step4: vector store

In [11]:
import chromadb
from chromadb.config import Settings
import uuid

In [None]:
class VectorStore:
    def __init__(self, collection_name: str="pdf_documents", persist_directory: str="../data/vector_store"):
        self.collection_name= collection_name
        self.persist_directory=persist_directory
        self.client=None
        self.collection=None
        self._initialize_store()
    def _initialize_store(self):
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client=chromadb.PersistentClient(path=self.persist_directory)

            self.collection=self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF documents embeddings for rag"}
            )
            print(f"vector store initialazed,  collection:{self.collection_name}")
            print(f"existing documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"error in initializing the vector store: {e}")
            raise
    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        this function adds the list of langchain docs with there embeddings to the vector store
        Arguments:
            documents: list of langchain documents
            embeddings: correspondng embeddings for documents
        """
        if len(documents) != len(embeddings):
            raise ValueError("the number of documents must match the number of embeddings")
        print(f"adding{len(documents)} documents to the document store")

        # 1- preparing data for chromadb
        ids=[]
        metadatas= []
        documents_content= []
        embedding_list= []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            
            #generate a unique id using uuid and the index:
            doc_id= f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            #metadata:
            metadata= dict(doc.metadata)
            metadata['doc_index']= i
            metadata['doc_length']= len(doc.page_content)
            metadatas.append(metadata)

            #document contents:
            documents_content.append(doc.page_content)

            #embeddings:
            embedding_list.append(embedding.tolist())
        # 2- adding prepared data to collection:
        try:
            self.collection.add(
                ids=ids,
                embeddings=embedding_list,
                metadatas=metadatas,
                documents=documents_content
            )
        except Exception as e:
            print(f"error adding documents to the vector store: {e}")
            raise

vectorStore=VectorStore()
vectorStore
                  

vector store initialazed,  collection:pdf_documents
existing documents in collection: 0


<__main__.VectorStore at 0x2432a32a270>

we covert the chunks texts to embeddings:

In [17]:
#1- extracting page_content from the chunks documents
texts=[doc.page_content for doc in chunks]
#2- generating embeddings
embeddings= embeddingManager.generate_embeddings(texts)
#3- storing the results in the vectore db:
vectorStore.add_documents(chunks, embeddings)

generating embeddings for 125 texts


Batches:   0%|          | 0/4 [00:00<?, ?it/s]

generated embeddings with shape: (125, 384)
adding125 documents to the document store


#### 2- Retrieval pipeline:

user--> query --> embedded-->similarity search in the vector db --> to get the context

In [18]:
class RAGRetriever:
    def __init__(self, vectorStore: VectorStore, embeddingManager: EmbeddingManager):
        """
        initialize the retriever
        Args:
            vector_store: Vector store containing documments embeddings
            embedding_manager: manager to embed the query(users question)
        """
        self.vectorStore= vectorStore
        self.embeddingManager=embeddingManager
    def retrieve(self, query: str, top_k: int= 5, score_threshold: float=0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevent documents in the vectore store based on the users embedded query

        Args:
            query: the search query in the str format
            top_k: number of top results to return
            score_threshold: minimum similarity score threshold 
        """
        print("retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}")
        #step1: embedding the str query:
        query_embedding= self.embeddingManager.generate_embeddings([query])[0]
        #step2: searching in the vector store:
        try:
            results= self.vectorStore.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )
            #process the results:
            retrieved_docs=[]
            if results['documents'] and results['documents'][0]:
                documents= results['documents'][0]
                metadatas= results['metadatas'][0]
                distances= results['distances'][0]
                print(f"retrieved {len(documents)} documents before filtering")
                ids= results['ids'][0]
                for i, (document, metadata, distance, id) in enumerate(zip(documents, metadatas, distances, ids)):
                    # we convert the cosine_distance calculated by default in chromadb to the similarity score
                    # the smaller the distance the higher the similarity score
                    similarity_score= 1-distance
                    if similarity_score>=score_threshold:
                        retrieved_docs.append({
                            'id': id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i+1
                        })
                print(f"retrieved {len(retrieved_docs)} documents after filtering")
            else:
                print("no corresponding documents found")
            return retrieved_docs
        except Exception as e:
            print("error retrieving the context from the knowledge base: {e}") 
rag_retriever= RAGRetriever(vectorStore, embeddingManager)       

In [19]:
rag_retriever

<__main__.RAGRetriever at 0x2432a328ec0>

In [20]:
rag_retriever.retrieve("Que veut dire les données")

retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 5 documents before filtering
retrieved 5 documents after filtering


[{'id': 'doc_d9fe184d_15',
  'content': "Data Journey\n1 - L'ingestion des données\nL'ingestion des données est la première étape du cycle de vie des données. C'est à ce stade que les données \nsont collectées à partir de diverses sources internes telles que les bases de données, les systèmes de gestion de la \nrelation client (CRM), les systèmes d'information de gestion (ERP), les systèmes existants, les sources externes telles \nque les enquêtes et les fournisseurs tiers. Il est important de s'assurer que les données acquises sont exactes et à \njour afin de pouvoir les utiliser efficacement dans les étapes suivantes du cycle.\nÀ ce stade, les données brutes sont extraites d'une ou de plusieurs sources de données, répliquées, puis intégrées \ndans un support de stockage d'atterrissage. Ensuite, vous devez prendre en compte les caractéristiques des \ndonnées que vous souhaitez acquérir pour vous assurer que l'étape d'ingestion des données dispose de la \ntechnologie et des processus a

In [21]:
rag_retriever.retrieve("envoi asynchrone kafka")

retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 5 documents before filtering
retrieved 5 documents after filtering


[{'id': 'doc_c519377b_49',
  'content': '18\nEnvoi asynchrone dans Kafka:\nL’envoi asynchrone est rapide et efficace, surtout lorsque l’on n’a pas besoin d’attendre les réponses de Kafka.\n\uf0a7 Comparaison avec l’envoi synchrone :\no Si le temps réseau aller-retour est de 10 ms, envoyer 100 messages en attendant à chaque fois prend ~1 \nseconde.\no Si on envoie tout sans attendre, c’est quasi instantané.\no En général, l’application n’a pas besoin de la réponse (topic, partition, offset) mais doit savoir si une erreur \nest survenue.\n\uf0a7 Solution : Ajouter un callback\no Permet d’envoyer les messages sans blocage tout en gérant les erreurs.\no Le callback est exécuté à la réception de la réponse Kafka, avec ou sans erreur.\nProducers: Construction',
  'metadata': {'page': 17,
   'file_type': 'pdf',
   'total_pages': 50,
   'page_label': '18',
   'moddate': '2025-05-20T15:25:22+01:00',
   'doc_index': 49,
   'creator': 'Microsoft® PowerPoint® 2016',
   'source_file': 'kafka.pdf',


In [22]:
rag_retriever.retrieve("what is big data")

retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 5 documents before filtering
retrieved 3 documents after filtering


[{'id': 'doc_6106c830_0',
  'content': 'Mohamed El Marouani\nTDIA 2\nLes fondements du Big Data',
  'metadata': {'source_file': 'bgDataFundamentals.pdf',
   'creationdate': '2025-02-16T12:01:23+01:00',
   'doc_index': 0,
   'producer': 'Microsoft® PowerPoint® 2016',
   'source': '..\\data\\pdfs\\bgDataFundamentals.pdf',
   'author': 'elmarouani',
   'total_pages': 20,
   'doc_length': 53,
   'creator': 'Microsoft® PowerPoint® 2016',
   'page_label': '1',
   'moddate': '2025-02-16T12:01:23+01:00',
   'file_type': 'pdf',
   'title': 'PowerPoint Presentation',
   'page': 0},
  'similarity_score': 0.2098078727722168,
  'distance': 0.7901921272277832,
  'rank': 1},
 {'id': 'doc_4b4c0ea6_26',
  'content': "Evolution du Big Data\nBien que le concept de Big Data soit relativement nouveau, la nécessité de gérer des jeux de données volumineux \nremonte aux années 1960 et 70, avec les premiers data centers et le développement des bases de données \nrelationnelles.\n\uf0a7 Passé: En 2005, on assis

#### 3- Augmented generation: giving the context with the query and a prompt to the llm

In [28]:
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()
groq_api_key=os.getenv("groq_api_key")

llm= ChatGroq(groq_api_key=groq_api_key, 
              model_name="openai/gpt-oss-20b", 
              temperature=0.1,
              max_tokens=1024)

def rag_simple(query, retriever: RAGRetriever, llm, top_k=3):
    results=retriever.retrieve(query, top_k=top_k)
    context= "\n\n".join([doc['content'] for doc in results]) if results else ""
    if not context:
        return "no relevent context found"
    #generating answer using groq llm:
    prompt=f"""use the following context to answer the question concisely.
        Context:
        {context}

        Question:
        {query}
        
        Answer:"""
    response= llm.invoke([prompt.format(context=context, query= query)])
    return response.content


In [29]:
answer=rag_simple("Quelles sont les differents Types des données", rag_retriever, llm)
print(answer)

retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 3 documents before filtering
retrieved 3 documents after filtering
**Types de données :**

1. **Données qualitatives (catégorielles)**  
   - **Nominales** : catégories sans ordre (ex. sexe, nationalité, couleur).  
   - **Ordinales** : catégories avec un ordre ou un classement (ex. classements, notes, niveaux de satisfaction).


In [30]:
answer=rag_simple("donne moi les 5 caracteristiques de la big data", rag_retriever, llm)
print(answer)

retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 3 documents before filtering
retrieved 3 documents after filtering
Les 5 caractéristiques clés du Big Data :  

1. **Volume** – quantités massives de données.  
2. **Vitesse (Velocity)** – génération et traitement rapides.  
3. **Variété** – diversité des formats (texte, image, vidéo, etc.).  
4. **Veracité** – qualité et fiabilité des données.  
5. **Valeur (Value)** – capacité à produire des insights utiles.


### Enhanced rag returning answer with the sources

In [32]:
def rag_advenced(query, retriever: RAGRetriever, llm, top_k=5, min_score=0.0, return_context=False):
    """
    RAG pipeline with extra features which returns the answer with the sources documents:
    Returns answer, sources, confidence score, and full context if required   
    """
    results= retriever.retrieve(query, top_k=top_k, score_threshold=min_score)
    if not results:
        return {'answer': 'no relevent context found in the provided files', 'sources':[], 'confidence': 0.0, 'context': ""}
    context= "\n\n".join([doc['content'] for doc in results])
    sources=[{
        'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
        'page': doc['metadata'].get('page', 'unknown'),
        'score': doc['similarity_score'],
        'preview': doc['content'][:120]+"..."
    } for doc in results]
    confidence= max([doc['similarity_score'] for doc in results])
    #generating the answer:
    prompt=f"""Use this following context to answer the question concisely and precisely\nContext:\n{context}\nQuestion:\n{query}\n\nAnswer:"""
    response=llm.invoke([prompt.format(context=context, query=query)])

    output={
        'answer': response.content,
        'source': sources,
        'confidence': confidence
    }
    if return_context:
        output['context']=context
    return output

In [33]:
result=rag_advenced("donne moi les 5 caracteristiques de la big data", rag_retriever, llm, top_k=3, min_score=0.1, return_context=True)

print(f"answer: {result['answer']}")
print(f"sources:{result['source']}")
print(f"confidence:{result['confidence']}")
print(f"context prev:{result['context']}")


retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 3 documents before filtering
retrieved 3 documents after filtering
answer: Les 5 caractéristiques classiques du Big Data — les « V » — sont :

1. **Volume** – quantité massive de données.  
2. **Velocity** – vitesse de génération et de traitement.  
3. **Variety** – diversité des formats (texte, image, vidéo, etc.).  
4. **Veracity** – qualité, fiabilité et précision des données.  
5. **Value** – valeur ajoutée et insights que l’on peut en extraire.
sources:[{'source': 'bgDataFundamentals.pdf', 'page': 0, 'score': 0.5941737592220306, 'preview': 'Mohamed El Marouani\nTDIA 2\nLes fondements du Big Data...'}, {'source': 'bgDataFundamentals.pdf', 'page': 1, 'score': 0.35692936182022095, 'preview': '2\n1. Qu’est ce que les données?\n2. Types des données\n3. Impact des données\n4. Caractéristiques des données (les V)\n5. Da...'}, {'source': 'bgDataFundamentals.pdf', 'page': 18, 'score': 0.34568339586257935, 'preview': 'Evolution du Big Data

In [40]:
result=rag_advenced("quelle est la durrée du stage effectué au sein de l'entreprise YAZAKI", rag_retriever, llm, top_k=3, min_score=-0.1, return_context=True)

print(f"answer: {result['answer']}")
print(f"sources:{result['source']}")
print(f"confidence:{result['confidence']}")
print(f"context prev:{result['context']}")


retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 3 documents before filtering
retrieved 2 documents after filtering
answer: Le stage a duré **deux mois**.
sources:[{'source': 'rapport_de_stage.pdf', 'page': 7, 'score': 0.06437516212463379, 'preview': 'Chapitre 1\nContexte et Objectifs du Stage\n1.1 Présentation de l’Entreprise\nYazaki est un leader mondial dans la fabricat...'}, {'source': 'rapport_de_stage.pdf', 'page': 6, 'score': -0.028636693954467773, 'preview': 'Introduction\nCe stage de deux mois réalisé au sein de Yazaki Meknes avait pour objectif\nprincipal le développement d’une...'}]
confidence:0.06437516212463379
context prev:Chapitre 1
Contexte et Objectifs du Stage
1.1 Présentation de l’Entreprise
Yazaki est un leader mondial dans la fabrication de systèmes de câblage
automobile et de instruments de mesure. Fondée en 1929 au Japon, l’entreprise
emploie plus de 200 000 personnes dans 45 pays et possède une forte présence dans
le secteur automobile avec des innovations te

In [44]:
result=rag_advenced("Présentation de l’Entreprise", rag_retriever, llm, top_k=3, min_score=-1, return_context=True)

print(f"answer: {result['answer']}")
print(f"sources:{result['source']}")
print(f"confidence:{result['confidence']}")
print(f"context prev:{result['context']}")


retrieving documents for the query: '{query}' with top k: {top_k}, and the score threshold of similarity: {score_threshold}
generating embeddings for 1 texts


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

generated embeddings with shape: (1, 384)
retrieved 3 documents before filtering
retrieved 3 documents after filtering
answer: **Présentation de l’entreprise : Yazaki**

Yazaki est un fournisseur mondial de composants et de systèmes automobiles.  
Spécialisée dans les câblages, les connecteurs, les systèmes de gestion de l’énergie et les solutions de communication embarquée, l’entreprise conçoit, fabrique et distribue des pièces destinées aux constructeurs automobiles et aux équipementiers. Elle opère à l’échelle internationale, avec des sites de production et de R&D répartis dans le monde entier, et se distingue par son engagement en matière d’innovation, de qualité et de durabilité.
sources:[{'source': 'rapport_de_stage.pdf', 'page': 6, 'score': -0.1468815803527832, 'preview': 'renseignant l’adresse d’un ou de plusieurs sous-réseaux, ce qui permet d’élargir la portée\nde la supervision et d’obteni...'}, {'source': 'rapport_de_stage.pdf', 'page': 1, 'score': -0.25188279151916504, 'pre