#### Download the Titanic dataset

A dataset for binary classification: the goal is to predict whether each passenger survived.

In [ ]:
val remote = "https://s3-eu-west-1.amazonaws.com/kensuio-training/data/titanic.csv"
val local = "titanic.csv"

In [ ]:
val dataDir = sys.props("java.io.tmpdir") + "/data/logistic_regression"
new java.io.File(dataDir).mkdirs()

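Remove any previous copy of the file so the download starts from a clean state. The `-f` flag keeps the command from failing when the file does not exist yet.

In [ ]:
:sh rm -f ${dataDir}/$local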

In [ ]:
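val source = scala.io.Source.fromURL(remote)
val f = new java.io.FileWriter(new java.io.File(s"${dataDir}/$local"), false)
// Copy the remote file character by character, then release both resources
try source.foreach(f.append(_)) finally {
  f.close()
  source.close()
}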
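As a hedged alternative to the character-by-character copy above, `java.nio.file.Files.copy` with `REPLACE_EXISTING` overwrites an existing target, so a separate removal step would not be needed. The helper name `saveStream` and the in-memory demo data are assumptions for illustration; in the notebook the stream would come from something like `new java.net.URL(remote).openStream()`.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper (not part of the notebook): copy a stream to `target`,
// overwriting any existing file and always closing the stream.
def saveStream(in: InputStream, target: Path): Long =
  try Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING)
  finally in.close()

// Demo: an in-memory stream stands in for the remote CSV.
// createTempFile already creates the file, so this also exercises the overwrite.
val demoTarget = Files.createTempFile("titanic-demo", ".csv")
val bytes = "survived,pclass\n1,1\n".getBytes(StandardCharsets.UTF_8)
val written = saveStream(new ByteArrayInputStream(bytes), demoTarget)
```

`saveStream` returns the number of bytes written, which gives a cheap sanity check on the download.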

Read the CSV file, letting Spark infer the schema. Rows without a `survived` label are dropped.

In [ ]:
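// Needed for isnull and the $"..." column syntax (harmless if the notebook already imports them)
import org.apache.spark.sql.functions._
import sparkSession.implicits._
val csvDF = sparkSession.read.option("inferSchema", true)
                             .option("header", true)
                             .csv(s"${dataDir}/$local")
                             .filter(! isnull($"survived"))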

## Explore the data

First look at column names, then assess the nature of data:
* numerical or categorical
* missing values (null)
* relevant columns
* new features needed

In [ ]:
csvDF.columns


Looking at the distinct values in a column, together with the column type, gives an idea of how to use it.

In [ ]:
csvDF.select("pclass").distinct

In [ ]:
csvDF.filter(isnull($"pclass")).count

In [ ]:
csvDF.select("survived").distinct

In [ ]:
csvDF.filter(isnull($"survived")).count

In [ ]:
csvDF.select("sex").distinct

In [ ]:
csvDF.filter(isnull($"sex")).count

In [ ]:
csvDF.select("age").distinct

In [ ]:
csvDF.filter(isnull($"age")).count

In [ ]:
csvDF.select("sibsp").distinct

In [ ]:
csvDF.filter(isnull($"sibsp")).count

In [ ]:
csvDF.select("parch").distinct

In [ ]:
csvDF.filter(isnull($"parch")).count

In [ ]:
csvDF.select("ticket").distinct

In [ ]:
csvDF.select("fare").distinct

In [ ]:
csvDF.filter(isnull($"fare")).count

In [ ]:
csvDF.select("cabin").distinct

In [ ]:
csvDF.select("name").take(10)
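The per-column checks above all follow one pattern: count the nulls and look at the distinct values. A plain-Scala sketch of that pattern on a few made-up rows is shown here; the rows, the `profile` helper and the use of `Option` for missing values are assumptions for illustration, not part of the notebook. In the notebook the same idea would mean looping over `csvDF.columns` instead of writing one cell per column.

```scala
// Made-up sample rows (not taken from titanic.csv); None marks a missing value.
val sampleRows: Seq[Map[String, Option[String]]] = Seq(
  Map("sex" -> Some("male"),   "age" -> Some("22"), "cabin" -> None),
  Map("sex" -> Some("female"), "age" -> None,       "cabin" -> Some("C85")),
  Map("sex" -> Some("female"), "age" -> Some("22"), "cabin" -> None)
)

// For each column: (number of missing values, number of distinct non-missing values).
def profile(rows: Seq[Map[String, Option[String]]]): Map[String, (Int, Int)] = {
  val columns = rows.flatMap(_.keys).distinct
  columns.map { c =>
    val values = rows.map(_.getOrElse(c, None))
    val nulls = values.count(_.isEmpty)
    val distinctValues = values.flatten.distinct.size
    (c, (nulls, distinctValues))
  }.toMap
}

val columnProfile = profile(sampleRows)
```

A column with many missing values (like `cabin` in this toy sample) is a candidate for dropping; a column with few distinct values is a candidate for categorical encoding.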

## Feature transformations

We want to extract the title from the name, e.g. Mr or Miss, since it could influence survival.

We are working with Spark SQL, where custom column transformations are expressed as UDFs (User Defined Functions).

#### Title extraction as a new feature

In [ ]:
val Pattern = ".*, (.*?)\\..*".r
val title: (String => String) = {
  case Pattern(t) => t
  case _ => ""
}
val titleUDF = udf(title)

In [ ]:
csvDF.select("name").withColumn("title", titleUDF(col("name")))
     .select("title").distinct
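To see what the pattern does without Spark, the same function can be applied to a few strings in plain Scala. This is a minimal sketch: apart from the name used elsewhere in this notebook, the sample names are made up.

```scala
// Same regular expression as in the notebook: the title is the text between
// the first ", " and the following "."
val Pattern = ".*, (.*?)\\..*".r

def title(name: String): String = name match {
  case Pattern(t) => t
  case _ => "" // names that do not follow the "Last, Title. First" layout
}

// "Doe, Miss. Jane" and "no title here" are made-up inputs.
val sampleNames = Seq("Baumann, Mr. John D", "Doe, Miss. Jane", "no title here")
val titles = sampleNames.map(title)
```

Names that do not match the layout map to the empty string, which later becomes its own category when the title column is indexed.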

#### Filling missing values

The age and fare columns (numerical data) have missing values; we replace them with the column average.

In [ ]:
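import org.apache.spark.sql.Row

// Mean of the non-null ages; falls back to 0.0 if the column has no values
val avgAge = csvDF.select("age")
  .agg(avg("age"))
  .collect() match {
  case Array(Row(mean: Double)) => mean
  case _ => 0.0
}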

In [ ]:
// Mean of the non-null fares; falls back to 0.0 if the column has no values
val avgFare = csvDF.select("fare")
  .agg(avg("fare"))
  .collect() match {
  case Array(Row(mean: Double)) => mean
  case _ => 0.0
}

In [ ]:
val dfFilled = csvDF.withColumnRenamed("home.dest", "dest")
                    .na.fill(Map("fare" -> avgFare, "age" -> avgAge))
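The fill step above can be pictured in plain Scala as follows. This is a sketch on made-up values, with `Option` standing in for SQL nulls; the helper name `fillWithMean` is hypothetical. Like Spark's `avg`, the mean is computed over the non-missing values only.

```scala
// Replace missing entries with the mean of the entries that are present.
def fillWithMean(xs: Seq[Option[Double]]): Seq[Double] = {
  val present = xs.flatten
  // Fall back to 0.0 when nothing is present, mirroring the notebook's default.
  val mean = if (present.isEmpty) 0.0 else present.sum / present.size
  xs.map(_.getOrElse(mean))
}

// Made-up ages: the middle one is missing.
val ages: Seq[Option[Double]] = Seq(Some(20.0), None, Some(40.0))
val filledAges = fillWithMean(ages)
```

Mean imputation keeps the column average unchanged but shrinks its variance, which is usually acceptable for a first model.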

In [ ]:
dfFilled.filter($"name" === "Baumann, Mr. John D").select("age").show

In [ ]:
val frame = dfFilled.withColumn("title", titleUDF(col("name")))

## Now create clean categorical representations of features

Categorical variables to be used are:
* survived
* pclass
* title
* sex

We will not use:
* name
* ticket
* cabin
* embarked
* boat
* body
* home.dest

#### Spark ML feature transformers and extractors

We will use some of the Spark ML feature manipulation classes:

In [ ]:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.IndexToString
import org.apache.spark.ml.feature.VectorAssembler

In [ ]:
val categoricalFeatColNames = Seq("pclass", "title", "sex")
val stringIndexers = categoricalFeatColNames.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed")
    .fit(frame)
}
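As a rough picture of what fitting a `StringIndexer` produces, the plain-Scala sketch below maps each label to an index ordered by descending frequency, so the most frequent label gets 0.0. This is not Spark's implementation; the helper name `fitIndex`, the alphabetical tie-breaking and the sample values are assumptions for illustration.

```scala
// Build a label -> index mapping, most frequent label first.
def fitIndex(values: Seq[String]): Map[String, Double] =
  values.groupBy(identity).toSeq
    .map { case (label, occurrences) => (label, occurrences.size) }
    // Assumption: ties are broken alphabetically to make the result deterministic.
    .sortBy { case (label, count) => (-count, label) }
    .map(_._1)
    .zipWithIndex
    .map { case (label, i) => (label, i.toDouble) }
    .toMap

// Made-up column values.
val sexIndex = fitIndex(Seq("male", "female", "male", "male"))
```

The indices carry no numeric meaning beyond identifying a category, which is worth keeping in mind when they are fed to a linear model.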

In [ ]:
val labelIndexer = new StringIndexer()
  .setInputCol("survived")
  .setOutputCol("survivedIndexed")
  .fit(frame)


In [ ]:
val numericFeatColNames = Seq("age", "sibsp", "parch", "fare")
val idxdCategoricalFeatColName = categoricalFeatColNames.map(_ + "Indexed")
val allIdxdFeatColNames = numericFeatColNames ++ idxdCategoricalFeatColName
val assembler = new VectorAssembler()
  .setInputCols(allIdxdFeatColNames.toArray)
  .setOutputCol("features")

In [ ]:

import org.apache.spark.ml.classification.{LogisticRegression,LogisticRegressionModel}
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary
import org.apache.spark.ml.Pipeline

In [ ]:
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.0)
  .setLabelCol("survivedIndexed")
  .setFeaturesCol("features")
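For intuition, a fitted binary logistic regression scores a feature vector by passing a weighted sum through the sigmoid function and comparing the result with a threshold. The plain-Scala sketch below shows that computation; the function names, weights and feature values are made up, and the 0.5 threshold is assumed here as the usual default.

```scala
// Logistic (sigmoid) function: maps any real number into (0, 1).
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Probability of the positive class for one feature vector.
def predictProbability(weights: Array[Double], intercept: Double, features: Array[Double]): Double = {
  require(weights.length == features.length, "one weight per feature")
  val z = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
  sigmoid(z)
}

// Turn a probability into a 0.0/1.0 label.
def predictLabel(p: Double, threshold: Double = 0.5): Double =
  if (p > threshold) 1.0 else 0.0

// Made-up weights and features: the weighted sum is exactly 0, so p is 0.5.
val p = predictProbability(Array(1.0, -2.0), 0.0, Array(2.0, 1.0))
val label = predictLabel(p)
```

Training amounts to choosing the weights and intercept; `setRegParam(0.0)` in the cell above means no regularization penalty is applied while doing so.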

In [ ]:
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

In [ ]:
// 80% training, 20% test; the weights are normalized, so Array(8, 2) is equivalent
val Array(training, test) = frame.randomSplit(Array(0.8, 0.2))
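A weighted random split can be sketched in plain Scala as below: the weights are normalized to a fraction and each item is assigned independently, so the resulting sizes are only approximately 80/20. The helper name `split` and the fixed seed are assumptions for illustration; `randomSplit` also has an overload that takes a seed, which makes the split reproducible between runs.

```scala
import scala.util.Random

// Assign each item to the first or second part with probability given by the weights.
def split[A](items: Seq[A], weights: Seq[Double], seed: Long): (Seq[A], Seq[A]) = {
  val trainFraction = weights.head / weights.sum // 8 / (8 + 2) = 0.8
  val rng = new Random(seed)
  items.partition(_ => rng.nextDouble() < trainFraction)
}

// Made-up data: the numbers 1 to 100 stand in for rows.
val (trainPart, testPart) = split(1 to 100, Seq(8.0, 2.0), seed = 42L)
```

Fixing the seed is what makes metrics comparable across notebook runs; without it, every execution evaluates on a different test set.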

In [ ]:
val featurizazer = (stringIndexers :+ labelIndexer :+ assembler).toArray


In [ ]:
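import org.apache.spark.sql.DataFrame

// Apply each transformer in order, feeding the output of one into the next
def featurizaze(df: DataFrame, zazer: List[org.apache.spark.ml.Transformer]): DataFrame = zazer match {
  case head :: tail => featurizaze(head.transform(df), tail)
  case Nil => df
}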

In [ ]:
val trainingFeat = featurizaze(training, featurizazer.toList)
val testFeat = featurizaze(test, featurizazer.toList)

In [ ]:
val model = lr.fit(trainingFeat)

In [ ]:
val modelEval = model.evaluate(testFeat)
                     .asInstanceOf[BinaryLogisticRegressionSummary]

In [ ]:
modelEval.areaUnderROC

In [ ]:
modelEval.roc.collect
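One way to read the area under the ROC curve: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Scala sketch below computes it that way on made-up scores. This pairwise form is not how Spark computes `areaUnderROC`, and the helper name `auc` is hypothetical.

```scala
// scored: (model score, true label) with labels 0 or 1.
// Returns NaN if either class is absent, since the ratio is then undefined.
def auc(scored: Seq[(Double, Int)]): Double = {
  val pos = scored.collect { case (s, 1) => s }
  val neg = scored.collect { case (s, 0) => s }
  // Each positive/negative pair counts 1 if ranked correctly, 0.5 on a tie.
  val wins = for (p <- pos; n <- neg)
    yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (pos.size * neg.size)
}

// Made-up scores: three of the four positive/negative pairs are ranked correctly.
val exampleAuc = auc(Seq((0.9, 1), (0.8, 0), (0.7, 1), (0.2, 0)))
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect one.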

### Using the Pipeline concept (parametrizable and tunable)

A Pipeline is an ordered sequence of transformers and estimators.

In [ ]:
val pipeline = new Pipeline().setStages(
  (stringIndexers :+ labelIndexer :+ assembler :+ lr :+ labelConverter).toArray)
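The idea behind fitting a pipeline can be sketched in plain Scala: stages are visited in order, an estimator is fitted on the data as transformed so far and replaced by the transformer it produces, and the data is then pushed through that transformer before the next stage. The toy traits and stages below are assumptions for illustration and are much simpler than Spark's own classes.

```scala
type Data = Seq[Double]
trait Transformer { def transform(d: Data): Data }
trait Estimator { def fit(d: Data): Transformer }

// A stage with nothing to learn.
object Doubler extends Transformer {
  def transform(d: Data): Data = d.map(_ * 2)
}

// A stage that learns the mean of its training data and subtracts it later.
object MeanCenter extends Estimator {
  def fit(d: Data): Transformer = {
    val mean = d.sum / d.size
    new Transformer { def transform(x: Data): Data = x.map(_ - mean) }
  }
}

// Fit every estimator in order and return the resulting transformers.
def fitPipeline(stages: Seq[Either[Estimator, Transformer]], training: Data): Seq[Transformer] = {
  val (_, fitted) = stages.foldLeft((training, Vector.empty[Transformer])) {
    case ((data, acc), stage) =>
      val t = stage match {
        case Left(estimator) => estimator.fit(data)
        case Right(transformer) => transformer
      }
      (t.transform(data), acc :+ t)
  }
  fitted
}

val toyStages: Seq[Either[Estimator, Transformer]] = Seq(Right(Doubler), Left(MeanCenter))
val fittedStages = fitPipeline(toyStages, Seq(1.0, 2.0, 3.0))

// Apply the fitted stages to new data: 5.0 is doubled to 10.0, then centered by
// the training mean of the doubled data (4.0).
val transformed = fittedStages.foldLeft(Seq(5.0))((d, t) => t.transform(d))
```

The fitted pipeline reuses what was learned on the training data, which is why the same object can later be applied to the test set.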

In [ ]:
val model = pipeline.fit(training)

In [ ]:
val predictions = model.transform(test)

In [ ]:
predictions.select("survived", "predictedLabel").collect

## k-fold cross validation & grid search

In [ ]:
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

In [ ]:
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(1.0, 0.1, 0.01))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.33, 0.66, 1.0))
  .build()
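The grid built above is the Cartesian product of the listed values. A plain-Scala sketch of that product, using the same values, shows how quickly the number of model fits grows; the variable names are assumptions for illustration.

```scala
val regParams = Seq(1.0, 0.1, 0.01)
val elasticNetParams = Seq(0.0, 0.33, 0.66, 1.0)

// Every combination of one regParam with one elasticNetParam.
val grid = for (r <- regParams; e <- elasticNetParams) yield (r, e)

// With k-fold cross validation each combination is fitted once per fold.
val numFolds = 3
val fitsDuringSearch = grid.size * numFolds
```

With 3 folds this comes to 12 × 3 = 36 pipeline fits during the search, so adding values to the grid multiplies the training cost.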

In [ ]:
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("survivedIndexed")

In [ ]:
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
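In k-fold cross validation the training data is divided into k parts; each part serves once as the validation set while the remaining parts are used for fitting. The plain-Scala sketch below assigns items to folds round-robin, which is an assumption made for determinism (a random assignment is the more common choice); the helper name `kFold` is hypothetical.

```scala
// Returns k (training, validation) pairs.
def kFold[A](items: Seq[A], k: Int): Seq[(Seq[A], Seq[A])] = {
  require(k >= 2, "need at least two folds")
  val indexed = items.zipWithIndex
  (0 until k).map { fold =>
    // Items whose position falls in this fold are held out for validation.
    val (validation, training) = indexed.partition { case (_, i) => i % k == fold }
    (training.map(_._1), validation.map(_._1))
  }
}

// Made-up data: the numbers 1 to 9 stand in for rows.
val folds = kFold(1 to 9, 3)
```

Every item appears in exactly one validation set, so the averaged metric uses all of the training data for evaluation without ever scoring a model on rows it was fitted on.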

In [ ]:
val crossValidatorModel = cv.fit(training)

In [ ]:
val predictions = crossValidatorModel.transform(test)

In [ ]:
predictions.select("survived", "predictedLabel").collect

In [ ]:
evaluator.evaluate(predictions)