In [2]:
import logging
import os


def setup_logging():
    # Ensure logs directory exists
    os.makedirs("logs", exist_ok=True)

    # Configure root logger
    logging.basicConfig(
        filename="logs/data.log",
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    )

    # Configure specific loggers
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("openai").setLevel(logging.WARNING)

    # Add console handler if needed (useful during development)
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter("%(levelname)s - %(message)s")
    console.setFormatter(formatter)
    logging.getLogger("").addHandler(console)


def get_logger(name):
    """
    Returns a configured logger with the given name.

    Args:
        name (str): Usually __name__ of the calling module

    Returns:
        logging.Logger: Configured logger instance
    """
    return logging.getLogger(name)

In [None]:
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# PostgreSQL configuration
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5433")
POSTGRES_DB = os.getenv("POSTGRES_DB", "albert_data")
POSTGRES_USER = os.getenv("POSTGRES_USER", "user")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "password")

# Paths for configurations and data history
config_file_path = "config/data_config.json"
data_history_path = "config/data_history.json"

# Export folders
parquet_files_folder = "data/parquet"

# Data folders
CNIL_DATA_FOLDER = os.getenv("CNIL_DATA_FOLDER", "data/unprocessed/cnil")
CONSTIT_DATA_FOLDER = os.getenv("CONSTIT_DATA_FOLDER", "data/unprocessed/constit")
LOCAL_DIRECTORY_FOLDER = os.getenv(
    "LOCAL_DIRECTORY_FOLDER", "data/unprocessed/local_directory"
)
NATIONAL_DIRECTORY_FOLDER = os.getenv(
    "NATIONAL_DIRECTORY_FOLDER", "data/unprocessed/national_directory"
)
DOLE_DATA_FOLDER = os.getenv("DOLE_DATA_FOLDER", "data/unprocessed/dole")
LEGI_DATA_FOLDER = os.getenv("LEGI_DATA_FOLDER", "data/unprocessed/legi")
TRAVAIL_DATA_FOLDER = os.getenv(
    "TRAVAIL_DATA_FOLDER", "data/unprocessed/travail"
)
SERVICES_PUBLIC_PRO_DATA_FOLDER = os.getenv(
    "SERVICES_PUBLIC_PRO_DATA_FOLDER",
    "data/unprocessed/services_public_pro",
)
SERVICES_PUBLIC_PART_DATA_FOLDER = os.getenv(
    "SERVICES_PUBLIC_PART_DATA_FOLDER",
    "data/unprocessed/
)


# Pyalbert configuration
SHEET_SOURCES = ["service-public", "travail-emploi"]


# OpenAI API configuration
API_URL = os.getenv("API_URL", "https://albert.api.staging.etalab.gouv.fr/v1")
API_KEY = os.getenv("API_KEY", "your_api_key_here")


In [None]:
import hashlib
import json
import os
import string
import unicodedata
from collections import defaultdict

import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from bs4.element import NavigableString, Tag
from langchain_text_splitters import RecursiveCharacterTextSplitter

logger = get_logger(__name__)

### Imported functions from the pyalbert library

# *********
# * Utils *
# *********


def normalize(text: str) -> str:
    # Like removing non-breaking space in latin-1 (\xa0)
    return unicodedata.normalize("NFKC", text)


def get_text(obj):
    if isinstance(obj, NavigableString):
        t = obj.string.strip()
    else:
        t = obj.get_text(" ", strip=True)
    return normalize(t)


def extract(soup: Tag, tag: str, pop=True, recursive=True) -> str:
    t = soup.find(tag, recursive=recursive)
    if not t:
        return ""

    if pop:
        t.extract()

    return get_text(t)


def extract_all(soup: Tag, tag: str, pop=True) -> list[str]:
    if not soup.find(tag):
        return []

    elts = []
    for t in soup.find_all(tag):
        if pop:
            t.extract()
        elts.append(get_text(t))

    return elts


# ***************
# * Sheet parsing *
# ***************


def _get_xml_files(path):
    xml_files = []

    if os.path.isfile(path):
        # Use to test result on a particular file
        xml_files = [path]
    else:
        for root, _, files in os.walk(path):
            for file in files:
                if file.endswith(".xml") and file.startswith(("N", "F")):
                    # Keep only "fiches pratiques", "fiches questions-réponses",
                    # "fiches thème", "fiches dossier".
                    fullpath = os.path.join(root, file)
                    xml_files.append(fullpath)
    return sorted(xml_files)


def _get_metadata(soup):
    url = ""
    if soup.find("Publication") is not None:
        if "spUrl" in soup.find("Publication").attrs:
            url = soup.find("Publication")["spUrl"]

    doc = {
        "url": url,
        "audience": ", ".join(extract_all(soup, "Audience")),
        "theme": ", ".join(extract_all(soup, "Theme")),
        "surtitre": extract(soup, "SurTitre"),
        "subject": extract(soup, "dc:subject"),
        "title": extract(soup, "dc:title"),
        "description": extract(soup, "dc:description"),
        "introduction": extract(soup, "Introduction"),
    }
    if not doc["introduction"]:
        doc["introduction"] = doc["description"]

    return doc


def _table_to_md(soup):
    # FIXME: multi-column header are lost
    #        e.g.: https://entreprendre.service-public.fr/vosdroits/F3094
    md_table = ""

    # Headers
    for row in soup.find_all("Rangée", {"type": "header"}):
        md_table += "| "
        for cell in row.find_all("Cellule"):
            md_table += get_text(cell) + " | "
        md_table += "\n"

        # Separator
        md_table += "| "
        for _ in row.find_all("Cellule"):
            md_table += "- | "
        md_table += "\n"

    # Rows
    for row in soup.find_all("Rangée", {"type": "normal"}):
        md_table += "| "
        for cell in row.find_all("Cellule"):
            md_table += get_text(cell) + " | "
        md_table += "\n"

    return md_table


def _not_punctuation(s):
    return s.strip()[-1:] not in string.punctuation


def _parse_xml_text_structured(
    current: list[str], context: list[str], soups: list[Tag], recurse=True, depth=0
) -> list[dict]:
    """
    Separate text on Situation and Chapitre.
    Keep the contexts (the history of titles) while iterating.

    Args:
        current: the current text of the cursor, that will be joined to a string
        context: the current history of title (Titre)
        soups: the tree cursor
        recurse: continue to split chunks

    Returns: a list of {text, context}
    """
    # TODO:
    #  - could probably be optimized by not extracting tag, just reading it; see extract()
    #  - <TitreAlternatif> can be present next to title
    #  - extract text and reference law (legifrance.fr); see <Reference>
    state = []

    for part in soups:
        if isinstance(part, NavigableString):
            current.append(get_text(part))
            # New chunk
            state.append({"text": current, "context": context})
            current = []
            continue

        for child in part.children:
            # Purge current
            current = [c for c in current if c]

            if isinstance(child, NavigableString):
                current.append(get_text(child))
                continue

            if child.name in ("Situation", "Chapitre", "SousChapitre") and recurse:
                if child.name in ("Situation", "Chapitre"):
                    new_recurse = True
                else:  # "SousChapitre"
                    new_recurse = False

                # New chunk
                if current:
                    state.append({"text": current, "context": context})
                    current = []

                title = extract(child, "Titre", recursive=False)
                if title:
                    new_context = context + [title]
                else:
                    new_context = context

                s = _parse_xml_text_structured(
                    [], new_context, [child], recurse=new_recurse, depth=depth + 1
                )
                state.extend(s)

            elif child.name in ("BlocCas", "Liste"):
                scn = "Cas" if child.name == "BlocCas" else "Item"
                blocs = "\n"
                for subchild in child.children:
                    if subchild.name != scn:
                        if subchild.string.strip():
                            print(f"XML warning: {child.name} has orphan text")
                        continue

                    if child.name == "BlocCas":
                        title = extract(subchild, "Titre", recursive=False)
                    s = _parse_xml_text_structured(
                        [], context, [subchild], recurse=False, depth=depth + 1
                    )
                    content = " ".join([" ".join(x["text"]) for x in s]).strip()

                    if child.name == "BlocCas":
                        blocs += f"Cas {title}: {content}\n"
                    else:  # Liste
                        blocs += f"- {content}\n"

                current.append(blocs)

            elif child.name == "Tableau":
                title = extract(child, "Titre", recursive=False)
                sep = ":" if _not_punctuation(title) else ""
                table = _table_to_md(child)
                table = f"\n{title}{sep}\n{table}"
                current.append(table)

            elif child.name in ("ANoter", "ASavoir", "Attention", "Rappel"):
                title = extract(child, "Titre", recursive=False)
                sep = ": " if title else ""
                current.append(f"({title}{sep}{get_text(child)})\n")

            elif child.name == "Titre":
                # Format title inside chunks (sub-sub sections etc)
                title = get_text(child)
                sep = ":" if _not_punctuation(title) else ""
                current.append(f"{title}{sep}")

            else:
                # Space joins
                s = _parse_xml_text_structured(
                    current, context, [child], recurse=recurse, depth=depth + 1
                )
                current = []
                sub_state = []
                for x in s:
                    if len(x["context"]) == len(context):
                        # Same context
                        if child.name in (
                            "Chapitre",
                            "SousChapitre",
                            "Cas",
                            "Situation",
                        ):
                            # Add a separator
                            x["text"][-1] += "\n"
                        elif (
                            child.name in ("Paragraphe",)
                            and child.parent.name not in ("Item", "Cellule")
                            and _not_punctuation(x["text"][-1])
                        ):
                            # Title !
                            x["text"][-1] += ":"

                        current.extend(x["text"])
                    else:
                        # New chunk
                        sub_state.append(x)

                if sub_state:
                    state.append({"text": current, "context": context})
                    current = []
                    state.extend(sub_state)

        # New chunk
        if current:
            state.append({"text": current, "context": context})
            current = []

    if depth == 0:
        state = [d for d in state if "".join(d["text"])]
        punctuations = (".", ",", ";")
        for d in state:
            texts = ""
            for i, x in enumerate(d["text"]):
                if not x:
                    continue

                if i > 0 and x.startswith(punctuations):
                    # Stretch join / fix punctuations extra space
                    texts += x
                elif x.startswith("\n") or texts.endswith("\n"):
                    # Strech join / do not surround newline with space
                    texts += x
                else:
                    # Space join
                    texts += " " + x

            d["text"] = texts.strip()

    return state


def _parse_xml_text(xml_file, structured=False) -> dict:
    with open(xml_file, mode="r", encoding="utf-8") as f:
        soup = BeautifulSoup(f, "xml")

    doc = _get_metadata(soup)
    doc["sid"] = doc["url"].split("/")[-1]
    doc["source"] = "service-public"

    # Clean document / Remove potential noise
    extract_all(soup, "OuSAdresser")
    extract_all(soup, "RefActualite")

    def drop_duplicates(data: list[dict], k: str):
        seen = []
        keeps = []
        for x in data:
            if x[k] in seen:
                continue

            keeps.append(x)
            seen.append(x[k])

        return keeps

    def sp_url_encoder(sid, audience):
        audience_to_uri = {
            "Associations": "associations",
            "Particuliers": "particuliers",
            "Professionnels": "professionnels-entreprises",
        }
        # Do not fail silently
        audience_uri = audience_to_uri[audience]
        return f"https://www.service-public.fr/{audience_uri}/vosdroits/{sid}"

    # Get related questions
    questions = [
        {
            "question": get_text(q),
            "sid": q["ID"],
            "url": sp_url_encoder(q["ID"], q["audience"]),
        }
        for q in soup.find_all("QuestionReponse")
    ]
    questions = drop_duplicates(questions, "question")
    doc["related_questions"] = questions

    # Get the Service/contact ressources
    web_services = []
    for q in soup.find_all("ServiceEnLigne"):
        if not q.get("URL"):
            continue

        title_tag = q.find("Titre")
        source_tag = q.find("Source")

        if not title_tag:
            continue

        service = {
            "title": normalize(title_tag.get_text(" ", strip=True)),
            "institution": normalize(source_tag.get_text(" ", strip=True))
            if source_tag
            else "",
            "url": q["URL"],
            "type": q["type"],
        }
        web_services.append(service)

    web_services = drop_duplicates(web_services, "title")
    doc["web_services"] = web_services

    # Clean document / Remove potential noise
    extract_all(soup, "OuSAdresser")
    extract_all(soup, "ServiceEnLigne")
    extract_all(soup, "QuestionReponse")
    extract_all(soup, "RefActualite")

    # Get all textual content
    # --
    # Introduction
    current = [doc["introduction"]]
    if structured:
        # Save sections for later (de-duplicate keeping order)
        sections = [
            get_text(a.Titre)
            for a in soup.find_all("Chapitre")
            if a.Titre and not a.find_parent("Chapitre")
        ]
        sections = list(dict.fromkeys(sections))

        context = []
        top_list = []
        for x in soup.find_all("Publication"):
            for obj in x.children:
                if obj.name in ("Texte", "ListeSituations"):
                    top_list.append(obj)
        texts = _parse_xml_text_structured(current, context, top_list)

        if texts and sections:
            # Add all sections title at the end of the introduction
            sections = "\n".join(f"- {section}" for section in sections)
            sections = (
                "\n\nVoici une liste de différentes questions ou thématiques relatives à ce sujet :\n"
                + sections
            )
            texts[0]["text"] += sections

    else:
        if soup.find("ListeSituations") is not None:
            current.append("Liste des situations :")
            for i, situation in enumerate(
                soup.find("ListeSituations").find_all("Situation")
            ):
                if not situation.find("Titre"):
                    print("warning: Situation > Titre, not found")
                    continue

                if not situation.find("Texte"):
                    print("warning: Situation > Texte, not found")
                    continue

                situation_title = normalize(
                    situation.find("Titre").get_text(" ", strip=True)
                )
                situation_texte = normalize(
                    situation.find("Texte").get_text(" ", strip=True)
                )
                current.append(f"Cas n°{i + 1} : {situation_title} : {situation_texte}")

        if soup.find("Publication") is not None:
            t = soup.find("Publication").find("Texte", recursive=False)
            if t is not None:
                current.append(normalize(t.get_text(" ", strip=True)))

        texts = [" ".join(current)]

    doc["text"] = texts
    return doc


def _parse_xml_questions(xml_file: str) -> list[dict]:
    with open(xml_file, mode="r", encoding="utf-8") as f:
        soup = BeautifulSoup(f, "xml")

    docs = []
    tags = (
        ("QuestionReponse", lambda x: get_text(x)),
        ("CommentFaireSi", lambda x: f"Comment faire si {get_text(x)} ?"),
    )
    for tag, f in tags:
        for t in soup.find_all(tag):
            audience = t["audience"].lower()
            doc = {
                "question": f(t),
                "url": f"https://www.service-public.fr/{audience}/vosdroits/{t['ID']}",
                "tag": tag,
            }
            docs.append(doc)

    return docs


def _parse_xml(
    target_dir: str, parse_type: str, structured: bool = False
) -> list[dict]:
    if parse_type not in ("text", "questions"):
        raise ValueError()

    if not os.path.exists(target_dir):
        raise FileNotFoundError(f"path {target_dir} to xml sheets not found.")

    xml_files = _get_xml_files(target_dir)

    docs = []
    current_pct = 0
    n = len(xml_files)
    for i, xml_file in enumerate(xml_files):
        pct = (100 * i) // n
        if pct > current_pct:
            current_pct = pct
            print(f"Processing sheet: {current_pct}%\r", end="")

        if parse_type == "text":
            doc = _parse_xml_text(xml_file, structured=structured)
            if doc:
                docs.append(doc)
        elif parse_type == "questions":
            _docs = _parse_xml_questions(xml_file)
            docs.extend(_docs)

    return docs


def _parse_travailEmploi(target_dir: str, structured: bool = False) -> list[dict]:
    with open(os.path.join(target_dir)) as f:
        data = json.load(f)

    if structured:

        def join_sections(sections):
            texts = []
            for section in sections:
                texts.append(
                    {
                        "text": normalize(section["text"]),
                        "context": [normalize(section["title"])],
                    }
                )
            return texts

    else:

        def join_sections(sections):
            text = ""
            for section in sections:
                text += normalize(f"{section['title']}\n\n{section['text']}")

            return [text]

    docs = []
    for doc in data:
        sheet = {
            "title": normalize(doc["title"]),
            "url": doc["url"],
            "date": doc["date"],
            "sid": doc["pubId"],
            "introduction": get_text(BeautifulSoup(doc["intro"], "html.parser")),
            "text": join_sections(doc["sections"]),
            "surtitre": "Travail-Emploi",
            "source": "travail-emploi",
        }

        docs.append(sheet)

    return docs


class RagSource:
    # At this point a sheet is an hybrid dict data structure with with only a set of mandatory fields:
    # - "sid" -> unique identifier
    # - "title -> sheet title
    # - "text" -> main payload
    # - "context" -> successive subtitle (if structured=True)
    # - "source" -> The source of the sheet (service-public, vie-publique, legifrance, etc)
    # - "url" -> URL of the source
    # Depending on the source, they can have many more attribute...

    @classmethod
    def is_valid(cls, source):
        return source in cls.__dict__.values()

    @classmethod
    def get_sheets(cls, storage_dir: str | None, structured: bool = False):
        if not storage_dir:
            raise ValueError("You must give a storage directory.")

        # if isinstance(sources, str):
        #     sources = [sources]

        # for source in sources:
        #     if not cls.is_valid(source):
        #         raise ValueError("This RAG source is not known: %s" % source)

        sheets = []
        if (
            storage_dir == SERVICES_PUBLIC_PRO_DATA_FOLDER
            or storage_dir == SERVICES_PUBLIC_PART_DATA_FOLDER
        ):
            # storage_dir: the base path where files are gonna be written.
            # target_dir: read-only base path where sheets are read.
            target_dir = f"{storage_dir}/{storage_dir.split('/')[-1]}"
            sheets.extend(_parse_xml(target_dir, "text", structured=structured))
        elif storage_dir == 
            base_name = os.path.basename(storage_dir)
            target_dir = next(
                (
                    os.path.join(storage_dir, f)
                    for f in os.listdir(storage_dir)
                    if f.startswith(base_name)
                ),
                None,
            )
            if target_dir is None:
                logger.error(
                    f"No file starting with '{base_name}' found in {storage_dir}"
                )
                raise FileNotFoundError()
            sheets.extend(_parse_travailEmploi(target_dir, structured=structured))
        else:
            raise NotImplementedError("Rag source unknown")

        # Remove duplicate
        sids = [x["sid"] for x in sheets]
        seen = set()
        to_remove = [i for i, sid in enumerate(sids) if sid in seen or seen.add(sid)]
        n_dup = len(to_remove)
        if n_dup > 0:
            logger.info(f"Dropping {n_dup} duplicated sheets")
            logger.info([sheets[i]["sid"] for i in to_remove])
        for ix in sorted(to_remove, reverse=True):
            sheets.pop(ix)

        return sheets


In [None]:
from openai import OpenAI
import pandas as pd
import numpy as np
import os
import json
import hashlib
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import Optional
from collections import defaultdict
import re
from abc import ABC, abstractmethod
from typing import Generator
from tqdm import tqdm


logger = get_logger(__name__)


class CorpusHandler(ABC):
    def __init__(self, name, corpus):
        self._name = name
        self._corpus = corpus

    @classmethod
    def create_handler(cls, corpus_name: str, corpus: list[dict]) -> "CorpusHandler":
        """Get the appropriate handler subclass from the string corpus name."""
        corpuses = {
            # "spp_experiences": SppExperiencesHandler, # Legacy from pyalbert, unused here
            TRAVAIL_DATA_FOLDER.split("/")[-1]: SheetChunksHandler,
            SERVICES_PUBLIC_PRO_DATA_FOLDER.split("/")[-1]: SheetChunksHandler,
            C_PART_DATA_FOLDER.split("/")[-1]: SheetChunksHandler,
        }
        if corpus_name not in corpuses:
            raise ValueError(f"Corpus '{corpus_name}' is not recognized")
        return corpuses[corpus_name](corpus_name, corpus)

    def iter_docs(
        self, batch_size: int, desc: str = None
    ) -> Generator[list, None, None]:
        if not desc:
            desc = f"Processing corpus: {self._name}..."

        corpus = self._corpus
        num_chunks = len(corpus) // batch_size
        if len(corpus) % batch_size != 0:
            num_chunks += 1

        for i in tqdm(range(num_chunks), desc=desc):
            start_idx = i * batch_size
            end_idx = min(start_idx + batch_size, len(corpus))
            yield corpus[start_idx:end_idx]

    def iter_docs_embeddings(
        self, batch_size: int
    ) -> Generator[tuple[list], None, None]:
        desc = f"Processing corpus {self._name} with embeddings..."
        for batch in self.iter_docs(batch_size=batch_size, desc=desc):
            batch_embeddings = generate_embeddings(
                [self.doc_to_chunk(x) for x in batch]
            )
            if len([x for x in batch_embeddings if x is not None]) == 0:
                continue
            yield batch, batch_embeddings

    @abstractmethod
    def doc_to_chunk(self, doc: dict) -> str:
        raise NotImplementedError("Subclasses should implement this!")


# class SppExperiencesHandler(CorpusHandler):
#     def doc_to_chunk(self, doc: dict) -> str | None:
#         text = doc["reponse_structure_1"]
#         if not text:
#             return None
#         # Clean some text garbage
#         # --
#         # Define regular expression pattern to match non-breaking spaces:
#         # - \xa0 for Latin-1 (as a raw string)
#         # - \u00a0 for Unicode non-breaking space
#         # - \r carriage return
#         # - &nbsp; html non breaking space
#         text = re.sub(r"[\xa0\u00a0\r]", " ", text)
#         text = re.sub(r"&nbsp;", " ", text)

#         # Add a space after the first "," if not already followed by a space.
#         text = re.sub(r"\,(?!\s)", ". ", text, count=1)
#         return text


class SheetChunksHandler(CorpusHandler):
    def doc_to_chunk(self, doc: dict) -> str | None:
        context = ""
        if "context" in doc:
            context = "  ( > ".join(doc["context"]) + ")"

        text = "\n".join([doc["title"] + context, doc["introduction"], doc["text"]])
        return text


# def embed(data: None | str | list[str]) -> None | list:
#     if data is None:
#         return None

#     if isinstance(data, list):
#         # Keep track of None positions
#         indices_of_none = [i for i, x in enumerate(data) if x is None]
#         filtered_data = [x for x in data if x is not None]
#         if not filtered_data:
#             return [None] * len(data)

#         # Apply the original function on filtered data
#         try:
#             embeddings = LlmClient.create_embeddings(filtered_data)
#         except Exception as err:
#             print(filtered_data)
#             raise err

#         # Reinsert None at their original positions in reverse order
#         for index in reversed(indices_of_none):
#             embeddings.insert(index, None)

#         return embeddings

#     # Fall back to single data input
#     return LlmClient.create_embeddings(data)


def generate_embeddings(data: str | list[str], model: str = "BAAI/bge-m3"):
    """
    Generates embeddings for a given text using a specified model.

    Args:
        data (str or list[str]): The input to generate embeddings for.
        model (str, optional): The model identifier to use for generating embeddings. Defaults to "BAAI/bge-m3".

    Returns:
        list[float]: The embedding vector for the input text.

    Raises:
        Any exceptions raised by the OpenAI client during the embedding generation process.

    Note:
        Requires properly configured API_URL and API_KEY for the OpenAI client.
    """
    client_openai = OpenAI(base_url=API_URL, api_key=API_KEY)
    vector = client_openai.embeddings.create(
        input=data, model=model, encoding_format="float"
    )
    embeddings = [item.embedding for item in vector.data]

    # if isinstance(data, str):
    #     return embeddings[0]
    # elif isinstance(data, list) and len(data) == 1:
    #     return embeddings[0]
    # else:
    #     return embeddings
    return embeddings


def make_chunks(text: str, chunk_size: int = 1500, chunk_overlap: int = 200):
    """
    Splits the input text into overlapping chunks using a recursive character-based text splitter.
    Args:
        text (str): The input text to be split into chunks.
        chunk_size (int, optional): The maximum size of each chunk. Defaults to 512.
        chunk_overlap (int, optional): The number of overlapping characters between consecutive chunks. Defaults to 100.
    Returns:
        List[str]: A list of text chunks generated from the input text.
    """

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""],
    )
    chunks = text_splitter.split_text(text)
    return chunks


def make_chunks_directories(
    nom: str, mission: Optional[str], types: Optional[str], adresses: Optional[list]
) -> str:
    """
    Generates a concatenated string from provided entity information and a list of addresses for embedding purposes.

    Args:
        nom (str): The name of the entity.
        mission (str): The mission or purpose of the entity.
        types (str): The type(s) or category of the entity.
        adresses (list): A list of dictionaries, each representing an address with possible keys:
            - 'complement1' (str, optional): Additional address information.
            - 'complement2' (str, optional): Additional address information.
            - 'numero_voie' (str, optional): Street number.
            - 'code_postal' (str, optional): Postal code.
            - 'nom_commune' (str, optional): City or commune name.
            - 'pays' (str, optional): Country name.

    Returns:
        str: A single string containing the concatenated and formatted information, suitable for embedding or search optimization.
    """
    adresses_to_concatenate = []
    for adresse in adresses:
        adresses_to_concatenate.append(
            f" {adresse.get('complement1', '')} {adresse.get('complement2', '')} {adresse.get('numero_voie', '')}, {adresse.get('code_postal', '')} {adresse.get('nom_commune', '')} {adresse.get('pays', '')}".strip()
        )

    # Concatenate all addresses in order to add them to the data to embed
    adresses_to_concatenate = " ".join(adresses_to_concatenate)

    # Text to embed in order to makes the search more efficient
    fields = [
        nom,
        mission if mission else "",
        types if types else "",
    ]
    text_to_embed = ". ".join([f for f in fields if f]).strip()

    return text_to_embed


def make_chunks_sheets(
    storage_dir: str, structured=False, chunk_size=1500, chunk_overlap=200
) -> None:
    """Chunkify sheets and save to a JSON file"""

    if structured:
        chunk_overlap = 20

    if storage_dir is None:
        raise ValueError(
            "You must give a datas directory to chunkify in the param 'storage_dir'."
        )

    sheets = RagSource.get_sheets(storage_dir, structured=structured)

    chunks = []
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        is_separator_regex=False,
    )
    hashes = []
    info = defaultdict(lambda: defaultdict(list))
    for data in sheets:
        texts = data["text"]
        surtitre = data["surtitre"]

        if not texts:
            continue
        if surtitre in ("Dossier", "Recherche guidée"):
            # TODO: can be used for cross-reference, see also <LienInterne>
            continue

        if structured:
            s = [x["text"] for x in texts]
        else:
            s = texts

        info[surtitre]["len"].append(len(" ".join(s).split()))
        index = 0
        for natural_chunk in texts:
            if isinstance(natural_chunk, dict):
                natural_text_chunk = natural_chunk["text"]
            else:
                natural_text_chunk = natural_chunk

            for fragment in text_splitter.split_text(natural_text_chunk):
                if not fragment:
                    logger.warning("Warning: empty fragment")
                    continue

                info[surtitre]["chunk_len"].append(len(fragment.split()))

                chunk = {
                    **data,
                    "chunk_index": index,
                    "text": fragment,  # overwrite previous value
                }
                if isinstance(natural_chunk, dict) and "context" in natural_chunk:
                    chunk["context"] = natural_chunk["context"]
                    chunk_content = "".join(chunk["context"]) + fragment
                else:
                    chunk_content = fragment

                # add an unique hash/id
                h = hashlib.blake2b(chunk_content.encode(), digest_size=8).hexdigest()
                if h in hashes:
                    # print("Warning: duplicate chunk (%s)" % chunk["sid"])
                    # print(chunk_content)
                    continue
                hashes.append(h)
                chunk["hash"] = h

                chunks.append(chunk)
                index += 1

    json_file_target = os.path.join(storage_dir, "sheets_as_chunks.json")
    with open(json_file_target, mode="w", encoding="utf-8") as f:
        json.dump(chunks, f, ensure_ascii=False, indent=4)

    info_summary = ""
    for k, v in info.items():
        v_len = v["len"]
        v_chunk_len = v["chunk_len"]
        template = "{}: {:.0f} ± {:.0f}    max:{} min:{}\n"
        info_summary += f"### {k}\n"
        info_summary += f"total doc: {len(v_len)}\n"
        info_summary += template.format(
            "mean length", np.mean(v_len), np.std(v_len), np.max(v_len), np.min(v_len)
        )
        info_summary += f"total chunk: {len(v_chunk_len)}\n"
        info_summary += template.format(
            "mean chunks length",
            np.mean(v_chunk_len),
            np.std(v_chunk_len),
            np.max(v_chunk_len),
            np.min(v_chunk_len),
        )
        info_summary += "\n"

    logger.info(f"Info summary:\n {str(json_file_target)}")

    logger.info(f"Chunks created in : {str(json_file_target)}")


def make_questions(storage_dir: str) -> None:
    target_dir = storage_dir
    if storage_dir.split("/")[-1].strip("/") != "data.gouv":
        target_dir = os.path.join(storage_dir, "data.gouv")
    questions = _parse_xml(target_dir, "questions")
    df = pd.DataFrame(questions)
    df = df.drop_duplicates(subset=["question"])
    questions = df.to_dict(orient="records")
    q_fn = os.path.join(storage_dir, "questions.json")
    with open(q_fn, mode="w", encoding="utf-8") as f:
        json.dump(questions, f, ensure_ascii=False, indent=4)
    print("Questions created in", q_fn)


In [23]:
import os
import json

# process_sheets(target_dir: str):
target_dir = "../data/unprocessed/fiches_services_public_part"
with open(os.path.join(target_dir, "sheets_as_chunks_structured.json")) as f:
    documents = json.load(f)
    corpus_name = target_dir.split("/")[-1]
    batch_size = 15
    corpus_handler = CorpusHandler.create_handler(corpus_name, documents)
    for batch_documents, batch_embeddings in corpus_handler.iter_docs_embeddings(
        batch_size
    ):
        bd = batch_documents
        be = batch_embeddings
        # for document, embeddings in zip(batch_documents, batch_embeddings):
        #     chunk_id = document["hash"].encode("utf8").hex()
        #     sid = document["sid"]
        #     chunk_index = document["chunk_index"]
        #     title = document["title"]
        #     surtitre = document["surtitre"]
        #     source = document["source"]
        #     introduction = document["introduction"]
        #     date = document["date"]
        #     url = document["url"]
        #     context = document["context"] if "context" in document else ""
        #     chunk_text = document["text"]

        #     new_data = (
        #         chunk_id,
        #         sid,
        #         chunk_index,
        #         title,
        #         surtitre,
        #         source,
        #         introduction,
        #         date,
        #         url,
        #         context,
        #         chunk_text,
        #         embeddings
        #     )
        break

Processing corpus fiches_services_public_part with embeddings...:   0%|          | 0/2353 [00:00<?, ?it/s]


In [24]:
new_data

NameError: name 'new_data' is not defined

In [25]:
len(bd)

15

In [26]:
len(be)

15

In [27]:
bd

[{'url': 'https://www.service-public.fr/particuliers/vosdroits/F1',
  'audience': 'Particuliers',
  'theme': 'Argent - Impôts - Consommation, Argent - Impôts - Consommation, Argent - Impôts - Consommation, Argent - Impôts - Consommation',
  'surtitre': 'Fiche pratique',
  'subject': 'Argent - Impôts - Consommation',
  'title': "Impôt sur le revenu - Versement de cotisations syndicales (crédit d'impôt)",
  'description': "Le versement de cotisations syndicales permet de bénéficier d'un crédit d'impôt. Son montant dépend des sommes versées",
  'introduction': "Le versement de cotisations syndicales vous permet de bénéficier d'un crédit d'impôt. Son montant dépend des sommes versées. Nous vous indiquons les informations à connaître.",
  'sid': 'F1',
  'source': 'service-public',
  'related_questions': [{'question': "Déduction, réduction d'impôt, crédit d'impôt : quelles différences ?",
    'sid': 'F823',
    'url': 'https://www.service-public.fr/particuliers/vosdroits/F823'}],
  'web_serv

In [74]:
be[0]

[-0.03062629,
 -0.017049594,
 -0.027243363,
 -0.01340575,
 -0.014404433,
 -0.045345623,
 -0.01173228,
 -0.025623875,
 0.04714505,
 0.008038951,
 0.030896204,
 0.013765635,
 -0.03301953,
 0.026379636,
 -0.002422482,
 -0.043546192,
 -0.033775292,
 -0.059057273,
 0.010643626,
 0.02141321,
 0.0083988365,
 -0.056178186,
 -0.052147467,
 0.05333509,
 0.031454027,
 -0.00021368224,
 0.03390125,
 0.011192451,
 0.044949748,
 -0.0049349354,
 -0.028844854,
 0.011282423,
 0.0697099,
 0.0150882155,
 0.003232225,
 0.013612684,
 -0.034836955,
 -0.040955015,
 -0.023860434,
 0.01211016,
 0.0059876014,
 0.040415186,
 0.01999166,
 -0.05819355,
 0.00014775003,
 0.008025455,
 -0.021215273,
 -0.014233487,
 0.002314516,
 -0.016014922,
 -0.0027868664,
 -0.029816546,
 0.06870221,
 -0.03296555,
 -0.01705859,
 0.05175159,
 0.0073101823,
 -0.039155584,
 -0.073416725,
 0.014188501,
 -0.013225807,
 0.016527759,
 -0.014089532,
 -0.023626508,
 0.0400553,
 -0.012703972,
 -2.5163896e-05,
 0.018858021,
 0.017400483,
 -0.0

In [None]:
(
    hash,
    sid,
    chunk_index,
    title,
    surtitre,
    source,
    introduction,
    date,
    url,
    context,
    text,
    embedding,
)