<p style="font-family: Arial; font-size:3.75em;color:purple; font-weight:bold"><br>
Introduction



1. <strong>Objective</strong>: The goal of this analysis was to determine the most effective model and feature extraction technique for sentiment analysis.

2. <strong>Feature Extraction Evaluation</strong>: We evaluated different feature extraction methods, including:
   - <strong>Hashing TF-IDF + 1-Gram</strong>
   - <strong>CountVectorizer TF-IDF + 1-Gram</strong>
   - More complex features such as <strong>1-2-3-Grams</strong> and <strong>ChiSqSelector</strong>.

3. <strong>Model Comparison</strong>: We compared the performance of the following models:
   - <strong>Logistic Regression</strong>
   - <strong>Naive Bayes</strong>
   - <strong>SVM</strong>
   - The comparison focused on precision across each scenario.

4. <strong>MLflow Integration</strong>: We used <strong>MLflow</strong> to track our machine learning models, enabling us to:
   - Record experiment metrics
   - Identify the best combination of feature extraction method and model
   - Optimize predictive accuracy through detailed experiment tracking.

</p><br>


## Imports

In [None]:
import findspark
findspark.init()

# PySpark
import pyspark
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, NumericType
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, StopWordsRemover, VectorAssembler,
    ChiSqSelector,
)
from pyspark.ml.classification import LogisticRegression, NaiveBayes, LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import udf, col, count, when, isnull, countDistinct, length

# Bibliothèques générales
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
from datetime import datetime
import re

# NLTK pour le traitement du texte
from nltk.stem import SnowballStemmer, WordNetLemmatizer
from nltk.corpus import stopwords

# MLFlow
import mlflow
import mlflow.spark

# Wordcloud
from wordcloud import WordCloud

In [None]:
# MLflow tracking server used by every run below
# (assumes a tracking server is listening locally on port 1234 — confirm before running)
MLFLOW_TRACKING_URI = "http://127.0.0.1:1234"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

## Variables de contexte

In [None]:
# Local Spark session running on all available cores
spark1 = (
    SparkSession.builder
    .master("local[*]")
    .appName("Sentiment_Analysis")
    .getOrCreate()
)

In [None]:
# Location of the labelled CSV, read below with an explicit Sentence/Sentiment schema
path = "./data.csv"

## Chargement du dataset et séparation train/test

In [None]:
# Explicit CSV schema: two nullable string columns
schema = StructType([
    StructField("Sentence", StringType(), nullable=True),
    StructField("Sentiment", StringType(), nullable=True),
])

In [None]:
# Read the CSV (first line is a header) with the explicit schema defined above.
# inferSchema is not passed: when a schema is supplied Spark does not infer types.
df = spark1.read.csv(path, header=True, schema=schema)

In [None]:
# Number of null values in each column: count() only counts the non-null
# results of when(), i.e. the rows where the column is null
null_count_exprs = [count(when(col(name).isNull(), 1)).alias(name) for name in df.columns]
missing_values = df.select(null_count_exprs)
missing_values.show()

In [None]:
# Drop rows containing a null Sentence or Sentiment. DataFrame.dropna returns a
# new DataFrame, so the result must be re-assigned; otherwise null sentences
# reach the pre_process UDF later on and make re.sub fail.
df = df.dropna()

In [None]:
# Keep only the three known sentiment labels
df = df.filter(df["Sentiment"].isin("positive", "negative", "neutral"))

# Encode each label as an integer in the new 'target' column:
# neutral -> 0, negative -> -1, positive -> 1 (anything else would be null,
# but the filter above has already removed such rows)
target_expr = (
    F.when(F.col("Sentiment") == "neutral", 0)
    .when(F.col("Sentiment") == "negative", -1)
    .when(F.col("Sentiment") == "positive", 1)
    .otherwise(None)
)
df = df.withColumn("target", target_expr)

# Show the label -> target mapping
df.select("Sentiment", "target").distinct().show()

In [None]:
# NLTK helpers. NOTE(review): lemmatizer, stemmer and STOPWORDS appear unused by
# the Spark pipelines below — confirm before removing. stopwords.words() needs
# the NLTK "stopwords" corpus to be downloaded.
lemmatizer = WordNetLemmatizer()
stemmer = SnowballStemmer("english")

# English stop words extended with social-media / contraction fragments
CUSTOM_STOPWORDS = [
    'rt', 'mkr', 'didn', 'bc', 'n', 'm', 'im', 'll', 'y', 've',
    'u', 'ur', 'don', 'p', 't', 's', 'aren', 'kp', 'o', 'kat',
    'de', 're', 'amp', 'will',
]
STOPWORDS = set(stopwords.words("english")).union(CUSTOM_STOPWORDS)

def clean_text(df, field):
    """Clean the text column ``field`` of a Spark DataFrame for exploration.

    Applied in order:
      1. URLs (``http`` followed by non-spaces) -> space
      2. any remaining ``http`` fragment -> space
      3. ``@`` -> ``at``
      4. hashtags (``#`` followed by non-spaces) -> space
      5. every character outside ``A-Za-z(),!?@'"_`` and newline -> space
      6. lower-casing
    Finally only rows whose ``Sentiment`` is negative/positive/neutral are kept.

    Args:
        df: Spark DataFrame containing ``field`` and a ``Sentiment`` column.
        field: name of the string column to clean (overwritten in place).

    Returns:
        A new DataFrame with ``field`` cleaned and rows filtered.
    """
    substitutions = [
        (r"http\S+", " "),                  # URLs
        (r"http", " "),                     # leftover "http" fragments
        (r"@", "at"),
        (r"#\S+", " "),                     # hashtags
        (r"[^A-Za-z(),!?@\'\"_\n]", " "),   # special characters
    ]
    cleaned = df
    for pattern, replacement in substitutions:
        cleaned = cleaned.withColumn(field, F.regexp_replace(cleaned[field], pattern, replacement))
    cleaned = cleaned.withColumn(field, F.lower(cleaned[field]))
    return cleaned.filter(col("Sentiment").isin(["negative", "positive", "neutral"]))

In [None]:
df_cleaned = clean_text(df, 'Sentence')

# Only the label column is needed on the driver to plot the class balance
df_pandas = df_cleaned.select('Sentiment').toPandas()

ax = sns.countplot(data=df_pandas, x='Sentiment', palette='coolwarm')
ax.set_title('Répartition des Sentiments')
plt.show()

In [None]:
# Character length of every cleaned sentence
df_cleaned = df_cleaned.withColumn('Sentence_Length', length(col('Sentence')))

# Mean sentence length per sentiment (one row per class)
sentiment_length = df_cleaned.groupBy('Sentiment').agg(
    F.avg('Sentence_Length').alias('Avg_Sentence_Length')
)

# Small aggregated result -> pandas for seaborn
sentiment_length_pd = sentiment_length.toPandas()

# There is a single aggregated value per sentiment, so a bar chart is the right
# view (a boxplot of one value per group collapses to a flat line).
sns.barplot(x='Sentiment', y='Avg_Sentence_Length', data=sentiment_length_pd)
plt.title('Longueur des phrases par sentiment')
plt.xlabel('Sentiment')
plt.ylabel('Longueur moyenne de la phrase')
plt.show()

In [None]:
# Gather every non-null cleaned sentence for the global word cloud. Uses
# df_cleaned like the per-sentiment clouds; null sentences are skipped because
# ' '.join would raise TypeError on them.
sentences = (
    df_cleaned.filter(col('Sentence').isNotNull())
    .select('Sentence')
    .rdd.flatMap(lambda row: row)
    .collect()
)

# Build and display the word cloud
text = ' '.join(sentences)
wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)

plt.figure(figsize=(10, 6))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.show()

In [None]:
def _sentences_for(label):
    """Collect the non-null cleaned sentences of df_cleaned whose Sentiment equals ``label``."""
    return (
        df_cleaned
        .filter((df_cleaned['Sentiment'] == label) & df_cleaned['Sentence'].isNotNull())
        .select('Sentence')
        .rdd.flatMap(lambda row: row)
        .collect()
    )


positive_sentences = _sentences_for('positive')
negative_sentences = _sentences_for('negative')
neutral_sentences = _sentences_for('neutral')

# One word cloud per sentiment
positive_wordcloud = WordCloud().generate(' '.join(positive_sentences))
negative_wordcloud = WordCloud().generate(' '.join(negative_sentences))
neutral_wordcloud = WordCloud().generate(' '.join(neutral_sentences))

# Display the three clouds side by side
panels = [
    (positive_wordcloud, 'Positive Sentiment'),
    (negative_wordcloud, 'Negative Sentiment'),
    (neutral_wordcloud, 'Neutral Sentiment'),
]
plt.figure(figsize=(10, 7))
for position, (cloud, title) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    plt.imshow(cloud, interpolation='bilinear')
    plt.title(title)
    plt.axis('off')
plt.show()

In [None]:
# Per-sentence lengths to pandas. The column was created as 'Sentence_Length';
# Spark resolves names case-insensitively but keeps the original name in the
# output, so the exact spelling must be used for seaborn to find it.
df_pandas = df_cleaned.select('Sentiment', 'Sentence_Length').toPandas()

# Distribution of sentence lengths for each sentiment
sns.boxplot(x='Sentiment', y='Sentence_Length', data=df_pandas)
plt.title('Longueur des Phrases par Sentiment')
plt.show()

<h1>Dataset statistics</h1>

In [None]:
# Number of variables (columns) and observations (rows)
num_variables = len(df.columns)
num_observations = df.count()
total_cells = num_observations * num_variables

# Missing cells. count(when(cond, 1)) counts the rows where cond holds; the
# value given to when() must be non-null because count() skips nulls.
missing_cells = df.select([count(when(isnull(c), 1)).alias(c) for c in df.columns]).collect()
missing_cells_total = sum(row[c] for row in missing_cells for c in df.columns)
missing_cells_percentage = (missing_cells_total / total_cells * 100) if total_cells else 0.0

# Duplicate rows = all rows minus distinct rows
duplicate_rows = num_observations - df.distinct().count()
duplicate_rows_percentage = (duplicate_rows / num_observations * 100) if num_observations else 0.0

# Rough size estimate: total characters of the string form of every cell
# (a null cell counts as len("None") == 4). Not the real JVM memory footprint.
total_size_memory = df.rdd.map(lambda row: sum(len(str(cell)) for cell in row)).sum()
average_record_size = (total_size_memory / num_observations) if num_observations else 0.0

# Report
print(f"Number of variables: {num_variables}")
print(f"Number of observations: {num_observations}")
print(f"Missing cells: {missing_cells_total}")
print(f"Missing cells (%): {missing_cells_percentage:.1f}%")
print(f"Duplicate rows: {duplicate_rows}")
print(f"Duplicate rows (%): {duplicate_rows_percentage:.1f}%")
print(f"Total size in memory: {total_size_memory / 1024:.1f} KiB")
print(f"Average record size in memory: {average_record_size:.1f} B")

In [None]:
# Duplicate rows (dropDuplicates() with no subset considers every column);
# num_observations comes from the statistics cell above.
duplicate_rows = num_observations - df.dropDuplicates().count()
duplicate_rows_percentage = (duplicate_rows / num_observations * 100) if num_observations else 0.0

# Tabular data for seaborn. NOTE: the two bars have different units
# (row count vs percentage) and share one y axis.
duplicate_data = pd.DataFrame({
    'Category': ['Duplicate Rows', 'Duplicate Rows (%)'],
    'Count': [duplicate_rows, duplicate_rows_percentage],
})

sns.barplot(x='Category', y='Count', data=duplicate_data)
plt.title('Doublons dans les Données')
plt.show()

In [None]:
# Size estimate computed exactly like the statistics cell above: total
# characters of the string form of every cell (not the Row repr, which also
# counts the "Row(...)" wrapper and the field names).
total_size_memory = df.rdd.map(lambda row: sum(len(str(cell)) for cell in row)).sum()
average_record_size = (total_size_memory / num_observations) if num_observations else 0.0

# Tabular data for seaborn (KiB vs bytes: different units on one axis)
size_data = pd.DataFrame({
    'Category': ['Total Size in Memory (KiB)', 'Average Record Size (B)'],
    'Size': [total_size_memory / 1024, average_record_size],
})

sns.barplot(x='Category', y='Size', data=size_data)
plt.title('Taille en Mémoire des Données')
plt.show()

<h1>Types des variables</h1>

In [None]:
columns_types = df.dtypes

# Spark SQL type of every column, in column order
data_types = [df.schema[name].dataType for name, _ in columns_types]

# Numeric columns: any NumericType subclass (IntegerType, DoubleType, ...)
num_numeric_variables = sum(1 for dtype in data_types if isinstance(dtype, NumericType))

# Categorical columns: string columns
num_categorical_variables = sum(1 for dtype in data_types if isinstance(dtype, StringType))

print(f"Number of numeric variables: {num_numeric_variables}")
print(f"Number of categorical variables: {num_categorical_variables}")

In [None]:
def pre_process(text):
    """Normalise a raw sentence into lower-case ASCII words separated by single spaces.

    Steps, in order:
      1. ``None`` becomes ``""`` so the Spark UDF does not fail on null rows.
      2. Lower-case the text, so the contraction rules also match "Won't", "CAN'T", ...
      3. Remove links (anything starting with "http").
      4. Convert the HTML references &amp, &lt and &gt.
      5. Remove @mentions and #hashtags. This must run before step 7, which
         turns "@" and "#" into spaces and would leave the words behind.
      6. Expand common English contractions ("won't" -> "will not", "'s" -> " is", ...).
      7. Replace every character that is not an ASCII letter with a space.
      8. Collapse whitespace and strip both ends (a leading space would give an
         empty first token in Spark's whitespace Tokenizer).

    Args:
        text: the raw sentence, or ``None``.

    Returns:
        The cleaned sentence (possibly empty).
    """
    if text is None:
        return ""

    text = text.lower()

    # Links: "http\S+" already covers http://... and https://...
    text = re.sub(r"http\S+", "", text)

    # HTML references
    text = re.sub(r"&amp", "and", text)
    text = re.sub(r"&lt", "<", text)
    text = re.sub(r"&gt", ">", text)

    # Mentions and hashtags, before the letter filter below erases "@" / "#"
    text = re.sub(r"@\w+", "", text)
    text = re.sub(r"#\w+", "", text)

    # Contractions; the specific forms must run before the generic "n't" / "'t"
    contractions = (
        (r"won't", "will not"),
        (r"can't", "can not"),
        (r"n't", " not"),
        (r"'re", " are"),
        (r"'s", " is"),
        (r"'d", " would"),
        (r"'ll", " will"),
        (r"'t", " not"),
        (r"'ve", " have"),
        (r"'m", " am"),
    )
    for pattern, replacement in contractions:
        text = re.sub(pattern, replacement, text)

    # Keep ASCII letters only (this also removes non-ASCII chars and newlines)
    text = re.sub(r"[^a-zA-Z]", " ", text)

    # Single spaces, no leading/trailing whitespace
    return re.sub(r"\s+", " ", text).strip()

In [None]:
# Spark UDF wrapping pre_process; it returns a string column
pre_process_udf = F.udf(pre_process, returnType=StringType())

In [None]:
# Add the normalised text used by every modelling pipeline below
df = df.withColumn("Processed_Sentence", pre_process_udf(col("Sentence")))

In [None]:
# Preview the first processed sentences
df.select(col("Processed_Sentence")).show(n=5)

In [None]:
# Display the current column names
list(df.columns)

In [None]:
# 80/20 train/test split; the fixed seed keeps the split stable for the same input
train_set, test_set = df.randomSplit(weights=[0.8, 0.2], seed=2000)

In [None]:
# Sanity-check the size of each split
train_count, test_count = train_set.count(), test_set.count()
print(f"Ensemble d'entraînement: {train_count} exemples")
print(f"Ensemble de test: {test_count} exemples")

## Logistic Regression

## HashingTF - IDF (paramètres par défaut)

In [None]:
# Logistic Regression on HashingTF + IDF features (default parameters), tracked in MLflow.
with mlflow.start_run():

    # Text -> tokens -> hashed term frequencies -> TF-IDF
    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    hashtf = HashingTF(inputCol="words", outputCol='tf')
    idf = IDF(inputCol='tf', outputCol="features")

    # Map the integer target (-1/0/1) to label indices
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    lr = LogisticRegression()
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, lr])

    # Components used by this run
    mlflow.log_param("tokenizer", "Tokenizer")
    mlflow.log_param("hashtf", "HashingTF")
    mlflow.log_param("idf", "IDF")
    mlflow.log_param("classifier", "LogisticRegression")

    # Time the fit only; perf_counter is monotonic (datetime.utcnow() is
    # wall-clock and deprecated since Python 3.12)
    start_time = time.perf_counter()
    pipelineFit = pipeline.fit(train_set)
    training_time = time.perf_counter() - start_time
    print('Training time:', training_time)

    predictions = pipelineFit.transform(test_set)

    # Evaluation on the test split
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit, "model")
    mlflow.log_metric("training_time_seconds", training_time)

## HashingTF - IDF (paramètres customisés)

In [None]:
# Logistic Regression on HashingTF (2**16 buckets) + IDF (minDocFreq=5), tracked in MLflow.
with mlflow.start_run():

    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
    # minDocFreq=5: terms found in fewer than 5 training documents are filtered out
    idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)

    # Map the integer target (-1/0/1) to label indices
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    lr = LogisticRegression()
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, lr])

    # Components used by this run
    mlflow.log_param("tokenizer", "Tokenizer")
    mlflow.log_param("hashtf", "HashingTF with custom params (numFeatures=2^16, minDocFreq=5)")
    mlflow.log_param("idf", "IDF with minDocFreq=5")
    mlflow.log_param("classifier", "LogisticRegression")

    # Time the fit only, with a monotonic clock
    start_time = time.perf_counter()
    pipelineFit1 = pipeline.fit(train_set)
    training_time = time.perf_counter() - start_time
    print('Training time:', training_time)

    predictions = pipelineFit1.transform(test_set)

    # Evaluation on the test split
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit1, "model")
    mlflow.log_metric("training_time_seconds", training_time)


## CountVectorizer - IDF (paramètres customisés)

In [None]:
# Logistic Regression on CountVectorizer (vocabSize=2**16) + IDF (minDocFreq=5), tracked in MLflow.
with mlflow.start_run():

    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
    # minDocFreq=5: terms found in fewer than 5 training documents are filtered out
    idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)

    # Map the integer target (-1/0/1) to label indices
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    lr = LogisticRegression()
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

    # Components used by this run
    mlflow.log_param("tokenizer", "Tokenizer")
    mlflow.log_param("cv", "CountVectorizer with custom params (vocabSize=2^16, minDocFreq=5)")
    mlflow.log_param("idf", "IDF with minDocFreq=5")
    mlflow.log_param("classifier", "LogisticRegression")

    # Time the fit only, with a monotonic clock
    start_time = time.perf_counter()
    pipelineFit3 = pipeline.fit(train_set)
    training_time = time.perf_counter() - start_time
    print('Training time:', training_time)

    predictions = pipelineFit3.transform(test_set)

    # Evaluation on the test split
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit3, "model")
    mlflow.log_metric("training_time_seconds", training_time)

## CountVectorizer + NGram + ChiSqSelector

In [None]:
def build_trigrams(inputCol="Processed_Sentence", targetCol="target", n=3):
    """Build a 1..n-gram TF-IDF + chi-squared selection + LogisticRegression pipeline.

    For each order i in 1..n, the tokenised text is turned into i-grams, counted
    with a CountVectorizer (vocabulary capped at 2**14) and re-weighted with IDF
    (minDocFreq=5). The n TF-IDF vectors are concatenated into ``rawFeatures``;
    ChiSqSelector keeps 2**14 of them as ``features`` for a default
    LogisticRegression.

    Args:
        inputCol: name of the text column to tokenise.
        targetCol: name of the target column, indexed into ``label``.
        n: highest n-gram order to include.

    Returns:
        An unfitted ``Pipeline``.
    """
    orders = range(1, n + 1)

    stages = [Tokenizer(inputCol=inputCol, outputCol="words")]
    stages += [NGram(n=i, inputCol="words", outputCol=f"{i}_grams") for i in orders]
    stages += [CountVectorizer(vocabSize=2**14, inputCol=f"{i}_grams", outputCol=f"{i}_tf") for i in orders]
    stages += [IDF(inputCol=f"{i}_tf", outputCol=f"{i}_tfidf", minDocFreq=5) for i in orders]
    stages.append(VectorAssembler(inputCols=[f"{i}_tfidf" for i in orders], outputCol="rawFeatures"))
    stages.append(StringIndexer(inputCol=targetCol, outputCol="label"))
    stages.append(ChiSqSelector(numTopFeatures=2**14, featuresCol="rawFeatures", outputCol="features"))
    stages.append(LogisticRegression())

    return Pipeline(stages=stages)

In [None]:
# Logistic Regression on 1-2-3-gram TF-IDF features + ChiSqSelector, tracked in MLflow.
with mlflow.start_run():
    pipeline = build_trigrams()

    # Parameters describing the pipeline built above
    mlflow.log_param("inputCol", "Processed_Sentence")
    mlflow.log_param("targetCol", "target")
    mlflow.log_param("nGrams", 3)
    mlflow.log_param("vocabSize", 2**14)
    mlflow.log_param("minDocFreq", 5)
    mlflow.log_param("chiSqSelector_numTopFeatures", 2**14)

    # Time the fit only; perf_counter is monotonic (datetime.utcnow() is
    # wall-clock and deprecated since Python 3.12)
    start_time = time.perf_counter()
    pipelineFit4 = pipeline.fit(train_set)
    training_time = time.perf_counter() - start_time
    print(f'Training time: {training_time:.2f} s')

    predictions = pipelineFit4.transform(test_set)

    # Evaluation on the test split
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit4, "model")
    mlflow.log_metric("training_time_seconds", training_time)

## Metrics with best model

In [None]:
def build_trigrams(inputCol=("Processed_Sentence", "target"), n=3,
                   regParam=0.1, maxIter=1000, elasticNetParam=0.0):
    """Build the tuned 1..n-gram TF-IDF + ChiSqSelector + LogisticRegression pipeline.

    For each order i in 1..n the tokenised text is turned into i-grams, counted
    with a CountVectorizer (vocabulary capped at 2**14) and re-weighted with IDF
    (minDocFreq=5). The n TF-IDF vectors are concatenated into ``rawFeatures``;
    ChiSqSelector keeps 2**14 of them as ``features``.

    Args:
        inputCol: pair ``(text_column, target_column)``. A tuple default avoids
            the shared-mutable-default pitfall; a list is accepted too.
        n: highest n-gram order to include.
        regParam: LogisticRegression regularisation strength.
        maxIter: LogisticRegression maximum number of iterations.
        elasticNetParam: LogisticRegression elastic-net mixing (0.0 = L2 only).

    Returns:
        An unfitted ``Pipeline``.
    """
    text_col, target_col = inputCol
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]
    ngrams = [NGram(n=i, inputCol="words", outputCol=f"{i}_grams") for i in orders]
    cv = [CountVectorizer(vocabSize=2**14, inputCol=f"{i}_grams", outputCol=f"{i}_tf") for i in orders]
    idf = [IDF(inputCol=f"{i}_tf", outputCol=f"{i}_tfidf", minDocFreq=5) for i in orders]
    assembler = [VectorAssembler(inputCols=[f"{i}_tfidf" for i in orders], outputCol="rawFeatures")]
    label_stringIdx = [StringIndexer(inputCol=target_col, outputCol="label")]
    selector = [ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")]
    lr = [LogisticRegression(regParam=regParam, maxIter=maxIter, elasticNetParam=elasticNetParam)]

    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + lr)

In [None]:
# Tuned Logistic Regression on 1-2-3-gram TF-IDF features + ChiSqSelector, tracked in MLflow.
with mlflow.start_run():

    # Hyper-parameters of the pipeline returned by build_trigrams()
    mlflow.log_param("ngrams", "Trigrams")
    mlflow.log_param("cv_vocab_size", 2**14)
    mlflow.log_param("idf_minDocFreq", 5)
    mlflow.log_param("regParam", 0.1)
    mlflow.log_param("maxIter", 1000)
    mlflow.log_param("elasticNetParam", 0.0)

    # Time the fit only: prediction, evaluation and model upload are excluded
    start = time.perf_counter()
    pipelineFit5 = build_trigrams().fit(train_set)
    end = time.perf_counter()
    training_time = end - start
    print("Training time:", training_time)

    predictions = pipelineFit5.transform(test_set)

    # Evaluation on the test split
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit5, "model")
    mlflow.log_metric("training_time_seconds", training_time)

<h1>Naive Bayes</h1>

## HashingTF - IDF (paramètres par défaut)

In [None]:
# Multinomial Naive Bayes on HashingTF + IDF features (default parameters), tracked in MLflow.
with mlflow.start_run():

    # Hyper-parameters of this run
    mlflow.log_param("hashingTF_inputCol", "words")
    mlflow.log_param("hashingTF_outputCol", "tf")
    mlflow.log_param("idf_inputCol", "tf")
    mlflow.log_param("idf_outputCol", "features")
    mlflow.log_param("naiveBayes_modelType", "multinomial")

    # Text -> tokens -> hashed term frequencies -> TF-IDF
    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    hashtf = HashingTF(inputCol="words", outputCol='tf')
    idf = IDF(inputCol='tf', outputCol="features")

    # Map the integer target (-1/0/1) to label indices
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    nb = NaiveBayes(modelType="multinomial")
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, nb])

    # Time the fit only: prediction, evaluation and model upload are excluded
    start = time.perf_counter()
    pipelineFit = pipeline.fit(train_set)
    end = time.perf_counter()
    training_time = end - start
    print("Training time:", training_time)

    predictions = pipelineFit.transform(test_set)

    # Evaluation on the test split
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit, "model")
    mlflow.log_metric("training_time_seconds", training_time)

## HashingTF - IDF (paramètres customisés)

In [None]:
# Multinomial Naive Bayes on HashingTF (2**16 buckets) + IDF (minDocFreq=5), tracked in MLflow.
with mlflow.start_run():

    # Hyper-parameters of this run
    mlflow.log_param("hashingTF_numFeatures", 2**16)
    mlflow.log_param("idf_minDocFreq", 5)
    mlflow.log_param("naiveBayes_modelType", "multinomial")

    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
    idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)

    # Map the integer target (-1/0/1) to label indices
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    nb = NaiveBayes(modelType="multinomial")
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, nb])

    # Time the fit only: prediction, evaluation and model upload are excluded
    start = time.perf_counter()
    pipelineFit = pipeline.fit(train_set)
    end = time.perf_counter()
    training_time = end - start
    print("Training time:", training_time)

    predictions = pipelineFit.transform(test_set)

    # Evaluation on the test split
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit, "model")
    mlflow.log_metric("training_time_seconds", training_time)

## CountVectorizer (paramètres par défaut) - IDF (minDocFreq=5)

In [None]:
# Multinomial Naive Bayes on CountVectorizer (default vocabSize) + IDF (minDocFreq=5), tracked in MLflow.
with mlflow.start_run():

    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    cv = CountVectorizer(inputCol="words", outputCol='cv')
    idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)

    # Map the integer target (-1/0/1) to label indices
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    nb = NaiveBayes(modelType="multinomial")
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, nb])

    # Log the values actually configured on the stages: this CountVectorizer
    # keeps Spark's default vocabSize, so a hard-coded 2**16 would be wrong.
    mlflow.log_param("cv_vocabSize", cv.getVocabSize())
    mlflow.log_param("idf_minDocFreq", idf.getMinDocFreq())
    mlflow.log_param("naiveBayes_modelType", "multinomial")

    # Time the fit only: prediction, evaluation and model upload are excluded
    start = time.perf_counter()
    pipelineFit = pipeline.fit(train_set)
    end = time.perf_counter()
    training_time = end - start
    print("Training time:", training_time)

    predictions = pipelineFit.transform(test_set)

    # Evaluation on the test split
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit, "model")
    mlflow.log_metric("training_time_seconds", training_time)

## CountVectorizer - IDF (paramètres customisés)

In [None]:
# Multinomial Naive Bayes on CountVectorizer (vocabSize=2**16) + IDF (minDocFreq=5), tracked in MLflow.
with mlflow.start_run():

    # Hyper-parameters of this run
    mlflow.log_param("cv_vocabSize", 2**16)
    mlflow.log_param("idf_minDocFreq", 5)
    mlflow.log_param("naiveBayes_modelType", "multinomial")

    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
    idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)

    # Map the integer target (-1/0/1) to label indices
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    nb = NaiveBayes(modelType="multinomial")
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, nb])

    # Time the fit only: prediction, evaluation and model upload are excluded
    start = time.perf_counter()
    pipelineFit = pipeline.fit(train_set)
    end = time.perf_counter()
    training_time = end - start
    print("Training time:", training_time)

    predictions = pipelineFit.transform(test_set)

    # Evaluation on the test split
    metrics = {
        "accuracy": evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}),
        "precision": evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"}),
        "recall": evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"}),
    }
    accuracy, precision, recall = metrics["accuracy"], metrics["precision"], metrics["recall"]

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)

    mlflow.spark.log_model(pipelineFit, "model")
    mlflow.log_metric("training_time_seconds", training_time)

## CountVectorizer + NGram + ChiSqSelector

In [None]:
def build_trigrams(inputCol=("Processed_Sentence", "target"), n=3,
                   vocab_size=2**14, min_doc_freq=5, num_top_features=2**14):
    """Build a NaiveBayes pipeline on concatenated 1..n-gram TF-IDF features.

    Each i-gram order (1 <= i <= n) gets its own CountVectorizer + IDF. The
    resulting vectors are concatenated by a VectorAssembler into
    ``rawFeatures``, reduced by a ChiSqSelector into ``features``, and fed to
    a multinomial NaiveBayes classifier.

    Args:
        inputCol: ``(text_column, label_column)`` pair. The text column is
            tokenized into ``words``; the label column is indexed into
            ``label``. An immutable tuple default replaces the former list
            default, which was also silently ignored.
        n: Largest n-gram order to generate.
        vocab_size: ``vocabSize`` of each per-order CountVectorizer.
        min_doc_freq: ``minDocFreq`` of each per-order IDF.
        num_top_features: Number of features kept by the ChiSqSelector.

    Returns:
        An unfitted ``pyspark.ml.Pipeline``.
    """
    # Imported locally: the notebook's import cell does not import it.
    from pyspark.ml.feature import ChiSqSelector

    text_col, label_col = inputCol[0], inputCol[1]
    orders = range(1, n + 1)

    tokenizer = Tokenizer(inputCol=text_col, outputCol="words")

    # One NGram -> CountVectorizer -> IDF chain per n-gram order
    ngrams = [NGram(n=i, inputCol="words", outputCol=f"{i}_grams") for i in orders]
    count_vectorizers = [
        CountVectorizer(vocabSize=vocab_size, inputCol=f"{i}_grams", outputCol=f"{i}_tf")
        for i in orders
    ]
    idfs = [
        IDF(inputCol=f"{i}_tf", outputCol=f"{i}_tfidf", minDocFreq=min_doc_freq)
        for i in orders
    ]

    # Concatenate all TF-IDF vectors into one feature vector
    assembler = VectorAssembler(
        inputCols=[f"{i}_tfidf" for i in orders],
        outputCol="rawFeatures",
    )

    label_stringIdx = StringIndexer(inputCol=label_col, outputCol="label")

    # Chi-squared feature selection on the concatenated vector
    selector = ChiSqSelector(
        numTopFeatures=num_top_features, featuresCol="rawFeatures", outputCol="features"
    )

    nb = NaiveBayes(modelType="multinomial", labelCol="label", featuresCol="features")

    stages = [tokenizer, *ngrams, *count_vectorizers, *idfs,
              assembler, label_stringIdx, selector, nb]
    return Pipeline(stages=stages)

In [None]:
# Track the NaiveBayes 1-to-3-gram + ChiSqSelector experiment in MLflow.
with mlflow.start_run():
    # Hyperparameters of this run, logged in this order.
    for param_name, param_value in {
        "n_grams": 3,
        "cv_vocabSize": 2**14,
        "idf_minDocFreq": 5,
        "chiSqSelector_numTopFeatures": 2**14,
        "naiveBayes_modelType": "multinomial",
    }.items():
        mlflow.log_param(param_name, param_value)

    # Assemble the pipeline, then time the fit alone.
    pipeline = build_trigrams()

    start = time.time()
    pipelineFit = pipeline.fit(train_set)
    end = time.time()

    training_time = end - start
    mlflow.log_metric("training_time_seconds", training_time)
    print("Training time:", training_time)

    # Score the held-out set.
    predictions = pipelineFit.transform(test_set)

    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    accuracy, precision, recall = (
        evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
        for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
    )

    for metric_key, metric_value in (
        ("accuracy", accuracy),
        ("precision", precision),
        ("recall", recall),
    ):
        mlflow.log_metric(metric_key, metric_value)

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    # Persist the fitted pipeline as the run's model artifact.
    mlflow.spark.log_model(pipelineFit, "model")

## Cross-Validation 10-fold

In [None]:
# MLflow run: 10-fold cross-validation of the NaiveBayes smoothing parameter
# on CountVectorizer + IDF features.
with mlflow.start_run():
    smoothing_values = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]
    num_folds = 10

    # Pipeline stages
    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
    idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")
    nb = NaiveBayes(modelType="multinomial")

    pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, nb])

    # Grid searched by the cross-validator
    param_grid = ParamGridBuilder() \
        .addGrid(nb.smoothing, smoothing_values) \
        .build()

    # CrossValidator ranks candidates with the evaluator's configured metric
    # (logged below as "cv_metric"). The accuracy/precision/recall computed
    # later use per-call overrides and do not modify the evaluator.
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    cv_model = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=num_folds
    )

    mlflow.log_param("cv_vocabSize", 2**16)
    mlflow.log_param("idf_minDocFreq", 5)
    mlflow.log_param("naiveBayes_modelType", "multinomial")
    mlflow.log_param("cv_numFolds", num_folds)
    mlflow.log_param("nb_smoothing_values", smoothing_values)
    mlflow.log_param("cv_metric", evaluator.getMetricName())

    # Time the cross-validated fit
    start = time.time()
    pipelineFit = cv_model.fit(train_set)
    end = time.time()

    training_time = end - start
    mlflow.log_metric("training_time_seconds", training_time)
    print("Training time:", training_time)

    # Predictions of the best model (refit on the whole training set)
    predictions = pipelineFit.transform(test_set)

    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

    best_model = pipelineFit.bestModel
    # avgMetrics[i] is the mean fold score of estimatorParamMaps[i]; pick the
    # best in the evaluator's own direction rather than assuming "larger is better".
    pick_best = np.argmax if evaluator.isLargerBetter() else np.argmin
    best_params = pipelineFit.getEstimatorParamMaps()[int(pick_best(pipelineFit.avgMetrics))]
    # ParamMap keys are Param objects whose repr embeds the stage UID and the
    # parameter's doc string; log plain {name: value} pairs instead so the
    # MLflow value stays short and readable.
    best_params_by_name = {param.name: value for param, value in best_params.items()}

    mlflow.log_param("best_params", best_params_by_name)
    mlflow.spark.log_model(best_model, "best_model")

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("Best Parameters:", best_params_by_name)

<h1>SVM</h1>

## HashingTF - IDF (paramètres par défaut)

In [None]:
# MLflow run: one-vs-rest LinearSVC on HashingTF + IDF features
# (HashingTF and IDF left at their library defaults).
with mlflow.start_run():

    # Pipeline stages
    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    hashtf = HashingTF(inputCol="words", outputCol='tf')
    idf = IDF(inputCol='tf', outputCol="features")

    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    # Binary LinearSVC wrapped in OneVsRest for the multiclass target
    svm = LinearSVC(featuresCol="features", labelCol="label", maxIter=100, regParam=0.01)
    ovr = OneVsRest(classifier=svm)

    # Log the effective values read from the stages instead of the string
    # "default". They are exact (pyspark.ml HashingTF defaults to 2**18
    # features, not 2**20) and comparable with the customised runs.
    mlflow.log_param("hashingTF_numFeatures", hashtf.getNumFeatures())
    mlflow.log_param("idf_minDocFreq", idf.getMinDocFreq())
    mlflow.log_param("svm_maxIter", svm.getMaxIter())
    mlflow.log_param("svm_regParam", svm.getRegParam())

    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, ovr])

    # Time the fit only. Prediction, evaluation and model logging used to be
    # inside the timed span, which inflated "training_time_seconds".
    start = time.time()
    pipelineFit = pipeline.fit(train_set)
    end = time.time()
    training_time = end - start

    # Predictions on the held-out set
    predictions = pipelineFit.transform(test_set)

    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

    mlflow.spark.log_model(pipelineFit, "model")

    print("Training time:", training_time)
    mlflow.log_metric("training_time_seconds", training_time)

## HashingTF - IDF (paramètres customisés)

In [None]:
num_features = 2**16  # number of hashed features for HashingTF
min_doc_freq = 5      # minimum document frequency for IDF
svm_max_iter = 100    # LinearSVC maxIter, same as the other SVM runs
svm_reg_param = 0.01  # LinearSVC regParam, same as the other SVM runs

# MLflow run: one-vs-rest LinearSVC on HashingTF + IDF features (custom parameters).
with mlflow.start_run():

    # Log the feature-extraction AND classifier settings so this run shares
    # the same MLflow columns as the default-parameter SVM run.
    mlflow.log_param("hashingTF_numFeatures", num_features)
    mlflow.log_param("idf_minDocFreq", min_doc_freq)
    mlflow.log_param("svm_maxIter", svm_max_iter)
    mlflow.log_param("svm_regParam", svm_reg_param)

    # Pipeline stages
    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    hashtf = HashingTF(numFeatures=num_features, inputCol="words", outputCol='tf')
    idf = IDF(inputCol='tf', outputCol="features", minDocFreq=min_doc_freq)

    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    svm = LinearSVC(featuresCol="features", labelCol="label",
                    maxIter=svm_max_iter, regParam=svm_reg_param)
    ovr = OneVsRest(classifier=svm)

    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, ovr])

    # Time the fit only
    start = time.time()
    pipelineFit = pipeline.fit(train_set)
    end = time.time()
    training_time = end - start
    mlflow.log_metric("training_time_seconds", training_time)
    print("Training time:", training_time)

    # Predictions on the held-out set
    predictions = pipelineFit.transform(test_set)

    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    mlflow.spark.log_model(pipelineFit, "model")

## CountVectorizer - IDF (paramètres par défaut)

In [None]:
# MLflow run: one-vs-rest LinearSVC on CountVectorizer + IDF features
# (CountVectorizer and IDF left at their library defaults).
with mlflow.start_run():
    # Pipeline stages
    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")
    cv = CountVectorizer(inputCol="words", outputCol='cv')  # default parameters
    idf = IDF(inputCol='cv', outputCol="features")          # default parameters

    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    svm = LinearSVC(featuresCol="features", labelCol="label", maxIter=100, regParam=0.01)
    ovr = OneVsRest(classifier=svm)

    # Log the effective values read from the stages (rather than "default"),
    # under the same keys the other runs use ("cv_vocabSize",
    # "idf_minDocFreq"), so MLflow lines them up in shared columns.
    mlflow.log_param("cv_vocabSize", cv.getVocabSize())
    mlflow.log_param("idf_minDocFreq", idf.getMinDocFreq())
    mlflow.log_param("svm_maxIter", svm.getMaxIter())
    mlflow.log_param("svm_regParam", svm.getRegParam())

    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, ovr])

    # Time the fit only
    start = time.time()
    pipelineFit = pipeline.fit(train_set)
    end = time.time()
    training_time = end - start
    mlflow.log_metric("training_time_seconds", training_time)
    print("Training time:", training_time)

    # Predictions on the held-out set
    predictions = pipelineFit.transform(test_set)

    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    mlflow.spark.log_model(pipelineFit, "model")


## CountVectorizer - IDF (paramètres customisés)

In [None]:
# MLflow run: one-vs-rest LinearSVC on CountVectorizer + IDF features (custom parameters).
with mlflow.start_run():
    # Single source of truth for the values that are both logged and used
    vocab_size = 2**16    # CountVectorizer vocabulary size
    min_doc_freq = 5      # IDF minimum document frequency (drops rare terms)
    svm_max_iter = 100
    svm_reg_param = 0.01

    # Same param keys as the other runs so MLflow shows comparable columns
    mlflow.log_param("cv_vocabSize", vocab_size)
    mlflow.log_param("idf_minDocFreq", min_doc_freq)
    mlflow.log_param("svm_maxIter", svm_max_iter)
    mlflow.log_param("svm_regParam", svm_reg_param)

    # Tokenization
    tokenizer = Tokenizer(inputCol="Processed_Sentence", outputCol="words")

    # CountVectorizer with a custom vocabulary size
    cv = CountVectorizer(vocabSize=vocab_size, inputCol="words", outputCol='cv')

    # IDF with rare-term filtering
    idf = IDF(inputCol='cv', outputCol="features", minDocFreq=min_doc_freq)

    # Map the target to a numeric label index
    label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

    # Binary LinearSVC wrapped in OneVsRest
    svm = LinearSVC(featuresCol="features", labelCol="label",
                    maxIter=svm_max_iter, regParam=svm_reg_param)
    ovr = OneVsRest(classifier=svm)

    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

    pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, ovr])

    # Time the fit only
    start = time.time()
    pipelineFit = pipeline.fit(train_set)
    end = time.time()
    training_time = end - start
    mlflow.log_metric("training_time_seconds", training_time)
    print("Training time:", training_time)

    # Predictions on the held-out set
    predictions = pipelineFit.transform(test_set)

    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    mlflow.spark.log_model(pipelineFit, "model")

# CountVectorizer + NGram + ChiSqSelector

In [None]:
def build_trigrams(inputCol=("Processed_Sentence", "target"), n=3,
                   vocab_size=2**14, min_doc_freq=5, num_top_features=2**14,
                   max_iter=100, reg_param=0.01):
    """Build a one-vs-rest LinearSVC pipeline on concatenated 1..n-gram TF-IDF features.

    Each i-gram order (1 <= i <= n) gets its own CountVectorizer + IDF. The
    resulting vectors are concatenated into ``rawFeatures``, reduced by a
    ChiSqSelector into ``features``, and classified by LinearSVC wrapped in
    OneVsRest.

    Args:
        inputCol: ``(text_column, label_column)`` pair.
        n: Largest n-gram order to generate.
        vocab_size: ``vocabSize`` of each per-order CountVectorizer.
        min_doc_freq: ``minDocFreq`` of each per-order IDF.
        num_top_features: Number of features kept by the ChiSqSelector.
        max_iter, reg_param: LinearSVC ``maxIter`` / ``regParam``. The defaults
            match every other SVM run in this notebook (maxIter=100,
            regParam=0.01), so that only the feature extraction differs between
            experiments. Previously this pipeline silently used LinearSVC's own
            regParam default (0.0).

    Returns:
        An unfitted ``pyspark.ml.Pipeline``.
    """
    # Imported locally: the notebook's import cell does not import it.
    from pyspark.ml.feature import ChiSqSelector

    text_col, label_col = inputCol[0], inputCol[1]
    orders = range(1, n + 1)

    tokenizer = Tokenizer(inputCol=text_col, outputCol="words")

    # One NGram -> CountVectorizer -> IDF chain per n-gram order
    ngrams = [NGram(n=i, inputCol="words", outputCol=f"{i}_grams") for i in orders]
    count_vectorizers = [
        CountVectorizer(vocabSize=vocab_size, inputCol=f"{i}_grams", outputCol=f"{i}_tf")
        for i in orders
    ]
    idfs = [
        IDF(inputCol=f"{i}_tf", outputCol=f"{i}_tfidf", minDocFreq=min_doc_freq)
        for i in orders
    ]

    # Concatenate all TF-IDF vectors into one feature vector
    assembler = VectorAssembler(
        inputCols=[f"{i}_tfidf" for i in orders],
        outputCol="rawFeatures",
    )

    label_stringIdx = StringIndexer(inputCol=label_col, outputCol="label")

    # Chi-squared feature selection on the concatenated vector
    selector = ChiSqSelector(
        numTopFeatures=num_top_features, featuresCol="rawFeatures", outputCol="features"
    )

    # Binary LinearSVC wrapped in OneVsRest for the multiclass target
    svm = LinearSVC(featuresCol="features", labelCol="label",
                    maxIter=max_iter, regParam=reg_param)
    ovr = OneVsRest(classifier=svm)

    return Pipeline(stages=[tokenizer, *ngrams, *count_vectorizers, *idfs,
                            assembler, label_stringIdx, selector, ovr])

In [None]:
# MLflow run: one-vs-rest LinearSVC on 1-to-3-gram TF-IDF features filtered by ChiSqSelector.
with mlflow.start_run():
    # Hyperparameters, under the same keys as the NaiveBayes n-gram run
    mlflow.log_param("n_grams", 3)
    mlflow.log_param("cv_vocabSize", 2**14)
    mlflow.log_param("idf_minDocFreq", 5)
    mlflow.log_param("chiSqSelector_numTopFeatures", 2**14)

    # Build and train the pipeline (timed span covers construction + fit)
    start_time = time.time()
    pipeline = build_trigrams()
    pipelineFit = pipeline.fit(train_set)
    training_time = time.time() - start_time
    mlflow.log_metric("training_time_seconds", training_time)
    print("Training time:", training_time)

    # Log the classifier settings actually used, read from the final
    # OneVsRest stage, so they are correct whatever build_trigrams defaults to.
    svm = pipeline.getStages()[-1].getClassifier()
    mlflow.log_param("svm_maxIter", svm.getMaxIter())
    mlflow.log_param("svm_regParam", svm.getRegParam())

    # Predictions on the held-out set
    predictions = pipelineFit.transform(test_set)

    # Create the evaluator here rather than reusing one left over from an
    # earlier cell, so this cell also runs on its own (e.g. after a restart).
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)

    # Store the fitted pipeline as the run's model artifact
    mlflow.spark.log_model(pipelineFit, "model")