# Apache Spark - Time series implementation from first principles

Topics Covered:

  1. Fitting a custom transformer model
  1. Transforming data
  1. Training ML models
  1. Examining trained models
  1. Evaluating model performance
  1. Testing model

When printing or logging messages during a program's execution, it's often necessary to include the values of variables, and string formatting is useful in such situations.
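
Here is a minimal illustration of three common ways to build such messages in Scala. The variable names are illustrative only and are not used elsewhere in the notebook:

```scala
// Illustrative values only
val jobName = "Spark_Time_Series"
val partitions = 6

// 1. StringOps.format: printf-style placeholders filled positionally
val msg1 = "App: %s, partitions: %d".format(jobName, partitions)

// 2. s-interpolator: inserts values (or ${expressions}) using toString
val msg2 = s"App: $jobName, doubled partitions: ${partitions * 2}"

// 3. f-interpolator: printf-style formats, checked against the value types at compile time
val msg3 = f"Partition count: $partitions%03d"

println(msg1) // App: Spark_Time_Series, partitions: 6
println(msg2) // App: Spark_Time_Series, doubled partitions: 12
println(msg3) // Partition count: 006
```

The `f` interpolator is usually the safest choice for numeric formatting because a mismatched format specifier is a compile error rather than a runtime exception.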

---
## Libraries for Apache Spark

When running in a Jupyter notebook, the required libraries may not be on the classpath.

Load the essential Spark libraries from public Maven repositories at runtime like this:

In [1]:
import $ivy.`org.apache.spark::spark-core:3.2.0`
import $ivy.`org.apache.spark::spark-mllib-local:3.2.0`
import $ivy.`org.apache.spark::spark-mllib:3.2.0`
import $ivy.`org.apache.spark::spark-graphx:3.2.0`
import $ivy.`org.apache.spark::spark-streaming:3.2.0`
import $ivy.`org.apache.spark::spark-tags:3.2.0`


In [2]:
import $ivy.`org.scalanlp::breeze-viz:1.2`
import $ivy.`org.jfree:jfreechart:1.5.4`
import $ivy.`org.creativescala::doodle-core:0.9.21`


---

## Import Spark Libraries

In [3]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


In [4]:
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, udf, _}


In [5]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD


In [6]:
import breeze.linalg._
import breeze.plot._


In [6]:
// this uses the IBM DB2 connector to read from a DB2 table
//import $ivy.`com.ibm.db2.jcc:db2jcc:db2jcc4`;

In [7]:
val appName = "Spark_Time_Series"

[36mappName[39m: [32mString[39m = [32m"Spark_Time_Series"[39m

## Set up the Logger

To control the volume of log messages, change the log4j configuration programmatically like this:

In [8]:
import org.apache.log4j.{Level, Logger}
//Logger.getLogger("org").setLevel(Level.INFO)

val logger: Logger = Logger.getLogger(appName)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
logger.setLevel(Level.INFO)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
//Logger.getLogger("org").setLevel(Level.INFO)

[39m
[36mlogger[39m: [32mLogger[39m = org.apache.log4j.Logger@5e36a703

---
## Create Spark session

In [8]:
// close the spark session and spark context before starting a new one, if re-executing the notebook.

//spark.stop()
//sc.stop()

cmd8.sc:1: not found: value spark
val res8_0 = spark.stop()
             ^
cmd8.sc:2: not found: value sc
val res8_1 = sc.stop()
             ^
Compilation Failed

In [9]:
val sparkConf = new SparkConf()
             .setAppName(appName)
             .setMaster("local[*]")
             //.setMaster("spark://localhost:7077")
             //.setMaster("spark://sparkmaster320:7077")
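             // Note: extraClassPath entries are separated by the platform path separator
             // (':' on Linux, ';' on Windows), not by commas. In client mode the driver setting
             // only takes effect if supplied before the driver JVM starts (e.g. spark-submit
             // --driver-class-path or spark-defaults.conf), not through SparkConf at runtime.
             .set("spark.driver.extraClassPath", "c:/bin/lib/db2jcc4.jar,c:/bin/lib/breeze-viz_2.12-1.2.jar")
             .set("spark.executor.extraClassPath", "c:/bin/lib/db2jcc4.jar,c:/bin/lib/breeze-viz_2.12-1.2.jar")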
             .set("spark.default.parallelism", "6")

[36msparkConf[39m: [32mSparkConf[39m = org.apache.spark.SparkConf@25d19ee9

In [10]:
// Apply the config to start a spark session:
val spark = org.apache.spark.sql.SparkSession.builder()
    .config(sparkConf)
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/05/15 10:29:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@45a8b08d

In [11]:
val sc = spark.sparkContext

[36msc[39m: [32mSparkContext[39m = org.apache.spark.SparkContext@478fb89

## Get information on Spark Session

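Use the SparkContext (`sc`) and its SparkConf to get essential information about the session. Note that `sc.defaultMinPartitions` is min(defaultParallelism, 2), the default minimum number of partitions for Hadoop RDDs, so it reports 2 below even though `spark.default.parallelism` is set to 6; `sc.defaultParallelism` reports the configured value.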

In [12]:
println("Spark Master: %s, User: %s, Version: %s, Deployment mode: %s".format(
        sc.master, sc.sparkUser, sc.version, sc.deployMode
    ))

println("Default Partitions: %d, Scheduling Mode: %s".format(
         sc.defaultMinPartitions, sc.getSchedulingMode
    ))

Spark Master: local[*], User: notebooker, Version: 3.2.0, Deployment mode: client
Default Partitions: 2, Scheduling Mode: FIFO


In [13]:
val config = sc.getConf

for ((k,v) <- config.getAll) println(s"Configuration Parameter: $k=$v")

Configuration Parameter: spark.driver.host=jupyterlab
Configuration Parameter: spark.app.startTime=1683715095650
Configuration Parameter: spark.app.id=local-1683715097940
Configuration Parameter: spark.default.parallelism=6
Configuration Parameter: spark.driver.port=41583
Configuration Parameter: spark.executor.extraClassPath=c:/bin/lib/db2jcc4.jar,c:/bin/lib/breeze-viz_2.12-1.2.jar
Configuration Parameter: spark.master=local[*]
Configuration Parameter: spark.driver.extraClassPath=c:/bin/lib/db2jcc4.jar,c:/bin/lib/breeze-viz_2.12-1.2.jar
Configuration Parameter: spark.executor.id=driver
Configuration Parameter: spark.app.name=Spark_Time_Series


[36mconfig[39m: [32mSparkConf[39m = org.apache.spark.SparkConf@3dea9ece

In [14]:
config.getOption("spark.executor.extraClassPath")

[36mres13[39m: [32mOption[39m[[32mString[39m] = [33mSome[39m(
  [32m"c:/bin/lib/db2jcc4.jar,c:/bin/lib/breeze-viz_2.12-1.2.jar"[39m
)

In [15]:
config.getOption("spark.jars")

[36mres14[39m: [32mOption[39m[[32mString[39m] = [32mNone[39m

In [16]:
sys.env("PATH")

[36mres15[39m: [32mString[39m = [32m"/opt/conda/bin:/home/notebooker/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin"[39m

---

## Define data schema

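Data can be loaded into a DataFrame by reading from a CSV file. Declaring the schema up front fixes each column's name and type, and avoids the extra pass over the data that schema inference (`inferSchema`) would need.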

In [19]:
// Declare a Schema
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

val bank_telemkt_schema: StructType = new StructType()
    .add("age", DoubleType, true)
    .add("job", StringType, true)
    .add("marital", StringType, true)
    .add("education", StringType, true)
    .add("defaulted", StringType, true)
    .add("housing", StringType, true)
    .add("loan", StringType, true)
    .add("contact_no", StringType, true)
    .add("month_name", StringType, true)
    .add("day_of_week", StringType, true)
    .add("duration", DoubleType, true)
    .add("campaign", DoubleType, true)
    .add("pdays", DoubleType, true)
    .add("previous", DoubleType, true)
    .add("poutcome", StringType, true)
    .add("emp_var_rate", DoubleType, true)
    .add("cons_price_idx", DoubleType, true)
    .add("cons_conf_idx", DoubleType, true)
    .add("euribor3m", DoubleType, true)
    .add("nr_employed", DoubleType, true)
    .add("y", StringType, true);

[32mimport [39m[36morg.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

[39m
[36mbank_telemkt_schema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"age"[39m, DoubleType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"job"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"marital"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"education"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"defaulted"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"housing"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"loan"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"contact_no"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"month_name"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"day_of_week"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"dura

In [20]:
// declare a case class if you want to use Datasets instead of DataFrames:
case class ModelDataRecord(
                            age: Double,
                            job: String,
                            marital: String,
                            education: String,
                            defaulted: String,
                            housing: String,
                            loan: String,
                            contact_no: String,
                            month_name: String,
                            day_of_week: String,
                            duration: Double,
                            campaign: Double,
                            pdays: Double,
                            previous: Double,
                            poutcome: String,
                            emp_var_rate: Double,
                            cons_price_idx: Double,
                            cons_conf_idx: Double,
                            euribor3m: Double,
                            nr_employed: Double,
                            y: String
                          )

defined [32mclass[39m [36mModelDataRecord[39m

### Variable Names

Define variables with the column names.

These will be used during data transformation and model training/evaluation later.

In [21]:
val originalLabelColname="y"
val labelColname = "label"

val numerical_features = Array("age", "duration", "pdays", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed");
val categorical_features = Array("job", "marital", "education", "defaulted", "housing", "loan", "day_of_week", "poutcome", "month_name");

originalLabelColname: String = "y"
labelColname: String = "label"
numerical_features: Array[String] = Array(
  "age",
  "duration",
  "pdays",
  "emp_var_rate",
  "cons_price_idx",
  "cons_conf_idx",
  "euribor3m",
  "nr_employed"
)
categorical_features: Array[String] = Array(
  "job",
  "marital",
  "education",
  "defaulted",
  "housing",
  "loan",
  "day_of_week",
  "poutcome",
  "month_name"
)

## Create a test dataframe

In [12]:
import org.apache.spark.sql.types.{StringType, StructField, StructType, IntegerType, DoubleType}
import org.apache.spark.sql.Row

[32mimport [39m[36morg.apache.spark.sql.types.{StringType, StructField, StructType, IntegerType, DoubleType}
[39m
[32mimport [39m[36morg.apache.spark.sql.Row[39m

In [17]:
import spark.implicits._

[32mimport [39m[36mspark.implicits._[39m

In [17]:
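// Read the bank telemarketing data from a CSV file.
// NOTE: the "Transform the data" section below uses `inputDF`, so uncomment this
// cell (and point it at your copy of the file) before running that section.
// "numPartitions" is a JDBC read option, not a documented CSV option; use
// inputDF.repartition(6) if a specific partition count is needed.

// val inputDF = spark.read
//     .option("header", "true")
//     .schema(bank_telemkt_schema)
//     .csv("/home/datasets/bank_telemkt/bank-additional-full.csv")
//     .as[ModelDataRecord]

//inputDF.printSchema()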

In [82]:
val rawData = Seq(
    ("T1", "X", 1),
    ("T2", "X", 2),
    ("T3", "X", 3),
    ("T4", "X", 4),
    ("T5", "X", 5),
    ("T6", "Y", 1),
    ("T7", "Y", 2),
    ("T6", "Y", 3)
);

[36mrawData[39m: [32mSeq[39m[([32mString[39m, [32mString[39m, [32mInt[39m)] = [33mList[39m(
  ([32m"T1"[39m, [32m"X"[39m, [32m1[39m),
  ([32m"T2"[39m, [32m"X"[39m, [32m2[39m),
  ([32m"T3"[39m, [32m"X"[39m, [32m3[39m),
  ([32m"T4"[39m, [32m"X"[39m, [32m4[39m),
  ([32m"T5"[39m, [32m"X"[39m, [32m5[39m),
  ([32m"T6"[39m, [32m"Y"[39m, [32m1[39m),
  ([32m"T7"[39m, [32m"Y"[39m, [32m2[39m),
  ([32m"T6"[39m, [32m"Y"[39m, [32m3[39m)
)

In [103]:
val testdf: DataFrame = spark.createDataFrame(rawData).toDF("id", "category", "ts")

[36mtestdf[39m: [32mDataFrame[39m = [id: string, category: string ... 1 more field]

In [102]:
spark

[36mres101[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@45a8b08d

In [83]:
// Schema matching rawData: string ids, string categories, integer time values
val ts_schema = StructType( Array(
    StructField("id", StringType, true),
    StructField("category", StringType, true),
    StructField("ts", IntegerType, true)
    ))

ts_schema: StructType = StructType(
  StructField("id", StringType, true, {}),
  StructField("category", StringType, true, {}),
  StructField("ts", IntegerType, true, {})
)

In [114]:
val testrdd:RDD[(String, String, Int)] = spark.sparkContext.parallelize(rawData);

[36mtestrdd[39m: [32mRDD[39m[([32mString[39m, [32mString[39m, [32mInt[39m)] = ParallelCollectionRDD[207] at parallelize at cmd113.sc:1

In [115]:
val rowRDD = testrdd.map(attributes => Row(attributes._1, attributes._2, attributes._3))

[36mrowRDD[39m: [32mRDD[39m[[32mRow[39m] = MapPartitionsRDD[208] at map at cmd114.sc:1

---

## Explore the Data

Count the rows in the test DataFrame, then display all of them (`show()` prints only 20 rows by default).

In [35]:
val numrows = testdf.count().toInt

[36mnumrows[39m: [32mInt[39m = [32m8[39m

In [86]:
testdf.show(numrows)

+---+--------+---+
| id|category| ts|
+---+--------+---+
| T1|       X|  1|
| T2|       X|  2|
| T3|       X|  3|
| T4|       X|  4|
| T5|       X|  5|
| T6|       Y|  1|
| T7|       Y|  2|
| T6|       Y|  3|
+---+--------+---+



In [87]:
val duplicates_count = testdf.groupBy("id").count().filter("count > 1").collect()

[36mduplicates_count[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m([T6,2])

### Run SQL queries on the DataFrame

First, declare the dataframe as a "view".

Then, it will be available to query using SQL statements.

In [45]:
testdf.createOrReplaceTempView("input")

Now a simple SQL statement can query the DataFrame `testdf` through its view name, `input`:

In [47]:
val jobcounts = spark.sql("SELECT category, count(1) as count from input group by category")

jobcounts.show()

+--------+-----+
|category|count|
+--------+-----+
|       X|    5|
|       Y|    2|
+--------+-----+



[36mjobcounts[39m: [32mDataFrame[39m = [category: string, count: bigint]

In [23]:
testdf.filter("category == 'X'").show()

+---+--------+---+
| id|category| ts|
+---+--------+---+
|  1|       X|  1|
|  2|       X|  2|
|  3|       X|  3|
|  4|       X|  4|
|  5|       X|  5|
+---+--------+---+



---

## TimeSeries Object

In [54]:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lag}

import java.io.{IOException, ObjectStreamException}
import org.apache.spark.ml.util.Identifiable

[32mimport [39m[36morg.apache.spark.sql.{DataFrame, SparkSession}
[39m
[32mimport [39m[36morg.apache.spark.sql.functions.{col, lag}

[39m
[32mimport [39m[36mjava.io.{IOException, ObjectStreamException}
[39m
[32mimport [39m[36morg.apache.spark.ml.util.Identifiable[39m

In [78]:
// Define a class to represent the time series data
@SerialVersionUID(100L)
class TimeSeries(
  val TimeAnchor: String,
  val TimeCycle: Int,
  var TimeValues:DataFrame,
  var timeValueCol: String,
  var timeIndexCol: String,
  var InterpolationMethod: Int
) extends Serializable {

  // Identifiable.randomUID appends a random suffix, so each call returns a different string
  override def toString:String = Identifiable.randomUID(
    prefix = f"TimeSeries[Cycle=$TimeCycle%s, Anchor=${TimeAnchor}]"
  )

  // array of column names of the covariates of the dataset
  var covariatesCol:List[String] = List()

  // usageIsForecast : is for forecast if true, is for fitting if false
  var usageIsForecast: Boolean = false;

  // keep only time series column y and covariates
  def keepOnlyCovariates():Unit = {
    var colsToKeep = timeIndexCol :: timeValueCol :: Nil
    colsToKeep :::= covariatesCol
    TimeValues = TimeValues.select(
      colsToKeep.map(x => col(x)): _*
    )
  }

  // sort time series in date/time order
  def sortTimeSeries():Unit = {
    TimeValues = TimeValues.sort(timeIndexCol)
  }

  // handle duplicate values
  def checkDuplicates():Unit = {
    // get count of grouped ts
    val duplicates_count = TimeValues.groupBy(timeIndexCol).count().filter("count > 1").collect()
    if(duplicates_count.length > 0 ){
      throw new RuntimeException("Duplicate time values not permitted in time series data.")
    }
  }

  def interpolateData():Unit = {
    // TODO: implement interpolation of missing and expected data points
    if( InterpolationMethod == 1){
      // linear interpolation
    }else if (InterpolationMethod == 2) {
      // exponential spline interpolation
    } else if (InterpolationMethod == 3) {
      // cubic spline interpolation
    }
  }

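  // Java serialization hooks: a TimeSeries wraps a DataFrame, which is a driver-side
  // handle, so refuse to (de)serialize it. The explicit `: Unit` result type gives these
  // methods the `void` signature that Java serialization looks for; with an inferred
  // `Nothing` result type they would not be recognised as hooks.
  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    throw new IOException("TimeSeries cannot be serialized: it holds a DataFrame")
  }

  private def readObject(in: java.io.ObjectInputStream): Unit = {
    throw new IOException("TimeSeries cannot be deserialized")
  }

  private def readObjectNoData(): Unit = {
    // nothing to initialise when the stream contains no data for this class
  }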

  sortTimeSeries();
  checkDuplicates();
  interpolateData()
  println("End of constructor");
}


defined [32mclass[39m [36mTimeSeries[39m

In [96]:
var ts1: TimeSeries = new TimeSeries("milliseconds", 1, testdf.filter("category == 'X'"), "ts", "id", 0)

End of constructor


In [104]:
ts1.TimeValues.show()

+---+---+
| id| ts|
+---+---+
| T1|  1|
| T2|  2|
| T3|  3|
| T4|  4|
| T5|  5|
+---+---+
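
`sortTimeSeries()` orders rows by the time-index column, which in this test data is the string column `id`. Spark sorts strings lexicographically, so T1 to T7 happen to come out in the expected order, but an id such as T10 would sort before T2. A plain-Scala illustration of the difference; the numeric sort key assumes every id has the form "T<number>", and in practice a numeric or timestamp time-index column avoids the problem:

```scala
// Time-index values stored as strings, as in the test data
val ids = List("T1", "T2", "T10", "T3")

// Default string ordering compares character by character
val lexicographic = ids.sorted
// Hypothetical numeric key, assuming every id has the form "T<number>"
val numeric = ids.sortBy(_.stripPrefix("T").toInt)

println(lexicographic) // List(T1, T10, T2, T3)
println(numeric)       // List(T1, T2, T3, T10)
```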



In [98]:
ts1.keepOnlyCovariates

In [109]:
ts1.TimeValues.columns

[36mres108[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m([32m"id"[39m, [32m"ts"[39m)

In [113]:
ts1.TimeValues.columns.length

[36mres112[39m: [32mInt[39m = [32m2[39m

In [99]:
ts1.TimeValues.show()

+---+---+
| id| ts|
+---+---+
| T1|  1|
| T2|  2|
| T3|  3|
| T4|  4|
| T5|  5|
+---+---+



In [101]:
ts1.interpolateData

In [100]:
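// Expected to fail: id T6 appears twice in the 'Y' rows, so checkDuplicates() throws a RuntimeException
var ts2: TimeSeries = new TimeSeries("milliseconds", 1, testdf.filter("category == 'Y'"), "ts", "id", 1)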

In [53]:
ts2.TimeValues.show()

+---+--------+---+
| id|category| ts|
+---+--------+---+
|  6|       Y|  1|
|  7|       Y|  2|
|  6|       Y|  3|
+---+--------+---+



In [69]:
val timeval: String = "ts";
val timeindex: String = "id";
val covariates = List("category");

[36mtimeval[39m: [32mString[39m = [32m"ts"[39m
[36mtimeindex[39m: [32mString[39m = [32m"id"[39m
[36mcovariates[39m: [32mList[39m[[32mString[39m] = [33mList[39m([32m"category"[39m)

In [73]:
var listCols = timeval :: timeindex :: Nil
listCols :::= covariates
testdf.select(listCols.map(m=>col(m)):_*)
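
The compound assignment `listCols :::= covariates` is rewritten as `listCols = listCols.:::(covariates)`, and `:::` called this way adds its argument in front of the list. So the covariates come first in the selected columns. A plain-Scala check of the resulting order, reusing the names from the cell above:

```scala
val timeval = "ts"
val timeindex = "id"
val covariates = List("category")

var listCols = timeval :: timeindex :: Nil // List(ts, id)
listCols :::= covariates                   // same as listCols = covariates ::: listCols
println(listCols)                          // List(category, ts, id)
```

The same pattern in `keepOnlyCovariates()` therefore selects the covariate columns before the index and value columns.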

---
## Transform the data

  - Convert text columns into indexed data representing categorical variables
  - Apply one-hot encoding to categorical variables
  - Scale numerical variables using min-max values
  - 'Assemble' feature columns together for ML algorithms to use for training
  - Others: A wide variety of transformations are available out-of-the-box

In [112]:
import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.linalg

[32mimport [39m[36morg.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}
[39m
[32mimport [39m[36morg.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
[39m
[32mimport [39m[36morg.apache.spark.ml.linalg[39m

In [113]:
// first of all, index the binary label column:
val labelIndexer = new StringIndexer()
      .setInputCol(originalLabelColname)
      .setOutputCol("label")

[36mlabelIndexer[39m: [32mStringIndexer[39m = strIdx_648d3b0473d8

After declaring the indexer, we "fit" it on the data.

In [114]:
val fittedIndexer = labelIndexer.fit(inputDF)

[36mfittedIndexer[39m: [32mml[39m.[32mfeature[39m.[32mStringIndexerModel[39m = StringIndexerModel: uid=strIdx_648d3b0473d8, handleInvalid=error

Now, this "fitted" transformer is ready to be used. We run the transform on our dataset to get the required result.

In this case, it converts the target variable from text into a numeric category index (0.0, 1.0, ...).
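
What the fitted indexer holds is essentially a label-to-index map. Assuming Spark's default `stringOrderType = "frequencyDesc"`, the most frequent label gets index 0.0 (the Spark 3 documentation states that ties are then sorted alphabetically). A plain-Scala sketch of fit and transform; the helper names and sample labels are hypothetical:

```scala
// "Fit": count labels and assign indices by descending frequency, ties alphabetically
def fitIndexer(values: Seq[String]): Map[String, Double] =
  values
    .groupBy(identity)
    .toSeq
    .sortBy { case (label, occurrences) => (-occurrences.size, label) }
    .map(_._1)
    .zipWithIndex
    .map { case (label, idx) => label -> idx.toDouble }
    .toMap

// "Transform": look up each label; an unseen label throws, similar in spirit
// to handleInvalid = "error"
def transformIndexer(model: Map[String, Double], values: Seq[String]): Seq[Double] =
  values.map(model)

val y = Seq("no", "no", "yes", "no", "yes", "no")
val labelModel = fitIndexer(y)
println(labelModel)                      // Map(no -> 0.0, yes -> 1.0)
println(transformIndexer(labelModel, y)) // List(0.0, 0.0, 1.0, 0.0, 1.0, 0.0)
```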

In [115]:
val transformedDF = fittedIndexer.transform(inputDF)

[36mtransformedDF[39m: [32mDataFrame[39m = [age: double, job: string ... 20 more fields]

In [116]:
transformedDF.show(4)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|
+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+
|56.0|housemaid|married|   basic.4y|       no|     no|  no| telephone|       may|        mon|   261.0|     1.0|999.0|     0.0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|  0.0|
|57.0| services|married|high.school|  unknown|     no|  no| telephone|       may|        mon|   149.0|     1.0|999.0|     0.0|nonexistent|         1

----

### Create data transformation Pipeline

Instead of applying transformations one by one, let us collect them into a pipeline programmatically and apply them all at once.

In the code below, we'll use an ArrayBuffer to collect all the transformations dynamically.

In [117]:
// this buffer "xforms" will accumulate all our transformations till we're ready to put them in a pipeline
var xforms = scala.collection.mutable.ArrayBuffer.empty[PipelineStage];

[36mxforms[39m: [32mcollection[39m.[32mmutable[39m.[32mArrayBuffer[39m[[32mPipelineStage[39m] = [33mArrayBuffer[39m()

### Step 1: first of all, index the binary label column:

In [118]:
val labelIndexer = new StringIndexer()
  .setInputCol(originalLabelColname)
  .setOutputCol("label")

// add this to the array buffer:
xforms += labelIndexer;

[36mlabelIndexer[39m: [32mStringIndexer[39m = strIdx_b92509fa5ad4
[36mres117_1[39m: [32mcollection[39m.[32mmutable[39m.[32mArrayBuffer[39m[[32mPipelineStage[39m] = [33mArrayBuffer[39m(
  strIdx_b92509fa5ad4
)

In [119]:
xforms.length

[36mres118[39m: [32mInt[39m = [32m1[39m

### Step 2: Next, add a column indexer for each categorical column.

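Notice how the column name is used to set the input column and to build the output column name, prefixed with "idx_". The saved outputs in this section were produced with eight categorical features; with the nine currently listed in `categorical_features`, the running count would be 10 after this step (not 9) and 22 after Step 6 (not 20).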

In [120]:
categorical_features.foreach(
    x => 
    xforms += new StringIndexer().setInputCol(x).setOutputCol("idx_" + x)
    )

println(s"Indexing categorical variables. Count of transformations at this point is now = ${xforms.length}")

Indexing categorical variables. Count of transformations at this point is now = 9


### Step 3: Next, apply one-hot encoding to all categorical variables:

In [121]:
categorical_features.foreach(x => xforms += new OneHotEncoder().setInputCol("idx_" + x).setOutputCol("vec_idx_" + x))

println(s"One-hot encoding all categorical variables. Count of transformations at this point is now = ${xforms.length}")

One-hot encoding all categorical variables. Count of transformations at this point is now = 17
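
Spark's `OneHotEncoder` defaults to `dropLast = true`, so a column with k category indices becomes a (sparse) vector of length k - 1, and the last index is encoded as all zeros. A plain-Scala sketch of that mapping, using a dense `Vector[Double]` for readability; `oneHot` is a hypothetical helper:

```scala
// Encode a category index as a dense 0/1 vector.
// With dropLast = true (Spark's default) the vector has numCategories - 1 slots,
// so the last category is represented by all zeros.
def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
  val size = if (dropLast) numCategories - 1 else numCategories
  Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
}

// A hypothetical column with four categories (indices 0 to 3)
(0 until 4).foreach(i => println(s"$i -> ${oneHot(i, 4)}"))
// 0 -> Vector(1.0, 0.0, 0.0)
// 1 -> Vector(0.0, 1.0, 0.0)
// 2 -> Vector(0.0, 0.0, 1.0)
// 3 -> Vector(0.0, 0.0, 0.0)
```

Dropping the last slot avoids a perfectly collinear set of indicator columns, which matters for linear models.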


At this point, let us gather the names of all encoded columns; these will be used by the vector assembler later:

In [122]:
var allColNames = scala.collection.mutable.ArrayBuffer.empty[String]

categorical_features.foreach(x => allColNames += "vec_idx_%s".format(x))

In [123]:
// gather all numerical variables to assemble into a vector for applying scaling:
var numericalColNames = scala.collection.mutable.ArrayBuffer.empty[String]

numerical_features.foreach(y => numericalColNames += y)

### Important Concept

Note: The vector assembler takes the given input columns (numeric, boolean, or vector) and concatenates them into a single vector column, which is then used by other transformations.

Almost all ML algorithms expect the independent variables (the features) in a single vector column. Hence, a vector assembler is a necessity in most situations.
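
For a single row, the assembler's output can be pictured as a concatenation: each numeric column contributes one slot and each vector column contributes all of its slots, in the order of the input columns. A plain-Scala sketch, with `Left` standing for a numeric column and `Right` for a vector column; the `assemble` helper and the sample values are illustrative only:

```scala
// Concatenate one row's column values into a single feature vector
def assemble(columns: Seq[Either[Double, Vector[Double]]]): Vector[Double] =
  columns.foldLeft(Vector.empty[Double]) {
    case (acc, Left(x))  => acc :+ x // numeric column: one slot
    case (acc, Right(v)) => acc ++ v // vector column: all of its slots
  }

// e.g. age, a one-hot encoded category, and duration
val row: Seq[Either[Double, Vector[Double]]] =
  Seq(Left(56.0), Right(Vector(0.0, 1.0, 0.0)), Left(261.0))

println(assemble(row)) // Vector(56.0, 0.0, 1.0, 0.0, 261.0)
```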

### Step 4: Assemble all numerical features.

In [124]:
val assembler1 = new VectorAssembler()
  .setInputCols(numerical_features.toArray)
  .setOutputCol("numericalfeatures")

xforms += assembler1;

println(s"Assembled together all numerical variables.\n Count of transformations at this point is now = ${xforms.length}")

Assembled together all numerical variables.
 Count of transformations at this point is now = 18


[36massembler1[39m: [32mVectorAssembler[39m = VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8
[36mres123_1[39m: [32mcollection[39m.[32mmutable[39m.[32mArrayBuffer[39m[[32mPipelineStage[39m] = [33mArrayBuffer[39m(
  strIdx_b92509fa5ad4,
  strIdx_81f85490b36c,
  strIdx_ce69f1f871b5,
  strIdx_c015f894853f,
  strIdx_32f039b5508e,
  strIdx_aeb9100a7da0,
  strIdx_df5472840729,
  strIdx_c594c9129ba0,
  strIdx_2994d75552c7,
  oneHotEncoder_a2281c834864,
  oneHotEncoder_2e9a416be7af,
  oneHotEncoder_dd81b0e3ca9f,
  oneHotEncoder_2edcf25b0c71,
  oneHotEncoder_0c3912d2c3ba,
  oneHotEncoder_051edbcf4b3a,
  oneHotEncoder_456c6d53b3ae,
  oneHotEncoder_49ca0d8f1c3d,
  VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8
)

### Step 5: Apply a min-max scaler for the numerical features:

In [125]:
xforms += new MinMaxScaler().setInputCol("numericalfeatures").setOutputCol("scaledfeatures");
allColNames += "scaledfeatures"

println(s"Scaled all numerical variables by min-max scaler.\n Count of transformations at this point is now = ${xforms.length}")

Scaled all numerical variables by min-max scaler.
 Count of transformations at this point is now = 19


[36mres124_0[39m: [32mcollection[39m.[32mmutable[39m.[32mArrayBuffer[39m[[32mPipelineStage[39m] = [33mArrayBuffer[39m(
  strIdx_b92509fa5ad4,
  strIdx_81f85490b36c,
  strIdx_ce69f1f871b5,
  strIdx_c015f894853f,
  strIdx_32f039b5508e,
  strIdx_aeb9100a7da0,
  strIdx_df5472840729,
  strIdx_c594c9129ba0,
  strIdx_2994d75552c7,
  oneHotEncoder_a2281c834864,
  oneHotEncoder_2e9a416be7af,
  oneHotEncoder_dd81b0e3ca9f,
  oneHotEncoder_2edcf25b0c71,
  oneHotEncoder_0c3912d2c3ba,
  oneHotEncoder_051edbcf4b3a,
  oneHotEncoder_456c6d53b3ae,
  oneHotEncoder_49ca0d8f1c3d,
  VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8,
  minMaxScal_d34452318ea3
)
[36mres124_1[39m: [32mcollection[39m.[32mmutable[39m.[32mArrayBuffer[39m[[32mString[39m] = [33mArrayBuffer[39m(
  [32m"vec_idx_job"[39m,
  [32m"vec_idx_marital"[39m,
  [32m"vec_idx_education"[39m,
  [32m"vec_idx_defaulted"[39m,
  [32m"vec_idx_housing"[39m,
  [32m"vec_idx_loan"[39m,


### Step 6: Finally, collect all columns into the "features" column. This vector column holds all the independent variables (features) used for model training.

In [126]:
val assembler2 = new VectorAssembler()
  .setInputCols(allColNames.toArray)
  .setOutputCol("features")

xforms += assembler2;

println(s"Collect all scaled numerical variables and the categorical variables together.")
println(s"Count of transformations at this point is now = ${xforms.length}")

Collect all scaled numerical variables and the categorical variables together.
Count of transformations at this point is now = 20


[36massembler2[39m: [32mVectorAssembler[39m = VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9
[36mres125_1[39m: [32mcollection[39m.[32mmutable[39m.[32mArrayBuffer[39m[[32mPipelineStage[39m] = [33mArrayBuffer[39m(
  strIdx_b92509fa5ad4,
  strIdx_81f85490b36c,
  strIdx_ce69f1f871b5,
  strIdx_c015f894853f,
  strIdx_32f039b5508e,
  strIdx_aeb9100a7da0,
  strIdx_df5472840729,
  strIdx_c594c9129ba0,
  strIdx_2994d75552c7,
  oneHotEncoder_a2281c834864,
  oneHotEncoder_2e9a416be7af,
  oneHotEncoder_dd81b0e3ca9f,
  oneHotEncoder_2edcf25b0c71,
  oneHotEncoder_0c3912d2c3ba,
  oneHotEncoder_051edbcf4b3a,
  oneHotEncoder_456c6d53b3ae,
  oneHotEncoder_49ca0d8f1c3d,
  VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8,
  minMaxScal_d34452318ea3,
  VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9
)

In [127]:
// print out all the transformations
xforms.foreach(println)

strIdx_b92509fa5ad4
strIdx_81f85490b36c
strIdx_ce69f1f871b5
strIdx_c015f894853f
strIdx_32f039b5508e
strIdx_aeb9100a7da0
strIdx_df5472840729
strIdx_c594c9129ba0
strIdx_2994d75552c7
oneHotEncoder_a2281c834864
oneHotEncoder_2e9a416be7af
oneHotEncoder_dd81b0e3ca9f
oneHotEncoder_2edcf25b0c71
oneHotEncoder_0c3912d2c3ba
oneHotEncoder_051edbcf4b3a
oneHotEncoder_456c6d53b3ae
oneHotEncoder_49ca0d8f1c3d
VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8
minMaxScal_d34452318ea3
VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9

### Step 7: Next, create a model pipeline with all these transformations

In [128]:
println(s"Assembling pipeline with the following transformations: ${xforms.mkString(", ")}")

val xformPipeline = new Pipeline()
  .setStages(xforms.toArray);

Assembling pipeline with the following transformations: strIdx_b92509fa5ad4, strIdx_81f85490b36c, strIdx_ce69f1f871b5, strIdx_c015f894853f, strIdx_32f039b5508e, strIdx_aeb9100a7da0, strIdx_df5472840729, strIdx_c594c9129ba0, strIdx_2994d75552c7, oneHotEncoder_a2281c834864, oneHotEncoder_2e9a416be7af, oneHotEncoder_dd81b0e3ca9f, oneHotEncoder_2edcf25b0c71, oneHotEncoder_0c3912d2c3ba, oneHotEncoder_051edbcf4b3a, oneHotEncoder_456c6d53b3ae, oneHotEncoder_49ca0d8f1c3d, VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8, minMaxScal_d34452318ea3, VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9


[36mxformPipeline[39m: [32mPipeline[39m = pipeline_ba250921493c

### Step 8: Fit the pipeline to create the transformer object. 

In [129]:
val dataTransformPipelineFitted = xformPipeline.fit(inputDF);

dataTransformPipelineFitted: PipelineModel = pipeline_ba250921493c

In [130]:
val pipelinePath = "/tmp/dataTransformPipeline"
println(s"Now saving the transformation pipeline to disk at: $pipelinePath")
dataTransformPipelineFitted.write.overwrite().save(pipelinePath)

Now saving the transformation pipeline to disk at: /tmp/dataTransformPipeline


23/01/30 12:13:14 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
23/01/30 12:13:14 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:14 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 12:13:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:15 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 12:13:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:15 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 12:13:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:15 INFO FileOutputCommitter: Fil

### Step 9: Use the fitted transformer to apply the transformations on the dataset

In [55]:
// Run the transformation pipeline on the dataset to prepare the data for model building
val preparedDF: org.apache.spark.sql.DataFrame = dataTransformPipelineFitted.transform(inputDF);

println("Completed transforming data using the pipeline.")

23/01/29 21:36:20 INFO SparkMLDemo: Completed transforming data using the pipeline.


[36mpreparedDF[39m: [32mDataFrame[39m = [age: double, job: string ... 39 more fields]

In [56]:
preparedDF.show(4)

23/01/29 21:36:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+-------+-----------+-------------+-------------+-----------+--------+---------------+------------+--------------+---------------+-----------------+-----------------+---------------+-------------+-------------------+----------------+--------------------+--------------------+--------------------+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_job|idx_marital|idx_education|idx_defaulted|idx_housing|idx_loan|idx_day_of_week|idx_poutcome|   vec_idx_job|vec_idx_marital|vec_idx_education|vec_idx_defaulted|vec_idx_housing| vec_idx_loan|vec_idx_day_of_week|vec_idx_poutcome|   numericalfeatures|      scaledfeatures|      

### Define a convenience function to create the feature transformation pipeline programmatically.

In [139]:
import org.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.Dataset
import scala.collection.mutable.ArrayBuffer

/**
* Transforms the raw dataset columns to a form usable for training the models -
* e.g. string to categorical variables, one-hot encoding, scaling of continuous variables, etc.
*
* @param inputDF Input raw dataset
* @param labelColname Name of the label column
* @param categoricalFeatures List of column names which are categorical features
* @param numericalFeatures List of column names which are numerical features
* @return The fitted transformation pipeline
*/
def firTransformDataPipeline(inputDF: Dataset[ModelDataRecord],
                labelColname: String,
                categoricalFeatures: Array[String],
                numericalFeatures: Array[String]): PipelineModel = {

    // this buffer accumulates all our transformations until we're ready to put them in a pipeline
    val xforms = ArrayBuffer.empty[PipelineStage]

    // first of all, index the binary label column into a numeric "label" column:
    val labelIndexer = new StringIndexer()
      .setInputCol(labelColname)
      .setOutputCol("label")
    xforms += labelIndexer

    // add a string indexer for each categorical column (e.g. "job" -> "idx_job"):
    categoricalFeatures.foreach(x => xforms += new StringIndexer().setInputCol(x).setOutputCol("idx_" + x))
    logger.info("Indexing categorical variables.")

    // one-hot encode each indexed column (e.g. "idx_job" -> "vec_idx_job"):
    categoricalFeatures.foreach(x => xforms += new OneHotEncoder().setInputCol("idx_" + x).setOutputCol("vec_idx_" + x))
    logger.info("One-hot encoding all categorical variables.")

    // gather the one-hot column names; these are used by the final vector assembler:
    val allColNames = ArrayBuffer.empty[String]
    categoricalFeatures.foreach(x => allColNames += s"vec_idx_$x")

    // assemble all numerical variables into a single vector so they can be scaled together:
    val assembler1 = new VectorAssembler()
      .setInputCols(numericalFeatures)
      .setOutputCol("numericalfeatures")
    xforms += assembler1
    logger.info("Assembled together all numerical variables.")

    // apply a min-max scaler to the numerical feature vector:
    xforms += new MinMaxScaler().setInputCol("numericalfeatures").setOutputCol("scaledfeatures")
    allColNames += "scaledfeatures"
    logger.info("Scaled all numerical variables by min-max scaler.")

    // finally, collect all columns into the "features" column, a single vector object:
    val assembler2 = new VectorAssembler()
      .setInputCols(allColNames.toArray)
      .setOutputCol("features")
    xforms += assembler2

    logger.info("Assembling pipeline with the following transformations: \n" + xforms.mkString(" \n"))
    val xformPipeline = new Pipeline()
      .setStages(xforms.toArray)

    val xformFitted = xformPipeline.fit(inputDF)
    logger.info("Completed fitting the pipeline")

    xformFitted
}

[32mimport [39m[36morg.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}

/**
* Transforms the raw dataset columns to a form usable for training the models -
* e.g. string to categorical variables, one-hot encoding, scaling of continuous variables, etc.
*
* @param inputDF Input raw dataset
* @param labelColname Name of the label column
* @param categoricalFeatures List of column names which are categorical features
* @param numericalFeatures List of column names which are numerical features
* @return The fitted transformation pipeline
*/
[39m
defined [32mfunction[39m [36mfirTransformDataPipeline[39m

In [144]:
categorical_features

[36mres143[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"job"[39m,
  [32m"marital"[39m,
  [32m"education"[39m,
  [32m"defaulted"[39m,
  [32m"housing"[39m,
  [32m"loan"[39m,
  [32m"day_of_week"[39m,
  [32m"poutcome"[39m
)
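Each categorical column listed above is first indexed and then one-hot encoded. The minimal plain-Scala sketch below shows what the OneHotEncoder stage produces for a single index, assuming its default dropLast = true, under which the last category maps to the all-zeros vector. `oneHot` and `featureDimension` are hypothetical helpers, and Spark itself emits sparse vectors rather than the dense Vector[Double] used here. The same arithmetic explains the length of the final features vector: each categorical column contributes (number of categories - 1) slots, and each numerical column contributes one slot, since MinMaxScaler leaves the dimensionality unchanged.

```scala
// Hypothetical helper mirroring OneHotEncoder's default dropLast = true behaviour:
// with k categories, index i < k-1 maps to a length-(k-1) vector with 1.0 at position i,
// and the last index (k-1) maps to the all-zeros vector.
def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
  require(numCategories > 0, "need at least one category")
  require(index >= 0 && index < numCategories, s"index $index out of range")
  val size = if (dropLast) numCategories - 1 else numCategories
  Vector.tabulate(size)(j => if (j == index) 1.0 else 0.0)
}

// Hypothetical helper: length of the assembled "features" vector when every categorical
// column is one-hot encoded with dropLast = true and every numerical column is a scalar.
def featureDimension(categoryCounts: Seq[Int], numNumerical: Int): Int =
  categoryCounts.map(_ - 1).sum + numNumerical
```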

In [146]:
val xformPipeline = firTransformDataPipeline(inputDF, originalLabelColname, categorical_features, numerical_features)

23/01/30 13:48:52 INFO SparkMLDemo: Indexing categorical variables.
23/01/30 13:48:52 INFO SparkMLDemo: On-hot encoding all categorical variables.
23/01/30 13:48:52 INFO SparkMLDemo: Assembled together all numerical variables.
23/01/30 13:48:52 INFO SparkMLDemo: Scaled all numerical variables by min-max scaler.
23/01/30 13:48:52 INFO SparkMLDemo: Assembling pipeline with the following transformations: 
strIdx_387de59ef340 
strIdx_da034cc60575 
strIdx_cd96b3a27cc3 
strIdx_cdb9382316bb 
strIdx_533d8e9f698e 
strIdx_710fcc1c2f06 
strIdx_583318018d1f 
strIdx_bd235a3f2b37 
strIdx_17810b26410a 
strIdx_f9e59d5bb2d9 
oneHotEncoder_23267482a5e1 
oneHotEncoder_9a097096c385 
oneHotEncoder_1743522806e8 
oneHotEncoder_606c90442245 
oneHotEncoder_86b7ed7edbcc 
oneHotEncoder_7cff57b6ffd5 
oneHotEncoder_7cbb64f05541 
oneHotEncoder_246deca3c0cb 
oneHotEncoder_631138749c83 
VectorAssembler: uid=vecAssembler_5dfd2d2c6bb9, handleInvalid=error, numInputCols=8 
minMaxScal_e1a93ba7a235 
VectorAssembler: uid=v

[36mxformPipeline[39m: [32mPipelineModel[39m = pipeline_09def6d18db0

### Investigate the fitted pipeline and its stages:

In [193]:
// The PipelineModel's parent is the unfitted Pipeline estimator; getStages returns its stages in order.
val pipelineStages = xformPipeline.parent.asInstanceOf[Pipeline].getStages

[36mpipelineStages[39m: [32mArray[39m[[32mPipelineStage[39m] = [33mArray[39m(
  strIdx_387de59ef340,
  strIdx_da034cc60575,
  strIdx_cd96b3a27cc3,
  strIdx_cdb9382316bb,
  strIdx_533d8e9f698e,
  strIdx_710fcc1c2f06,
  strIdx_583318018d1f,
  strIdx_bd235a3f2b37,
  strIdx_17810b26410a,
  strIdx_f9e59d5bb2d9,
  oneHotEncoder_23267482a5e1,
  oneHotEncoder_9a097096c385,
  oneHotEncoder_1743522806e8,
  oneHotEncoder_606c90442245,
  oneHotEncoder_86b7ed7edbcc,
  oneHotEncoder_7cff57b6ffd5,
  oneHotEncoder_7cbb64f05541,
  oneHotEncoder_246deca3c0cb,
  oneHotEncoder_631138749c83,
  VectorAssembler: uid=vecAssembler_5dfd2d2c6bb9, handleInvalid=error, numInputCols=8,
  minMaxScal_e1a93ba7a235,
  VectorAssembler: uid=vecAssembler_72339199a57d, handleInvalid=error, numInputCols=10
)

Print the parameters of all the stages:

In [207]:
// print index, class, uid and parameter map of every stage
pipelineStages.zipWithIndex.foreach { case (stage, i) =>
  println(s"$i:${stage.getClass}: $stage: ${stage.extractParamMap}")
}

0:class org.apache.spark.ml.feature.StringIndexer: strIdx_387de59ef340: {
	strIdx_387de59ef340-handleInvalid: error,
	strIdx_387de59ef340-inputCol: y,
	strIdx_387de59ef340-outputCol: label,
	strIdx_387de59ef340-stringOrderType: frequencyDesc
}
1:class org.apache.spark.ml.feature.StringIndexer: strIdx_da034cc60575: {
	strIdx_da034cc60575-handleInvalid: error,
	strIdx_da034cc60575-inputCol: job,
	strIdx_da034cc60575-outputCol: idx_job,
	strIdx_da034cc60575-stringOrderType: frequencyDesc
}
2:class org.apache.spark.ml.feature.StringIndexer: strIdx_cd96b3a27cc3: {
	strIdx_cd96b3a27cc3-handleInvalid: error,
	strIdx_cd96b3a27cc3-inputCol: marital,
	strIdx_cd96b3a27cc3-outputCol: idx_marital,
	strIdx_cd96b3a27cc3-stringOrderType: frequencyDesc
}
3:class org.apache.spark.ml.feature.StringIndexer: strIdx_cdb9382316bb: {
	strIdx_cdb9382316bb-handleInvalid: error,
	strIdx_cdb9382316bb-inputCol: education,
	strIdx_cdb9382316bb-outputCol: idx_education,
	strIdx_cdb9382316bb-stringOrderType: frequenc
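
The `stringOrderType: frequencyDesc` setting shown above means each StringIndexer assigns index 0.0 to its most frequent value, 1.0 to the next most frequent, and so on; Spark's documentation states that ties are broken alphabetically. The hypothetical plain-Scala equivalent below (`frequencyDescIndex` is an illustrative name, not Spark API) makes the ordering concrete. Applied to the label column y, and assuming "no" outnumbers "yes", it would explain why the majority outcome becomes label 0.0 and the minority outcome 1.0.

```scala
// Hypothetical re-implementation of StringIndexer's "frequencyDesc" ordering:
// the most frequent value gets index 0.0; ties are broken alphabetically.
def frequencyDescIndex(values: Seq[String]): Map[String, Double] =
  values
    .groupBy(identity)
    .map { case (v, occurrences) => (v, occurrences.size) }
    .toSeq
    .sortBy { case (v, count) => (-count, v) }
    .zipWithIndex
    .map { case ((v, _), i) => v -> i.toDouble }
    .toMap

val labels = Seq("no", "yes", "no", "no", "yes", "maybe")
val idx = frequencyDescIndex(labels)
```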

### Now, transform the data into a form usable for model training and inference
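During transformation, the fitted MinMaxScaler stage rescales each component of `numericalfeatures` using the per-feature minimum and maximum it recorded while fitting. The sketch below follows the rescaling formula in the Spark MLlib feature documentation with the default output range [0, 1]; `MinMaxParams`, `fitMinMax` and `rescale` are hypothetical names, not Spark API. This sketch does not clip values that fall outside the range seen during fitting.

```scala
// Hypothetical sketch of per-feature min-max rescaling:
//   rescaled = (x - eMin) / (eMax - eMin) * (max - min) + min
// and 0.5 * (max + min) when eMax == eMin. Defaults: min = 0.0, max = 1.0.
final case class MinMaxParams(eMin: Vector[Double], eMax: Vector[Double])

// "Fit": record the per-column minimum and maximum of the training rows.
def fitMinMax(rows: Seq[Vector[Double]]): MinMaxParams = {
  require(rows.nonEmpty, "need at least one row to fit")
  val cols = rows.transpose
  MinMaxParams(cols.map(_.min).toVector, cols.map(_.max).toVector)
}

// "Transform": rescale one row with the recorded statistics (no clipping).
def rescale(p: MinMaxParams, x: Vector[Double], min: Double = 0.0, max: Double = 1.0): Vector[Double] =
  x.indices.map { i =>
    val range = p.eMax(i) - p.eMin(i)
    if (range == 0.0) 0.5 * (max + min)
    else (x(i) - p.eMin(i)) / range * (max - min) + min
  }.toVector

val p = fitMinMax(Seq(Vector(1.0, 5.0), Vector(3.0, 5.0)))
```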

In [147]:
val preparedDF: org.apache.spark.sql.DataFrame = xformPipeline.transform(inputDF)

[36mpreparedDF[39m: [32mDataFrame[39m = [age: double, job: string ... 41 more fields]

In [148]:
preparedDF.show(4)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+-------+-----------+-------------+-------------+-----------+--------+---------------+------------+--------------+--------------+---------------+-----------------+-----------------+---------------+-------------+-------------------+----------------+------------------+--------------------+--------------------+--------------------+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_job|idx_marital|idx_education|idx_defaulted|idx_housing|idx_loan|idx_day_of_week|idx_poutcome|idx_month_name|   vec_idx_job|vec_idx_marital|vec_idx_education|vec_idx_defaulted|vec_idx_housing| vec_idx_loan|vec_idx_day_of_week|vec_idx_poutcome

## Select a subset of features

Let us create an additional model which uses a selected subset of features and evaluate this model's performance vs. the one with all the features.

In [229]:
val best_subset_categorical = Array(
    "campaign", "month_name", "job", "day_of_week", "education", "marital"
)

val best_subset_numerical = Array(
    "duration", "pdays", "euribor3m", "cons_price_idx", "age"
)

[36mbest_subset_categorical[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"campaign"[39m,
  [32m"month_name"[39m,
  [32m"job"[39m,
  [32m"day_of_week"[39m,
  [32m"education"[39m,
  [32m"marital"[39m
)
[36mbest_subset_numerical[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"duration"[39m,
  [32m"pdays"[39m,
  [32m"euribor3m"[39m,
  [32m"cons_price_idx"[39m,
  [32m"age"[39m
)

Fit a new transformation pipeline with the subset of features.

In [230]:
val xformPipeline2 = firTransformDataPipeline(inputDF, originalLabelColname, best_subset_categorical, best_subset_numerical)

23/01/30 15:48:32 INFO SparkMLDemo: Indexing categorical variables.
23/01/30 15:48:32 INFO SparkMLDemo: On-hot encoding all categorical variables.
23/01/30 15:48:32 INFO SparkMLDemo: Assembled together all numerical variables.
23/01/30 15:48:32 INFO SparkMLDemo: Scaled all numerical variables by min-max scaler.
23/01/30 15:48:32 INFO SparkMLDemo: Assembling pipeline with the following transformations: 
strIdx_137ba1783896 
strIdx_b09b13da7231 
strIdx_cfb343f416ed 
strIdx_7bb6f881461d 
strIdx_8ecf1d33f435 
strIdx_06062ff464e4 
strIdx_68c715321a64 
oneHotEncoder_70bc9bd017bc 
oneHotEncoder_2794034dd822 
oneHotEncoder_f07f9736b2e7 
oneHotEncoder_3ef22556dc1c 
oneHotEncoder_eff9887e73d9 
oneHotEncoder_3ab2df8bc876 
VectorAssembler: uid=vecAssembler_60128f25a3ca, handleInvalid=error, numInputCols=5 
minMaxScal_e7ce508fa355 
VectorAssembler: uid=vecAssembler_5566e14c9193, handleInvalid=error, numInputCols=7
23/01/30 15:48:35 INFO SparkMLDemo: Completed fitting the pipeline


[36mxformPipeline2[39m: [32mPipelineModel[39m = pipeline_c49d59177ceb

In [231]:
val preparedDF2 = xformPipeline2.transform(inputDF)

[36mpreparedDF2[39m: [32mDataFrame[39m = [age: double, job: string ... 35 more fields]

In [232]:
preparedDF2.show(4)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+------------+--------------+-------+---------------+-------------+-----------+----------------+------------------+--------------+-------------------+-----------------+---------------+--------------------+--------------------+--------------------+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_campaign|idx_month_name|idx_job|idx_day_of_week|idx_education|idx_marital|vec_idx_campaign|vec_idx_month_name|   vec_idx_job|vec_idx_day_of_week|vec_idx_education|vec_idx_marital|   numericalfeatures|      scaledfeatures|            features|
+----+---------+-------+-----------+---------+-------+----+----------+----------+-

## Split data into Test-Train sets

Randomly select records to split the prepared data into train-test datasets.

Here a ratio of 90% training and 10% testing has been specified.
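
randomSplit does not cut the data at exactly 90%: rows are assigned at random, so the split sizes are only approximately proportional to the weights, and because no seed is passed the split differs between runs (Dataset.randomSplit also has an overload that takes a `seed: Long`). The following is a hypothetical plain-Scala sketch of weighted random assignment (`weightedSplit` is an illustrative name), not Spark's per-partition sampling implementation.

```scala
import scala.util.Random

// Hypothetical sketch of weighted random splitting: weights are normalised, and each record
// independently lands in the split whose cumulative-weight interval contains a uniform draw.
def weightedSplit[A](records: Seq[A], weights: Seq[Double], seed: Long): Seq[Seq[A]] = {
  require(weights.nonEmpty && weights.forall(_ >= 0) && weights.sum > 0, "invalid weights")
  val total = weights.sum
  val bounds = weights.scanLeft(0.0)(_ + _).tail.map(_ / total) // cumulative upper bounds
  val rng = new Random(seed)
  val assignments = records.map { r =>
    val u = rng.nextDouble()
    val i = bounds.indexWhere(u < _) match {
      case -1 => weights.size - 1 // guard against floating-point rounding near 1.0
      case k  => k
    }
    (i, r)
  }
  weights.indices.map(i => assignments.filter(_._1 == i).map(_._2))
}

// Same seed, same split: the reproducibility a seeded randomSplit would give.
val splits = weightedSplit(1 to 1000, Seq(0.9, 0.1), seed = 42L)
```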

In [241]:
val Array(trainingDF, testDF) = preparedDF.randomSplit(Array(0.9, 0.1))

[36mtrainingDF[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 41 more fields]
[36mtestDF[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 41 more fields]

In [242]:
// At this point, these two datasets may be "cached" for improving Spark performance:
trainingDF.cache()
testDF.cache()

[36mres241_0[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 41 more fields]
[36mres241_1[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 41 more fields]

In [243]:
trainingDF.show(5)

+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+-------+-----------+-------------+-------------+-----------+--------+---------------+------------+--------------+---------------+---------------+-----------------+-----------------+---------------+-------------+-------------------+----------------+------------------+--------------------+--------------------+--------------------+
| age|    job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_job|idx_marital|idx_education|idx_defaulted|idx_housing|idx_loan|idx_day_of_week|idx_poutcome|idx_month_name|    vec_idx_job|vec_idx_marital|vec_idx_education|vec_idx_defaulted|vec_idx_housing| vec_idx_loan|vec_idx_day_of_week|vec_idx_poutcome|v

Repeat test-train split for best subset data as well:

In [233]:
val Array(trainingDF2, testDF2) = preparedDF2.randomSplit(Array(0.9, 0.1))

[36mtrainingDF2[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 35 more fields]
[36mtestDF2[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 35 more fields]

In [234]:
trainingDF2.cache()
testDF2.cache()

[36mres233_0[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 35 more fields]
[36mres233_1[39m: [32mDataset[39m[[32mRow[39m] = [age: double, job: string ... 35 more fields]

In [235]:
trainingDF2.show(5)

+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+------------+--------------+-------+---------------+-------------+-----------+----------------+------------------+---------------+-------------------+-----------------+---------------+--------------------+--------------------+--------------------+
| age|    job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_campaign|idx_month_name|idx_job|idx_day_of_week|idx_education|idx_marital|vec_idx_campaign|vec_idx_month_name|    vec_idx_job|vec_idx_day_of_week|vec_idx_education|vec_idx_marital|   numericalfeatures|      scaledfeatures|            features|
+----+-------+-------+-----------+---------+-------+----+----------+----------+-----

---
## Train a Logistic Regression Model

Use the Spark MLlib libraries to train different machine learning models on this dataset.

In [236]:
import org.apache.spark.ml.classification.{GBTClassifier, LogisticRegression, LogisticRegressionModel, RandomForestClassifier}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder, TrainValidationSplit, TrainValidationSplitModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

[32mimport [39m[36morg.apache.spark.ml.classification.{GBTClassifier, LogisticRegression, LogisticRegressionModel, RandomForestClassifier}
[39m
[32mimport [39m[36morg.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder, TrainValidationSplit, TrainValidationSplitModel}
[39m
[32mimport [39m[36morg.apache.spark.ml.evaluation.BinaryClassificationEvaluator[39m

In [237]:
val lr = new LogisticRegression()

[36mlr[39m: [32mLogisticRegression[39m = logreg_c3d52860028e

Set parameters using setter methods.

In [238]:
lr.setMaxIter(100)
    .setFamily("binomial")
    .setFitIntercept(true)
    .setThreshold(0.35)
    .setLabelCol(labelColname);

[36mres237[39m: [32mLogisticRegression[39m = logreg_c3d52860028e

In [239]:
println(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial. (default: auto, current: binomial)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true, current: true)
labelCol: label column name (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. (undefined)
maxBlockSizeInMB: Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a pa
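
The threshold of 0.35 set above moves the decision boundary: per the `threshold` parameter's documentation, the model predicts 1 when the estimated probability of class 1 is greater than the threshold, so a value below the default 0.5 makes positive predictions more frequent. Below is a hypothetical sketch of that decision rule for a binomial model; `sigmoid`, `probabilityOfOne` and `predict` are illustrative names, not Spark API.

```scala
// Hypothetical sketch of the binomial logistic regression decision rule:
// P(y = 1 | x) = sigmoid(w . x + b); predict 1.0 when that probability exceeds the threshold.
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

def probabilityOfOne(w: Vector[Double], b: Double, x: Vector[Double]): Double = {
  require(w.size == x.size, "coefficient and feature vectors must have the same length")
  sigmoid(w.zip(x).map { case (wi, xi) => wi * xi }.sum + b)
}

// threshold defaults to 0.35 to match this notebook; Spark's own default is 0.5.
def predict(w: Vector[Double], b: Double, x: Vector[Double], threshold: Double = 0.35): Double =
  if (probabilityOfOne(w, b, x) > threshold) 1.0 else 0.0
```

For a margin of -0.5, the probability of class 1 is about 0.38: above 0.35, so the sketch predicts 1.0, but below 0.5, so the default threshold would predict 0.0.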

### List all parameters of the model:

In [240]:
lr.extractParamMap().toSeq.foreach(
    x => println("Parameter %s = %s".format(x.param.name, x.value))
)

Parameter threshold = 0.35
Parameter elasticNetParam = 0.0
Parameter predictionCol = prediction
Parameter tol = 1.0E-6
Parameter labelCol = label
Parameter maxIter = 100
Parameter featuresCol = features
Parameter aggregationDepth = 2
Parameter regParam = 0.0
Parameter family = binomial
Parameter rawPredictionCol = rawPrediction
Parameter maxBlockSizeInMB = 0.0
Parameter probabilityCol = probability
Parameter fitIntercept = true
Parameter standardization = true


In [244]:
// Now learn a LogisticRegression model; this uses the parameters stored in lr.
val lrModel1 = lr.fit(trainingDF)

23/01/30 15:54:15 INFO LBFGS: Step Size: 3.574
23/01/30 15:54:15 INFO LBFGS: Val and Grad Norm: 0.316549 (rel: 0.104) 0.308898
23/01/30 15:54:15 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:15 INFO LBFGS: Val and Grad Norm: 0.245978 (rel: 0.223) 0.0965974
23/01/30 15:54:15 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:15 INFO LBFGS: Val and Grad Norm: 0.232709 (rel: 0.0539) 0.0500178
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.222595 (rel: 0.0435) 0.0367947
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.214510 (rel: 0.0363) 0.0243518
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.211601 (rel: 0.0136) 0.0194000
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.210507 (rel: 0.00517) 0.00815788
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.

[36mlrModel1[39m: [32mLogisticRegressionModel[39m = LogisticRegressionModel: uid=logreg_c3d52860028e, numClasses=2, numFeatures=50

### Understand the fitted model

Extract the parameters and performance metrics of the fitted model.

In [245]:
print("Prediction labels: ")
lrModel1.summary.labels.foreach( x => print(" " + x))
print("\nTrue Positive Rate By Label: ")
lrModel1.summary.truePositiveRateByLabel.foreach( x => print(" " + x))
print("\nRecall By Label: ")
lrModel1.summary.recallByLabel.foreach( x => print(" " + x))
print("\nPrecision By Label: ")
lrModel1.summary.precisionByLabel.foreach( x => print(" " + x))
print("\nFalse Positive Rate By Label: ")
lrModel1.summary.falsePositiveRateByLabel.foreach( x => print(" " + x))
print("\nF-Measure By Label: ")
lrModel1.summary.fMeasureByLabel.foreach( x => print(" " + x))
println("\nAccuracy: " + lrModel1.summary.accuracy)
println("Total no of Iterations: " + lrModel1.summary.totalIterations)

Prediction labels:  0.0 1.0
True Positive Rate By Label:  0.9540030441400305 0.5675482487491065
Recall By Label:  0.9540030441400305 0.5675482487491065
Precision By Label:  0.9452554744525548 0.611867454405343
False Positive Rate By Label:  0.43245175125089347 0.04599695585996956
F-Measure By Label:  0.9496091145991152 0.5888751545117429
Accuracy: 0.910222150241585
Total no of Iterations: 100
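
These per-label numbers come from the model's training summary, so they are computed on the training data itself and tend to be optimistic. The hypothetical sketch below (`LabelMetrics`, `metricsByLabel` and `accuracy` are illustrative names) shows how such metrics derive from (label, prediction) pairs. In the binary case the false positive rate of one label equals 1 minus the recall of the other, which matches the output above: 1 - 0.5675482487491065 = 0.4324517512508935.

```scala
// Hypothetical sketch of per-label classification metrics from (label, prediction) pairs.
// For label c: recall (= TPR) = TP_c / actual_c, precision = TP_c / predicted_c,
// FPR = (non-c rows predicted as c) / (non-c rows), F1 = 2PR / (P + R).
final case class LabelMetrics(tpr: Double, precision: Double, fpr: Double, f1: Double)

def metricsByLabel(pairs: Seq[(Double, Double)], c: Double): LabelMetrics = {
  val tp = pairs.count { case (y, p) => y == c && p == c }
  val fp = pairs.count { case (y, p) => y != c && p == c }
  val actual = pairs.count(_._1 == c)
  val negatives = pairs.size - actual
  val predicted = tp + fp
  val tpr = if (actual == 0) 0.0 else tp.toDouble / actual
  val precision = if (predicted == 0) 0.0 else tp.toDouble / predicted
  val fpr = if (negatives == 0) 0.0 else fp.toDouble / negatives
  val f1 = if (precision + tpr == 0.0) 0.0 else 2 * precision * tpr / (precision + tpr)
  LabelMetrics(tpr, precision, fpr, f1)
}

def accuracy(pairs: Seq[(Double, Double)]): Double =
  pairs.count { case (y, p) => y == p }.toDouble / pairs.size

// (label, prediction) pairs for a tiny made-up example
val pairs = Seq((0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (1.0, 0.0))
val m0 = metricsByLabel(pairs, 0.0)
val m1 = metricsByLabel(pairs, 1.0)
```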


### Wrap these commands into a convenience function:

In [246]:
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import spark.implicits._ // provides .toDF on RDDs of tuples; assumes `spark` is the notebook's SparkSession

/**
 * Extract model fit performance metrics from a binary classification model.
 * For the single-valued metrics ("Total Accuracy", "Area Under ROC") the Class_0 column holds a 0.0 placeholder.
 * Fails fast (instead of returning null) if the model is not binary.
 */
def getModelFitSummary(sc: SparkContext, fittedModel: LogisticRegressionModel): DataFrame = {
    require(fittedModel.numClasses == 2, "getModelFitSummary supports binary classification models only")
    val summary = fittedModel.summary
    sc.parallelize(
        Seq(
          ("Prediction labels", summary.labels(0), summary.labels(1))
        , ("True Positive Rate", summary.truePositiveRateByLabel(0), summary.truePositiveRateByLabel(1))
        , ("Recall", summary.recallByLabel(0), summary.recallByLabel(1))
        , ("Precision", summary.precisionByLabel(0), summary.precisionByLabel(1))
        , ("False Positive Rate", summary.falsePositiveRateByLabel(0), summary.falsePositiveRateByLabel(1))
        , ("F-measure", summary.fMeasureByLabel(0), summary.fMeasureByLabel(1))
        , ("Total Accuracy", 0.0, summary.accuracy)
        , ("Area Under ROC", 0.0, fittedModel.binarySummary.areaUnderROC)
        )
    ).toDF("Metric", "Class_0", "Class_1")
}

defined [32mfunction[39m [36mgetModelFitSummary[39m

In [247]:
val lrmodel1_summary = getModelFitSummary(spark.sparkContext, lrModel1)

lrmodel1_summary.show(10)

+-------------------+-------------------+-------------------+
|             Metric|            Class_0|            Class_1|
+-------------------+-------------------+-------------------+
|  Prediction labels|                0.0|                1.0|
| True Positive Rate| 0.9540030441400305| 0.5675482487491065|
|             Recall| 0.9540030441400305| 0.5675482487491065|
|          Precision| 0.9452554744525548|  0.611867454405343|
|False Positive Rate|0.43245175125089347|0.04599695585996956|
|          F-measure| 0.9496091145991152| 0.5888751545117429|
|     Total Accuracy|                0.0|  0.910222150241585|
|     Area Under ROC|                0.0| 0.9366489871543373|
+-------------------+-------------------+-------------------+



[36mlrmodel1_summary[39m: [32mDataFrame[39m = [Metric: string, Class_0: double ... 1 more field]

In [248]:
lrModel1.coefficients

[36mres247[39m: [32mlinalg[39m.[32mVector[39m = [0.08929699811858023,-0.12991105334224565,0.10660142417505587,0.0035108312116281085,0.05015596724370776,0.4334691081349534,-0.11275155612992703,-0.08362680602814758,0.0022494988198624158,0.1832846920962329,0.2533553293269838,0.3790928746135225,0.4567786711449594,0.32985034825169035,-0.2674248674989839,-0.4256658042471482,-0.49222255172482904,-0.3990814004052108,-0.47235513585234873,-0.3995417988274074,-0.2903718016139603,0.1902187433847383,-0.12245630399105265,0.02796183082914988,0.04008613501539039,0.08763729127949141,0.032406713072771494,0.08293763583846984,-0.10119501477903015,0.20084202716608857,0.11343769207386381,-0.5396409612701998,-1.0112351384318294,-0.8447128285517715,0.10421454370594475,0.6135613168062243,-0.6207939396904261,-0.5378100985337787,-0.25508736139086624,0.045457782736203954,0.05287103483647745,1.5511249151553137,0.07652248872014075,23.072170029479025,-0.8945340833066275,-6.686053732452271,3.819435183913021,0.0

In [249]:
println(lrModel1.extractParamMap())

{
	logreg_c3d52860028e-aggregationDepth: 2,
	logreg_c3d52860028e-elasticNetParam: 0.0,
	logreg_c3d52860028e-family: binomial,
	logreg_c3d52860028e-featuresCol: features,
	logreg_c3d52860028e-fitIntercept: true,
	logreg_c3d52860028e-labelCol: label,
	logreg_c3d52860028e-maxBlockSizeInMB: 0.0,
	logreg_c3d52860028e-maxIter: 100,
	logreg_c3d52860028e-predictionCol: prediction,
	logreg_c3d52860028e-probabilityCol: probability,
	logreg_c3d52860028e-rawPredictionCol: rawPrediction,
	logreg_c3d52860028e-regParam: 0.0,
	logreg_c3d52860028e-standardization: true,
	logreg_c3d52860028e-threshold: 0.35,
	logreg_c3d52860028e-tol: 1.0E-6
}


In [250]:
lrModel1.numFeatures

[36mres249[39m: [32mInt[39m = [32m50[39m

In [251]:
lrModel1.numClasses

[36mres250[39m: [32mInt[39m = [32m2[39m

### Hyperparameter Tuning

We can use grid-search to find the best set of parameters for this model.
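
ParamGridBuilder expands the candidate values into their Cartesian product, and CrossValidator fits one model per (parameter map, fold) pair before refitting the best parameter map once on the whole training set. For the grid used here (2 regParam values x 2 elasticNetParam values) with 10 folds, that comes to 4 x 10 + 1 = 41 fits. A hypothetical plain-Scala sketch of that bookkeeping (`cartesianGrid` and `totalFits` are illustrative names, not Spark API):

```scala
// Hypothetical sketch of the combinatorics behind ParamGridBuilder + CrossValidator.
// The grid is the Cartesian product of the candidate values for each parameter.
def cartesianGrid(grid: Seq[(String, Seq[Double])]): Seq[Map[String, Double]] =
  grid.foldLeft(Seq(Map.empty[String, Double])) { case (acc, (name, values)) =>
    for (m <- acc; v <- values) yield m + (name -> v)
  }

// One fit per (parameter map, fold), plus a final refit of the best map on all training data.
def totalFits(numParamMaps: Int, numFolds: Int): Int = numParamMaps * numFolds + 1

val paramMaps = cartesianGrid(Seq(
  "regParam"        -> Seq(0.0025, 0.005),
  "elasticNetParam" -> Seq(0.0075, 0.01)
))
```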

In [255]:
// We use a ParamGridBuilder to construct a grid of hyper-parameters to search over.
// The CrossValidator defined below will try all combinations of values and determine
// the best model using the evaluator.
val paramGridLR = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.0025, 0.005))
  .addGrid(lr.elasticNetParam, Array(0.0075, 0.01))
  .build()

[36mparamGridLR[39m: [32mArray[39m[[32mml[39m.[32mparam[39m.[32mParamMap[39m] = [33mArray[39m(
  {
	logreg_c3d52860028e-elasticNetParam: 0.0075,
	logreg_c3d52860028e-regParam: 0.0025
},
  {
	logreg_c3d52860028e-elasticNetParam: 0.01,
	logreg_c3d52860028e-regParam: 0.0025
},
  {
	logreg_c3d52860028e-elasticNetParam: 0.0075,
	logreg_c3d52860028e-regParam: 0.005
},
  {
	logreg_c3d52860028e-elasticNetParam: 0.01,
	logreg_c3d52860028e-regParam: 0.005
}
)
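
In Spark's logistic regression, regParam ($\lambda$) and elasticNetParam ($\alpha$) combine into a single elastic-net penalty added to the average logistic loss $\ell$, as described in the MLlib classification guide; the intercept $b$ is not penalized. With $\alpha = 0.0075$ or $0.01$, the penalty searched here is almost entirely L2. Because the L1 term is non-zero, Spark optimizes with OWL-QN instead of L-BFGS, which is consistent with the OWLQN lines in the cross-validation log, as opposed to the LBFGS lines for the unregularized first model.

```latex
\min_{w,\,b}\; \frac{1}{n}\sum_{i=1}^{n} \ell\big(w, b;\, x_i, y_i\big)
  \;+\; \lambda\Big(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\Big),
\qquad \lambda = \texttt{regParam} \ge 0,\quad \alpha = \texttt{elasticNetParam} \in [0, 1]
```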

Define a performance metric for the grid search to use when identifying the best-performing model.

Here, the metric is the area under the precision-recall curve ("areaUnderPR"), which is generally more informative than the area under the ROC curve when the positive class is rare.
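
A hypothetical plain-Scala sketch of this metric (`areaUnderPR` here is an illustrative function, not the evaluator's code): precision and recall are computed at each distinct score threshold from highest to lowest, the curve is started at recall 0 with the first precision value, and the area is integrated with the trapezoidal rule. Spark's BinaryClassificationEvaluator may additionally down-sample the curve into `numBins` bins (1000 here), which this sketch ignores.

```scala
// Hypothetical sketch: area under the precision-recall curve via the trapezoidal rule.
// Input: (score, label) pairs with labels 0.0 / 1.0.
def areaUnderPR(scored: Seq[(Double, Double)]): Double = {
  val totalPos = scored.count(_._2 == 1.0)
  require(totalPos > 0, "need at least one positive example")
  // group rows sharing a score so each distinct threshold yields one curve point
  val byThreshold = scored.groupBy(_._1).toSeq.sortWith(_._1 > _._1)
  val counts = byThreshold.scanLeft((0, 0)) { case ((tp, fp), (_, group)) =>
    val pos = group.count(_._2 == 1.0)
    (tp + pos, fp + group.size - pos)
  }.tail
  // (recall, precision) at each threshold
  val points = counts.map { case (tp, fp) => (tp.toDouble / totalPos, tp.toDouble / (tp + fp)) }
  val curve = (0.0, points.head._2) +: points
  curve.zip(curve.tail).map { case ((r0, p0), (r1, p1)) => (r1 - r0) * (p0 + p1) / 2 }.sum
}

val scored = Seq((0.9, 1.0), (0.8, 0.0), (0.7, 1.0), (0.1, 0.0))
val auPR = areaUnderPR(scored)
```

For these four rows the curve points are (0, 1), (0.5, 1), (0.5, 0.5), (1, 2/3) and (1, 0.5), giving an area of 0.5 + 7/24 = 19/24.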

In [256]:
val binaryEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol(labelColname)
  .setMetricName("areaUnderPR");

[36mbinaryEvaluator[39m: [32mBinaryClassificationEvaluator[39m = BinaryClassificationEvaluator: uid=binEval_a664d6d37f22, metricName=areaUnderPR, numBins=1000

In [257]:
val xfoldValidator = new CrossValidator()
  .setEstimator(lr)
  .setNumFolds(10)
  .setEvaluator(binaryEvaluator)
  .setEstimatorParamMaps(paramGridLR)
  .setCollectSubModels(false)
  .setParallelism(2)

[36mxfoldValidator[39m: [32mCrossValidator[39m = cv_310b21052e7d

In [258]:
// Run k-fold cross-validation over the parameter grid, and keep the best set of parameters.
logger.info("Started training Logistic Regression model via x-fold cross validation")
val cvmodel1 = xfoldValidator.fit(trainingDF2)
logger.info("Finished training Logistic Regression model")

23/01/30 15:57:03 INFO OWLQN: Step Size: 0.9854
23/01/30 15:57:03 INFO OWLQN: Val and Grad Norm: 0.307416 (rel: 0.126) 0.176688
23/01/30 15:57:03 INFO OWLQN: Step Size: 0.9854
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.307426 (rel: 0.126) 0.176672
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.240222 (rel: 0.219) 0.0721266
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.240240 (rel: 0.219) 0.0721294
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.229298 (rel: 0.0455) 0.0377960
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.229317 (rel: 0.0455) 0.0377921
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.221291 (rel: 0.0349) 0.0268653
23/01/30 15:57:05 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:05 INFO OWLQN: Val and Grad Norm: 0.221

[36mcvmodel1[39m: [32mCrossValidatorModel[39m = CrossValidatorModel: uid=cv_310b21052e7d, bestModel=LogisticRegressionModel: uid=logreg_c3d52860028e, numClasses=2, numFeatures=80, numFolds=10

In [259]:
val lrModel2 = cvmodel1.bestModel

[36mlrModel2[39m: [32mModel[39m[[32m_[39m] = LogisticRegressionModel: uid=logreg_c3d52860028e, numClasses=2, numFeatures=80

In [260]:
// we can view the hyper-parameters for the best model selected by grid-search.
// This prints the parameter (name: value) pairs, where names are unique IDs for this instance.

println(s"---Cross-fold validated Logistic Regression Model was fit using parameters:---${lrModel2.extractParamMap}")

---Cross-fold validated Logistic Regression Model was fit using parameters:---{
	logreg_c3d52860028e-aggregationDepth: 2,
	logreg_c3d52860028e-elasticNetParam: 0.01,
	logreg_c3d52860028e-family: binomial,
	logreg_c3d52860028e-featuresCol: features,
	logreg_c3d52860028e-fitIntercept: true,
	logreg_c3d52860028e-labelCol: label,
	logreg_c3d52860028e-maxBlockSizeInMB: 0.0,
	logreg_c3d52860028e-maxIter: 100,
	logreg_c3d52860028e-predictionCol: prediction,
	logreg_c3d52860028e-probabilityCol: probability,
	logreg_c3d52860028e-rawPredictionCol: rawPrediction,
	logreg_c3d52860028e-regParam: 0.0025,
	logreg_c3d52860028e-standardization: true,
	logreg_c3d52860028e-threshold: 0.35,
	logreg_c3d52860028e-tol: 1.0E-6
}
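The best model uses a non-default decision threshold (0.35) and an elastic-net mix (regParam = 0.0025, elasticNetParam = 0.01). As a rough plain-Scala sketch of what these two settings mean, following the formulas in the Spark MLlib documentation, the hypothetical helpers below compute the regularization penalty λ(α‖w‖₁ + (1 - α)/2 ‖w‖₂²) and apply the thresholded decision rule. They are illustrations only, not Spark's internals: Spark leaves the intercept unpenalized and, with `standardization=true`, applies the penalty to coefficients on the standardized feature scale.

```scala
object ElasticNetSketch {
  // Elastic-net penalty as described in the Spark MLlib docs:
  //   regParam * ( elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2 )
  // `weights` excludes the intercept, which is not penalized.
  def elasticNetPenalty(weights: Array[Double], regParam: Double, elasticNetParam: Double): Double = {
    val l1 = weights.map(w => math.abs(w)).sum
    val l2Squared = weights.map(w => w * w).sum
    regParam * (elasticNetParam * l1 + (1.0 - elasticNetParam) / 2.0 * l2Squared)
  }

  // Binary decision rule: predict class 1 when P(class 1) exceeds the threshold
  def classify(p1: Double, threshold: Double): Double =
    if (p1 > threshold) 1.0 else 0.0
}

// Usage with the best model's hyper-parameters (the weights are made up for illustration)
val penalty = ElasticNetSketch.elasticNetPenalty(Array(1.0, -2.0), regParam = 0.0025, elasticNetParam = 0.01)
val predictions = Seq(0.30, 0.40, 0.60).map(p => ElasticNetSketch.classify(p, threshold = 0.35))
```

With a threshold of 0.35 instead of the default 0.5, more rows are labeled as class 1, which trades precision for recall on the minority class.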


In [261]:
val lrmodel2_summary = getModelFitSummary(spark.sparkContext, lrModel2.asInstanceOf[LogisticRegressionModel])

lrmodel2_summary.show(10)

+-------------------+------------------+-------------------+
|             Metric|           Class_0|            Class_1|
+-------------------+------------------+-------------------+
|  Prediction labels|               0.0|                1.0|
| True Positive Rate| 0.957126830599436| 0.5419262098706277|
|             Recall| 0.957126830599436| 0.5419262098706277|
|          Precision|0.9428895725678784| 0.6153427638737758|
|False Positive Rate|0.4580737901293723|0.04287316940056396|
|          F-measure|0.9499548600662052| 0.5763057324840765|
|     Total Accuracy|               0.0| 0.9104831112905396|
|     Area Under ROC|               0.0| 0.9326249601072021|
+-------------------+------------------+-------------------+



lrmodel2_summary: DataFrame = [Metric: string, Class_0: double ... 1 more field]
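`getModelFitSummary` is defined earlier in the notebook. As a sketch of where per-class rows like these come from (not necessarily how that helper computes them), the hypothetical plain-Scala function below derives them from binary confusion-matrix counts, treating each class in turn as the positive class. It also explains why the two columns mirror each other in the table above: Class_0's true positive rate equals 1 minus Class_1's false positive rate (0.9571 = 1 - 0.0429), and Class_0's false positive rate equals 1 minus Class_1's recall (0.4581 = 1 - 0.5419).

```scala
// Per-class metrics for a binary classifier, computed from confusion-matrix counts.
// tp/fp/fn/tn are counts with class 1 treated as the positive class.
case class ClassMetrics(tpr: Double, fpr: Double, precision: Double, fMeasure: Double)

def perClassMetrics(tp: Long, fp: Long, fn: Long, tn: Long): Map[Double, ClassMetrics] = {
  def metrics(tpC: Long, fpC: Long, fnC: Long, tnC: Long): ClassMetrics = {
    val recall = tpC.toDouble / (tpC + fnC) // true positive rate
    val fpr = fpC.toDouble / (fpC + tnC)
    val precision = tpC.toDouble / (tpC + fpC)
    val f1 = 2 * precision * recall / (precision + recall)
    ClassMetrics(recall, fpr, precision, f1)
  }
  Map(
    1.0 -> metrics(tp, fp, fn, tn),
    // For class 0 the roles swap: true negatives become its true positives, etc.
    0.0 -> metrics(tn, fn, fp, tp)
  )
}

// Hypothetical counts: class 1 gets recall 0.6 and precision 0.6
val m = perClassMetrics(tp = 60, fp = 40, fn = 40, tn = 860)
```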

In [262]:
lrmodel1_summary.createOrReplaceTempView("m1perf");
lrmodel2_summary.createOrReplaceTempView("m2perf");

spark.sql(
    """SELECT m1.Metric,
    m1.Class_0 as Model1_Class0, m1.Class_1 as Model1_Class1,
    m2.Class_0 as Model2_Class0, m2.Class_1 as Model2_Class1
    from m1perf m1
    inner join m2perf m2 on m2.Metric=m1.Metric"""
    ).show();

+-------------------+-------------------+-------------------+------------------+-------------------+
|             Metric|      Model1_Class0|      Model1_Class1|     Model2_Class0|      Model2_Class1|
+-------------------+-------------------+-------------------+------------------+-------------------+
|     Area Under ROC|                0.0| 0.9366489871543373|               0.0| 0.9326249601072021|
|          F-measure| 0.9496091145991152| 0.5888751545117429|0.9499548600662052| 0.5763057324840765|
|False Positive Rate|0.43245175125089347|0.04599695585996956|0.4580737901293723|0.04287316940056396|
|          Precision| 0.9452554744525548|  0.611867454405343|0.9428895725678784| 0.6153427638737758|
|  Prediction labels|                0.0|                1.0|               0.0|                1.0|
|             Recall| 0.9540030441400305| 0.5675482487491065| 0.957126830599436| 0.5419262098706277|
|     Total Accuracy|                0.0|  0.910222150241585|               0.0| 0.91048311
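The same side-by-side comparison can be written with the DataFrame API instead of temporary views and SQL. A minimal sketch, assuming both summaries have the `Metric`, `Class_0` and `Class_1` columns shown above; `compareSummaries` is a hypothetical helper name. Joining on `Seq("Metric")` performs an inner join and keeps a single `Metric` column in the result.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Put two per-class metric tables side by side, joined on the Metric name.
def compareSummaries(m1: DataFrame, m2: DataFrame): DataFrame =
  m1.select(col("Metric"), col("Class_0").as("Model1_Class0"), col("Class_1").as("Model1_Class1"))
    .join(
      m2.select(col("Metric"), col("Class_0").as("Model2_Class0"), col("Class_1").as("Model2_Class1")),
      Seq("Metric"))
```

For example, `compareSummaries(lrmodel1_summary, lrmodel2_summary).show()` should print the same columns as the SQL query above.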

---
## Evaluate model performance on test set

In [263]:
// Define a convenience function to calculate the mean binary log-loss:
//   logloss = -[ y * log(p1) + (1 - y) * log(1 - p1) ]
def logScoringMetric(inputDF: org.apache.spark.sql.DataFrame, predictProb: String = "p1", labelCol: String = "label"): Double = {
    import org.apache.spark.sql.functions.{avg, col, lit, log}

    val y = col(labelCol)      // true label (0.0 or 1.0)
    val p = col(predictProb)   // predicted probability of class 1

    // Per-row log-loss: the leading minus sign applies to BOTH terms.
    // Note: Spark's log() returns null for inputs <= 0, so rows where p is exactly 0 or 1
    // produce a null log-loss and are skipped by avg().
    val testResultsLoglossDF = inputDF.withColumn(
      "logloss",
      (y * log(p) + (lit(1.0) - y) * log(lit(1.0) - p)) * -1.0
    )

    // Average over all rows and return it as a Double
    testResultsLoglossDF.select(avg("logloss")).first().getDouble(0)
}

defined function logScoringMetric
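For reference, binary log-loss averages -[y·log(p) + (1 - y)·log(1 - p)] over all rows, where p is the predicted probability of class 1. The plain-Scala sketch below (hypothetical name `binaryLogLoss`) computes the same quantity on an in-memory sequence, which is handy for sanity-checking the DataFrame version on a few rows. The clipping with `eps` is an added assumption, not something the notebook does: it keeps a probability of exactly 0 or 1 from producing an infinite loss.

```scala
// Mean binary log-loss over (label, p1) pairs, where p1 = P(class 1).
// eps clips probabilities into [eps, 1 - eps] so log never sees 0.
def binaryLogLoss(labelsAndProbs: Seq[(Double, Double)], eps: Double = 1e-15): Double = {
  require(labelsAndProbs.nonEmpty, "need at least one (label, probability) pair")
  val perRow = labelsAndProbs.map { case (y, p) =>
    val pc = math.min(math.max(p, eps), 1.0 - eps)
    val loss = -(y * math.log(pc) + (1.0 - y) * math.log(1.0 - pc))
    loss
  }
  perRow.sum / perRow.size
}

// Two rows: a positive predicted at 0.9 and a negative predicted at 0.2
val ll = binaryLogLoss(Seq((1.0, 0.9), (0.0, 0.2)))
```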

In [264]:
// Define a convenience function that splits the probability vector column into a class-1 probability column "p1"
def addBinaryProbabilities(inputDF: org.apache.spark.sql.DataFrame, probCol: String = "probability"): org.apache.spark.sql.DataFrame = {

    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.functions.{col, udf}

    // Break up the vector field probCol into per-class probabilities.
    // Create a UDF to convert VectorUDT to ArrayType
    val vecToArray = udf((xs: Vector) => xs.toArray)

    // Add an ArrayType column: PredictProbabArr
    val dfProbArr = inputDF.withColumn("PredictProbabArr", vecToArray(col(probCol)))

    // Array of element names that need to be fetched (index 0 -> p0, index 1 -> p1):
    val elements = Array("p0", "p1")

    // Build one column expression per element
    val sqlExpr = elements.zipWithIndex.map { case (alias, idx) => col("PredictProbabArr").getItem(idx).as(alias) }

    // Add the columns to the dataframe, keeping only the class-1 probability p1
    dfProbArr.select((col("*") +: sqlExpr): _*)
      .drop("PredictProbabArr", "p0")
}

defined function addBinaryProbabilities
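Since Spark 3.0, `org.apache.spark.ml.functions.vector_to_array` converts a vector column into an array column, so a custom UDF isn't strictly necessary. A sketch of an equivalent helper, assuming Spark 3.x (this notebook loads 3.2.0); the name `addClass1Probability` is hypothetical and, unlike `addBinaryProbabilities`, it simply appends the class-1 probability without creating intermediate columns.

```scala
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Extract P(class 1), i.e. element 1 of the probability vector, into its own Double column.
def addClass1Probability(inputDF: DataFrame,
                         probCol: String = "probability",
                         outCol: String = "p1"): DataFrame =
  inputDF.withColumn(outCol, vector_to_array(col(probCol)).getItem(1))
```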

In [266]:
// use specific column name to store class1 probabilities
val class1ProbColName = "p1"

class1ProbColName: String = "p1"

In [271]:
val binEvalPR = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderPR")
  .setLabelCol(labelColname)

binEvalPR: BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_81eac8a5b98c, metricName=areaUnderPR, numBins=1000

In [272]:
val binEvalROC = new BinaryClassificationEvaluator().setLabelCol(labelColname)

binEvalROC: BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_2a47427bae4b, metricName=areaUnderROC, numBins=1000
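`BinaryClassificationEvaluator` scores the `rawPrediction` column by default, so its areas do not depend on the model's 0.35 decision threshold. The ROC area also has a ranking interpretation: it is the probability that a randomly chosen positive example is scored above a randomly chosen negative one, with ties counting half. The plain-Scala sketch below (hypothetical name `pairwiseAuc`) computes it that way; Spark instead integrates the ROC curve, optionally down-sampled according to `numBins` (1000 above), so results on large data may differ slightly.

```scala
// AUC-ROC computed as the probability that a randomly chosen positive example
// is scored higher than a randomly chosen negative one (ties count as 0.5).
// Compares all P * N pairs, so it is only suitable for small sanity checks.
def pairwiseAuc(scoresAndLabels: Seq[(Double, Double)]): Double = {
  val pos = scoresAndLabels.filter(_._2 == 1.0).map(_._1)
  val neg = scoresAndLabels.filter(_._2 == 0.0).map(_._1)
  require(pos.nonEmpty && neg.nonEmpty, "need both positive and negative examples")
  val wins = (for (p <- pos; n <- neg) yield {
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  }).sum
  wins / (pos.size.toDouble * neg.size)
}

// (score, label) pairs: positives scored 0.9 and 0.8, negatives 0.85 and 0.7
val auc = pairwiseAuc(Seq((0.9, 1.0), (0.8, 1.0), (0.85, 0.0), (0.7, 0.0))) // 3 of 4 pairs ordered correctly: 0.75
```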

### Evaluate Logistic Regression Models

In [275]:
// Make predictions on test data using the Transformer.transform() method.
// Note: the model reads only its features column ('features' here) and appends the rawPrediction, probability and prediction columns.
println("Generating inferences on test set for -> Logistic Regression models:")

val testResultLR1 = lrModel1.transform(testDF);
val testResultLR2 = lrModel2.transform(testDF2);

Generating inferences on test set for -> Logistic Regression models:


testResultLR1: DataFrame = [age: double, job: string ... 44 more fields]
testResultLR2: DataFrame = [age: double, job: string ... 38 more fields]

In [273]:
println("Logistic Regression Model 1: For test set, Area under Precision-Recall curve is: " + binEvalPR.evaluate(testResultLR1).doubleValue());

println("Logistic Regression Model 1: For test set, Area under ROC curve is: " + binEvalROC.evaluate(testResultLR1).doubleValue());

Logistic Regression Model 1: For test set, Area under Precision-Recall curve is: 0.5875903105939474
Logistic Regression Model 1: For test set, Area under ROC curve is: 0.9295516336693499


In [274]:
println("Logistic Regression Model 2: For test set, Area under Precision-Recall curve is: " + binEvalPR.evaluate(testResultLR2).doubleValue());

println("Logistic Regression Model 2: For test set, Area under ROC curve is: " + binEvalROC.evaluate(testResultLR2).doubleValue());

Logistic Regression Model 2: For test set, Area under Precision-Recall curve is: 0.5851839562542677
Logistic Regression Model 2: For test set, Area under ROC curve is: 0.9287183059783829


## Persisting Models to Storage

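Fitted models and pipelines can be saved to disk through their `write` method (most built-in Spark ML models and transformers support this).

Spark stores each saved model as a directory: the model's parameters go into a JSON `metadata` file, and its learned data (for logistic regression, the coefficients and intercept) go into Parquet files, which Spark compresses with Snappy by default.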

In [278]:
println("Now writing fitted LR model 1 to disk at: /tmp/lrmodel1")
lrModel1.write.overwrite().save("/tmp/lrmodel1")

Now writing fitted LR model 1 to disk at: /tmp/lrmodel1


23/01/30 01:21:31 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:31 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 01:21:32 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:32 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 01:21:32 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:32 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
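To confirm that a saved model is usable, it can be read back with the companion object's `load` method. A minimal sketch (the helper name `saveAndReload` is hypothetical) that writes a fitted `LogisticRegressionModel` and reloads it; the reloaded model's `coefficients` and `intercept` should match the original exactly.

```scala
import org.apache.spark.ml.classification.LogisticRegressionModel

// Save a fitted model and read it back; overwrite() replaces any existing directory at `path`.
def saveAndReload(model: LogisticRegressionModel, path: String): LogisticRegressionModel = {
  model.write.overwrite().save(path)
  LogisticRegressionModel.load(path)
}

// e.g. saveAndReload(lrModel1, "/tmp/lrmodel1"), assuming lrModel1 is a LogisticRegressionModel
```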


In [296]:
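// First, remove the vector columns (and the intermediate index columns), since vector types cannot be written to a CSV file.
// Note: vec_idx_month_name is missing from this list and still appears in the output below; it must also be dropped before writing to CSV.
val droppedTestDF = testResultLR1.drop("features", "numericalfeatures", "scaledfeatures",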
      "rawPrediction", "probability", "idx_job", "idx_marital", "idx_education", "idx_defaulted",
      "idx_housing", "idx_loan", "idx_day_of_week", "idx_poutcome", "vec_idx_job", "vec_idx_marital",
      "vec_idx_education", "vec_idx_defaulted", "vec_idx_housing", "vec_idx_loan", "vec_idx_day_of_week",
      "vec_idx_poutcome")

droppedTestDF.show(4)

+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+--------------+------------------+----------+
| age|    job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_month_name|vec_idx_month_name|prediction|
+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+--------------+------------------+----------+
|18.0|student| single|   basic.4y|       no|    yes| yes|  cellular|       apr|        thu|   184.0|     2.0|999.0|     0.0|nonexistent|        -1.8|        93.075|        -47.1|    1.365|     5099.1| no|  0.0|           5.0|     (9
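Listing vector columns by hand is easy to get wrong: in the output above, `vec_idx_month_name` (its value starts with `(9`, which looks like a sparse vector) was not dropped. A sketch that instead removes every column whose type is the ML vector type, using `org.apache.spark.ml.linalg.SQLDataTypes.VectorType`; `dropVectorColumns` is a hypothetical name. Columns holding the older `org.apache.spark.mllib.linalg` vectors use a different type and would not be caught.

```scala
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.DataFrame

// Drop every column whose Spark SQL type is an ML vector, since CSV cannot store vectors.
def dropVectorColumns(df: DataFrame): DataFrame = {
  val vectorCols = df.schema.fields.filter(_.dataType == SQLDataTypes.VectorType).map(_.name)
  df.drop(vectorCols: _*)
}
```

For example, `dropVectorColumns(testResultLR1)` before the CSV write keeps all scalar columns, including the numeric `idx_*` columns.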

In [281]:
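// coalesce(1) yields a single part-*.csv file; Spark still writes it inside a directory named lr1_predictions.csv
droppedTestDF.coalesce(numPartitions = 1)
      .write.option("header", value = true)
      .mode(saveMode = "overwrite")
      .csv("/tmp/lr1_predictions.csv");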

23/01/30 01:21:42 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:42 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false


In [109]:
spark.stop()

In [110]:
// Redundant but harmless if sc is spark.sparkContext: spark.stop() has already stopped it
sc.stop()