# **Projet Big Data avec Apache Spark**
---
**Douaa BENHADDOUCHE - Lilia HARIRECHE - Antoine RODRIGUEZ** - MLDS FA

## **Mise en place de l’environnement de travail**

### Installation et définition des variables d'environnement

In [None]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.2
!wget -q https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
# unzip it
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
# install findspark 
!pip install -q findspark

CPU times: user 113 ms, sys: 32.4 ms, total: 145 ms
Wall time: 46.6 s


In [None]:
# Set up required environment variables
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-avro_2.12:3.0.2 pyspark-shell"

In [None]:
import findspark
findspark.init("spark-3.0.2-bin-hadoop2.7")

### Création de l'objet `SparkContext`

In [None]:
from pyspark import SparkContext, SparkConf

#Initialisation du driver
conf = SparkConf().setAppName("mon application").setMaster("local[4]")  

#création d'un objet SparkContext
sc = SparkContext(conf=conf)
sc  # affichage

### Création de l'objet `SparkSession`

In [None]:
#création d'un objet SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=conf).getOrCreate()

##  **Préparation de données**

### 2(a) and 2(b)

Commençons par charger les données que nous allons étudier tout au long de ce projet.

In [None]:
!wget http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz
!tar xf 20news-19997.tar.gz

--2021-02-25 08:30:09--  http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz
Resolving qwone.com (qwone.com)... 173.48.209.137
Connecting to qwone.com (qwone.com)|173.48.209.137|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17332201 (17M) [application/x-gzip]
Saving to: ‘20news-19997.tar.gz’


2021-02-25 08:30:15 (2.84 MB/s) - ‘20news-19997.tar.gz’ saved [17332201/17332201]



### 2.(c)

On charge ensuite les données en tant qu'objet de type `RDD`

In [None]:
# 2(c) : Loading Data as RDD files
rdd_alt = sc.wholeTextFiles("/content/20_newsgroups/alt.atheism")
rdd_rec = sc.wholeTextFiles("/content/20_newsgroups/rec.sport.baseball")

pyspark.rdd.RDD

Chaque élément du `RDD` est un tuple contenant le nom du fichier et son emplacement, ainsi que le contenu du fichier .txt 

In [None]:
rdd_alt.collect()[0]  # permet d'accéder au premier élément

('file:/content/20_newsgroups/alt.atheism/53558',
 'Path: cantaloupe.srv.cs.cmu.edu!das-news.harvard.edu!noc.near.net!howland.reston.ans.net!usenet.ins.cwru.edu!po.CWRU.edu!kmr4\nFrom: kmr4@po.CWRU.edu (Keith M. Ryan)\nNewsgroups: alt.atheism\nSubject: Re: free moral agency\nDate: Fri, 23 Apr 1993 18:02:47 GMT\nOrganization: Case Western Reserve University\nLines: 21\nDistribution: na\nMessage-ID: <kmr4.1696.735588167@po.CWRU.edu>\nReferences: <93110.031029TAN102@psuvm.psu.edu> <C5v2Mr.1z1@darkside.osrhe.uoknor.edu> <1r98voINNr9q@lynx.unm.edu>\nNNTP-Posting-Host: b64635.student.cwru.edu\nKeywords: Another thread destined for the kill-file\n\nIn article <1r98voINNr9q@lynx.unm.edu> cfaehl@vesta.unm.edu (Chris Faehl) writes:\n\n>> The myth to which I refer is the convoluted counterfeit athiests have\n>> created to make religion appear absurd. \n>\n>"Counterfeit atheists". Hmmmm. So, we\'re just cheap knock-offs of the\n>True Atheists. \n\n\tThey must be theists in disguise.\n\n\tIn any ev

### 2.(d)

Conservons les données sous forme de tuple mais séparons le corps du message et l'en-tête (séparé par 'Lines: XX') afin d'obtenir pour chaque fichier un tuple 

`(en-tête, corps)`

In [None]:
import re  # on va utiliser des regex pour split et aussi extraire certains champs

def splitting(tup):
    split = re.split(r"(Lines:.+\n)", tup[1])
    if len(split) < 3: 
        return tup
    else:
        return split[0] + split[1], split[2]

In [None]:
rdd_alt_split = rdd_alt.map(splitting)  # le RDD comme il est immutable
rdd_rec_split = rdd_rec.map(splitting) 

In [None]:
rdd_alt_split.collect()[0]

('Path: cantaloupe.srv.cs.cmu.edu!das-news.harvard.edu!noc.near.net!howland.reston.ans.net!usenet.ins.cwru.edu!po.CWRU.edu!kmr4\nFrom: kmr4@po.CWRU.edu (Keith M. Ryan)\nNewsgroups: alt.atheism\nSubject: Re: free moral agency\nDate: Fri, 23 Apr 1993 18:02:47 GMT\nOrganization: Case Western Reserve University\nLines: 21\n',
 'Distribution: na\nMessage-ID: <kmr4.1696.735588167@po.CWRU.edu>\nReferences: <93110.031029TAN102@psuvm.psu.edu> <C5v2Mr.1z1@darkside.osrhe.uoknor.edu> <1r98voINNr9q@lynx.unm.edu>\nNNTP-Posting-Host: b64635.student.cwru.edu\nKeywords: Another thread destined for the kill-file\n\nIn article <1r98voINNr9q@lynx.unm.edu> cfaehl@vesta.unm.edu (Chris Faehl) writes:\n\n>> The myth to which I refer is the convoluted counterfeit athiests have\n>> created to make religion appear absurd. \n>\n>"Counterfeit atheists". Hmmmm. So, we\'re just cheap knock-offs of the\n>True Atheists. \n\n\tThey must be theists in disguise.\n\n\tIn any event, we don\'t _need_ to create religious par

### 2.(e)

In [None]:
def extraction_champs(tup):
    groups = '' ; subject = '' ; orga = '' ; lines = '' ; keywords = []
    temp = re.findall(r'(alt.atheism|rec.sport.baseball)', tup[0]) 
    if temp:  # not NoneType / éviter une erreur si les re.findall ne fonctionnent pas
        groups = temp[0]
    temp = re.findall(r'Subject: (.+)\n', tup[0])
    if temp: 
        subject = temp[0]
    temp = re.findall(r'Organization: (.+)\n', tup[0])
    if temp: 
        orga = temp[0]
    temp = re.findall(r'Lines: (.+)\n', tup[0])
    if temp: 
        lines = int(temp[0])
    temp = re.findall(r'Keywords: (.+)\n', tup[0])  # keywords non disponible pour beaucoup de textes
    if temp:
        keywords = re.split(',', temp[0])
        keywords = [k.strip() for k in keywords]
    return (groups, subject, orga, lines, keywords, tup[1])

In [None]:
rdd_alt_ = rdd_alt_split.map(extraction_champs)  
rdd_rec_ = rdd_rec_split.map(extraction_champs) 

In [None]:
rdd_alt_.collect()[0]  # il n'y a bien souvent pas de keywords.

('alt.atheism',
 'Re: free moral agency',
 'Case Western Reserve University',
 21,
 [],
 'Distribution: na\nMessage-ID: <kmr4.1696.735588167@po.CWRU.edu>\nReferences: <93110.031029TAN102@psuvm.psu.edu> <C5v2Mr.1z1@darkside.osrhe.uoknor.edu> <1r98voINNr9q@lynx.unm.edu>\nNNTP-Posting-Host: b64635.student.cwru.edu\nKeywords: Another thread destined for the kill-file\n\nIn article <1r98voINNr9q@lynx.unm.edu> cfaehl@vesta.unm.edu (Chris Faehl) writes:\n\n>> The myth to which I refer is the convoluted counterfeit athiests have\n>> created to make religion appear absurd. \n>\n>"Counterfeit atheists". Hmmmm. So, we\'re just cheap knock-offs of the\n>True Atheists. \n\n\tThey must be theists in disguise.\n\n\tIn any event, we don\'t _need_ to create religious parodies: just \nlook at some actual religions which are absurd.\n\n\n\x1b[34mAnd now . . . \x1b[35mDeep Thoughts\x1b[0m\n\t\x1b[32mby Jack Handey.\x1b[0m\n\n\x1b[36mIf you go parachuting, and your parachute doesn\'t open, and your\nfriend

### 2.(f).

In [None]:
rdd = rdd_alt_.union(rdd_rec_) #fusion entre deux rdd

### 2.(g).

In [None]:
from pyspark.sql import Row

func = Row('Newsgroup','Subject','Organization','Lines', 'Keywords', 'Body') # on définit le nom de chacunes des variables
rdd_ = rdd.map(lambda x : func(*x)) 

In [None]:
rdd_.collect()[0:2]

[Row(Newsgroup='alt.atheism', Subject='Re: free moral agency', Organization='Case Western Reserve University', Lines=21, Keywords=[], Body='Distribution: na\nMessage-ID: <kmr4.1696.735588167@po.CWRU.edu>\nReferences: <93110.031029TAN102@psuvm.psu.edu> <C5v2Mr.1z1@darkside.osrhe.uoknor.edu> <1r98voINNr9q@lynx.unm.edu>\nNNTP-Posting-Host: b64635.student.cwru.edu\nKeywords: Another thread destined for the kill-file\n\nIn article <1r98voINNr9q@lynx.unm.edu> cfaehl@vesta.unm.edu (Chris Faehl) writes:\n\n>> The myth to which I refer is the convoluted counterfeit athiests have\n>> created to make religion appear absurd. \n>\n>"Counterfeit atheists". Hmmmm. So, we\'re just cheap knock-offs of the\n>True Atheists. \n\n\tThey must be theists in disguise.\n\n\tIn any event, we don\'t _need_ to create religious parodies: just \nlook at some actual religions which are absurd.\n\n\n\x1b[34mAnd now . . . \x1b[35mDeep Thoughts\x1b[0m\n\t\x1b[32mby Jack Handey.\x1b[0m\n\n\x1b[36mIf you go parachuting, 

### 2.(h).

In [None]:
df = spark.createDataFrame(rdd_)

In [None]:
type(df)

pyspark.sql.dataframe.DataFrame

On peut afficher le dataframe à l'aide de la méthode `.show()`

In [None]:
df.show(n=5, truncate=20, vertical=False) # prend en compte les \n d'où l'affichage

+-----------+--------------------+--------------------+-----+--------+--------------------+
|  Newsgroup|             Subject|        Organization|Lines|Keywords|                Body|
+-----------+--------------------+--------------------+-----+--------+--------------------+
|alt.atheism|Re: <Political At...|Mantis Consultant...|   11|      []|
mccullou@snake2....|
|alt.atheism|    Re: Albert Sabin|The University of...|   83|      []|
In article <18AP...|
|alt.atheism|Victims of variou...|   Cured, discharged|   30|      []|
In article <9454...|
|alt.atheism|Re: An Anecdote a...|Unorganized Usene...|   24|      []|
In <114127@bu.ed...|
|alt.atheism|    Re: Albert Sabin|The University of...|   15|      []|
In article <1993...|
+-----------+--------------------+--------------------+-----+--------+--------------------+
only showing top 5 rows



### 2.(i). & 2(j).

On peut enregistrer le DataFrame en format avro ou parquet à l'aide des commandes suivantes : 

In [None]:
df.write.format("avro").save("RDD_df.avro")

In [None]:
df.write.parquet("RDD_df.parquet")

##  **Analyse descriptive** 

### 3.(a). & 3.(b).

La méthode `.select()` permet d'extraire une ou plusieurs colonnes celles-ci nous permettra de faire une analyse descriptive de ces colonnes. 

In [None]:
df.select("Newsgroup").distinct().show()  # distinct cherche les éléments unique dans la colonne

+------------------+
|         Newsgroup|
+------------------+
|rec.sport.baseball|
|       alt.atheism|
+------------------+



In [None]:
df.select("Organization").distinct().count() # count compte le nombre d'éléments

468

In [None]:
df.select("Organization").distinct().show(5, truncate=70) # On peux afficher 5 organisations

+----------------------------------------------------------------+
|                                                    Organization|
+----------------------------------------------------------------+
|     University of Wales College of Cardiff, Cardiff, WALES, UK.|
|                                            Brownian Motion Inc.|
|Yale University Computer Science Dept., New Haven, CT 06520-2158|
|                      University of New Hampshire  -  Durham, NH|
|                                       Jet Propulsion Laboratory|
+----------------------------------------------------------------+
only showing top 5 rows



### 3.(a). 
On peut aussi donner des statistiques pour chacunes des colonnes, ici pour la colonne `'Lines'`

In [None]:
df.describe('Lines').show()

+-------+------------------+
|summary|             Lines|
+-------+------------------+
|  count|              1994|
|   mean| 36.48294884653962|
| stddev|48.791372734500925|
|    min|                 1|
|    max|              1049|
+-------+------------------+



In [None]:
df.select("Keywords").distinct().show(5, truncate=70)

+-------------------------------------------+
|                                   Keywords|
+-------------------------------------------+
|                           [second defense]|
|[Another thread destined for the kill-file]|
|                               [mlb, 04.16]|
|                                       [DR]|
|       [Mariners, grand slam, Omar Vizquel]|
+-------------------------------------------+
only showing top 5 rows



Nous souhaitions ensuite construire un wordcould des keywords pour chacun des Newsgroup. Faute d'avoir trouver une solution plus "sparkienne" nous avons essayer de la manière suivante :

In [None]:
import numpy as np

# .filter() permet de selectionner des lignes suivants une condition
# .toPandas.values transforme la colonne en dataframe puis en extrait les valeur sous forme de np.ndarray
atheism_keywords = df.filter(df.Newsgroup == 'alt.atheism').select("Keywords").toPandas().values
baseball_keywords = df.filter(df.Newsgroup == 'rec.sport.baseball').select("Keywords").toPandas().values

Le nombre de mots clés est très faible...

## **Transformation du texte et clustering**

### 4.(a). & 4.(b).

In [None]:
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="Body", outputCol="words")
df_tokenized = tokenizer.transform(df)

In [None]:
df_tokenized.select('words').collect()[0]

Row(words=['', 'mccullou@snake2.cs.wisc.edu', '(mark', 'mccullough)', 'writes:', '>', 'i', 'think', 'you', 'mean', 'circular,', 'not', 'recursive,', 'but', 'that', 'is', 'semantics.', '>', 'recursiveness', 'has', 'no', 'problems,', 'it', 'is', 'just', 'horribly', 'inefficient', '(just', 'ask', '>', 'any', 'assembly', 'programmer.)', '', 'tail-recursive', 'functions', 'in', 'scheme', 'are', 'at', 'least', 'as', 'efficient', 'as', 'iterative', 'loops.', '', 'anyone', 'who', "doesn't", 'program', 'in', 'assembler', 'will', 'have', 'heard', 'of', 'optimizing', 'compilers.', '', '', 'mathew'])

On se propose aussi de retirer les stopwords via : 

In [None]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol='words', outputCol='words_clean')
df_tokenized = remover.transform(df_tokenized)

La fonction `HashingTF()` permet de décrire chaque document comme vecteur d'un espace de fréquences de mots. A la différence de `CountVectorizer()`, ce *transformer* n'a pas besoin d'être entrainé sur les données. 

In [None]:
hTF = HashingTF(inputCol='words_clean', outputCol="features", numFeatures=50)
df_featurized = hTF.transform(df_tokenized)

In [None]:
df_featurized.select('words_clean','features').show(5, truncate=40)

+----------------------------------------+----------------------------------------+
|                             words_clean|                                features|
+----------------------------------------+----------------------------------------+
|[, mccullou@snake2.cs.wisc.edu, (mark...|(50,[2,6,7,9,11,16,20,22,23,25,27,28,...|
|[, article, <18apr199317500990@skyblu...|(50,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,...|
|[, article, <9454@tekig7.pen.tek.com>...|(50,[0,1,2,3,4,5,6,8,9,10,11,12,14,15...|
|[, <114127@bu.edu>, jaeger@buphy.bu.e...|(50,[0,1,2,4,5,7,8,9,10,11,12,13,14,1...|
|[, article, <1993apr3.004902.25370@sc...|(50,[0,2,4,5,7,9,12,13,14,15,16,18,20...|
+----------------------------------------+----------------------------------------+
only showing top 5 rows



La colonne `features` contient une représentation vectorielle en une liste de (30) mots.

### 5.

Entrainons un k-means sur nos `features` pour essayer de retrouver nos ensembles `alt.atheism` et `rec.sport.baseball`.

Commençons par établir le dataframe sur lequel travaillé.

In [None]:
from pyspark.ml.feature import StringIndexer

# pour convertir la catégorie Newsgroup
indexer = StringIndexer(inputCol="Newsgroup", outputCol="NewsgroupIndex") 
df_featurized = indexer.fit(df_featurized).transform(df_featurized)
dataframe = df_featurized.select('NewsgroupIndex','features')

In [None]:
%%time
from pyspark.ml.clustering import KMeans

# on entraine d'abord le modèle
kmeans = KMeans(featuresCol='features', predictionCol='predictions').setK(2).setSeed(42)  # seed pour garder l'initialisation
model = kmeans.fit(dataframe)

CPU times: user 41.2 ms, sys: 8.99 ms, total: 50.2 ms
Wall time: 11.1 s


In [None]:
predictions = model.transform(dataframe)

Afin d'obtenir le même type que les classes initiales on transforme la colonne `predictions` en `double`

In [None]:
predictions = predictions.withColumn('predictions', predictions['predictions'].cast("double")) 

### 6.

On souhaite calculer l'information mutuelle normalisée (NMI) pour évaluer ce Kmeans. Faute d'avoir trouver une implémentation en `pyspark`, utilisons `scikit-learn` :

In [None]:
from sklearn.metrics import normalized_mutual_info_score as NMI

predictions_pd = predictions.toPandas()
NMI(predictions_pd["NewsgroupIndex"].astype(int), predictions_pd["predictions"])

0.00014202781911687058

Le score NMI étant très faible on utilise un *evaluator* de classification binaire de `pyspark`

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="NewsgroupIndex", rawPredictionCol="predictions")
evaluator.evaluate(predictions)  # c'est ici qu'il a fallu convertir en double

0.4995

Comparons désormais avec le Kmeans de `scikit-learn`, pour cela nous aurons besoin de transformer notre dataframe en `pandas.DataFrame`

In [None]:
dataframe_pd = dataframe.toPandas()

In [None]:
dataframe_pd['features'][0]

SparseVector(30, {0: 1.0, 1: 1.0, 2: 2.0, 3: 4.0, 4: 1.0, 5: 4.0, 6: 1.0, 7: 2.0, 8: 3.0, 9: 3.0, 10: 4.0, 11: 2.0, 12: 14.0, 13: 5.0, 14: 1.0, 15: 4.0, 16: 2.0, 17: 1.0, 18: 4.0, 19: 1.0, 20: 1.0, 21: 1.0, 22: 4.0, 23: 3.0, 24: 4.0, 25: 3.0, 26: 1.0, 27: 4.0, 28: 5.0, 29: 3.0})

Comme les features sont sous forme de SparseVector (non pris en charge par sklearn), nous allons effectuer un Kmeans sur l'array suivante :

In [None]:
import numpy as np

temp = dataframe_pd['features'].apply(lambda x : np.array(x.toArray()))
features = np.array(temp.tolist())

In [None]:
from sklearn.cluster import KMeans

kmeans = KMeans(n_clusters=2, random_state=42).fit(features)
predictions_pd = kmeans.predict(features)

In [None]:
NMI(predictions_pd, np.array(dataframe_pd['NewsgroupIndex']))

0.00014202781911687058

##  **Implémentation de K-means unidimensionnel**

### 7

Nous décidons de construire une classe python `UnidimensionalKmeans` contenant une méthode `fit` et une méthode `predict()` comme la plupart des autres implémentations. 

Nous utiliserons les fonctions `compute_centroids`, `assign_clusters` et `squared_distances`.

Le contenu est disponible dans le fichier `kmeans.py`

In [None]:
# Exemple d'utilisation des fonctions

points = sc.parallelize([3,8,1,2,4,14,15,17,5,6])
true_clusters = sc.parallelize([1,1,1,1,1,2,2,2,1,1])

In [None]:
from kmeans import UnidimensionalKmeans

UniKmeans = UnidimensionalKmeans(K=2, itermax=20)
UniKmeans.fit(points)
clusters = UniKmeans.predict(points)

Done ! (in 2 iterations)


In [None]:
clusters.collect() # les indices sont justes différents

[0, 0, 0, 0, 0, 1, 1, 1, 0, 0]

## **Spherical k-means et k-means multidimensionnel**

### 8.

Adaptons notre algorithme et nos fonctions au cas multidimensionnel :

La fonction `compute_centroids()` s'adapte à des `numpy.ndarray` il suffira donc de s'en assurer dans la méthode `MultidimensionnalKmeans` en appliquant : 

```python
RDD = RDD.map(lambda x : np.array(x))
```

Pour la fonction `squared_distances` un simple modification suffit, elle reste toujours fonctionnelle pour `UnidimensionnalKmeans`. 

Cette considération rend d'ailleurs toujours valide `assign_clusters()` pour le cas multidimensionnel sous les mêles hypothèses que  `compute_centroids()`.  


In [None]:
data = np.concatenate((np.random.rand(7,2), np.random.rand(3,2)+2) ,axis = 0)
points = sc.parallelize(data)
true_clusters = sc.parallelize([1,1,1,1,1,1,1,2,2,2])

In [None]:
from kmeans import MultidimensionalKmeans

MultiKmeans = MultidimensionalKmeans(K=2, itermax=20)
MultiKmeans.fit(points)
clusters = MultiKmeans.predict(points)

Done ! (in 2 iterations)


In [None]:
clusters.collect()

[1, 1, 1, 1, 1, 1, 1, 0, 0, 0]

### 9. 

Pour adapter le code et implémenter le Spherical K-means il suffit de remplacer la  distance utilisée (distance euclidienne) par la mesure de similarité cosinus, de considérer les données projetées sur l'hypersphère unité et de définir les centroides comme dans **cet article** :

*Han, Eui-Hong (Sam), et George Karypis. « Centroid-Based Document Classification: Analysis and Experimental Results ». In Principles of Data Mining and Knowledge Discovery, édité par Djamel A. Zighed, Jan Komorowski, et Jan Żytkow, 424‑31. Berlin, Heidelberg: Springer Berlin Heidelberg, 2000.*

[lien de téléchargement](https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.31.7900&rep=rep1&type=pdf.)

- `compute_centroids()` peut être utilisée comme telle
- `assign_clusters()` doit être modifié afin d'accueillir la nouvelle distance
`cosin_distances()` comme décrite dans l'article.

On peut alors implémenter `SphericalKmeans` en ajoutant une étape de normalisation et en considérant les deux fonctions précédentes.


In [None]:
from kmeans import SphericalKmeans

SKmeans = SphericalKmeans(K=2, itermax=20)
SKmeans.fit(points)
clusters = SKmeans.predict(points)

Done ! (in 2 iterations)


In [None]:
clusters.collect()

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

On reviendra sur ce résultat.

### 10. 

Evaluons `SphericalKmeans` et `MultidimensionnalKmeans` sur notre dataframe que l'on va reconvertir en RDD.

In [None]:
# De cette manière le rdd est compatible avec les algorithmes précédemment implementés
rdd_labels_features = dataframe.rdd.map(lambda x : (x[0], np.array(x[1].toArray())))

In [None]:
rdd_labels_features.collect()[0]

(0.0,
 array([0., 0., 1., 0., 0., 0., 2., 1., 0., 2., 0., 1., 0., 0., 0., 0., 3.,
        0., 0., 0., 3., 0., 6., 3., 0., 1., 0., 1., 1., 1., 1., 1., 1., 0.,
        1., 4., 0., 0., 1., 2., 0., 0., 0., 0., 0., 1., 0., 0., 0., 1.]))

In [None]:
rdd_labels = rdd_labels_features.map(lambda tup: tup[0])
rdd_features = rdd_labels_features.map(lambda tup: tup[1])

Commençons d'abord par évaluer la NMI donnée par l'algorithme implémenté en 8.

In [None]:
from kmeans import MultidimensionalKmeans, SphericalKmeans

MKmeans = MultidimensionalKmeans(K=2, itermax=20)
MKmeans.fit(rdd_features)
predictions_mkm = MKmeans.predict(rdd_features)

Done ! (in 14 iterations)


In [None]:
from sklearn.metrics import normalized_mutual_info_score as NMI
NMI(predictions_mkm.collect(), rdd_labels.collect())

0.00010039230011272857

L'indice NMI est toujours très bas, essayons le Spherical K-Means

In [None]:
SKmeans = SphericalKmeans(K=2, itermax=20)
SKmeans.fit(rdd_features)
predictions_skm = SKmeans.predict(rdd_features)

Done ! (in 3 iterations)


In [None]:
NMI(predictions_skm.collect(), rdd_labels.collect())

0.0

Les données ne sont peut être probablement pas adaptées à la méthode spherical K_means. Essayons de voir ce que donne d'autres implémentations : 

In [None]:
!pip install coclust -q
!pip install spherecluster -q

In [None]:
# on transforme le rdd en array pour les deux algorithmes suivants
features_np = np.array(rdd_features.collect()) 

In [None]:
!pip install scikit-learn>=0.22 # requis pour spherecluster selon https://github.com/jasonlaska/spherecluster/blob/develop/requirements.txt

(Malgré les packages requis il nous a été impossible d'importer spherecluster) 
```python
ImportError: cannot import name '_k_means' from 'sklearn.cluster' (/usr/local/lib/python3.7/dist-packages/sklearn/cluster/__init__.py)
```

Pour appliquer le shpericalKmeans de **Coclust** il faut d'abord retirer les lignes contenant uniquement des zeros. 

In [None]:
indexes = np.where(~features_np.any(axis=1))[0] 
features_np = np.delete(features_np, indexes, axis=0)

In [None]:
import coclust.clustering # import SphericalKmeans

skm = coclust.clustering.SphericalKmeans(n_clusters=2)
skm.fit(features_np)

 == New init == 
iteration: 0
1679.0253781761419
iteration: 1
1726.7192403032714
iteration: 2
1741.4679774428876
iteration: 3
1741.6268419287965
iteration: 4
1741.6489233580974
iteration: 5
1741.6507395640688
iteration: 6
1741.6513021484093
iteration: 7


In [None]:
NMI(skm.labels_, np.delete(rdd_labels.collect(), indexes, axis=0))

0.0005767608552965573

## **Autres classifications**

### 11.

In [None]:
from pyspark.ml.clustering import LDA, BisectingKMeans

## Latent Dirichlet allocation
lda = LDA(featuresCol='features', k=2, maxIter=10)
model = lda.fit(dataframe)
lda_predicts = model.transform(dataframe)
# topicDistribution sont des vecteurs comprenant des probabilités d'appartenir 
# à une certaine classe : il faut donc récupérer le résultat suivant 
lda_predicts_ = lda_predicts.select('topicDistribution').rdd.map(lambda x: np.argmax(x)).collect()
lda_predicts_tc = lda_predicts.select('NewsgroupIndex').rdd.map(lambda x: x[0]).collect() # true clusters

lda_nmi = NMI(lda_predicts_, lda_predicts_tc)

## Bisecting Kmeans
bkm = BisectingKMeans(featuresCol='features', predictionCol='predictions').setK(2).setSeed(1)
model = bkm.fit(dataframe)
bkm_predicts = model.transform(dataframe)
# on applique la même sorte de transformation
bkm_predicts_ = bkm_predicts.select('predictions').rdd.map(lambda x: x[0]).collect()
bkm_predicts_tc = bkm_predicts.select('NewsgroupIndex').rdd.map(lambda x: x[0]).collect()

bkm_nmi = NMI(bkm_predicts_, bkm_predicts_tc)

print('Score NMI pour la LDA : {}'.format(lda_nmi))
print('Score NMI pour le Bisecting Kmeans : {}'.format(bkm_nmi))

Score NMI pour la LDA : 0.0006127557395871436
Score NMI pour le Bisecting Kmeans : 0.00010039230011272857


### 12.

Pour entrainer des modèles de classification : effectuons d'abord une séparation en ensemble test et d'apprentissage de `dataframe`

In [None]:
train, test = dataframe.randomSplit([0.75, 0.025], seed=42)

In [None]:
train.collect()[200]

Row(NewsgroupIndex=0.0, features=SparseVector(50, {0: 6.0, 1: 2.0, 2: 7.0, 3: 3.0, 4: 4.0, 5: 6.0, 6: 3.0, 8: 2.0, 10: 3.0, 11: 3.0, 12: 3.0, 13: 4.0, 14: 4.0, 15: 3.0, 17: 3.0, 18: 1.0, 19: 3.0, 20: 3.0, 21: 1.0, 22: 18.0, 23: 5.0, 24: 1.0, 25: 1.0, 26: 1.0, 27: 3.0, 28: 2.0, 29: 4.0, 30: 2.0, 31: 3.0, 32: 2.0, 33: 2.0, 34: 8.0, 35: 1.0, 38: 3.0, 40: 4.0, 41: 3.0, 42: 1.0, 43: 3.0, 44: 1.0, 45: 2.0, 47: 2.0, 48: 2.0, 49: 2.0}))

In [None]:
from pyspark.ml.classification import NaiveBayes, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # pour évaluer les modèles

## Naive Bayes
nb = NaiveBayes(featuresCol='features',  labelCol='NewsgroupIndex', 
                predictionCol='prediction', smoothing=1.0, 
                modelType="multinomial")

# entrainement du modèle
model_nb = nb.fit(train)
# predictions sur l'ensemble de test
nb_predicts = model_nb.transform(test)
# evaluation du modèle
evaluator = MulticlassClassificationEvaluator(labelCol="NewsgroupIndex",
                                              predictionCol="prediction",
                                              metricName="accuracy")
nb_acc = evaluator.evaluate(nb_predicts)

## Linear SVM
lsvc = LinearSVC(featuresCol='features',  labelCol='NewsgroupIndex', 
                predictionCol='prediction', maxIter=10, regParam=0.1)

model_lsvc = lsvc.fit(train)
lsvc_predicts = model_lsvc.transform(test)
#evaluation du modèle
evaluator = MulticlassClassificationEvaluator(labelCol="NewsgroupIndex",
                                              predictionCol="prediction",
                                              metricName="accuracy")
lsvc_acc = evaluator.evaluate(nb_predicts)

print("Accuracy Bayésien Naïf : %0.3f" % (nb_acc))
print("Accuracy Linear SVM : %0.3f" % (lsvc_acc))

Accuracy Bayésien Naïf : 0.623
Accuracy Linear SVM : 0.623
