# Question 1:
Did age have any affect on the survival of the passengers? Divide the passengers into age groups spanning 5 years each - [0, 5), [5, 10), [10, 15), … . For each group compute the number of passengers in each group. Then compute the percent of survivors in each group.

In [15]:

import org.apache.spark.sql.functions._ 

//reading data from .tsv file
val reader = spark.read
reader.option("header",true).option("inferSchema",true).option("delimiter","\t")
val fileData = reader.csv("/Users/raghavnyati/Desktop/titanic.tsv")

//creating different buckets for age groups spanning 5 years
import org.apache.spark.ml.feature.Bucketizer
val splits = (0 to 20).map(_ * 5.0).toArray
val bucketizer = new Bucketizer()
bucketizer.setInputCol("age").setOutputCol("bucket").setSplits(splits)

//assigning each row to respective bucket
val bucketed = bucketizer.transform(fileData)
bucketed.select("survived", "age", "bucket").show(15)

val df1 = bucketed.filter(col("survived") === "y").groupBy("bucket").agg(count("survived").as("survived_passengers"))

val df2 = bucketed.groupBy("bucket").agg(count("survived").as("total_passengers"))

val joinedDF = df1.as('a).join(df2.as('b), col("a.bucket") === col("b.bucket")).select(col("a.bucket"),col("a.survived_passengers"),col("b.total_passengers")).withColumn("percentage", (col("a.survived_passengers")/col("b.total_passengers"))*100).sort(col("a.bucket"))
joinedDF.sort("percentage").show

val explanation: String =
    """
      |Yes! Age has affect on the survival of the passenger. As shown in the above table:
      |1) The highest percentage ~64% of survivers belongs to 0 bucket which is age ranging between [0,5) years. 
      |2) The second highest percentage ~54.8% of survivers belongs to bucket 1 which has age ranging between [5,10) years.
      |This clearly shows that children below age 10 were given priority over other passengers. 
      |Further, if we check more about the percentage of survived passengers in other buckets then we will realize
      |that there were factors also in addition to age which played a role in it. 
      |However, the role of age cannot be undermined.
      """.stripMargin

println(explanation)

+--------+----+------+
|survived| age|bucket|
+--------+----+------+
|       y|29.0|   5.0|
|       y|null|  null|
|       n| 2.0|   0.0|
|       n|30.0|   6.0|
|       n|25.0|   5.0|
|       y|48.0|   9.0|
|       y|63.0|  12.0|
|       n|39.0|   7.0|
|       y|53.0|  10.0|
|       n|71.0|  14.0|
|       n|47.0|   9.0|
|       y|18.0|   3.0|
|       y|24.0|   4.0|
|       y|26.0|   5.0|
|       y|80.0|  16.0|
+--------+----+------+
only showing top 15 rows

+------+-------------------+----------------+------------------+                
|bucket|survived_passengers|total_passengers|        percentage|
+------+-------------------+----------------+------------------+
|   8.0|                 20|              69|28.985507246376812|
|   5.0|                 56|             160|              35.0|
|  12.0|                 10|              27| 37.03703703703704|
|   4.0|                 71|             184| 38.58695652173913|
|   3.0|                 45|             116|38.793103448275865|
|

# Question 2:
For the following problems divide the data into a training set and a test set. After you have created your models in problems 2-4 compute the percent false positives and false negatives you get from your model on the test set.

Logistic on age. Using logistic regression with independent variable age and dependent variable survived create a model to classify passengers as survivors.

In [1]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, DoubleType, IntegerType}
import scala.collection.mutable
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}

//creating custom schema for required data fields
val schema = new StructType(Array(
  new StructField("survived", StringType, true),
  new StructField("sex", StringType, true),
  new StructField("age", DoubleType, true)))

//reading from tsv file in custom schema 
val df = spark.read.format("csv").
                     schema(schema).
                     option("header",true).
                     option("delimiter","\t").
                     load("/Users/raghavnyati/Desktop/titanic.tsv")

//filtering data for age!=null
val data = df.filter("age is not null")

//splitting data into training (70%) and testing (30%) data
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))

//creating logistic regression model
val stages = new mutable.ArrayBuffer[PipelineStage]()
stages += new RFormula().setFormula("survived ~ age")
stages += new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(stages.toArray)

//training network
val pipelineModel = pipeline.fit(train)
val lorModel = pipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]
println(s"Weights: ${lorModel.coefficients} Intercept: ${lorModel.intercept}")

// Preparing test data
val titanicFormala = new RFormula().setFormula("survived ~ age")
val fittedRF = titanicFormala.fit(test)
val preparedDF = fittedRF.transform(test) 

val predictions = lorModel.transform(preparedDF)

//counting wrong predictions
import org.apache.spark.sql.functions._
val wrongPredictions = predictions.where(expr("label != prediction"))
println("\nShowing top 5 rows from wrong predictions: ")
wrongPredictions.show(5)

val countErrors = wrongPredictions.groupBy("label").agg(count("prediction").alias("Errors"))
countErrors.show

//counting correct predictions
val correctPredictions = predictions.where(expr("label == prediction"))
val countCorrectPredictions = correctPredictions.groupBy("label").agg(count("prediction").alias("Correct"))
countCorrectPredictions.show

//*********************************** Evaluating our model ********************************

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.linalg.DenseVector

val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
val accuracy = evaluator.evaluate(predictions)
val lp = predictions.select( "label", "prediction")
val totalCount = predictions.count()
val totalCorrect = lp.filter(col("label") === col("prediction")).count()
val totalWrong = lp.filter(not(col("label") === col("prediction"))).count()
val trueNegative = lp.filter(col("prediction") === 0.0).filter(col("label") === col("prediction")).count().toFloat
val truePositive = lp.filter(col("prediction") === 1.0).filter(col("label") === col("prediction")).count().toFloat
val falseNegative = lp.filter(col("prediction") === 0.0).filter(not(col("label") === col("prediction"))).count().toFloat
val falsePositive = lp.filter(col("prediction") === 1.0).filter(not(col("label") === col("prediction"))).count().toFloat
val ratioWrong = totalWrong.toDouble/totalCount.toDouble
val ratioCorrect = totalCorrect.toDouble/totalCount.toDouble

println("Accuracy: " + accuracy)
println("Total count = " + totalCount)
println("Total Correct = " + totalCorrect)
println("Total Wrong = " + totalWrong)
println("True Negative = " + trueNegative)
println("True Positive = " + truePositive)
println("False Negative = " + falseNegative)
println("False Positive = " + falsePositive)
println("Ratio Wrong = " + ratioWrong)
println("Ratio Correct = " + ratioCorrect)

val  predictionAndLabels =predictions.select("rawPrediction", "label").rdd.map(x => (x(0).asInstanceOf[DenseVector](1), x(1).asInstanceOf[Double]))
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under the precision-recall curve: " + metrics.areaUnderPR)
println("Accuracy or area under ROC curve : " + metrics.areaUnderROC)

println("")
printf(s"""
          |+----------------- Confusion matrix ----------------------+
          ||                | %-15s      |     %-15s                 
          |+----------------+------------+---------------------------+
          || Predicted = 0  | %-15f      |     %-15f                
          || Predicted = 1  | %-15f      |     %-15f                
          |+----------------+------------+---------------------------+
         """.stripMargin, "Actual = 0", "Actual = 1", trueNegative, falsePositive, falseNegative, truePositive)

Weights: [-0.0026351308410778903] Intercept: -0.32515461615182006

Showing top 5 rows from wrong predictions: 
+--------+------+------+--------+-----+--------------------+--------------------+----------+
|survived|   sex|   age|features|label|       rawPrediction|         probability|prediction|
+--------+------+------+--------+-----+--------------------+--------------------+----------+
|       y|female|0.1667|[0.1667]|  1.0|[0.32559389246302...|[0.58068691839705...|       0.0|
|       y|female|   1.0|   [1.0]|  1.0|[0.32778974699289...|[0.58122149125606...|       0.0|
|       y|female|   1.0|   [1.0]|  1.0|[0.32778974699289...|[0.58122149125606...|       0.0|
|       y|female|   2.0|   [2.0]|  1.0|[0.33042487783397...|[0.58186275257119...|       0.0|
|       y|female|   5.0|   [5.0]|  1.0|[0.33833027035720...|[0.58378486871137...|       0.0|
+--------+------+------+--------+-----+--------------------+--------------------+----------+
only showing top 5 rows

+-----+------+
|label|Error

# Question 3: 
Logistic on age, sex and pclass. Same as problem two but use independent variables sex, age, and pclass. Since sex and pclass are categorical they need special treatment. 

In [2]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, DoubleType, IntegerType}
import scala.collection.mutable
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}

//creating custom schema for required data fields
val schema = new StructType(Array(
  new StructField("survived", StringType, true),
  new StructField("sex", StringType, true),
  new StructField("age", DoubleType, true),
  new StructField("pclass", StringType, true)))

//reading from tsv file in custom schema 
val df = spark.read.format("csv").
                     schema(schema).
                     option("header",true).
                     option("delimiter","\t").
                     load("/Users/raghavnyati/Desktop/titanic.tsv")

//filtering data for age!=null
val data = df.filter("age is not null")

//scaling categorical variables - sex and pclass
import org.apache.spark.ml.feature.StringIndexer

val sex_indexer: StringIndexer = new StringIndexer().
  setInputCol("sex").
  setOutputCol("indexed_sex")

val pclass_indexer = new StringIndexer().
    setInputCol("pclass").
    setOutputCol("indexed_pclass")

val nDataFrame = sex_indexer.fit(data).transform(data)
nDataFrame.show(10)

val newDf = pclass_indexer.fit(nDataFrame).transform(nDataFrame) 

//splitting data into training (70%) and testing (30%) data
val Array(train, test) = newDf.randomSplit(Array(0.7, 0.3))

//creating logistic regression model
val stages = new mutable.ArrayBuffer[PipelineStage]()
stages += new RFormula().setFormula("survived ~ age + indexed_sex + indexed_pclass")
stages += new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(stages.toArray)

//training data
val pipelineModel = pipeline.fit(train)
val lorModel = pipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]
println(s"Weights: ${lorModel.coefficients} Intercept: ${lorModel.intercept}")

// Preparing test data
val titanicFormala = new RFormula().setFormula("survived ~ age + indexed_sex + indexed_pclass")
val fittedRF = titanicFormala.fit(test)
val preparedDF = fittedRF.transform(test) 

val predictions = lorModel.transform(preparedDF)

//counting wrong predictions
import org.apache.spark.sql.functions._
val wrongPredictions = predictions.where(expr("label != prediction"))
println("\nShowing top 5 rows from wrong predictions: ")
wrongPredictions.show(5)

val countErrors = wrongPredictions.groupBy("label").agg(count("prediction").alias("Errors"))
countErrors.show

//counting correct predictions
val correctPredictions = predictions.where(expr("label == prediction"))
val countCorrectPredictions = correctPredictions.groupBy("label").agg(count("prediction").alias("Correct"))
countCorrectPredictions.show

//****************************Evaluating our model********************************

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.linalg.DenseVector

val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
val accuracy = evaluator.evaluate(predictions)
val lp = predictions.select( "label", "prediction")
val totalCount = predictions.count()
val totalCorrect = lp.filter(col("label") === col("prediction")).count()
val totalWrong = lp.filter(not(col("label") === col("prediction"))).count()
val trueNegative = lp.filter(col("prediction") === 0.0).filter(col("label") === col("prediction")).count().toFloat
val truePositive = lp.filter(col("prediction") === 1.0).filter(col("label") === col("prediction")).count().toFloat
val falseNegative = lp.filter(col("prediction") === 0.0).filter(not(col("label") === col("prediction"))).count().toFloat
val falsePositive = lp.filter(col("prediction") === 1.0).filter(not(col("label") === col("prediction"))).count().toFloat
val ratioWrong = totalWrong.toDouble/totalCount.toDouble
val ratioCorrect = totalCorrect.toDouble/totalCount.toDouble

println("Accuracy: " + accuracy)
println("Total count = " + totalCount)
println("Total Correct = " + totalCorrect)
println("Total Wrong = " + totalWrong)
println("True Negative = " + trueNegative)
println("True Positive = " + truePositive)
println("False Negative = " + falseNegative)
println("False Positive = " + falsePositive)
println("Ratio Wrong = " + ratioWrong)
println("Ratio Correct = " + ratioCorrect)

val  predictionAndLabels =predictions.select("rawPrediction", "label").rdd.map(x => (x(0).asInstanceOf[DenseVector](1), x(1).asInstanceOf[Double]))
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under the precision-recall curve: " + metrics.areaUnderPR)
println("Accuracy or area under ROC curve : " + metrics.areaUnderROC)

println("")
printf(s"""
          |+----------------- Confusion matrix ----------------------+
          ||                | %-15s      |     %-15s                 
          |+----------------+------------+---------------------------+
          || Predicted = 0  | %-15f      |     %-15f                
          || Predicted = 1  | %-15f      |     %-15f                
          |+----------------+------------+---------------------------+
         """.stripMargin, "Actual = 0", "Actual = 1", trueNegative, falsePositive, falseNegative, truePositive)


+--------+------+----+------+-----------+
|survived|   sex| age|pclass|indexed_sex|
+--------+------+----+------+-----------+
|       y|female|29.0| first|        1.0|
|       n|female| 2.0| first|        1.0|
|       n|  male|30.0| first|        0.0|
|       n|female|25.0| first|        1.0|
|       y|  male|48.0| first|        0.0|
|       y|female|63.0| first|        1.0|
|       n|  male|39.0| first|        0.0|
|       y|female|53.0| first|        1.0|
|       n|  male|71.0| first|        0.0|
|       n|  male|47.0| first|        0.0|
+--------+------+----+------+-----------+
only showing top 10 rows

Weights: [-0.011606839911331652,2.4536931565469082,0.5947070768342849] Intercept: -1.5026982336399672

Showing top 5 rows from wrong predictions: 
+--------+------+---+------+-----------+--------------+-------------+-----+--------------------+--------------------+----------+
|survived|   sex|age|pclass|indexed_sex|indexed_pclass|     features|label|       rawPrediction|         proba

# Question 4: 
Decision tree. Instead of using logistic regression use Decision tree with the independent variables sex, age, and pclass. 

In [4]:
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.types.{StructField, StructType, StringType, DoubleType, IntegerType}
import org.apache.spark.ml.feature.RFormula

newDf.show(5)

val supervised = new RFormula().setFormula("survived ~ age + indexed_sex + indexed_pclass")

val fittedRF = supervised.fit(newDf)
val preparedDF = fittedRF.transform(newDf)
val Array(train, test) = preparedDF.randomSplit(Array(0.7, 0.3))

val dt = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("features")

val model = dt.fit(train)

val predictions = model.transform(test)
predictions.show(5)

import org.apache.spark.ml.feature.Binarizer

val binarizer: Binarizer = new Binarizer().
  setInputCol("prediction").
  setOutputCol("binarized_prediction").
  setThreshold(0.5)

val predictionBinary = binarizer.transform(predictions) 
predictionBinary.show(5)

import org.apache.spark.sql.functions._
val wrongPredictions = predictionBinary.where(expr("label != binarized_prediction"))
wrongPredictions.show

val countErrors = wrongPredictions.groupBy("label").agg(count("prediction").alias("Errors"))
countErrors.show

val correctPredictions = predictionBinary.where(expr("label == binarized_prediction"))
val countCorrectPredictions = correctPredictions.groupBy("label").agg(count("prediction").alias("Correct"))
countCorrectPredictions.show

//***********************************************************************************


import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("prediction").setMetricName("areaUnderROC")
val accuracy = evaluator.evaluate(predictions)
val lp = predictions.select( "label", "prediction")
val totalCount = predictions.count()
val totalCorrect = lp.filter(col("label") === col("prediction")).count()
val totalWrong = lp.filter(not(col("label") === col("prediction"))).count()
val trueNegative = lp.filter(col("prediction") === 0.0).filter(col("label") === col("prediction")).count().toFloat
val truePositive = lp.filter(col("prediction") === 1.0).filter(col("label") === col("prediction")).count().toFloat
val falseNegative = lp.filter(col("prediction") === 0.0).filter(not(col("label") === col("prediction"))).count().toFloat
val falsePositive = lp.filter(col("prediction") === 1.0).filter(not(col("label") === col("prediction"))).count().toFloat
val ratioWrong = totalWrong.toDouble/totalCount.toDouble
val ratioCorrect = totalCorrect.toDouble/totalCount.toDouble

println("Accuracy: " + accuracy)
println("Total count = " + totalCount)
println("Total Correct = " + totalCorrect)
println("Total Wrong = " + totalWrong)
println("True Negative = " + trueNegative)
println("True Positive = " + truePositive)
println("False Negative = " + falseNegative)
println("False Positive = " + falsePositive)
println("Ratio Wrong = " + ratioWrong)
println("Ratio Correct = " + ratioCorrect)

println("")
printf(s"""
          |+----------------- Confusion matrix ----------------------+
          ||                | %-15s      |     %-15s                 
          |+----------------+------------+---------------------------+
          || Predicted = 0  | %-15f      |     %-15f                
          || Predicted = 1  | %-15f      |     %-15f                
          |+----------------+------------+---------------------------+
         """.stripMargin, "Actual = 0", "Actual = 1", trueNegative, falsePositive, falseNegative, truePositive)

+--------+------+----+------+-----------+--------------+
|survived|   sex| age|pclass|indexed_sex|indexed_pclass|
+--------+------+----+------+-----------+--------------+
|       y|female|29.0| first|        1.0|           1.0|
|       n|female| 2.0| first|        1.0|           1.0|
|       n|  male|30.0| first|        0.0|           1.0|
|       n|female|25.0| first|        1.0|           1.0|
|       y|  male|48.0| first|        0.0|           1.0|
+--------+------+----+------+-----------+--------------+
only showing top 5 rows

+--------+------+----+------+-----------+--------------+--------------+-----+----------+
|survived|   sex| age|pclass|indexed_sex|indexed_pclass|      features|label|prediction|
+--------+------+----+------+-----------+--------------+--------------+-----+----------+
|       n|female| 9.0| third|        1.0|           0.0| [9.0,1.0,0.0]|  0.0|       0.0|
|       n|female| 9.0| third|        1.0|           0.0| [9.0,1.0,0.0]|  0.0|       0.0|
|       n|female|

# Question 5:
How do the models created in problems 2-4 compare based on the false positives & false negatives the produce on your test data.

# Solution:
In question 2: 
We got the confusion matrix like this: 

+----------------- Confusion matrix ----------------------+                                                           
 ++++++++++++ | Actual = 0           |     Actual = 1                                                               
+---------------+------------+---------------------------+                                                           
 Predicted = 0  | 173.000000           |     0.000000                       
 Predicted = 1  | 128.000000           |     0.000000                       
+----------------+------------+---------------------------+

In question 3, we got the confusion matrix as below: 

+----------------- Confusion matrix ----------------------+                                                           
  ++++++++++++| Actual = 0           |     Actual = 1                                                               
+----------------+------------+---------------------------+                                                           
 Predicted = 0  | 164.000000           |     28.000000                                                                 
 Predicted = 1  | 42.000000            |     90.000000                                                                 
+----------------+------------+---------------------------+                                                           

In question 4, we got the confusion matrix as below: 

+----------------- Confusion matrix ----------------------+                                                           
 ++++++++++++  | Actual = 0           |     Actual = 1                                                              
+----------------+------------+---------------------------+                                                           
 Predicted = 0  | 21.000000            |     1.000000                                                                 
 Predicted = 1  | 4.000000             |     28.000000                                                                
+----------------+------------+---------------------------+                                                           

Now,on comparing the values for all three models, I found the follwoing: 
1) Got the highest accuracy in question 4 model using decision tree.
2) The lowest false positive is in model for question 2 where we had one independent and one dependent variable.
   Also, the value for false Positive for model with decision tree in question 4 is less than the the model in
   question 3.
3) The value for False Negative is lowest in the case of question 4 model with decision trees. If we talk about
   False Negative values than Model4 is thes best followed by model 3 which is better than model 2.
   
   On comparing all three models in question 2-4, I found the model in question 4 is the best with highest accuracy of 86.58%. Model 3 is the second best here with accuracy of 80.45% and model 2 has accuracy of 58.04%. Therefore, it is apparent that number of independent variables play a vital role in training a network. As in question 2, we had age as only independent variable while in question 3 we had pclass and sex in addition to age as an independent variables. 
   
   Note- Model 4is decision tree mdel trained in question 4. While model 2 referes to the Logistic Regression model trained in question 2 and model 3 refers to the Logistic Regression model trained in question 3. 
   