In [1]:
from dotenv import load_dotenv
import os
import openai

# Charger les variables d'environnement
load_dotenv()

# Config OpenAI via Azure
openai.api_type = "azure"
openai.api_key = os.getenv("OPENAI_API_KEY")
openai.api_base = os.getenv("OPENAI_API_BASE")
openai.api_version = os.getenv("OPENAI_API_VERSION")

DEPLOYMENT_NAME = os.getenv("AZURE_MODEL_DEPLOYMENT")
EMBEDDING_NAME = os.getenv("AZURE_EMBEDDING_DEPLOYMENT")

In [2]:
from azure.storage.blob import BlobServiceClient
from io import BytesIO

BLOB_CONTAINER = "nolan-rag-files"
blob_service_client = BlobServiceClient.from_connection_string(os.getenv("AZURE_STORAGE_CONNECTION_STRING"))
container_client = blob_service_client.get_container_client(BLOB_CONTAINER)

In [3]:
response = openai.ChatCompletion.create(
    engine=DEPLOYMENT_NAME,
    messages=[{"role": "user", "content": "dis 'ok'"}],
    temperature=0,
    max_tokens=2,
)
print(response.choices[0].message.content)

"OK


In [4]:
from pathlib import Path
from docx import Document
import pandas as pd
import fitz  # pymupdf

def extract_text_from_file(file_path: Path) -> str:
    ext = file_path.suffix.lower()
    try:
        if ext == ".txt":
            return file_path.read_text(encoding="utf-8", errors="ignore")
        elif ext == ".pdf":
            doc = fitz.open(str(file_path))
            return "\n".join(page.get_text() for page in doc)
        elif ext == ".docx":
            doc = Document(str(file_path))
            return "\n".join(p.text for p in doc.paragraphs)
        elif ext == ".csv":
            try:
                df = pd.read_csv(file_path, encoding="utf-8")
            except UnicodeDecodeError:
                df = pd.read_csv(file_path, encoding="latin1")
            return df.to_string(index=False)

        else:
            print(f"[!] Format non supporté : {file_path.name}")
            return ""
    except Exception as e:
        print(f"[!] Erreur lecture {file_path.name} : {e}")
        return ""

def load_all_files_from_blob() -> list[str]:
    docs = []
    for blob in container_client.list_blobs():
        print(f"[...] Extraction : {blob.name}")
        try:
            downloader = container_client.download_blob(blob.name)
            data = BytesIO(downloader.readall())
            ext = Path(blob.name).suffix.lower()

            if ext == ".txt":
                content = data.read().decode("utf-8", errors="ignore")
            elif ext == ".pdf":
                doc = fitz.open(stream=data, filetype="pdf")
                content = "\n".join(page.get_text() for page in doc)
            elif ext == ".docx":
                doc = Document(data)
                content = "\n".join(p.text for p in doc.paragraphs)
            elif ext == ".csv":
                try:
                    df = pd.read_csv(data, encoding="utf-8")
                except UnicodeDecodeError:
                    data.seek(0)
                    df = pd.read_csv(data, encoding="latin1")
                content = df.to_string(index=False)
            else:
                print(f"[!] Format non supporté : {blob.name}")
                continue

            if content.strip():
                docs.append(content)
            else:
                print(f"[!] Aucun texte dans : {blob.name}")
        except Exception as e:
            print(f"[!] Erreur sur {blob.name} : {e}")
    return docs


# Chunks + RAG

In [5]:
def chunk_text(text, max_tokens=500):
    import tiktoken
    enc = tiktoken.get_encoding("cl100k_base")
    paragraphs = text.split("\n\n")
    chunks = []
    current = ""
    for para in paragraphs:
        if len(enc.encode(current + para)) > max_tokens:
            chunks.append(current.strip())
            current = para
        else:
            current += "\n\n" + para
    chunks.append(current.strip())
    return chunks

In [6]:
texts = load_all_files_from_blob()
chunks = []
for text in texts:
    chunks.extend(chunk_text(text))

print(f"Nombre total de chunks : {len(chunks)}")

[...] Extraction : CONTRAT DE TRAVAIL À DURÉE INDÉTERMINÉE.docx
[...] Extraction : Convention_collective_syntec.pdf
[...] Extraction : Fiche de paie.pdf
[...] Extraction : Indemnités et primes de transport.docx
[...] Extraction : Règlement Intérieur de l’Entreprise (Contoso) et Accord d’Entreprise sur le Temps de Travail et les RTT.pdf
[...] Extraction : hr_dataset_fr_new.csv
Nombre total de chunks : 69


In [7]:
import tiktoken
import time
from openai.error import RateLimitError

enc = tiktoken.get_encoding("cl100k_base")
MAX_EMBED_TOKENS = 8192  # Ajuste selon la limite de ton modèle

def truncate_text_to_max_tokens(text, max_tokens=MAX_EMBED_TOKENS):
    tokens = enc.encode(text)
    if len(tokens) > max_tokens:
        tokens = tokens[:max_tokens]
    return enc.decode(tokens)

def safe_embed(text):
    for _ in range(3):  # 3 tentatives max en cas de RateLimitError
        try:
            return openai.Embedding.create(input=text, engine=EMBEDDING_NAME)
        except RateLimitError:
            print("Quota atteint, pause 60s...")
            time.sleep(60)
    raise RuntimeError("Échec embedding après 3 essais")

chunk_vectors = []
for chunk in chunks:
    short_chunk = truncate_text_to_max_tokens(chunk, MAX_EMBED_TOKENS)
    res = safe_embed(short_chunk)
    vec = res["data"][0]["embedding"]
    chunk_vectors.append((chunk, vec))  # stocker chunk original pour contexte


In [8]:
import numpy as np

def cosine_sim(v1, v2):
    a, b = np.array(v1), np.array(v2)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

def retrieve_relevant_chunks(question, top_k=3):
    q_vec = openai.Embedding.create(input=question, engine=EMBEDDING_NAME)["data"][0]["embedding"]
    ranked = sorted(chunk_vectors, key=lambda x: cosine_sim(x[1], q_vec), reverse=True)
    return [c[0] for c in ranked[:top_k]]

In [9]:
def build_prompt(chunks, question):
    context = "\n\n".join(chunks)
    return f"""
Tu es un assistant professionnel. Réponds strictement à la question en utilisant uniquement les données fournies.

NE PAS INVENTER. Ne pas générer d'informations personnelles. Aucun contenu sensible.

CONNAISSANCES :
{context}

QUESTION :
{question}

RÉPONSE :
"""

In [10]:
def ask_rag(question):
    top_chunks = retrieve_relevant_chunks(question)
    prompt = build_prompt(top_chunks, question)
    response = openai.ChatCompletion.create(
        engine=DEPLOYMENT_NAME,
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )
    return response.choices[0].message.content.strip()

### Poser questions au modèle RAG

In [11]:
ask_rag("Quelles sont les modalités de validation et de prise des jours de RTT pour les cadres en forfait jours chez Contoso SAS ?")

'Les modalités de validation et de prise des jours de RTT pour les cadres en forfait jours chez Contoso SAS sont les suivantes :  \n- Les jours de RTT doivent être posés via le portail RH, avec un délai de préavis de 7 jours minimum, sauf cas exceptionnels.  \n- La validation des RTT est soumise à l’accord du manager, qui s’assure de la continuité des activités.  \n- Les RTT doivent être pris avant le 31 décembre de l’année en cours. Aucun report ou indemnisation n’est prévu pour les jours non pris.'

In [14]:
import pandas as pd

df = pd.read_csv("docs/hr_dataset_fr_new.csv", encoding="latin1", on_bad_lines="skip")
print(df.columns.tolist())


['id;nom;email;departement;Manager;poste;responsable;date_embauche;conges.droit_annuel;conges.utilises;conges.planifies;conges.restants;conges_maladie.droit;conges_maladie.utilises;conges_maladie.restants;remuneration.salaire;remuneration.eligible_prime;remuneration.date_prochaine_evaluation;avantages.regime_sante']
