# Get Hadoop Access

In [23]:

import org.apache.spark.sql.SparkSession

// @hidden_cell
// This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
// You might want to remove those credentials before you share your notebook.
def setHadoopConfig18a70142643c45b7981f698a88e430e2(name: String) = {
    // This function sets the Hadoop configuration so it is possible to
    // access data from Bluemix Object Storage using Spark

    val prefix = "fs.swift.service." + name
    sc.hadoopConfiguration.set(prefix + ".auth.url", "https://identity.open.softlayer.com" + "/v3/auth/tokens")
    sc.hadoopConfiguration.set(prefix + ".auth.endpoint.prefix","endpoints")
    sc.hadoopConfiguration.set(prefix + ".tenant", "d31104c25a814c6c91023cd95c34cfc5")
    sc.hadoopConfiguration.set(prefix + ".username", "48c530d8f60f48688f3a81a28d73b2ae")
    sc.hadoopConfiguration.set(prefix + ".password", "v2v36a5U?ugFD]QD")
    sc.hadoopConfiguration.setInt(prefix + ".http.port", 8080)
    sc.hadoopConfiguration.set(prefix + ".region", "dallas")
    sc.hadoopConfiguration.setBoolean(prefix + ".public", false)
}

// you can choose any name
val name = "keystone"
setHadoopConfig18a70142643c45b7981f698a88e430e2(name)

val spark = SparkSession.
    builder().
    getOrCreate()

val df_churn = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load("swift://DubaiH2OMeetup." + name + "/churn.all.csv")


1


In [24]:
df_churn.select("state", "phone_number", "international_plan", "voice_mail_plan").show(10)

+-----+------------+------------------+---------------+
|state|phone_number|international_plan|voice_mail_plan|
+-----+------------+------------------+---------------+
|   KS|    382-4657|                no|            yes|
|   OH|    371-7191|                no|            yes|
|   NJ|    358-1921|                no|             no|
|   OH|    375-9999|               yes|             no|
|   OK|    330-6626|               yes|             no|
|   AL|    391-8027|               yes|             no|
|   MA|    355-9993|                no|            yes|
|   MO|    329-9001|               yes|             no|
|   LA|    335-4719|                no|             no|
|   WV|    330-8173|               yes|            yes|
+-----+------------+------------------+---------------+
only showing top 10 rows



## Split into train & test datasets

In [1]:
val Array(train_churn, test_churn) = df_churn.randomSplit(Array(.7, .3), seed=1)

In [2]:
train_churn.count

3490

In [6]:
test_churn.count

1510

In [7]:
df_churn.count

5000

## Feature Extraction

In [9]:
import org.apache.spark.ml.feature.{ StringIndexer, IndexToString }

val churn_indexer = new StringIndexer().
                        setInputCol("churned").
                        setOutputCol("churn_label").
                        fit(df_churn)

val churn_originalLabel_converter = new IndexToString().
                                        setInputCol("prediction").
                                        setOutputCol("predictedLabel").
                                        setLabels(churn_indexer.labels)

val international_plan_indexer = new StringIndexer().
                                    setInputCol("international_plan").
                                    setOutputCol("international_plan_label")

val voice_mail_plan_indexer = new StringIndexer().
                                    setInputCol("voice_mail_plan").
                                    setOutputCol("voice_mail_plan_label")

In [10]:
val excluded_cols = List("state", "area_code", "phone_number", "international_plan", "voice_mail_plan", "churned")
val num_features = df_churn.columns.filterNot(col => excluded_cols.contains(col))
num_features

Array(account_length, number_vmail_messages, total_day_minutes, total_day_calls, total_day_charge, total_eve_minutes, total_eve_calls, total_eve_charge, total_night_minutes, total_night_calls, total_night_charge, total_intl_minutes, total_intl_calls, total_intl_charge, number_customer_service_calls)

## Build the Pipeline

In [11]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{ RandomForestClassifier }
import org.apache.spark.mllib.evaluation.{ BinaryClassificationMetrics, MulticlassMetrics }
import org.apache.spark.ml.evaluation.{ BinaryClassificationEvaluator }
import org.apache.spark.ml.linalg.{ DenseVector => MLDenseVector }


val assembler = new VectorAssembler().
                    setInputCols(num_features ++ Array("international_plan_label", "voice_mail_plan_label")).
                    setOutputCol("features")


val model = new RandomForestClassifier().
                    setLabelCol("churn_label").
                    setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(
                    churn_indexer, 
                    international_plan_indexer, 
                    voice_mail_plan_indexer, 
                    assembler, 
                    model,
                    churn_originalLabel_converter))

## Train the model on training dataset

In [12]:
val learned_model = pipeline.fit(train_churn)

## Perf. on training dataset (AUC)

In [13]:
val evaluator = new BinaryClassificationEvaluator().
                        setLabelCol("churn_label").
                        setMetricName("areaUnderROC")

In [14]:
val churn_predictions_train = learned_model.transform(train_churn)
evaluator.evaluate(churn_predictions_train)

0.933808047398309

In [15]:
churn_predictions_train.select("phone_number", "churned", "churn_label", "prediction", "predictedLabel", "probability").show()

+------------+-------+-----------+----------+--------------+--------------------+
|phone_number|churned|churn_label|prediction|predictedLabel|         probability|
+------------+-------+-----------+----------+--------------+--------------------+
|    373-1028| False.|        0.0|       0.0|        False.|[0.96053113136784...|
|    341-9764| False.|        0.0|       0.0|        False.|[0.95039189293736...|
|    399-1526|  True.|        1.0|       0.0|        False.|[0.53300650560664...|
|    378-7733| False.|        0.0|       0.0|        False.|[0.95410079297021...|
|    389-7073| False.|        0.0|       0.0|        False.|[0.95740684022308...|
|    362-8331| False.|        0.0|       0.0|        False.|[0.95845555551214...|
|    352-9130| False.|        0.0|       0.0|        False.|[0.88622877332364...|
|    375-5562| False.|        0.0|       0.0|        False.|[0.96396555229922...|
|    356-5244| False.|        0.0|       0.0|        False.|[0.96030880317658...|
|    365-6756| F

## Perf. on test dataset (AUC)

In [16]:
val churn_predictions_test = learned_model.transform(test_churn)
evaluator.evaluate(churn_predictions_test)

0.9107948973116434

In [17]:
churn_predictions_test.select("phone_number", "churned", "churn_label", "prediction", "predictedLabel", "probability", "rawPrediction").show()

+------------+-------+-----------+----------+--------------+--------------------+--------------------+
|phone_number|churned|churn_label|prediction|predictedLabel|         probability|       rawPrediction|
+------------+-------+-----------+----------+--------------+--------------------+--------------------+
|    375-4450| False.|        0.0|       0.0|        False.|[0.96315495912603...|[19.2630991825207...|
|    414-7942| False.|        0.0|       0.0|        False.|[0.96240331885511...|[19.2480663771022...|
|    390-2346| False.|        0.0|       0.0|        False.|[0.94944124734433...|[18.9888249468866...|
|    336-6533| False.|        0.0|       0.0|        False.|[0.83655599205242...|[16.7311198410484...|
|    404-1931| False.|        0.0|       0.0|        False.|[0.95242437278993...|[19.0484874557986...|
|    389-4602| False.|        0.0|       0.0|        False.|[0.80754245100406...|[16.1508490200813...|
|    334-4506| False.|        0.0|       0.0|        False.|[0.9627711336

## Perf. Confusion Matrix

In [18]:
val predictionsAndLabels = churn_predictions_test.
                            select("churn_label", "prediction").
                            rdd.map { e => (e.getDouble(0), e.getDouble(1))}

val metrics = new MulticlassMetrics(predictionsAndLabels)

val confusionMatrix = metrics.confusionMatrix

confusionMatrix

1285.0  99.0
11.0    115.0

## Perf. ROC

In [19]:
val rawPredictionAndLabels = churn_predictions_test.
                            select("prediction", "probability").
                            rdd.map { e => 
                                val (pred, Array(prob_class0, prob_class1)) = (e.getDouble(0), e(1).asInstanceOf[MLDenseVector].toArray)
                                (prob_class1, pred)
                            }

val metrics = new BinaryClassificationMetrics(rawPredictionAndLabels)

val roc = metrics.roc.collect

In [20]:
roc.take(20).foreach(println)

(0.0,0.0)
(0.0,0.007936507936507936)
(0.0,0.015873015873015872)
(0.0,0.023809523809523808)
(0.0,0.031746031746031744)
(0.0,0.03968253968253968)
(0.0,0.047619047619047616)
(0.0,0.05555555555555555)
(0.0,0.06349206349206349)
(0.0,0.07142857142857142)
(0.0,0.07936507936507936)
(0.0,0.0873015873015873)
(0.0,0.09523809523809523)
(0.0,0.10317460317460317)
(0.0,0.1111111111111111)
(0.0,0.11904761904761904)
(0.0,0.12698412698412698)
(0.0,0.1349206349206349)
(0.0,0.14285714285714285)
(0.0,0.15079365079365079)


## Perf. Lift Curve

In [21]:
val nbPoints = 100

val predProbs = churn_predictions_test.select("churn_label", "probability").
                    rdd.map { e => 
                            val (pred, Array(prob_class0, prob_class1)) = (e(0), e(1).asInstanceOf[MLDenseVector].toArray)
                            (pred, prob_class1)
                    }.
                    collect.
                    sortBy(-_._2)

val sampleSize = predProbs.size

val evalMetrics = (1 to nbPoints).reverse.map { e =>
                                       
    val threshold: Double = e.toDouble / nbPoints
                                                                              
    // Sensitivity or Recall
    val sensDF = predProbs.filter(f => f._2 > threshold)
    val nSens = sensDF.size
    val tp = sensDF.map(_._1).filter(_ == 1.0).size // true pos
    val fn = sensDF.map(_._1).filter(_ == 0.0).size // false neg
    val sensitivity: Double = if(nSens > 0) tp.toDouble / nSens else 0.0
                                        
    // Specificity computation
    //val specDF = predProbs.filter(f => f._2 <= threshold)
    //val tn = specDF.map(_._1).filter(_ == 0.0).size // true neg
    //val fp = specDF.map(_._1).filter(_ == 1.0).size // false pos
    //val nSpec  = specDF.size
    //val specificity: Double = if(nSpec > 0) tn.toDouble / nSpec else 0.0
    
    // Precision
    //val precision = tp.toDouble / (tp + fp) 

    // Support
    val supp = nSens.toDouble / sampleSize
    //(supp, sensitivity) // GAIN CURVE                                                 
    (supp, sensitivity / supp) // LIFT CURVE
    // (1-specificity, sensitivity) // ROC 
}.
distinct.
filter(_._2 > 0)

In [25]:
evalMetrics.take(20).foreach(println)

(0.0013245033112582781,755.0)
(0.001986754966887417,503.3333333333333)
(0.0033112582781456954,302.0)
(0.003973509933774834,251.66666666666666)
(0.008609271523178808,116.15384615384616)
(0.011920529801324504,83.88888888888889)
(0.015231788079470199,65.65217391304347)
(0.016556291390728478,60.4)
(0.017880794701986755,55.925925925925924)
(0.02052980132450331,47.138397502601464)
(0.023178807947019868,41.91020408163265)
(0.025827814569536423,37.72518080210388)
(0.02781456953642384,35.09637188208617)
(0.029801324503311258,32.809876543209874)
(0.03377483443708609,29.027297193387156)
(0.03642384105960265,26.955371900826446)
(0.0423841059602649,23.22509765625)
(0.046357615894039736,21.26326530612245)
(0.04966887417218543,19.864888888888892)
(0.051655629139072845,19.110782380013152)


## Saving the model

In [26]:
val model_path = "swift://DubaiH2OMeetup." + name + "/rf_churn.model_v0"
sc.parallelize(Seq(learned_model), 1).saveAsObjectFile(model_path)

# Reload Model

In [27]:
import org.apache.spark.ml.PipelineModel
val loaded_model = sc.objectFile[PipelineModel](model_path).first

In [28]:
val churn_predictions_test_fromLoadedModel = loaded_model.transform(test_churn)
evaluator.evaluate(churn_predictions_test_fromLoadedModel)

0.9107948973116434

In [29]:
churn_predictions_test_fromLoadedModel.select("phone_number", "churned", "churn_label", "prediction", "probability").show()

+------------+-------+-----------+----------+--------------------+
|phone_number|churned|churn_label|prediction|         probability|
+------------+-------+-----------+----------+--------------------+
|    375-4450| False.|        0.0|       0.0|[0.96315495912603...|
|    414-7942| False.|        0.0|       0.0|[0.96240331885511...|
|    390-2346| False.|        0.0|       0.0|[0.94944124734433...|
|    336-6533| False.|        0.0|       0.0|[0.83655599205242...|
|    404-1931| False.|        0.0|       0.0|[0.95242437278993...|
|    389-4602| False.|        0.0|       0.0|[0.80754245100406...|
|    334-4506| False.|        0.0|       0.0|[0.96277113366150...|
|    394-5202| False.|        0.0|       0.0|[0.96185869884476...|
|    362-7200| False.|        0.0|       0.0|[0.96076517735521...|
|    372-8261| False.|        0.0|       0.0|[0.95800230109628...|
|    396-2335| False.|        0.0|       0.0|[0.95409161290265...|
|    394-4548| False.|        0.0|       0.0|[0.95456121573704