Total: 47/50

Imports

In [1]:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.sql.{DataFrameNaFunctions, DataFrameReader, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.{Binarizer, Bucketizer, RFormula, StringIndexer}
import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}
import scala.collection.mutable

1) Did age have any effect on the survival of the passengers? Divide the passengers into age groups spanning 5 years each - [0, 5), [5, 10), [10, 15), … . For each group compute the number of passengers in that group. Then compute the percentage of survivors in each group.

In [2]:
val reader = spark.read

val titanic = reader.format("csv").option("header",true).
option("delimiter", "\t").option("inferschema",true).load("titanic.tsv").
withColumnRenamed("home.dest","homeDest")

val averageAge = titanic.select(avg(col("age")))
val averageCalculatedAge = averageAge.first().getDouble(0)
val titanicFilled = titanic.na.fill(averageCalculatedAge,Seq("age"))

val splits = Array(0.0,5.0,10.0,15.0,20.0,25.0,30.0,35.0,40.0,45.0,50.0,55.0,60.0,65.0,70.0,75.0,80.0,85.0,90.0)

val bucketizer = new Bucketizer().
setInputCol("age").
setOutputCol("bucketedAge").
setSplits(splits)

val bucketedData = bucketizer.transform(titanicFilled)

val everyOne = bucketedData.groupBy("bucketedAge").count().
withColumnRenamed("count","Count of All")

val alive = bucketedData.where("survived == 'y'").groupBy("bucketedAge").count().
withColumnRenamed("count","Count of Survived")

val finalTable = everyOne.join(alive, Seq("bucketedAge")).sort("bucketedAge")
val PercentageSurvivors = finalTable.select(col("bucketedAge"),col("Count of All"),col("Count of Survived"),((col("Count of Survived")/col("Count of All"))*100).as("Percent of Survivors - %"))

// Bucketizer assigns index i to the interval [splits(i), splits(i+1)), so index 10.0 is [50,55).
// The last bucket (17.0) also includes its upper bound, 90.
val rangeDataFrame = spark.createDataFrame(Seq((0.0, "[0,5)"),(1.0, "[5,10)"),(2.0, "[10,15)"),(3.0, "[15,20)"),(4.0, "[20,25)"),(5.0, "[25,30)")
,(6.0, "[30,35)"),(7.0, "[35,40)"),(8.0, "[40,45)"),(9.0, "[45,50)"),(10.0, "[50,55)"),(11.0, "[55,60)")
,(12.0, "[60,65)"),(13.0, "[65,70)"),(14.0, "[70,75)"),(15.0, "[75,80)"),(16.0, "[80,85)"),(17.0, "[85,90]"))).toDF("bucketedAge","Age Groups")

val resultOneR1  = rangeDataFrame.join(PercentageSurvivors, Seq("bucketedAge"))
val resultOneR = resultOneR1.drop(resultOneR1.col("bucketedAge")) 
resultOneR.show


+----------+------------+-----------------+------------------------+
|Age Groups|Count of All|Count of Survived|Percent of Survivors - %|
+----------+------------+-----------------+------------------------+
|     [0,5)|          50|               32|                    64.0|
|    [5,10)|          31|               17|       54.83870967741935|
|   [10,15)|          27|               11|       40.74074074074074|
|   [15,20)|         116|               45|      38.793103448275865|
|   [20,25)|         184|               71|       38.58695652173913|
|   [25,30)|         424|              130|      30.660377358490564|
|   [30,35)|         132|               54|      40.909090909090914|
|   [35,40)|         100|               44|                    44.0|
|   [40,45)|          69|               20|      28.985507246376812|
|   [45,50)|          66|               32|      48.484848484848484|
|   [50,55)|          43|               21|      48.837209302325576|
|   [55,60)|          27|         

In [None]:
-1: Hard coding for age groups.
-1: Determining correlation would have produced accurate results.
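
One way to address the hard-coding comment is to derive the age-group labels from the same boundaries passed to the Bucketizer, which also removes the risk of a skipped or mistyped interval. The following is a minimal sketch in plain Scala, not the submitted solution; the name `ageGroupLabels` is hypothetical, and it assumes the splits run from 0 to 90 in steps of 5 as in the cell above.

```scala
// Same bucket boundaries as the Bucketizer splits above: 0, 5, ..., 90.
val splits: Array[Double] = (0 to 90 by 5).map(_.toDouble).toArray

// Bucketizer assigns index i to the interval [splits(i), splits(i + 1)),
// so pairing consecutive boundaries with their position rebuilds the lookup table.
// (The last bucket also includes its upper bound; the label keeps ")" for simplicity.)
val ageGroupLabels: Seq[(Double, String)] =
  splits.sliding(2).zipWithIndex.map { case (bounds, i) =>
    (i.toDouble, s"[${bounds(0).toInt},${bounds(1).toInt})")
  }.toList

ageGroupLabels.foreach(println)
```

Under these assumptions, `spark.createDataFrame(ageGroupLabels).toDF("bucketedAge","Age Groups")` could stand in for the hand-written sequence.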

# Inference
- From the table above, children (0 to 18) have roughly a 50% chance of survival, older passengers (60 to 90) roughly 79%, and adults (18 to 59) roughly 41%.

For the following problems divide the data into a training set and a test set. After you have created your models in problems 2-4 compute the percent false positives and false negatives you get from your model on the test set.

2) Logistic on age. Using logistic regression with independent variable age and dependent variable survived, create a model to classify passengers as survivors.

In [7]:
val reader = spark.read

val titanicTwo = reader.format("csv").option("header",true).
option("delimiter", "\t").option("inferschema",true).load("titanic.tsv").
withColumnRenamed("home.dest","homeDest")

val averageAge = titanicTwo.select(avg(col("age")))
val averageCalculatedAge = averageAge.first().getDouble(0)
val titanicFilledTwo = titanicTwo.na.fill(averageCalculatedAge,Seq("age"))

val survivedFormula = new RFormula().setFormula("survived ~ age")

val fittedRF = survivedFormula.fit(titanicFilledTwo)
val preparedDF = fittedRF.transform(titanicFilledTwo)

val preparedFilteredDF = preparedDF.select(col("age"),col("survived"),col("features"),col("label"))

val Array(train,test) = preparedFilteredDF.randomSplit(Array(0.7,0.3))

val lr = new LogisticRegression()
val lrModel = lr.fit(train)
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

val resultPredictions = lrModel.evaluate(test).predictions

val wrongPredictions = resultPredictions.where(expr("label != prediction"))
val countErrors = wrongPredictions.groupBy("label").agg(count("prediction").alias("errors"))

val correctPredictions = resultPredictions.where(expr("label == prediction"))
val countCorrectPredictions = correctPredictions.groupBy("label").agg(count("prediction").alias("Correct"))

val TableOneS = countErrors.join(countCorrectPredictions, Seq("label"), "outer").na.fill(0,Seq("errors")).na.fill(0,Seq("Correct"))
val TableTwoS = TableOneS.withColumn("Total",(col("errors")+col("Correct")))
val TableThreeS = TableTwoS.withColumn("%-False Positive,False Negative", ((col("errors")/col("Total"))*100)).withColumn("%-True Negative,True Positive", ((col("correct")/col("Total"))*100))
TableThreeS.show

val totalS = TableThreeS.where("label == 0.0").first.getLong(1)+TableThreeS.where("label == 0.0").first.getLong(2)+ TableThreeS.where("label == 1.0").first.getLong(1)+ TableThreeS.where("label == 1.0").first.getLong(2)
val falseS = TableThreeS.where("label == 0.0").first.getLong(1)+TableThreeS.where("label == 1.0").first.getLong(1)

println("False Positive: " + TableThreeS.where("label == 0.0").first.getDouble(4) + "%")
println("False Negative: " + TableThreeS.where("label == 1.0").first.getDouble(4) + "%")
println("Mis-classification: " + ((falseS.toDouble/totalS.toDouble)*100)+ "%")

Coefficients: [-0.00857435917162859] Intercept: -0.21399430003990538
+-----+------+-------+-----+-------------------------------+-----------------------------+
|label|errors|Correct|Total|%-False Positive,False Negative|%-True Negative,True Positive|
+-----+------+-------+-----+-------------------------------+-----------------------------+
|  0.0|     0|    239|  239|                            0.0|                        100.0|
|  1.0|   143|      0|  143|                          100.0|                          0.0|
+-----+------+-------+-----+-------------------------------+-----------------------------+

False Positive: 0.0%
False Negative: 100.0%
Mis-classification: 37.43455497382199%


In [None]:
-1: False positives is expected to be non-zero.
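
The 0% / 100% split above can be traced back to the fitted coefficients. The sketch below, in plain Scala, plugs the printed coefficient and intercept into the logistic function; it assumes the default decision threshold of 0.5 and that label 1.0 means "survived". The helper names `sigmoid` and `survivalProbability` are hypothetical.

```scala
// Logistic function: maps a linear score to a probability in (0, 1).
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Values copied from the "Coefficients: ... Intercept: ..." line printed above.
val coefficient = -0.00857435917162859
val intercept = -0.21399430003990538

// Predicted probability of label 1.0 for a given age.
def survivalProbability(age: Double): Double = sigmoid(intercept + coefficient * age)

val ages = Seq(0.0, 20.0, 40.0, 60.0, 80.0)
ages.foreach { a =>
  println(f"age $a%.0f -> P(label = 1.0) = ${survivalProbability(a)}%.3f")
}
```

Because both the intercept and the coefficient are negative, the linear score is negative for every non-negative age, so the probability stays below 0.5 and the model never predicts label 1.0 at that threshold. A lower threshold would be one way to obtain non-zero false positives.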

3) Logistic on age, sex and pclass. Same as problem two but use independent variables sex, age, and pclass. Since sex and pclass are categorical they need special treatment. 

In [4]:
val reader = spark.read

val titanicFour = reader.format("csv").option("header",true).
option("delimiter", "\t").option("inferschema",true).load("titanic.tsv").
withColumnRenamed("home.dest","homeDest")

val averageAge = titanicFour.select(avg(col("age")))
val averageCalculatedAge = averageAge.first().getDouble(0)
val titanicFilledFour = titanicFour.na.fill(averageCalculatedAge,Seq("age"))

val trimmedTitanicFilledFour = titanicFilledFour.select(col("survived"),col("sex"),col("age"),col("pclass"))

val indexerS = new StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
val indexedS = indexerS.fit(trimmedTitanicFilledFour).transform(trimmedTitanicFilledFour)
//indexedS.show()

val indexerPC = new StringIndexer().setInputCol("pclass").setOutputCol("pclassIndex")
val indexedPC = indexerPC.fit(indexedS).transform(indexedS)
//indexedPC.show(150)

val Array(trainFour, testFour) = indexedPC.randomSplit(Array(0.7, 0.3))

In [5]:
val stages = new mutable.ArrayBuffer[PipelineStage]()
stages += new RFormula().setFormula("survived ~ age + sexIndex + pclassIndex")
stages += new LogisticRegression().setLabelCol("label").setFeaturesCol("features")

val pipeline = new Pipeline().setStages(stages.toArray)

val pipelineModel = pipeline.fit(trainFour)
val lorModel = pipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]

println(s"Coefficients: ${lorModel.coefficients} Intercept: ${lorModel.intercept}")

// Reuse the fitted pipeline (RFormula + logistic regression) so the test set gets
// the same feature and label encoding that was learned on the training set.
val resultPredictionsFour = pipelineModel.transform(testFour)

val wrongPredictionsFour = resultPredictionsFour.where(expr("label != prediction"))
val countErrorsFour = wrongPredictionsFour.groupBy("label").agg(count("prediction").alias("errors"))
//countErrorsFour.show

val correctPredictionsFour = resultPredictionsFour.where(expr("label == prediction"))
val countCorrectPredictionsFour = correctPredictionsFour.groupBy("label").agg(count("prediction").alias("Correct"))
//countCorrectPredictionsFour.show

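// Outer join keeps a label even when it has no errors (or no correct predictions); fill those gaps with 0 before summing.
val TableOne = countErrorsFour.join(countCorrectPredictionsFour, Seq("label"), "outer").na.fill(0,Seq("errors")).na.fill(0,Seq("Correct"))
val TableTwo = TableOne.withColumn("Total",(col("errors")+col("Correct")))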
val TableThree = TableTwo.withColumn("%-False Positive, False Negative", ((col("errors")/col("Total"))*100)).withColumn("%-True Negative,True Positive", ((col("correct")/col("Total"))*100))
TableThree.show

val totalR = TableThree.where("label == 0.0").first.getLong(1)+TableThree.where("label == 0.0").first.getLong(2)+ TableThree.where("label == 1.0").first.getLong(1)+ TableThree.where("label == 1.0").first.getLong(2)
val falseR = TableThree.where("label == 0.0").first.getLong(1)+TableThree.where("label == 1.0").first.getLong(1)

println("False Positive: " + TableThree.where("label == 0.0").first.getDouble(4) + "%")
println("False Negative: " + TableThree.where("label == 1.0").first.getDouble(4) + "%")
println("Mis-classification: " + ((falseR.toDouble/totalR.toDouble)*100)+ "%")

Coefficients: [-0.017873882643542453,2.419825794214862,0.5971172350419962] Intercept: -1.419504110164697
+-----+------+-------+-----+--------------------------------+-----------------------------+
|label|errors|Correct|Total|%-False Positive, False Negative|%-True Negative,True Positive|
+-----+------+-------+-----+--------------------------------+-----------------------------+
|  0.0|    32|    203|  235|              13.617021276595745|            86.38297872340426|
|  1.0|    56|    108|  164|              34.146341463414636|            65.85365853658537|
+-----+------+-------+-----+--------------------------------+-----------------------------+

False Positive: 13.617021276595745%
False Negative: 34.146341463414636%
Mis-classification: 22.05513784461153%


4) Decision tree. Instead of using logistic regression, use a decision tree with the independent variables sex, age, and pclass.

In [6]:
val dStages = new mutable.ArrayBuffer[PipelineStage]()
dStages += new RFormula().setFormula("survived ~ age + sexIndex + pclassIndex")
dStages += new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("features")

val dPipeline = new Pipeline().setStages(dStages.toArray)

val dPipelineModel = dPipeline.fit(trainFour)
val dLorModel = dPipelineModel.stages.last.asInstanceOf[DecisionTreeRegressionModel]

// Reuse the fitted pipeline (RFormula + decision tree) so the test set gets
// the same feature and label encoding that was learned on the training set.
val dResultPredictions = dPipelineModel.transform(testFour)

val binarizer: Binarizer = new Binarizer().
  setInputCol("prediction").
  setOutputCol("binarized_prediction").
  setThreshold(0.5)

val predictionBinary = binarizer.transform(dResultPredictions) 

val dWrongPredictionsFour = predictionBinary.where(expr("label != binarized_prediction"))
val dCountErrorsFour = dWrongPredictionsFour.groupBy("label").agg(count("binarized_prediction").alias("errors"))
//dCountErrorsFour.show

val dCorrectPredictionsFour = predictionBinary.where(expr("label == binarized_prediction"))
val dCountCorrectPredictionsFour = dCorrectPredictionsFour.groupBy("label").agg(count("binarized_prediction").alias("Correct"))
//dCountCorrectPredictionsFour.show

val dTableOne = dCountErrorsFour.join(dCountCorrectPredictionsFour, Seq("label"), "outer").na.fill(0,Seq("errors")).na.fill(0,Seq("Correct"))
val dTableTwo = dTableOne.withColumn("Total",(col("errors")+col("Correct")))
val dTableThree = dTableTwo.withColumn("%-False Positive, False Negative", ((col("errors")/col("Total"))*100)).withColumn("%-True Negative,True Positive", ((col("correct")/col("Total"))*100))
dTableThree.show

val dtotal = dTableThree.where("label == 0.0").first.getLong(1)+dTableThree.where("label == 0.0").first.getLong(2)+ dTableThree.where("label == 1.0").first.getLong(1)+ dTableThree.where("label == 1.0").first.getLong(2)
val dfalse = dTableThree.where("label == 0.0").first.getLong(1)+dTableThree.where("label == 1.0").first.getLong(1)

println("False Positive: " + dTableThree.where("label == 0.0").first.getDouble(4) + "%")
println("False Negative: " + dTableThree.where("label == 1.0").first.getDouble(4) + "%")
println("Mis-classification: " + ((dfalse.toDouble/dtotal.toDouble)*100)+ "%")

+-----+------+-------+-----+--------------------------------+-----------------------------+
|label|errors|Correct|Total|%-False Positive, False Negative|%-True Negative,True Positive|
+-----+------+-------+-----+--------------------------------+-----------------------------+
|  0.0|    26|    209|  235|              11.063829787234042|            88.93617021276596|
|  1.0|    61|    103|  164|               37.19512195121951|            62.80487804878049|
+-----+------+-------+-----+--------------------------------+-----------------------------+

False Positive: 11.063829787234042%
False Negative: 37.19512195121951%
Mis-classification: 21.804511278195488%


5) How do the models created in problems 2-4 compare based on the false positives & false negatives they produce on your test data?

Ans 5) 
- In problem 2, the logistic regression model with age as the only independent variable and survived as the dependent variable predicts label 0.0 for every test passenger. The false positive rate is therefore 0.0% and the false negative rate is 100%, giving a misclassification rate of about 37%.
- In problem 3, the independent variables for logistic regression are age, sex and pclass. The model produced here is much more accurate: 13.6% false positives, 34.1% false negatives, and a misclassification rate of about 22% on this test split.
- In problem 4, a decision tree on age, sex and pclass gives 11.1% false positives, 37.2% false negatives, and a misclassification rate of about 22% on the same test split.

Thus the models in problems 3 and 4 are more accurate than the model in problem 2, and they perform about equally well on this split.
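
The rates quoted in this comparison can be rechecked from the error and correct counts in the result tables. The following plain-Scala sketch is only an illustration; the case class `Counts` and its field names are hypothetical, and it follows the notebook's convention that errors on label 0.0 count as false positives and errors on label 1.0 as false negatives.

```scala
// Per-label counts taken from the "errors" and "Correct" columns of a result table.
final case class Counts(falsePos: Long, trueNeg: Long, falseNeg: Long, truePos: Long) {
  // Share of label-0.0 rows that were predicted as 1.0, in percent.
  def falsePositiveRate: Double = 100.0 * falsePos / (falsePos + trueNeg)
  // Share of label-1.0 rows that were predicted as 0.0, in percent.
  def falseNegativeRate: Double = 100.0 * falseNeg / (falseNeg + truePos)
  // Share of all rows that were predicted incorrectly, in percent.
  def misclassificationRate: Double =
    100.0 * (falsePos + falseNeg) / (falsePos + trueNeg + falseNeg + truePos)
}

// Counts copied from the tables printed for problems 2, 3 and 4.
val problem2 = Counts(falsePos = 0, trueNeg = 239, falseNeg = 143, truePos = 0)
val problem3 = Counts(falsePos = 32, trueNeg = 203, falseNeg = 56, truePos = 108)
val problem4 = Counts(falsePos = 26, trueNeg = 209, falseNeg = 61, truePos = 103)

Seq("problem 2" -> problem2, "problem 3" -> problem3, "problem 4" -> problem4).foreach {
  case (name, c) =>
    println(f"$name: FP ${c.falsePositiveRate}%.1f%%, FN ${c.falseNegativeRate}%.1f%%, " +
      f"misclassified ${c.misclassificationRate}%.1f%%")
}
```

Note that problem 2 was evaluated on a different random split (382 rows) than problems 3 and 4 (399 rows), so the comparison across all three is approximate.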