<img src="./images/pic1.PNG" alt="drawing" width="200"/>

# Summary

We will use TransmogrifAI to build a model and submit its predictions to the [Titanic: Machine Learning from Disaster](https://www.kaggle.com/c/titanic) Kaggle competition.  This is an excellent challenge if you are new to data science, as it introduces:
* feature engineering
* binary classification


Please see [TransmogrifAI GitHub](https://github.com/salesforce/TransmogrifAI) and [TransmogrifAI Docs](https://docs.transmogrif.ai/en/stable/examples/Titanic-Binary-Classification.html) for the original introductory example using the Titanic dataset.  Both examples build a binary classifier, but in slightly different ways.  Our walkthrough will be closer to the GitHub example.

#### Competition Description
The sinking of the RMS Titanic is one of the most infamous shipwrecks in history.  On April 15, 1912, during her maiden voyage, the Titanic sank after colliding with an iceberg, killing 1502 out of 2224 passengers and crew. This sensational tragedy shocked the international community and led to better safety regulations for ships.

One of the reasons that the shipwreck led to such loss of life was that there were not enough lifeboats for the passengers and crew. Although there was some element of luck involved in surviving the sinking, some groups of people were more likely to survive than others, such as women, children, and the upper-class.

In this challenge, we ask you to complete the analysis of what sorts of people were likely to survive. In particular, we ask you to apply the tools of machine learning to predict which passengers survived the tragedy.

If you have completed the challenge previously, please fill out the [Google form](https://goo.gl/forms/0tWOTCBMJE8qZ60H2).

# Check Scala Version

In [1]:
assert(scala.util.Properties.versionString == "version 2.11.12")

# Get TransmogrifAI

In [2]:
%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.5.0

# Get Spark

In [3]:
%classpath add mvn org.apache.spark spark-mllib_2.11 2.3.0

# Get Started

In [4]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.udf

import com.salesforce.op._
import com.salesforce.op.features._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.evaluators.Evaluators


In [5]:
// Spark configuration; optionally append .setExecutorEnv(Array(("memory", "10g")))
val conf = new SparkConf().setMaster("local[*]").setAppName("automl-app")
val sc = new SparkContext(conf)  // initialize Spark context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)  // initialize SQL context
implicit val spark = SparkSession.builder.config(conf).getOrCreate()  // start Spark session
import spark.implicits._

org.apache.spark.sql.SparkSession$implicits$@13360fd8

Everything below intentionally left blank

# Get Data

In [6]:
val readerOptions = Map( "inferSchema" -> "true", "header" -> "true")

In [7]:
val rawData = sqlContext.read.format("csv").options(readerOptions).load("data/train.csv")
rawData.printSchema
rawData.show(5)

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   

null

In [8]:
val doNotTouch = rawData.columns.splitAt(3)._2

[Name, Sex, Age, SibSp, Parch, Ticket, Fare, Cabin, Embarked]
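
The `splitAt(3)._2` call keeps every column after the first three, which are the ones the SQL query recasts by hand. A minimal plain-Scala sketch of the same idea, using the column names from the schema printed earlier (the object name `ColumnSplitSketch` is only illustrative):

```scala
object ColumnSplitSketch {
  // Column names in the order reported by printSchema
  val columns: Array[String] = Array(
    "PassengerId", "Survived", "Pclass", "Name", "Sex", "Age",
    "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked")

  // splitAt(3) returns (first three columns, everything else)
  val (recast, untouched) = columns.splitAt(3)
}
```

Here `untouched` holds the same elements as `columns.drop(3)`, so either form would work in the cell above.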

In [9]:
val query = s"""
    select PassengerId, 
    cast(Survived as double) as Survived,
    cast(Pclass as string) as Pclass, 
    ${doNotTouch.mkString(",")}
    from data
"""


select PassengerId,
cast(Survived as double) as Survived,
cast(Pclass as string) as Pclass,
Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
from data


In [10]:
rawData.createOrReplaceTempView("data")

In [11]:
val passengerData = spark.sql(query)
passengerData.printSchema
passengerData.show(5)

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: double (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|     0.0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|     1.0|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|     

null

In [12]:
passengerData.filter("Survived = 1").count / passengerData.count.toDouble

0.3838383838383838
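
The survival rate also gives a quick baseline: a model that always predicts the more common class is right about 62% of the time, so any classifier should beat that. The sketch below is plain Scala with illustrative names; the count of 342 survivors is derived from the ratio above and the 891 training rows, not read from the data.

```scala
object ClassBalance {
  // Fraction of positive labels (1.0) in a sequence of binary labels
  def positiveRate(labels: Seq[Double]): Double =
    labels.count(_ == 1.0).toDouble / labels.size

  // Accuracy obtained by always predicting the more common class
  def majorityBaseline(labels: Seq[Double]): Double = {
    val p = positiveRate(labels)
    math.max(p, 1.0 - p)
  }

  // 342 survivors and 549 non-survivors (derived, see the note above)
  val titanicLabels: Seq[Double] = Seq.fill(342)(1.0) ++ Seq.fill(549)(0.0)
}
```

Calling `ClassBalance.positiveRate(ClassBalance.titanicLabels)` reproduces the ratio printed by the cell above.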

# Declare Target and Features

In [13]:
val (target, features) = FeatureBuilder.fromDataFrame[RealNN](passengerData, response = "Survived")
OutputCell.HIDDEN

In [14]:
target

Feature(name = Survived, uid = RealNN_000000000002, isResponse = true, originStage = FeatureGeneratorStage_000000000002, parents = [], distributions = [])

In [15]:
features.foreach(feature => println(feature.name, feature.uid))

(PassengerId,Integral_000000000001)
(Pclass,Text_000000000003)
(Name,Text_000000000004)
(Sex,Text_000000000005)
(Age,Real_000000000006)
(SibSp,Integral_000000000007)
(Parch,Integral_000000000008)
(Ticket,Text_000000000009)
(Fare,Real_00000000000a)
(Cabin,Text_00000000000b)
(Embarked,Text_00000000000c)


# Transmogrify (Feature Engineering)

In [16]:
// Scala equivalent of Python's filter(lambda x: x > 2, ls); ls is an illustrative list
val ls = List(1, 2, 3, 4)
ls.filter(x => x > 2)

In [17]:
val id = features(0)
val featureVector = features.filter{ feature => feature.name != "PassengerId"}.transmogrify()

Feature(name = Age-Cabin-Embarked-Fare-Name-Parch-Pclass-Sex-SibSp-Ticket_4-stagesApplied_OPVector_000000000010, uid = OPVector_000000000010, isResponse = false, originStage = VectorsCombiner_000000000010, parents = [OPVector_00000000000d,OPVector_00000000000e,OPVector_00000000000f], distributions = [])

In [18]:
featureVector.prettyParentStages

+-- combVec
|    +-- smartTxtVec
|    |    +-- ConcatText(Embarked)
|    |    +-- ConcatText(Cabin)
|    |    +-- ConcatText(Ticket)
|    |    +-- ConcatText(Sex)
|    |    +-- ConcatText(Name)
|    |    +-- ConcatText(Pclass)
|    +-- vecReal
|    |    +-- SumReal(Fare)
|    |    +-- SumReal(Age)
|    +-- vecInt
|    |    +-- SumIntegral(Parch)
|    |    +-- SumIntegral(SibSp)


In [19]:
featureVector.originStage

VectorsCombiner_000000000010

# Sanity Check (Feature Refinement)

In [20]:
val checkedFeatureVector = target.sanityCheck(featureVector, removeBadFeatures=true)

Feature(name = Age-Cabin-Embarked-Fare-Name-Parch-Pclass-Sex-SibSp-Survived-Ticket_5-stagesApplied_OPVector_000000000011, uid = OPVector_000000000011, isResponse = false, originStage = SanityChecker_000000000011, parents = [RealNN_000000000002,OPVector_000000000010], distributions = [])

In [21]:
checkedFeatureVector.originStage.explainParams.split("\n").mkString("\n\n")

categoricalLabel: If true, then label is treated as categorical (eg. Cramer's V will be calculated between it and categorical features). If this is not set, then use a max class fraction of 0.1 to estimate whether label iscategorical or not. (undefined)

checkSample: Rate to downsample the data for statistical calculations (note: actual sampling will not be exact due to Spark's dataset sampling behavior) (default: 1.0, current: 1.0)

correlationExclusion: Setting for what categories of feature vector columns to exclude from the correlation calculation (default: NoExclusion, current: NoExclusion)

correlationType: Which coefficient to use for computing correlation (default: Pearson, current: Pearson)

featureLabelCorrOnly: If true, then only calculate the correlations between the features and the label. Otherwise, calculate the entire correlation matrix, which includes all feature-feature correlations. (default: false, current: false)

inputFeatures: Input features (default: [Lcom.sales
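
The `correlationType` parameter above defaults to Pearson. To make that statistic concrete, here is a small plain-Scala sketch of the Pearson coefficient between one feature column and the label. It is a textbook formula written for illustration, not the sanity checker's own code.

```scala
object CorrelationSketch {
  // Pearson correlation: covariance divided by the product of standard deviations
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.size == ys.size && xs.nonEmpty, "inputs must be non-empty and the same length")
    val n = xs.size
    val meanX = xs.sum / n
    val meanY = ys.sum / n
    val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
    val varX = xs.map(x => math.pow(x - meanX, 2)).sum
    val varY = ys.map(y => math.pow(y - meanY, 2)).sum
    cov / math.sqrt(varX * varY)
  }
}
```

A feature whose correlation with the label is very close to 1 or -1 is suspicious, because it may encode the answer itself.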

# Model Selection

In [22]:
val modelSelector = BinaryClassificationModelSelector.withCrossValidation(
    numFolds=3, parallelism=8
).setInput(
    (target,checkedFeatureVector )
).setOutputFeatureName("prediction")

ModelSelector_00000000001d
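
With `numFolds=3`, each candidate model is trained three times, each time holding out a different third of the training data for validation. The plain-Scala sketch below shows the bookkeeping with a deterministic round-robin assignment; a real cross-validator would normally assign rows to folds at random, and the names here are illustrative.

```scala
object KFoldSketch {
  // Assign row indices 0 until n to folds in round-robin order and return
  // one (trainIndices, validationIndices) pair per fold
  def folds(n: Int, numFolds: Int): Seq[(Seq[Int], Seq[Int])] = {
    val indices = 0 until n
    (0 until numFolds).map { k =>
      val (validation, train) = indices.partition(_ % numFolds == k)
      (train, validation)
    }
  }
}
```

Every row lands in exactly one validation set, so each candidate's metric is averaged over predictions on data it was not trained on.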

In [23]:
modelSelector.validator.getParams

In [24]:
modelSelector.models.foreach(println)

(OpLogisticRegression_000000000016,[Lorg.apache.spark.ml.param.ParamMap;@2f506d41)
(OpRandomForestClassifier_000000000017,[Lorg.apache.spark.ml.param.ParamMap;@324bdf5d)
(OpGBTClassifier_000000000018,[Lorg.apache.spark.ml.param.ParamMap;@3bd1bdde)
(OpLinearSVC_000000000019,[Lorg.apache.spark.ml.param.ParamMap;@54feb1e9)


In [25]:
modelSelector.models.map{ model => model._2.length}.sum

48

In [26]:
// Scala tuples are accessed by position: _1 is the first element, _2 the second
val tup = ("string1", "string2")
tup._1
tup._2

string2

To keep the run time short for the meeting, the next cells restrict each estimator to a single set of hyperparameters.

In [27]:
val limitedModelsToTry = modelSelector.models.map{ model => (model._1, model._2.take(1))}

[[(OpLogisticRegression_000000000016,[Lorg.apache.spark.ml.param.ParamMap;@7f960aa6), (OpRandomForestClassifier_000000000017,[Lorg.apache.spark.ml.param.ParamMap;@2808a57b), (OpGBTClassifier_000000000018,[Lorg.apache.spark.ml.param.ParamMap;@32bfe32c), (OpLinearSVC_000000000019,[Lorg.apache.spark.ml.param.ParamMap;@ff531bd)]]

In [28]:
val limitedModelSelector = BinaryClassificationModelSelector.withCrossValidation(
    numFolds=3, parallelism=8, modelsAndParameters = limitedModelsToTry
).setInput(
    (target,checkedFeatureVector )
).setOutputFeatureName("prediction")

ModelSelector_000000000022

If you want to run this against all models and hyperparameters, comment out the first line of code below and uncomment the second.

In [29]:
val prediction = limitedModelSelector.getOutput()
// val prediction = modelSelector.getOutput()

Feature(name = prediction, uid = Prediction_000000000022, isResponse = true, originStage = ModelSelector_000000000022, parents = [RealNN_000000000002,OPVector_000000000011], distributions = [])



# Setting up a TransmogrifAI Workflow

In [30]:
val workflow = new OpWorkflow().setResultFeatures(id, prediction)

com.salesforce.op.OpWorkflow@6cb2a8ef

# Train a Workflow

In [31]:
val Array(train, test) = passengerData.randomSplit( Array( 0.7, 0.3)) 

[PassengerId: int, Survived: double ... 10 more fields]

In [32]:
println(train.count)
println(test.count)

605
286
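
Note that 605 and 286 are not an exact 70/30 cut of the 891 rows, and the counts can change on each run because the split is random. Spark's `randomSplit` accepts a seed as a second argument if you need the same split every time. The plain-Scala sketch below illustrates a seeded split; unlike Spark it cuts at an exact fraction, and its names are illustrative.

```scala
import scala.util.Random

object SplitSketch {
  // Shuffle with a fixed seed, then cut at the requested training fraction
  def split[A](rows: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    val shuffled = new Random(seed).shuffle(rows)
    shuffled.splitAt((rows.size * trainFraction).round.toInt)
  }
}
```

Calling `SplitSketch.split(rows, 0.7, seed)` twice with the same seed returns the same two partitions.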


In [33]:
workflow.setInputDataset(train)

val fittedWorkflow = workflow.train()

com.salesforce.op.OpWorkflowModel@1ecb95bc

In [34]:
println(fittedWorkflow.summaryPretty())

Evaluated OpGBTClassifier, OpRandomForestClassifier, OpLogisticRegression, OpLinearSVC models using Cross Validation and area under precision-recall metric.
Evaluated 1 OpGBTClassifier model with area under precision-recall metric between [0.8564210197461917, 0.8564210197461917].
Evaluated 1 OpRandomForestClassifier model with area under precision-recall metric between [0.81747725163217, 0.81747725163217].
Evaluated 1 OpLogisticRegression model with area under precision-recall metric between [0.7955008409293264, 0.7955008409293264].
Evaluated 1 OpLinearSVC model with area under precision-recall metric between [0.7839172485128754, 0.7839172485128754].
+--------------------------------------------------------+
|            Selected Model - OpGBTClassifier            |
+--------------------------------------------------------+
| Model Param           | Value                          |
+-----------------------+--------------------------------+
| cacheNodeIds          | false               
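
The summary reports one cross-validated area under precision-recall per candidate, and the selected model is the one with the largest value. A plain-Scala sketch of that final comparison, using the four numbers printed above (the object and value names are illustrative):

```scala
object SelectBestSketch {
  // Cross-validated area under precision-recall, copied from the summary
  val auPR: Map[String, Double] = Map(
    "OpGBTClassifier" -> 0.8564210197461917,
    "OpRandomForestClassifier" -> 0.81747725163217,
    "OpLogisticRegression" -> 0.7955008409293264,
    "OpLinearSVC" -> 0.7839172485128754)

  // The winner is the candidate with the largest validation metric
  val best: (String, Double) = auPR.maxBy(_._2)
}
```

Here `best._1` is `OpGBTClassifier`, matching the selected model in the summary.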

# Evaluate on Test Data

In [35]:
fittedWorkflow.setInputDataset(test)

com.salesforce.op.OpWorkflowModel@1ecb95bc

In [38]:
val eval = Evaluators.BinaryClassification().setLabelCol(target).setPredictionCol(prediction)

OpBinaryClassificationEvaluator_000000000048

In [39]:
val metrics = fittedWorkflow.evaluate(eval)

{
  "Precision" : 0.7428571428571429,
  "Recall" : 0.7090909090909091,
  "F1" : 0.7255813953488373,
  "AuROC" : 0.8305526859504131,
  "AuPR" : 0.7956716812102792,
  "Error" : 0.2062937062937063,
  "TP" : 78.0,
  "TN" : 149.0,
  "FP" : 27.0,
  "FN" : 32.0,
  "thresholds" : [ 0.9702154349918662, 0.9670782573697327, 0.9418867915882827, 0.9417846471900747, 0.9403493518903523, 0.9393256965416754, 0.93519985322478, 0.9334282956102187, 0.9325007085834183, 0.9303093588668164, 0.9291263945418542, 0.9271557149731809, 0.9264380132264233, 0.9263830752293651, 0.9233095393848113, 0.9228231957603084, 0.9208319031079146, 0.92082009018026, 0.9192601698015893, 0.919015584038735, 0.9180178902399556, 0.9176832532469993, 0.9176142974416633, 0.916973578591742, 0.9161449577543586, 0.9126239680886669, 0.9106484616507198, 0.9104604256896982, 0.9068812716149011, 0.9059923149041862, 0.9055188001669879, 0.9022772621208082, 0.9018078938904991, 0.9015801018564829, 0.8999272262987055, 0.8986158874776211, 

In [40]:
metrics.toMap
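
The headline metrics follow directly from the four confusion-matrix counts in the output above. The plain-Scala sketch below recomputes them from `TP`, `TN`, `FP` and `FN` as a check; the case class is illustrative and not part of TransmogrifAI.

```scala
object MetricsSketch {
  final case class Confusion(tp: Double, tn: Double, fp: Double, fn: Double) {
    // Share of predicted survivors who actually survived
    def precision: Double = tp / (tp + fp)
    // Share of actual survivors the model found
    def recall: Double = tp / (tp + fn)
    // Harmonic mean of precision and recall
    def f1: Double = 2 * precision * recall / (precision + recall)
    // Share of all predictions that were wrong
    def error: Double = (fp + fn) / (tp + tn + fp + fn)
  }

  // Counts copied from the evaluation output
  val testSet: Confusion = Confusion(tp = 78, tn = 149, fp = 27, fn = 32)
}
```

With these counts, precision is 78/105, recall is 78/110, and error is 59/286, which agree with the reported `Precision`, `Recall` and `Error` values.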

# Kaggle Test Data

In [43]:
val rawKaggleData = sqlContext.read.format("csv").options(readerOptions).load("data/test.csv").selectExpr("*", "1d as Survived")
rawKaggleData.createOrReplaceTempView("data")

null

In [44]:
val kaggle = spark.sql(query)
kaggle.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     1.0|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| null|       Q|
|        893|     1.0|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| null|       S|
|        894|     1.0|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| null|       Q|
|        895|     1.0|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| null|       S|
|        896|     1.0|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



null

In [45]:
fittedWorkflow.setInputDataset(kaggle)

val scoredKaggle = fittedWorkflow.score().select("PassengerId", "prediction")
scoredKaggle.show(5)

+-----------+--------------------+
|PassengerId|          prediction|
+-----------+--------------------+
|        892|[probability_1 ->...|
|        893|[probability_1 ->...|
|        894|[probability_1 ->...|
|        895|[probability_1 ->...|
|        896|[probability_1 ->...|
+-----------+--------------------+
only showing top 5 rows



null

In [46]:
scoredKaggle.take(1)

[[892,Map(probability_1 -> 0.08294475215166364, probability_0 -> 0.9170552478483364, rawPrediction_0 -> 1.2014964847389487, prediction -> 0.0, rawPrediction_1 -> -1.2014964847389487)]]

In [48]:
val getScore = udf{ map: Map[String, Double] => if(map("probability_1") > 0.5) 1 else 0 }

UserDefinedFunction(<function1>,IntegerType,Some(List(MapType(StringType,DoubleType,false))))
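
The UDF turns each prediction map into a 0/1 label by thresholding `probability_1` at 0.5. The same logic as an ordinary Scala function, which is easier to try out without Spark (the names `ThresholdSketch` and `toLabel` are illustrative):

```scala
object ThresholdSketch {
  // Label a passenger as a survivor when the survival probability exceeds the threshold
  def toLabel(scores: Map[String, Double], threshold: Double = 0.5): Int =
    if (scores("probability_1") > threshold) 1 else 0
}
```

For passenger 892 above, `probability_1` is roughly 0.083, so `toLabel` returns 0, in line with the `prediction -> 0.0` entry in the same map.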

In [52]:
// Kaggle expects a header row with the columns PassengerId and Survived
scoredKaggle.withColumn(
    "Survived", getScore(scoredKaggle.col("prediction"))
).select("PassengerId", "Survived").repartition(1).write.option("header", "true").csv("myprediction_1")