Notebook : entraînement d'un modèle à partir des données MongoDB et enregistrement dans MLflow

In [47]:
import os
import json
import pymongo
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from pyspark.sql.types import StructType, StructField, StringType, MapType, TimestampType
from transformers import DistilBertTokenizer
from transformers import DistilBertModel
import torch
import torch.nn as nn

In [48]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Parser used by the UDF that converts the string representation of an array
# into an actual array of floats.
def parse_array(col):
    """Parse a stringified list of numbers such as "[0.1, 0.2]" into floats.

    Args:
        col: The string to parse, or None (a null column value).

    Returns:
        A list of floats, an empty list for an empty array ("[]" or ""),
        or None when the input is None so that nulls stay nulls.

    Raises:
        ValueError: If a non-empty element cannot be converted to float.
    """
    # A null value would otherwise raise AttributeError on .strip().
    if col is None:
        return None

    # Drop surrounding whitespace first so the brackets are really at the edges.
    inner = col.strip().strip('[]')

    # Split on bare commas (float() tolerates surrounding spaces) and skip empty
    # fragments, so "[]" yields [] instead of failing on float('').
    return [float(token) for token in inner.split(',') if token.strip()]

# Wrap the parser as a Spark UDF producing an array<float> column.
parse_array_udf = udf(parse_array, ArrayType(FloatType()))


# Build (or reuse) a Spark session that pulls in the MongoDB Spark Connector package.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)


# Explicit schema for the documents read from the collection; every field is nullable.
schema = StructType([
    StructField("user", StringType(), nullable=True),
    StructField("repo", StringType(), nullable=True),
    StructField("mainLanguage", StringType(), nullable=True),
    StructField("languages", MapType(StringType(), StringType()), nullable=True),
    StructField("readme", StringType(), nullable=True),
    StructField("processed_readme", StringType(), nullable=True),
    StructField("last_updated", TimestampType(), nullable=True),
])

# Read the `raw_data` collection of the `dev` database with that schema.
raw_df = (
    spark.read
    .format("mongo")
    .option("database", "dev")
    .option("collection", "raw_data")
    .option("uri", "mongodb://mongo:27017/")
    .schema(schema)
    .load()
)

# Replace the stringified `processed_readme` column with its parsed float array.
df = raw_df.withColumn("processed_readme", parse_array_udf(raw_df["processed_readme"]))

df.show()

ConnectionRefusedError: [Errno 111] Connection refused

In [46]:
from pyspark.sql.functions import size

# Compute the number of elements in each row's processed_readme array.
length_col = size(df["processed_readme"]).alias("length")
lengths_df = df.select(length_col)

# Collect the distinct lengths to the driver and print them as plain integers.
unique_lengths = [row["length"] for row in lengths_df.distinct().collect()]
print(f"Unique lengths of processed_readme arrays: {unique_lengths}")

ConnectionRefusedError: [Errno 111] Connection refused

In [16]:
# Count the rows of the Spark DataFrame (this runs a Spark action) and report the total.
num_lines = df.count()
print(f"Number of lines in the dataframe: {num_lines}")



Number of lines in the dataframe: 3555


                                                                                

In [17]:
# Load the features and labels from the DataFrame that was read from MongoDB
def load_data_from_mongo(dataframe=None):
    """Collect the `processed_readme` and `mainLanguage` columns as Python lists.

    Args:
        dataframe: Optional DataFrame-like object exposing
            `select(...).collect()`. When omitted, the notebook-level `df`
            is used, which keeps existing zero-argument calls working.

    Returns:
        A `(texts, labels)` tuple of two equally long lists: the
        `processed_readme` values and the matching `mainLanguage` values,
        in the order returned by `collect()`.
    """
    # Only fall back to the global when no frame is passed, so the function can
    # be called (and tested) without relying on hidden notebook state.
    source = df if dataframe is None else dataframe
    rows = source.select("processed_readme", "mainLanguage").collect()

    # Single pass over the collected rows instead of one pass per column.
    texts, labels = [], []
    for row in rows:
        texts.append(row["processed_readme"])
        labels.append(row["mainLanguage"])
    return texts, labels


In [18]:
# Load the DistilBERT tokenizer and model from the same pretrained checkpoint.
BERT_CHECKPOINT = "distilbert-base-uncased"
tokenizer = DistilBertTokenizer.from_pretrained(BERT_CHECKPOINT)
bert_model = DistilBertModel.from_pretrained(BERT_CHECKPOINT)

def encode_texts_with_bert(texts, tokenizer, model):
    """Tokenize each text and turn it into a vector with DistilBERT.

    Each text is tokenized on its own (padded/truncated to 512 tokens), run
    through `model` without gradient tracking, and represented by the hidden
    state at sequence position 0 (the CLS token).

    Returns:
        A NumPy array stacking one embedding per input text.
    """
    def _embed(text):
        # One text at a time, returned as PyTorch tensors.
        tokens = tokenizer(text, padding='max_length', truncation=True, max_length=512, return_tensors="pt")
        with torch.no_grad():  # inference only, no gradients needed
            result = model(**tokens)
        # Position 0 of the last hidden layer stands for the whole text.
        return result.last_hidden_state[:, 0, :].squeeze().numpy()

    return np.array([_embed(text) for text in texts])

# Load the data, then map each label string to an integer class id.
texts, labels = load_data_from_mongo()
label_encoder = LabelEncoder().fit(labels)
encoded_labels = label_encoder.transform(labels)



                                                                                

In [19]:
# The DistilBERT embedding step is currently disabled; `texts` is split as-is.
# To re-enable it, encode first and split the resulting vectors instead:
#document_vectors = encode_texts_with_bert(texts, tokenizer, bert_model)
#X_train, X_test, y_train, y_test = train_test_split(document_vectors, encoded_labels, test_size=0.2, random_state=42)

# Split into train and test sets (20% held out, fixed seed).
X_train, X_test, y_train, y_test = train_test_split(
    texts,
    encoded_labels,
    test_size=0.2,
    random_state=42,
)


In [20]:
# Train the classifier: a 100-tree random forest with a fixed seed, fitted on the training split.
classifier = RandomForestClassifier(n_estimators=100, random_state=42)
classifier.fit(X_train, y_train)



In [21]:
# Evaluate the model: score it on the training split and on the held-out test split.
train_score = classifier.score(X_train, y_train)
test_score = classifier.score(X_test, y_test)

# Report both scores with four decimal places.
print(f"Train Accuracy: {train_score:.4f}")
print(f"Test Accuracy: {test_score:.4f}")



Train Accuracy: 1.0000
Test Accuracy: 0.4684


In [32]:
# Record the trained model, its type and both accuracy scores in MLflow.
mlflow.set_tracking_uri("http://mlflow:8080")

with mlflow.start_run():
    # Model artifact first, then the descriptive parameter and the metrics.
    mlflow.sklearn.log_model(classifier, "random_forest_model")
    mlflow.log_param("model_type", "RandomForestClassifier")
    mlflow.log_metrics({
        "train_accuracy": train_score,
        "test_accuracy": test_score,
    })

print("Modèle entraîné et enregistré dans MLflow")



🏃 View run clean-slug-695 at: http://mlflow:8080/#/experiments/0/runs/e2bd7bc366134162be735c253f8796fb
🧪 View experiment at: http://mlflow:8080/#/experiments/0
Modèle entraîné et enregistré dans MLflow
