# 1. Presentation et objectif :   
 **Réalisé par :**


*   HADDADI Mazigh (MLSD)
*   BEN TAYEB Mohamed Amine (AMSD)

**Objectif :**

Le TP consiste à regrouper des documents textuels tels que les documents qui
partagent le même thème se retrouvent dans le même groupe, et les documents qui
portent sur des sujets très différents se trouvent dans des groupes différents.




# 2. Mise en place de l’environnement de travail

***Installation de Spark***

In [1]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
! wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

In [3]:
! tar xf /content/spark-3.5.0-bin-hadoop3.tgz

In [4]:
! pip install -q findspark
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=7c6e7e1dabe4d6d6a75451baa633d36bc3f5f1b25e71720478d1945f098e23b9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


***Configuration des variables d'environnement requises***

In [5]:
# Configurer les variables d'environnement requises
import os

# Définir le chemin vers JAVA_HOME
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Définir le chemin vers SPARK_HOME
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Définir les arguments de soumission PySpark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.5 pyspark-shell'

# Importer le module findspark pour initialiser Spark dans un environnement Python
import findspark

findspark.init("spark-3.5.0-bin-hadoop3")

# Importer les modules Spark nécessaires
from pyspark import SparkContext, SparkConf

# Configurer Spark avec le nom de l'application et le mode maître local
configuration = SparkConf().setAppName("name").setMaster("local[4]")

# Initialiser le contexte Spark
sc = SparkContext(conf=configuration)


In [6]:
sc

***Création d'une session Spark***

In [7]:
from pyspark.sql import SparkSession

# Création d'une session Spark
spark = SparkSession.builder \
    .appName("TP2_Spark") \
    .getOrCreate()


In [8]:
spark

# 3. Données

***Téléchargement et extraction des données***

In [9]:
# Télécharger les données
!wget -O 20news-19997.tar.gz http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz

# Décompresser les données
!tar -xzf 20news-19997.tar.gz


--2024-02-05 13:43:45--  http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz
Resolving qwone.com (qwone.com)... 173.48.205.131
Connecting to qwone.com (qwone.com)|173.48.205.131|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17332201 (17M) [application/x-gzip]
Saving to: ‘20news-19997.tar.gz’


2024-02-05 13:43:47 (9.41 MB/s) - ‘20news-19997.tar.gz’ saved [17332201/17332201]






**Chargement des données dans les  deux RDD**




In [10]:
# Charger les données dans deux RDDs
alt_atheism_rdd = sc.wholeTextFiles("20_newsgroups/alt.atheism/*")
rec_sport_baseball_rdd = sc.wholeTextFiles("20_newsgroups/rec.sport.baseball/*")

# Vérifier le nombre de documents dans chaque RDD
print("Nombre de documents dans alt.atheism_rdd:", alt_atheism_rdd.count())
print("Nombre de documents dans rec_sport_baseball_rdd:", rec_sport_baseball_rdd.count())

Nombre de documents dans alt.atheism_rdd: 1000
Nombre de documents dans rec_sport_baseball_rdd: 1000


***Séparation du corps du message de l’entête***

In [11]:
# Définir une fonction pour séparer le corps du message de l'entête
def separate_header_body(text):
    header, body = text.split("\n\n", 1)  # Séparation sur la première occurrence de "\n\n"
    return header, body

# Appliquer la fonction de séparation sur chaque RDD
alt_atheism_header_body_rdd = alt_atheism_rdd.map(lambda x: separate_header_body(x[1]))
rec_sport_baseball_header_body_rdd = rec_sport_baseball_rdd.map(lambda x: separate_header_body(x[1]))

# Vérifier le résultat en affichant quelques éléments
print("Exemple d'entête et de corps de message dans alt_atheism_header_body_rdd:")
for header, body in alt_atheism_header_body_rdd.take(1):
    print("Entête :", header)
    print("Corps :", body)
    print()

print("\nExemple d'entête et de corps de message dans rec_sport_baseball_header_body_rdd:")
for header, body in rec_sport_baseball_header_body_rdd.take(1):
    print("Entête :", header)
    print("Corps :", body)
    print()

Exemple d'entête et de corps de message dans alt_atheism_header_body_rdd:
Entête : Xref: cantaloupe.srv.cs.cmu.edu alt.atheism:49960 alt.atheism.moderated:713 news.answers:7054 alt.answers:126
Path: cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!bb3.andrew.cmu.edu!news.sei.cmu.edu!cis.ohio-state.edu!magnus.acs.ohio-state.edu!usenet.ins.cwru.edu!agate!spool.mu.edu!uunet!pipex!ibmpcug!mantis!mathew
From: mathew <mathew@mantis.co.uk>
Newsgroups: alt.atheism,alt.atheism.moderated,news.answers,alt.answers
Subject: Alt.Atheism FAQ: Atheist Resources
Summary: Books, addresses, music -- anything related to atheism
Keywords: FAQ, atheism, books, music, fiction, addresses, contacts
Message-ID: <19930329115719@mantis.co.uk>
Date: Mon, 29 Mar 1993 11:57:19 GMT
Expires: Thu, 29 Apr 1993 11:57:19 GMT
Followup-To: alt.atheism
Distribution: world
Organization: Mantis Consultants, Cambridge. UK.
Approved: news-answers-request@mit.edu
Supersedes: <19930301143317@mantis.co.uk>
Lines: 290
Corps : Arch

***Extraction d'Informations (’organisation et la catégorie:Newsgroups) avec des Expressions Régulières (module re)***

In [12]:
import re

def extract_fields(header):
    organization_match = re.search(r"Organization:\s*(.*?)\n", header, re.IGNORECASE)
    organization = organization_match.group(1).strip() if organization_match else None

    newsgroups_match = re.search(r"Newsgroups:\s*(.*?)\n", header, re.IGNORECASE)
    newsgroups = newsgroups_match.group(1).strip() if newsgroups_match else None

    return organization, newsgroups


***Application de l'Extraction d'Informations sur les RDD***

In [13]:
# Appliquer la fonction d'extraction sur chaque RDD
alt_atheism_extracted_fields_rdd = alt_atheism_header_body_rdd.map(lambda x: (extract_fields(x[0]), x[1]))
rec_sport_baseball_extracted_fields_rdd = rec_sport_baseball_header_body_rdd.map(lambda x: (extract_fields(x[0]), x[1]))

# Vérifier le résultat en affichant quelques éléments
print("Exemple d'organisation et de catégorie dans alt_atheism_extracted_fields_rdd:")
for (organization, newsgroups), body in alt_atheism_extracted_fields_rdd.take(1):
    print("Organisation :", organization)
    print("Catégorie (Newsgroups) :", newsgroups)
    print()

print("\nExemple d'organisation et de catégorie dans rec_sport_baseball_extracted_fields_rdd:")
for (organization, newsgroups), body in rec_sport_baseball_extracted_fields_rdd.take(1):
    print("Organisation :", organization)
    print("Catégorie (Newsgroups) :", newsgroups)
    print()


Exemple d'organisation et de catégorie dans alt_atheism_extracted_fields_rdd:
Organisation : Mantis Consultants, Cambridge. UK.
Catégorie (Newsgroups) : alt.atheism,alt.atheism.moderated,news.answers,alt.answers


Exemple d'organisation et de catégorie dans rec_sport_baseball_extracted_fields_rdd:
Organisation : Homewood Academic Computing, Johns Hopkins University, Baltimore, Md, USA
Catégorie (Newsgroups) : rec.sport.baseball



***Fusion des Informations Extraites (Union de RDD)***

In [14]:
# Fusionner les deux RDD (union)
merged_rdd = alt_atheism_extracted_fields_rdd.union(rec_sport_baseball_extracted_fields_rdd)

# Vérifier le nombre d'éléments dans le RDD fusionné
print("Nombre d'éléments dans le RDD fusionné :", merged_rdd.count())

Nombre d'éléments dans le RDD fusionné : 2000


***Transformation en Lignes (Rows) avec pyspark.sql***

In [15]:
from pyspark.sql import Row

# Transformer le RDD pour que chaque élément soit de type Row
row_rdd = merged_rdd.map(lambda x: Row(organization=x[0][0], newsgroups=x[0][1], body=x[1]))

# Afficher quelques éléments pour vérifier le type
for row in row_rdd.take(1):
    print(type(row), row)


<class 'pyspark.sql.types.Row'> Row(organization='Mantis Consultants, Cambridge. UK.', newsgroups='alt.atheism,alt.atheism.moderated,news.answers,alt.answers', body='Archive-name: atheism/resources\nAlt-atheism-archive-name: resources\nLast-modified: 11 December 1992\nVersion: 1.0\n\n                              Atheist Resources\n\n                      Addresses of Atheist Organizations\n\n                                     USA\n\nFREEDOM FROM RELIGION FOUNDATION\n\nDarwin fish bumper stickers and assorted other atheist paraphernalia are\navailable from the Freedom From Religion Foundation in the US.\n\nWrite to:  FFRF, P.O. Box 750, Madison, WI 53701.\nTelephone: (608) 256-8900\n\nEVOLUTION DESIGNS\n\nEvolution Designs sell the "Darwin fish".  It\'s a fish symbol, like the ones\nChristians stick on their cars, but with feet and the word "Darwin" written\ninside.  The deluxe moulded 3D plastic fish is $4.95 postpaid in the US.\n\nWrite to:  Evolution Designs, 7119 Laurel Canyon #4

***Création d'un DataFrame à partir du RDD d'objets Row***

In [16]:
# Créer un DataFrame à partir du RDD d'objets Row
df = spark.createDataFrame(row_rdd)

# Afficher le schéma du DataFrame
df.printSchema()

# Afficher les premières lignes du DataFrame
df.show(5)

root
 |-- organization: string (nullable = true)
 |-- newsgroups: string (nullable = true)
 |-- body: string (nullable = true)

+--------------------+--------------------+--------------------+
|        organization|          newsgroups|                body|
+--------------------+--------------------+--------------------+
|Mantis Consultant...|alt.atheism,alt.a...|Archive-name: ath...|
|Mantis Consultant...|alt.atheism,alt.a...|Archive-name: ath...|
|Technical Univers...|         alt.atheism|In article <65974...|
|Mantis Consultant...|alt.atheism,alt.p...|dmn@kepler.unh.ed...|
|        IBM Research|alt.atheism,soc.m...|In article <N4HY....|
+--------------------+--------------------+--------------------+
only showing top 5 rows



***Installation des Bibliothèques Fastavro et Pandavro***

In [17]:
!pip install fastavro
!pip install pandavro

Collecting fastavro
  Downloading fastavro-1.9.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro
Successfully installed fastavro-1.9.3
Collecting pandavro
  Downloading pandavro-1.8.0-py3-none-any.whl (8.8 kB)
Installing collected packages: pandavro
Successfully installed pandavro-1.8.0


***Importation des Bibliothèques Fastavro, Numpy, Pandas et Pandavro***


In [18]:
from fastavro import writer, reader, parse_schema
import numpy as np
import pandas as pd
import pandavro as pdx

***Conversion du DataFrame Spark en DataFrame Pandas***

In [19]:
pandas_df=df.toPandas()

***Sauvegarde du DataFrame Pandas au Format Avro***

In [20]:
# Définir le nom du fichier Avro
filename = "sauvegarde_avro/df.avro"

# Vérifier si le dossier existe, sinon le créer
if not os.path.exists("sauvegarde_avro"):
    os.makedirs("sauvegarde_avro")

# Sauvegarder la DataFrame au format Avro dans le fichier spécifié
pdx.to_avro(filename, pandas_df)

***Lecture du Fichier Avro***

In [21]:
with open('sauvegarde_avro/df.avro', 'rb') as fo:
    for record in reader(fo):
        print(record)

Output hidden; open in https://colab.research.google.com to view.

***Installation des Bibliothèques pyarrow***

In [22]:
!pip install pyarrow



***Sauvegarde du DataFrame Pandas au Format Parquet***

In [23]:
import pyarrow as pa
import pyarrow.parquet as pq

filename_parquet = "sauvegarde_parquet/df.parquet"

# Vérifier si le dossier existe, sinon le créer
if not os.path.exists("sauvegarde_parquet"):
    os.makedirs("sauvegarde_parquet")

table = pa.Table.from_pandas(pandas_df)
pq.write_table(table, filename_parquet)

***Lecture du Fichier Parquet***

In [24]:
table2 = pq.read_table('sauvegarde_parquet/df.parquet')
table2.to_pandas()

Unnamed: 0,organization,newsgroups,body
0,"Mantis Consultants, Cambridge. UK.","alt.atheism,alt.atheism.moderated,news.answers...",Archive-name: atheism/resources\nAlt-atheism-a...
1,"Mantis Consultants, Cambridge. UK.","alt.atheism,alt.atheism.moderated,news.answers...",Archive-name: atheism/introduction\nAlt-atheis...
2,"Technical University Braunschweig, Germany",alt.atheism,In article <65974@mimsy.umd.edu>\nmangoe@cs.um...
3,"Mantis Consultants, Cambridge. UK.","alt.atheism,alt.politics.usa.constitution",dmn@kepler.unh.edu (...until kings become phil...
4,IBM Research,"alt.atheism,soc.motss,rec.scouting",In article <N4HY.93Apr5120934@harder.ccr-p.ida...
...,...,...,...
1995,"University of Washington, Seattle",rec.sport.baseball,My brother purchased baseball tickets for Texa...
1996,"Simon Fraser University, Burnaby, B.C., Canada",rec.sport.baseball,behrens@cc.swarthmore.edu (Eric Behrens) write...
1997,"Duke University; Durham, N.C.",rec.sport.baseball,In article <1993Apr26.161946.846@adobe.com> sn...
1998,"Homewood Academic Computing, Johns Hopkins Uni...",rec.sport.baseball,I heard that Eli is selling the team to a grou...


# 4. Analyse descriptive

***Création d'une Vue Temporaire pour compter les catégories distinctes***

In [25]:
# Créer une vue temporaire pour la DataFrame afin de pouvoir l'utiliser avec Spark SQL
df.createOrReplaceTempView("documents")

# Utiliser Spark SQL pour compter les catégories distinctes avec normalisation
distinct_categories_count = spark.sql("""
    SELECT COUNT(DISTINCT normalized_newsgroups) AS distinct_categories
    FROM (
        SELECT
            CASE
                WHEN newsgroups LIKE '%rec.sport.basebal%' THEN 'rec.sport.baseball'
                WHEN newsgroups LIKE '%alt.atheism%' THEN 'alt.atheism'
            END AS normalized_newsgroups
        FROM documents
    ) temp
""").collect()[0][0]


# Afficher le résultat
print("Nombre de catégories différentes de documents :", distinct_categories_count)



Nombre de catégories différentes de documents : 2


***Trouver le nombre d'organisations différentes***

In [26]:
# Créer une vue temporaire pour la DataFrame afin de pouvoir l'utiliser avec Spark SQL
df.createOrReplaceTempView("documents")

# Utiliser Spark SQL pour trouver le nombre d'organisations différentes
distinct_organizations_count = spark.sql("SELECT COUNT(DISTINCT Organization) AS distinct_organizations FROM documents").collect()[0][0]

# Afficher le résultat
print("Nombre d'organisations différentes de documents :", distinct_organizations_count)


Nombre d'organisations différentes de documents : 481


***Statistiques descriptives***

In [27]:

# Utiliser Spark SQL pour obtenir toutes les statistiques descriptives et les 5 premières organisations par ordre décroissant de fréquence
stats = spark.sql("""
    SELECT
        (SELECT COUNT(*) FROM documents) AS total_documents,
        (SELECT COUNT(DISTINCT newsgroups) FROM documents) AS distinct_categories,
        (SELECT COUNT(DISTINCT Organization) FROM documents) AS distinct_organizations,
        (SELECT AVG(LENGTH(body)) FROM documents) AS avg_body_length,
        Organization,
        COUNT(*) AS organization_count
    FROM documents
    GROUP BY Organization
    ORDER BY organization_count DESC
""").collect()

# Afficher les résultats
print("Nombre total de documents :", stats[0]["total_documents"])
print("Nombre de catégories différentes de documents :", stats[0]["distinct_categories"])
print("Nombre d'organisations différentes :", stats[0]["distinct_organizations"])
print("Longueur moyenne du corps des documents :", stats[0]["avg_body_length"])
print("\nTop 5 des organisations par ordre décroissant de fréquence :")
for stat in stats[:5]:
    print(stat["Organization"], ":", stat["organization_count"])


Nombre total de documents : 2000
Nombre de catégories différentes de documents : 36
Nombre d'organisations différentes : 481
Longueur moyenne du corps des documents : 1610.9735

Top 5 des organisations par ordre décroissant de fréquence :
sgi : 70
None : 69
California Institute of Technology, Pasadena : 65
Siemens-Nixdorf AG : 41
Mantis Consultants, Cambridge. UK. : 40


# 5. Transformation du texte

***Découper les documents en listes de mots à l’aide de Tokenize***

In [28]:
#Transformation du texte

from pyspark.ml.feature import Tokenizer


# Configurer le Tokenizer
tokenizer = Tokenizer(inputCol="body", outputCol="words")

# Appliquer le Tokenizer pour découper les documents en listes de mots
tokenized_df = tokenizer.transform(df)

# Afficher le résultat
tokenized_df.select("words").show(n=1, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

***Représentation vectorielle des documents à l’aide de HashingTF***

In [29]:
from pyspark.ml.feature import HashingTF

# Configurer HashingTF avec 10 features
hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="vectorial_representation")

# Appliquer HashingTF pour créer la représentation vectorielle
tf_df = hashingTF.transform(tokenized_df)

# Afficher le résultat
tf_df.select("vectorial_representation").show(n=1, truncate=False)


+----------------------------------------------------------------------------------------+
|vectorial_representation                                                                |
+----------------------------------------------------------------------------------------+
|(10,[0,1,2,3,4,5,6,7,8,9],[134.0,182.0,711.0,215.0,146.0,193.0,127.0,234.0,143.0,165.0])|
+----------------------------------------------------------------------------------------+
only showing top 1 row



# 6. Grouper les documents qui ont des représentations vectorielles proches


In [30]:
from pyspark.ml.clustering import KMeans

# Renommer la colonne "vectorial_representation" en "features"
tf_df = tf_df.withColumnRenamed("vectorial_representation", "features")

# Créer un modèle KMeans avec 2 clusters
kmeans = KMeans(k=2, seed=1)

# Entraîner le modèle KMeans avec les représentations vectorielles des documents
model = kmeans.fit(tf_df)

# Attribuer des clusters aux documents
clustered_df = model.transform(tf_df)

# Afficher les résultats
clustered_df.select("prediction").show(n=5)


+----------+
|prediction|
+----------+
|         1|
|         1|
|         0|
|         0|
|         0|
+----------+
only showing top 5 rows



# 7. Pour aller plus loin (optionnel)


***1. Pondérer les mots avec la formule Tf-Idf (avant KMeans)***

In [31]:
from pyspark.ml.feature import CountVectorizer, IDF

# Configurer CountVectorizer
cv = CountVectorizer(inputCol="words", outputCol="raw_features", vocabSize=1000)

# Appliquer CountVectorizer pour obtenir les vecteurs de termes bruts
cv_model = cv.fit(tf_df)
raw_features_df = cv_model.transform(tf_df)

# Configurer IDF
idf = IDF(inputCol="raw_features", outputCol="tf_idf_features")

# Entraîner le modèle IDF
idf_model = idf.fit(raw_features_df)
tfidf_df = idf_model.transform(raw_features_df)

# Afficher le résultat
tfidf_df.select("tf_idf_features").show(n=1, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

***2. Normaliser les vecteurs représentant les documents (avant KMeans)***

In [32]:
from pyspark.ml.feature import MinMaxScaler

# Créer un scaler pour normaliser les vecteurs
scaler = MinMaxScaler(inputCol="tf_idf_features", outputCol="scaled_features")

# Ajuster le scaler sur les données
scaler_model = scaler.fit(tfidf_df)

# Appliquer la transformation de normalisation
normalized_df = scaler_model.transform(tfidf_df)

# Afficher le résultat
normalized_df.select("scaled_features").show(n=1, truncate=False)
normalized_df = normalized_df.drop("features")


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

***Appliquer KMeans***

In [33]:
# Renommer la colonne "scaled_features" en "features"
normalized_df = normalized_df.withColumnRenamed("scaled_features", "features")

# Créer un modèle KMeans avec 2 clusters
kmeans = KMeans(k=2, seed=1)

# Entraîner le modèle KMeans avec les représentations vectorielles des documents
model = kmeans.fit(normalized_df)

# Attribuer des clusters aux documents
clustered_norm_df = model.transform(normalized_df)

# Afficher les résultats
clustered_norm_df.select("prediction").show(n=10)


+----------+
|prediction|
+----------+
|         1|
|         1|
|         1|
|         0|
|         0|
|         1|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 10 rows

