What do all the numbers tell you?
89/90

In [1]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorAssembler}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.regression.DecisionTreeRegressor


In [2]:
val mifemdata = spark.read.format("csv").option("header",true).option("inferschema",true).option("sep",",").load("mifem.csv")
val mifemCleandata = mifemdata.na.drop
mifemCleandata.show()
mifemCleandata.printSchema()
val mifemDataset = mifemCleandata.select("outcome","age","yronset","premi","smstat","diabetes","highbp","hichol","angina","stroke")


+---+-------+---+-------+-----+------+--------+------+------+------+------+
|_c0|outcome|age|yronset|premi|smstat|diabetes|highbp|hichol|angina|stroke|
+---+-------+---+-------+-----+------+--------+------+------+------+------+
|  1|   live| 63|     85|    n|     x|       n|     y|     y|     n|     n|
|  6|   live| 55|     85|    n|     c|       n|     y|     y|     n|     n|
|  8|   live| 68|     85|    y|    nk|      nk|     y|    nk|     y|     n|
| 10|   live| 64|     85|    n|     x|       n|     y|     n|     y|     n|
| 11|   dead| 67|     85|    n|    nk|      nk|    nk|    nk|    nk|    nk|
| 15|   live| 66|     85|    n|     x|      nk|    nk|    nk|    nk|    nk|
| 21|   live| 63|     85|    n|     n|       n|     y|     n|     n|     n|
| 22|   dead| 68|     85|    y|     n|       n|     y|     y|     y|     y|
| 23|   dead| 46|     85|    n|     c|       n|     y|    nk|    nk|     n|
| 28|   dead| 66|     85|    y|     c|       n|     y|     n|     n|     y|
| 36|   dead

In [3]:
val categoryColNames = List("premi","smstat","diabetes","highbp","hichol","angina","stroke")
    val stringIndexersFeatures = categoryColNames.map { columnName =>
      new StringIndexer()
        .setInputCol(columnName)
        .setOutputCol(columnName + "Indexed")
        .fit(mifemDataset)
}




In [4]:
val outcomeIndexer = new StringIndexer().setInputCol("outcome").setOutputCol("outcomeIndexed").fit(mifemDataset)
var catColNameIndexed = new ListBuffer[String]()
catColNameIndexed = categoryColNames.map(_ + "Indexed").to[ListBuffer]
catColNameIndexed += "age"
catColNameIndexed += "yronset"


ListBuffer(premiIndexed, smstatIndexed, diabetesIndexed, highbpIndexed, hicholIndexed, anginaIndexed, strokeIndexed, age, yronset)

In [5]:
val assembler = new VectorAssembler().setInputCols(Array(catColNameIndexed: _*)).setOutputCol("Features")
val pipelineMifem = new Pipeline().setStages(Array(stringIndexersFeatures: _*)++ Array(outcomeIndexer,assembler))
val indexedData = pipelineMifem.fit(mifemDataset).transform(mifemDataset)
val outcomeFeatureData = indexedData.select("outcome","outcomeIndexed","Features")
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(outcomeIndexer.labels)
outcomeFeatureData.show()

+-------+--------------+--------------------+
|outcome|outcomeIndexed|            Features|
+-------+--------------+--------------------+
|   live|           0.0|(9,[1,4,7,8],[2.0...|
|   live|           0.0|(9,[1,4,7,8],[1.0...|
|   live|           0.0|[1.0,3.0,2.0,0.0,...|
|   live|           0.0|(9,[1,5,7,8],[2.0...|
|   dead|           1.0|[0.0,3.0,2.0,2.0,...|
|   live|           0.0|[0.0,2.0,2.0,2.0,...|
|   live|           0.0|(9,[7,8],[63.0,85...|
|   dead|           1.0|[1.0,0.0,0.0,0.0,...|
|   dead|           1.0|[0.0,1.0,0.0,0.0,...|
|   dead|           1.0|[1.0,1.0,0.0,0.0,...|
|   dead|           1.0|(9,[1,7,8],[1.0,5...|
|   live|           0.0|(9,[4,7,8],[1.0,6...|
|   live|           0.0|[0.0,1.0,0.0,1.0,...|
|   live|           0.0|(9,[4,5,7,8],[1.0...|
|   dead|           1.0|[0.0,2.0,0.0,1.0,...|
|   live|           0.0|(9,[3,5,7,8],[1.0...|
|   dead|           1.0|(9,[3,7,8],[1.0,6...|
|   dead|           1.0|[2.0,0.0,1.0,0.0,...|
|   live|           0.0|[1.0,2.0,0

In [6]:
 val corr1 = indexedData.stat.corr("outcomeIndexed","age")
    var correlation = "age : " + corr1 + "\n"
    val corr2 = indexedData.stat.corr("outcomeIndexed","yronset")
    correlation += "yronset : " + corr2 + "\n"
    val corr3 = indexedData.stat.corr("outcomeIndexed","premiIndexed")
    correlation += "premiIndexed : " + corr3 + "\n"
    val corr4 = indexedData.stat.corr("outcomeIndexed","smstatIndexed")
    correlation += "smstatIndexed : " + corr4 + "\n"
    val corr5 = indexedData.stat.corr("outcomeIndexed","diabetesIndexed")
    correlation += "diabetesIndexed : " + corr5 + "\n"
    val corr6 = indexedData.stat.corr("outcomeIndexed","highbpIndexed")
    correlation += "highbpIndexed : " + corr6 + "\n"
    val corr7 = indexedData.stat.corr("outcomeIndexed","hicholIndexed")
    correlation += "hicholIndexed : " + corr7 + "\n"
    val corr8 = indexedData.stat.corr("outcomeIndexed","anginaIndexed")
    correlation += "anginaIndexe : " + corr8 + "\n"
    val corr9 = indexedData.stat.corr("outcomeIndexed","strokeIndexed")
    correlation += "strokeIndexed : " + corr9 + "\n"

    print(correlation)  
    val printing = sc.parallelize(Seq(correlation))
    printing.saveAsTextFile("correlation")

age : 0.12576387152792115
yronset : -0.08693864419542775
premiIndexed : 0.24278346151896188
smstatIndexed : 0.1809504994879566
diabetesIndexed : 0.3129038101726261
highbpIndexed : 0.22478366832372196
hicholIndexed : 0.16789778400628724
anginaIndexe : 0.32674068022275105
strokeIndexed : 0.36918698354044277


In [7]:
val randomForestModel = new RandomForestClassifier().setLabelCol("outcomeIndexed").setFeaturesCol("Features")
val logisticRegressionModel = new LogisticRegression().setLabelCol("outcomeIndexed").setFeaturesCol("Features")
val linearSVMModel = new LinearSVC().setMaxIter(10).setRegParam(0.1).setLabelCol("outcomeIndexed").setFeaturesCol("Features")
val NaiveBayesModel = new NaiveBayes().setLabelCol("outcomeIndexed").setFeaturesCol("Features")
val gbtModel = new GBTClassifier().setMaxIter(10).setLabelCol("outcomeIndexed").setFeaturesCol("Features")
val decisiontreeModel = new DecisionTreeClassifier().setLabelCol("outcomeIndexed").setFeaturesCol("Features")


In [8]:
val pipelinerandomForest = new Pipeline().setStages(Array(randomForestModel,labelConverter))
val pipelinelogisticRegression = new Pipeline().setStages(Array(logisticRegressionModel,labelConverter))
val pipelinelinearSVM = new Pipeline().setStages(Array(linearSVMModel,labelConverter))
val pipelineNaiveBayes = new Pipeline().setStages(Array(NaiveBayesModel,labelConverter))
val pipelineGBT = new Pipeline().setStages(Array(gbtModel,labelConverter))
val pipelinedecisionTree = new Pipeline().setStages(Array(decisiontreeModel,labelConverter))

In [9]:
val Array(trainingData, testData) = outcomeFeatureData.randomSplit(Array(0.7, 0.3))

 FUNCTIONS CORRECT AND WRONG

In [10]:
def correctPredictions(predictions: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] ): DataFrame = {
val correctPredictions = predictions.where(expr("outcome == predictedLabel"))
val countCorrectPredictions = correctPredictions.groupBy("outcome").agg(count("outcome").alias("Correct"))
countCorrectPredictions.show()
countCorrectPredictions.toDF
}

def wrongPredictions(predictions: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]): DataFrame = {
val wrongPredictions = predictions.where(expr("outcome != predictedLabel"))
val countErrors = wrongPredictions.groupBy("outcome").agg(count("outcome").alias("Error"))
countErrors.toDF
}


In [11]:
 def accuracy(correctPredictions:DataFrame ,wrongPredictions:DataFrame,testData : DataFrame):(Double,String)=
  {
    val totalRecords=testData.count()
    val trueNegative = correctPredictions.where(col("outcome") ===  "live").select("Correct")
    var tn: Long = 0
    if(trueNegative.count() == 0 ){
      tn = 0
    } else
    {
      tn = trueNegative.head().getLong(0)
    }
    var conf = "True Negative : " + tn + "\n"
    val truePositive = correctPredictions.where(col("outcome") ===  "dead").select("Correct")
    var tp: Long = 0
    if(truePositive.count() == 0 ){
      tp = 0
    } else {
      tp = truePositive.head().getLong(0)
    }
    conf += "True Positive : " + tp + "\n"

    val falseNegative = wrongPredictions.where(col("outcome") ===  "dead").select("Error")
    var fn: Long = 0
    if(falseNegative.count() == 0 ){
      fn = 0
    } else
    {
      fn = falseNegative.head().getLong(0)
    }
    conf += "False Negative : " + fn + "\n"
    val falsePositive = wrongPredictions.where(col("outcome") ===  "live").select("Error")
    var fp: Long = 0
    if(falsePositive.count() == 0 ){
      tp = 0
    } else {
      fp = falsePositive.head().getLong(0)
    }
    conf += "False Positive : " + fp+ "\n"

    val accuracy = (tp.toDouble + tn.toDouble)/totalRecords
    (accuracy,conf)
  }

RANDOM FOREST

In [12]:
val randomForest = pipelinerandomForest.fit(trainingData)
randomForest.save("RandomForesModel")
val predictionsrandomForest = randomForest.transform(testData)
var randomForestCorrect=correctPredictions(predictionsrandomForest)
var randomForestWrong=wrongPredictions(predictionsrandomForest)
randomForestWrong.show()
val (randomAccuracy,matrixRandom) = accuracy(randomForestCorrect,randomForestWrong,testData)
var accuracyString = "RandomForest : " + randomAccuracy + "\n"
var confRandom = sc.parallelize(Seq(matrixRandom))
confRandom.saveAsTextFile("RandomForestConfusion")
print(randomAccuracy)

+-------+-------+
|outcome|Correct|
+-------+-------+
|   live|    293|
|   dead|     28|
+-------+-------+

+-------+-----+
|outcome|Error|
+-------+-----+
|   live|    4|
|   dead|   81|
+-------+-----+

0.7906403940886699

LOGISTIC REGRESSION 


In [13]:
val logisticModel = pipelinelogisticRegression.fit(trainingData)
logisticModel.save("logisticRegressionModel")
val predictionsLogistic = logisticModel.transform(testData)
var logisticCorrect=correctPredictions(predictionsLogistic)
var logisticWrong=wrongPredictions(predictionsLogistic)
logisticWrong.show()
val (logisticAccuracy,matrixLogistic)= accuracy(logisticCorrect,logisticWrong,testData)
accuracyString += "logisticRegression : " + logisticAccuracy + "\n"
var confMatrixLogistic = sc.parallelize(Seq(matrixLogistic))
confMatrixLogistic.saveAsTextFile("LogisticConfusion")
print(logisticAccuracy)

+-------+-------+
|outcome|Correct|
+-------+-------+
|   live|    290|
|   dead|     28|
+-------+-------+

+-------+-----+
|outcome|Error|
+-------+-----+
|   live|    7|
|   dead|   81|
+-------+-----+

0.7832512315270936

LINEAR SVM 

In [14]:
val linearSVM = pipelinelinearSVM.fit(trainingData)
linearSVM.save("LinearSVMModel")
val predictionslinearSVM = linearSVM.transform(testData)
var linearSVMCorrect=correctPredictions(predictionslinearSVM)
var linearSVMWrong=wrongPredictions(predictionslinearSVM)
linearSVMWrong.show()
val (linearSVMAccuracy,matrixLinearSVM) = accuracy(linearSVMCorrect,linearSVMWrong,testData)
accuracyString += "linearSVM : " + linearSVMAccuracy + "\n"
var confMatrixLinearSVM = sc.parallelize(Seq(matrixLinearSVM))
confMatrixLinearSVM.saveAsTextFile("LinearSVMConfusion")
print(linearSVMAccuracy)

+-------+-------+
|outcome|Correct|
+-------+-------+
|   live|    297|
+-------+-------+

+-------+-----+
|outcome|Error|
+-------+-----+
|   dead|  109|
+-------+-----+

0.7315270935960592

NaiveBayesModel

In [15]:
val naiveBayes = pipelineNaiveBayes.fit(trainingData)
naiveBayes.save("NaiveBayesModel")
val predictionsNaiveBayes = naiveBayes.transform(testData)
var naiveBayesCorrect=correctPredictions(predictionsNaiveBayes)
var naiveBayesWrong=wrongPredictions(predictionsNaiveBayes)
naiveBayesWrong.show()
val (naiveBayesAccuracy,matrixNaiveBayes) = accuracy(naiveBayesCorrect,naiveBayesWrong,testData)
accuracyString += "NaiveBayes: " + naiveBayesAccuracy + "\n"
var confMatrixNaiveBayes = sc.parallelize(Seq(matrixNaiveBayes))
confMatrixNaiveBayes.saveAsTextFile("NaiveBayesConfusion")
print(naiveBayesAccuracy)


+-------+-------+
|outcome|Correct|
+-------+-------+
|   live|    278|
|   dead|     36|
+-------+-------+

+-------+-----+
|outcome|Error|
+-------+-----+
|   live|   19|
|   dead|   73|
+-------+-----+

0.7733990147783252

gbtModel

In [16]:
val gbt = pipelineGBT.fit(trainingData)
gbt.save("GradientBoostingTreeModel")
val predictionsgbt = gbt.transform(testData)
var gbtCorrect=correctPredictions(predictionsgbt)
var gbtWrong=wrongPredictions(predictionsgbt)
gbtWrong.show()
val (gbtAccuracy,matrixGBT)= accuracy(gbtCorrect,gbtWrong,testData)
accuracyString += "GradientBoostingTree : " + gbtAccuracy + "\n"
val confMatrixGBT = sc.parallelize(Seq(matrixGBT))
confMatrixGBT.saveAsTextFile("GBTConfusion")
print(gbtAccuracy)

+-------+-------+
|outcome|Correct|
+-------+-------+
|   live|    282|
|   dead|     32|
+-------+-------+

+-------+-----+
|outcome|Error|
+-------+-----+
|   live|   15|
|   dead|   77|
+-------+-----+

0.7733990147783252

decisiontreeModel

In [17]:
val decisionTree = pipelinedecisionTree.fit(trainingData)
decisionTree.save("DecisionTreeModel")
val predictionsdecisionTree = decisionTree.transform(testData)
var decisionTreeCorrect=correctPredictions(predictionsgbt)
var decisionTreeWrong=wrongPredictions(predictionsgbt)
decisionTreeWrong.show()
val (decisionTreeAccuracy,matrixDecisionTree) = accuracy(decisionTreeCorrect,decisionTreeWrong,testData)
accuracyString += "DecisionTree : " + decisionTreeAccuracy + "\n"
print(decisionTreeAccuracy)
val confMatrixDT = sc.parallelize(Seq(matrixDecisionTree))
confMatrixDT.saveAsTextFile("DecisionTreeConfusion")

+-------+-------+
|outcome|Correct|
+-------+-------+
|   live|    282|
|   dead|     32|
+-------+-------+

+-------+-----+
|outcome|Error|
+-------+-----+
|   live|   15|
|   dead|   77|
+-------+-----+

0.7733990147783252

In [18]:
 var accuracyCompare = sc.parallelize(Seq(accuracyString))
 accuracyCompare.saveAsTextFile("CompareAccuracies")

DATA ANALYSIS AGE WISE 

In [19]:
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions.{desc, asc}
import org.apache.spark.ml.feature.SQLTransformer

val minAge= mifemDataset.agg(min("age")).first.getInt(0)
print(minAge)
var ArraySplits= ArrayBuffer[Double]()
val maxAge= mifemDataset.agg(max("age")).first.getInt(0)
print(maxAge)
for( i<-minAge to maxAge  by 5)
{
    ArraySplits.append(i)  
   
}
ArraySplits.append(Double.PositiveInfinity)
ArraySplits.toArray

3569

Array(35.0, 40.0, 45.0, 50.0, 55.0, 60.0, 65.0, Infinity)

In [20]:
val AgeBucketize = new Bucketizer().setInputCol("age").setOutputCol("AgeBucketed").setSplits(ArraySplits.toArray)
AgeBucketize.transform(mifemDataset).select("age","AgeBucketed" ,"outcome").orderBy(desc("age") ).orderBy(desc("AgeBucketed") ).show()
val AgeBucketsGroups =AgeBucketize.transform(mifemDataset).select("age","AgeBucketed" ,"outcome")
AgeBucketsGroups.show
AgeBucketsGroups.registerTempTable("AgeBucketsGroupsTable")
val totalfemales = AgeBucketsGroups.groupBy("AgeBucketed").agg(count("*") as "TotalFemales")
totalfemales.sort("AgeBucketed").show()

+---+-----------+-------+
|age|AgeBucketed|outcome|
+---+-----------+-------+
| 69|        6.0|   live|
| 69|        6.0|   dead|
| 69|        6.0|   live|
| 69|        6.0|   dead|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   dead|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   dead|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   live|
| 69|        6.0|   dead|
| 69|        6.0|   live|
| 69|        6.0|   live|
+---+-----------+-------+
only showing top 20 rows

+---+-----------+-------+
|age|AgeBucketed|outcome|
+---+-----------+-------+
| 63|        5.0|   live|
| 55|        4.0|   live|
| 68|        6.0|   live|
| 64|        5.0|   live|
| 67|        6.0|   dead|
| 66|        6.0|   live|
| 63|        5.0|   live|
| 68|        6.0|   dead|
| 46|        2.0|   dead|
| 66|        6.0|   dead|
| 59|       

In [21]:
val Survivors = AgeBucketsGroups.where(col("outcome") === "live").groupBy("AgeBucketed").agg(count("*") as "Survived")
val SurvivedTable = totalfemales.join(Survivors,Seq("AgeBucketed"),"left_outer")
val survivalPercent = new SQLTransformer().setStatement("SELECT *,`survived`/`totalfemales` *100 as `Survial%` FROM __THIS__ order by AgeBucketed")
val AgeGroupWisePercent = survivalPercent.transform(SurvivedTable)
AgeGroupWisePercent.show

+-----------+------------+--------+-----------------+
|AgeBucketed|TotalFemales|Survived|         Survial%|
+-----------+------------+--------+-----------------+
|        0.0|          16|      14|             87.5|
|        1.0|          32|      29|           90.625|
|        2.0|          60|      46|76.66666666666667|
|        3.0|         107|      93|86.91588785046729|
|        4.0|         214|     178|83.17757009345794|
|        5.0|         371|     266|71.69811320754717|
|        6.0|         495|     348| 70.3030303030303|
+-----------+------------+--------+-----------------+

