# **RAG Implementation**

Basic RAG implementation using an LLM API (Gemini, Groq or Mistral), a sentence transformer and FAISS (manual processing with cosine similarity is also available for small datasets). YouTube videos, Wikipedia pages, PDF and TXT files are supported.

In [None]:
# Wikipedia page titles and YouTube search queries used to feed the RAG; leave a list empty ([]) to skip that source
wikipedia_page_titles = ["World War I", "World War II"]
youtube_video_titles = ["Cold War", "World War"]

# Drag and drop your files, then set the path of the folder containing them (*.txt and *.pdf files are read)
path = '/content'

MODE = "FAISS" #"FAISS" enables the FAISS index; any other value (e.g. "manual") keeps the list-based functions

from google.colab import userdata
# Name under which the LLM API key is stored (it must belong to the provider chosen in ai_type), then provider and model
API_KEY = userdata.get('GRO_API_KEY')   # e.g. 'GEM_API_KEY' when ai_type is "gemini"
ai_type = "groq"  #"gemini", "groq", "mistral"
ai_model = "llama-3.3-70b-versatile" # e.g. "gemini-2.0-flash-lite", "mistral-small-latest"

# Key handed to the YouTube client as developerKey when searching videos
YT_API_KEY = userdata.get('YT_API_KEY')

# Libraries

In [None]:
!pip install PyMuPDF
!pip install wikipedia
!pip install faiss-cpu
#!pip install faiss-gpu
!pip install mistralai
!pip install groq
!pip install youtube-transcript-api
!pip install --upgrade google-api-python-client

import numpy as np
import re
import fitz
import wikipedia
import faiss

from pathlib import Path

#LLM import
from google import genai
from google.genai import types
from mistralai import Mistral
from groq import Groq

#Youtube transcripts
from youtube_transcript_api import YouTubeTranscriptApi
from googleapiclient.discovery import build

In [None]:
from sentence_transformers import SentenceTransformer

# Sentence-transformer used to embed both the chunks and the queries
model = SentenceTransformer("all-MiniLM-L6-v2")
#model = SentenceTransformer("intfloat/multilingual-e5-small")    # for this one, build the embeddings in the form [query: , passage: ]
#model = SentenceTransformer("google/embeddinggemma-300m")

# Dataset

In [None]:
def clean_text(text: str) -> str:
    """Remove URLs and squeeze whitespace so the text reads as a single line.

    Three substitutions are applied in order: http(s) URLs are deleted, every
    line break (together with the whitespace around it) becomes one space, and
    any run of two or more whitespace characters becomes one space. The result
    is stripped at both ends.
    """
    substitutions = (
        (r"http[s]?://\S+", ""),
        (r"\s*\n\s*", " "),
        (r"\s{2,}", " "),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

In [None]:
# Text chunks collected from every source (TXT, PDF, Wikipedia, YouTube) before indexing
dataset: list[str] = []
# Folder scanned for the local documents
folder = Path(path)

##TXT files

In [None]:
# Load every .txt file of the folder: each line is cleaned, blank lines are dropped,
# and the remaining lines are grouped into chunks of 10 appended to `dataset`.
for txt_path in folder.glob('*.txt'):
    print(f"Opening {txt_path}")
    # Explicit encoding so the result does not depend on the platform default;
    # the handle gets its own name instead of shadowing the loop variable.
    with open(txt_path, encoding="utf-8") as handle:
        lines = [clean_text(line) for line in handle]

    # Blank lines carry no information and would only take up slots in a chunk
    lines = [line for line in lines if line]
    print(f'Loaded {len(lines)} entries')

    # Group lines into chunks of 10
    for start in range(0, len(lines), 10):
        dataset.append(" ".join(lines[start:start + 10]))

#print(dataset[:12])
#print(len(dataset))
#print(dataset)


##PDF files

In [None]:
# Load every .pdf file of the folder: the text of all pages is concatenated, cleaned,
# split into sentences and grouped into chunks of 10 sentences appended to `dataset`.
for pdf_path in folder.glob('*.pdf'):
    # Context manager so the document is closed once its text has been read
    with fitz.open(pdf_path) as doc:
        full_text = "".join(page.get_text() for page in doc)

    full_text_clean = clean_text(full_text)
    if not full_text_clean:
        # Without this guard re.split("") would yield [""] and an empty chunk would be indexed
        print(f"No extractable text in {pdf_path.name}, skipping it.")
        continue

    # Split into sentences: break on whitespace following "." or "?", except after
    # patterns that look like abbreviations (e.g. "e.g." or "Mr.")
    sentences = re.split(r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s", full_text_clean)

    # Group sentences into chunks of 10
    for start in range(0, len(sentences), 10):
        dataset.append(" ".join(sentences[start:start + 10]))


##Wikipedia pages

In [None]:
# Load the requested Wikipedia pages: each page is split into sections, then into
# paragraphs, and every paragraph is stored as "<section heading>: <paragraph>".
wikipedia.set_lang("en")
paragraphs = []

# Section headings (compared lower-cased) whose content is not indexed
excluded = {"see also", "notes", "references", "sources"}

for page_title in wikipedia_page_titles:
    try:
        page = wikipedia.page(page_title, auto_suggest=False)
    except Exception as e:
        # Best effort: report the real cause (missing page, ambiguous title, network...)
        # and carry on with the remaining titles.
        print(f"The page {page_title} could not be loaded ({type(e).__name__}: {e}), check the title. The process doesn't include it.")
        continue

    # Section splitting: with one capture group, re.split returns
    # [lead, heading 1, body 1, heading 2, body 2, ...]
    sections = re.split(r"\n==+ (.*?) ==+\n", page.content)

    # NOTE: sections[0] (the text before the first heading) is not indexed.
    for heading, body in zip(sections[1::2], sections[2::2]):
        heading = heading.strip()
        body = body.strip()
        if heading.lower() in excluded or not body:
            continue

        # Paragraph extraction; emptiness is tested after cleaning so that
        # whitespace-only paragraphs do not produce "heading: " entries.
        for para in body.split("\n\n"):
            para = clean_text(para)
            if para:
                paragraphs.append(heading + ": " + para)

dataset.extend(paragraphs)

## YouTube Videos

In [None]:
def search_youtube_video(query, max_results=3):
    """Search YouTube for `query` and return the IDs of the matching videos.

    A YouTube v3 client is built with YT_API_KEY and a video-only search is
    executed, asking for at most `max_results` items. Every returned item is
    printed, and the `videoId` of each item whose kind is "youtube#video" is
    collected.

    Returns:
        A list of video ID strings (empty when the response has no items).
    """
    client = build('youtube', 'v3', developerKey=YT_API_KEY)
    search_params = {
        "q": query,
        "part": "id",
        "maxResults": max_results,
        "type": "video",
    }
    payload = client.search().list(**search_params).execute()

    found = []
    for entry in payload.get("items", []):
        print(entry)
        entry_id = entry["id"]
        if entry_id["kind"] != "youtube#video":
            continue
        found.append(entry_id["videoId"])
    return found

In [None]:
# Load YouTube transcripts: for each search query, the English transcript of every
# video found is cleaned, split into sentences and grouped into chunks of 10
# sentences appended to `dataset`.
yt_api = YouTubeTranscriptApi()  # one client is enough for all the videos

for title in youtube_video_titles:
    for video_ID in search_youtube_video(title):
        try:
            transcript = yt_api.fetch(video_id=video_ID, languages=['en'])
        except Exception as e:
            # Best effort: say which video is skipped and why, then carry on
            print(f"Subtitles not available for video {video_ID} ({type(e).__name__})")
            continue

        # Join once instead of repeated +=, and clean like the other sources
        # (this also removes the trailing space that produced an empty last sentence).
        text = clean_text(" ".join(snippet.text for snippet in transcript))
        if not text:
            continue

        sentences = re.split(r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s", text)
        # Group sentences into chunks of 10
        for start in range(0, len(sentences), 10):
            dataset.append(" ".join(sentences[start:start + 10]))


#Manual Processing (Python list and for-loop)

##Vector DB

In [None]:
# In-memory store of the manual mode: one (chunk, embedding) tuple per indexed chunk
VECTOR_DB = []


def add_chunk_to_database(chunk):
    """Encode `chunk` with `model` and append the (chunk, embedding) pair to VECTOR_DB."""
    VECTOR_DB.append((chunk, model.encode(chunk)))

##Distance Functions

###Cosine similarity

In [None]:
def cosine_similarity(a,b) :
    """Return the cosine similarity between vectors `a` and `b`.

    The value is the dot product divided by the product of the Euclidean
    norms. When either vector has zero norm the similarity is undefined;
    0.0 is returned instead of dividing by zero (which would yield nan and
    disturb the ranking of results).
    """
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return np.dot(a, b) / denominator

##Retrieval function

In [None]:
def retrieve(query, top_n=3):
    """Return the `top_n` stored chunks most similar to `query`.

    The query is encoded with `model`, compared to every embedding held in
    VECTOR_DB with cosine_similarity, and the (chunk, similarity) pairs are
    returned sorted by decreasing similarity.
    """
    query_vector = model.encode(query)
    scored = [
        (text, cosine_similarity(query_vector, vector))
        for text, vector in VECTOR_DB
    ]
    ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]


#FAISS processing

In [None]:
# FAISS mode: replaces the list-based VECTOR_DB / add_chunk_to_database / retrieve
# with versions backed by an inner-product index.
if MODE == "FAISS" :
    d = model.get_sentence_embedding_dimension()  #embeddings dimension
    # Inner-product index: on unit-length vectors the inner product is the cosine similarity
    index = faiss.IndexFlatIP(d)

    # VECTOR_DB[i] holds the text of the i-th vector added to `index`
    VECTOR_DB = []

    def _normalized_embedding(text):
        """Encode `text` and return it as a unit-length float32 array of shape (1, d)."""
        vector = np.asarray(model.encode(text), dtype="float32")
        norm = np.linalg.norm(vector)
        if norm > 0:  # a zero vector is left untouched instead of dividing by zero
            vector = vector / norm
        return np.ascontiguousarray(vector.reshape(1, -1), dtype="float32")

    def add_chunk_to_database(chunk):
        """Add the normalized embedding of `chunk` to the index and keep its text."""
        index.add(_normalized_embedding(chunk))
        VECTOR_DB.append(chunk)

    def retrieve(query, top_n=3):
        """Return up to `top_n` (chunk, similarity) pairs, most similar first.

        Fewer than `top_n` pairs are returned when the index holds fewer vectors.
        """
        # Never ask for more neighbours than are indexed: the missing slots come
        # back with index -1, and VECTOR_DB[-1] would silently return the last chunk.
        k = min(top_n, index.ntotal)
        if k <= 0:
            return []

        D, I = index.search(_normalized_embedding(query), k)
        return [
            (VECTOR_DB[idx], float(score))
            for score, idx in zip(D[0], I[0])
            if idx != -1
        ]

#Chunk to database

In [None]:
# Index every chunk of the dataset, reporting progress every 10 chunks
total_chunks = len(dataset)
for position, chunk in enumerate(dataset, start=1):
    add_chunk_to_database(chunk)
    if position % 10 == 0:
        print(f'Added chunk {position}/{total_chunks} to the database')

print("All chunks successfully added")

#Generation

In [None]:
# Ask a question, retrieve the closest chunks and stream the answer of the chosen LLM.

# Fail early on a mistyped provider instead of silently printing no answer
if ai_type not in ("gemini", "mistral", "groq"):
    raise ValueError(f'Unknown ai_type "{ai_type}": expected "gemini", "groq" or "mistral"')

input_query = input('Ask a question:')
retrieved_knowledge = retrieve(input_query,9)
print('Retrieved knowledge:')
for chunk, similarity in retrieved_knowledge:
    print(f' - (similarity: {similarity:.2f}) {chunk}')

# Built outside the f-string: a backslash inside an f-string expression is a
# SyntaxError on Python versions older than 3.12.
context_block = '\n'.join(f' - {chunk}' for chunk, _ in retrieved_knowledge)
instruction_prompt = f'''You're a helpful assistant. Use only the following pieces of context to answer the question. Don't make up any new information:
{context_block}
'''

# Same conversation for the chat-style providers (Mistral and Groq)
chat_messages = [{"role":"system","content":instruction_prompt},{"role":"user","content":input_query}]

# In every branch `or ""` guards against stream parts that carry no text,
# which would otherwise be printed as the literal "None".
if ai_type == "gemini" :
    client = genai.Client(api_key=API_KEY)
    response = client.models.generate_content_stream(model=ai_model, config=types.GenerateContentConfig(system_instruction=instruction_prompt), contents=input_query)
    print("\n\n")
    for part in response:
        print(part.text or "", end="")

elif ai_type == "mistral" :
    client = Mistral(api_key=API_KEY)
    response = client.chat.stream(model=ai_model,messages = chat_messages)
    print("\n\n")
    for part in response :
        print(part.data.choices[0].delta.content or "", end="")

elif ai_type == "groq" :
    client = Groq(api_key=API_KEY)
    response = client.chat.completions.create(model=ai_model,messages = chat_messages,stream=True)
    print("\n\n")
    for part in response:
        print(part.choices[0].delta.content or "", end="")



Ask a question:Which war was the most murderous ?
Retrieved knowledge:
 - (similarity: 0.48) Casualties and war crimes: In Asia and the Pacific, the number of people killed by Japanese troops remains contested. According to R.J. Rummel, the Japanese killed between 3 million and more than 10 million people, with the most probable case of almost 6,000,000 people. According to the British historian M. R. D. Foot, civilian deaths are between 10 million and 20 million, whereas Chinese military casualties (killed and wounded) are estimated to be over five million. Other estimates say that up to 30 million people, most of them civilians, were killed. The most infamous Japanese atrocity was the Nanjing Massacre, in which fifty to three hundred thousand Chinese civilians were raped and murdered. Mitsuyoshi Himeta reported that 2.47 million casualties occurred during the Three Alls policy. General Yasuji Okamura implemented the policy in Hebei and Shandong. Axis forces employed biological and ch