# Model definition - deep learning algorithm

- Choose, justify and apply a model performance indicator (e.g. F1 score, true positive rate, within-cluster sum of squared errors, …) to assess your model and justify the choice of an algorithm

- Implement your model with at least one deep learning algorithm and at least one non-deep-learning algorithm, then compare and document model performance

- Apply at least one additional iteration in the process model involving at least the feature creation task, and record the impact on model performance (e.g. data normalization, PCA, …)

Depending on the algorithm class and data set size, you might choose specific technologies / frameworks to solve your problem. Please document all your decisions in the ADD (Architectural Decisions Document).

Once you think you have achieved a decent model performance, save the notebook according to the process model’s naming convention and proceed to the model training task.

1. Load the dataset (train and test separately)
2. Model performance indicator
3. Keras Sequential model
4. Additional iteration in the process model

## Import Modules

In [1]:
import ibmos2spark
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import IntegerType

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20181230185459-0002
KERNEL_ID = 28a4e7d9-e634-42e5-bf86-be7905cd4e44


## 1. Load the dataset (train and test separately)
- Load the locally processed data set (with the feature engineering applied, but without the pandas dummy encoding)

In [2]:
# The code was removed by Watson Studio for sharing.

## 2. Model performance indicator

F1 score, because we want a balance between precision and recall and the class distribution is uneven. Unlike accuracy, F1 is not inflated by a model that simply predicts the majority class. F1 score = 2 * (precision * recall) / (precision + recall).

**Precision** is true positives / (true positives + false positives).

Precision measures how accurate the positive predictions are: of all samples predicted positive, how many are actually positive.

**Recall** is true positives / (true positives + false negatives).

Recall measures how many of the actual positives the model captures by labeling them positive (true positives).
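To illustrate why F1 is preferred over accuracy on imbalanced labels, here is a small pure-Python sketch. The helper names and the ten labels are hypothetical, not taken from the data set. A baseline that predicts "not survived" for everyone reaches the same accuracy as a model that finds some survivors, but only the latter gets a non-zero F1 score.

```python
def confusion_counts(y_true, y_pred, positive=1):
    """Count true positives, false positives and false negatives for one class."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p == positive)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p == positive)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p != positive)
    return tp, fp, fn


def f1_score(y_true, y_pred, positive=1):
    """Harmonic mean of precision and recall; returns 0.0 when it is undefined."""
    tp, fp, fn = confusion_counts(y_true, y_pred, positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


# Hypothetical imbalanced labels: 2 of 10 passengers survived (1 = survived).
y_true = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
always_dead = [0] * 10                       # trivial "nobody survives" baseline
some_model = [0, 0, 0, 0, 0, 0, 1, 0, 1, 0]  # hypothetical predictions

print(accuracy(y_true, always_dead), f1_score(y_true, always_dead))  # 0.8 0.0
print(accuracy(y_true, some_model), f1_score(y_true, some_model))    # 0.8 0.5
```

Both predictors reach 80 % accuracy, but F1 exposes that the baseline never finds a survivor. In practice a library implementation such as scikit-learn's `f1_score` would normally be used instead of hand-written helpers.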

## 3. Keras Sequential model

- The columns need further processing (string indexing, type conversion, ...) before they can be used by the model.

### Processing
- Survived (training set only): StringIndexer to convert from string to index, then drop the original column
- Pclass: StringIndexer to convert from string to index, then drop the original column
- Sex: StringIndexer to convert male/female to 0/1, then drop the original column
- Age: convert to float
- Name: drop, not needed
- Ticket: drop, not needed
- Cabin: drop, not needed
- SibSp: convert to integer
- Parch: convert to integer
- Fare: convert to float
- Embarked: StringIndexer to convert from string to index, then drop the original column
- Title: StringIndexer to convert from string to index, then drop the original column
- Num_cabins: convert to integer
- Cabin_letter: StringIndexer to convert from string to index, then drop the original column
- Ticket_letters: StringIndexer to convert from string to index, then drop the original column
- TravelGroup: StringIndexer to convert from string to index, then drop the original column

In [23]:
# Survived only for the training data
indexer = StringIndexer(inputCol='Survived', outputCol='SurvivedIndex')
df_train = indexer.fit(df_train_load).transform(df_train_load)
df_train = df_train.drop('Survived')

In [24]:
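# Pclass: fit on the training data only and reuse the fitted model for the test data
# handleInvalid='keep' maps labels not seen during fitting to an extra index
indexer = StringIndexer(inputCol='Pclass', outputCol='PclassIndex', handleInvalid='keep')
indexer_model = indexer.fit(df_train)
df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test_load)
df_train = df_train.drop('Pclass')
df_test = df_test.drop('Pclass')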
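StringIndexer orders labels by frequency by default (the most frequent label gets index 0.0). Fitting a separate indexer on the test set can therefore assign a different index to the same label whenever label frequencies differ between the two sets. The pure-Python sketch below mimics that ordering to show the mismatch. `fit_string_indexer`, `transform` and the Embarked-style sample values are hypothetical, and breaking ties alphabetically is an assumption of the sketch rather than a statement about Spark's tie-breaking.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each non-null label to an index, most frequent label first.

    Mimics StringIndexer's default frequency-descending order; ties are
    broken alphabetically here (an assumption of this sketch).
    """
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform(mapping, values):
    """Apply a fitted mapping; raises KeyError for labels it has never seen."""
    return [mapping[v] for v in values]


# Hypothetical Embarked values: 'S' dominates in train, 'C' dominates in test.
train = ['S', 'S', 'S', 'C', 'Q']
test = ['C', 'C', 'S', 'Q']

train_map = fit_string_indexer(train)
test_map = fit_string_indexer(test)
print(train_map)  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
print(test_map)   # {'C': 0.0, 'Q': 1.0, 'S': 2.0}  -> 'S' means something else

# Reusing the training mapping keeps the encoding consistent across both sets.
print(transform(train_map, test))  # [1.0, 1.0, 0.0, 2.0]
```

This is why the indexer model fitted on the training data is usually reused to transform the test data. A label that appears only in the test set then needs a policy such as `handleInvalid='keep'`.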

In [25]:
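# Sex: fit on the training data only and reuse the fitted model for the test data
# handleInvalid='keep' maps labels not seen during fitting to an extra index
indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex', handleInvalid='keep')
indexer_model = indexer.fit(df_train)
df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test)
df_train = df_train.drop('Sex')
df_test = df_test.drop('Sex')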


In [26]:
# Age
df_train = df_train.withColumn("Age", df_train["Age"].cast("float"))
df_test = df_test.withColumn("Age", df_test["Age"].cast("float"))
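Casting with `cast("float")` does not fail on values Spark cannot parse. In its default (non-ANSI) mode Spark returns null instead, so missing or malformed ages silently become nulls. These will likely need imputing or dropping before the data reaches the Keras model. The plain-Python sketch below mimics that lenient behaviour to count the resulting nulls. `lenient_cast_float` and the sample values are hypothetical.

```python
def lenient_cast_float(value):
    """Return float(value), or None when the value cannot be parsed.

    Hypothetical helper mimicking Spark's default (non-ANSI) cast, which
    yields null instead of raising on unparsable strings.
    """
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


# Hypothetical raw Age strings, including an empty field, a missing value and a typo.
raw_age = ['22', '38', '', None, '35.5', 'abc']
ages = [lenient_cast_float(v) for v in raw_age]
null_count = sum(v is None for v in ages)
print(ages)        # [22.0, 38.0, None, None, 35.5, None]
print(null_count)  # 3
```

On the Spark DataFrame itself, the equivalent check would be something like `df_train.filter(df_train["Age"].isNull()).count()`.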

In [27]:
# Name
df_train = df_train.drop('Name')
df_test = df_test.drop('Name')

In [28]:
# Ticket
df_train = df_train.drop('Ticket')
df_test = df_test.drop('Ticket')

In [29]:
# Cabin
df_train = df_train.drop('Cabin')
df_test = df_test.drop('Cabin')

In [30]:
# SibSp
df_train = df_train.withColumn("SibSp", df_train["SibSp"].cast(IntegerType()))
df_test = df_test.withColumn("SibSp", df_test["SibSp"].cast(IntegerType()))

In [31]:
# Parch
df_train = df_train.withColumn("Parch", df_train["Parch"].cast(IntegerType()))
df_test = df_test.withColumn("Parch", df_test["Parch"].cast(IntegerType()))

In [32]:
# Fare
df_train = df_train.withColumn("Fare", df_train["Fare"].cast("float"))
df_test = df_test.withColumn("Fare", df_test["Fare"].cast("float"))

In [33]:
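# Embarked: fit on the training data only and reuse the fitted model for the test data
# handleInvalid='keep' maps labels not seen during fitting to an extra index
indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex', handleInvalid='keep')
indexer_model = indexer.fit(df_train)
df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test)
df_train = df_train.drop('Embarked')
df_test = df_test.drop('Embarked')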

In [34]:
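# Title: fit on the training data only and reuse the fitted model for the test data
# handleInvalid='keep' maps labels not seen during fitting to an extra index
indexer = StringIndexer(inputCol='Title', outputCol='TitleIndex', handleInvalid='keep')
indexer_model = indexer.fit(df_train)
df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test)
df_train = df_train.drop('Title')
df_test = df_test.drop('Title')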

In [35]:
# Num_cabins
df_train = df_train.withColumn("Num_cabins", df_train["Num_cabins"].cast(IntegerType()))
df_test = df_test.withColumn("Num_cabins", df_test["Num_cabins"].cast(IntegerType()))

In [36]:
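# Cabin_letter: fit on the training data only and reuse the fitted model for the test data
# handleInvalid='keep' maps labels not seen during fitting to an extra index
indexer = StringIndexer(inputCol='Cabin_letter', outputCol='Cabin_letterIndex', handleInvalid='keep')
indexer_model = indexer.fit(df_train)
df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test)
df_train = df_train.drop('Cabin_letter')
df_test = df_test.drop('Cabin_letter')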

In [37]:
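# Ticket_letters: fit on the training data only and reuse the fitted model for the test data
# handleInvalid='keep' maps labels not seen during fitting to an extra index
indexer = StringIndexer(inputCol='Ticket_letters', outputCol='Ticket_lettersIndex', handleInvalid='keep')
indexer_model = indexer.fit(df_train)
df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test)
df_train = df_train.drop('Ticket_letters')
df_test = df_test.drop('Ticket_letters')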

In [38]:
df_train.show(5)

+-----------+----+-----+-----+-------+----------+-----------+-------------+-----------+--------+-------------+----------+-----------------+-------------------+
|PassengerId| Age|SibSp|Parch|   Fare|Num_cabins|TravelGroup|SurvivedIndex|PclassIndex|SexIndex|EmbarkedIndex|TitleIndex|Cabin_letterIndex|Ticket_lettersIndex|
+-----------+----+-----+-----+-------+----------+-----------+-------------+-----------+--------+-------------+----------+-----------------+-------------------+
|          1|22.0|    1|    0|   7.25|         0|        Duo|          0.0|        0.0|     0.0|          0.0|       0.0|              0.0|                3.0|
|          2|38.0|    1|    0|71.2833|         1|        Duo|          1.0|        1.0|     1.0|          1.0|       2.0|              1.0|                1.0|
|          3|26.0|    0|    0|  7.925|         0|      Alone|          1.0|        0.0|     1.0|          0.0|       1.0|              0.0|                4.0|
|          4|35.0|    1|    0|   53.1|  

In [43]:
df_train.select("TravelGroup").distinct().show()

+-----------+
|TravelGroup|
+-----------+
|       null|
|        Duo|
| GroupSmall|
|      Alone|
| GroupLarge|
+-----------+



In [19]:
# TravelGroup
indexer = StringIndexer(inputCol='TravelGroup', outputCol='TravelGroupIndex')
df_train = indexer.fit(df_train).transform(df_train)
df_test = indexer.fit(df_test).transform(df_test)
df_train = df_train.drop('TravelGroup')
df_test = df_test.drop('TravelGroup')

In [20]:
df_train.show(5)

Py4JJavaError: An error occurred while calling o658.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 33.0 failed 4 times, most recent failure: Lost task 0.3 in stage 33.0 (TID 36, 172.30.235.11, executor 1): org.apache.spark.SparkException: Failed to execute user defined function(: (string) => double)
	...
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:251)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:246)
	... 19 more
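The `distinct()` output for TravelGroup contains a `null`. A StringIndexer with the default `handleInvalid='error'` fails on such values when transforming. Because Spark evaluates lazily, the failure only surfaces at the `show()` action. As the error message suggests, a possible fix is to pass `handleInvalid='keep'` (nulls get an extra index) or `handleInvalid='skip'` (those rows are dropped) to the `StringIndexer` constructor. Another option is to fill the nulls beforehand, e.g. with `df_train.fillna({'TravelGroup': 'Unknown'})`. In the same step, the indexer can be fitted on `df_train` and its model reused for `df_test`. The pure-Python sketch below only mimics these policies to show their effect. `fit_labels`, `index_with_policy` and the sample values are hypothetical.

```python
from collections import Counter


def fit_labels(values):
    """Frequency-descending label order; nulls are ignored when fitting."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def index_with_policy(mapping, values, handle_invalid="error"):
    """Index values, treating None and unseen labels per handle_invalid.

    'error' raises, 'skip' drops the row, 'keep' maps to len(mapping),
    loosely following StringIndexer's handleInvalid options.
    """
    out = []
    for v in values:
        if v in mapping:
            out.append(mapping[v])
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))
        elif handle_invalid == "skip":
            continue
        else:
            raise ValueError(f"invalid label: {v!r}")
    return out


travel_group = ['Alone', 'Duo', None, 'Alone', 'GroupSmall']
mapping = fit_labels(travel_group)  # {'Alone': 0.0, 'Duo': 1.0, 'GroupSmall': 2.0}
print(index_with_policy(mapping, travel_group, "keep"))  # [0.0, 1.0, 3.0, 0.0, 2.0]
print(index_with_policy(mapping, travel_group, "skip"))  # [0.0, 1.0, 0.0, 2.0]
```

`'keep'` preserves the row count, which matters for the test set where every passenger needs a prediction. `'skip'` silently shrinks the data.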
