In [1]:
sc

org.apache.spark.SparkContext@6c66fe41

In [10]:
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.functions._

    //Getting data from tsv file
      println("Start")
    
    val titanicDF = spark.read.format("org.apache.spark.csv").option("header", true).option("inferSchema", true).option("delimiter", "\t").option("mode", "DROPMALFORMED").csv("/home/shahyash1993/titanic.tsv")

    val notNullVal = titanicDF.where(col("age") > 0).count()
    val totalVal = titanicDF.count()

    println(">>Count: " + notNullVal)
    println(">>Total Count: " + totalVal)
    println(">>Diff: " + (totalVal - notNullVal))

    //Dropping Null values
    val droppedNullAgeDF = titanicDF.na.drop("any", Seq("age"))
    println(">>After dropping: " + droppedNullAgeDF.count())

    //Logic to create age groups
    val modifiedDroppedNullAgeDF = droppedNullAgeDF.orderBy(col("age")).withColumn("ageGroup", floor(col("age") / 5) * 5)

    val countY = count(when(modifiedDroppedNullAgeDF("survived".toString()) === "y", true))

    //finding survival percentage
    val finalDF = modifiedDroppedNullAgeDF.groupBy(col("ageGroup")).agg(countY / count("ageGroup") * 100 as "Survival (%)")

    finalDF.orderBy("ageGroup").show()

    println(">>End")


Start
>>Count: 1045
>>Total Count: 1309
>>Diff: 264
>>After dropping: 1045
+--------+------------------+
|ageGroup|      Survival (%)|
+--------+------------------+
|       0|              64.0|
|       5| 54.83870967741935|
|      10| 40.74074074074074|
|      15|38.793103448275865|
|      20| 38.58695652173913|
|      25|              35.0|
|      30|40.909090909090914|
|      35|              44.0|
|      40|28.985507246376812|
|      45|48.484848484848484|
|      50|48.837209302325576|
|      55| 40.74074074074074|
|      60| 37.03703703703704|
|      65|               0.0|
|      70|               0.0|
|      75|             100.0|
|      80|             100.0|
+--------+------------------+

>>End


In [11]:
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

    //Getting data from  tsv filess
    val titanicDF = spark.read.format("org.apache.spark.csv").option("header", true).option("inferSchema", true).option("delimiter","\t").option("mode","DROPMALFORMED").csv("/home/shahyash1993/titanic.tsv")

    //Dropping null values
    val droppedNullAgeDF = titanicDF.na.drop("any",Seq("age"))
    println(">>After dropping: "+droppedNullAgeDF.count())

    //Renaming home.dest because of the unusual error
    val newTitanicDF = droppedNullAgeDF.withColumnRenamed("home.dest","home")

    //<<Machine Learning Stuff>>>
    import org.apache.spark.ml.feature.RFormula

    //Generating formula to predict survival based on the age
    val examFormula = new RFormula().setFormula("survived ~ age").setFeaturesCol("features").setLabelCol("label")

    //Splitting the data into Trainset and Testsets
    val fittedRF = examFormula.fit(newTitanicDF)
    val preparedDF = fittedRF.transform(newTitanicDF)
    val Array(trainDF, testDF) = preparedDF.randomSplit(Array(0.9, 0.1))
    trainDF.show

    println("total: "+newTitanicDF.count())
    println("train: "+trainDF.count())
    println("test: "+testDF.count())

    //fitting the  model
    import org.apache.spark.ml.classification.LogisticRegression
    val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")
    val lrModel = lr.fit(trainDF)

    val trainedModelDF = lrModel.evaluate(testDF).predictions
    trainedModelDF.show()

    //Finding out the wrong predictions
    val wrongPredictions = trainedModelDF.where(expr("label != prediction"))
    val incorrectPredicition = wrongPredictions.count()
    println("Incorrect Predictons: "+incorrectPredicition)

    //finding errors
    val countErrors = wrongPredictions.groupBy("label").agg(count("prediction").alias("Errors"))
    countErrors.show

    //Finding out TP, TN, FP, FN
    val truePositives = trainedModelDF.where(expr("label == 1 AND prediction == 1")).count()
    println("TP: "+truePositives)

    val trueNegatives= trainedModelDF.where(expr("label == 0 AND prediction == 0")).count()
    println("TN: "+trueNegatives)

    val falsePositive= trainedModelDF.where(expr("label == 0 AND prediction == 1")).count()
    println("FP: "+falsePositive)

    val falseNegative= trainedModelDF.where(expr("label == 1 AND prediction == 0")).count()
    println("FN: "+falseNegative)

    val totalCount= trainedModelDF.count()
    println("total: "+totalCount)

    val percentFP:Double = falsePositive*100/totalCount
    val percentFN:Double = falseNegative*100/totalCount

    println("%FP=" + percentFP+"%")
    println("%FN=" + percentFN+"%")


>>After dropping: 1045
+--------+------+----+------+--------------------+-----+-----+------------------+-------+-------+--------+----+----+--------------------+--------+-----+
|survived|   sex| age|pclass|                name|sibsp|parch|            ticket|   fare|  cabin|embarked|boat|body|                home|features|label|
+--------+------+----+------+--------------------+-----+-----+------------------+-------+-------+--------+----+----+--------------------+--------+-----+
|       n|female| 2.0| first|Allison, Miss. He...|    1|    2|            113781| 151.55|C22 C26|       S|null|null|Montreal, PQ / Ch...|   [2.0]|  0.0|
|       n|female| 2.0| third|Andersson, Miss. ...|    4|    2|            347082| 31.275|   null|       S|null|null| Sweden Winnipeg, MN|   [2.0]|  0.0|
|       n|female| 2.0| third|Skoog, Miss. Marg...|    3|    2|            347088|   27.9|   null|       S|null|null|                null|   [2.0]|  0.0|
|       n|female| 2.0| third|Strom, Miss. Telm...|    0|   

Incorrect Predictons: 48
+-----+------+
|label|Errors|
+-----+------+
|  1.0|    48|
+-----+------+

TP: 0
TN: 52
FP: 0
FN: 48
total: 100
%FP=0.0%
%FN=48.0%


In [13]:
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

    //Getting data from tsv file
    val titanicDF = spark.read.format("org.apache.spark.csv").option("header", true).option("inferSchema", true).option("delimiter","\t").option("mode","DROPMALFORMED").csv("/home/shahyash1993/titanic.tsv")

    //Dropping null values from three cols
    var droppedNullAgeDF = titanicDF.na.drop("any",Seq("age","sex","pclass"))
    println(">>After dropping: "+droppedNullAgeDF.count())

    //Renaming the home.dest to home due to unusual error
    val newTitanicDF = droppedNullAgeDF.withColumnRenamed("home.dest","home")
    println("Total data we are playing with: "+newTitanicDF.count())

    //<<<Machine Learning stuff>>>
    import org.apache.spark.ml.feature.RFormula

    //Formula to predict survival based on age, sex and pclass
    val examFormula = new RFormula().setFormula("survived ~ age + sex + pclass").setFeaturesCol("features").setLabelCol("label")

    //Splitting dataset into trainset and testset
    val fittedRF = examFormula.fit(newTitanicDF)
    val preparedDF = fittedRF.transform(newTitanicDF)
    val Array(trainDF, testDF) = preparedDF.randomSplit(Array(0.9, 0.1))
    trainDF.show

    println("total: "+newTitanicDF.count())
    println("train: "+trainDF.count())
    println("test: "+testDF.count())

    //fitting the  model
    import org.apache.spark.ml.classification.LogisticRegression
    val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")
    val lrModel = lr.fit(trainDF)

    val trainedModelDF = lrModel.evaluate(testDF).predictions
    trainedModelDF.show()

    //Finding wrong predictions
    val wrongPredictions = trainedModelDF.where(expr("label != prediction"))
    val incorrectPredicition = wrongPredictions.count()
    println("Incorrect Predictons: "+incorrectPredicition)

    val countErrors = wrongPredictions.groupBy("label").agg(count("prediction").alias("Errors"))
    countErrors.show

    //Finding TP, TN, FP, FN
    val truePositives = trainedModelDF.where(expr("label == 1 AND prediction == 1")).count()
    println("TP: "+truePositives)

    val trueNegatives= trainedModelDF.where(expr("label == 0 AND prediction == 0")).count()
    println("TN: "+trueNegatives)

    val falsePositive= trainedModelDF.where(expr("label == 0 AND prediction == 1")).count()
    println("FP: "+falsePositive)

    val falseNegative= trainedModelDF.where(expr("label == 1 AND prediction == 0")).count()
    println("FN: "+falseNegative)

    val totalCount= trainedModelDF.count()
    println("total: "+totalCount)

    val percentFP:Double = falsePositive*100/totalCount
    val percentFN:Double = falseNegative*100/totalCount

    println("%FP=" + percentFP+"%")
    println("%FN=" + percentFN+"%")

>>After dropping: 1045
Total data we are playing with: 1045
+--------+------+----+------+--------------------+-----+-----+------------------+-------+-------+--------+----+----+--------------------+------------------+-----+
|survived|   sex| age|pclass|                name|sibsp|parch|            ticket|   fare|  cabin|embarked|boat|body|                home|          features|label|
+--------+------+----+------+--------------------+-----+-----+------------------+-------+-------+--------+----+----+--------------------+------------------+-----+
|       n|female| 1.0| third|Klasen, Miss. Ger...|    1|    1|            350405|12.1833|   null|       S|null|null|                null| [1.0,0.0,1.0,0.0]|  0.0|
|       n|female| 2.0| first|Allison, Miss. He...|    1|    2|            113781| 151.55|C22 C26|       S|null|null|Montreal, PQ / Ch...| [2.0,0.0,0.0,1.0]|  0.0|
|       n|female| 2.0| third|Andersson, Miss. ...|    4|    2|            347082| 31.275|   null|       S|null|null| Sweden W

Incorrect Predictons: 29
+-----+------+                                                                  
|label|Errors|
+-----+------+
|  0.0|    13|
|  1.0|    16|
+-----+------+

TP: 34
TN: 52
FP: 13
FN: 16
total: 115
%FP=11.0%
%FN=13.0%


In [16]:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructField, StructType, StringType, DoubleType, IntegerType}
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.functions._

    val schema = new StructType(Array(
      new StructField("survived", StringType, true),
      new StructField("sex", StringType, true),
      new StructField("age", DoubleType, true),
      new StructField("pclass", StringType, true),
      new StructField("name", StringType, true),
      new StructField("sibsp", IntegerType, true),
      new StructField("parch", IntegerType, true),
      new StructField("ticket", StringType, true),
      new StructField("fare", DoubleType, true),
      new StructField("cabin", StringType, true),
      new StructField("embarked", StringType, true),
      new StructField("boat", StringType, true),
      new StructField("body", IntegerType, true),
      new StructField("home", StringType, true)
    ))

    //Getting data from tsv file
    val data = spark.read.format("csv").schema(schema).option("header", true).option("delimiter","\t").load("/home/shahyash1993/titanic.tsv")

    //Dropping null values from three cols
    val droppedNullAgeDF = data.na.drop("any",Seq("age","sex","pclass"))
    println(">>After dropping: "+droppedNullAgeDF.count())
    println("Data:"+data.count())

    //Formula to predict survival based on age, sex and pclass
    val supervised = new RFormula().setFormula("survived ~ age + sex + pclass")
    val Array(train, test) = supervised.fit(data).transform(droppedNullAgeDF).randomSplit(Array(0.9, 0.1))
    train.show

    //2
    val dt = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("features")
    val model = dt.fit(train)

    //predictions
    val predictions = model.transform(test)
    predictions.show

    //3
    val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")

    val rmse = evaluator.evaluate(predictions)
    println("Root Mean Squared Error (RMSE) on test data = " + rmse)

    println("Learned regression tree model:\n" + model.toDebugString)

    //4
    val binarizer: Binarizer = new Binarizer().setInputCol("prediction").setOutputCol("binarized_prediction").setThreshold(0.5)

    val predictionBinary = binarizer.transform(predictions)
    predictionBinary.show

    //5
    val wrongPredictions = predictionBinary.where(expr("label != binarized_prediction"))
    wrongPredictions.show

    //6
    val countErrors = wrongPredictions.groupBy("label").agg(count("prediction").alias("Errors"))
    countErrors.show

    //7
    val correctPredictions = predictionBinary.where(expr("label == binarized_prediction"))
    val countCorrectPredictions = correctPredictions.groupBy("label").agg(count("prediction").alias("Correct"))
    countCorrectPredictions.show

    //Finding TP, TN, FP, FN
    val truePositives = predictionBinary.where(expr("label == 1 AND prediction == 1")).count()
    println("TP: "+truePositives)

    val trueNegatives= predictionBinary.where(expr("label == 0 AND prediction == 0")).count()
    println("TN: "+trueNegatives)

    val falsePositive= predictionBinary.where(expr("label == 0 AND prediction == 1")).count()
    println("FP: "+falsePositive)

    val falseNegative= predictionBinary.where(expr("label == 1 AND prediction == 0")).count()
    println("FN: "+falseNegative)

    val totalCount= predictionBinary.count()
    println("total: "+totalCount)

    val percentFP:Double = falsePositive*100/totalCount
    val percentFN:Double = falseNegative*100/totalCount

    println("%FP=" + percentFP+"%")
    println("%FN=" + percentFN+"%")

>>After dropping: 1045
Data:1309
+--------+------+----+------+--------------------+-----+-----+------------------+-------+-------+--------+----+----+--------------------+------------------+-----+
|survived|   sex| age|pclass|                name|sibsp|parch|            ticket|   fare|  cabin|embarked|boat|body|                home|          features|label|
+--------+------+----+------+--------------------+-----+-----+------------------+-------+-------+--------+----+----+--------------------+------------------+-----+
|       n|female| 1.0| third|Klasen, Miss. Ger...|    1|    1|            350405|12.1833|   null|       S|null|null|                null| [1.0,0.0,1.0,0.0]|  0.0|
|       n|female| 2.0| first|Allison, Miss. He...|    1|    2|            113781| 151.55|C22 C26|       S|null|null|Montreal, PQ / Ch...| [2.0,0.0,0.0,1.0]|  0.0|
|       n|female| 2.0| third|Andersson, Miss. ...|    4|    2|            347082| 31.275|   null|       S|null|null| Sweden Winnipeg, MN| [2.0,0.0,1.0,0

Learned regression tree model:
DecisionTreeRegressionModel (uid=dtr_098c1b6c0c28) of depth 5 with 47 nodes
  If (feature 1 in {1.0})
   If (feature 0 <= 7.0)
    If (feature 2 in {1.0})
     If (feature 0 <= 2.0)
      Predict: 0.3
     Else (feature 0 > 2.0)
      Predict: 0.36363636363636365
    Else (feature 2 not in {1.0})
     Predict: 1.0
   Else (feature 0 > 7.0)
    If (feature 3 in {0.0})
     If (feature 0 <= 32.0)
      If (feature 0 <= 28.5)
       Predict: 0.13934426229508196
      Else (feature 0 > 28.5)
       Predict: 0.2631578947368421
     Else (feature 0 > 32.0)
      If (feature 0 <= 36.0)
       Predict: 0.0
      Else (feature 0 > 36.0)
       Predict: 0.08139534883720931
    Else (feature 3 not in {0.0})
     If (feature 0 <= 36.0)
      If (feature 0 <= 33.0)
       Predict: 0.42105263157894735
      Else (feature 0 > 33.0)
       Predict: 0.8
     Else (feature 0 > 36.0)
      If (feature 0 <= 54.0)
       Predict: 0.3090909090909091
      Else (feature 0 > 54.

+-----+------+
|label|Errors|
+-----+------+
|  0.0|    13|
|  1.0|    18|
+-----+------+

+-----+-------+                                                                 
|label|Correct|
+-----+-------+
|  0.0|     56|
|  1.0|     28|
+-----+-------+

TP: 2
TN: 4
FP: 1
FN: 1
total: 115
%FP=0.0%
%FN=0.0%


In [15]:
object Problem_45 {
/*
*
*Problem	TrainSet Size (%)	TestSet Size (%)		FP (%)	FN (%)
      2	                10                90	      0	      41
	                    50	              50	      1	      36
	                    90	              10	      0     	37
      3	                10	              90	      10    	11
	                    50	              50	      9     	12
	                    90	              10	      7     	9
      4	                10	              90	      7     	1
	                    50	              50	      0     	1
	                    90	              10	      0     	0


So based on the tests, It is quite obvious that by using just the age to predict the survival is very much meaningless,
Whether by using age, pclass and sex prediction of survival gives better results.
But, using the decision trees is the best way to predict the survival based on the same three parameters.
*
* */
}