## Chargement des données depuis HDFS

In [27]:
from pyspark.sql import SparkSession

# Créer une session Spark
spark = SparkSession.builder.appName("Data Exploration").getOrCreate()

# Lire le fichier CSV depuis HDFS
df = spark.read.csv("hdfs://192.168.111.130:9000/ilyas/hadoop/archives/listplayers.csv", header=True)

# Afficher les premières lignes du DataFrame
df.show(5)

# Afficher la structure des colonnes du DataFrame
df.printSchema()


+---+---------------+
|_c0|    Name_Player|
+---+---------------+
|  0|Hernán Galíndez|
|  1|   Félix Torres|
|  2| Piero Hincapié|
|  3|Robert Arboleda|
|  4| José Cifuentes|
+---+---------------+
only showing top 5 rows

root
 |-- _c0: string (nullable = true)
 |-- Name_Player: string (nullable = true)



## Préparation des données (Création de colonnes supplémentaires)

In [28]:
from pyspark.sql.functions import col, when

# Convertir la colonne '_c0' en entier (car elle est en string actuellement)
df = df.withColumn("_c0", col("_c0").cast("int"))

# Créer une colonne "file_size" (valeurs fictives en multipliant par 100)
df = df.withColumn("file_size", col("_c0") * 100)

# Créer une colonne "archive_label" en fonction de l'index (_c0 pair = 1, sinon 0)
df = df.withColumn("archive_label", when(df['_c0'] % 2 == 0, 1).otherwise(0))

# Afficher les premières lignes pour vérifier les nouvelles colonnes
df.show(5)


+---+---------------+---------+-------------+
|_c0|    Name_Player|file_size|archive_label|
+---+---------------+---------+-------------+
|  0|Hernán Galíndez|        0|            1|
|  1|   Félix Torres|      100|            0|
|  2| Piero Hincapié|      200|            1|
|  3|Robert Arboleda|      300|            0|
|  4| José Cifuentes|      400|            1|
+---+---------------+---------+-------------+
only showing top 5 rows



## Modélisation avec Régression Logistique

In [29]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Sélectionner les colonnes "file_size" comme features pour le modèle
assembler = VectorAssembler(inputCols=["file_size"], outputCol="features")
data = assembler.transform(df)

# Diviser les données en train et test
train, test = data.randomSplit([0.8, 0.2], seed=12345)

# Créer le modèle de régression logistique
lr = LogisticRegression(featuresCol="features", labelCol="archive_label")
model = lr.fit(train)

# Tester le modèle
predictions = model.transform(test)
predictions.select("features", "archive_label", "prediction").show()


+---------+-------------+----------+
| features|archive_label|prediction|
+---------+-------------+----------+
|  [600.0]|            1|       1.0|
| [2400.0]|            1|       0.0|
| [2500.0]|            0|       0.0|
| [2900.0]|            0|       0.0|
| [3600.0]|            1|       0.0|
| [4700.0]|            0|       0.0|
| [5000.0]|            1|       0.0|
| [6700.0]|            0|       0.0|
| [9200.0]|            1|       0.0|
| [9500.0]|            0|       0.0|
| [9800.0]|            1|       0.0|
|[11200.0]|            1|       0.0|
|[11600.0]|            1|       0.0|
|[11900.0]|            0|       0.0|
|[12200.0]|            1|       0.0|
|[12400.0]|            1|       0.0|
|[12600.0]|            1|       0.0|
|[12900.0]|            0|       0.0|
|[13300.0]|            0|       0.0|
|[13500.0]|            0|       0.0|
+---------+-------------+----------+
only showing top 20 rows



## Modélisation avec Forêt Aléatoire

In [30]:
from pyspark.ml.classification import RandomForestClassifier

# Créer et entraîner un modèle de forêt aléatoire
rf = RandomForestClassifier(featuresCol="features", labelCol="archive_label", numTrees=10)
rf_model = rf.fit(train)

# Tester le modèle
rf_predictions = rf_model.transform(test)

# Évaluer l'exactitude
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="archive_label", predictionCol="prediction", metricName="accuracy")
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Exactitude du modèle de forêt aléatoire : {rf_accuracy}")


Exactitude du modèle de forêt aléatoire : 0.4129032258064516


## Modélisation avec Gradient Boosting

In [31]:
from pyspark.ml.classification import GBTClassifier

# Gradient Boosted Trees Classifier
gbt = GBTClassifier(featuresCol="features", labelCol="archive_label", maxIter=10)
gbt_model = gbt.fit(train)

# Tester le modèle
gbt_predictions = gbt_model.transform(test)

# Évaluer l'exactitude du modèle Gradient Boosting
gbt_accuracy = evaluator.evaluate(gbt_predictions)
print(f"Exactitude du modèle Gradient Boosting : {gbt_accuracy}")


Exactitude du modèle Gradient Boosting : 0.3548387096774194


## Évaluation de la précision et du rappel

In [32]:
# Calcul de la précision
precision_evaluator = MulticlassClassificationEvaluator(labelCol="archive_label", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(gbt_predictions)
print(f"Précision du modèle : {precision}")

# Calcul du rappel
recall_evaluator = MulticlassClassificationEvaluator(labelCol="archive_label", predictionCol="prediction", metricName="weightedRecall")
recall = recall_evaluator.evaluate(gbt_predictions)
print(f"Rappel du modèle : {recall}")


Précision du modèle : 0.35816518580119927
Rappel du modèle : 0.3548387096774194


## Filtrer et afficher les fichiers à archiver

In [33]:
# Filtrer les fichiers à archiver (prediction == 1.0)
files_to_archive = gbt_predictions.filter(gbt_predictions.prediction == 1.0)

# Afficher les fichiers qui devraient être archivés
files_to_archive.select("Name_Player", "file_size").show()


+--------------------+---------+
|         Name_Player|file_size|
+--------------------+---------+
|      Jackson Porozo|     2400|
|     Kevin Rodríguez|     2500|
|Virgil van Dijk (...|     2900|
|     Steven Berghuis|     3600|
|     Denzel Dumfries|     4700|
|         Xavi Simons|     5000|
|     Boualem Khoukhi|     6700|
|         Tom Lockyer|    19700|
|         Mark Harris|    19900|
|         Ben Cabango|    20400|
|     Leandro Paredes|    21100|
|     Guido Rodríguez|    22400|
|    Nicolás Otamendi|    22500|
|    Lautaro Martínez|    22800|
|   Emiliano Martínez|    22900|
|      Enzo Fernández|    23000|
|       Johan Vásquez|    23700|
|        Raúl Jiménez|    24100|
|  Rogelio Funes Mori|    24300|
|          Matty Cash|    26000|
+--------------------+---------+
only showing top 20 rows



## Écriture des fichiers compressés dans HDFS

In [35]:
# Simuler l'archivage ou la compression avec l'option d'écrasement (overwrite)
files_to_archive.write.mode("overwrite").parquet("hdfs://192.168.111.130:9000/ilyas/hadoop/archives_compressed/", compression='snappy')
