In [None]:
def is_module_installed(module_name):
    """
    Vérifie si un module Python est déjà installé.
    Retourne True s'il est installé, False sinon.
    """
    return importlib.util.find_spec(module_name) is not None

In [None]:
def download_things():
    global nlp_pipeline

    if '/content/drive/' in folder_path:
        # Définition des bibliothèques à vérifier
        libraries = {
            # Bibliothèque  : (nom_module_import, commande_pip)
            'seaborn': ('seaborn', 'pip install -q seaborn'),
            'sklearn': ('sklearn', 'pip install -U -q scikit-learn'),
            'scipy': ('scipy', 'pip install -U -q scipy'),
            'spacy': ('spacy', 'pip install -q spacy'),
            'pyate': ('pyate', 'pip install -q pyate'),
            'bs4': ('bs4', 'pip install -q beautifulsoup4'),
            'unidecode': ('unidecode', 'pip install -q unidecode'),
            'charset_normalizer': ('charset_normalizer', 'pip install -q charset-normalizer'),
            'datasketch': ('datasketch', 'pip install -q datasketch'),
            'tslearn': ('tslearn', 'pip install -q tslearn'),
            'ortools': ('ortools', 'pip install -q ortools')  # <-- Ajout de OR-Tools
        }
    else:
        # Définition des bibliothèques à vérifier
        libraries = {
            'gensim': ('gensim', 'conda install gensim -y'),
            'torch': ('torch', 'conda install pytorch torchvision torchaudio -c pytorch-nightly -c conda-forge -y'),
            'transformers': ('transformers', 'conda install -c conda-forge transformers -y'),
            'seaborn': ('seaborn', 'conda install -y seaborn'),
            'sklearn': ('sklearn', 'conda install -y scikit-learn'),
            'scipy': ('scipy', 'conda install -y scipy'),
            'spacy': ('spacy', 'conda install -y spacy'),
            'bs4': ('bs4', 'conda install -y beautifulsoup4'),
            'unidecode': ('unidecode', 'conda install -y unidecode'),
            'charset_normalizer': ('charset_normalizer', 'conda install -y -c conda-forge charset-normalizer'),
            'datasketch': ('datasketch', 'conda install -y -c conda-forge datasketch'),
            'tslearn': ('tslearn', 'conda install -y -c conda-forge tslearn'),
            'ortools': ('ortools', 'conda install -y -c conda-forge ortools')  # <-- Ajout de OR-Tools
        }


    # Installation conditionnelle et import pour chaque bibliothèque
    for lib_key, (module_name, install_cmd) in libraries.items():
        print(f"Vérification de {lib_key}…")
        if not is_module_installed(module_name):
            print(f"{module_name} n'est pas installé. Installation en cours… {install_cmd}")
            subprocess.run(install_cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Import effectif après installation (ou si déjà installé)
        importlib.import_module(module_name)
        print(f"✔ {lib_key} est prête.")

    # Installation du modèle spaCy selon la langue
    if language == 'fr':
        !python -m spacy download fr_core_news_sm --quiet
    elif language == 'en':
        !python -m spacy download en_core_web_sm --quiet
    elif language == 'es':
        !python -m spacy download es_core_news_sm --quiet
    elif language == 'de':
        !python -m spacy download de_core_news_sm --quiet
    elif language == 'ca':
        !python -m spacy download ca_core_news_sm --quiet
    elif language == 'zh':
        !python -m spacy download zh_core_web_sm --quiet
    elif language == 'da':
        !python -m spacy download da_core_news_sm --quiet
    elif language == 'ja':
        !python -m spacy download ja_core_news_sm --quiet
    elif language == 'sl':
        !python -m spacy download sl_core_news_sm --quiet
    elif language == 'uk':
        !python -m spacy download uk_core_news_sm --quiet


    try:
        if language == 'fr':
            nlp_pipeline = spacy.load('fr_core_news_sm', disable=['ner'])
        elif language == 'en':
            nlp_pipeline = spacy.load('en_core_web_sm', disable=['ner'])
        elif language == 'es':
            nlp_pipeline = spacy.load('es_core_news_sm', disable=['ner'])
        elif language == 'de':
            nlp_pipeline = spacy.load('de_core_news_sm', disable=['ner'])
        elif language == 'ca':
            nlp_pipeline = spacy.load('ca_core_news_sm', disable=['ner'])
        elif language == 'zh':
            nlp_pipeline = spacy.load('zh_core_web_sm', disable=['ner'])
        elif language == 'da':
            nlp_pipeline = spacy.load('da_core_news_sm', disable=['ner'])
        elif language == 'ja':
            nlp_pipeline = spacy.load('ja_core_news_sm', disable=['ner'])
        elif language == 'sl':
            nlp_pipeline = spacy.load('sl_core_news_sm', disable=['ner'])
        elif language == 'uk':
            nlp_pipeline = spacy.load('uk_core_web_sm', disable=['ner'])
        else:
            raise ValueError(f"Modèle non supporté pour la langue {language}")
    except Exception as e:
        raise

    # Optionnel : évite les avertissements de multiprocessing pour spaCy
    os.environ["TOKENIZERS_PARALLELISM"] = "true"

In [None]:
# =====================
# Standard Libraries
# =====================
import contextlib
import copy
import csv
import gc
import html
import importlib
import io
import itertools
import json
import locale
import logging
import math
import multiprocessing
from multiprocessing import cpu_count
import os
import pickle
import random
import re
import string
import subprocess
import time

from collections import Counter, defaultdict
from datetime import datetime, timedelta
from functools import partial
from math import sqrt
from urllib import request
import spacy

# Cette fonction semble être un appel interne (pas un import),
# on la laisse à la suite des imports standards si c’est nécessaire
download_things()

# =====================
# Data Handling & Analysis
# =====================
import numpy as np
import pandas as pd
import psutil
import requests
import statsmodels.api as sm
from ortools.linear_solver import pywraplp

from charset_normalizer import from_path
from dateutil.parser import parse
from gensim.corpora import Dictionary
from gensim.models import CoherenceModel
from scipy import stats
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.ndimage import gaussian_filter
from scipy.spatial import distance
from scipy.spatial.distance import cosine, pdist
from statsmodels.stats.outliers_influence import variance_inflation_factor


# =====================
# NLP & Text Processing
# =====================
import torch
import unidecode
from bs4 import BeautifulSoup
from pyate import cvalues
from pyate.term_extraction_pipeline import TermExtractionPipeline
from spacy.language import Language
from spacy.tokens import Doc


# =====================
# Machine Learning
# =====================
from sklearn.decomposition import NMF, PCA
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.feature_extraction.text import (
    CountVectorizer,
    TfidfTransformer,
    TfidfVectorizer
)
from sklearn.inspection import permutation_importance
from sklearn.metrics import roc_auc_score
from sklearn.metrics.pairwise import (
    cosine_distances,
    cosine_similarity,
    manhattan_distances
)
from sklearn.model_selection import (
    StratifiedKFold,
    cross_val_score
)
from sklearn.preprocessing import label_binarize


# =====================
# Transformers & NLP Models
# =====================
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    CamembertTokenizer,
    pipeline
)


# =====================
# Visualisation
# =====================
import matplotlib.colors as mcolors
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms
import seaborn as sns
from matplotlib.cm import ScalarMappable
from matplotlib.colors import LinearSegmentedColormap, Normalize
from matplotlib.gridspec import GridSpec


# =====================
# Divers
# =====================
if '/content/drive/' in folder_path:
    from tqdm.notebook import tqdm, trange
else:
    from tqdm import tqdm, trange

from datasketch import MinHash, MinHashLSH
from joblib import Parallel, delayed
from tslearn.metrics import dtw_path_from_metric
from dateutil import parser

In [None]:
def plot_clustered_heatmap(
    pivot_df,
    num_topic=20,
    scale_axis=1,
    safe_pad_px=5  # marge de sécurité supplémentaire (en pixels) de chaque côté
):

    # -----------------------------
    # 1) Normalisation min–max
    # -----------------------------
    df_scaled = pivot_df.apply(
        lambda x: (x - x.min()) / (x.max() - x.min()),
        axis=scale_axis
    )

    # Nettoyage des noms de colonnes (optionnel)
    df_scaled.columns = (
        df_scaled.columns
        .str.split(':')
        .str[0]
        .str.strip()
    )

    if scale_axis == 1:
        df_scaled = df_scaled.T

    # 2) Ordre des lignes et des colonnes
    row_order = (
        df_scaled
        .sum(axis=1)
        .sort_values()
        .index
    )
    col_order = (
        df_scaled
        .sum(axis=0)
        .sort_values()
        .index
    )
    df_scaled = df_scaled.loc[row_order, col_order]

    # ======================================================
    # PASSE 1 : figure temporaire pour mesurer
    # ======================================================
    temp_height_inch = 6
    temp_fig, temp_ax = plt.subplots(
        figsize=(FIGURE_WIDTH_INCH, temp_height_inch),
        dpi=DPI
    )

    sns.heatmap(
        df_scaled,
        ax=temp_ax,
        cbar=False,
        cmap="YlGnBu",
        square=False,
        linewidths=1,
        linecolor='white'
    )
    temp_ax.tick_params(
        axis='x',
        pad=2,
        length=0,
        labeltop=True,   # labels en haut
        labelbottom=True,
        top=True,
        bottom=True,
        labelrotation=90
    )
    temp_ax.tick_params(axis='y', pad=2, length=0)

    temp_fig.canvas.draw()
    renderer = temp_fig.canvas.get_renderer()

    # 1) Mesure bounding box Xlabels
    xlabels = temp_ax.get_xticklabels()
    xlabels_bboxes = [lbl.get_window_extent(renderer=renderer) for lbl in xlabels]
    max_label_height_px = max(bbox.height for bbox in xlabels_bboxes) if xlabels_bboxes else 0

    # 2) Mesure bounding box Ylabels
    ylabels = temp_ax.get_yticklabels()
    ylabels_bboxes = [lbl.get_window_extent(renderer=renderer) for lbl in ylabels]
    max_label_width_px = max(bbox.width for bbox in ylabels_bboxes) if ylabels_bboxes else 0

    plt.close(temp_fig)

    # ======================================================
    # CALCUL DES DIMENSIONS FINALES
    # ======================================================
    nb_lignes = len(df_scaled)

    # Hauteur stricte occupée par les lignes de la heatmap
    heatmap_height_px = nb_lignes * PX_PER_TOPIC

    # Hauteur totale : zone heatmap + marge X en haut et en bas + safe_pad
    # Ici, on ajoute safe_pad_px en haut ET safe_pad_px en bas
    # => total = heatmap + 2*(labelsX + safe_pad)
    total_height_px = heatmap_height_px + 2 * (max_label_height_px + safe_pad_px)
    figure_height_inch = total_height_px / DPI

    # Largeur : on garde FIGURE_WIDTH_INCH, mais on doit
    # réserver de la place à gauche pour Y + safe_pad (et éventuellement à droite).
    total_width_px = FIGURE_WIDTH_INCH * DPI

    # Marge gauche en pixels = (largeurY + safe_pad)
    margin_left_px = max_label_width_px + safe_pad_px
    # Marge droite en pixels : par exemple on en met 0 ou un safe_pad_px
    margin_right_px = safe_pad_px  # si tu veux vraiment 0, tu mets 0

    # Conversion en fraction de la largeur totale
    left_margin_fraction  = margin_left_px / float(total_width_px)
    right_margin_fraction = 1.0 - (margin_right_px / float(total_width_px))

    # Marge bas = (hauteurX + safe_pad)
    # Marge haut = idem
    margin_bottom_px = max_label_height_px + safe_pad_px
    margin_top_px    = max_label_height_px + safe_pad_px

    bottom_margin_fraction = margin_bottom_px / float(total_height_px)
    top_margin_fraction    = 1.0 - (margin_top_px / float(total_height_px))

    # ======================================================
    # PASSE 2 : figure finale
    # ======================================================
    fig = plt.figure(
        figsize=(FIGURE_WIDTH_INCH, figure_height_inch),
        dpi=DPI
    )
    ax = sns.heatmap(
        df_scaled,
        cbar=False,
        square=False,
        cmap="YlGnBu",
        linewidths=1,
        linecolor='white'
    )
    ax.set_aspect("auto")

    ax.tick_params(
        axis='x',
        pad=2,
        length=0,
        labeltop=True,
        labelbottom=True,
        top=True,
        bottom=True,
        labelrotation=90
    )
    ax.tick_params(axis='y', pad=2, length=0)

    # Ajustement manuel des marges
    fig.subplots_adjust(
        left=left_margin_fraction,
        right=right_margin_fraction,
        bottom=bottom_margin_fraction,
        top=top_margin_fraction
    )

    # Nettoyage
    ax.set_xlabel("")
    ax.set_ylabel("")
    plt.title("")

    # ======================================================
    # Sauvegarde
    # ======================================================
    if scale_axis == 1:
        output_filename = (
            f"{results_path}{base_name}_RANDOM_FORESTS_RESIDUALS_ANALYSIS/"
            f"{base_name}_random_forests_residual_analysis_topic_normalized_heatmap_{num_topic}tc_"
            f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
            f"{go_remove_duplicates}dup_{web_paper_differentiation}wp_"
            f"_correlationaveragech.png"
        )
    else:
        output_filename = (
            f"{results_path}{base_name}_RANDOM_FORESTS_RESIDUALS_ANALYSIS/"
            f"{base_name}_random_forests_residual_analysis_group_normalized_heatmap_{num_topic}tc_"
            f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
            f"{go_remove_duplicates}dup_{web_paper_differentiation}wp_"
            f"_correlationaveragech.png"
        )

    plt.savefig(
        output_filename,
        dpi=DPI,
        pad_inches=0,      # pas d'espace supplémentaire autour
        bbox_inches=None
    )

    plt.close(fig)

In [None]:
def random_forests_residuals_analysis(group_column=None):
    if group_column == None:
        print('group_column est None')
        return

    if not os.path.exists(f"{results_path}{base_name}_RANDOM_FORESTS_RESIDUALS_ANALYSIS/"):
        os.makedirs(f"{results_path}{base_name}_RANDOM_FORESTS_RESIDUALS_ANALYSIS/")


    for num_topic in all_nmf_W:
        rows = []
        W_matrix = all_nmf_W[20]

        # On itère sur les "lignes" de la matrice, i.e. chaque article
        for doc_idx, topic_scores in enumerate(W_matrix):
            # === Récupération du nom du journal selon la source ===
            # doc_idx (au lieu de num_article)
            if source_type == 'europresse':
                header = all_soups[doc_idx].header
                journal_text = extract_information(header, '.rdp__DocPublicationName')
                journal_text = normalize_journal(journal_text)

            elif source_type == 'istex':
                journal_text = columns_dict['journal'][doc_idx]

            elif source_type == 'csv':
                if group_column not in columns_dict:
                    print(f"La colonne '{group_column}' n'a pas été trouvée dans le fichier CSV.")
                    return
                journal_text = columns_dict[group_column][doc_idx]

            # Construire un dictionnaire pour cette ligne
            row_dict = {
                'doc_idx': doc_idx,
                'Journal': journal_text
            }
            # Ajouter les scores de tous les topics
            # topic_scores est un array de taille k
            for t_idx, score in enumerate(topic_scores):
                row_dict[f'Topic{t_idx}'] = score

            rows.append(row_dict)


        from collections import Counter

        # 1) Compter la fréquence de chaque journal dans rows
        journal_counts = Counter(row['Journal'] for row in rows)

        # 2) Filtrer : ne conserver que les rows dont le journal apparaît au moins threshold fois
        rows = [row for row in rows if journal_counts[row['Journal']] >= threshold]

        unique_journals = {row['Journal'] for row in rows}

        if len(unique_journals) == 1:
            print("Il n'y a qu'un seul groupe")
            return

        df_wide = pd.DataFrame(rows)

        k = W_matrix.shape[1]  # nombre de topics

        residuals_df = df_wide.copy()  # On duplique pour y stocker les résidus

        for j in tqdm(range(k), desc='RÉGRESSIONS : ANALYSE DES RÉSIDUS'):
            # Nom de la colonne cible
            col_target = f'Topic{j}'

            # Features = tous les topics sauf le j-ème
            feature_cols = [f'Topic{x}' for x in range(k) if x != j]

            X = df_wide[feature_cols]
            y = df_wide[col_target]

            # Entraîner un modèle de régression
            rf = RandomForestRegressor(n_estimators=20, random_state=42)
            rf.fit(X, y)

            # Prédire
            y_pred = rf.predict(X)

            # Calcul du résidu "brut"
            resid = y - y_pred

            # Stocker le résidu brut (optionnel)
            residuals_df[f'Resid_Topic{j}'] = resid

            # Standardisation (z-scoring) du résidu
            mu = resid.mean()
            sigma = resid.std()  # ou np.std(resid, ddof=1) pour l'échantillon
            if sigma == 0:
                # Éventuellement, gérer le cas où le résidu est toujours identique (très rare)
                resid_z = resid  # ou resid_z = 0
            else:
                resid_z = (resid - mu) / sigma

            # Stocker le résidu normalisé
            residuals_df[f'ResidZ_Topic{j}'] = resid_z

        # 1) Préparer un dictionnaire de renommage
        rename_map = {}
        for j in range(k):
            old_col = f"ResidZ_Topic{j}"
            # Récupérer le vrai nom du topic
            # Exemple : "Politique", "Économie", etc.
            real_name = topic_labels_by_config[num_topic][j]
            new_col = f"ResidZ_{real_name}"
            rename_map[old_col] = new_col

        # 2) Renommer les colonnes dans un nouveau DataFrame
        residuals_df_renamed = residuals_df.rename(columns=rename_map)

        # 3) Faire le melt : on sélectionne les nouvelles colonnes 'Resid_<topic_name>'
        value_vars_list = list(rename_map.values())  # ex: ['Resid_Politique', 'Resid_Sport', ...]

        table_resid = residuals_df_renamed.melt(
            id_vars=['Journal'],         # on garde la colonne 'Journal' telle quelle
            value_vars=value_vars_list,  # on fait fondre les colonnes résidu renommées
            var_name='Topic',            # le nom de la colonne contenant l'ancien nom de variable
            value_name='Resid'           # la valeur numérique du résidu
        )

        # Maintenant, 'Topic' sera de la forme 'Resid_<NomDuTopic>'
        # On peut, si on veut, enlever le préfixe 'Resid_' pour un affichage plus clair :
        table_resid['Topic'] = table_resid['Topic'].str.replace('ResidZ_', '', regex=False)

        # 4) Calculer la moyenne des résidus par (Journal, Topic), puis faire un pivot
        pivot = table_resid.groupby(['Journal','Topic'])['Resid'].mean().unstack(fill_value=0)

        # ===================================================================
        # Exemple d’utilisation pour générer deux heatmaps :
        #   - l’une avec normalisation min-max par ligne
        #   - l’autre avec normalisation min-max par colonne
        # ===================================================================

        for num_topic in all_nmf_W:
            # Heatmap avec normalisation par colonne
            plot_clustered_heatmap(
                pivot_df=pivot,
                scale_axis=0,
                num_topic=num_topic
            )

            # Heatmap avec normalisation par ligne
            plot_clustered_heatmap(
                pivot_df=pivot,
                scale_axis=1,
                num_topic=num_topic
            )

In [None]:
def go_tfidf_vectorization_sentences(
    gclasses,
    count_vectorizer,       # CountVectorizer déjà "fit"
    tfidf_transformer,      # TfidfTransformer (ou pipeline) déjà "fit"
    all_sentence_pos        # liste de données (phrases + infos POS, etc.) à tokeniser
):
    # Sans `tokenize_and_stem`
    tokenized_documents = []
    for atb in all_sentence_pos:
        tokenized_document = [t[0] for t in atb if t[0] in unigrams]
        tokenized_documents.append(tokenized_document)

    # 4) Filtrer les stop words via spaCy
    spacy_stopwords = nlp_pipeline.Defaults.stop_words
    # On itère avec tqdm sur tokenized_documents
    filtered_docs = []
    for doc in tokenized_documents:
        filtered_doc = [token for token in doc if token.lower() not in spacy_stopwords]
        filtered_docs.append(filtered_doc)
    tokenized_documents = filtered_docs

    # Exemple : transformation batch pour profiter de tqdm (optionnel)
    word_count = count_vectorizer.transform(tokenized_documents)

    # Transformation TF-IDF
    X_sentences = tfidf_transformer.transform(word_count)

    # 7) Retourner la matrice TF-IDF et la version tokenisée
    return X_sentences

In [None]:
def write_sentences_results(topic_num, final_top_ngrams_per_topic):
    with open(
        f"{results_path}{base_name}_EXPLORE_TOPICS/"
        f"{base_name}_topic_modeling_sentences_{topic_num}tc_"
        f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
        f"{go_remove_duplicates}dup.csv",
        "w",
        encoding='utf-8'
    ) as file_object:
        writer = csv.writer(file_object)

        # Écrire les en-têtes si nécessaire
        headers = []
        for i in range(len(final_top_ngrams_per_topic)):
            headers.extend([f'{i}_sentences', f'{i}_scores'])
        writer.writerow(headers)

        # Écrire les données
        for i in range(20):
            row = []
            for sub_array in final_top_ngrams_per_topic:
                if i < len(sub_array):
                    row.extend(sub_array[i])
                else:
                    row.extend(('', ''))
            writer.writerow(row)

In [None]:
def extract_relevant_sentences(nmf_models):
    for num_topic in nmf_models:
        X_sentences = go_tfidf_vectorization_sentences(grammatical_classes, tfidf_vectorizer, tfidf_transformer, all_sentence_pos)

        score_phrases = nmf_models[num_topic].transform(X_sentences)

        final_top_ngrams_per_topic = []

        top_n = 20
        candidate_size = 100  # on récupère 30 phrases candidates au lieu de 5
        similarity_threshold = 0.8  # seuil de similarité au-dessus duquel on considère que c’est “trop proche”

        n_topics = nmf_models[num_topic].n_components

        for topic_idx in range(n_topics):
            topic_scores = score_phrases[:, topic_idx]
            # Tri décroissant
            top_indices_candidate = np.argsort(topic_scores)[::-1][:candidate_size]

            # On extrait les vecteurs TF-IDF correspondants
            # (On suppose X_sentences est la matrice TF-IDF de toutes les phrases)
            candidate_vectors = X_sentences[top_indices_candidate]
            candidate_phrases = [sentences_norms[i] for i in top_indices_candidate]

            # Filtrage de similarité
            selected_indices = []
            for i, vec_i in enumerate(candidate_vectors):
                # On calcule la similarité de cette phrase avec celles déjà retenues
                # (on compare le vecteur i avec les vecteurs des phrases déjà sélectionnées)
                if not selected_indices:
                    selected_indices.append(i)
                    continue

                # Comparaison avec chaque phrase déjà incluse
                is_similar_to_selected = False
                for j in selected_indices:
                    # Similarité cosinus entre le vecteur i et le vecteur j
                    sim_ij = cosine_similarity(vec_i, candidate_vectors[j])
                    # sim_ij est une matrice 1x1, il faut extraire la valeur
                    if sim_ij[0, 0] >= similarity_threshold:
                        is_similar_to_selected = True
                        break

                if not is_similar_to_selected:
                    selected_indices.append(i)

                # Si on a assez de phrases “différentes”, on arrête
                if len(selected_indices) >= top_n:
                    break

            sub_array = []
        #   print(f"\n=== Topic {topic_idx} ===")
            for idx_in_candidates in selected_indices[:top_n]:
            #print("•", candidate_phrases[idx_in_candidates])

                phrase_brute = candidate_phrases[idx_in_candidates]
                score_value = topic_scores[top_indices_candidate[idx_in_candidates]]

                sub_array.append((phrase_brute, round(score_value, 4)))  # arrondi ou non

            final_top_ngrams_per_topic.append(sub_array)


        write_sentences_results(num_topic, final_top_ngrams_per_topic)

In [None]:
def detecter_date(chaine, jour_en_premier=True):
    try:
        return parse(chaine, dayfirst=jour_en_premier)
    except ValueError:
        return None

In [None]:
def formater_date(date):
    return date.strftime('%d/%m/%Y')

In [None]:
def formater_liste_dates(liste_dates, jour_en_premier=True):
    return [formater_date(detecter_date(date_str, jour_en_premier)) for date_str in liste_dates if detecter_date(date_str, jour_en_premier)]

In [None]:
def truncate_texts(texts, max_length=30):
    # 1. Vérifier si tous les textes contiennent ":"
    if all(':' in text for text in texts):
        # Si oui, on ne garde que la partie avant le premier ":" (trim)
        return [text.split(':', 1)[0].strip() for text in texts]

    # 2. Sinon, on applique la logique de troncature initiale
    truncated_texts = []
    for text in texts:
        if len(text) <= max_length:
            truncated_texts.append(text)
            continue

        # Trouve le dernier espace avant max_length
        last_space_index = text.rfind(' ', 0, max_length)
        if last_space_index == -1:
            # S'il n'y a pas d'espace, on coupe jusqu'à max_length et on ajoute "..."
            truncated_texts.append(text[:max_length] + "...")
        else:
            # Sinon, on coupe jusqu'au dernier espace et on ajoute "..."
            truncated_texts.append(text[:last_space_index] + "...")

    return truncated_texts

In [None]:
def solve_label_placement_matplotlib_2passes(
    ax,
    positions_and_labels,
    x_min, x_max, y_min, y_max,
    offsets_x, offsets_y,
    possible_ha=('left','center','right'),
    possible_va=('bottom','center','top')
):
    """
    -------------------------------------------------------------------------
    SOLVEUR EN DEUX PASSES (méthode "lexicographique") en MILP :
      1) Minimiser la somme des distances (distance * 100000).
      2) À distance minimale égale, maximiser le nombre de labels
         en (ha='center', va='center').

    positions_and_labels : liste de ((x_i, y_i), label_text).
    x_min, x_max, y_min, y_max : cadre à ne pas dépasser.
    offsets_x, offsets_y : listes des offsets qu'on souhaite tester.

    Retourne : [(i, X, Y, ha, va, bbox, distance_reelle), ...]
       - i = indice du label
       - (X, Y) = position choisie
       - ha, va = alignements
       - bbox = (xmin, xmax, ymin, ymax)
       - distance_reelle = distance euclidienne (en "data coords") entre
         (x_i, y_i) et (X, Y)
    -------------------------------------------------------------------------
    """

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    #  1) GÉNÉRATION DE TOUTES LES POSITIONS CANDIDATES
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    candidate_positions = []
    for i, ((x_i, y_i), label_text) in enumerate(positions_and_labels):
        cands_for_i = []
        for dx in offsets_x:
            for dy in offsets_y:
                for ha in possible_ha:
                    for va in possible_va:
                        # Coordonnées où on place le label
                        X = x_i + dx
                        Y = y_i + dy

                        # On mesure la bounding box (en coords data)
                        bbx_min, bbx_max, bby_min, bby_max = bounding_box_with_patch(
                            ax, label_text, X, Y, ha=ha, va=va,
                            bbox_style=dict(facecolor='white', edgecolor='black', alpha=0.1, boxstyle='square,pad=0.0')
                        )

                        # On vérifie qu'elle reste dans le cadre
                        EPS = 1e-9
                        if (
                            bbx_min >= x_min - EPS and
                            bbx_max <= x_max + EPS and
                            bby_min >= y_min - EPS and
                            bby_max <= y_max + EPS
                        ):
                            # Calcul de la distance * 100000
                            dist = math.dist((x_i, y_i), (X, Y))
                            dist_int = int(round(dist * 100000))

                            # Stockage
                            bbox_tup = (bbx_min, bbx_max, bby_min, bby_max)
                            cands_for_i.append((X, Y, ha, va, bbox_tup, dist_int))
        candidate_positions.append(cands_for_i)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    #  2) PREMIÈRE PASSE : MINIMISER LA SOMME DES DISTANCES (MILP)
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    # Création du solveur MILP (CBC, par exemple)
    # - vous pouvez aussi essayer "SCIP", "BOP", etc. si installés
    solver_1 = pywraplp.Solver.CreateSolver('CBC')
    # Optionnel : on peut paramétrer, ex. solver_1.SetNumThreads(multiprocessing.cpu_count())

    # Variables booléennes z1_{i,p}
    z1_vars = {}
    for i, cands_i in enumerate(candidate_positions):
        for p, cand_p in enumerate(cands_i):
            z1_vars[(i, p)] = solver_1.BoolVar(f"z1_{i}_{p}")

    # Contrainte : un seul candidat p par label i
    for i, cands_i in enumerate(candidate_positions):
        solver_1.Add(
            sum(z1_vars[(i, p)] for p in range(len(cands_i))) == 1
        )

    # Contrainte de non-chevauchement
    # On suppose que 'overlap(bbox1, bbox2)' renvoie True si overlap
    for i, cands_i in enumerate(candidate_positions):
        for p, cand_p in enumerate(cands_i):
            bbox_p = cand_p[4]
            for j in range(i+1, len(candidate_positions)):
                for q, cand_q in enumerate(candidate_positions[j]):
                    bbox_q = cand_q[4]
                    if overlap(bbox_p, bbox_q):
                        solver_1.Add(z1_vars[(i, p)] + z1_vars[(j, q)] <= 1)

    # Création de l'objectif : somme des distances
    distance_expr_1 = solver_1.Sum(
        cand_p[5] * z1_vars[(i,p)]
        for i, cands_i in enumerate(candidate_positions)
        for p, cand_p in enumerate(cands_i)
    )

    # Minimiser la somme des distances
    solver_1.Minimize(distance_expr_1)

    # Résolution de la première passe
    status_1 = solver_1.Solve()
    if status_1 != pywraplp.Solver.OPTIMAL and status_1 != pywraplp.Solver.FEASIBLE:
        print("[2passes] Aucune solution lors de la première passe.")
        return []

    # Récupération de la valeur de l'objectif (distance totale)
    dist_min_float = solver_1.Objective().Value()
    dist_min_int = int(round(dist_min_float))

    # Calcul "à la main" de sum_of_dist_int depuis la solution
    sum_of_dist_int = 0
    for i, cands_i in enumerate(candidate_positions):
        for p, cand_p in enumerate(cands_i):
            if z1_vars[(i,p)].solution_value() > 0.5:
                sum_of_dist_int += cand_p[5]

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    #  3) DEUXIÈME PASSE : distance = dist_min, maximiser #center
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    # Nouveau solveur
    solver_2 = pywraplp.Solver.CreateSolver('CBC')
    # solver_2.SetNumThreads(multiprocessing.cpu_count())  # si vous voulez multi-threads

    # Variables booléennes z2_{i,p}
    z2_vars = {}
    for i, cands_i in enumerate(candidate_positions):
        for p, cand_p in enumerate(cands_i):
            z2_vars[(i, p)] = solver_2.BoolVar(f"z2_{i}_{p}")

    # Contrainte : un seul candidat p par label
    for i, cands_i in enumerate(candidate_positions):
        solver_2.Add(
            sum(z2_vars[(i, p)] for p in range(len(cands_i))) == 1
        )

    # Même contrainte de non-chevauchement
    for i, cands_i in enumerate(candidate_positions):
        for p, cand_p in enumerate(cands_i):
            bbox_p = cand_p[4]
            for j in range(i+1, len(candidate_positions)):
                for q, cand_q in enumerate(candidate_positions[j]):
                    bbox_q = cand_q[4]
                    if overlap(bbox_p, bbox_q):
                        solver_2.Add(z2_vars[(i,p)] + z2_vars[(j,q)] <= 1)

    # Expression de la distance totale en 2ᵉ passe
    distance_expr_2 = solver_2.Sum(
        cand_p[5] * z2_vars[(i,p)]
        for i, cands_i in enumerate(candidate_positions)
        for p, cand_p in enumerate(cands_i)
    )

    # On fixe la distance = dist_min_int
    #   Selon les besoins, on peut autoriser un +/- 1e-9 si nécessaire.
    solver_2.Add(distance_expr_2 == dist_min_int)

    # On veut maximiser le nombre de "center"
    #   On compte 1 si ha == 'center', +1 si va == 'center'
    #   => total = sum over i,p of center_score(i,p)*z2_{i,p}
    center_expr_terms = []
    for i, cands_i in enumerate(candidate_positions):
        for p, cand_p in enumerate(cands_i):
            (_, _, ha, va, _, _) = cand_p
            center_score = 0
            if ha == 'center':
                center_score += 1
            if va == 'center':
                center_score += 1

            if center_score > 0:
                # contribution = center_score * z2_{i,p}
                center_expr_terms.append(center_score * z2_vars[(i,p)])

    center_expr = solver_2.Sum(center_expr_terms)

    # Objectif 2 : maximiser la somme des "center scores"
    solver_2.Maximize(center_expr)

    # Résolution de la 2ᵉ passe
    status_2 = solver_2.Solve()
    if status_2 != pywraplp.Solver.OPTIMAL and status_2 != pywraplp.Solver.FEASIBLE:
        print("[2passes] Aucune solution en 2e passe.")
        return []

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    #  4) EXTRACTION FINALE DE LA SOLUTION
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    solution = []
    for i, cands_i in enumerate(candidate_positions):
        for p, cand_p in enumerate(cands_i):
            if z2_vars[(i,p)].solution_value() > 0.5:
                (X, Y, ha, va, bbox_tup, dist_int) = cand_p
                dist_reelle = dist_int / 100000.0
                solution.append((i, X, Y, ha, va, bbox_tup, dist_reelle))
                break  # on a trouvé le p sélectionné pour ce i

    return solution

In [None]:
def overlap(bbox1, bbox2):
    """
    Teste si deux bounding boxes se chevauchent strictement.
    bbox = (xmin, xmax, ymin, ymax)
    """
    return not (
        bbox1[1] < bbox2[0] or  # bbox1.xmax < bbox2.xmin
        bbox1[0] > bbox2[1] or  # bbox1.xmin > bbox2.xmax
        bbox1[3] < bbox2[2] or  # bbox1.ymax < bbox2.ymin
        bbox1[2] > bbox2[3]     # bbox1.ymin > bbox2.ymax
    )

In [None]:
def bounding_box_with_patch(ax,
                            label_text,
                            x, y,
                            ha='left', va='center',
                            bbox_style=None):
    """
    Crée *temporairement* un texte invisible,
    avec EXACTEMENT le bbox dict(...) que vous utiliserez pour l’affichage,
    puis récupère la bbox de ce patch, en coordonnées data (ax).

    Retourne (xmin, xmax, ymin, ymax).
    """

    # 1) On crée un objet Text, invisible (color='none'),
    #    MAIS avec le même bbox que l'affichage final
    t = ax.text(
        x, y, label_text,
        ha=ha, va=va,
        color='none',
        bbox=dict(facecolor='white', edgecolor='black', alpha=0.1, boxstyle='square,pad=0.0')
    )

    # 2) Forcer le dessin pour que le patch soit calculé
    ax.figure.canvas.draw()

    # 3) Récupérer la bounding box du patch (le cadre gris)
    patch = t.get_bbox_patch()
    if patch is not None:
        bbox = patch.get_window_extent()
    else:
        # fallback, au cas où (rare)
        bbox = t.get_window_extent()

    # 4) Convertir la bbox en coords "data"
    bbox_data = bbox.transformed(ax.transData.inverted())

    # 5) Supprimer le texte temporaire
    t.remove()

    # 6) Retour (xmin, xmax, ymin, ymax)
    return (bbox_data.x0, bbox_data.x1, bbox_data.y0, bbox_data.y1)

In [None]:
def plot_pca(matrix_type='W'):
    """
    all_nmf_H : dict[ int -> ndarray ]
        Dictionnaire où all_nmf_H[topic_count] est une matrice H (shape = (k, m))
        k = nombre de topics, m = taille du vocabulaire.
    all_nmf_W : iterable
        Liste (ou clés du dict) indiquant les différents topic_count disponibles.
    """

    if not os.path.exists(f"{results_path}{base_name}_L2_{matrix_type}_PCA_PLOTS/"):
        os.makedirs(f"{results_path}{base_name}_L2_{matrix_type}_PCA_PLOTS/")

    for topic_count in all_nmf_H:
        if matrix_type == 'W':
            M = all_nmf_W[topic_count].T  # Matrice (k x m)
        else:
            M = all_nmf_H[topic_count]

        # ---------------------------
        # 1) Normalisation L2 par topic (chaque ligne)
        # ---------------------------
        # On calcule la norme L2 de chaque ligne (axis=1)
        norms = np.linalg.norm(M, axis=1, keepdims=True)
        # Pour éviter la division par zéro si une ligne est totalement nulle
        norms[norms == 0] = 1e-16

        M_norm = M / norms  # Division élément par élément

        # ---------------------------
        # 2) PCA sur H normalisé
        # ---------------------------
        pca = PCA(n_components=2)
        pca_result = pca.fit_transform(M_norm)  # shape = (k, 2)


        # 2) Préparation de la figure/axe
        fig, ax = plt.subplots(
            figsize=(FIGURE_WIDTH_INCH, FIGURE_WIDTH_INCH),
            dpi=DPI
        )

        # 3) Construire la liste (positions, labels)
        labels = [f'{i}' for i in range(len(pca_result))]
        truncated_texts = truncate_texts(topic_labels_by_config[topic_count])

        # On associe chaque point PCA à un label
        positions_and_labels = [
            (tuple(coords), truncated_texts[int(lbl)])
            for coords, lbl in zip(pca_result, labels)
        ]

        # 4) Calculer le min/max pour x et y (cadre à ne pas dépasser)
        all_x = [pos_lbl[0][0] for pos_lbl in positions_and_labels]
        all_y = [pos_lbl[0][1] for pos_lbl in positions_and_labels]
        x_min, x_max = min(all_x), max(all_x)
        y_min, y_max = min(all_y), max(all_y)

        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)
        ax.autoscale(False)

        # 5) Paramètres pour solve_label_placement_matplotlib
        # 1) Calcul de l'étendue (range) de l'axe
        range_x = x_max - x_min
        range_y = y_max - y_min

        # 2) Choix du nombre d'offsets
        num_offsets = 11

        # 3) Génération des pourcentages entre -5% et +5% (en 10 pas)
        #    => np.linspace(-0.05, 0.05, num_offsets)
        #    sera par exemple [-0.05, -0.0388, ..., 0.05]

        percentages_x = np.linspace(-0.05, 0.05, num_offsets)
        percentages_y = np.linspace(-0.05, 0.05, num_offsets)

        # 4) Conversion de ces pourcentages en offsets dans les coordonnées du graphique
        offsets_x = [0] #[p * range_x for p in percentages_x]
        offsets_y = [0] #[p * range_y for p in percentages_y]

        possible_ha = ['left', 'center', 'right']
        possible_va = ['bottom', 'center', 'top']

        # 6) Appel du solveur (qui va mesurer la bbox via ax)
        solution = solve_label_placement_matplotlib_2passes(
            ax=ax,
            positions_and_labels=positions_and_labels,
            x_min=x_min, x_max=x_max,
            y_min=y_min, y_max=y_max,
            offsets_x=offsets_x, offsets_y=offsets_y,
            possible_ha=possible_ha, possible_va=possible_va
        )

        # 7) Affichage de la solution
        for (i, X, Y, ha, va, bbox, cost) in solution:
            (ox, oy), text_label = positions_and_labels[i]

            # Points d'origine (optionnel si on veut les voir en plus du scatter)
            ax.plot(ox, oy, color='red', marker='o', alpha=0.5, markersize=10, markeredgewidth=0)

            # Le label positionné
            ax.text(X, Y, text_label, ha=ha, va=va,
                    bbox=dict(facecolor='white', edgecolor='black', alpha=0.1, boxstyle='square,pad=0.0'))

            # Une flèche qui relie le point d'origine au label
            ax.annotate(
                "",
                xy=(ox, oy),
                xytext=(X, Y),
                arrowprops=dict(arrowstyle="->", color='black', alpha=0.2)
            )

        explained_var_ratio = pca.explained_variance_ratio_

        manual_tick_placement_continuous(
            ax=ax,
            xmin=x_min,
            xmax=x_max,
            spacing_factor_min=1.02,
            spacing_factor_max=1.2,
            step=0.001
        )
        manual_tick_placement_continuous_Y(
            ax=ax,
            ymin=y_min,
            ymax=y_max,
            spacing_factor_min=1.02,
            spacing_factor_max=1.2,
            step=0.001
        )

        # 15) Labels des axes, etc.
        plt.xlabel(
            f'Facteur 1 - Variance expliquée={explained_var_ratio[0]*100:.2f}%',
            labelpad=35
        )
        plt.ylabel(
            f'Facteur 2 - Variance expliquée={explained_var_ratio[1]*100:.2f}%',
            labelpad=34
        )

        # Supprimer la bordure du haut et de droite
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)

        # Ligne horizontale y=0
        plt.axhline(0, color='black', linewidth=1, alpha=0.3)
        # Ligne verticale x=0
        plt.axvline(0, color='black', linewidth=1, alpha=0.3)

        # On désactive la grille
        plt.grid(False)

        class_suffix = "_".join(grammatical_classes)

        if not os.path.exists(f"{results_path}{base_name}_L2_{matrix_type}_PCA_PLOTS/"):
            os.makedirs(f"{results_path}{base_name}_L2_{matrix_type}_PCA_PLOTS/")

        # 9) Afficher la figure
        plt.savefig(
            f"{results_path}{base_name}_L2_{matrix_type}_PCA_PLOTS/"
            f"{base_name}_{matrix_type.lower()}_pca_plot_{topic_count}tc_l2_{class_suffix}_"
            f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
            f"{go_remove_duplicates}dup.png",
            dpi=DPI,
            bbox_inches='tight',
            pad_inches=0
        )
        plt.close()

In [None]:
def is_overlapping(text, other_texts, tolerance=0.0, buffer=0.0):
    """
    Vérifie si un texte se chevauche avec d'autres, avec une tolérance très permissive.

    Args:
        text: L'objet texte à tester.
        other_texts: Liste des objets textes existants.
        tolerance: Proportion de tolérance (plus grand = plus tolérant).
        buffer: Distance minimale entre les boîtes pour ignorer un chevauchement léger.

    Returns:
        bool: True si chevauchement significatif, sinon False.
    """
    bbox = text.get_window_extent(renderer=plt.gcf().canvas.get_renderer())
    bbox_data = bbox.transformed(plt.gca().transData.inverted())  # Conversion en coordonnées data

    for other in other_texts:
        other_bbox = other.get_window_extent(renderer=plt.gcf().canvas.get_renderer())
        other_bbox_data = other_bbox.transformed(plt.gca().transData.inverted())

        # Calcul des dimensions avec "buffer" pour agrandir légèrement les boîtes existantes
        bbox_data_inflated = [
            bbox_data.xmin - buffer, bbox_data.xmax + buffer,
            bbox_data.ymin - buffer, bbox_data.ymax + buffer
        ]
        other_bbox_data_inflated = [
            other_bbox_data.xmin - buffer, other_bbox_data.xmax + buffer,
            other_bbox_data.ymin - buffer, other_bbox_data.ymax + buffer
        ]

        # Vérifier le chevauchement agrandi
        overlap_x = max(0, min(bbox_data_inflated[1], other_bbox_data_inflated[1]) -
                           max(bbox_data_inflated[0], other_bbox_data_inflated[0]))
        overlap_y = max(0, min(bbox_data_inflated[3], other_bbox_data_inflated[3]) -
                           max(bbox_data_inflated[2], other_bbox_data_inflated[2]))

        # Surface d'intersection
        overlap_area = overlap_x * overlap_y

        # Aire minimale de chevauchement tolérée
        area_threshold = tolerance * (bbox_data.width * bbox_data.height)

        # Ignorer les chevauchements inférieurs à la tolérance
        if overlap_area > area_threshold:
            return True
    return False

In [None]:
def tokenize_and_stem(args):
    atb, unigrams = args
    tokenized_sents = []
    for t in atb:
        if t[0] in unigrams:
            tokenized_sents.append(t[0])

    return tokenized_sents

In [None]:
def go_tfidf_vectorization(gclasses):
    # 1) Traiter les classes grammaticales définies globalement
    #    On ajoute tqdm pour visualiser l'avancement
    global unigrams

    for grammatical_class in tqdm(gclasses, desc="Mise à jour des unigrams"):
        unigrams = update_candidates_for_unigram(grammatical_class, unigrams)

    tokenized_documents = []
    for atb in all_tab_pos:
        tokenized_document = [t[0] for t in atb if t[0] in unigrams]
        tokenized_documents.append(tokenized_document)

    # 4) Suppression des stop words via le pipeline spaCy
    spacy_stopwords = nlp_pipeline.Defaults.stop_words
    # Si on veut voir la progression ici, on peut boucler :
    filtered_docs = []
    for doc in tqdm(tokenized_documents, desc="Filtrage des stopwords"):
        filtered_docs.append(
            [token for token in doc if token.lower() not in spacy_stopwords]
        )
    tokenized_documents = filtered_docs

    # 5) Création des vecteurs TF-IDF
    #    Pour avoir une barre de progression, on peut découper manuellement en batches.
    #    Toutefois, si la liste n'est pas trop grosse, on peut juste faire fit_transform d'un coup.

    def identity_analyzer(tokens):
        return tokens

    count_vectorizer = CountVectorizer(analyzer=identity_analyzer, lowercase=False)

    # Si vous voulez découper en batches pour CountVectorizer.fit_transform,
    # il faut recourir à un autre mécanisme (car le fit_transform standard ne propose pas de batch).
    # Par défaut, on fait donc un fit_transform "classique" :
    word_count = count_vectorizer.fit_transform(tokenized_documents)

    tfidf_transformer = TfidfTransformer(norm=None, sublinear_tf=False, smooth_idf=True)
    X = tfidf_transformer.fit_transform(word_count)

    tfidf_feature_names = count_vectorizer.get_feature_names_out()

    return count_vectorizer, X, tfidf_feature_names, tokenized_documents, tfidf_transformer

In [None]:
def write_unigrams_results(nb_words, tfidf_feature_names, nmf_H):
    tab_words_nmf = []
    for topic_idx, topic in enumerate(nmf_H):
      subtab_words_nmf = []
      for i in topic.argsort()[:-nb_words - 1:-1]:
        subtab_words_nmf.append([(tfidf_feature_names[i]), topic[i]])

      tab_words_nmf.append(subtab_words_nmf)


    new_tab_words_nmf = []
    for t in tab_words_nmf:
      sorted_t = sorted(t, key = lambda x: (-x[1]))

      new_tab_words_nmf.append(sorted_t)



    max_rows_nb = 0
    for to in new_tab_words_nmf:
      if len(to) > max_rows_nb:
        max_rows_nb = len(to)


    if not os.path.exists(f"{results_path}{base_name}_EXPLORE_TOPICS/"):
        os.makedirs(f"{results_path}{base_name}_EXPLORE_TOPICS/")

    class_suffix = "_".join(grammatical_classes)
    with open(
        f"{results_path}{base_name}_EXPLORE_TOPICS/"
        f"{base_name}_{len(nmf_H)}tc_topic_modeling_unigrams_{class_suffix}_"
        f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
        f"{go_remove_duplicates}dup_{web_paper_differentiation}wp.csv",
        "w",
        encoding="utf-8"
    ) as file_object:
        writer = csv.writer(file_object)

        i = 0
        while i < max_rows_nb:
            new_row = ""
            for to in new_tab_words_nmf:
                if i < len(to):
                    if len(new_row) > 0:
                        new_row = new_row + "," + (to[i][0]) + "," + str(to[i][1])
                    else:
                        new_row = (to[i][0]) + "," + str(to[i][1])

            file_object.write(new_row)
            file_object.write("\n")

            i += 1

In [None]:
def determine_nmf(topic_list, alpha_W, alpha_H, l1_ratio, n_top_words=15, window_size=100):
    """
    Entraîne un modèle NMF pour chaque nombre de topics dans topic_list
    et calcule la métrique de cohérence (type sliding window) c_npmi.

    Paramètres
    ----------
    topic_list : list
        Liste des nombres de topics à tester (ex. [5, 10, 15]).
    n_top_words : int
        Nombre de mots que l’on va extraire pour chaque topic (top words).
    window_size : int
        Taille de la fenêtre de co-occurrence pour la cohérence (sliding window).

    Retour
    ------
    None. (Les résultats sont directement enregistrés dans les dictionnaires globaux.)
    """

    # On indique qu’on va modifier à ces variables globales
    global all_nmf_H, all_nmf_W
    global coherence_scores

    # 1. Construire le dictionary Gensim à partir des documents tokenisés
    dictionary = Dictionary(tokenized_documents)
    # Optionnel : filtrer les tokens trop rares ou trop fréquents
    # dictionary.filter_extremes(no_below=5, no_above=0.5)

    # 2. Pour chaque valeur de topics dans topic_list, on entraîne un modèle NMF
    nmf_models = {}
    for num_topic in tqdm(topic_list, desc="PROCESSUS DES TOPICS"):
        nmf_model = NMF(
            n_components=num_topic,
            random_state=1,
            max_iter=10000,
            alpha_W=alpha_W,      # remplace alpha=0.2
            alpha_H=alpha_H,      # idem
            l1_ratio=l1_ratio,  # Ratio proche de 0 => plus de L2
            init='nndsvd'
        ).fit(tfidf)

        nmf_W = nmf_model.transform(tfidf)
        nmf_H = nmf_model.components_

        # Stocker les matrices W et H dans les dictionnaires globaux
        all_nmf_W[num_topic] = nmf_W
        all_nmf_H[num_topic] = nmf_H

        # Initialiser pour ce num_topic
        all_topics_and_scores_by_document = {}

        # 3. Extraire les top words de chaque topic
        topic_words = []
        for t in range(num_topic):
            # Trouver les indices des "top n_top_words" en ordre décroissant
            top_word_indexes = nmf_H[t].argsort()[:-n_top_words-1:-1]
            # Récupérer les mots associés à ces indices
            words_for_topic_t = [tfidf_feature_names[idx] for idx in top_word_indexes]
            topic_words.append(words_for_topic_t)

        # 4. Calculer la cohérence par fenêtre glissante
        coherence_model = CoherenceModel(
            topics=topic_words,
            texts=tokenized_documents,
            dictionary=dictionary,
            coherence='c_npmi',   # ou 'c_uci' si souhaité
            window_size=window_size
        )
        coherence_score = coherence_model.get_coherence()
        coherence_scores[num_topic] = coherence_score

        nmf_models[num_topic] = nmf_model

    # 7. Fonctions de post-traitement (optionnelles)
    write_topics_unigrams()

    return nmf_models

In [None]:
def process_documents(documents):
    # Calcul du nombre de cœurs disponibles
    gpu_activated = spacy.prefer_gpu()  # True si GPU détecté, sinon None

    print('gpu_activated', gpu_activated)

    # n_process=1 si on a un GPU, sinon on utilise tous les CPU
    n_process = 1 if gpu_activated else multiprocessing.cpu_count()

    print(f"Utilisation de {n_process} processus parallèles pour spaCy.")

    # On s'assure de (ré)initialiser les tableaux globaux si nécessaire
    global documents_lemmatized, all_tab_pos, sentences_norms, all_sentence_pos
    documents_lemmatized = []
    all_tab_pos = []
    all_sentence_pos = []
    sentences_norms = []

    # Préparer une barre de progression
    pbar = tqdm(total=len(documents), desc='DOCUMENTS PROCESSÉS')

    # Traitement en parallèle avec nlp.pipe
    try:
        # Par défaut, spaCy divise en batch de ~1000 tokens.
        # On peut ajuster batch_size si besoin (ex: batch_size=20 ou 50).
        for spacy_doc in nlp_pipeline.pipe(documents, n_process=n_process, batch_size=20):
            doc_for_ngrams = ''
            tab_pos = []

            for sent in spacy_doc.sents:
                sentence_pos = []
                norms = []
                lemmes = []

                for token in sent:
                    pos = token.pos_
                    lemma = token.lemma_.lower()

                    # Exemple: unidecode si c'est un PROPN
                    if pos == 'PROPN':
                        lemma = unidecode.unidecode(lemma)

                    if lemma not in [" ", "\n", "\t"]:
                        doc_for_ngrams += lemma + ' '
                        tab_pos.append([lemma, pos])
                        sentence_pos.append([lemma, pos])
                        lemmes.append(lemma)
                        norms.append(token.norm_)

                sentences_norms.append(" ".join(norms))

                all_sentence_pos.append(sentence_pos)

            documents_lemmatized.append(doc_for_ngrams)
            all_tab_pos.append(tab_pos)

            pbar.update(1)

    except Exception as e:
        print(f"Erreur lors du traitement des documents : {e}")

    pbar.close()

    # Écriture des résultats sur disque (ou autre)
    write_raw_documents()
    write_lemmatized_documents()

In [None]:
def extract_and_convert_date(date_str):
    try:
        return parser.parse(date_str)
    except (parser.ParserError, ValueError):
        return None

In [None]:
def write_raw_documents():
    if not os.path.exists(f"{results_path}{base_name}_RAW/"):
            os.makedirs(f"{results_path}{base_name}_RAW/")

    with open(f"{results_path}{base_name}_RAW/raw_documents.txt", "w", encoding='utf-8') as file_object:
        for dfn in documents:
            file_object.write(dfn + '\n')

In [None]:
def write_lemmatized_documents():
    with open(f"{results_path}{base_name}_RAW/{base_name}_lemmatized_documents.txt",
              "w",
              encoding='utf-8') as file_object:
        for dfn in documents_lemmatized:
            file_object.write(dfn + '\n')

In [None]:
def extract_information(header, selector):
    elements = header.select(selector)
    if elements:
        return "////".join([get_text_from_tag(el).replace(';', ',') for el in elements])
    else:
        return "N/A"

In [None]:
def get_text_from_tag(tag):
    return ''.join(tag.strings)

In [None]:
def normalize_journal(t):
    t = t.strip()

    # Supprimer tout ce qui se trouve entre parenthèses (y compris les parenthèses)
    t = re.sub(r'\(.*?\)', '', t)

    # Supprimer tout ce qui se trouve après la première virgule
    t = re.sub(r',.*', '', t)

    # Supprimer tout ce qui se trouve après le premier tiret précédé d'un espace
    t = re.sub(r' -.*', '', t)

    # Supprimer tout ce qui suit trois espaces vides ou plus
    t = re.sub(r' {3,}.*', '', t)

    if not web_paper_differentiation:
        # Supprimer les préfixes "www."
        t = re.sub(r'^www\.', '', t)

        # Supprimer les extensions de domaine
        t = re.sub(r'(\.\w{2,3})+$', '', t)

    # Trim le texte
    t = t.strip()

    return t.lower()

In [None]:
def extract_date_info(date_text, language='fr'):
    if language == 'fr':
        regex = "([1-3]?[0-9]\\s(janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\\s20[0-2][0-9])"
    elif language == 'en':
        regex = "([1-3]?[0-9]\\s(January|February|March|April|May|June|July|August|September|October|November|December)\\s20[0-2][0-9])"

    date_text_clean = re.search(regex, date_text)
    return date_text_clean.group() if date_text_clean else date_text

In [None]:
def normalise_date(date_text):
    # Dictionnaire combiné des mois en anglais, français, espagnol et allemand avec leurs variations
    month_dict = {
        # Mois en anglais
        'january': '01', 'jan': '01',
        'february': '02', 'feb': '02',
        'march': '03', 'mar': '03',
        'april': '04', 'apr': '04',
        'may': '05',
        'june': '06', 'jun': '06',
        'july': '07', 'jul': '07',
        'august': '08', 'aug': '08',
        'september': '09', 'sep': '09', 'sept': '09',
        'october': '10', 'oct': '10',
        'november': '11', 'nov': '11',
        'december': '12', 'dec': '12',
        # Mois en français
        'janvier': '01', 'janv.': '01', 'janv': '01',
        'février': '02', 'févr.': '02', 'févr': '02', 'fevrier': '02', 'fevr': '02',
        'mars': '03',
        'avril': '04', 'avr.': '04', 'avr': '04',
        'mai': '05',
        'juin': '06',
        'juillet': '07', 'juil.': '07', 'juil': '07',
        'août': '08', 'aout': '08', 'aôut': '08',
        'septembre': '09', 'sept.': '09', 'sept': '09',
        'octobre': '10', 'oct.': '10', 'oct': '10',
        'novembre': '11', 'nov.': '11', 'nov': '11',
        'décembre': '12', 'déc.': '12', 'déc': '12', 'decembre': '12', 'dec': '12',
        # Mois en espagnol
        'enero': '01', 'ene.': '01', 'ene': '01',
        'febrero': '02', 'feb.': '02', 'feb': '02',
        'marzo': '03', 'mar.': '03', 'mar': '03',
        'abril': '04', 'abr.': '04', 'abr': '04',
        'mayo': '05', 'may.': '05', 'may': '05',
        'junio': '06', 'jun.': '06', 'jun': '06',
        'julio': '07', 'jul.': '07', 'jul': '07',
        'agosto': '08', 'ago.': '08', 'ago': '08',
        'septiembre': '09', 'sept.': '09', 'sep': '09', 'setiembre': '09', 'set.': '09', 'set': '09',
        'octubre': '10', 'oct.': '10', 'oct': '10',
        'noviembre': '11', 'nov.': '11', 'nov': '11',
        'diciembre': '12', 'dic.': '12', 'dic': '12',
        # Mois en allemand
        'januar': '01', 'jan.': '01', 'jan': '01',
        'februar': '02', 'feb.': '02', 'feb': '02',
        'märz': '03', 'maerz': '03', 'mär.': '03', 'marz': '03', 'mar.': '03', 'mar': '03',
        'april': '04', 'apr.': '04', 'apr': '04',
        'mai': '05',
        'juni': '06', 'jun.': '06', 'jun': '06',
        'juli': '07', 'jul.': '07', 'jul': '07',
        'august': '08', 'aug.': '08', 'aug': '08',
        'september': '09', 'sept.': '09', 'sep': '09', 'sept': '09',
        'oktober': '10', 'okt.': '10', 'okt': '10',
        'november': '11', 'nov.': '11', 'nov': '11',
        'dezember': '12', 'dez.': '12', 'dez': '12'
    }

    # Nettoyer le texte de la date
    date_text = date_text.lower().strip()

    # Liste unifiée des formats de dates à essayer
    date_formats = [
        # Exemples : 19 de noviembre de 2021, 19 novembre 2021, 19 november 2021, 19. November 2021
        r"(?:\b\w+\b,\s+)?(\d{1,2})(?:\.|\s+de|\s+)?\s*([\w\.\-]+)(?:\s+de)?\s+(\d{4})",
        # Exemples : noviembre 19, 2021, november 19, 2021
        r"(?:\b\w+\b,\s+)?([\w\.\-]+)\s+(\d{1,2}),?\s+(\d{4})",
        # Formats numériques : 19/11/2021, 11/19/2021
        r"(\d{1,2})/(\d{1,2})/(\d{4})",
        # Formats numériques avec tirets : 19-11-2021, 11-19-2021
        r"(\d{1,2})-(\d{1,2})-(\d{4})",
        # Année en premier : 2021-11-19
        r"(\d{4})-(\d{1,2})-(\d{1,2})",
        # Année en premier avec slash : 2021/11/19
        r"(\d{4})/(\d{1,2})/(\d{1,2})",
        # Formats avec points : 19.11.2021
        r"(\d{1,2})\.(\d{1,2})\.(\d{4})",
    ]

    for pattern in date_formats:
        match = re.search(pattern, date_text, re.IGNORECASE)
        if match:
            groups = match.groups()
            # Déterminer l'ordre des éléments en fonction du motif
            if pattern.startswith(r"(?:\b\w+\b,\s+)?(\d{1,2})"):
                # Motif : Jour [de] Mois [de] Année (ex : 19 de noviembre de 2021)
                day, month, year = groups
            elif pattern.startswith(r"(?:\b\w+\b,\s+)?([\w\.\-]+)"):
                # Motif : Mois Jour, Année (ex : noviembre 19, 2021)
                month, day, year = groups
            elif pattern.startswith(r"(\d{1,2})/(\d{1,2})/"):
                # Motif : Numérique avec slash (ambigu)
                first, second, year = groups
                if int(first) > 12:
                    # Probablement Jour/Mois/Année
                    day, month = first, second
                elif int(second) > 12:
                    # Probablement Mois/Jour/Année
                    month, day = first, second
                else:
                    # Ambigu, par défaut Jour/Mois/Année
                    day, month = first, second
                day = day.zfill(2)
                month = month.zfill(2)
                return f"{year}-{month}-{day}"
            elif pattern.startswith(r"(\d{1,2})-(\d{1,2})-"):
                # Motif : Numérique avec tirets (ambigu)
                first, second, year = groups
                if int(first) > 12:
                    day, month = first, second
                elif int(second) > 12:
                    month, day = first, second
                else:
                    day, month = first, second
                day = day.zfill(2)
                month = month.zfill(2)
                return f"{year}-{month}-{day}"
            elif pattern.startswith(r"(\d{4})-(\d{1,2})-(\d{1,2})"):
                # Motif : Année-Mois-Jour
                year, month, day = groups
            elif pattern.startswith(r"(\d{4})/(\d{1,2})/(\d{1,2})"):
                # Motif : Année/Mois/Jour
                year, month, day = groups
            elif pattern.startswith(r"(\d{1,2})\.(\d{1,2})\.(\d{4})"):
                # Motif : Jour.Mois.Année
                day, month, year = groups
            else:
                # Motif non reconnu
                continue

            month = month.lower().replace('.', '').strip()
            day = day.zfill(2)

            # Convertir le mois en chiffre
            if month.isdigit():
                month_num = month.zfill(2)
            elif month in month_dict:
                month_num = month_dict[month]
            else:
                print(f"Attention, mois non reconnu : {month}")
                continue

            return f"{year}-{month_num}-{day}"

    print('Attention, date non gérée :', date_text)
    # Retourner None si aucun format n'est reconnu
    return None

In [None]:
def standardize_name(name):
    words = name.split()
    words.sort()
    return ' '.join(words)

In [None]:
def split_names(s):
    words = s.split()
    if len(words) == 4:
        first_name = ' '.join(words[:2])
        second_name = ' '.join(words[2:])
        return [first_name, second_name]
    elif len(words) == 6:
        first_name = ' '.join(words[0:2])
        second_name = ' '.join(words[2:4])
        third_name = ' '.join(words[4:6])
        return [first_name, second_name, third_name]
    elif len(words) == 8:
        first_name = ' '.join(words[0:2])
        second_name = ' '.join(words[2:4])
        third_name = ' '.join(words[4:6])
        fourth_name = ' '.join(words[4:6])
        return [first_name, second_name, third_name, fourth_name]

    return [s]

In [None]:
def transform_text(text):
    text = text.replace('\n', ' ')
    text = text.replace('\t', ' ')

   # text = re.sub(r'[-–—‑‒−]', ' ', text)
    text = re.sub(r'\s+', ' ', text)

    # écriture inclusive
    text = text.replace('(e)', '')
    text = text.replace('(E)', '')
    text = text.replace('.e.', '')
    text = text.replace('.E.', '')

    return text

In [None]:
def extract_names(line):
    if len(line) > 150:
        return None

    # Supprimer tout ce qui est entre parenthèses
    line = re.sub(r'\(.*?\)', '', line)

    # Ignorer les lignes qui contiennent des domaines ou "N/A"
    if re.search(r'(\.fr|\.com|n/a)', line):
        return None

    line = re.sub(r'\s?@\w+', '', line)
    line = line.replace('.', '')
    line = line.replace('"', '')
    line = line.replace('«', '')
    line = line.replace('»', '')
    line = re.sub(r'\s+', ' ', line).strip()

    # Si la ligne contient "////", supprimez tout ce qui est à gauche et "////" lui-même
    if "////" in line:
        line = line.split("////")[1].strip()

    line = line.replace(',', ', ')
    line = re.sub(r'\s+', ' ', line).strip()

    # Si la ligne contient des virgules ou "et", divisez la ligne et prenez les noms

    names = []
    if len(line.split()) > 3:
        parts = re.split(',| et', line)
        for part in parts:
            names.extend(split_names(part.strip()))
    else:
        line = line.replace(',', '')
        names.extend(split_names(line.strip()))

    return set(names)

In [None]:
def write_info_europresse(scores, article, actual_doc):
    header = article.header

    # Extraire les informations (adaptez en fonction de vos fonctions extract_information, etc.)
    title_text = extract_information(header, '.titreArticle p')
    journal_text = extract_information(header, '.rdp__DocPublicationName')
    date_text = extract_information(header, '.DocHeader')

    journal_text = normalize_journal(journal_text)
    date_text_clean = extract_date_info(date_text)
    normalized_date = normalise_date(date_text_clean)

    if normalized_date is not None:
        date_normalized = normalized_date.replace(';', '').replace('&', '')
    else:
        date_normalized = date_text_clean

    # Vérifier si le tableau scores n'est pas vide
    if scores.size > 0:
        max_topic_index = np.argmax(scores)
    else:
        max_topic_index = -1

    # Calculer la clé pour récupérer le bon tableau de labels
    config_key = len(scores)

    # Récupérer le label correspondant au lieu de l'indice
    if config_key in topic_labels_by_config and 0 <= max_topic_index < len(topic_labels_by_config[config_key]):
        main_topic_label = topic_labels_by_config[config_key][max_topic_index]
    else:
        main_topic_label = "Unknown topic"

    # Convertir chaque score en chaîne
    scores_list = [str(score) for score in scores]

    # Extraction des noms
    names_raw = extract_information(header, '.sm-margin-bottomNews').lower()
    names = extract_names(names_raw)
    if names:
        actual_names = [standardize_name(name) for name in names]
        filtered_names = [
            name for name in actual_names
            if not any(
                other_name != name
                and set(name.split()) < set(other_name.split())
                for other_name in actual_names
            )
        ]
        all_names = filtered_names
    else:
        all_names = None

    chaine_authors = "None" if all_names is None else ', '.join(map(str, all_names))

    # Retourner une liste plutôt qu'une chaîne
    # Remarquez qu'on place main_topic_label à la place de l'ancien max_topic_index
    return [
        title_text.replace(';', ''),
        chaine_authors,
        names_raw,
        str(len(actual_doc)),
        journal_text.replace(';', ''),
        date_normalized,
        main_topic_label   # Le label au lieu de l'indice
    ] + scores_list

In [None]:
def write_info_another(scores, columns_dict, i, actual_doc):
    # Vérifier si le tableau scores n'est pas vide
    if scores.size > 0:
        max_topic_index = np.argmax(scores)  # indice de la valeur max
    else:
        max_topic_index = -1

    # Préparer la liste des scores en chaînes de caractères
    scores_list = [str(s) for s in scores]

    # Construire la ligne sous forme de liste
    row = []
    for key in columns_dict:
        row.append(str(columns_dict[key][i]))

    # Ajouter nb_characters (en supposant que actual_doc est une chaîne)
    row.append(str(len(actual_doc)))

    # --- Récupérer le label à la place de l'indice ---
    # La clé dans le dictionnaire : len(scores) + 1
    config_key = len(scores)

    if config_key in topic_labels_by_config and 0 <= max_topic_index < len(topic_labels_by_config[config_key]):
        main_topic_label = topic_labels_by_config[config_key][max_topic_index]
    else:
        # Au cas où la clé ou l'indice n'existe pas dans le dictionnaire
        main_topic_label = "Unknown topic"

    # Ajouter le label du sujet principal au lieu de l'indice
    row.append(main_topic_label)

    # Ajouter les scores
    row.extend(scores_list)

    return row

In [None]:
def remove_urls_hashtags_emojis_mentions_emails(text):
    # Supprimer les URLs
    text = re.sub(r'https?://\S+', '', text)

    # Supprimer les hashtags
   # text = re.sub(r'#\w+', '', text)

    # Supprimer les mentions
  #  text = re.sub(r'@\w+', '', text)

    # Supprimer les e-mails
 #   text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '', text)

    # Supprimer les émojis
    emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F700-\U0001F77F"  # alchemical symbols
                           u"\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
                           u"\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
                           u"\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
                           u"\U0001FA00-\U0001FA6F"  # Chess Symbols
                           u"\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
                           u"\U00002702-\U000027B0"  # Dingbats
                           u"\U000024C2-\U0001F251"
                           "]+", flags=re.UNICODE)
 #   text = emoji_pattern.sub(r'', text)

    return text

In [None]:
def extract_info(topic_nums, article):
    header = article.header

    date_text = extract_information(header, '.DocHeader')
    date_text_clean = extract_date_info(date_text)
    if normalise_date(date_text_clean) != None:
        date_normalized = normalise_date(date_text_clean).replace(';', '').replace('&', '')
    else:
        date_normalized = date_text_clean

    topics_dict = dict(topic_nums)

    return {date_normalized: topics_dict}

In [None]:
def aggregate_scores(articles_info):
    aggregated_scores = {}

    for info in articles_info:
        for date, topics in info.items():
            if date not in aggregated_scores:
                aggregated_scores[date] = {}

            for topic, score in topics.items():
                if topic not in aggregated_scores[date]:
                    aggregated_scores[date][topic] = 0
                aggregated_scores[date][topic] += score

    return aggregated_scores

In [None]:
def create_chrono_topics(sigma, apply_normalizations=False):
    """
    Fonction qui agrège les scores de topics par date,
    prépare le DataFrame final, puis appelle `plot_custom_heatmap`
    pour le tracé (avec clustering, colorbar, ticks, etc.),
    en s'assurant d'un alignement robuste entre topic_num et topic_label.
    """
    # 1) Création du répertoire de sortie
    if not os.path.exists(f"{results_path}{base_name}_TOPICS_DYNAMICS_HEATMAPS/"):
        os.makedirs(f"{results_path}{base_name}_TOPICS_DYNAMICS_HEATMAPS/")

    # 2) Pour chaque configuration (nombre de topics)
    for num_topic in tqdm(all_nmf_W, desc="CONFIGURATIONS PROCESSÉES"):
        config_path = (
            f"{results_path}{base_name}_TOPICS_DYNAMICS_HEATMAPS"
        )
        if not os.path.exists(config_path):
            os.makedirs(config_path)

        # --- 2.1) Extraction des infos articles selon la source ---
        if source_type == 'europresse':
            # On construit une liste d'objets "articles_info"
            articles_info = [
                extract_info(
                    # On crée un dictionnaire { "0": score_doc_i_topic_0, "1": score_doc_i_topic_1, ... }
                    {str(topic_num): all_nmf_W[num_topic][i, topic_num] for topic_num in range(all_nmf_W[num_topic].shape[1])},
                    article
                )
                for i, article in enumerate(all_soups)
            ]
        elif source_type in ['csv', 'istex']:
            articles_info = []
            all_dates = formater_liste_dates(columns_dict['date'])
            i = 0
            while i < len(all_dates):
                score_dict = {
                    str(topic_num): all_nmf_W[num_topic][i, topic_num]
                    for topic_num in range(all_nmf_W[num_topic].shape[1])
                }
                articles_info.append({all_dates[i]: score_dict})
                i += 1

        # --- 2.2) Agrégation par date ---
        #     aggregate_scores() doit renvoyer un dict du type :
        #     {
        #        "12/05/2020": {"0": 0.43, "1": 0.12, ...},
        #        "13/05/2020": {"0": 0.22, "1": 0.61, ...},
        #         ...
        #     }
        aggregated_scores = aggregate_scores(articles_info)

        renamed_aggregated_scores = {
            date: {
                topic_labels_by_config[num_topic][int(topic_str)]: score
                for topic_str, score in scores_dict.items()
            }
            for date, scores_dict in aggregated_scores.items()
        }

        aggregated_scores = renamed_aggregated_scores

        # --- 2.3) Calcul du nombre de documents par date ---
        doc_counts_by_date = defaultdict(int)
        for elt in articles_info:
            if isinstance(elt, dict):
                for date_str in elt.keys():
                    doc_counts_by_date[date_str] += 1

        # --- 2.4) Filtrage des dates valides & conversion ---
        valid_dates = {}
        for date_str, score_dict in aggregated_scores.items():
            date_obj = extract_and_convert_date(date_str)
            if date_obj:
                # Format unifié "jj/mm/YYYY"
                valid_dates[date_obj.strftime('%d/%m/%Y')] = score_dict

        # --- 2.5) Tri chronologique de ces dates ---
        aggregated_scores_sorted = {
            date: valid_dates[date]
            for date in sorted(valid_dates, key=lambda d: datetime.strptime(d, '%d/%m/%Y'))
        }

        # -----------------------------------------------------------------------
        # --- 3) Construction d'un DataFrame "long" (date, topic_num, score) ---
        # -----------------------------------------------------------------------
        #  L'objectif : obtenir une table de la forme :
        #
        #       date         topic_num   score
        #     "12/05/2020"       0       0.43
        #     "12/05/2020"       1       0.12
        #     "13/05/2020"       0       0.22
        #     ...
        #
        #  afin de pouvoir ensuite fusionner sur un DataFrame de mapping
        #  (topic_num -> topic_label) et enfin faire un pivot.
        # -----------------------------------------------------------------------

        rows = []
        for date_str, topics_dict in aggregated_scores_sorted.items():
            for topic_num_str, score in topics_dict.items():
                rows.append({
                    "Date": date_str,
                    "Topic": topic_num_str,
                    "Score": score
                })

        df_long = pd.DataFrame(rows)  # "long format"

        # --- 3.3) Pivot sur le label de topic (index) et la date (columns) ---
        #   Les valeurs sont "Score"
        df_pivoted = df_long.pivot(
            index="Topic",
            columns="Date",
            values="Score"
        ).fillna(0)  # on remplit à 0 les absences

        # ---------------------------------------------------------------------
        # --- 4) Optionnel : division par le nombre de documents (normalization)
        # ---------------------------------------------------------------------
        if apply_normalizations:
            for col in df_pivoted.columns:
                nb_docs = doc_counts_by_date.get(col, 1)  # éviter la division par 0
                df_pivoted[col] = df_pivoted[col] / nb_docs

        # ---------------------------------------------------------------------
        # --- 5) Conversion des colonnes en datetime & tri chronologique
        # ---------------------------------------------------------------------
        df_pivoted.columns = pd.to_datetime(df_pivoted.columns, format='%d/%m/%Y', errors='coerce')
        # On enlève d'éventuels NaT si conversion ratée (ou on pourrait ignorer)
        df_pivoted = df_pivoted.loc[:, df_pivoted.columns.notna()]

        # Tri des colonnes par ordre chronologique
        df_pivoted = df_pivoted.reindex(sorted(df_pivoted.columns), axis=1)

        # ---------------------------------------------------------------------
        # --- 6) Réindexation sur la plage complète de dates
        #         (de la plus ancienne à la plus récente)
        # ---------------------------------------------------------------------
        if not df_pivoted.columns.empty:
            oldest_date = df_pivoted.columns.min()
            newest_date = df_pivoted.columns.max()
            date_range = pd.date_range(start=oldest_date, end=newest_date)

            # On réindexe, et on remplit à 0 pour les dates manquantes
            df_pivoted = df_pivoted.reindex(columns=date_range, fill_value=0)
        else:
            # S'il n'y a pas de colonne (cas extrême), on ne fait rien
            date_range = []

        # ---------------------------------------------------------------------
        # --- 7) Préparation pour la heatmap
        # ---------------------------------------------------------------------
        # df_pivoted est un DataFrame (topics_label en lignes, dates en colonnes)
        df_normalized = df_pivoted  # pour garder le même nom que l'ancien code

        # Si sigma = 'auto', on le définit maintenant (pour plot_custom_heatmap)
        if sigma == 'auto':
            sigma = len(df_normalized.columns) / 15 if len(df_normalized.columns) > 0 else 1

        # ---------------------------------------------------------------------
        # --- 8) Appel de plot_custom_heatmap ---
        # ---------------------------------------------------------------------
        plot_custom_heatmap(
            df=df_normalized,
            sigma=sigma,
            cmap="YlGnBu",            # palette
            relative_normalizaton=apply_normalizations,  # normalisation interne
            with_colormap=False
        )

        # ---------------------------------------------------------------------
        # --- 9) Sauvegarde de la figure ---
        # ---------------------------------------------------------------------
        class_suffix = "_".join(grammatical_classes)
        plt.savefig(
            f"{results_path}{base_name}_TOPICS_DYNAMICS_HEATMAPS/"
            f"{base_name}_topics_dynamics_heatmap_{num_topic}tc_{apply_normalizations}n_{('auto' if sigma=='auto' else int(sigma))}s"
            f"_{class_suffix}_"
            f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
            f"{go_remove_duplicates}dup_dtwcompletechdtw2.png",
            dpi=DPI,
            bbox_inches='tight',
            pad_inches=0,
        )
        plt.close()

In [None]:
def correct_dates(dictionary):
    corrected_dict = {}
    for date, count in dictionary.items():
        if len(date) < 10:
            # Ajouter '0' au début de la date
            corrected_date = '0' + date
        else:
            corrected_date = date
        corrected_dict[corrected_date] = count
    return corrected_dict

In [None]:
def create_chrono_group_column(group_column, sigma, apply_normalizations=False):
    """
    Prépare les données par groupe/colonne (journal ou autre) au fil du temps,
    puis crée la heatmap temporelle en utilisant `plot_custom_heatmap`,
    en s'appuyant sur la stratégie "robuste" (format long + pivot).
    """
    # 1) Création du répertoire de sortie
    output_dir = f"{results_path}{base_name}_GROUPS_DYNAMICS_HEATMAPS/"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # ---------------------------------------------------------
    # 2) Agrégation des comptes (scores) en fonction de la source
    #    => On va créer un dict: date -> {groupe: count}
    # ---------------------------------------------------------
    aggregated_scores = {}

    if source_type == 'europresse':
        for article in all_soups:
            header = article.header
            journal_text = extract_information(header, '.rdp__DocPublicationName')
            date_text = extract_information(header, '.DocHeader')

            journal_text = normalize_journal(journal_text)
            date_text_clean = extract_date_info(date_text)

            date_normalized = normalise_date(date_text_clean)
            if date_normalized is None:
                # Si on n'a pas pu normaliser, on prend la valeur brute
                date_normalized = date_text_clean
            else:
                # Nettoyage de base
                date_normalized = date_normalized.replace(';', '').replace('&', '')

            if date_normalized not in aggregated_scores:
                aggregated_scores[date_normalized] = {}
            aggregated_scores[date_normalized].setdefault(journal_text, 0)
            aggregated_scores[date_normalized][journal_text] += 1

    elif source_type == 'istex':
        for i in range(len(columns_dict['date'])):
            date_raw = columns_dict['date'][i]
            journal = columns_dict['journal'][i]

            # Nettoyage basique
            date_normalized = date_raw.replace(';', '').replace('&', '')

            if date_normalized not in aggregated_scores:
                aggregated_scores[date_normalized] = {}
            aggregated_scores[date_normalized].setdefault(journal, 0)
            aggregated_scores[date_normalized][journal] += 1

    elif source_type == 'csv':
        # Vérification de la disponibilité des colonnes requises
        if 'date' not in columns_dict:
            print("La colonne 'date' n'existe pas dans columns_dict. Elle est requise.")
            return

        if group_column not in columns_dict:
            print(f"La colonne '{group_column}' n'existe pas dans columns_dict. Vérifiez les colonnes disponibles.")
            return

        for i in range(len(columns_dict['date'])):
            date_raw = columns_dict['date'][i]
            group_value = columns_dict[group_column][i]

            # Nettoyage
            date_normalized = date_raw.replace(';', '').replace('&', '')

            if date_normalized not in aggregated_scores:
                aggregated_scores[date_normalized] = {}
            aggregated_scores[date_normalized].setdefault(group_value, 0)
            aggregated_scores[date_normalized][group_value] += 1

    else:
        # Source non gérée
        return

    # ---------------------------------------------------------
    # 3) Conversion des dates & tri chronologique
    #    => format "jj/mm/YYYY" pour éviter les ambiguïtés
    # ---------------------------------------------------------
    valid_dates = {}
    for date_str, score_dict in aggregated_scores.items():
        date_obj = extract_and_convert_date(date_str)
        if date_obj:
            valid_dates[date_obj.strftime('%d/%m/%Y')] = score_dict

    # Tri chronologique des dates
    sorted_dates = sorted(valid_dates, key=lambda d: datetime.strptime(d, '%d/%m/%Y'))
    aggregated_scores_sorted = {
        date: valid_dates[date]
        for date in sorted_dates
    }

    # ---------------------------------------------------------
    # 4) Calcul du nombre total d'articles par date (pour normaliser si besoin)
    # ---------------------------------------------------------
    aggregated_article_counts = {}
    for date, group_counts in aggregated_scores_sorted.items():
        aggregated_article_counts[date] = sum(group_counts.values())

    # ---------------------------------------------------------
    # 5) Passage en "long format"
    #    => on obtient des lignes : [Date, Group, Count]
    # ---------------------------------------------------------
    rows = []
    for date, group_dict in aggregated_scores_sorted.items():
        for group_value, count in group_dict.items():
            rows.append({
                "Date": date,
                "Group": group_value,
                "Count": count
            })

    df_long = pd.DataFrame(rows, columns=["Date", "Group", "Count"])

    # ---------------------------------------------------------
    # 6) Pivot => index = Group, columns = Date, values = Count
    # ---------------------------------------------------------
    df_pivoted = df_long.pivot(
        index='Group',
        columns='Date',
        values='Count'
    ).fillna(0)

    # ---------------------------------------------------------
    # 7) Application d’un threshold sur les lignes (si besoin)
    #    => on ne garde que les groupes dont la somme >= threshold
    # ---------------------------------------------------------
    df_pivoted = df_pivoted[df_pivoted.sum(axis=1) >= threshold]

    # Copie pour d’éventuelles normalisations
    df_normalized = df_pivoted.copy()

    # ---------------------------------------------------------
    # 8) Division par nb_docs si apply_normalizations == True
    # ---------------------------------------------------------
    if len(df_normalized) > 1:
        if apply_normalizations:
            for col in df_normalized.columns:
                nb_docs = aggregated_article_counts.get(col, 1)  # éviter division par 0
                df_normalized[col] = df_normalized[col] / nb_docs

    # ---------------------------------------------------------
    # 9) Conversion des colonnes en datetime et tri chrono
    # ---------------------------------------------------------
    df_normalized.columns = pd.to_datetime(df_normalized.columns, format='%d/%m/%Y', errors='coerce')
    # On enlève d’éventuelles colonnes non converties
    df_normalized = df_normalized.loc[:, df_normalized.columns.notnull()]

    # Tri des dates chronologiquement
    df_normalized = df_normalized.reindex(sorted(df_normalized.columns), axis=1)

    # ---------------------------------------------------------
    # 10) Réindexer pour inclure TOUTES les dates manquantes
    #     => on remplit par 0
    # ---------------------------------------------------------
    if not df_normalized.columns.empty:
        oldest_date = df_normalized.columns.min()
        newest_date = df_normalized.columns.max()
        date_range = pd.date_range(start=oldest_date, end=newest_date)
        df_normalized = df_normalized.reindex(columns=date_range, fill_value=0)
    else:
        # Pas de dates valides, on ne fait rien
        date_range = []

    # ---------------------------------------------------------
    # 11) (Optionnel) Filtrage gaussien ici
    #     => si vous préférez laisser plot_custom_heatmap s'en charger, commentez.
    # ---------------------------------------------------------
    # ...

    # ---------------------------------------------------------
    # 12) (Optionnel) MinMaxScaling par ligne
    #     => idem, si vous le faites ici, ne le refaites pas dans plot_custom_heatmap.
    # ---------------------------------------------------------
    # ...

    # ---------------------------------------------------------
    # 13) Ajustement sigma si "auto"
    # ---------------------------------------------------------
    if sigma == 'auto':
        nb_cols = len(df_normalized.columns)
        sigma = nb_cols / 15.0 if nb_cols > 0 else 1

    if len(df_normalized) == 1 and apply_normalizations:
        return
    else:
        plot_custom_heatmap(
            df=df_normalized,
            sigma=sigma,
            cmap="YlGnBu",
            relative_normalizaton=apply_normalizations,
            with_colormap=False
        )

        # ---------------------------------------------------------
        # 15) Sauvegarde de la figure
        # ---------------------------------------------------------
        plt.savefig(
            f"{output_dir}{base_name}_groups_dynamics_heatmap_{apply_normalizations}n_{('auto' if sigma=='auto' else int(sigma))}s_"
            f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
            f"{go_remove_duplicates}dup_{web_paper_differentiation}wp_"
            f"{threshold}thr_dtwcompletechdtw2.png",
            dpi=DPI,
            bbox_inches='tight',
            pad_inches=0,
        )
        plt.close()

In [None]:
def create_results_folder(base_name):
    if not os.path.exists(results_path):
        os.makedirs(results_path)

    name_document = f'{base_name}.csv'

In [None]:
# =============================================================================
# Fonction principale pour charger les documents
# =============================================================================
def load_documents(name, source_type, minimum_caracters_nb_by_document, pbar):
    """
    Récupère et nettoie les documents à partir d'un fichier ou dossier donné,
    en fonction du 'source_type' (europresse, csv, istex).
    Retourne :
      - documents : liste de textes
      - all_soups : liste de BeautifulSoup (pour europresse, sinon vide)
      - columns_dict : dictionnaire contenant d'autres colonnes (pour CSV, ISTEX...)
    """
    documents = []
    all_soups = []
    columns_dict = {}

    # -------------------------------------------------------------------------
    # CAS 1 : Fichiers issus de Europresse
    # -------------------------------------------------------------------------
    if source_type == 'europresse':
        document_europresse = ''

        # Lecture du fichier HTML brut
        with open(name, 'r', encoding='utf-8', errors='xmlcharrefreplace') as file:
            for line in file:
                document_europresse += line

        # Décode les entités HTML et répare la séparation entre articles
        document_europresse = html.unescape(document_europresse)
        document_europresse = document_europresse.replace('</article> <article>', '</article><article>')
        documents_europresse = document_europresse.split('</article><article>')

        nb_not_occur = 0
        for d in documents_europresse:
            soup = BeautifulSoup(d, features="html.parser")

            # On met à jour la barre de progression pour chaque article
            pbar.update(1)

            # On retire les paragraphes "Lire aussi ..." qui ne contiennent pas assez de texte
            for p in soup.find_all('p'):
                p_text = p.get_text()
                if ("Lire aussi" in p_text and ("http" in p_text or "https" in p_text) and len(p_text) <= 1000):
                    p.decompose()

            # Si on trouve la div "docOcurrContainer", c'est là qu'est le texte
            if len(soup('div', {'class': 'docOcurrContainer'})) > 0:
                # Corrige des fins de paragraphes manquantes (ponctuation)
                for p in soup.find_all('p'):
                    # Trouver le prochain caractère alphabétique après ce paragraphe
                    next_char_match = re.search(
                        r'(?<=' + re.escape(p.text) + r')\s*(?:<[^>]*>)*\s*([a-zA-Z])',
                        str(soup)
                    )
                    # Ajoute un point si le paragraphe ne se termine pas par '.'
                    # et que le prochain char est une majuscule
                    if not p.text.endswith('.') and next_char_match and next_char_match.group(1).isupper():
                        p.string = p.text + '. '

                # On recrée la soupe après modifications
                soup = BeautifulSoup(str(soup), features='html.parser')

                candidate_text = soup('div', {'class': 'docOcurrContainer'})[0].get_text()
                if (minimum_caracters_nb_by_document <= len(candidate_text) < maximum_caracters_nb_by_document):
                    candidate_text = remove_urls_hashtags_emojis_mentions_emails(candidate_text)
                    candidate_text = transform_text(candidate_text)
                    documents.append(candidate_text)
                    all_soups.append(soup)
            else:
                nb_not_occur += 1

    # -------------------------------------------------------------------------
    # CAS 2 : Fichier CSV
    # -------------------------------------------------------------------------
    elif source_type == 'csv':
        # 1. Vérifier la taille du fichier
        file_size_bytes = os.path.getsize(name)
        file_size_mb = file_size_bytes / (1024 * 1024)
        print(f"[*] Taille du fichier CSV = {file_size_mb:.2f} Mo")

        if file_size_mb > 100:
            # 2. Si le fichier dépasse 200 Mo, on lit directement en UTF-8, sep=';'
            print("[*] Le fichier est > 200 Mo : lecture directe en UTF-8 (séparateur ';')")
            try:
                df = pd.read_csv(name, encoding='utf-8', sep=';', on_bad_lines='skip', low_memory=False)
            except Exception as e:
                print(f"[!] Erreur lors de la lecture du fichier >200Mo en UTF-8/';' : {e}")
                return ([], [], {})  # Retourne des listes/dict vides

        else:
            # 3. Fichier <= 200 Mo : détection d'encodage via charset-normalizer
            print("[*] Le fichier est <= 200 Mo : on effectue la détection d'encodage")
            results = from_path(name)
            best_guess = results.best()

            if best_guess is not None:
                detected_encoding = best_guess.encoding
                raw_data = best_guess.raw
            else:
                detected_encoding = None
                with open(name, 'rb') as f:
                    raw_data = f.read()

            # On se limite à utf-8 + encodage système éventuel
            common_encodings = ['utf-8']
            system_encoding = locale.getpreferredencoding(False)
            if system_encoding and system_encoding.lower() not in [enc.lower() for enc in common_encodings]:
                common_encodings.append(system_encoding)

            # Si charset-normalizer propose un encodage non dans la liste, on l'insère en priorité
            if detected_encoding and detected_encoding.lower() not in [enc.lower() for enc in common_encodings]:
                common_encodings.insert(0, detected_encoding)

            best_score = None
            best_df = None
            best_sep = None

            # On tente les encodages présents dans 'common_encodings'
            for enc in common_encodings:
                try:
                    # Décodage en mémoire (bytes -> str)
                    content_str = raw_data.decode(enc, errors='replace')
                    file_like = io.StringIO(content_str)

                    # Détection du séparateur via la 1ère ligne
                    first_line = file_like.readline().strip()
                    file_like.seek(0)
                    if ',' in first_line:
                        sep = ','
                    elif ';' in first_line:
                        sep = ';'
                    else:
                        sep = None

                    file_like.seek(0)
                    try:
                        # Lecture du CSV multi-colonnes
                        df_test = pd.read_csv(file_like, header=0, sep=sep, on_bad_lines='skip', low_memory=False)
                    except Exception:
                        # Si échec, on retente en "mono-colonne"
                        file_like.seek(0)
                        header = file_like.readline().strip()
                        content = [line.strip() for line in file_like]
                        df_test = pd.DataFrame(content, columns=[header])

                    # Compter les caractères invalides
                    invalid_chars = df_test.to_string().count('�')
                    if best_score is None or invalid_chars < best_score:
                        best_score = invalid_chars
                        best_df = df_test
                        best_sep = sep

                except Exception:
                    # Si l'encodage échoue, on ignore
                    continue

            # À la fin, best_df est le DataFrame "le moins corrompu"
            df = best_df
            if df is None:
                print("[!] Impossible de lire le CSV avec les encodages testés.")
                return ([], [], {})

            # Conversion en minuscules et renommage éventuel
            df.columns = df.columns.str.lower()
            df = df.rename(columns={'post created date': 'date'})
            df.fillna('', inplace=True)

        # -------------------------------------------------------------------------
        # À ce stade, on possède un DataFrame 'df', soit gros CSV (lecture directe),
        # soit petit CSV (<=200 Mo) après détection d'encodage.
        # Le code qui suit (choix de la colonne, filtrage, etc.) reste inchangé :
        # -------------------------------------------------------------------------

        # Choix de la colonne texte
        if 'text' in df.columns:
            column_to_use = 'text'
        elif 'description' in df.columns:
            column_to_use = 'description'
        else:
            print("Les colonnes 'text' ou 'description' ne sont pas présentes dans le DataFrame.")
            return ([], [], {})  # Retourne des listes/dict vides

        # Filtrage par longueur
        df = df.loc[
            (df[column_to_use].str.len() >= minimum_caracters_nb_by_document) &
            (df[column_to_use].str.len() <= maximum_caracters_nb_by_document)
        ]

        # Nettoyage de chaque document
        documents = df[column_to_use].tolist()
        for i in range(len(documents)):
            documents[i] = remove_urls_hashtags_emojis_mentions_emails(documents[i])
            documents[i] = transform_text(documents[i])

        # Stockage des autres colonnes
        for column in df.columns:
            if column not in ['text', 'description']:
                columns_dict[column] = df[column].tolist()

        # À ce stade, on a un DataFrame 'df' lisible
        # -------------------------------------------------------------------------
        # Choix de la colonne texte
        if 'text' in df.columns:
            column_to_use = 'text'
        elif 'description' in df.columns:
            column_to_use = 'description'
        else:
            print("Les colonnes 'text' ou 'description' ne sont pas présentes dans le DataFrame.")
            return ([], [], {})  # Retourne des listes/dict vides

        # Filtrage par longueur
        df = df.loc[
            (df[column_to_use].str.len() >= minimum_caracters_nb_by_document) &
            (df[column_to_use].str.len() <= maximum_caracters_nb_by_document)
        ]

        # Nettoyage de chaque document
        documents = df[column_to_use].tolist()
        for i in range(len(documents)):
            documents[i] = remove_urls_hashtags_emojis_mentions_emails(documents[i])
            documents[i] = transform_text(documents[i])

        # Stockage des autres colonnes
        for column in df.columns:
            if column not in ['text', 'description']:
                columns_dict[column] = df[column].tolist()

    # -------------------------------------------------------------------------
    # CAS 3 : ISTEX
    # -------------------------------------------------------------------------
    elif source_type == 'istex':
        def get_nested(data, keys):
            """Fonction utilitaire pour extraire des données imbriquées dans un dictionnaire."""
            for key in keys:
                if isinstance(data, dict):
                    data = data.get(key)
                else:
                    return None
            return data

        documents = []
        columns_dict = {}

        # Champs ISTEX à extraire
        fields_to_extract = [
            'date', 'title', 'doi', 'journal', 'language', 'originalGenre',
            'accessCondition', 'pdfVersion', 'abstractCharCount', 'pdfPageCount',
            'pdfWordCount', 'score', 'pdfText', 'imageCount', 'refCount',
            'sectionCount', 'paragraphCount', 'tableCount', 'categories_scopus',
            'categories_scienceMetrix', 'host_volume', 'host_issue',
            'host_publisher', 'host_pages_first', 'host_pages_last', 'host_title',
            'refBibs_count',
        ]

        field_mappings = {
            'date': ['publicationDate'],
            'title': ['title'],
            'doi': ['doi'],
            'journal': ['host', 'title'],
            'language': ['language'],
            'originalGenre': ['originalGenre'],
            'accessCondition': ['accessCondition', 'value'],
            'pdfVersion': ['qualityIndicators', 'pdfVersion'],
            'abstractCharCount': ['qualityIndicators', 'abstractCharCount'],
            'pdfPageCount': ['qualityIndicators', 'pdfPageCount'],
            'pdfWordCount': ['qualityIndicators', 'pdfWordCount'],
            'score': ['qualityIndicators', 'score'],
            'pdfText': ['qualityIndicators', 'pdfText'],
            'imageCount': ['qualityIndicators', 'xmlStats', 'imageCount'],
            'refCount': ['qualityIndicators', 'xmlStats', 'refCount'],
            'sectionCount': ['qualityIndicators', 'xmlStats', 'sectionCount'],
            'paragraphCount': ['qualityIndicators', 'xmlStats', 'paragraphCount'],
            'tableCount': ['qualityIndicators', 'xmlStats', 'tableCount'],
            'categories_scopus': ['categories', 'scopus'],
            'categories_scienceMetrix': ['categories', 'scienceMetrix'],
            'host_volume': ['host', 'volume'],
            'host_issue': ['host', 'issue'],
            'host_publisher': ['host', 'publisher'],
            'host_pages_first': ['host', 'pages', 'first'],
            'host_pages_last': ['host', 'pages', 'last'],
            'host_title': ['host', 'title'],
            'refBibs_count': ['refBibs'],
        }

        # Initialiser columns_dict avec listes vides
        for field in fields_to_extract:
            columns_dict[field] = []

        import contextlib
        if os.path.isdir(name):
            with contextlib.redirect_stdout(None):
                files_in_dir = os.listdir(name)
                txt_files = [f for f in files_in_dir if f.endswith('.txt')]
                json_files = [f for f in files_in_dir if f.endswith('.json')]

                # On cherche les basenames communs
                txt_basenames = set(os.path.splitext(f)[0] for f in txt_files)
                json_basenames = set(os.path.splitext(f)[0] for f in json_files)
                common_basenames = txt_basenames.intersection(json_basenames)

                if not common_basenames:
                    print(f"Le répertoire '{name}' est ignoré (pas de fichiers .txt et .json correspondants).")
                else:
                    for basename in common_basenames:
                        txt_file_path = os.path.join(name, basename + '.txt')
                        try:
                            with open(txt_file_path, 'r', encoding='utf-8') as f:
                                txt_content = f.read()
                        except Exception as e:
                            print(f"Erreur lors de la lecture du fichier texte '{txt_file_path}': {e}")
                            continue

                        # Filtrage de longueur
                        if (minimum_caracters_nb_by_document <= len(txt_content) <= maximum_caracters_nb_by_document):
                            txt_content = remove_urls_hashtags_emojis_mentions_emails(txt_content)
                            txt_content = transform_text(txt_content)
                            documents.append(txt_content)

                            # Lecture du JSON
                            json_file_path = os.path.join(name, basename + '.json')
                            try:
                                with open(json_file_path, 'r', encoding='utf-8') as f:
                                    json_data = json.load(f)
                            except Exception as e:
                                print(f"Erreur lors de la lecture du fichier JSON '{json_file_path}': {e}")
                                # On met des None pour chaque champ
                                for field in fields_to_extract:
                                    columns_dict[field].append(None)
                                continue

                            # Extraction champs
                            for field in fields_to_extract:
                                json_keys = field_mappings.get(field)
                                value = None
                                if json_keys is not None:
                                    if field == 'refBibs_count':
                                        # Nombre de références bibliographiques
                                        refbibs = get_nested(json_data, json_keys)
                                        value = len(refbibs) if refbibs is not None else 0
                                    else:
                                        value = get_nested(json_data, json_keys)
                                        # Si c'est une liste, on joint par virgule
                                        if isinstance(value, list):
                                            value = ', '.join(map(str, value))
                                columns_dict[field].append(value)
                        else:
                            # Document trop court ou trop long
                            continue

                    # Vérification des longueurs
                    length_documents = len(documents)
                    for field in columns_dict:
                        assert len(columns_dict[field]) == length_documents, \
                            f"Incohérence pour le champ '{field}'"
            # On modifie la date si besoin
            columns_dict["date"] = ["01/01/" + date for date in columns_dict["date"]]
        else:
            print(f"Le chemin '{name}' n'est pas un répertoire valide.")

    # -------------------------------------------------------------------------
    return documents, all_soups, columns_dict

# =============================================================================
# Fonction pour rassembler les documents de manière simplifiée
# =============================================================================
def meta_load_documents():
    """
    Identifie tous les fichiers/dossiers sources en fonction de 'source_type',
    puis appelle directement load_documents sur chacun d'entre eux.
    Fusionne le tout dans les variables globales :
      - documents
      - all_soups
      - columns_dict
    Gère la suppression de doublons si go_remove_duplicates == True.
    """
    global documents
    global all_soups
    global columns_dict

    # -------------------------------------------------------------------------
    # Construction de la liste des fichiers à traiter
    # -------------------------------------------------------------------------
    if source_type == 'europresse':
        # Tous les .html qui contiennent base_name
        fichiers_html = [
            f for f in os.listdir(f"{folder_path}DATA/")
            if f.lower().endswith('.html')
            and os.path.isfile(os.path.join(f"{folder_path}DATA/", f))
            and base_name in f
        ]

    elif source_type == 'csv':
        # Tous les .csv qui contiennent base_name
        fichiers_html = [
            f for f in os.listdir(f"{folder_path}DATA/")
            if f.lower().endswith('.csv')
            and os.path.isfile(os.path.join(f"{folder_path}DATA/", f))
            and base_name in f
        ]

    elif source_type == 'istex':
        # Vérifier si 'DATA' est déjà dans folder_path
        if folder_path.endswith("DATA") or folder_path.endswith("DATA/"):
            data_folder_path = folder_path
        else:
            data_folder_path = os.path.join(folder_path, "DATA")

        # On cherche un sous-dossier dont le nom contient base_name
        sous_dossier_principal = None
        for f in os.listdir(data_folder_path):
            if os.path.isdir(os.path.join(data_folder_path, f)) and base_name in f:
                sous_dossier_principal = f
                break

        if sous_dossier_principal:
            # Pour ISTEX, on récupère la liste des sous-sous-dossiers
            fichiers_html = [
                os.path.join(sous_dossier_principal, sub_f)
                for sub_f in os.listdir(os.path.join(data_folder_path, sous_dossier_principal))
                if os.path.isdir(os.path.join(data_folder_path, sous_dossier_principal, sub_f))
            ]
        else:
            fichiers_html = []
    else:
        fichiers_html = []

    # -------------------------------------------------------------------------
    # Barre de progression
    # -------------------------------------------------------------------------
    pbar = tqdm(
        total=len(fichiers_html),
        desc='DOCUMENTS PROCESSÉS'
    )

    # -------------------------------------------------------------------------
    # Récupération des documents et fusion des informations
    # -------------------------------------------------------------------------
    all_columns_dicts = []
    for f in fichiers_html:
        # Construction du chemin complet
        full_path = os.path.join(folder_path, 'DATA', f) \
            if source_type in ['europresse', 'csv'] else os.path.join(folder_path, 'DATA', f)

        # On appelle directement load_documents
        d, s, cd = load_documents(full_path, source_type, minimum_caracters_nb_by_document, pbar)
        documents.extend(d)
        all_soups.extend(s)
        all_columns_dicts.append(cd)

        # Mise à jour de la barre de progression
        pbar.update(1)

    # -------------------------------------------------------------------------
    # Fusionner les dictionnaires de colonnes
    # -------------------------------------------------------------------------
    for dico in all_columns_dicts:
        for cle, valeur in dico.items():
            if cle not in columns_dict:
                columns_dict[cle] = []
            columns_dict[cle].extend(valeur)

    # -------------------------------------------------------------------------
    # Suppression des doublons si demandé
    # -------------------------------------------------------------------------
    if go_remove_duplicates:
        remove_duplicates_lsh()

    # -------------------------------------------------------------------------
    # Affichage final
    # -------------------------------------------------------------------------
    print('\n')
    print(len(documents), 'documents')

In [None]:
def update_candidates_for_unigram(kind, unigrams):
    """
    Met à jour le dictionnaire `unigrams` en affectant la valeur 1
    aux tokens pour lesquels le POS majoritaire (mode) est `kind`
    et qui ont au moins 3 caractères.
    """

    # 1) Prépare la liste de travail, avec un éventuel unidecode pour les PROPN
    if kind == 'PROPN':
        all_tab_pos_for_work = copy.deepcopy(all_tab_pos)
        for sentence in all_tab_pos_for_work:
            for token_info in sentence:
                token_info[0] = unidecode.unidecode(token_info[0])
    else:
        all_tab_pos_for_work = all_tab_pos

    # 2) Un dictionnaire token -> liste des POS rencontrés
    token_pos_map = defaultdict(list)

    # 3) Un ensemble pour repérer vite lesquels ont déjà eu 'kind' et >= 3 caractères
    candidates = set()

    # 4) Parcours unique de all_tab_pos_for_work
    for sentence in all_tab_pos_for_work:
        for token, pos in sentence:
            token_pos_map[token].append(pos)
            # Si ce token a le POS recherché et au moins 3 lettres, on le "mark" comme candidat
            if pos == kind and len(token) >= 3:
                candidates.add(token)

    # 5) Calcule le POS majoritaire pour chaque candidat et met à jour unigrams
    for token in candidates:
        pos_list = token_pos_map[token]
        mode_pos, _ = Counter(pos_list).most_common(1)[0]
        if mode_pos == kind:
            unigrams[token] = 1

    return unigrams

In [None]:
def remove_duplicates_lsh(threshold=0.8, num_perm=256):
    """
    Détecte et supprime les quasi-doublons dans la variable globale `documents`
    en utilisant un MinHash LSH (Locality-Sensitive Hashing).

    Paramètres:
    -----------
    - threshold : float
        Seuil de similarité Jaccard en-deçà duquel on ne considère pas les documents comme doublons.
        (ex: 0.8 = 80% de similarité)
    - num_perm : int
        Nombre de permutations utilisées pour le MinHash. Plus ce nombre est grand,
        plus la précision est élevée, mais le coût de calcul augmente.

    Effets:
    -------
    - Modifie la liste globale `documents` en supprimant les quasi-doublons.
    - Met à jour `columns_dict` et `all_soups` (si `source_type == 'europresse'`)
      pour rester cohérents avec les documents restants.
    """
    global documents, columns_dict, all_soups

    # 1) Construire l'index LSH
    # -------------------------------------------------------------------------
    lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)

    # Pour stocker les MinHash de chaque document
    doc_minhashes = []

    # On ne garde que les 100 premiers tokens (comme dans votre code initial)
    # puis on crée la signature MinHash
    for i, doc in enumerate(documents):
        tokens_100 = doc.split()[:100]  # tronque à 100 tokens

        # Construire un MinHash pour ce document
        m = MinHash(num_perm=num_perm)
        for token in tokens_100:
            # Pour éviter les collisions d'encodage, on encode en UTF-8
            m.update(token.encode('utf-8'))
        doc_minhashes.append(m)

        # On insère dans la structure LSH en associant l'ID du doc
        lsh.insert(str(i), m)

    # 2) Détecter les doublons via l'interrogation LSH
    # -------------------------------------------------------------------------
    # On va construire un ensemble d'indices à supprimer
    to_remove = set()

    # On parcourt chaque document dans l'ordre :
    # si un document n'est pas déjà marqué pour suppression,
    # on récupère tous ses quasi-doublons et on les marque pour suppression.
    for i in range(len(documents)):
        if i in to_remove:
            continue  # déjà marqué, on passe

        # Récupérer les documents similaires dans l'index
        candidates = lsh.query(doc_minhashes[i])  # renvoie la liste des "keys" insérées

        for c in candidates:
            c_idx = int(c)
            if c_idx != i:
                # c_idx est jugé quasi-doublon de i
                to_remove.add(c_idx)

    # 3) Supprimer les doublons en ordre décroissant d'indice
    # -------------------------------------------------------------------------
    # (pour ne pas invalider les indices suivants lors du 'del')
    indices_to_remove = sorted(to_remove, reverse=True)

    for idx in indices_to_remove:
        del documents[idx]
        for key in columns_dict:
            del columns_dict[key][idx]
        if source_type == 'europresse':
            del all_soups[idx]

    # 4) Éventuel affichage / log
    # -------------------------------------------------------------------------
    print(f"[LSH] {len(indices_to_remove)} quasi-doublons supprimés parmi {len(doc_minhashes)} documents initiaux.")

In [None]:
def write_topics_unigrams():
    for num_topic in all_nmf_H:
        write_unigrams_results(100,
                        tfidf_feature_names,
                        all_nmf_H[num_topic])

In [None]:
def write_documents_infos():
    # On va stocker nos données non plus sous forme de lignes strings,
    # mais en listes de valeurs. Le csv.writer se chargera d'assembler
    # correctement le tout.

    for num_topic in tqdm(all_nmf_W,
                          desc="ÉCRITURE DES FICHIERS SUR LE DISQUE"):

        # Préparation des données
        rows = []

        # Création de l'entête
        config_key = num_topic  # ou bien len(scores) + 1, selon votre logique

        # Exemple : pour Europresse
        if source_type == 'europresse':
            # On définit explicitement l'ordre des colonnes
            header = [
                'title',
                'authors',
                'raw_authors',
                'nb_characters',
                'journal',
                'date',
                'main_topic'
            ]

            # On ajoute les colonnes score_? où ? est le label associé
            for i in range(num_topic):
                if config_key in topic_labels_by_config and i < len(topic_labels_by_config[config_key]):
                    label = topic_labels_by_config[config_key][i]
                    # (Optionnel) Nettoyer/transformer le label pour éviter caractères spéciaux
                    # Par exemple :
                    # label_sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', label)
                    # header.append(f"score_{label_sanitized}")
                    # Ou simplement :
                    header.append(f"score_{label}")
                else:
                    # Si jamais la clé ou l’indice n’existe pas, on met un fallback
                    header.append(f"score_Unknown_{i}")

            rows.append(header)

        elif source_type in ['csv', 'istex']:
            # On utilise columns_dict pour construire l'entête
            header = list(columns_dict.keys())
            header.extend(['nb_characters', 'main_topic'])

            for i in range(num_topic):
                if config_key in topic_labels_by_config and i < len(topic_labels_by_config[config_key]):
                    label = topic_labels_by_config[config_key][i]
                    header.append(f"score_{label}")
                else:
                    header.append(f"score_Unknown_{i}")

            rows.append(header)

        # Remplissage des données
        if source_type == 'europresse':
            for i in range(len(all_soups)):
                if i < len(all_nmf_W[num_topic]):
                    row_data = write_info_europresse(
                        all_nmf_W[num_topic][i],
                        all_soups[i],
                        documents[i]
                    )
                    # Assurez-vous que write_info_europresse renvoie une liste et non une string
                    rows.append(row_data)

        elif source_type in ['csv', 'istex']:
            for i in range(len(columns_dict['date'])):
                row_data = write_info_another(
                    all_nmf_W[num_topic][i],
                    columns_dict,
                    i,
                    documents[i]
                )
                # Même remarque : write_info_another doit renvoyer une liste
                rows.append(row_data)

        class_suffix = "_".join(grammatical_classes)

        # Écriture du CSV en utilisant le csv.writer
        csv_path = (
            f"{results_path}{base_name}_EXPLORE_TOPICS/"
            f"{base_name}_database_{num_topic}tc_{class_suffix}_"
            f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
            f"{go_remove_duplicates}dup_{web_paper_differentiation}wp.csv"
        )

        with open(csv_path, "w", encoding='utf-8', newline='') as file_object:
            writer = csv.writer(file_object, delimiter=';', quoting=csv.QUOTE_MINIMAL)
            for row in rows:
                writer.writerow(row)

In [None]:
def process_sentiments():
    sentiments = []
    dates = []
    transformed_sentiments = []

    model_name = 'nlptown/bert-base-multilingual-uncased-sentiment' # Modèle spécifique pour l'analyse de sentiments
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)

    if torch.backends.mps.is_available():
        device = "mps"
    elif torch.cuda.is_available():
        device = 0  # ou torch.device("cuda:0")
    else:
        device = -1

    print('device', device)

    sentiment_pipeline = pipeline('sentiment-analysis',
                                  model=model,
                                  tokenizer=tokenizer,
                                  truncation=True,
                                  max_length=512,
                                  device=device)

    # Imaginons que 'documents' est votre tableau de textes
    sentiments = [analyze_sentiment(doc, sentiment_pipeline) for doc in tqdm(documents, desc="Processing Documents")]

    if source_type == 'europresse':
        for soup in all_soups:
            header = soup
            date_text = extract_information(header, '.DocHeader')
            date_text_clean = extract_date_info(date_text)
            date_normalized = normalise_date(date_text_clean).replace(';', '').replace('&', '')
            dates.append(date_normalized)
    else:
        dates = formater_liste_dates(columns_dict['date'])

    # Filtrer et convertir les dates
    new_dates = []
    for date_str in dates:
        date = extract_and_convert_date(date_str)
        new_dates.append(date)

    dates = new_dates

    # Transformer les sentiments en scores basés sur les étoiles
    for doc_sentiments in sentiments:
        if doc_sentiments:  # Assurez-vous qu'il n'est pas None ou vide
            doc_scores = []
            # doc_sentiments est maintenant une liste de dictionnaires
            for sentiment_dict in doc_sentiments:
                label = sentiment_dict['label']         # ex: '4 stars'
                star_rating = int(label.split()[0])     # ex: 4
                doc_scores.append(star_rating)

            average_score = sum(doc_scores) / len(doc_scores)
            transformed_sentiments.append(average_score)
        else:
            transformed_sentiments.append(None)



    # Ici, on modifie la fonction sentiments_heatmaps (ou son appel)
    # pour qu'elle considère les arrays comme base. Par exemple :
    for relative in [False, True]:
        sentiment_by_date_and_topic = sentiments_heatmaps(
            relative_normalizaton=relative,
            sigma='auto',
            transformed_sentiments=transformed_sentiments,
            dates=dates
        )

In [None]:
# ===================================================================
# 2) nouvelle : calc_positions_for_continuous_spacing
# ===================================================================
def calc_positions_for_continuous_spacing(xmin, xmax, text_width, spacing_factor):
    """
    Calcule (sans dessiner) les positions X (en data coords)
    où l’on placerait chaque label, en avançant de 'text_width * spacing_factor'
    tant que le bord droit du label (current_x + text_width)
    ne dépasse pas xmax (avec une petite tolérance).
    """
    positions = []
    current_x = xmin + text_width / 2

    tolerance_max = xmax + (text_width / 2)
    while True:
        right_edge = current_x + text_width
        if right_edge > tolerance_max:
            break
        positions.append(current_x)
        current_x += text_width * spacing_factor

    return positions


# ===================================================================
# 3) nouvelle : find_optimal_continuous_spacing
# ===================================================================
def find_optimal_continuous_spacing(xmin, xmax, text_width,
                                    spacing_factor_min=1.02,
                                    spacing_factor_max=1.2,
                                    step=0.001):

    best_sf = spacing_factor_min
    best_diff = float('inf')
    best_positions = []

    tolerance_max = xmax + (text_width / 2)
    spacing_values = np.arange(spacing_factor_min, spacing_factor_max + step, step)

    for sf in spacing_values:
        positions = calc_positions_for_continuous_spacing(
            xmin=xmin,
            xmax=xmax,
            text_width=text_width,
            spacing_factor=sf
        )

        if not positions:
            diff = 9999
        else:
            last_x = positions[-1]
            right_edge = last_x + text_width
            diff = abs(tolerance_max - right_edge)

        if diff < best_diff:
            best_diff = diff
            best_sf = sf
            best_positions = positions
            if diff < 1e-9:
                break

    return best_sf, best_positions


# ===================================================================
# 4) nouvelle : manual_tick_placement_continuous
# ===================================================================
def manual_tick_placement_continuous(
    ax,
    xmin,
    xmax,
    spacing_factor_min=1.02,
    spacing_factor_max=1.2,
    step=0.001
):
    """
    Place manuellement des pseudo-ticks pour l'axe X dans [xmin..xmax].
    On coupe les ticks officiels et on dessine les labels via ax.text.
    """
    text_width = compute_text_width_in_data_coords(ax)

    # 1) Désactiver les ticks "officiels"
    ax.set_xticks([])
    ax.set_xlim(xmin, xmax)

    # 2) Recherche du spacing_factor optimal
    best_sf, _ = find_optimal_continuous_spacing(
        xmin=xmin,
        xmax=xmax,
        text_width=text_width,
        spacing_factor_min=spacing_factor_min,
        spacing_factor_max=spacing_factor_max,
        step=step
    )
    # 3) Placement effectif
    positions = calc_positions_for_continuous_spacing(
        xmin=xmin,
        xmax=xmax,
        text_width=text_width,
        spacing_factor=best_sf
    )

    # 4) Transform "Axes" + offset en points
    offset_axes_transform = mtransforms.offset_copy(
        ax.transAxes,
        fig=ax.figure,
        x=0,
        y=-1.0,
        units='points'
    )

    x_min, x_max = ax.get_xlim()
    # 5) Dessin de chaque label
    for x_val in positions:
        x_val_axes = (x_val - x_min) / (x_max - x_min)
        label_str = f"{x_val:.3f}"  # arrondi; adapter si besoin

        ax.text(
            x_val_axes,
            0.0,   # tout en bas du subplot
            label_str,
            rotation=90,
            rotation_mode='anchor',
            ha='right',
            va='center',
            transform=offset_axes_transform,
            bbox=dict(facecolor='white', edgecolor='none', boxstyle='round,pad=0.0')
        )

    return best_sf

In [None]:
# Vérification de la Multicollinéarité
# Calcul du VIF (Facteur d'Inflation de la Variance)
def calculate_vif(df):
    vif_data = pd.DataFrame()
    vif_data["feature"] = df.columns
    vif_data["VIF"] = [variance_inflation_factor(df.values, i) for i in range(len(df.columns))]

    return vif_data

In [None]:
def analyze_sentiment(text, sentiment_pipeline):
    try:
        # Analyse de sentiments directement sur le texte complet,
        # en demandant explicitement la troncation à 512 tokens
        result = sentiment_pipeline(text, truncation=True, max_length=512)
        return result
    except Exception as e:
        print(f"Error in sentiment analysis: {e}")
        return None

In [None]:
def compute_text_width_in_data_coords(ax):
    """
    Mesure la largeur (en 'data coords') d'un texte donné,
    via un placement temporaire invisible pour récupérer la bounding box en pixels.
    """
    temp_text = ax.text(
        0,
        0,
        '0123456789',
        rotation=90,
        rotation_mode='anchor',
        ha='right',
        va='center',
        alpha=0,
        transform=ax.transAxes  # On place ceci en Axes coords (peu importe où)
    )
    ax.figure.canvas.draw()
    renderer = ax.figure.canvas.get_renderer()
    bbox = temp_text.get_window_extent(renderer=renderer)
    pixel_width = bbox.width

    # Convertir la largeur (en pixels) -> (en data coords sur l'axe X) :
    x0_data, _ = ax.transData.transform((0, 0))
    x1_data, _ = ax.transData.transform((1, 0))
    one_unit_in_pixels = x1_data - x0_data
    data_width = pixel_width / one_unit_in_pixels

    temp_text.remove()
    return data_width


def compute_label_positions_for_spacing(ncols, text_width, spacing_factor, with_colormap):
    """
    Calcule (sans dessiner) les positions X (en data coords)
    où l’on placerait chaque label, sachant que TOUS les labels
    ont la même 'text_width'.

    Paramètres
    ----------
    ncols         : int
        Nombre total de labels (i.e. nombre de colonnes).
    text_width    : float
        Largeur fixe (en data coords) pour chaque label.
    spacing_factor: float
        Facteur d'espacement entre deux labels successifs.

    Retourne
    --------
    list of (x, text_width)
        La liste des positions (x, text_width) en data coords.
    """
    positions = []
    current_x = 0.0

    while True:
        col_index = int(np.floor(current_x))
        if col_index >= ncols:
            # On a dépassé le nombre de colonnes, on s’arrête
            break

        # Bord droit (en data coords) si on place le label à "current_x"
        right_edge = current_x + text_width

        if with_colormap:
            tolerance_max = (ncols - 1) + 0.17*text_width
        else:
            tolerance_max = (ncols - 1) + 0.001*text_width

        if right_edge <= tolerance_max:
            # On enregistre la position (pour info)
            positions.append((current_x, text_width))
            # On décale "current_x" pour le prochain label
            current_x += text_width * spacing_factor
        else:
            # Si on dépasse trop, on arrête
            break

    return positions


def find_best_spacing_factor(
    ncols,
    text_width,
    spacing_factor_min=1.02,
    spacing_factor_max=1.2,
    step=0.001,
    with_colormap=True
):
    """
    Cherche le spacing_factor qui permet d'occuper au mieux la place disponible
    sans trop déborder la tolérance max = (ncols - 1) pour le dernier label.

    Paramètres
    ----------
    ncols : int
        Nombre total de labels.
    text_width : float
        Largeur fixe (en data coords) à utiliser pour tous les labels.
    spacing_factor_min : float
        Borne inférieure pour la recherche du spacing factor.
    spacing_factor_max : float
        Borne supérieure pour la recherche du spacing factor.
    step : float
        Pas d'incrémentation pour la recherche brute force.

    Renvoie
    -------
    (best_sf, positions):
        best_sf : float
            Le spacing factor optimal.
        positions : list of (x, text_width)
            Liste des positions correspondant à best_sf.
    """
    best_sf = spacing_factor_min
    best_diff = float('inf')
    best_positions = []

    spacing_values = np.arange(spacing_factor_min, spacing_factor_max + step, step)

    for sf in spacing_values:
        positions = compute_label_positions_for_spacing(
            ncols=ncols,
            text_width=text_width,
            spacing_factor=sf,
            with_colormap=with_colormap
        )

        if not positions:
            # Aucune position => on fixe un diff arbitraire
            diff = 9999
        else:
            # On regarde la position (et la largeur) du dernier label
            last_x, last_w = positions[-1]
            right_edge = last_x + last_w
            # Tolerance max
            if with_colormap:
                tolerance_max = (ncols - 1) + 0.17*text_width
            else:
                tolerance_max = (ncols - 1) + 0.001*text_width

            # On calcule la différence
            diff = abs(tolerance_max - right_edge)

        # Mise à jour du meilleur spacing factor
        if diff < best_diff:
            best_diff = diff
            best_sf = sf
            best_positions = positions
            # Si diff est vraiment très faible, on peut s'arrêter (optionnel)
            if diff < 1e-9:
                break

    return best_sf, best_positions


def manual_tick_placement(
    ax,
    df,
    spacing_factor_min=1.02,
    spacing_factor_max=1.2,
    step=0.001,
    with_colormap=True
):
    """
    Place manuellement des "pseudo-ticks" et leurs labels,
    SANS utiliser ax.set_xticks / ax.set_xticklabels / ax.tick_params.

    Hypothèse simplifiée :
    - on considère que TOUS les labels ont la même largeur
      (calculée sur une date aléatoire par exemple).

    Étapes :
    1) On désactive l'axe officiel.
    2) On crée un transform mixte (X en data, Y en Axes).
    3) On prend une date aléatoire dans df.columns -> on calcule la largeur du texte.
    4) On cherche le spacing_factor optimal (find_best_spacing_factor).
    5) On calcule les positions finales et on dessine chaque label manuellement.
    """
    # 1) Désactiver l'axe "officiel"
    ax.set_xticks([])
    ax.set_xlim(0, len(df.columns) - 1)



    ncols = len(df.columns)
    if ncols == 0:
        return  # Rien à faire si df n'a pas de colonnes

    # 3) Prendre une date "au hasard" (ou la première), calculer sa largeur
    random_col = df.columns[0]   # ou n’importe quel index
    text_random = random_col.strftime("%Y-%m-%d")
    text_width = compute_text_width_in_data_coords(ax)

    # 4) Trouver le spacing_factor optimal
    best_sf, positions_preview = find_best_spacing_factor(
        ncols=ncols,
        text_width=text_width,
        spacing_factor_min=spacing_factor_min,
        spacing_factor_max=spacing_factor_max,
        step=step,
        with_colormap=with_colormap
    )

    # 5) Placement effectif
    positions = compute_label_positions_for_spacing(
        ncols=ncols,
        text_width=text_width,
        spacing_factor=best_sf,
        with_colormap=with_colormap
    )

    # On récupère les limites X de l'axe
    x_min, x_max = ax.get_xlim()

    # 1) Construire une transform "Axes" + offset en points
    offset_axes_transform = mtransforms.offset_copy(
        ax.transAxes,            # on part du repère Axes (0..1)
        fig=ax.figure,
        x=0,
        y=-0.7,
        units='points'
    )

    # 2) Boucle d'affichage
    for (x_val, _) in positions:
        col_index = int(np.floor(x_val))
        if col_index < ncols:
            label_str = df.columns[col_index].strftime("%Y-%m-%d")

            # Convertir x_val (data) -> x_val_axes (0..1)
            x_val_axes = (x_val - x_min) / (x_max - x_min)

            # On place le texte en Axes coords
            ax.text(
                x_val_axes,   # X en [0..1]
                0.0,          # Y=0 en Axes coords (bas de l'axe)
                label_str,
                rotation=90,
                rotation_mode='anchor',
                ha='right',
                va='top',     # ancré "en haut" pour que le -2 pts décale vers le bas
                transform=offset_axes_transform
            )



    return best_sf

In [None]:
def measure_text_height_axes(ax, label="999.99"):
    """
    Renvoie :
      - text_height_axes : la hauteur TOTALE (bottom -> top) en coords AXES
      - offset_axes      : la distance (baseline - bottom) de la bbox en coords AXES
    """
    # Placement (invisible) d'un texte aligné baseline, à la position (0,0) en AXES
    text_baseline = ax.text(
        0,
        0,
        label,
        va='baseline',
        ha='left',
        alpha=0,  # invisible
        transform=ax.transAxes  # <-- On le place en Axes !
    )

    # On force un rendu pour obtenir la bbox en coords pixels
    ax.figure.canvas.draw()
    renderer = ax.figure.canvas.get_renderer()
    bbox = text_baseline.get_window_extent(renderer=renderer)
    text_baseline.remove()

    # Coordonnées (x, y) en PIXELS du point (0, 0) Axes
    anchor_pixel = ax.transAxes.transform((0, 0))

    # Hauteur de la bbox en pixels
    text_height_pixel = bbox.height

    # Décalage (baseline -> bottom) en pixels
    offset_pixel = anchor_pixel[1] - bbox.y0

    # 1 “unité Axes” = combien de pixels ?
    y0_pix = ax.transAxes.transform((0, 0))[1]
    y1_pix = ax.transAxes.transform((0, 1))[1]
    one_axes_unit_in_pixels = abs(y1_pix - y0_pix)

    # Conversion des pixels -> coords Axes
    text_height_axes = text_height_pixel / one_axes_unit_in_pixels
    offset_axes      = offset_pixel      / one_axes_unit_in_pixels

    return text_height_axes, offset_axes




def data_to_axes(y_data, data_min, data_max):
    """Convertit y_data (dans [data_min, data_max]) en [0,1]."""
    return (y_data - data_min) / (data_max - data_min)



def axes_to_data(y_axes, data_min, data_max):
    """Convertit y_axes (dans [0,1]) en [data_min, data_max]."""
    return y_axes*(data_max - data_min) + data_min


def compute_label_positions_axes(
    text_height_axes,
    offset_axes,
    spacing_factor,
    vmin_axes=0.0,
    vmax_axes=1.0
):
    """
    Calcule les positions en Axes-coords pour placer les labels
    de vmin_axes à vmax_axes (en général [0,1]).

    On part baseline = vmin_axes,
    et on incrémente de (text_height_axes * spacing_factor).
    """
    positions = []
    current_baseline = vmin_axes + 0.0016

    while True:
        top_of_bbox = current_baseline + (text_height_axes - offset_axes)
        if top_of_bbox > vmax_axes:
            break

        # On enregistre la position en Axes-coords
        positions.append(current_baseline)

        current_baseline += (text_height_axes * spacing_factor)
        if current_baseline > vmax_axes:
            break

    return positions




def get_final_top_baseline_axes(baseline, text_height_axes, offset_axes):
    """
    baseline = la baseline du texte (axes-coords)
    Retourne la coord "top" de la bbox du dernier label.
    """
    return baseline + (text_height_axes - offset_axes)

def find_best_spacing_factor_axes(
    ax,
    text_height_axes,
    offset_axes,
    spacing_factor_min=1.02,
    spacing_factor_max=1.2,
    step=0.001,
    vmin_axes=0.0,
    vmax_axes=1.0
):
    best_sf = spacing_factor_min
    best_diff = float('inf')
    spacing_factors = np.arange(spacing_factor_min, spacing_factor_max + step, step)

    for sf in tqdm(spacing_factors):
        positions = compute_label_positions_axes(
            text_height_axes, offset_axes, sf,
            vmin_axes, vmax_axes
        )
        if positions:
            last_baseline = positions[-1]
            last_top = get_final_top_baseline_axes(last_baseline, text_height_axes, offset_axes)
            diff = abs(last_top - vmax_axes)
        else:
            # Si on ne trouve aucun label => diff = 1.0
            diff = 1.0

        if diff < best_diff:
            best_diff = diff
            best_sf = sf
            if diff == 0:
                break

    return best_sf




def manual_colorbar_ticks(
    fig,
    ax,
    data_min,      # 2.60 par exemple
    data_max,      # 3.46 par exemple
    spacing_factor_min=1.02,
    spacing_factor_max=1.2,
    step=0.001
):
    """
    1) Mesure la hauteur du texte en Axes-coords
    2) Trouve un spacing_factor optimal pour remplir [0,1] verticalement
    3) Calcule toutes les baselines en [0,1]
    4) Affiche des labels correspondant à la "vraie" valeur data
       sur la baseline Axes correspondante
    """
    # 1) Hauteur du texte
    text_height_axes, offset_axes = measure_text_height_axes(ax, label="999.99")

    # 2) Spacing factor optimal
    sf_opt = find_best_spacing_factor_axes(
        ax,
        text_height_axes,
        offset_axes,
        spacing_factor_min,
        spacing_factor_max,
        step,
        vmin_axes=0.0,
        vmax_axes=1.0
    )

    # 3) Calcul positions
    positions_axes = compute_label_positions_axes(
        text_height_axes,
        offset_axes,
        sf_opt,
        vmin_axes=0.0,
        vmax_axes=1.0
    )

    # Créez un offset de 10 points vers la droite et 0 points vers le haut
 #   offset = transforms.ScaledTranslation(0.7/72, 0, fig.dpi_scale_trans)

    offset_axes_transform = mtransforms.offset_copy(
        ax.transAxes,
        fig=ax.figure,
        x=1.5,
        y=0,
        units='points'
    )
    # 10/72 car 1 point = 1/72 inch

    # Combinez ax.transAxes avec cet offset
  #  trans = ax.transAxes + offset_axes_transform

    # 4) Dessin
    for baseline_axes in positions_axes:
        # Convertir la baseline axes -> data
        val_data = axes_to_data(baseline_axes, data_min, data_max)
        label_str = f"{val_data:.2f}"

        ax.text(
            1,
            baseline_axes,
            label_str,
            va='baseline',
            ha='left',
            transform=offset_axes_transform
        )

In [None]:
def create_custom_colorbar(fig,
                           df_normalized,
                           cmap=plt.cm.coolwarm,
                           colorbar_position=[1.0, 0.0, 0.02, 0.85],
                           step=0.0005,
                           spacing_factor_min=1.02,
                           spacing_factor_max=1.2):

    # Déterminer les valeurs min et max pour la normalisation
    vmin = df_normalized.min().min()
    vmax = df_normalized.max().max()

    # Créer la normalisation et l'objet ScalarMappable
    norm = mcolors.Normalize(vmin=vmin, vmax=vmax)
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])  # Nécessaire pour l'affectation de la colorbar

    # Ajouter un axe pour la colorbar (à droite, ici)
    cbar_ax = fig.add_axes(colorbar_position)

    # Créer la colorbar
    cbar = fig.colorbar(sm,
                        cax=cbar_ax,
                        orientation='vertical',
                        spacing='proportional',
                        extend='neither',
                        fraction=1.0,
                        pad=0.0)

    # Personnaliser l'apparence de la colorbar
    cbar.outline.set_visible(False)  # Supprime la bordure
    cbar.set_ticks([])               # Supprime les graduations par défaut

    # Appeler la fonction manuelle pour définir les graduations
    manual_colorbar_ticks(
        fig,
        cbar_ax,
        vmin,
        vmax,
        step=step,
        spacing_factor_min=spacing_factor_min,
        spacing_factor_max=spacing_factor_max
    )

In [None]:
def remove_outliers_by_mean(df, threshold=100):
    """
    Pour chaque (Group, Date), remplace par 0
    toute valeur > threshold * moyenne_des_autres (dans la même ligne).

    Paramètres
    ----------
    df : pd.DataFrame
        Le DataFrame dont les lignes sont des Group et les colonnes des dates.
    threshold : float
        Facteur multiplicatif pour détecter les outliers (défaut = 100).

    Retour
    ------
    df_out : pd.DataFrame
        Copie de df avec les outliers remplacés par 0.
    outliers_df : pd.DataFrame
        Tableau listant les outliers détectés (Group, Date, Value).
    """
    # Copie pour ne pas modifier l'original
    df_out = df.copy()

    # Liste pour stocker les outliers détectés
    outliers = []

    # Parcours de chaque ligne (group)
    for group, row in df_out.iterrows():
        # Parcours des colonnes (dates)
        for date in df_out.columns:
            val = row[date]

            # Moyenne des autres colonnes de la ligne
            avg_ignore = row.drop(labels=date).mean()

            # Test d'outlier : val > threshold * moyenne_des_autres
            if (avg_ignore > 0) and (val > threshold * avg_ignore):
                # On remplace par 0 dans le DataFrame
                df_out.at[group, date] = 0
                # On garde trace de l'outlier
                outliers.append((group, date, val))

    # Création d'un DataFrame pour les outliers détectés
    outliers_df = pd.DataFrame(outliers, columns=['Group', 'Date', 'Value'])

    return df_out, outliers_df

In [None]:
import numba
import numpy as np

@numba.njit
def cost_sq(x, y):
    return (x - y)**2

@numba.njit
def downsample(seq):
    """
    Réduit la série par un facteur de 10 en moyennant chaque groupe de 10 points.
    Exemple : [x0, x1, ..., x9, x10, x11, ..., x19, ...] -> [(x0+x1+...+x9)/10, (x10+x11+...+x19)/10, ...]
    """
    new_len = len(seq) // 10
    new_seq = np.empty(new_len, dtype=np.float64)
    for i in range(new_len):
        start = 10 * i
        new_seq[i] = np.mean(seq[start:start + 10])  # Moyenne des 10 points
    return new_seq


@numba.njit
def dtw_distance(seq1, seq2):
    """
    DTW simple renvoyant uniquement la distance, sans chemin.
    """
    n, m = len(seq1), len(seq2)
    dtw = np.full((n+1, m+1), np.inf)
    dtw[0, 0] = 0.0

    for i in range(1, n+1):
        for j in range(1, m+1):
            cost = cost_sq(seq1[i-1], seq2[j-1])
            dtw[i, j] = cost + min(
                dtw[i-1, j],    # "delete" (avancer dans seq1)
                dtw[i, j-1],    # "insert" (avancer dans seq2)
                dtw[i-1, j-1]   # "match"
            )
    return dtw[n, m]

@numba.njit
def _dtw_distance_and_path(seq1, seq2):
    """
    DTW standard *uniquement* pour le 'base case' (petites séries).
    Renvoie (path, distance). Le path est nécessaire pour construire
    la fenêtre en plus haute résolution. On ne l'exposera pas en public.
    """
    n, m = len(seq1), len(seq2)
    dtw = np.full((n+1, m+1), np.inf)
    direction = np.zeros((n+1, m+1), dtype=np.int8)

    dtw[0, 0] = 0.0
    for i in range(1, n+1):
        for j in range(1, m+1):
            cost = cost_sq(seq1[i-1], seq2[j-1])
            # On teste 3 possibilités
            del_val = dtw[i-1, j]
            ins_val = dtw[i, j-1]
            match_val = dtw[i-1, j-1]

            if del_val < ins_val and del_val < match_val:
                dtw[i, j] = cost + del_val
                direction[i, j] = 1
            elif ins_val < match_val:
                dtw[i, j] = cost + ins_val
                direction[i, j] = 2
            else:
                dtw[i, j] = cost + match_val
                direction[i, j] = 0

    # Rétropropagation du chemin (en partant de (n,m))
    path = []
    i, j = n, m
    while i > 0 and j > 0:
        path.append((i-1, j-1))
        move = direction[i, j]
        if move == 0:
            i -= 1
            j -= 1
        elif move == 1:
            i -= 1
        else:
            j -= 1

    path.reverse()
    return path, dtw[n, m]

@numba.njit
def expand_window(path_lowres, len_x, len_y, radius=1):
    """
    Projette le chemin en basse résolution (path_lowres)
    vers la taille (len_x, len_y), en créant un voisinage (window)
    autour de ce chemin. Retourne un set() de paires (i, j) autorisées.
    """
    window = set()
    if len(path_lowres) == 0:
        # Par sécurité : si path vide, autoriser toute la matrice
        for i in range(len_x):
            for j in range(len_y):
                window.add((i, j))
        return window

    # Indices max en basse résolution
    max_i_low = path_lowres[-1][0]
    max_j_low = path_lowres[-1][1]

    # Facteurs d'échelle pour passer de la résolution basse à la fine
    scale_x = (len_x - 1) / max(1, max_i_low)
    scale_y = (len_y - 1) / max(1, max_j_low)

    for (i_l, j_l) in path_lowres:
        i_center = int(i_l * scale_x)
        j_center = int(j_l * scale_y)
        # Autorise i_center +/- radius et j_center +/- radius
        for di in range(-radius, radius+1):
            for dj in range(-radius, radius+1):
                i_w = i_center + di
                j_w = j_center + dj
                if 0 <= i_w < len_x and 0 <= j_w < len_y:
                    window.add((i_w, j_w))

    return window

@numba.njit
def dtw_distance_constrained(seq1, seq2, window):
    """
    Calcule la distance DTW en ne considérant que les cellules (i,j)
    présentes dans 'window' (un set de paires).
    """
    n, m = len(seq1), len(seq2)
    dtw = np.full((n+1, m+1), np.inf)
    dtw[0, 0] = 0.0

    # On trie les (i, j) par i+j croissant pour respecter
    # les dépendances (i-1, j), (i, j-1), etc.
    window_list = list(window)
    window_list.sort(key=lambda x: x[0] + x[1])

    for (i, j) in window_list:
        i1, j1 = i+1, j+1
        cost = cost_sq(seq1[i], seq2[j])

        # On prend le min parmi (i-1, j), (i, j-1), (i-1, j-1)
        tmp = dtw[i1, j1]
        if i1 > 0 and dtw[i1-1, j1] < tmp:
            tmp = dtw[i1-1, j1]
        if j1 > 0 and dtw[i1, j1-1] < tmp:
            tmp = dtw[i1, j1-1]
        if i1 > 0 and j1 > 0 and dtw[i1-1, j1-1] < tmp:
            tmp = dtw[i1-1, j1-1]

        dtw[i1, j1] = cost + tmp

    return dtw[n, m]

@numba.njit
def _fastdtw_path_lowres(seq1, seq2, radius=1):
    """
    Fonction 'privée' qui calcule le chemin + distance en basse résolution,
    afin de construire la fenêtre. Elle *n'est pas* appelée en dernier,
    juste pour connaître la zone de recherche à l'échelle fine.
    """
    # -- Cas de base : si la série est très courte, on récupère path + distance --
    if len(seq1) <= radius + 2 or len(seq2) <= radius + 2:
        return _dtw_distance_and_path(seq1, seq2)

    # -- Sinon, downsample --
    shrunk1 = downsample(seq1)
    shrunk2 = downsample(seq2)

    # -- Récursif : on obtient un path_lowres à l'échelle encore plus réduite --
    path_lowres, _ = _fastdtw_path_lowres(shrunk1, shrunk2, radius)

    # -- On "projette" ce path_lowres à l'échelle courante pour re-calculer
    #    un chemin "un peu plus précis" en se limitant à une fenêtre contrainte --
    window = expand_window(path_lowres, len(seq1), len(seq2), radius)
    # Ici, pour obtenir un *path* local, on pourrait coder un "dtw_path_constrained",
    # mais pour aller au plus simple, on recycle _dtw_distance_and_path()
    # *sans* repasser par un full DTW sur toute la fenêtre.
    # Pour la démonstration, on va "tricher" en faisant un DTW complet
    # dans la fenêtre en mode distance SEULEMENT, et on NE RECALCULE PAS le path
    # très précis. On s’en contente pour renvoyer un path "approximatif" identique.

    # => Pour le *vrai* path, il faudrait un backtrack partiel dans la fenêtre.
    #    Ici, on renvoie juste le path_lowres + la distance contrainte "fine".
    dist_constrained = dtw_distance_constrained(seq1, seq2, window)
    return path_lowres, dist_constrained

@numba.njit
def fastdtw_distance(seq1, seq2, radius=1):
    """
    Fonction publique : renvoie uniquement la distance FastDTW
    entre seq1 et seq2, pour un 'radius' donné.
    """
    # -- Cas de base : si la série est courte --
    if len(seq1) <= radius + 2 or len(seq2) <= radius + 2:
        return dtw_distance(seq1, seq2)

    # -- On calcule un chemin approximatif en basse résolution --
    path_lowres, _ = _fastdtw_path_lowres(seq1, seq2, radius)

    # -- On construit la fenêtre autour de ce chemin --
    window = expand_window(path_lowres, len(seq1), len(seq2), radius)

    # -- On calcule la distance DTW dans la fenêtre --
    dist_final = dtw_distance_constrained(seq1, seq2, window)
    return dist_final

In [None]:
def plot_custom_heatmap(
    df,
    sigma='auto',
    cmap="coolwarm",
    relative_normalizaton=True,
    with_colormap=True
):
    df, outliers_table = remove_outliers_by_mean(df, threshold=100000)

    if len(outliers_table) > 1:
        print('Valeurs aberrantes trouvées')
        print(outliers_table)

    df = df.astype(float)

    # 1) Application du filtre gaussien, ligne par ligne
    list_of_series = []
    for index, row in df.iterrows():
        filtered_values = gaussian_filter(row, sigma=sigma)
        s = pd.Series(filtered_values, index=df.columns, name=index)
        list_of_series.append(s)

    df_normalized = pd.concat(list_of_series, axis=1).T

    if relative_normalizaton:
        list_of_series = []
        for index, row in df_normalized.iterrows():
            normalized_values = (row - row.min()) / (row.max() - row.min()) if (row.max() != row.min()) else row
            s = pd.Series(normalized_values, index=df_normalized.columns, name=index)
            list_of_series.append(s)

        df_normalized = pd.concat(list_of_series, axis=1).T

    if len(df_normalized) > 1:
       # if relative_normalizaton:
        # 1) On calcule, pour chaque série, la colonne où se situe son max
        max_positions = df_normalized.idxmax(axis=1)

        # 2) On trie les lignes (les séries) selon ces positions croissantes
        sorted_index = max_positions.sort_values().index

        # 3) On réordonne le DataFrame dans ce nouvel ordre
        df_normalized = df_normalized.loc[sorted_index]
   #     else:
            # 1. Calcul de la distance en euclidien avec pdist
     #       condensed_dist_matrix = pdist(df_normalized.values, metric='euclidean')

            # 2. Classification hiérarchique (méthode "average")
      #      Z = linkage(condensed_dist_matrix, method='average', optimal_ordering=True)

            # 3. Récupérer l'ordre des feuilles
       #     dendro = dendrogram(Z, no_plot=True)
        #    leaves_order = dendro['leaves']

            # 4. Réordonner le DataFrame
         #   df_normalized = df_normalized.iloc[leaves_order]



    # 8) Ajustement de la hauteur de figure
    figure_height_inch = (df_normalized.shape[0] * PX_PER_TOPIC) / DPI

    # 9) Création de la figure et tracé de la heatmap
    fig = plt.figure(figsize=(FIGURE_WIDTH_INCH, figure_height_inch), dpi=DPI)
    main_ax = fig.add_axes([0.3, 0.0, 0.697, 0.85])

    mask = pd.DataFrame(False, index=df_normalized.index, columns=df_normalized.columns)

    # On passe à True directement via .loc,
    # en prenant outliers_table["Group"] comme index de lignes
    # et outliers_table["Date"] comme index de colonnes
    mask.loc[outliers_table["Group"], outliers_table["Date"]] = True

    ax = sns.heatmap(
        df_normalized,
        cmap=cmap,
        ax=main_ax,
        cbar=False,
        rasterized=False,
        linewidths=0.0,
        linecolor="white",
        mask=mask
    )


    # 10) Tracé des lignes de séparation horizontales
    for i in range(1, df_normalized.shape[0]):
        ax.axhline(i, color="white", linewidth=1)

    # 11) Affichage manuel des labels (index) à gauche
    offset_axes_transform_2 = mtransforms.offset_copy(
        ax.transAxes,
        fig=ax.figure,
        x=-1.5,  # Décalage en points
        y=0,
        units='points'
    )

    for i, label in enumerate(df_normalized.index):
        # On calcule la position du haut vers le bas
        y_pos = (df_normalized.shape[0] - i - 0.5) / df_normalized.shape[0]
        ax.text(
            0,
            y_pos,
            label,
            ha='right',
            va='center',
            transform=offset_axes_transform_2
        )

    # 12) Supprimer les bordures & masquer les graduations Y
    for spine in ax.spines.values():
        spine.set_visible(False)

    ax.set_yticks([])

    # Placement manuel des ticks X (dates ou colonnes) si besoin
    manual_tick_placement(
        ax,
        df_normalized,
        spacing_factor_min=1.02,
        spacing_factor_max=1.2,
        step=0.0001,
        with_colormap=with_colormap
    )

    # 13) Colorbar facultative
    if with_colormap:
        create_custom_colorbar(
            fig,
            df_normalized=df_normalized,
            cmap=cmap,
            step=0.0001,
            spacing_factor_min=1.02,
            spacing_factor_max=1.2
        )

In [None]:
def sentiments_heatmaps(relative_normalizaton, sigma, transformed_sentiments, dates):
    # Calcul du poids total de chaque topic par jour pour chaque topic_count
    total_weight_by_topic_count_topic_and_date = {}
    for topic_count, W_matrix in all_nmf_W.items():
        for article_num, topic_scores in enumerate(W_matrix):
            # Vérification pour éviter les erreurs d'index
            if article_num >= len(dates):
                continue
            article_date = dates[article_num]

            # topic_scores est un array / liste de poids. Exemple: [0.2, 0.5, 0.1, ...]
            for topic_num, topic_weight in enumerate(topic_scores):
                key = (topic_count, topic_num, article_date)
                if key not in total_weight_by_topic_count_topic_and_date:
                    total_weight_by_topic_count_topic_and_date[key] = topic_weight
                else:
                    total_weight_by_topic_count_topic_and_date[key] += topic_weight


    # Initialisation d'un dictionnaire pour stocker les sentiments normalisés
    # par date, topic_count et topic_num
    sentiment_by_date_and_topic = {}

    for topic_count, W_matrix in all_nmf_W.items():
        for article_num, topic_scores in enumerate(W_matrix):
            if article_num >= len(dates) or article_num >= len(transformed_sentiments):
                continue
            article_date = dates[article_num]
            sentiment_score = transformed_sentiments[article_num]

            for topic_num, topic_weight in enumerate(topic_scores):
                adjusted_sentiment_score = sentiment_score * topic_weight

                weight_key = (topic_count, topic_num, article_date)
                total_weight = total_weight_by_topic_count_topic_and_date.get(weight_key, 0)

                if total_weight > 0:
                    normalized_sentiment_score = adjusted_sentiment_score / total_weight
                else:
                    normalized_sentiment_score = 0

                combined_key = (topic_count, topic_num, article_date)
                if combined_key not in sentiment_by_date_and_topic:
                    sentiment_by_date_and_topic[combined_key] = [normalized_sentiment_score]
                else:
                    sentiment_by_date_and_topic[combined_key].append(normalized_sentiment_score)


    # Calcul de la moyenne pour chaque combinaison de (topic_count, topic_num, date)
    for key, normalized_sentiments in sentiment_by_date_and_topic.items():
        average_sentiment = sum(normalized_sentiments)  # / len(normalized_sentiments) si besoin
        sentiment_by_date_and_topic[key] = average_sentiment

    # Filtrer les combinaisons avec un score de 0
    sentiment_by_date_and_topic = {
        k: v for k, v in sentiment_by_date_and_topic.items() if v != 0
    }

    # Création du dossier de résultats si nécessaire
    if not os.path.exists(f"{results_path}{base_name}_TOPICS_SENTIMENTS_DYNAMICS_HEATMAPS/"):
        os.makedirs(f"{results_path}{base_name}_TOPICS_SENTIMENTS_DYNAMICS_HEATMAPS/")

    # Boucle principale : on génère la heatmap pour chaque topic_count
    for topic_count in all_nmf_W:
        # 1) On construit d'abord un dictionnaire (topic_num, date) -> sentiment
        filtered_data = {
            (topic_num, date): sentiment
            for (count, topic_num, date), sentiment in sentiment_by_date_and_topic.items()
            if count == topic_count
        }

        # 2) Conversion en DataFrame
        #    On sépare (topic_num, date) en deux colonnes distinctes : Topic et Date
        df = pd.DataFrame(list(filtered_data.items()), columns=['Topic_Date', 'Sentiment'])
        df[['Topic', 'Date']] = pd.DataFrame(df['Topic_Date'].tolist(), index=df.index)

        # 3) On crée un DataFrame de mapping "Topic -> Label"
        #    (en s'appuyant sur la liste topic_labels_by_config[topic_count])
        df_labels = pd.DataFrame({
            'Topic': range(len(topic_labels_by_config[topic_count])),
            'Topic_label': topic_labels_by_config[topic_count]
        })

        # 4) On fusionne df et le mapping pour obtenir le label de chaque topic_num
        df = df.merge(df_labels, on='Topic', how='left')

        # 5) On fait le pivot en utilisant le **label** comme index
        df = df.pivot(index="Topic_label", columns="Date", values="Sentiment")

        # 6) Interpolation et imputation
        df_imputed = df.ffill(axis=1).bfill(axis=1)
        df = df_imputed.interpolate(method='linear', axis=1)

        # 7) Comme dans le code original, on transpose pour gérer la chronologie
        df_transposed = df.T

        start_date = df_transposed.index.min()
        end_date = df_transposed.index.max()
        all_dates = pd.date_range(start=start_date, end=end_date, freq='D')

        # 8) On réindexe pour ne rien perdre, puis on interpole
        df_reindexed = df_transposed.reindex(all_dates)
        df_interpolated = df_reindexed.interpolate(method='linear')

        # 9) Re-transposer : chaque ligne correspond désormais à un label (Topic_label)
        df = df_interpolated.T

        # NOTE IMPORTANTE :
        # À ce stade, l'index de df est constitué des "Topic_label", et non plus des topic_num.
        # Tu n'as donc plus besoin de faire un `labels_for_my_df = [...]`.
        # Les labels SONT déjà dans df.index, donc si on veut un array/list pour le plotting :
        labels_for_my_df = df.index.to_list()

        # Ajustement éventuel de sigma si c'est 'auto'
        if sigma == 'auto':
            sigma = len(df.columns) / 15

        # Plot de la heatmap
        plot_custom_heatmap(
            df,
            cmap='coolwarm',
            sigma=sigma,
            relative_normalizaton=relative_normalizaton,
            with_colormap=True
        )

        # Sauvegarde de la figure
        plt.savefig(
            f"{results_path}{base_name}_TOPICS_SENTIMENTS_DYNAMICS_HEATMAPS/"
            f"{base_name}_topics_sentiments_dynamics_heatmap_{topic_count}tc_{relative_normalizaton}n_{int(sigma)}s_"
            f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
            f"{go_remove_duplicates}dup_{web_paper_differentiation}wp_dtwcompletechdtw2.png",
            dpi=DPI,
            bbox_inches='tight',
            pad_inches=0
        )
        plt.close()

In [None]:
def compute_text_height_in_data_coords(ax):
    """
    Mesure la hauteur (en 'data coords') d'un texte donné,
    via un placement temporaire invisible pour récupérer la bounding box en pixels.
    """
    # 1) On place du texte "invisible" (alpha=0) dans l'axe,
    #    peu importe où (ici en Axes coords = transAxes).
    #    Note : rotation=0 pour mesurer une hauteur "verticale" classique.
    temp_text = ax.text(
        0,
        0,
        '0123456789',       # Exemple de chaîne un peu longue
        rotation=0,
        rotation_mode='anchor',
        ha='left',
        va='bottom',
        alpha=0,
        transform=ax.transAxes  # On le place en Axes coords
    )

    # 2) On force un dessin pour que la bounding box soit calculée
    ax.figure.canvas.draw()
    renderer = ax.figure.canvas.get_renderer()

    # 3) On récupère la bounding box en pixels
    bbox = temp_text.get_window_extent(renderer=renderer)
    pixel_height = bbox.height  # Hauteur en pixels

    # 4) Convertir la hauteur (en pixels) -> (en 'data coords' sur l'axe Y) :
    #    On regarde le décalage vertical en pixels pour "1" unité sur l'axe Y
    x0_data, y0_data = ax.transData.transform((0, 0))
    x1_data, y1_data = ax.transData.transform((0, 1))
    one_unit_in_pixels = y1_data - y0_data

    data_height = pixel_height / one_unit_in_pixels

    # 5) Nettoyage : on supprime le texte temporaire
    temp_text.remove()

    return data_height


def calc_positions_for_continuous_spacing_Y(ymin, ymax, text_height, spacing_factor):
    """
    Calcule (sans dessiner) les positions Y (en coordonnées 'data')
    où l’on placerait chaque label, en avançant de 'text_height * spacing_factor'
    tant que le bord "supérieur" du label (current_y + text_height)
    ne dépasse pas ymax (avec une petite tolérance).
    """
    positions = []
    # On démarre de ymin + text_height/2 pour centrer le label sur cette position
    current_y = ymin + text_height / 2

    # Tolérance permettant de s'assurer que le label complet reste dans [ymin..ymax]
    tolerance_max = ymax + text_height / 2

    while True:
        top_edge = current_y + text_height
        if top_edge > tolerance_max:
            break
        positions.append(current_y)
        current_y += text_height * spacing_factor

    return positions


def find_optimal_continuous_spacing_Y(ymin, ymax, text_height,
                                      spacing_factor_min=1.02,
                                      spacing_factor_max=1.2,
                                      step=0.001):
    """
    Cherche le "spacing_factor" optimal (entre spacing_factor_min et spacing_factor_max)
    pour maximiser le remplissage de [ymin..ymax] par des labels
    espacés de manière continue.
    """
    best_sf = spacing_factor_min
    best_diff = float('inf')
    best_positions = []

    # Comme dans la fonction X, on définit une tolérance similaire
    tolerance_max = ymax + text_height / 2
    spacing_values = np.arange(spacing_factor_min, spacing_factor_max + step, step)

    for sf in spacing_values:
        positions = calc_positions_for_continuous_spacing_Y(
            ymin=ymin,
            ymax=ymax,
            text_height=text_height,
            spacing_factor=sf
        )

        # Si aucune position n'est retournée, c'est que sf est trop grand
        if not positions:
            diff = 9999
        else:
            last_y = positions[-1]
            top_edge = last_y + text_height
            diff = abs(tolerance_max - top_edge)

        if diff < best_diff:
            best_diff = diff
            best_sf = sf
            best_positions = positions
            # Si on est extrêmement proche de la limite, on arrête
            if diff < 1e-9:
                break

    return best_sf, best_positions


def manual_tick_placement_continuous_Y(
    ax,
    ymin,
    ymax,
    spacing_factor_min=1.02,
    spacing_factor_max=1.2,
    step=0.001
):
    """
    Place manuellement des pseudo-ticks pour l'axe Y dans [ymin..ymax].
    On coupe les ticks officiels et on dessine les labels via ax.text.
    """

    # 1) On suppose que vous avez une fonction qui calcule la hauteur
    #    d'un label en 'data coords' :
    text_height = compute_text_height_in_data_coords(ax)  # À adapter

    # 2) Désactiver les ticks "officiels" de l'axe Y
    ax.set_yticks([])
    ax.set_ylim(ymin, ymax)

    # 3) Recherche du spacing_factor optimal
    best_sf, _ = find_optimal_continuous_spacing_Y(
        ymin=ymin,
        ymax=ymax,
        text_height=text_height,
        spacing_factor_min=spacing_factor_min,
        spacing_factor_max=spacing_factor_max,
        step=step
    )

    # 4) Placement effectif des positions optimisées
    positions = calc_positions_for_continuous_spacing_Y(
        ymin=ymin,
        ymax=ymax,
        text_height=text_height,
        spacing_factor=best_sf
    )

    # 5) Création d'un offset transform pour décaler légèrement le texte
    #    vers la gauche (x<0) ou la droite (x>0) en points
    offset_axes_transform = mtransforms.offset_copy(
        ax.transAxes,
        fig=ax.figure,
        x=-2.0,   # Décalage à gauche en points (ajustez selon vos besoins)
        y=0,
        units='points'
    )

    # Récupération pour la conversion data -> coords Axe
    y_min, y_max = ax.get_ylim()

    # 6) Dessin de chaque label
    for y_val in positions:
        # Convertir la coordonnée data -> coordonnée "Axes" (entre 0 et 1)
        y_val_axes = (y_val - y_min) / (y_max - y_min)

        label_str = f"{y_val:.3f}"  # Format de l'étiquette (à adapter si besoin)

        ax.text(
            0.0,               # On place le texte "à gauche" du subplot
            y_val_axes,
            label_str,
            rotation=0,        # On peut mettre 0 ou toute autre rotation
            rotation_mode='anchor',
            ha='right',        # Alignement horizontal à droite
            va='center',       # Alignement vertical centré
            transform=offset_axes_transform,
            bbox=dict(facecolor='white', edgecolor='none', boxstyle='round,pad=0.0')
        )

    return best_sf

In [None]:
def create_box_plots(group_column=None):
    # Ce dictionnaire contiendra pour chaque "n_components" (nombre de topics),
    # la distribution des scores de chaque topic par journal
    distri_topics_by_journal_by_num_topic = {}

    # Parcours de chaque nombre de composantes (chaque clé de all_nmf_W)
    for n_components, W_matrix in all_nmf_W.items():
        # Initialiser un dictionnaire pour stocker la distribution des sujets par journal pour ce n_components
        distri_topics_by_journal = {}

        # W_matrix est une matrice de taille (nb_articles, n_components).
        # num_article correspond ici à l'index de la ligne (document) dans la matrice.
        for num_article, row_values in enumerate(W_matrix):
            # Récupération du "journal" selon la source
            if source_type == 'europresse':
                header = all_soups[num_article].header
                journal_text = extract_information(header, '.rdp__DocPublicationName')
                journal_text = normalize_journal(journal_text)

            elif source_type == 'istex':
                journal_text = columns_dict['journal'][num_article]

            elif source_type == 'csv':
                # Vérification de l'existence de la colonne
                if group_column not in columns_dict:
                    print(f"La colonne '{group_column}' n'a pas été trouvée dans le fichier CSV.")
                    return

                journal_text = columns_dict[group_column][num_article]

            # row_values est un vecteur de scores de longueur n_components,
            # chaque "topic" est l'index dans ce vecteur.
            for topic, score in enumerate(row_values):
                # On initialise le sous-dictionnaire si nécessaire
                if topic not in distri_topics_by_journal:
                    distri_topics_by_journal[topic] = {}

                if journal_text not in distri_topics_by_journal[topic]:
                    distri_topics_by_journal[topic][journal_text] = []

                # Ajout du score dans la liste correspondant à ce journal et ce topic
                distri_topics_by_journal[topic][journal_text].append(score)

        # On stocke ensuite cette distribution pour le n_components courant
        distri_topics_by_journal_by_num_topic[n_components] = distri_topics_by_journal

    """
    Remplace le test de Kruskal-Wallis par un test bootstrap sur les moyennes.
    """

    # Création du dossier principal
    if not os.path.exists(f"{results_path}{base_name}_BOX_PLOTS/"):
        os.makedirs(f"{results_path}{base_name}_BOX_PLOTS/")

    for num_topic in distri_topics_by_journal_by_num_topic:

        # Sous-dossier spécifique au nombre de topics
        if not os.path.exists(f"{results_path}{base_name}_BOX_PLOTS/{base_name}_{num_topic}TC_BOX_PLOTS/"):
            os.makedirs(f"{results_path}{base_name}_BOX_PLOTS/{base_name}_{num_topic}TC_BOX_PLOTS/")

        for topic in tqdm(distri_topics_by_journal_by_num_topic[num_topic], desc="Processing topics"):
            topic_data = distri_topics_by_journal_by_num_topic[num_topic][topic]

            # Collecte des données de score pour chaque journal
            data = []
            journals = []  # Pour les étiquettes
            for journal, scores in topic_data.items():
                data.extend(scores)
                journals.extend([journal] * len(scores))

            # Création d'un DataFrame pour Seaborn
            df = pd.DataFrame({'Journal': journals, 'Score': data})

            # Filtrer les journaux (ceux qui ont au moins "threshold" valeurs)
            journal_counts = df['Journal'].value_counts()
            journals_to_keep = journal_counts[journal_counts >= threshold].index
            df = df[df['Journal'].isin(journals_to_keep)]

            # Test bootstrap (si au moins 2 groupes)
            if len(set(df['Journal'])) < 2:
                print("Pas assez de groupes pour effectuer le test bootstrap pour ce sujet et topic")
                continue

            # Journaux uniques
            unique_journals = df['Journal'].unique()

            # Calcul de la hauteur de la figure
            figure_height_inch = (len(unique_journals) * PX_PER_TOPIC) / DPI

            # Figure avec un sous-axe par journal
            fig, axes = plt.subplots(len(unique_journals), 1,
                                     figsize=(FIGURE_WIDTH_INCH, figure_height_inch),
                                     dpi=DPI, sharex=True)

            # On trie les journaux par moyenne
            mean_scores = df.groupby('Journal')['Score'].mean().sort_values(ascending=False)
            sorted_journals = mean_scores.index.tolist()

            # Plot de chaque boxplot
            # Si un seul journal, axes n'est pas un array => on le transforme en liste
            if len(unique_journals) == 1:
                axes = [axes]

            for i, journal in enumerate(sorted_journals):
                # Boxplot
                sns.boxplot(
                    x='Score',
                    data=df[df['Journal'] == journal],
                    ax=axes[i],
                    whis=[0, 100],
                    showmeans=True,
                    width=0.98,
                    meanprops={
                        'marker': '|',
                        'markeredgecolor': 'red',
                        'markeredgewidth': 5,
                        'markersize': 16
                    },
                    boxprops={
                        'facecolor': (0.0, 0.2, 0.8),
                        'edgecolor': (0.0, 0.2, 0.8)
                    },
                    medianprops={
                        'color': 'none',
                        'linewidth': 10
                    },
                    whiskerprops={
                        'color': 'black',
                        'linewidth': 2
                    },
                    capprops={
                        'color': 'none',
                        'linewidth': 0
                    },
                )

                # Ajustement des x-lims
                axes[i].set_xlim(left=0, right=df['Score'].max())

                # Masque l'axe X pour tous sauf le dernier
                if i < len(unique_journals) - 1:
                    axes[i].xaxis.set_visible(False)
                else:
                    # Placement manuel des ticks (exemple de fonction custom que vous aviez)
                    manual_tick_placement_continuous(
                        ax=axes[i],
                        xmin=0,
                        xmax=df['Score'].max(),
                        spacing_factor_min=1.02,
                        spacing_factor_max=1.2,
                        step=0.001
                    )

                # Retirer y-label et y-ticks
                axes[i].set_ylabel('')
                axes[i].set_yticks([])
                axes[i].set_xticks([])

                offset_axes_transform = mtransforms.offset_copy(
                    axes[i].transAxes,
                    fig=axes[i].figure,
                    x=-3.0,
                    y=0.0,
                    units='points'
                )

                # Petit label à gauche (nom du journal)
                axes[i].text(
                    0,
                    0.5,
                    journal,
                    ha='right',
                    va='center',
                    transform=offset_axes_transform
                )

            sns.despine(left=True, bottom=True)

            # Sauvegarde de la figure
            plt.savefig(
                f"{results_path}{base_name}_BOX_PLOTS/{base_name}_{num_topic}TC_BOX_PLOTS/"
                f"{base_name}_{num_topic}tc_{topic_labels_by_config[num_topic][topic]}_"
                f"{minimum_caracters_nb_by_document}minc_{maximum_caracters_nb_by_document}maxc_"
                f"{go_remove_duplicates}dup_{web_paper_differentiation}wp_"
                f"{threshold}thr_journals_boxplots.png",
                bbox_inches='tight',
                pad_inches=0
            )
            plt.close()