In [None]:
import numpy as np
import matplotlib.pyplot as plt

import codecs
import re
import os.path

from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC, SVC
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_curve, auc, roc_auc_score
from sklearn.model_selection import GridSearchCV, cross_val_score, train_test_split, StratifiedKFold
from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
import seaborn as sns
import warnings

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
import pandas as pd
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from sklearn.feature_selection import chi2
import numpy as np
from nltk.corpus import stopwords

# Données reconnaissance du locuteur (Chirac/Mitterrand)

In [None]:
import codecs
# Chargement des données:
def load_pres(fname):
    alltxts = []
    alllabs_pres = []
    s=codecs.open(fname, 'r','utf-8') # pour régler le codage
    while True:
        txt = s.readline()
        if(len(txt))<5:
            break
        #
        lab = re.sub(r"<[0-9]*:[0-9]*:(.)>.*","\\1",txt)
        txt = re.sub(r"<[0-9]*:[0-9]*:.>(.*)","\\1",txt)
        if lab.count('M') >0:
            alllabs_pres.append(-1)
        else: 
            alllabs_pres.append(1)
        alltxts.append(txt)
    s.close() 
    return alltxts,alllabs_pres

def load_pres_test(fname):
    alltxts = []
    s=codecs.open(fname, 'r','utf-8') # pour régler le codage
    while True:
        txt = s.readline()
        if(len(txt))<5:
            break
        #
        txt = re.sub(r"<[0-9]*:[0-9]>(.*)","\\1",txt)
        alltxts.append(txt)
    s.close() 
    return alltxts

def save_pred_pres(pred): 
    with open('prediction_president.txt', 'w') as f:
        for item in pred:
            f.write("%s\n" % item)

In [None]:
# fname = "./datasets/AFDpresidentutf8/corpus.tache1.learn.utf8.txt"
fname = "./datasets/AFDpresidentutf8/corpus.tache1.learn.utf8.txt"
alltxts_pres,alllabs_pres = load_pres(fname)


In [None]:
print(len(alltxts_pres),len(alllabs_pres))
print(alltxts_pres[0])
print(alllabs_pres[0])
print(alltxts_pres[-1])
print(alllabs_pres[-1])

# Données classification de sentiments (films)

In [None]:
def load_movies(path2data): # 1 classe par répertoire
    alltxts = [] # init vide
    labs = []
    cpt = 0
    for cl in os.listdir(path2data): # parcours des fichiers d'un répertoire
        for f in os.listdir(path2data+cl):
            txt = open(path2data+cl+'/'+f).read()
            alltxts.append(txt)
            labs.append(cpt)
        cpt+=1 # chg répertoire = cht classe
        
    return alltxts,labs
def load_movies_test(file): # 1 classe par répertoire
    alltxts = [] 
    #lire ligne par ligne dans le fichier test
    with open(file) as f:
        lines = f.readlines()
        for line in lines:
            alltxts.append(line)
    return alltxts

def save_pred(pred): 
    with open('prediction_movies.txt', 'w') as f:
        for item in pred:
            if item == 1:
                f.write("P\n")
            else:
                f.write("N\n")

In [None]:
path_test = "./datasets/testSentiment.txt"
alltxtsTest = load_movies_test(path_test)

In [None]:
path = "./datasets/movies/movies1000/"
alltxts,alllabs = load_movies(path)

In [None]:
print(len(alltxts),len(alllabs))
print(alltxts[0])
print(alllabs[0])
print(alltxts[-1])
print(alllabs[-1])

# A) Transformation paramétrique du texte (pre-traitements)

Vous devez tester, par exemple, les cas suivants:
- transformation en minuscule ou pas
- suppression de la ponctuation
- transformation des mots entièrement en majuscule en marqueurs spécifiques
- suppression des chiffres ou pas
- conservation d'une partie du texte seulement (seulement la première ligne = titre, seulement la dernière ligne = résumé, ...)
- stemming
- ...


Vérifier systématiquement sur un exemple ou deux le bon fonctionnement des méthodes sur deux documents (au moins un de chaque classe).

In [None]:
import re
import string
from nltk.stem import PorterStemmer
import unicodedata
from nltk.stem import WordNetLemmatizer

def preprocess_text(text, lowercase=True, remove_punctuation=True, 
                     remove_digits=True, 
                     keep_part='full', stemming=False, lemmatize=False):
    """
    Série de prétraitements au texte.
    Parameters:
    - text: Texte à traiter.
    - lowercase: Si True, convertit le texte en minuscules.
    - remove_punctuation: Si True, supprime la ponctuation.
    - remove_digits: Si True, supprime les chiffres.
    - keep_part: Partie du texte à conserver ('full', 'first_line', 'last_line').
    - stemming: Si True, applique le stemming.

    Returns:
    - Texte prétraité.
    """

    # Supprimer la ponctuation 
    if remove_punctuation:
        p = string.punctuation
        p += '\n\r\t'  # Ajouter les retours chariot, tabulation
        text = text.translate(str.maketrans(p, ' ' * len(p)))
        clean_text = re.sub(r'\s[a-z]\s', ' ', text)
        text = clean_text.strip()
        text = unicodedata.normalize('NFD', text).encode('ascii', 'ignore').decode("utf-8")
    
    # Convertir en minuscules
    if lowercase:
        text = text.lower()

    # Supprimer les chiffres
    if remove_digits:
        text = re.sub('[0-9]+', '', text)  # Remplacer une séquence de chiffres par rien

    # Conserver une partie spécifique du texte
    if keep_part == 'first_line':
        text = text.split('\n')[0]
    elif keep_part == 'last_line':
        text = text.split('\n')[-1]

    # Appliquer le stemming à chaque mot du texte
    if stemming:
        ps = PorterStemmer()
        words = text.split()
        stemmed_words = [ps.stem(word) for word in words]
        text = ' '.join(stemmed_words)

    if lemmatize:
        lemmatizer = WordNetLemmatizer()
        words = text.split()
        lemmatized_words = [lemmatizer.lemmatize(word) for word in words]
        text = ' '.join(lemmatized_words)

    return text


# B) Extraction du vocabulaire (BoW)

- **Exploration préliminaire des jeux de données**
    - Quelle est la taille d'origine du vocabulaire?
    - Que reste-t-il si on ne garde que les 100 mots les plus fréquents? [word cloud]
    - Quels sont les 100 mots dont la fréquence documentaire est la plus grande? [word cloud]
    - Quels sont les 100 mots les plus discriminants au sens de odds ratio? [word cloud]
    - Quelle est la distribution d'apparition des mots (Zipf)
    - Quels sont les 100 bigrammes/trigrammes les plus fréquents?

- **Variantes de BoW**
    - TF-IDF
    - Réduire la taille du vocabulaire (min_df, max_df, max_features)
    - BoW binaire
    - Bi-grams, tri-grams
    - **Quelles performances attendre ? Quels sont les avantages et les inconvénients des ces variantes?**

In [None]:

# A) Exploration préliminaire des jeux de données

def explore_dataset(vocabulary, n_top_words=100):
    # Taille d'origine du vocabulaire
    print("Taille du vocabulaire :", len(vocabulary))

    # Word Cloud pour les 100 mots les plus fréquents
    wordcloud_top_words = WordCloud(width=400, height=200, background_color='white').generate_from_frequencies(vocabulary)
    
    # Word Cloud pour les 100 mots avec la fréquence documentaire la plus grande
    df_vocabulary = pd.DataFrame(vocabulary.items(), columns=['Mot', 'Fréquence'])
    df_vocabulary_sorted = df_vocabulary.sort_values(by='Fréquence', ascending=False)
    wordcloud_doc_freq = WordCloud(width=400, height=200, background_color='white').generate_from_frequencies(dict(zip(df_vocabulary_sorted['Mot'], df_vocabulary_sorted['Fréquence'])))

    # # Calcul des odds ratio
    # X = vectorizer.fit_transform(alltxts_pres)
    # chi2_stat, _ = chi2(X, alllabs_pres)
    # odds_ratio = np.exp(chi2_stat)
    # df_odds_ratio = pd.DataFrame({'Mot': vectorizer.get_feature_names_out(), 'Odds Ratio': odds_ratio})
    # return df_odds_ratio
    # df_odds_ratio_sorted = df_odds_ratio.sort_values(by='Odds Ratio', ascending=False)

    # # Word Cloud pour les 100 mots les plus discriminants au sens de odds ratio
    # df_odds_ratio_sorted = df_odds_ratio.sort_values(by='Odds Ratio', ascending=False)

    # # Remplacement des valeurs NaN par une chaîne de caractères
    # df_odds_ratio_sorted['Odds Ratio'].fillna(0, inplace=True)

    # wordcloud_odds_ratio = WordCloud(width=400, height=200, background_color='white').generate_from_frequencies(dict(zip(df_odds_ratio_sorted['Mot'], df_odds_ratio_sorted['Odds Ratio'])))

    # Distribution d'apparition des mots (Zipf)
    df_zipf = df_vocabulary_sorted.reset_index(drop=True)

    # Création d'une figure avec 3 sous-graphiques
    fig, axes = plt.subplots(1, 3, figsize=(16, 3))

    # Affichage des images sur les sous-graphiques
    axes[0].imshow(wordcloud_top_words, interpolation='bilinear')
    axes[0].axis('off')
    axes[0].set_title('100 mots les plus fréquents')

    axes[1].imshow(wordcloud_doc_freq, interpolation='bilinear')
    axes[1].axis('off')
    axes[1].set_title('100 mots avec la plus grande fréquence documentaire')

    # axes[2].imshow(wordcloud_odds_ratio, interpolation='bilinear')
    # axes[2].axis('off')
    # axes[2].set_title('100 mots les plus discriminants (Odds Ratio)')

    axes[2].plot(df_zipf.index + 1, df_zipf['Fréquence'], marker='o')
    axes[2].set_xscale('log')
    axes[2].set_yscale('log')
    axes[2].set_title('Distribution d\'apparition des mots (Zipf)')
    axes[2].set_xlabel('Rang du mot')
    axes[2].set_ylabel('Fréquence')

    # Ajustement du placement des sous-graphiques
    plt.tight_layout()

    # Affichage de la figure
    plt.show()

# # B) Variantes de BoW

# Vectorisation BoW
print("Vectorisation BoW")
vectorizer = CountVectorizer(binary=True, stop_words='english')
X_bow = vectorizer.fit_transform(alltxts)
vocabulary_bow = dict(zip(vectorizer.get_feature_names_out(), np.asarray(X_bow.sum(axis=0)).ravel()))
# Exploration préliminaire
explore_dataset(vocabulary_bow)

# TF-IDF
print("TF-IDF")
vectorizer_tfidf = TfidfVectorizer(stop_words='english')
X_tfidf = vectorizer_tfidf.fit_transform(alltxts)
vocabulary_tfidf = dict(zip(vectorizer_tfidf.get_feature_names_out(), np.asarray(X_tfidf.sum(axis=0)).ravel()))
# Exploration préliminaire
explore_dataset(vocabulary_tfidf)

# Réduire la taille du vocabulaire (max_features)
print("taille du vocabulaire avec max_features = 100")
vectorizer_max_features = CountVectorizer(max_features=100)
X_max_features = vectorizer_max_features.fit_transform(alltxts)
vocabulary_max_features = dict(zip(vectorizer_max_features.get_feature_names_out(), np.asarray(X_max_features.sum(axis=0)).ravel()))
# Exploration préliminaire
explore_dataset(vocabulary_max_features)

# BoW binaire
print("BoW binaire")
vectorizer_binary = CountVectorizer(binary=True)
X_binary = vectorizer_binary.fit_transform(alltxts)
vocabulary_binary = dict(zip(vectorizer_binary.get_feature_names_out(), np.asarray(X_binary.sum(axis=0)).ravel()))
# Exploration préliminaire
explore_dataset(vocabulary_binary)

# Bigrammes
print("Bigrammes")
vectorizer_bigram = CountVectorizer(ngram_range=(1, 2))
X_bigram = vectorizer_bigram.fit_transform(alltxts)
vocabulary_bigram = dict(zip(vectorizer_bigram.get_feature_names_out(), np.asarray(X_bigram.sum(axis=0)).ravel()))
# Exploration préliminaire
explore_dataset(vocabulary_bigram)


***
# Analyse avec un Tfidf Binaire des données Movie Review

In [None]:
def evaluer_modele(vectorizer, grid_search, X_test, y_test, labels):
    # best model
    best_model = grid_search.best_estimator_
    print("Meilleur modèle : ", best_model)
    
    # Obtenir les prédictions de classification du meilleur estimateur
    y_pred = best_model.predict(X_test)

    # Obtenir les prédictions de probabilité du meilleur estimateur
    # y_proba = best_model.predict_proba(X_test)

    accuracy = accuracy_score(y_test, y_pred)
    print(f"Précision sur l'ensemble de test avec le meilleur modèle : {accuracy:.3f}\n")

    # Matrice de confusion
    conf_mat = confusion_matrix(y_test, y_pred)
    fig, ax = plt.subplots(figsize=(6, 6))
    sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', cbar=False, ax=ax)
    ax.set_xlabel('Prédit')
    ax.set_ylabel('Vrai')
    ax.set_title('Matrice de confusion')
    ax.xaxis.set_ticklabels(labels)
    ax.yaxis.set_ticklabels(labels)
    plt.show()

    # Rapport de classification
    print(classification_report(y_test, y_pred, target_names=labels))

    # Courbe ROC
    decision_scores = best_model.decision_function(X_test)  # scores de décision
    # y_pred_proba = 1 / (1 + np.exp(-decision_scores)) # probabilités prédites pour utiliser dans la courbe ROC

    # Calculer les taux de vrais positifs et de faux positifs
    fpr, tpr, thresholds = roc_curve(y_test, decision_scores)
    # Calculer l'AUC
    auc = roc_auc_score(y_test, decision_scores)
    # fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)
    # roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(6, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % auc)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('Taux de faux positifs')
    plt.ylabel('Taux de vrais positifs')
    plt.title('Courbe ROC')
    plt.legend(loc='lower right')
    plt.show()

    # Interprétation des poids
    # Poids des mots
    weights = grid_search.best_estimator_.coef_.flatten()
    words = vectorizer.get_feature_names_out()
    df_weights = pd.DataFrame({'Mot': words, 'Poids': weights})
    df_weights_sorted = df_weights.sort_values(by='Poids', ascending=False)

    # 10 mots les plus positifs
    print("10 mots les plus positifs\n")
    print(df_weights_sorted.head(10))

    # 10 mots les plus négatifs
    print("10 mots les plus négatifs\n")
    print(df_weights_sorted.tail(10))

    # Visualisation des poids
    fig, axes = plt.subplots(1, 2, figsize=(14, 7))

    # Premier sous-graphique : Barplot
    df_weights_sorted = df_weights_sorted.head(20).append(df_weights_sorted.tail(20))
    sns.barplot(x='Poids', y='Mot', data=df_weights_sorted, ax=axes[0])
    axes[0].set_title('Poids des mots - Barplot')

    # Deuxième sous-graphique : Word Cloud
    wordcloud_weights = WordCloud(width=300, height=300, background_color='white').generate_from_frequencies(dict(zip(df_weights['Mot'], df_weights['Poids'])))
    axes[1].imshow(wordcloud_weights, interpolation='bilinear')
    axes[1].axis('off')
    axes[1].set_title('Poids des mots - Word Cloud')

    plt.show()

    # Visualisation des poids avec Word Cloud (positifs et négatifs)
    fig, axes = plt.subplots(1, 2, figsize=(14, 7))

    # Premier sous-graphique : Word Cloud (positifs)
    df_weights_pos = df_weights_sorted[df_weights_sorted['Poids'] > 0]
    wordcloud_weights_pos = WordCloud(width=300, height=300, background_color='white').generate_from_frequencies(dict(zip(df_weights_pos['Mot'], df_weights_pos['Poids'])))
    axes[0].imshow(wordcloud_weights_pos, interpolation='bilinear')
    axes[0].axis('off')
    axes[0].set_title('Poids des mots (positifs)')

    # Deuxième sous-graphique : Word Cloud (négatifs)
    df_weights_neg = df_weights_sorted[df_weights_sorted['Poids'] < 0]
    wordcloud_weights_neg = WordCloud(width=300, height=300, background_color='white').generate_from_frequencies(dict(zip(df_weights_neg['Mot'], df_weights_neg['Poids'])))
    axes[1].imshow(wordcloud_weights_neg, interpolation='bilinear')
    axes[1].axis('off')
    axes[1].set_title('Poids des mots (négatifs)')

    # plt.show()
    return y_pred, df_weights_sorted

In [None]:
alltxts_process_stem = [preprocess_text(txt, stemming=True) for txt in alltxts]

In [None]:
from collections import Counter
# visualisation des mots les plus fréquents en fonction de la classe
vectorizer_binary_neg = TfidfVectorizer(stop_words=stopwords.words('english'), ngram_range=(1, 2), min_df=2, max_features=100)
X_binary_neg = vectorizer_binary_neg.fit_transform(alltxts_process_stem[:1000])
print(vectorizer_binary_neg.get_feature_names_out()[:20])
vocabulary_binary_neg = dict(zip(vectorizer_binary_neg.get_feature_names_out(), np.asarray(X_binary_neg.sum(axis=0)).ravel()))
explore_dataset(vocabulary_binary_neg)

vectorizer_binary_pos = TfidfVectorizer(stop_words=stopwords.words('english'), ngram_range=(1, 2), min_df=2, max_features=100)
X_binary_pos = vectorizer_binary_pos.fit_transform(alltxts_process_stem[1000:])
print(vectorizer_binary_pos.get_feature_names_out()[:20])
vocabulary_binary_pos = dict(zip(vectorizer_binary_pos.get_feature_names_out(), np.asarray(X_binary_pos.sum(axis=0)).ravel()))
explore_dataset(vocabulary_binary_pos)


In [None]:
stop_words = set(stopwords.words('english'))

keep_words = {'couldn', 'don', 'couldn', 'didn', 'not', 'wasn'}
l_stop_words = stop_words - keep_words
l_stop_words = list(l_stop_words)

vectorizer = CountVectorizer(binary=True, stop_words=l_stop_words, ngram_range=(1, 2), min_df=2, max_features=12500)
X = vectorizer.fit_transform(alltxts_process_stem)
print(X.shape)

In [None]:
# Ignore all warnings
warnings.filterwarnings("ignore")

# Split des données
X_train, X_test, y_train, y_test = train_test_split(X, alllabs, test_size=0.2,shuffle=True)
# _, X_test, _, y_test = train_test_split(X, alllabs, test_size=0.2,shuffle=True)

# Modèles
models = {
    # 'Naive Bayes': MultinomialNB(),
    'Régression logistique': LogisticRegression(),
    'SVC linéaire': LinearSVC()
}

# Recherche des hyperparamètres optimaux
param_grid = {}

for name, model in models.items():
    if name == 'Naive Bayes': # le naïf bayésien n'a pas de paramètres à optimiser
        param_grid = {
            'alpha': [0.1]
        }
    elif name == 'SVC linéaire':
        param_grid = {
            'C': [0.1, 1, 2, 3, 5, 10],
            'penalty': ['l2'],
        }
    elif name == 'Régression logistique':
        param_grid = {
            'C': [0.1, 1, 2, 5, 10],
            'penalty': ['l2'],
            'solver': ['liblinear']
        }
    grid_search = GridSearchCV(model, param_grid, cv=5)
    grid_search.fit(X_train, y_train)
    
    print(f"Meilleur score de validation croisée pour {name}: {grid_search.best_score_:.3f}")
    print(f"Meilleurs hyperparamètres pour {name}: {grid_search.best_params_}\n")


# Évaluation du modèle
prediction, df = evaluer_modele(vectorizer, grid_search, X_test, y_test, ['positif', 'négatif'])

In [None]:
vectorizer = CountVectorizer(binary=True, stop_words=l_stop_words, ngram_range=(1, 2), min_df=2, max_features=12500)
X_Test = vectorizer.fit_transform(testSentiment_process)

In [None]:
pred = grid_search.predict(X_Test)
save_pred(pred)

In [None]:
# Classifieur aléatoire
y_pred_random = np.random.choice([0, 1], size=len(y_test))
accuracy_random = accuracy_score(y_test, y_pred_random)
print(f"Précision sur l'ensemble de test avec un classifieur aléatoire: {accuracy_random:.3f}")
print(classification_report(y_test, y_pred_random, target_names=['Négatif', 'Positif']))

# Classifieur majoritaire
y_pred_majority = np.ones(len(y_test))
accuracy_majority = accuracy_score(y_test, y_pred_majority)
print(f"Précision sur l'ensemble de test avec un classifieur majoritaire: {accuracy_majority:.3f}")
print(classification_report(y_test, y_pred_majority, target_names=['Négatif', 'Positif']))


## 1) Métriques d'évaluation 

Il faudra utiliser des métriques d'évaluation pertinentes suivant la tâche et l'équilibrage des données : 
- Accuracy
- Courbe ROC, AUC, F1-score

# C) Modèles de Machine Learning

## 2) Variantes sur les stratégies d'entraînement

- **Sur-apprentissage**. Les techniques sur lesquelles nous travaillons étant sujettes au sur-apprentissage: trouver le paramètre de régularisation dans la documentation et optimiser ce paramètre au sens de la métrique qui vous semble la plus appropriée (cf question précédente).
<br>
- **Equilibrage des données**. Un problème reconnu comme dur dans la communauté est celui de l'équilibrage des classes (*balance* en anglais). Que faire si les données sont à 80, 90 ou 99% dans une des classes?
Le problème est dur mais fréquent; les solutions sont multiples mais on peut isoler 3 grandes familles de solution.

1. Ré-équilibrer le jeu de données: supprimer des données dans la classe majoritaire et/ou sur-échantilloner la classe minoritaire.<BR>
   $\Rightarrow$ A vous de jouer pour cette technique
1. Changer la formulation de la fonction de coût pour pénaliser plus les erreurs dans la classe minoritaire:
soit une fonction $\Delta$ mesurant les écarts entre $f(x_i)$ et $y_i$ 
$$C = \sum_i  \alpha_i \Delta(f(x_i),y_i), \qquad \alpha_i = \left\{
\begin{array}{ll}
1 & \text{si } y_i \in \text{classe majoritaire}\\
B>1 & \text{si } y_i \in \text{classe minoritaire}\\
\end{array} \right.$$
<BR>
   $\Rightarrow$ Les SVM et d'autres approches sklearn possèdent des arguments pour régler $B$ ou $1/B$... Ces arguments sont utiles mais pas toujours suffisant.
1. Courbe ROC et modification du biais. Une fois la fonction $\hat y = f(x)$ apprise, il est possible de la *bidouiller* a posteriori: si toutes les prédictions $\hat y$ sont dans une classe, on va introduire $b$ dans $\hat y = f(x) + b$ et le faire varier jusqu'à ce qu'un des points change de classe. On peut ensuite aller de plus en plus loin.
Le calcul de l'ensemble des scores associés à cette approche mène directement à la courbe ROC.

**Note:** certains classifieurs sont intrinsèquement plus résistante au problème d'équilibrage, c'est par exemple le cas des techniques de gradient boosting que vous verrez l'an prochain.

## 3) Post-processing sur les données Président

Pour la tâche de reconnaissance de locuteur, des phrases successives sont souvent associés à un même locuteur. Voir par exemples les 100 premiers labels de la base d'apprentissage. 

# Analyse des données president 

### variante 1 :  Utilisation de Bow

In [None]:
process_alltxts = [preprocess_text(txt, lowercase=False, stemming=True) for txt in alltxts_pres]

In [None]:
from collections import Counter
# visualisation des mots les plus fréquents en fonction de la classe negative

y_indice_C = np.where(np.array(alllabs_pres)==-1)[0]
y_indice_M = np.where(np.array(alllabs_pres)==1)[0]

stopwords_list = stopwords.words('french')
vectorizer_binary_neg = TfidfVectorizer(stop_words=stopwords_list, ngram_range=(1, 2), min_df=2, max_features=200)
X_c = vectorizer_binary_neg.fit_transform([process_alltxts[i] for i in y_indice_C])
print(vectorizer_binary_neg.get_feature_names_out()[:20])
vocabulary_binary_neg = dict(zip(vectorizer_binary_neg.get_feature_names_out(), np.asarray(X_c.sum(axis=0)).ravel()))
explore_dataset(vocabulary_binary_neg)

# visualisation des mots les plus fréquents en fonction de la classe positive
vectorizer_binary_pos = TfidfVectorizer(stop_words=stopwords_list, ngram_range=(1, 2), min_df=2, max_features=200)
X_m = vectorizer_binary_pos.fit_transform([process_alltxts[i] for i in y_indice_M])
print(vectorizer_binary_pos.get_feature_names_out()[:20])
vocabulary_binary_pos = dict(zip(vectorizer_binary_pos.get_feature_names_out(), np.asarray(X_m.sum(axis=0)).ravel()))
explore_dataset(vocabulary_binary_pos)

In [None]:
stopwords_list = stopwords.words('french')
vectorizer = TfidfVectorizer(stop_words=stopwords_list, ngram_range=(1,2), min_df=1, max_df=0.95, max_features=25000)
X = vectorizer.fit_transform(process_alltxts)
print(X.shape)

In [None]:
from imblearn.under_sampling import TomekLinks
from imblearn.combine import SMOTEENN # importer la méthode SMOTEENN
# Ignore all warnings
warnings.filterwarnings("ignore")


# Split des données
Skf = StratifiedKFold(n_splits=5, shuffle=False)

fold_test = np.random.choice([0, 1, 2, 3, 4])
index_test = []
index_train = []

for i, (train_index, test_index) in enumerate(Skf.split(X, alllabs_pres)):
    if i == fold_test:
        index_test =  test_index.tolist()
        index_train = train_index.tolist()  

print(len(index_test), len(index_train))

X_train = X[index_train]
y_train = [alllabs_pres[i] for i in index_train]

X_test = X[index_test]
y_test = [alllabs_pres[i] for i in index_test]

# Split des données
# r0s = RandomUnderSampler() # sous-échantillonnage de la classe majoritaire
# X_train, y_train = r0s.fit_resample(X_train, y_train)
# X_test, y_test = r0s.fit_resample(X_test, y_test)

# Trouver les indices où y_train est égal à 1
indices_pos = np.where(np.array(y_train) == 1)[0]

# shuffle que les données de train
indices = np.arange(X_train.shape[0])
np.random.shuffle(indices)
X_train = X_train[indices]
y_train = np.array(y_train)[indices]

# Calcul du nombre cible pour chaque classe
n_majority = (np.where(np.array(y_train) == 1)[0]) # nombre d'observations de la classe majoritaire
n_minority = (np.where(np.array(y_train) == -1)[0]) # nombre d'observations de la classe minoritaire
n_target = (n_majority.shape[0] + n_minority.shape[0]) // 2 # nombre cible pour chaque classe

# print(n_majority, n_minority, n_target)
r10 = RandomUnderSampler(sampling_strategy = {1:n_target}) # sur-échantillonnage de la classe minoritaire
X_train, y_train = r10.fit_resample(X_train, y_train)

r10 = RandomOverSampler() # sur-échantillonnage de la classe minoritaire
X_train, y_train = r10.fit_resample(X_train, y_train)

# # Rééchantillonnage des données de train avec SMOTEENN
# smote_enn = SMOTEENN(sampling_strategy=0.2)
# X_train, y_train = smote_enn.fit_resample(X_train, y_train)

# # Rééchantillonnage des données de train avec TomekLinks
# rus = TomekLinks(sampling_strategy='majority') # sous-échantillonnage de la classe majoritaire
# X_train, y_train = rus.fit_resample(X_train, y_train)

rus = RandomUnderSampler() # sous-échantillonnage de la classe majoritaire
# X_test, y_test = rus.fit_resample(X_test, y_test)


y_indice_C = np.where(np.array(y_test) == 1)[0]
y_indice_M = np.where(np.array(y_test) == -1)[0]
print(len(y_indice_C),len(y_indice_M))

# Modèles
models = {
    'Régression logistique': LogisticRegression(),
#     'SVM linéaire': LinearSVC(),
#     'SVM' : SVC(gamma='auto')
}

# Recherche des hyperparamètres optimaux
param_grid = {}

for name, model in models.items():
    if name == 'SVM linéaire':
        param_grid = {
            'C': [0.1, 1, 2, 3, 5, 10],
            'penalty': ['l2', 'l1'],
            'class_weight':[{-1:10, 1:1}]
        }
    elif name == 'Régression logistique':
        param_grid = {
            'C': [0.1, 1, 2, 5, 10],
            'penalty': ['l2'],
            # 'solver': ['liblinear'],
            'class_weight': ["balanced"]
        }
    grid_search = GridSearchCV(model, param_grid, cv=6)
    grid_search.fit(X_train, y_train)
    print(f"Meilleur score de validation croisée pour {name}: {grid_search.best_score_:.3f}")
    print(f"Meilleurs hyperparamètres pour {name}: {grid_search.best_params_}\n")


# evaluer le modèle
y_pred,_= evaluer_modele(vectorizer, grid_search, X_test, y_test, ['Mitterand', 'Chirac'])
y_prob = grid_search.predict_proba(X_test)

In [None]:
fname = "./datasets/AFDpresidentutf8/corpus.tache1.learn.utf8.txt"
alltxts,alllabs = load_pres(fname)

plt.figure()
plt.plot(list(range(len(alllabs[0:100]))),alllabs[0:100])

**Une méthode de post-traitement pour améliorer les résultats consistent à lisser les résultats de la prédictions d'une phrases par les prédictions voisines, en utilisant par exemple une convolution par une filtre Gaussien. Compléter la fonction ci-dessous et tester l'impact de ce lissage sur les performances.**

In [None]:
from scipy.signal import gaussian
def gaussian_smoothing(pred, size=11):
    # LISSAGE par un filtre Gaussien de taille size - vous pouvez utiliser np.convolve
    kernel = gaussian(size, 5)
    kernel = kernel / kernel.sum()
    acc = np.convolve(pred, kernel, mode='same')
    smoothed = np.where(acc < 0.5, 1, -1)
    return smoothed, acc
pred_proba, pred_binaire = gaussian_smoothing(y_prob[:,0], 10)
print(pred_proba)


In [None]:
# Précision sur l'ensemble de test avec le meilleur modèle
labels = ['Mitterand', 'Chirac']
best_model = grid_search.best_estimator_

accuracy =  accuracy_score(y_test, pred_proba)
# pred_prob = best_model.predict_proba(X_test)

print(f"Précision sur l'ensemble de test avec le meilleur modèle : {accuracy:.3f}\n")
# Matrice de confusion
conf_mat = confusion_matrix(y_test, pred_proba)
fig, ax = plt.subplots(figsize=(6, 6))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', cbar=False, ax=ax)
ax.set_xlabel('Prédit')
ax.set_ylabel('Vrai')
ax.set_title('Matrice de confusion')
ax.xaxis.set_ticklabels(labels)
ax.yaxis.set_ticklabels(labels)
plt.show()
# Rapport de classification
print(classification_report(y_test, pred_proba, target_names=labels))
# Courbe ROC
# Calculer les taux de vrais positifs et de faux positifs
fpr, tpr, thresholds = roc_curve(y_test,  1 - pred_binaire)

# Calculer l'AUC
auc = roc_auc_score(y_test,  1 - pred_binaire)

# Tracer la courbe ROC
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()


## 4) Estimer les performances de généralisation d'une méthodes
**Ce sera l'enjeu principal du projet : vous disposez d'un ensemble de données, et vous évaluerez les performances sur un ensemble de test auquel vous n'avez pas accès. Il faut donc être capable d'estimer les performances de généralisation du modèles à partir des données d'entraînement.**


Avant de lancer de grandes expériences, il faut se construire une base de travail solide en étudiant les questions suivantes:

- Combien de temps ça prend d'apprendre un classifieur NB/SVM/RegLog sur ces données en fonction de la taille du vocabulaire?
- La validation croisée est-elle nécessaire? Est ce qu'on obtient les mêmes résultats avec un simple *split*?
- La validation croisée est-elle stable? A partir de combien de fold (travailler avec différentes graines aléatoires et faire des statistiques basiques)?

### Variante 2 : uitlisation de Doc2Vec
#### **Avec un peu de chance on aura plus de performance 😏"**

In [None]:
from gensim.models.doc2vec import Doc2Vec, TaggedDocument

# Création des documents pour Doc2Vec
# tagged_data = [TaggedDocument(words=preprocess_text(txt, lowercase=False, remove_digits=False, lemmatize=True), tags=[str(i)]) for i, txt in enumerate(alltxts_pres)]


# Entraînement du modèle Doc2Vec
model = Doc2Vec(vector_size=500, window=20,               
                min_count=5,                      
                sample=0.01, workers=3,
                cbow_mean=1, epochs=5)
model.build_vocab(tagged_data)
model.train(tagged_data, total_examples=model.corpus_count, epochs=model.epochs)

# Obtention des vecteurs de document
X_doc2vec = [model.infer_vector(preprocess_text(txt, lowercase=False, remove_digits=False, lemmatize=True).split()) for txt in alltxts_pres]

X_doc2vec = np.array(X_doc2vec)
print(X_doc2vec.shape)


In [None]:

# Division des données
Skf = StratifiedKFold(n_splits=5, shuffle=False)

fold_test = np.random.choice([0, 1, 2, 3, 4])
index_test = []
index_train = []

for i, (train_index, test_index) in enumerate(Skf.split(X_doc2vec, alllabs_pres)):
    if i == fold_test:
        index_test =  test_index.tolist()
        index_train = train_index.tolist()

X_train = np.array([X_doc2vec[i] for i in index_train])
y_train = [alllabs_pres[i] for i in index_train]

X_test = np.array([X_doc2vec[i] for i in index_test])
y_test = [alllabs_pres[i] for i in index_test]

# Calcul du nombre cible pour chaque classe
n_majority = (np.where(np.array(y_train) == 1)[0]) # nombre d'observations de la classe majoritaire
n_minority = (np.where(np.array(y_train) == -1)[0]) # nombre d'observations de la classe minoritaire
n_target = (n_majority.shape[0] + n_minority.shape[0]) // 2 # nombre cible pour chaque classe

# print(n_majority, n_minority, n_target)
r10 = RandomUnderSampler(sampling_strategy = {1:n_target}) # sur-échantillonnage de la classe minoritaire
X_train, y_train = r10.fit_resample(X_train, y_train)

r10 = RandomOverSampler() # sur-échantillonnage de la classe minoritaire
X_train, y_train = r10.fit_resample(X_train, y_train)

# Modèles
models = {
    'Régression logistique': LogisticRegression(),
#     'SVM linéaire': LinearSVC()
}

# Recherche des hyperparamètres optimaux
param_grid = {
    'C': [0.1, 1, 2, 5, 10],
    'penalty': ['l2'],
    'solver': ['liblinear'],
    'class_weight': ["balanced"]
}

for name, model in models.items():
    grid_search = GridSearchCV(model, param_grid, cv=4)
    grid_search.fit(X_train, y_train)
    print(f"Meilleur score de validation croisée pour {name}: {grid_search.best_score_:.3f}")
    print(f"Meilleurs hyperparamètres pour {name}: {grid_search.best_params_}\n")


In [None]:
# Évaluation du modèle
best_model = grid_search.best_estimator_
accuracy = best_model.score(X_test, y_test)
print(f"Accuracy du meilleur modèle sur l'ensemble de test: {accuracy:.3f}")

# Prédiction
y_pred = best_model.predict(X_test)
y_prob = grid_search.predict_proba(X_test)
 # Rapport de classification
print(classification_report(y_test, y_pred, target_names=["Mitterrand", "Chirac"]))

### Variante 3 :  Utilisation de Bert 
#### **je vais utiliser la librairie transformers de huggingface pour utiliser Bert et voir si on peut avoir une meilleure performance**

In [None]:
# Attention : ça prend du temps enorme pour entrainer le modèle (surtout si vous n'avez pas de GPU)

from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import DataLoader, TensorDataset
import torch
import os

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

# Division des données en ensembles d'entraînement et de test
Skf = StratifiedKFold(n_splits=5, shuffle=False)

fold_test = np.random.choice([0, 1, 2, 3, 4])
index_test = []
index_train = []

for i, (train_index, test_index) in enumerate(Skf.split(alltxts_pres, alllabs_pres)):
    if i == fold_test:
        index_test =  test_index.tolist()
        index_train = train_index.tolist()  


X_train =np.array(alltxts_pres)[index_train]
V = X_train.reshape(-1,1)
y_train = [alllabs_pres[i] for i in index_train]
r1s = RandomUnderSampler() # sous-échantillonnage de la classe majoritaire
X_train, y_train = r0s.fit_resample(V, y_train)
print(X_train.shape, len(y_train))
X_train = X_train.flatten()
print(X_train.shape, len(y_train))

X_train = [str(x) for x in X_train]
y_train = (np.array(y_train) + 1) // 2

print(len(X_train), len(y_train))

X_test = np.array(alltxts_pres)[index_test]
X_test = [str(x) for x in X_test]
y_test = [alllabs_pres[i] for i in index_test]
y_test = (np.array(y_test) + 1) // 2

# Chargement du tokenizer BERT
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Prétraitement des données d'entraînement
train_encodings = tokenizer(X_train, truncation=True, padding=True, return_tensors='pt')

# Prétraitement des données de test
test_encodings = tokenizer(X_test, truncation=True, padding=True, return_tensors='pt')

# Création des ensembles de données d'entraînement et de test
train_inputs, train_masks, train_labels = train_encodings['input_ids'], train_encodings['attention_mask'], torch.tensor(y_train)
test_inputs, test_masks, test_labels = test_encodings['input_ids'], test_encodings['attention_mask'], torch.tensor(y_test)

# Chargement du modèle BERT
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2) # Assuming binary classification

# Entraînement du modèle
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
loss_fn = torch.nn.CrossEntropyLoss()
print("Entraînement du modèle")
for epoch in range(5): # Adjust number of epochs
    model.train()
    optimizer.zero_grad()
    train_inputs, train_masks, train_labels = train_inputs.to(device), train_masks.to(device), train_labels.to(device)
    train_labels = train_labels.long()
    outputs = model(train_inputs, attention_mask=train_masks, labels=train_labels)
    loss = outputs.loss
    loss.backward()
    optimizer.step()
    print(f"Époque {epoch + 1} terminée")

# Évaluation du modèle
model.eval()
with torch.no_grad():
    test_inputs, test_masks, test_labels = test_inputs.to(device), test_masks.to(device), test_labels.to(device)
    logits = model(test_inputs, attention_mask=test_masks).logits
    predicted_labels = torch.argmax(logits, dim=1)
    all_predicted_labels = predicted_labels.cpu().numpy()
    all_true_labels = test_labels.cpu().numpy()

precision = precision_score(all_true_labels, all_predicted_labels)
recall = recall_score(all_true_labels, all_predicted_labels)
f1 = f1_score(all_true_labels, all_predicted_labels)
accuracy = accuracy_score(all_true_labels, all_predicted_labels)

print(f"Précision : {precision:.3f}")
print(f"Rappel : {recall:.3f}")
print(f"F1-score : {f1:.3f}")
print(f"Accuracy : {accuracy:.3f}")
