# Machine learning avec pyspark

Quand on veut faire du machine learning avec Pyspark, on doit transformer notre jeu de données pour qu'au final, il ne contienne qu'une ou deux colonnes : 
- Features, labels (Supervisé)
- Features (Non-supervisé)

Au début, cette transformation est contraignante quand on a l'habitude de travailler avec pandas.

Le facteur de succès pour travailler avec pyspark, c'est de s'y retrouver dans la documentation :
https://spark.apache.org/docs/latest/ml-guide.html

In [40]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Titanic')
    .getOrCreate()
)

In [41]:
# Load the Titanic CSV: the first row is the header, and column types are inferred.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('titanic_data.csv')
)

In [42]:
# Rename the columns: every column name is converted to lower case.
col_names = list(map(str.lower, df.columns))
df = df.toDF(*col_names)

In [43]:
# Preview the data with the lower-cased column names.
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|passengerid|survived|pclass|                name|   sex| age|sibsp|parch|          ticket|   fare|cabin|embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [44]:
# Missing values: count the nulls in every column.
from pyspark.sql.functions import when, count, col, isnull

# `when` receives the column *name* as a literal value, so it yields a non-null
# result only on rows where the column is null; `count` skips nulls and thus
# counts exactly the null rows of each column.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|passengerid|survived|pclass|name|sex|age|sibsp|parch|ticket|fare|cabin|embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [45]:
from pyspark.ml.feature import Imputer

# Replace missing ages with the mean age, writing the result back into `age`.
imputer = Imputer(strategy="mean", inputCol='age', outputCol='age')
imputer_model = imputer.fit(df)
df = imputer_model.transform(df)

In [46]:
# Drop `cabin`: too many of its values are missing.
df = df.drop('cabin')

# Re-check the null count of each remaining column.
missing_per_column = [count(when(isnull(c), c)).alias(c) for c in df.columns]
df.select(*missing_per_column).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+--------+
|passengerid|survived|pclass|name|sex|age|sibsp|parch|ticket|fare|embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+



In [47]:
df.show()
# Columns I will use as features: Pclass, Sex, Age, SibSp, Parch, Fare

+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+--------+
|passengerid|survived|pclass|                name|   sex|              age|sibsp|parch|          ticket|   fare|embarked|
+-----------+--------+------+--------------------+------+-----------------+-----+-----+----------------+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|             22.0|    1|    0|       A/5 21171|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|             38.0|    1|    0|        PC 17599|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|             26.0|    0|    0|STON/O2. 3101282|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|             35.0|    1|    0|          113803|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|             35.0|    0|    0|          373450|   8.05|       S|
|          6|       0|  

In [48]:
# Keep only the feature columns plus the `survived` label.
kept_columns = ['pclass', 'sex', 'age', 'sibsp', 'parch', 'fare', 'survived']
df = df.select(*kept_columns)
df.show()

+------+------+-----------------+-----+-----+-------+--------+
|pclass|   sex|              age|sibsp|parch|   fare|survived|
+------+------+-----------------+-----+-----+-------+--------+
|     3|  male|             22.0|    1|    0|   7.25|       0|
|     1|female|             38.0|    1|    0|71.2833|       1|
|     3|female|             26.0|    0|    0|  7.925|       1|
|     1|female|             35.0|    1|    0|   53.1|       1|
|     3|  male|             35.0|    0|    0|   8.05|       0|
|     3|  male|29.69911764705882|    0|    0| 8.4583|       0|
|     1|  male|             54.0|    0|    0|51.8625|       0|
|     3|  male|              2.0|    3|    1| 21.075|       0|
|     3|female|             27.0|    0|    2|11.1333|       1|
|     2|female|             14.0|    1|    0|30.0708|       1|
|     3|female|              4.0|    1|    1|   16.7|       1|
|     1|female|             58.0|    0|    0|  26.55|       1|
|     3|  male|             20.0|    0|    0|   8.05|  

In [49]:
# Confirm that none of the selected columns still contains nulls.
null_checks = []
for column_name in df.columns:
    is_missing = isnull(column_name)
    null_checks.append(count(when(is_missing, column_name)).alias(column_name))
df.select(null_checks).show()

+------+---+---+-----+-----+----+--------+
|pclass|sex|age|sibsp|parch|fare|survived|
+------+---+---+-----+-----+----+--------+
|     0|  0|  0|    0|    0|   0|       0|
+------+---+---+-----+-----+----+--------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler


In [50]:
# Handling categorical features: encode the string column `sex` as a numeric
# index stored in a new `sex_indexed` column.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="sex", outputCol="sex_indexed")
indexer_model = indexer.fit(df)
df_r = indexer_model.transform(df)
df_r.show()

+------+------+-----------------+-----+-----+-------+--------+-----------+
|pclass|   sex|              age|sibsp|parch|   fare|survived|sex_indexed|
+------+------+-----------------+-----+-----+-------+--------+-----------+
|     3|  male|             22.0|    1|    0|   7.25|       0|        0.0|
|     1|female|             38.0|    1|    0|71.2833|       1|        1.0|
|     3|female|             26.0|    0|    0|  7.925|       1|        1.0|
|     1|female|             35.0|    1|    0|   53.1|       1|        1.0|
|     3|  male|             35.0|    0|    0|   8.05|       0|        0.0|
|     3|  male|29.69911764705882|    0|    0| 8.4583|       0|        0.0|
|     1|  male|             54.0|    0|    0|51.8625|       0|        0.0|
|     3|  male|              2.0|    3|    1| 21.075|       0|        0.0|
|     3|female|             27.0|    0|    2|11.1333|       1|        1.0|
|     2|female|             14.0|    1|    0|30.0708|       1|        1.0|
|     3|female|          

In [51]:
# Feature columns = every column except the label (`survived`) and the raw
# string column (`sex`), which is represented by `sex_indexed` instead.
features_list = list(df_r.columns)
for excluded in ('survived', 'sex'):
    features_list.remove(excluded)

In [52]:
from pyspark.ml.feature import VectorAssembler

# Pack all feature columns into the single vector column "X".
featureassembler = VectorAssembler(
    outputCol="X",
    inputCols=features_list,
)
output = featureassembler.transform(df_r)

In [53]:
# Inspect the assembled frame, including the new "X" feature-vector column.
output.show()

+------+------+-----------------+-----+-----+-------+--------+-----------+--------------------+
|pclass|   sex|              age|sibsp|parch|   fare|survived|sex_indexed|                   X|
+------+------+-----------------+-----+-----+-------+--------+-----------+--------------------+
|     3|  male|             22.0|    1|    0|   7.25|       0|        0.0|[3.0,22.0,1.0,0.0...|
|     1|female|             38.0|    1|    0|71.2833|       1|        1.0|[1.0,38.0,1.0,0.0...|
|     3|female|             26.0|    0|    0|  7.925|       1|        1.0|[3.0,26.0,0.0,0.0...|
|     1|female|             35.0|    1|    0|   53.1|       1|        1.0|[1.0,35.0,1.0,0.0...|
|     3|  male|             35.0|    0|    0|   8.05|       0|        0.0|[3.0,35.0,0.0,0.0...|
|     3|  male|29.69911764705882|    0|    0| 8.4583|       0|        0.0|[3.0,29.699117647...|
|     1|  male|             54.0|    0|    0|51.8625|       0|        0.0|[1.0,54.0,0.0,0.0...|
|     3|  male|              2.0|    3| 

In [54]:
# Hold out 25% of the rows for evaluation. The fixed seed makes the split, and
# therefore the metrics computed from it, repeatable from one run to the next.
train_data, test_data = output.randomSplit([0.75, 0.25], seed=42)

# Fit a logistic regression on the feature vector "X" against the "survived" label.
# `classifier` ends up bound to the fitted model (not the estimator), because the
# evaluation cell calls `classifier.evaluate(...)`.
classifier = LogisticRegression(featuresCol='X', labelCol='survived')
classifier = classifier.fit(train_data)

In [55]:
# Score the fitted model on the held-out test split.
results = classifier.evaluate(test_data)

In [56]:
# Accuracy of the model on the test split.
results.accuracy

0.75