Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [49]:
NAME = "Karamoko"
COLLABORATORS = "Karamoko"

---

# Machine Learning in Spark

Following the evolution of Spark, there are two ways to do Machine Learning on Spark :

* MLlib, or `spark.mllib`, was the first ML library implemented in the core Spark library and runs on RDDs. As of today, the library is in maintenance mode, but as we did for RDDs vs DataFrames, it is important that we cover some aspects of the older library. MLlib is also the only library that supports training models for Spark Streaming. 
* ML, or `spark.ml` is now the primary ML library on Spark, and runs on DataFrames. Its API is close to those of other mainstream librairies like scikit-learn.

We will dive into both APIs in this notebook, using the `titanic.csv` file for classification purposes on the `Survived` column.

_I think at this point of your career, you all know what the [Titanic dataset](https://www.kaggle.com/c/titanic/data) is..._

In [50]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('lecture-lyon2').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

In [51]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.mllib.linalg import VectorUDT

---
## Data preparation

Even though MLlib is designed with RDDs and DStreams in focus, for ease of transforming the data we will read the data and convert it to a DataFrame. Afterwards we will build RDDs for training in MLlib, or stay in DataFrame for training in ML.

In [52]:
filePath = 'titanic.csv'
data = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(filePath)
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [53]:
data.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


From the first summary statistics, we see that the `Age`, `Cabin` and `Embarked` variables can have null values. Also `PassengerId` and `Ticket` look useless for future predictions.

# Question
  
* Drop `Cabin`, `Ticket` and `PassengerId`
* Using `.na.fill` function on a DataFrame :
    * For `Age`, replace `None` by the mean value for the column. 
    * For `Embarked` columns, replace `None` by the most frequent value for the column. 

In [54]:
def replace_na(df):
    """
    Deal with na values, and drop selected columns    
    """
    # Drop the columns that are not useful for prediction
    df = df.drop('Cabin', 'Ticket', 'PassengerId')
    
    # Replace NaN in 'Age' with the mean of the 'Age' column
    mean_age = df.select('Age').agg({'Age': 'mean'}).collect()[0][0]
    df = df.na.fill({'Age': mean_age})
    
    # Replace NaN in 'Embarked' with the most frequent value of 'Embarked'
    most_frequent_embarked = df.groupBy('Embarked').count().orderBy('count', ascending=False).first()['Embarked']
    df = df.na.fill({'Embarked': most_frequent_embarked})
    
    return df
    raise NotImplementedError()

In [55]:
"""
Graded cell

3 points
"""
result = replace_na(data)
assert float(result.describe().toPandas().loc[2]['Age']) - 13 < 0.1
assert int(result.describe().toPandas().loc[0]['Embarked']) == 891
assert list(result.toPandas().columns.values) == ['Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']

For the following two questions, we will use [Transformers](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#transformers). Technically, a Transformer implements a method `transform()`, which converts one DataFrame into another, generally by appending one or more columns.

Example: 

```python
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])
continuousDataFrame.show()

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
```

Result :

```
+---+-------+
| id|feature|
+---+-------+
|  0|    0.1|
|  1|    0.8|
|  2|    0.2|
+---+-------+

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
```

**Note:** contrary to previous notebooks, I have not imported all of the libraries needed to solve the remaining exercises. When you want to import a library, please import it in the same notebook cell as where you implement your code, otherwise it may impact the automatic grading.

# Question

Through some regex, the [regex_extract UDF](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) and [SQLTransformer](https://spark.apache.org/docs/latest/ml-features.html#sqltransformer), get the title of a person from the `Name` column in a `Civility` column. Drop the `Name` column afterwards.

Example

```
Braund, Mr. Owen       --> Mr
Andria, Doctor. Steve  --> Doctor
```

_Not a hint: while it is perfectly possible to write a custom UDF to solve this question, it breaks the purpose of using Dataframes for cleaning because UDFs don't benefit from SparkSQL's optimizer engine and have to transform back to Java objects for processing. Spark built-in UDFs don't share this problem._

In [56]:
from pyspark.ml.feature import SQLTransformer
from pyspark.sql.functions import regexp_extract

def extract_civility(df):
    """
    Return dataframe dropping Name and replacing with Title
    """
    # Appliquer une expression régulière pour extraire la civility (titre)
    df = df.withColumn('Civility', regexp_extract('Name', r'^(.*?),', 1))
    
    # Utiliser SQLTransformer pour le reste de la transformation
    sql_transformer = SQLTransformer(
        statement="SELECT * FROM __THIS__"
    )

    # Appliquer le SQLTransformer et supprimer la colonne 'Name'
    transformed_df = sql_transformer.transform(df).drop("Name")
    
    return transformed_df
    raise NotImplementedError()

In [57]:
"""
Graded cell

4 points
"""
result = extract_civility(data)
resultCols = result.columns
assert 'Name' not in resultCols
assert 'Civility' in resultCols
assert list(result.select('Civility').distinct().toPandas()['Civility'].sort_values().values) == [
    'Capt',
    'Col',
    'Don',
    'Dr',
    'Jonkheer',
    'Lady',
    'Major',
    'Master',
    'Miss',
    'Mlle',
    'Mme',
    'Mr',
    'Mrs',
    'Ms',
    'Rev',
    'Sir',
    'the Countess'
]

AssertionError: 

# Question 

[One hot encode](https://spark.apache.org/docs/latest/ml-features.html#onehotencoder) `Sex`, `Civility` and `Embarked` columns into `SexVec`, `CivilityVec` and `EmbarkedVec`. 
- Don't forget to drop the original columns.
- For string type input data, it is necessary to encode categorical features using [StringIndexer](https://spark.apache.org/docs/latest/ml-features.html#stringindexer) first, then fit the One Hot Encoder on the transformed dataset.

In [58]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
def one_hot_encode(df):
    """
    Return dataframe one hot encoding selected columns    
    """
    # StringIndexer for 'Sex'
    sex_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
    sex_encoder = OneHotEncoder(inputCol="SexIndex", outputCol="SexVec")

    # StringIndexer for 'Civility'
    civility_indexer = StringIndexer(inputCol="Civility", outputCol="CivilityIndex")
    civility_encoder = OneHotEncoder(inputCol="CivilityIndex", outputCol="CivilityVec")

    # StringIndexer for 'Embarked'
    embarked_indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
    embarked_encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")

    # Create a pipeline to apply the transformations
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, civility_indexer, civility_encoder, embarked_indexer, embarked_encoder])

    # Fit and transform the data using the pipeline
    model = pipeline.fit(df)
    transformed_df = model.transform(df)

    # Drop the original columns and intermediate columns (e.g., SexIndex, CivilityIndex, EmbarkedIndex)
    transformed_df = transformed_df.drop("Sex", "Civility", "Embarked", "SexIndex", "CivilityIndex", "EmbarkedIndex")
    
    return transformed_df
    raise NotImplementedError()

In [59]:
"""
Graded cell

4 points
"""
result = one_hot_encode(extract_civility(data))
resultCols = result.columns
assert len(resultCols) == 12

assert 'SexVec' in resultCols
assert 'CivilityVec' in resultCols
assert 'EmbarkedVec' in resultCols

assert 'Sex' not in resultCols
assert 'Civility' not in resultCols
assert 'Embarked' not in resultCols

assert result.schema['SexVec'].simpleString() == 'SexVec:vector'
assert result.schema['CivilityVec'].simpleString() == 'CivilityVec:vector'
assert result.schema['EmbarkedVec'].simpleString() == 'EmbarkedVec:vector'

# Question

Now that we have created all of our numeric features, we need to assemble them into the same column. This is the goal of the [VectorAssembler](https://spark.apache.org/docs/2.2.0/ml-features.html#vectorassembler) transformer.

In [60]:
from pyspark.ml.feature import VectorAssembler
def feature_assemble(df, featureCols):
    """
    Assemble all features in the featureCols list into one column called 'features'.
    """
    assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
    assembled_df = assembler.transform(df)
    return assembled_df
    raise NotImplementedError()

In [61]:
"""
Graded cell

2 points
"""
result = feature_assemble(data, ['Pclass', 'SibSp', 'Parch'])
assert 'features' in result.columns
assert result.schema['features'].simpleString() == 'features:vector'

---
#### All the data preparation has been made. After running the following cell, we can concentrate on running ML modelling.

For comparison purposes, let's try a Logistic Regression from MLlib and ML on the dataset.

In [62]:
# prepare the data !
features = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'SexVec', 'CivilityVec', 'EmbarkedVec']
prepared_data = feature_assemble(one_hot_encode(extract_civility(replace_na(data))), features)
prepared_data = prepared_data.withColumnRenamed("Survived", "label").select(['label', 'features'])
train, test = prepared_data.randomSplit([0.75, 0.25], 0)

train.cache()
test.cache()

DataFrame[label: int, features: vector]

---
## MLlib - RDD based API

We will first use the RDD-based [Logistic Regression](https://spark.apache.org/docs/2.2.0/mllib-linear-methods.html#logistic-regression). The exercise comes into two steps :

1. First, you must create a RDD of LabeledPoint(label, features). Also careful as we are using `pyspark.ml.linalg.SparseVector` but the RDD-based API expects `pyspark.mllib.linalg.SparseVector`, so we need to [convert it](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=linearregressionwithsgd#pyspark.mllib.linalg.Vectors.fromML).
2. Then you can apply LogisticRegression on it.

# Question

Train a logistic regression model on the train dataset.

In [63]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.evaluation import BinaryClassificationMetrics
def dataframe_to_labeledpoints(df):
    """
    This function takes the conversion from a DataFrame of columns [label, features] to a RDD of LabeledPoint.    
    """
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import Vectors
    
    return df.rdd.map(lambda row: LabeledPoint(row[0], Vectors.fromML(row[1])))

def train_mllib_logistic(train):
    """
    Return a MLlib logistic regression trained on a RDD of LabeledPoint. 
    """
    # Train the logistic regression model using SGD (Stochastic Gradient Descent)
    model = LogisticRegressionWithSGD.train(train_rdd)
    return model
    raise NotImplementedError()

In [64]:
"""
Graded cell

4 points
"""
from pyspark.mllib.evaluation import BinaryClassificationMetrics

train_rdd = dataframe_to_labeledpoints(train)
test_rdd = dataframe_to_labeledpoints(test)
model = train_mllib_logistic(train_rdd)

predictionAndLabels = test_rdd.map(lambda lp: (float(model.predict(lp.features)), lp.label))
metrics = BinaryClassificationMetrics(predictionAndLabels)

print(f"Test AUC: {metrics.areaUnderROC}")
assert metrics.areaUnderROC > 0.70  # I managed ~0.8 on my first try



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 253.0 failed 1 times, most recent failure: Lost task 0.0 in stage 253.0 (TID 204) (10.70.99.235 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1227, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 90, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 174, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\mllib\__init__.py", line 26, in <module>
    import numpy
ModuleNotFoundError: No module named 'numpy'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1227, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 90, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 174, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Application\Anaconda\envs\pyspark-tutorial\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\mllib\__init__.py", line 26, in <module>
    import numpy
ModuleNotFoundError: No module named 'numpy'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


---
## ML - DataFrame based API

We now compare with using the ML [Logistic regression](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#binomial-logistic-regression). It should work directly on our dataset.

In [65]:
from pyspark.ml.classification import LogisticRegression
def train_ml_logistic(train):
    """
    Return a MLlib logistic regression trained on a RDD of LabeledPoint. 
    """
    # Initialize the Logistic Regression model
    lr = LogisticRegression(featuresCol='features', labelCol='label')
    
    # Fit the model on the training data
    model = lr.fit(train)
    
    return model
    raise NotImplementedError()

In [66]:
"""
Graded cell

3 points
"""
from pyspark.ml.evaluation import BinaryClassificationEvaluator

model = train_ml_logistic(train)
print(f"Train AUC: {model.summary.areaUnderROC}")
assert model.summary.areaUnderROC > 0.70 # managed 0.87 on my first try

predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()

print(f"Test AUC: {evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'})}")
assert evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}) > 0.70 # managed 0.88 on my first try

Train AUC: 0.9992457420924574
Test AUC: 0.7611846250787642


In [67]:
spark.stop()