Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [1]:
NAME = "Vladyslav Maksyk"
COLLABORATORS = "None"

---

# Machine Learning in Spark

As Spark has evolved, two ways of doing Machine Learning on Spark have emerged:

* MLlib, or `spark.mllib`, was the first ML library implemented in the core Spark library and runs on RDDs. As of today, the library is in maintenance mode, but as we did for RDDs vs DataFrames, it is important that we cover some aspects of the older library. MLlib is also the only library that supports training models for Spark Streaming. 
* ML, or `spark.ml`, is now the primary ML library on Spark, and runs on DataFrames. Its API is close to that of other mainstream libraries such as scikit-learn.

We will dive into both APIs in this notebook, using the `titanic.csv` file for classification purposes on the `Survived` column.

_I think at this point of your career, you all know what the [Titanic dataset](https://www.kaggle.com/c/titanic/data) is..._

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('lecture-lyon2').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

In [3]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.mllib.linalg import VectorUDT

In [4]:
from pyspark.sql.functions import mean

In [5]:
from pyspark.sql import SparkSession, functions as F 
from pyspark.sql.types import *

In [6]:
import re

---
## Data preparation

Even though MLlib is designed around RDDs and DStreams, we will read the data into a DataFrame because that makes the transformations easier. Afterwards, we will build RDDs for training in MLlib, or stay with the DataFrame for training in ML.

In [7]:
filePath = 'titanic.csv'
data = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(filePath)
#data.show()

In [8]:
data.describe().toPandas()

|   | summary | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 891.0 | 891.0 | 891.0 | 891 | 891 | 714.0 | 891.0 | 891.0 | 891 | 891.0 | 204 | 889 |
| 1 | mean | 446.0 | 0.3838383838383838 | 2.308641975308642 |  |  | 29.69911764705882 | 0.5230078563411896 | 0.3815937149270482 | 260318.54916792738 | 32.2042079685746 |  |  |
| 2 | stddev | 257.3538420152301 | 0.4865924542648575 | 0.8360712409770491 |  |  | 14.526497332334037 | 1.1027434322934315 | 0.8060572211299488 | 471609.26868834975 | 49.69342859718089 |  |  |
| 3 | min | 1.0 | 0.0 | 1.0 | "Andersson, Mr. August Edvard (""Wennerstrom"")" | female | 0.42 | 0.0 | 0.0 | 110152 | 0.0 | A10 | C |
| 4 | max | 891.0 | 1.0 | 3.0 | van Melkebeke, Mr. Philemon | male | 80.0 | 8.0 | 6.0 | WE/P 5735 | 512.3292 | T | S |


The counts in these summary statistics show that `Age` (714 non-null values out of 891), `Cabin` (204) and `Embarked` (889) contain null values. `PassengerId` and `Ticket` also look useless for prediction.

# Question
  
* Drop `Cabin`, `Ticket` and `PassengerId`.
* Using the `.na.fill` function on the DataFrame:
    * For `Age`, replace `None` with the mean value of the column.
    * For `Embarked`, replace `None` with the most frequent value of the column.

In [9]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [10]:
def replace_na(df):
    """
    Deal with na values, and drop selected columns    
    """
    # YOUR CODE HERE
    
    columns_to_drop = ["Cabin","Ticket","PassengerId"]
    df = df.drop(*columns_to_drop)

    df.show()
    mean_val = df.select(mean(df.Age)).collect()
    print(type(mean_val))  # mean_val is a list of Row objects
    print('mean value of Age', mean_val[0][0])
    mean_age = mean_val[0][0]
    # use mean_age to fill the nulls in the Age column
    df = df.na.fill(mean_age, subset=['Age'])
    df.show()

    
    counts = df.groupBy("Embarked").count().collect()
    print("Embarked counts before Na removal ->",counts)
    
    # keep the most frequent Embarked value (the mode); the null group is counted too
    value, max_count = None, 0
    for count in counts:
        if count[1]>max_count:
            value = count[0]
            max_count = count[1]
    print("value, max_count ->",value, max_count)
    
    df = df.na.fill(value,subset=['Embarked'])
    
    counts2 = df.groupBy("Embarked").count().collect()
    print("Embarked counts after Na removal ->",counts2)

    return df

In [11]:
"""
Graded cell

3 points
"""
result = replace_na(data)
assert float(result.describe().toPandas().loc[2]['Age']) - 13 < 0.1
assert int(result.describe().toPandas().loc[0]['Embarked']) == 891
assert list(result.toPandas().columns.values) == ['Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']

+--------+------+--------------------+------+----+-----+-----+-------+--------+
|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+--------------------+------+----+-----+-----+-------+--------+
|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|       C|
|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|  7.925|       S|
|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|       S|
|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|    Moran, Mr. James|  male|null|    0|    0| 8.4583|       Q|
|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|11.1333|       S|
|       1|     2|Nasser, Mrs. Nich...|fe

For the following two questions, we will use [Transformers](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#transformers). Technically, a Transformer implements a method `transform()`, which converts one DataFrame into another, generally by appending one or more columns.

Example: 

```python
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])
continuousDataFrame.show()

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
```

Result:

```
+---+-------+
| id|feature|
+---+-------+
|  0|    0.1|
|  1|    0.8|
|  2|    0.2|
+---+-------+

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
```

**Note:** contrary to previous notebooks, I have not imported all of the libraries needed to solve the remaining exercises. When you want to import a library, please import it in the same notebook cell as where you implement your code, otherwise it may impact the automatic grading.

# Question

Using a regex, the [regexp_extract UDF](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) and [SQLTransformer](https://spark.apache.org/docs/2.2.0/ml-features.html#sqltransformer), extract the title of each person from the `Name` column into a `Civility` column. Drop the `Name` column afterwards.

Example

```
Braund, Mr. Owen       --> Mr
Andria, Doctor. Steve  --> Doctor
```

_Not a hint: while it is perfectly possible to write a custom UDF to solve this question, doing so defeats the purpose of using DataFrames for cleaning. Python UDFs are opaque to Spark SQL's optimizer, and every row has to be serialized from the JVM to a Python worker and back. Spark's built-in functions don't have this problem._

In [12]:
data.toPandas()

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.2500 |  | S |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 2 | 3 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | STON/O2. 3101282 | 7.9250 |  | S |
| 3 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1000 | C123 | S |
| 4 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.0500 |  | S |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 886 | 887 | 0 | 2 | Montvila, Rev. Juozas | male | 27.0 | 0 | 0 | 211536 | 13.0000 |  | S |
| 887 | 888 | 1 | 1 | Graham, Miss. Margaret Edith | female | 19.0 | 0 | 0 | 112053 | 30.0000 | B42 | S |
| 888 | 889 | 0 | 3 | "Johnston, Miss. Catherine Helen ""Carrie""" | female |  | 1 | 2 | W./C. 6607 | 23.4500 |  | S |
| 889 | 890 | 1 | 1 | Behr, Mr. Karl Howell | male | 26.0 | 0 | 0 | 111369 | 30.0000 | C148 | C |


In [13]:
data_reserve = data

In [14]:
data_reserve.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

### Regular Expression Examples

In [17]:
#Example1
# text = ["March 8,2019", "march 8, 2019", "march 8 2019", 
#         "mar 30 2019", "Countermarch 8,2019 feet", "marched"]

# df = spark.createDataFrame(text, StringType()).toDF("text")

# df = df.withColumn("date_from_text", F.regexp_extract(df.text, r"(\b(?:[Mm]ar(?:ch)?)\b [0-9]+,?(?: |)\d{4})", 0))          
# df.show()

In [18]:
#Example2
# names = ["Braund, Mr. Owen", "Futrelle, Mrs. Jacques Heath (Lily May Peel)", "Heikkinen, Miss. Laina", 
#         "Behr, Mr. Karl Howell", "Graham, Miss. Margaret Edith", "Futrelle, Mrs. Jacques Heath (Lily May Peel)"]
# df = spark.createDataFrame(names, StringType()).toDF("names")

# df = df.withColumn("title_from_name", F.regexp_extract(df.names, r'\w+\.', 0))          
# df.show()

In [19]:

# rex = re.compile(r'\w+\.')
# names = ["Braund, Mr. Owen", "Futrelle, Mrs. Jacques Heath (Lily May Peel)", "Heikkinen, Miss. Laina", 
#         "Behr, Mr. Karl Howell", "Graham, Miss. Margaret Edith", "Futrelle, Mrs. Jacques Heath (Lily May Peel)"]
# result = []
# for name in names:
#     result.append(rex.findall(name))
    
# print(result)

In [20]:
#Example4
# adresses = ["Ema Dough (+1-202-555-0189) - 915 Ridge Street Corpus, TX 78418",
#  "Tom Hitt (+33-93-751-3845) - 9190 Berkshire Ave. Wayne, NJ 07470",
#  "Maya Raine (+49-30-833-931-313) - 18 SW. Sage Ave. Ride, CA 95993"]

# phone_numbers = [] 
# pattern = r"\(([\d\-+]+)\)"

# for adress in adresses: 
#     result = re.search(pattern, adress)
#     phone_numbers.append(result.group(1))
# print(phone_numbers)
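Spark's `regexp_extract` uses Java regular expressions, which behave like Python's `re` for patterns this simple, so two candidate title patterns can be compared in plain Python. This sketch shows that `\w+\.` keeps the trailing dot and reduces `the Countess` to `Countess.`, while a pattern anchored on the comma (an alternative, not the pattern used in `extract_civility` below) returns the bare title.

```python
import re

names = [
    "Braund, Mr. Owen Harris",
    "Rothes, the Countess. of (Lucy Noel Martha Dyer-Edwards)",
]

# First run of word characters followed by a dot, dot included
word_dot = re.compile(r"\w+\.")
# Everything between the comma and the next dot, captured without the dot
after_comma = re.compile(r",\s*([^.]+)\.")

word_dot_titles = [word_dot.search(n).group(0) for n in names]
after_comma_titles = [after_comma.search(n).group(1) for n in names]

print(word_dot_titles)     # ['Mr.', 'Countess.']
print(after_comma_titles)  # ['Mr', 'the Countess']
```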

### Transforming

In [38]:
def extract_civility(df):
    """
    Return dataframe dropping Name and replacing with Civility
    """
    # YOUR CODE HERE
    data = df.withColumn("Civility", F.regexp_extract(df.Name, r'\w+\.', 0))
    columns_to_drop = ["Name"]
    data = data.drop(*columns_to_drop)
    #data.show()
    
    return data

In [22]:
"""
Graded cell

4 points
"""
result = extract_civility(data)
resultCols = result.columns
assert 'Name' not in resultCols
assert 'Civility' in resultCols
assert list(result.select('Civility').distinct().toPandas()['Civility'].sort_values().values) == ['Capt.',
 'Col.',
 'Countess.',
 'Don.',
 'Dr.',
 'Jonkheer.',
 'Lady.',
 'Major.',
 'Master.',
 'Miss.',
 'Mlle.',
 'Mme.',
 'Mr.',
 'Mrs.',
 'Ms.',
 'Rev.',
 'Sir.']

+-----------+--------+------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|PassengerId|Survived|Pclass|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Civility|
+-----------+--------+------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|          1|       0|     3|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr.|
|          2|       1|     1|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs.|
|          3|       1|     3|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss.|
|          4|       1|     1|female|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs.|
|          5|       0|     3|  male|35.0|    0|    0|          373450|   8.05| null|       S|     Mr.|
|          6|       0|     3|  male|null|    0|    0|          330877| 8.4583| null|       Q|     Mr.|
|          7|       0|     1|  male|54.0|    0|    0|           17463|51.

In [None]:
"""
Graded cell

4 points
"""
result = extract_civility(data)
resultCols = result.columns
assert 'Name' not in resultCols
assert 'Civility' in resultCols
assert list(result.select('Civility').distinct().toPandas()['Civility'].sort_values().values) == [
    'Capt',
    'Col',
    'Don',
    'Dr',
    'Jonkheer',
    'Lady',
    'Major',
    'Master',
    'Miss',
    'Mlle',
    'Mme',
    'Mr',
    'Mrs',
    'Ms',
    'Rev',
    'Sir',
    'the Countess'
]

# Question 

[One-hot encode](https://spark.apache.org/docs/2.2.0/ml-features.html#onehotencoder) the `Sex`, `Civility` and `Embarked` columns into `SexVec`, `CivilityVec` and `EmbarkedVec`. Don't forget to drop the original columns.

### One Hot Encoding Example

In [24]:

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# The SparkSession already provides createDataFrame, so no SQLContext is needed
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show()

In [25]:
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
ohe = encoder.fit(indexed)
encoded = ohe.transform(indexed)
encoded.select("id", "categoryVec").show()



## Solution
##### One-hot encode Sex, Civility and Embarked columns into SexVec, CivilityVec and EmbarkedVec

In [28]:
result.show()
df=result

+-----------+--------+------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|PassengerId|Survived|Pclass|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Civility|
+-----------+--------+------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|          1|       0|     3|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr.|
|          2|       1|     1|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs.|
|          3|       1|     3|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss.|
|          4|       1|     1|female|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs.|
|          5|       0|     3|  male|35.0|    0|    0|          373450|   8.05| null|       S|     Mr.|
|          6|       0|     3|  male|null|    0|    0|          330877| 8.4583| null|       Q|     Mr.|
|          7|       0|     1|  male|54.0|    0|    0|           17463|51.

In [29]:
# Sex
stringIndexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show()

encoder = OneHotEncoder(dropLast=True, inputCol="SexIndex", outputCol="SexVec")
ohe = encoder.fit(indexed)
encoded = ohe.transform(indexed)
encoded = encoded.drop("SexIndex","Sex")
encoded.show()
df=encoded

+-----------+--------+------+------+----+-----+-----+----------------+-------+-----+--------+--------+--------+
|PassengerId|Survived|Pclass|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Civility|SexIndex|
+-----------+--------+------+------+----+-----+-----+----------------+-------+-----+--------+--------+--------+
|          1|       0|     3|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr.|     0.0|
|          2|       1|     1|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs.|     1.0|
|          3|       1|     3|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss.|     1.0|
|          4|       1|     1|female|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs.|     1.0|
|          5|       0|     3|  male|35.0|    0|    0|          373450|   8.05| null|       S|     Mr.|     0.0|
|          6|       0|     3|  male|null|    0|    0|          330877| 8.4583| null|       Q|     Mr.|  

In [30]:
# Civility
stringIndexer = StringIndexer(inputCol="Civility", outputCol="CivilityIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show()

encoder = OneHotEncoder(dropLast=False, inputCol="CivilityIndex", outputCol="CivilityVec")
ohe = encoder.fit(indexed)
encoded = ohe.transform(indexed)
encoded = encoded.drop("CivilityIndex", "Civility")
encoded.show()
df=encoded

+-----------+--------+------+----+-----+-----+----------------+-------+-----+--------+--------+-------------+-------------+
|PassengerId|Survived|Pclass| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Civility|       SexVec|CivilityIndex|
+-----------+--------+------+----+-----+-----+----------------+-------+-----+--------+--------+-------------+-------------+
|          1|       0|     3|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr.|(1,[0],[1.0])|          0.0|
|          2|       1|     1|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs.|    (1,[],[])|          2.0|
|          3|       1|     3|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss.|    (1,[],[])|          1.0|
|          4|       1|     1|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs.|    (1,[],[])|          2.0|
|          5|       0|     3|35.0|    0|    0|          373450|   8.05| null|       S|     Mr.|(1,[0],[1.0])|          0.0|
|       

In [31]:
# Embarked
stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show()

encoder = OneHotEncoder(dropLast=False, inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
ohe = encoder.fit(indexed)
encoded = ohe.transform(indexed)
encoded = encoded.drop("EmbarkedIndex", "Embarked")
encoded.show()
#encoded.select("Survived","Age" ,"EmbarkedVec").show()

+-----------+--------+------+----+-----+-----+----------------+-------+-----+--------+-------------+--------------+-------------+
|PassengerId|Survived|Pclass| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|       SexVec|   CivilityVec|EmbarkedIndex|
+-----------+--------+------+----+-----+-----+----------------+-------+-----+--------+-------------+--------------+-------------+
|          1|       0|     3|22.0|    1|    0|       A/5 21171|   7.25| null|       S|(1,[0],[1.0])|(17,[0],[1.0])|          0.0|
|          2|       1|     1|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    (1,[],[])|(17,[2],[1.0])|          1.0|
|          3|       1|     3|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|    (1,[],[])|(17,[1],[1.0])|          0.0|
|          4|       1|     1|35.0|    1|    0|          113803|   53.1| C123|       S|    (1,[],[])|(17,[2],[1.0])|          0.0|
|          5|       0|     3|35.0|    0|    0|          373450|   8.05| null|       S|(1,[

In [32]:
encoded.select("Survived","Age" ,"SexVec","EmbarkedVec","CivilityVec").show()

+--------+----+-------------+-------------+--------------+
|Survived| Age|       SexVec|  EmbarkedVec|   CivilityVec|
+--------+----+-------------+-------------+--------------+
|       0|22.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       1|38.0|    (1,[],[])|(3,[1],[1.0])|(17,[2],[1.0])|
|       1|26.0|    (1,[],[])|(3,[0],[1.0])|(17,[1],[1.0])|
|       1|35.0|    (1,[],[])|(3,[0],[1.0])|(17,[2],[1.0])|
|       0|35.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       0|null|(1,[0],[1.0])|(3,[2],[1.0])|(17,[0],[1.0])|
|       0|54.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       0| 2.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[3],[1.0])|
|       1|27.0|    (1,[],[])|(3,[0],[1.0])|(17,[2],[1.0])|
|       1|14.0|    (1,[],[])|(3,[1],[1.0])|(17,[2],[1.0])|
|       1| 4.0|    (1,[],[])|(3,[0],[1.0])|(17,[1],[1.0])|
|       1|58.0|    (1,[],[])|(3,[0],[1.0])|(17,[1],[1.0])|
|       0|20.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       0|39.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0]

In [39]:
def one_hot_encode(df):
    """
    Return dataframe one hot encoding selected columns    
    """
    # YOUR CODE HERE
    
    # Sex
    stringIndexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
    model = stringIndexer.fit(df)
    indexed = model.transform(df)
    encoder = OneHotEncoder(dropLast=True, inputCol="SexIndex", outputCol="SexVec")
    ohe = encoder.fit(indexed)
    df = ohe.transform(indexed)
    
    # Civility
    stringIndexer = StringIndexer(inputCol="Civility", outputCol="CivilityIndex")
    model = stringIndexer.fit(df)
    indexed = model.transform(df)
    encoder = OneHotEncoder(dropLast=False, inputCol="CivilityIndex", outputCol="CivilityVec")
    ohe = encoder.fit(indexed)
    df = ohe.transform(indexed)
    
    # Embarked
    stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
    model = stringIndexer.fit(df)
    indexed = model.transform(df)
    encoder = OneHotEncoder(dropLast=False, inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
    ohe = encoder.fit(indexed)
    df = ohe.transform(indexed)
    
    df = df.drop("EmbarkedIndex", "Embarked","CivilityIndex", "Civility","SexIndex","Sex")
    df.select("Survived","Age" ,"SexVec","EmbarkedVec","CivilityVec").show()
    return df

In [40]:
"""
Graded cell

4 points
"""
result = one_hot_encode(extract_civility(data))
resultCols = result.columns
assert len(resultCols) == 12

assert 'SexVec' in resultCols
assert 'CivilityVec' in resultCols
assert 'EmbarkedVec' in resultCols

assert 'Sex' not in resultCols
assert 'Civility' not in resultCols
assert 'Embarked' not in resultCols

assert result.schema['SexVec'].simpleString() == 'SexVec:vector'
assert result.schema['CivilityVec'].simpleString() == 'CivilityVec:vector'
assert result.schema['EmbarkedVec'].simpleString() == 'EmbarkedVec:vector'

+--------+----+-------------+-------------+--------------+
|Survived| Age|       SexVec|  EmbarkedVec|   CivilityVec|
+--------+----+-------------+-------------+--------------+
|       0|22.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       1|38.0|    (1,[],[])|(3,[1],[1.0])|(17,[2],[1.0])|
|       1|26.0|    (1,[],[])|(3,[0],[1.0])|(17,[1],[1.0])|
|       1|35.0|    (1,[],[])|(3,[0],[1.0])|(17,[2],[1.0])|
|       0|35.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       0|null|(1,[0],[1.0])|(3,[2],[1.0])|(17,[0],[1.0])|
|       0|54.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       0| 2.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[3],[1.0])|
|       1|27.0|    (1,[],[])|(3,[0],[1.0])|(17,[2],[1.0])|
|       1|14.0|    (1,[],[])|(3,[1],[1.0])|(17,[2],[1.0])|
|       1| 4.0|    (1,[],[])|(3,[0],[1.0])|(17,[1],[1.0])|
|       1|58.0|    (1,[],[])|(3,[0],[1.0])|(17,[1],[1.0])|
|       0|20.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0])|
|       0|39.0|(1,[0],[1.0])|(3,[0],[1.0])|(17,[0],[1.0]

# Question

Now that we have created all of our numeric features, we need to assemble them into a single vector column. This is the goal of the [VectorAssembler](https://spark.apache.org/docs/2.2.0/ml-features.html#vectorassembler) transformer.

## Vector assembler example

In [41]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(inputCols=["hour", "mobile", "userFeatures"],outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



In [45]:
def feature_assemble(df, featureCols):
    """
    Assemble all features in the featureCols list into one column called 'features'.
    """
    # YOUR CODE HERE
    assembler = VectorAssembler(inputCols=featureCols,outputCol="features")
    output = assembler.transform(df)
    print("Assembled columns->" ,featureCols, "to vector column 'features'")
    output.select("features").show(truncate=False)
    return output

In [46]:
"""
Graded cell

2 points
"""
result = feature_assemble(data, ['Pclass', 'SibSp', 'Parch'])
assert 'features' in result.columns
assert result.schema['features'].simpleString() == 'features:vector'

Assembled columns-> ['Pclass', 'SibSp', 'Parch'] to vector column 'features'
+-------------+
|features     |
+-------------+
|[3.0,1.0,0.0]|
|[1.0,1.0,0.0]|
|[3.0,0.0,0.0]|
|[1.0,1.0,0.0]|
|[3.0,0.0,0.0]|
|[3.0,0.0,0.0]|
|[1.0,0.0,0.0]|
|[3.0,3.0,1.0]|
|[3.0,0.0,2.0]|
|[2.0,1.0,0.0]|
|[3.0,1.0,1.0]|
|[1.0,0.0,0.0]|
|[3.0,0.0,0.0]|
|[3.0,1.0,5.0]|
|[3.0,0.0,0.0]|
|[2.0,0.0,0.0]|
|[3.0,4.0,1.0]|
|[2.0,0.0,0.0]|
|[3.0,1.0,0.0]|
|[3.0,0.0,0.0]|
+-------------+
only showing top 20 rows



---
#### All the data preparation is done. After running the following cell, we can concentrate on ML modelling.

For comparison purposes, let's try a Logistic Regression from MLlib and ML on the dataset.

In [47]:
# prepare the data !
features = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'SexVec', 'CivilityVec', 'EmbarkedVec']
prepared_data = feature_assemble(one_hot_encode(extract_civility(replace_na(data))), features)
prepared_data = prepared_data.withColumnRenamed("Survived", "label").select(['label', 'features'])
train, test = prepared_data.randomSplit([0.75, 0.25], 0)

train.cache()
test.cache()

+--------+------+--------------------+------+----+-----+-----+-------+--------+
|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+--------------------+------+----+-----+-----+-------+--------+
|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|       C|
|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|  7.925|       S|
|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|       S|
|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|    Moran, Mr. James|  male|null|    0|    0| 8.4583|       Q|
|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|11.1333|       S|
|       1|     2|Nasser, Mrs. Nich...|fe

DataFrame[label: int, features: vector]

In [51]:
train.show(truncate=False)

+-----+-------------------------------------------------------------------------+
|label|features                                                                 |
+-----+-------------------------------------------------------------------------+
|0    |(26,[0,1,2,3,4,5,6,23],[1.0,52.0,1.0,1.0,79.65,1.0,1.0,1.0])             |
|0    |(26,[0,1,2,3,4,5,6,23],[1.0,64.0,1.0,4.0,263.0,1.0,1.0,1.0])             |
|0    |(26,[0,1,2,3,4,5,6,23],[2.0,19.0,1.0,1.0,36.75,1.0,1.0,1.0])             |
|0    |(26,[0,1,2,3,4,5,6,23],[2.0,23.0,2.0,1.0,11.5,1.0,1.0,1.0])              |
|0    |(26,[0,1,2,3,4,5,6,23],[2.0,31.0,1.0,1.0,26.25,1.0,1.0,1.0])             |
|0    |(26,[0,1,2,3,4,5,6,23],[2.0,36.0,1.0,2.0,27.75,1.0,1.0,1.0])             |
|0    |(26,[0,1,2,3,4,5,6,23],[2.0,43.0,1.0,1.0,26.25,1.0,1.0,1.0])             |
|0    |(26,[0,1,2,3,4,5,6,23],[2.0,60.0,1.0,1.0,39.0,1.0,1.0,1.0])              |
|0    |(26,[0,1,2,3,4,5,6,23],[3.0,14.0,4.0,1.0,39.6875,1.0,1.0,1.0])           |
|0    |(26,[0,1,

---
## MLlib - RDD based API

We will first use the RDD-based [Logistic Regression](https://spark.apache.org/docs/2.2.0/mllib-linear-methods.html#logistic-regression). The exercise comes in two steps:

1. First, create an RDD of `LabeledPoint(label, features)`. Be careful: our features are `pyspark.ml.linalg.SparseVector` objects, but the RDD-based API expects `pyspark.mllib.linalg.SparseVector`, so we need to [convert them](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=linearregressionwithsgd#pyspark.mllib.linalg.Vectors.fromML).
2. Then train the logistic regression on that RDD.

# Question

Train a logistic regression model on the train dataset.

In [52]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

In [80]:
def dataframe_to_labeledpoints(df):
    """
    Convert a DataFrame with columns [label, features] into an
    RDD of LabeledPoint.
    """
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import Vectors
    
    return df.rdd.map(lambda row: LabeledPoint(row[0], Vectors.fromML(row[1])))

def train_mllib_logistic(train):
    """
    Return an MLlib logistic regression trained on an RDD of LabeledPoint.
    """
    # Build the model with L-BFGS, using the default training parameters
    model = LogisticRegressionWithLBFGS.train(train)
    return model

In [81]:
"""
Graded cell

4 points
"""
from pyspark.mllib.evaluation import BinaryClassificationMetrics

train_rdd = dataframe_to_labeledpoints(train)
test_rdd = dataframe_to_labeledpoints(test)
model = train_mllib_logistic(train_rdd)

predictionAndLabels = test_rdd.map(lambda lp: (float(model.predict(lp.features)), lp.label))

metrics = BinaryClassificationMetrics(predictionAndLabels)
assert metrics.areaUnderROC > 0.75  # I managed ~0.8 on my first try

In [82]:
metrics.areaUnderROC

0.8043478260869564

---
## ML - DataFrame based API

We now compare it with the DataFrame-based ML [Logistic regression](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#binomial-logistic-regression). It should work directly on our dataset.

In [72]:
from pyspark.ml.classification import LogisticRegression

In [73]:
def train_ml_logistic(train):
    """
    Return an ML logistic regression model trained on a DataFrame with columns [label, features].
    """
    # YOUR CODE HERE
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    lrModel = lr.fit(train)
    
    # Print the coefficients and intercept for logistic regression
    print("Coefficients: " + str(lrModel.coefficients))
    print("Intercept: " + str(lrModel.intercept))
    
    return lrModel 

In [74]:
"""
Graded cell

3 points
"""
from pyspark.ml.evaluation import BinaryClassificationEvaluator

model = train_ml_logistic(train)
assert model.summary.areaUnderROC > 0.75 # managed 0.87 on my first try

predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()
assert evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}) > 0.75 # managed 0.88 on my first try

Coefficients: (26,[5,6],[-0.06631722609963366,-0.06586495461998135])
Intercept: -0.41982427590474464


In [77]:
evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

0.8251811594202898

In [None]:
spark.stop()