# Apache Spark - Hands on Tutorial

Topics Covered:

  1. Scala Language Basics
  1. Intermediate Scala
  1. Creating a Spark Session
  1. Loading data
  1. Exploring data
  1. Viewing Jobs and performance using a simple operation
  1. Transforming data
  1. Training ML models
  1. Examining trained models
  1. Evaluating model performance
  1. Saving Data

## About the Apache Spark Platform

Apache Spark is a unified analytics engine for large-scale data processing.

It provides high-level APIs in Java, Scala, Python and R, and an optimized engine. It also supports a rich set of higher-level tools, including:
  1. Spark SQL for SQL and structured data processing
  1. MLlib for machine learning
  1. GraphX for graph processing
  1. Structured Streaming for incremental computation and stream processing


## Components of an Apache Spark Cluster

Spark applications run as independent sets of processes on a cluster.

At a high level, the mechanism is as follows:
  1. An application is coordinated by the SparkContext object in your main program (called the driver program).
  1. Specifically, to run on a cluster, the SparkContext connects to a cluster manager (either Spark’s own standalone cluster manager, Mesos, YARN or Kubernetes), which allocates resources across all applications.
  1. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
  1. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.
  1. Finally, SparkContext sends tasks to the executors to run.


![Image](https://spark.apache.org/docs/latest/img/cluster-overview.png)

## Important concepts for running Spark Applications on the cluster

  1. Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads.
  1. This isolates applications from each other.
  1. It also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.
  1. Spark is agnostic to the underlying cluster manager.
  1. The driver program must listen for and accept incoming connections from its executors throughout its lifetime.
  1. CP4D (IBM Cloud Pak for Data) provides an API for submitting jobs to the Spark cluster. The "Analytics engine for Apache Spark" service in CP4D takes these inputs and uses them to launch a driver, which runs the application on the cluster.


## Important Terms:

  1. **Application**: User program built on Spark. Consists of a driver program and executors on the cluster.
  1. **Application jar**: A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries; these will be added at runtime.
  1. **Driver program**: The process running the main() function of the application and creating the SparkContext
  1. **Cluster manager**: An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN, Kubernetes)
  1. **Deploy mode**: Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.
  1. **Worker node**: Any node that can run application code in the cluster
  1. **Executor**: A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
  1. **Task**: A unit of work that will be sent to one executor
  1. **Job**: A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs.
  1. **Stage**: Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.


## Spark Scheduling:

Spark has several facilities for scheduling resources between computations:
  1. Each Spark application (instance of SparkContext) runs an independent set of executor processes. The cluster managers that Spark runs on provide facilities for scheduling across applications.
  1. Within each Spark application, multiple “jobs” (Spark actions) may be running concurrently if they were submitted by different threads. Spark includes a fair scheduler to schedule resources within each SparkContext.
  
### Scheduling Across Applications

When running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application.

If multiple users need to share your cluster, there are different options to manage allocation, depending on the cluster manager:

  1. **Static partitioning of resources**: Each application is given a maximum amount of resources it can use and holds onto them for its whole duration. Applications submitted to the standalone mode cluster will run in FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can limit the resources an application uses by setting the _spark.cores.max_, _spark.deploy.defaultCores_, or _spark.executor.memory_ configuration properties.
  1. **Dynamic Allocation of resources**: This is supported by clusters such as Kubernetes (k8s) where fine grained resource allocation and scheduling are available. Such cluster managers provide a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand.
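
The two allocation approaches above could be expressed as application configuration roughly as follows. This is a minimal sketch, assuming spark-core is on the classpath; the concrete values (4 cores, 2g, 1 to 6 executors) are illustrative assumptions, not recommendations.

```scala
import org.apache.spark.SparkConf

// Static partitioning: cap what this application may take (values are assumptions)
val staticConf = new SparkConf(false)   // false: do not load defaults from system properties
  .set("spark.cores.max", "4")          // upper bound on total cores for the application
  .set("spark.executor.memory", "2g")   // memory per executor process

// Dynamic allocation: let Spark grow and shrink the number of executors with the workload
val dynamicConf = new SparkConf(false)
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // used when no external shuffle service is available
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "6")

println(staticConf.toDebugString)
println(dynamicConf.toDebugString)
```

Which properties are honoured depends on the cluster manager in use, so the settings should be checked against the deployment at hand.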

## Example of Kubernetes cluster manager for Spark

![Image](https://spark.apache.org/docs/latest/img/k8s-cluster-mode.png)

---

## Spark Monitoring

Every SparkContext launches a Web UI, by default on port 4040, that displays useful information about the application. This includes:

  1. A list of scheduler stages and tasks
  1. A summary of RDD sizes and memory usage
  1. Environmental information.
  1. Information about the running executors

This interface is accessible from http://<driver-node>:4040 in a web browser when running spark in standalone mode.

Note: this information is only available for the duration of the application. To view the web UI after the application has finished, configure Spark to log Spark events to persisted storage.

### Spark History Server
After an application has finished, its UI can still be reconstructed through Spark’s history server, provided that the application’s event logs exist.
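
Event logging is switched on through configuration. The sketch below shows the two properties involved; the directory file:///tmp/spark-events is a hypothetical location chosen for illustration and would have to exist and be readable by the history server.

```scala
import org.apache.spark.SparkConf

// Assumption: /tmp/spark-events is a hypothetical, pre-created directory
val eventLogConf = new SparkConf(false)
  .set("spark.eventLog.enabled", "true")                 // write application events to storage
  .set("spark.eventLog.dir", "file:///tmp/spark-events") // where the event logs are written

println(eventLogConf.toDebugString)
```

The history server reads the logs from the location given by its own spark.history.fs.logDirectory setting, which should point at the same directory.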
    
### Advanced Instrumentation
Several external tools can be used to help profile the performance of Spark jobs:

  1. Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.
  1. OS profiling tools such as dstat, iostat, and iotop can provide fine-grained profiling on individual nodes.
  1. JVM utilities such as jstack for providing stack traces, jmap for creating heap-dumps, jstat for reporting time-series statistics and jconsole for visually exploring various JVM properties are useful for those comfortable with JVM internals.


---
# 1. Brief introduction to the Scala Language

## Basic Scala

This notebook gives a very brief introduction to Scala. For a more in-depth introduction, visit the Scala language website at https://docs.scala-lang.org/ for extensive documentation suitable for all levels.

Some of the scala language examples have been taken and modified from https://rockthejvm.com/p/scala-at-light-speed by Daniel Ciocîrlan. Do check out the tutorials offered by Daniel - I found them to be very useful.

The following topics are covered lightly:
  - Variables
  - Mutable vs. Immutable variables
  - Expressions
  - Conditional expressions
  - Code blocks
  - Functions
  - Argument types and return types

Let us define a variable:

In [1]:
val aNumber: Int = 42 // const int aNumber = 42;

aNumber: Int = 42

In Scala, the default and preferred way to declare a variable is as an IMMUTABLE value, using **'val'**.

The value of such variables cannot be changed once declared.

So, this statement will not work:

In [2]:
aNumber = 5 + 6

Unknown Error: <console>:24: error: reassignment to val
       aNumber = 5 + 6
               ^


Any modification to an object must return ANOTHER object
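
A small sketch of what this means in practice: operations on immutable values leave the original untouched and hand back a new value. The names below are illustrative only, and lists are covered in more detail later in this notebook.

```scala
val greeting = "scala"
val shouted  = greeting.toUpperCase  // a new String; greeting is unchanged

val original = List(1, 2, 3)
val extended = 0 :: original         // prepending builds a new List
val doubled  = original.map(_ * 2)   // transforming builds another new List

println(greeting) // scala
println(shouted)  // SCALA
println(original) // List(1, 2, 3)
println(extended) // List(0, 1, 2, 3)
println(doubled)  // List(2, 4, 6)
```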

Next, we'll declare a 'var' which is **mutable**, which means its value can be changed:

In [2]:
var newNumber = 5

In [3]:
newNumber = 5 + 6
println(newNumber)

11


Some of the basic data types in scala are: Int, Boolean, Char, Double, Float, String.

Note that specifying a type is optional when declaring a variable or expression, but it is a best practice and is always recommended.

In [1]:
val aBoolean: Boolean = false

aBoolean: Boolean = false

Strings and basic string operations:

In [5]:
val aString = "I program in Scala"
val aComposedString = "I" + " " + "program" + " " + "in" + " " + "Scala"

aString: String = "I program in Scala"
aComposedString: String = "I program in Scala"

When printing or logging messages during a program's execution, it is often necessary to format variables; string formatting is useful in such situations:

In [6]:
val anInterpolatedString = s"The value of the variable is: $aNumber"

anInterpolatedString: String = "The value of the variable is: 42"

In [7]:
"Format an integer with this expression - %d, format a string with this expression - %s".format(11, "Another String")

res6: String = "Format an integer with this expression - 11, format a string with this expression - Another String"

Expressions are code that can be reduced to a value:

In [8]:
val anExpression = 2 + 40

anExpression: Int = 42

Single line if-else expression:

In [9]:
val ifExpression = if (aNumber > 50) 100 else 0

ifExpression: Int = 0

In [10]:
val chainedIfExpression =
    if (aNumber > 43) 56
    else if (aNumber < 0) -2
    else if (aNumber > 999) 78
    else 0

chainedIfExpression: Int = 0

A code block is wrapped in curly braces - {}

Multiple statements can be included in such a code block.

The value of the last expression in the block is the value of the whole block.

In [11]:
val aCodeBlock = {
    // definitions
    val aLocalValue = 67

    // value of block is the value of the last expression
    aLocalValue + 3
}
// Here, 70 is returned since it is the last expression.

aCodeBlock: Int = 70

A function is defined by assigning a code block or expression to the function signature, see below:

In [12]:
def myFunction(x: Int, y: String): String = { // define argument types, also define return type
    y + " " + x
}

defined function myFunction

In [13]:
myFunction(42, "the number is")

res12: String = "the number is 42"

Here is an example of a recursive function - to find a factorial of a given number.

In [2]:
def factorial(n: Int): Int =
    if (n <= 1) 1
    else n * factorial(n - 1)

/*
factorial(5) = 5 * factorial(4) = 5 * 24 = 120
factorial(4) = 4 * factorial(3) = 4 * 6
factorial(3) = 3 * factorial(2) = 3 * 2
factorial(2) = 2 * factorial(1) = 2 * 1
factorial(1) = 1
*/

defined function factorial

In [3]:
factorial(1)

res2: Int = 1

In [4]:
factorial(2)

res3: Int = 2

In [5]:
factorial(3)

res4: Int = 6

In [6]:
factorial(4)

res5: Int = 24
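
The recursive definition above has two practical limits: an Int result overflows for inputs larger than 12, and every call adds a stack frame. A possible variant, sketched here under the hypothetical name factorialTR, uses BigInt for the result and a tail-recursive helper that the compiler can turn into a loop.

```scala
import scala.annotation.tailrec

// factorialTR is an illustrative name, not part of the original notebook
def factorialTR(n: Int): BigInt = {
  @tailrec // compilation fails if the recursive call is not in tail position
  def loop(remaining: Int, acc: BigInt): BigInt =
    if (remaining <= 1) acc
    else loop(remaining - 1, acc * remaining)

  loop(n, BigInt(1))
}

println(factorialTR(5))  // 120
println(factorialTR(20)) // 2432902008176640000
```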

The Unit type implies no meaningful value is returned by the function.
It is equivalent to "void" in other languages

In [7]:
def myUnitReturningFunction(): Unit = {
    println("I don't return Unit")
}

defined function myUnitReturningFunction

In [8]:
myUnitReturningFunction()

I don't return Unit


In [9]:
val theUnit = ()

In [10]:
theUnit

---
## 2. Intermediate Scala

  - Object Oriented language
  - Class vs. Object
  - Traits
  - Case classes
  - Serialisation
  - Collections

In [22]:
// class and instance
class Animal {
    // define fields
    val age: Int = 0
    // define methods
    def eat() = println("I'm eating")
}

defined class Animal

In [23]:
val anAnimal = new Animal

anAnimal: Animal = ammonite.$sess.cmd21$Helper$Animal@357ac956

In [24]:
// inheritance
class Dog(val name: String) extends Animal // constructor definition
val aDog = new Dog("Tommy")

defined class Dog
aDog: Dog = ammonite.$sess.cmd23$Helper$Dog@191bfaaa

In [25]:
// constructor arguments are NOT fields: need to put a val before the constructor argument
aDog.name

res24: String = "Tommy"

In [26]:
// subtype polymorphism
val aDeclaredAnimal: Animal = new Dog("Hachi")
aDeclaredAnimal.eat() // the most derived method will be called at runtime

I'm eating


aDeclaredAnimal: Animal = ammonite.$sess.cmd23$Helper$Dog@58c6a14e

In [27]:
// abstract class
abstract class WalkingAnimal {
    val hasLegs = true // by default public, can restrict by adding protected or private
    protected val numOfFeet = 4
    def walk(): Unit
}

defined class WalkingAnimal

A class can extend only one class, but it can mix in multiple traits.

In [28]:
// "interface": ultimate abstract type
trait Carnivore {
    def eat(animal: Animal): Unit
}

defined trait Carnivore

In [29]:
trait Philosopher {
    def ?!(thought: String): Unit // valid method name
}

defined trait Philosopher

Here is an example of single-class inheritance with multi-trait "mixing":

In [30]:
class Crocodile extends Animal with Carnivore with Philosopher {
    override def eat(animal: Animal): Unit = println("I am eating you, animal!")

    override def ?!(thought: String): Unit = println(s"I was thinking: $thought")
}

defined class Crocodile

In [31]:
val aCroc = new Crocodile

aCroc: Crocodile = ammonite.$sess.cmd29$Helper$Crocodile@31f78889

In [32]:
aCroc.eat(aDog)

I am eating you, animal!


This is infix notation, written as "object method argument". It is only available for methods with exactly ONE argument.

In [33]:
aCroc eat aDog

I am eating you, animal!


Example of the use of a custom operator: **?!**

In [34]:
aCroc ?! "What if we could fly?"

I was thinking: What if we could fly?
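
Symbolic method names are not limited to unusual operators like ?!. Arithmetic-looking operators can be defined in the same way. The sketch below uses a hypothetical Vec2 class, introduced only for illustration.

```scala
// Vec2 is a hypothetical 2D vector type, not part of the original notebook
class Vec2(val x: Int, val y: Int) {
  def +(other: Vec2): Vec2 = new Vec2(x + other.x, y + other.y) // an operator is a plain method
  def *(factor: Int): Vec2 = new Vec2(x * factor, y * factor)
  override def toString: String = s"Vec2($x, $y)"
}

val a = new Vec2(1, 2)
val b = new Vec2(3, 4)

println(a + b)       // infix call, prints Vec2(4, 6)
println(a.+(b))      // the same call in dot notation
println((a + b) * 2) // prints Vec2(8, 12)
```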


Operators in Scala are actually methods

In [35]:
val basicMath = 1 + 2

basicMath: Int = 3

Equivalent expression showing that the operator is a method:

In [36]:
val anotherBasicMath = 1.+(2)

anotherBasicMath: Int = 3

In [37]:
// Anonymous classes
val dinosaur = new Carnivore {
    override def eat(animal: Animal): Unit = println("I am a dinosaur so I can eat anything")
}

dinosaur: AnyRef with Carnivore = ammonite.$sess.cmd36$Helper$$anon$1@38c22f2f

In [38]:
dinosaur eat aDog

I am a dinosaur so I can eat anything


An object is a singleton.

In [39]:
object MySingleton { // the only instance of the MySingleton type
    val mySpecialValue = 53278
    def mySpecialMethod(): Int = 5327
    def apply(x: Int): Int = x + 1
}

defined object MySingleton

A singleton cannot be instantiated:

In [39]:
val objInstance = new MySingleton

cmd39.sc:1: not found: type MySingleton
val objInstance = new MySingleton
                      ^
Compilation Failed

In [40]:
MySingleton.mySpecialMethod()

res39: Int = 5327

In [41]:
MySingleton.apply(65)

res40: Int = 66

In [42]:
MySingleton(65) // equivalent to MySingleton.apply(65)

res41: Int = 66

### Companions - a companion object can be declared with the same name as a class

Companions can access each other's private fields/methods

Singleton Animal and instances of Animal are different things, see below:

In [11]:
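object Animal { // companion object: shares its name with the class Animal
    // note: a class and its object are true companions only when defined in the same file or notebook cell
    val canLiveIndefinitely = false
}

defined object Animal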

Objects will have "static" fields/methods

In [44]:
val animalsCanLiveForever = Animal.canLiveIndefinitely

animalsCanLiveForever: Boolean = false
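
To illustrate that companions can access each other's private members, here is a minimal sketch with a hypothetical Counter class. The class and its object are defined together in one block, which is what makes them companions.

```scala
// Counter is a hypothetical example type; class and object must be defined together
class Counter private (private val count: Int) { // private constructor and private field
  def next: Counter = new Counter(count + 1)
}

object Counter {
  def zero: Counter = new Counter(0)   // the companion may call the private constructor
  def value(c: Counter): Int = c.count // and read the private field
}

val counter = Counter.zero.next.next
println(Counter.value(counter)) // 2
```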

**case classes**: these are lightweight data structures for which the compiler generates useful boilerplate.

They provide:
- sensible equals and hash code
- serialization
- a companion object with an apply method
- pattern matching


In [45]:
case class Person(name: String, age: Int)

defined class Person

They may be constructed without using the keyword **new**

In [47]:
val bob = Person("Bob", 54) // Person.apply("Bob", 54)

bob: Person = Person("Bob", 54)
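
The generated features listed above can be seen in a short sketch. It restates the Person case class so that it runs on its own; the extra values (otherBob, olderBob) are illustrative.

```scala
case class Person(name: String, age: Int)

val bob      = Person("Bob", 54)
val otherBob = Person("Bob", 54)

println(bob == otherBob)                   // true: equality compares field values
println(bob.hashCode == otherBob.hashCode) // true: hash code is consistent with equals

val olderBob = bob.copy(age = 55)          // copy with one field changed
println(olderBob)                          // Person(Bob,55)

val description = bob match {              // pattern matching on the fields
  case Person(n, a) if a >= 50 => s"$n is $a and experienced"
  case Person(n, _)            => s"$n is young"
}
println(description)                       // Bob is 54 and experienced
```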

### Example of exceptions

Use a try-catch-finally structure to handle exceptions. See this example:

In [12]:
try {

    // code that can throw an error
    val x: String = null
    x.length

} catch { // in Java: catch(Exception e) {...}

    case e: Exception => "some faulty error message"

} finally {
// execute some code no matter what

}

res11: Any = "some faulty error message"
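
Note that the result type above is Any, because the try branch yields an Int and the catch branch a String. An alternative worth knowing is scala.util.Try, which wraps the outcome in a Success or a Failure. The helper name safeLength below is a hypothetical example.

```scala
import scala.util.{Try, Success, Failure}

// safeLength is an illustrative helper: a NullPointerException becomes a Failure
def safeLength(s: String): Try[Int] = Try(s.length)

val results = List(safeLength("Scala"), safeLength(null))
results.foreach {
  case Success(len) => println(s"length = $len")
  case Failure(e)   => println(s"failed with ${e.getClass.getSimpleName}")
}

// Fall back to a default value when the computation failed
val lengthOrDefault: Int = safeLength(null).getOrElse(-1)
println(lengthOrDefault) // -1
```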

### Generics

In [48]:
abstract class MyList[T] {
    def head: T
    def tail: MyList[T]
}

defined class MyList


In [49]:
// using a generic with a concrete type
val aList: List[Int] = List(1,2,3) // List.apply(1,2,3)

aList = List(1, 2, 3)


List(1, 2, 3)

In [50]:
val first = aList.head // int

first = 1


1

In [51]:
val rest = aList.tail

rest = List(2, 3)


List(2, 3)

In [52]:
val aStringList = List("hello", "Scala")

aStringList = List(hello, Scala)


List(hello, Scala)

In [53]:
val firstString = aStringList.head // string

firstString = hello


hello

In [54]:
val reversedList = aList.reverse // returns a NEW list

reversedList = List(3, 2, 1)


List(3, 2, 1)
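
The abstract MyList[T] defined at the start of this section has no implementation yet. One possible way to fill it in is sketched below; the Empty and Cons classes and the added isEmpty method are assumptions made for illustration, not part of the original notebook.

```scala
// Restates the abstract class and adds two hypothetical implementations
abstract class MyList[T] {
  def head: T
  def tail: MyList[T]
  def isEmpty: Boolean
}

class Empty[T] extends MyList[T] {
  def head: T = throw new NoSuchElementException("head of empty list")
  def tail: MyList[T] = throw new NoSuchElementException("tail of empty list")
  def isEmpty: Boolean = true
}

// The constructor vals implement the abstract head and tail members
class Cons[T](val head: T, val tail: MyList[T]) extends MyList[T] {
  def isEmpty: Boolean = false
}

val numbers: MyList[Int] = new Cons(1, new Cons(2, new Empty[Int]))
val words: MyList[String] = new Cons("hello", new Empty[String])

println(numbers.head)       // 1
println(numbers.tail.head)  // 2
println(words.head)         // hello
println(words.tail.isEmpty) // true
```

The same class works for Int and String elements because the element type T is supplied at the point of use.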

---
## Scala Collections

Let's use some of the important collections in Scala.

This is a vast topic, but in order to keep the focus of this tutorial on Apache Spark, it is only covered at a high level.

### Lists, Sequences, Sets and Vectors

In [55]:
val somelist = List('a', 'b', 'c', 'd', 'e')

somelist = List(a, b, c, d, e)


List(a, b, c, d, e)

In [56]:
somelist.map( x => println(x) )

a
b
c
d
e


List((), (), (), (), ())
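
The result above is a list of Unit values because map collects whatever the function returns, and println returns Unit. When only the side effect is wanted, foreach is the usual choice, while map, filter and foldLeft are used to compute new values. A short sketch with illustrative names:

```scala
val letters = List('a', 'b', 'c', 'd', 'e')

letters.foreach(println)                         // side effects only, returns Unit

val upper    = letters.map(_.toUpper)            // transform every element
val vowelSet = Set('a', 'e', 'i', 'o', 'u')
val vowels   = letters.filter(vowelSet.contains) // keep matching elements
val total    = List(1, 2, 3, 4, 5).foldLeft(0)(_ + _) // combine elements into one value

println(upper)  // List(A, B, C, D, E)
println(vowels) // List(a, e)
println(total)  // 15
```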

In [57]:
val someSeq = Seq(1,2,3,4,5)

someSeq = List(1, 2, 3, 4, 5)


List(1, 2, 3, 4, 5)

In [58]:
val vector1 = Vector(10, 20, 30, 40, 50)

vector1 = Vector(10, 20, 30, 40, 50)


Vector(10, 20, 30, 40, 50)

In [59]:
val vector2 = vector1 :+ 60

vector2 = Vector(10, 20, 30, 40, 50, 60)


Vector(10, 20, 30, 40, 50, 60)

In [60]:
val vector3 = 0 +: vector2

vector3 = Vector(0, 10, 20, 30, 40, 50, 60)


Vector(0, 10, 20, 30, 40, 50, 60)

In [61]:
for (item <- vector3) println(item)

0
10
20
30
40
50
60


In [62]:
val someSet = scala.collection.mutable.Set[Int]()

someSet = Set()


Set()

In [63]:
someSet += 1
someSet += 2 += 3
someSet ++= Vector(4, 5)

Set(1, 5, 2, 3, 4)

In [64]:
for (item <- someSet) println(item)

1
5
2
3
4


In [65]:
someSet -= 1

Set(5, 2, 3, 4)

### Tuples

A tuple can be thought of as a record in a dataframe or a record from a database table.

In [66]:
val aTuple = ("Some name", 20, "Some City", "Some address", 400001)

aTuple = (Some name,20,Some City,Some address,400001)


(Some name,20,Some City,Some address,400001)

Access a tuple with this notation:

In [67]:
aTuple._1

Some name

In [68]:
aTuple._2

20

In [69]:
val planets =
  List(("Mercury", 57.9), ("Venus", 108.2), ("Earth", 149.6),
       ("Mars", 227.9), ("Jupiter", 778.3))

planets = List((Mercury,57.9), (Venus,108.2), (Earth,149.6), (Mars,227.9), (Jupiter,778.3))


List((Mercury,57.9), (Venus,108.2), (Earth,149.6), (Mars,227.9), (Jupiter,778.3))

Example of pattern matching on a list of tuples:

In [70]:
planets.foreach {
  case ("Earth", distance) =>
    println(s"*Our planet is $distance million kilometers from the sun*")
  case (planetname, distance) =>
    println(s"Planet $planetname is $distance million kilometers from the sun")
}

Planet Mercury is 57.9 million kilometers from the sun
Planet Venus is 108.2 million kilometers from the sun
*Our planet is 149.6 million kilometers from the sun*
Planet Mars is 227.9 million kilometers from the sun
Planet Jupiter is 778.3 million kilometers from the sun


### Maps

The Map data structure stores data as key-value pairs. Keys must be unique.

In [71]:
val states = collection.mutable.Map("DL" -> "Delhi")

states = Map(DL -> Delhi)


Map(DL -> Delhi)

In [72]:
states += ("MH" -> "Maharashtra")
states += ("TN" -> "Tamil Nadu")
states += ("KA" -> "Karnataka")

Map(TN -> Tamil Nadu, MH -> Maharashtra, KA -> Karnataka, DL -> Delhi)

Iterate over the map as follows:

In [73]:
for ((k,v) <- states) println(s"key: $k, value: $v")

key: TN, value: Tamil Nadu
key: MH, value: Maharashtra
key: KA, value: Karnataka
key: DL, value: Delhi


Another, more readable syntax is:

In [74]:
states.foreach {
    case(code, statename) => println(s"key: $code, value: $statename")
}

key: TN, value: Tamil Nadu
key: MH, value: Maharashtra
key: KA, value: Karnataka
key: DL, value: Delhi
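
Looking up a key that may be missing is a common need. The sketch below shows the usual lookup methods on a map; it uses a separate, illustrative map named stateCodes so that it can run on its own.

```scala
val stateCodes = collection.mutable.Map("DL" -> "Delhi", "MH" -> "Maharashtra")

val known    = stateCodes.get("DL")                  // Some(Delhi)
val missing  = stateCodes.get("XX")                  // None, no exception is thrown
val fallback = stateCodes.getOrElse("XX", "unknown") // default for a missing key

stateCodes.update("KA", "Karnataka")                 // same as stateCodes("KA") = "Karnataka"
val removed = stateCodes.remove("MH")                // returns the removed value as an Option

println(known)
println(missing)
println(fallback)
println(stateCodes.contains("MH")) // false after removal
```

Accessing a missing key directly, as in stateCodes("XX"), throws a NoSuchElementException, which is why get or getOrElse is usually preferred.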


---
## Libraries for Apache Spark

When running in a Jupyter notebook, the required libraries may not be on the classpath.

Load the essential Spark libraries from public Maven repositories at runtime like this:

In [2]:
import $ivy.`org.apache.spark::spark-core:3.2.0`
import $ivy.`org.apache.spark::spark-mllib-local:3.2.0`
import $ivy.`org.apache.spark::spark-mllib:3.2.0`
import $ivy.`org.apache.spark::spark-graphx:3.2.0`
import $ivy.`org.apache.spark::spark-streaming:3.2.0`
import $ivy.`org.apache.spark::spark-tags:3.2.0`

---

## Import Spark Libraries

In [3]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

In [4]:
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset

In [5]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

In [6]:
val appName = "SparkMLDemo"

appName: String = "SparkMLDemo"

## Setup the Logger

To control the volume of log messages, change the log4j configuration programmatically like this:

In [7]:
import org.apache.log4j.{Level, Logger}
//Logger.getLogger("org").setLevel(Level.INFO)

val logger: Logger = Logger.getLogger(appName)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
logger.setLevel(Level.INFO)

logger: Logger = org.apache.log4j.Logger@cbed23b

---
## Create Spark session

In [None]:
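// Run this cell only when re-executing the notebook: it closes the existing Spark session before a new one is started.
// spark.stop() also stops the underlying SparkContext, so the following sc.stop() is a harmless no-op.
spark.stop()
sc.stop()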

In [8]:
val sparkConf = new SparkConf()
             .setAppName(appName)
             //.setMaster("local[*]")
             //.setMaster("spark://localhost:7077")
             .setMaster("spark://sparkmaster320:7077")
             .set("spark.driver.extraClassPath", "/home/java_libs/db2jcc4.jar")
             .set("spark.executor.extraClassPath", "/home/java_libs/db2jcc4.jar")
             .set("spark.default.parallelism", "6")

sparkConf: SparkConf = org.apache.spark.SparkConf@7f9e6a3f

In [10]:
// Apply the config to start a spark session:
val spark = org.apache.spark.sql.SparkSession.builder()
    .config(sparkConf)
    .getOrCreate()

23/01/30 09:07:46 WARN SparkSession$Builder: Using an existing SparkSession; some spark core configurations may not take effect.


spark: SparkSession = org.apache.spark.sql.SparkSession@3c6cd439

## Get information on Spark Session

Use spark context and config objects to get essential information.

In [11]:
val sc = spark.sparkContext

sc: SparkContext = org.apache.spark.SparkContext@956f293

In [12]:
println("Spark Master: %s, User: %s, Version: %s, Deployment mode: %s".format(
        sc.master, sc.sparkUser, sc.version, sc.deployMode
    ))

println("Default Partitions: %d, Scheduling Mode: %s".format(
         sc.defaultMinPartitions, sc.getSchedulingMode
    ))

Spark Master: spark://sparkmaster320:7077, User: notebooker, Version: 3.2.0, Deployment mode: client
Default Partitions: 2, Scheduling Mode: FIFO


In [13]:
val config = sc.getConf

for ((k,v) <- config.getAll) println(s"Configuration Parameter: $k=$v")

Configuration Parameter: spark.executor.extraClassPath=/home/java_libs/db2jcc4.jar
Configuration Parameter: spark.driver.port=34217
Configuration Parameter: spark.app.id=app-20230130033558-0001
Configuration Parameter: spark.driver.extraClassPath=/home/java_libs/db2jcc4.jar
Configuration Parameter: spark.app.startTime=1675049754134
Configuration Parameter: spark.default.parallelism=6
Configuration Parameter: spark.master=spark://sparkmaster320:7077
Configuration Parameter: spark.app.name=SparkMLDemo
Configuration Parameter: spark.executor.id=driver
Configuration Parameter: spark.driver.host=ba601551daf7
Configuration Parameter: spark.sql.warehouse.dir=file:/work/proj/SBI_DW/training/spark-warehouse


config: SparkConf = org.apache.spark.SparkConf@6258ddb5

In [62]:
config.getOption("spark.executor.extraClassPath")

res61: Option[String] = Some("/home/java_libs/db2jcc4.jar")

In [61]:
config.getOption("spark.jars")

res60: Option[String] = None

The session's runtime configuration offers helper functions to change operational parameters and to check whether a given configuration parameter can be modified at runtime:

In [85]:
spark.conf.isModifiable("spark.executor.memory")

false

In [86]:
spark.conf.isModifiable("spark.jars")

false
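
Both parameters checked above are fixed once the application has started. SQL runtime settings behave differently, as the sketch below suggests. It assumes a local session so that it runs without a cluster; the name confDemoSpark is hypothetical, and inside this notebook getOrCreate() would simply return the existing session.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local mode, used only so the sketch is self-contained
val confDemoSpark = SparkSession.builder()
  .appName("ConfDemo")
  .master("local[*]")
  .getOrCreate()

val key = "spark.sql.shuffle.partitions"
println(confDemoSpark.conf.isModifiable(key)) // a SQL runtime setting, expected to be modifiable

confDemoSpark.conf.set(key, "12")             // change it for the current session
println(confDemoSpark.conf.get(key))
```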

In [87]:
sys.env("PATH")

/opt/ibm/scala/bin:/opt/ibm/jdk/bin://.local/bin://bin:/opt/ibm/spark/bin:/opt/ibm/conda/R/bin:/opt/ibm/conda/miniconda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

---

## Run a simple aggregation operation

In [63]:
val myint_rdd = sc.parallelize(0 to 1000000).setName("myint_6parts")

myint_rdd: RDD[Int] = myint_6parts ParallelCollectionRDD[0] at parallelize at cmd62.sc:1

In [64]:
println("Partition size: " + myint_rdd.partitions.size)

Partition size: 6


In [65]:
myint_rdd.cache()

res64: RDD[Int] = myint_6parts ParallelCollectionRDD[0] at parallelize at cmd62.sc:1

In [66]:
val newint_rdd = myint_rdd.repartition(12).setName("myint_12parts")
println("New RDD partition size: " + newint_rdd.partitions.size)

New RDD partition size: 12


newint_rdd: RDD[Int] = myint_12parts MapPartitionsRDD[4] at repartition at cmd65.sc:1

In [67]:
newint_rdd.cache()

res66: RDD[Int] = myint_12parts MapPartitionsRDD[4] at repartition at cmd65.sc:1

## **VERY IMPORTANT CONCEPT**

Spark evaluates transformations (such as map and repartition, as well as cache) **'lazily'**.

This means that the actual operation on a DataFrame, RDD or Dataset is not started until an action (such as sum, count or collect) requires a result.

The Spark engine delays the computation until the last possible moment.

For example, go to the job monitoring user interface and verify that the above statements did not trigger any jobs.

The following statement, however, triggers a computation, since a result is required at this moment:

In [139]:
// first, look at the performance on 12 partitions in the spark jobs UI:
// Our sample cluster has 3 nodes with 2 cores per node, hence a parallelism of 6 works best.
newint_rdd.sum()

res138: Double = 5.000005E11

In [140]:
// next, compare it with the performance when using 6 partitions
// since this matches the core count of the cluster, it executes without any scheduler delay
myint_rdd.sum()

res139: Double = 5.000005E11

In [141]:
myint_rdd.stats

res140: org.apache.spark.util.StatCounter = (count: 1000001, mean: 500000.000000, stdev: 288675.423270, max: 1000000.000000, min: 0.000000)
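
Lazy evaluation can also be observed without the monitoring UI. The sketch below counts how often a map function runs by means of an accumulator. It assumes a local session, uses the hypothetical name lazyDemoSpark, and relies on no task being retried; inside this notebook getOrCreate() would return the existing session instead.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local mode, used only so the sketch is self-contained
val lazyDemoSpark = SparkSession.builder()
  .appName("LazyDemo")
  .master("local[*]")
  .getOrCreate()
val lazyDemoContext = lazyDemoSpark.sparkContext

// Counts how many elements the map function has processed
val evaluated = lazyDemoContext.longAccumulator("evaluated elements")

// Transformation: only recorded, nothing is executed yet
val doubled = lazyDemoContext.parallelize(1 to 5).map { x => evaluated.add(1); x * 2 }
println(evaluated.value) // still 0, because no job has run

// Action: triggers the job, and only now does the map function run
val total = doubled.sum()
println(total)
println(evaluated.value)
```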

---

## Load data to Dataframe

Data can be loaded into a dataframe in several ways. This notebook will demonstrate the following two different approaches:

  - Read a csv file
  - Read a database table

### About this Dataset

This dataset is based on the "Bank Marketing" UCI dataset (please check the description at: http://archive.ics.uci.edu/ml/datasets/Bank+Marketing).

   The data is enriched by the addition of five new social and economic features/attributes (nationwide indicators from a ~10M population country), published by the Banco de Portugal and publicly available at: https://www.bportugal.pt/estatisticasweb.
   
   The addition of the five new social and economic attributes (made available here) leads to a substantial improvement in the prediction of a success, even when the duration of the call is not included.

The binary classification goal is to predict whether the client will subscribe to a bank term deposit (variable y).

### Input variables:

#### Bank client data:
  - 1. age (numeric)
  - 2. job : type of job (categorical: "admin.", "blue-collar", "entrepreneur", "housemaid", "management", "retired", "self-employed", "services", "student", "technician", "unemployed", "unknown")
  - 3. marital : marital status (categorical: "divorced","married","single","unknown"; note: "divorced" means divorced or widowed)
  - 4. education (categorical: "basic.4y","basic.6y","basic.9y","high.school","illiterate","professional.course","university.degree","unknown")
  - 5. default: has credit in default? (categorical: "no","yes","unknown")
  - 6. housing: has housing loan? (categorical: "no","yes","unknown")
  - 7. loan: has personal loan? (categorical: "no","yes","unknown")
#### Data related with the last contact of the current campaign:
  - 8. contact: contact communication type (categorical: "cellular","telephone") 
  - 9. month: last contact month of year (categorical: "jan", "feb", "mar", ..., "nov", "dec")
  - 10. day_of_week: last contact day of the week (categorical: "mon","tue","wed","thu","fri")
  - 11. duration: last contact duration, in seconds (numeric). Important note:  this attribute highly affects the output target (e.g., if duration=0 then y="no"). Yet, the duration is not known before a call is performed. Also, after the end of the call y is obviously known. Thus, this input should only be included for benchmark purposes and should be discarded if the intention is to have a realistic predictive model.
#### Other attributes:
  - 12. campaign: number of contacts performed during this campaign and for this client (numeric, includes last contact)
  - 13. pdays: number of days that passed by after the client was last contacted from a previous campaign (numeric; 999 means client was not previously contacted)
  - 14. previous: number of contacts performed before this campaign and for this client (numeric)
  - 15. poutcome: outcome of the previous marketing campaign (categorical: "failure","nonexistent","success")
#### Social and economic context attributes
  - 16. emp.var.rate: employment variation rate - quarterly indicator (numeric)
  - 17. cons.price.idx: consumer price index - monthly indicator (numeric)     
  - 18. cons.conf.idx: consumer confidence index - monthly indicator (numeric)     
  - 19. euribor3m: euribor 3 month rate - daily indicator (numeric)
  - 20. nr.employed: number of employees - quarterly indicator (numeric)

#### Output variable (desired target):
  - 21. y - has the client subscribed a term deposit? (binary: "yes","no")

#### Missing Attribute Values:
Several categorical attributes contain missing values, all coded with the "unknown" label. These missing values can be kept as a separate category, or handled using deletion or imputation techniques.
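
The three options can be sketched on a small in-memory sample. This is plain Scala rather than Spark code; the sample values and the helper names (`keepAsCategory`, `dropUnknown`, `imputeWithMode`) are illustrative assumptions and not part of the notebook.

```scala
// Toy values for one categorical attribute; "unknown" marks a missing value.
val education = Seq("basic.4y", "unknown", "high.school", "high.school", "unknown", "university.degree")

// Option 1: keep "unknown" as a category of its own (nothing to change).
def keepAsCategory(values: Seq[String]): Seq[String] = values

// Option 2: deletion, i.e. drop the records whose value is missing.
def dropUnknown(values: Seq[String]): Seq[String] = values.filterNot(_ == "unknown")

// Option 3: imputation, here with the most frequent known value (the mode).
def imputeWithMode(values: Seq[String]): Seq[String] = {
  val known = values.filterNot(_ == "unknown")
  require(known.nonEmpty, "need at least one known value to impute from")
  val mode = known.groupBy(identity).maxBy { case (_, occurrences) => occurrences.size }._1
  values.map(v => if (v == "unknown") mode else v)
}
```

Which option is appropriate depends on how many values are missing and on whether "unknown" itself carries information about the client.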


In [14]:
// Declare a schema.
// Note: some columns are renamed relative to the dataset description above
// (default -> defaulted, contact -> contact_no, month -> month_name; dots become underscores).
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

val bank_telemkt_schema: StructType = new StructType()
    .add("age", DoubleType, true)
    .add("job", StringType, true)
    .add("marital", StringType, true)
    .add("education", StringType, true)
    .add("defaulted", StringType, true)
    .add("housing", StringType, true)
    .add("loan", StringType, true)
    .add("contact_no", StringType, true)
    .add("month_name", StringType, true)
    .add("day_of_week", StringType, true)
    .add("duration", DoubleType, true)
    .add("campaign", DoubleType, true)
    .add("pdays", DoubleType, true)
    .add("previous", DoubleType, true)
    .add("poutcome", StringType, true)
    .add("emp_var_rate", DoubleType, true)
    .add("cons_price_idx", DoubleType, true)
    .add("cons_conf_idx", DoubleType, true)
    .add("euribor3m", DoubleType, true)
    .add("nr_employed", DoubleType, true)
    .add("y", StringType, true)

import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

bank_telemkt_schema: StructType = StructType(
  StructField("age", DoubleType, true, {}),
  StructField("job", StringType, true, {}),
  StructField("marital", StringType, true, {}),
  StructField("education", StringType, true, {}),
  StructField("defaulted", StringType, true, {}),
  StructField("housing", StringType, true, {}),
  StructField("loan", StringType, true, {}),
  StructField("contact_no", StringType, true, {}),
  StructField("month_name", StringType, true, {}),
  StructField("day_of_week", StringType, true, {}),
  StructField("durat

In [15]:
// Declare a case class if you want to use typed Datasets instead of DataFrames:
case class ModelDataRecord(
                            age: Double,
                            job: String,
                            marital: String,
                            education: String,
                            defaulted: String,
                            housing: String,
                            loan: String,
                            contact_no: String,
                            month_name: String,
                            day_of_week: String,
                            duration: Double,
                            campaign: Double,
                            pdays: Double,
                            previous: Double,
                            poutcome: String,
                            emp_var_rate: Double,
                            cons_price_idx: Double,
                            cons_conf_idx: Double,
                            euribor3m: Double,
                            nr_employed: Double,
                            y: String
                          )

defined class ModelDataRecord

### Variable Names

Define variables with the column names.

These will be used during data transformation and model training/evaluation later.

In [145]:
val originalLabelColname = "y"
val labelColname = "label"

val numerical_features = Array("age", "duration", "pdays", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed")
val categorical_features = Array("job", "marital", "education", "defaulted", "housing", "loan", "day_of_week", "poutcome", "month_name")

originalLabelColname: String = "y"
labelColname: String = "label"
numerical_features: Array[String] = Array(
  "age",
  "duration",
  "pdays",
  "emp_var_rate",
  "cons_price_idx",
  "cons_conf_idx",
  "euribor3m",
  "nr_employed"
)
categorical_features: Array[String] = Array(
  "job",
  "marital",
  "education",
  "defaulted",
  "housing",
  "loan",
  "day_of_week",
  "poutcome",
  "month_name"
)

### Read data from file:

In [90]:
import spark.implicits._

// Note: "numPartitions" is a JDBC reader option and has no effect on a CSV read, so it is omitted here.
val inputDF = spark.read
    .option("header", "true")
    .schema(bank_telemkt_schema)
    //.csv("/project_data/data_asset/bank-additional-full.csv")
    .csv("/home/datasets/bank_telemkt/bank-additional-full.csv").as[ModelDataRecord]

import spark.implicits._

inputDF: Dataset[ModelDataRecord] = [age: double, job: string ... 19 more fields]

In [91]:
inputDF.printSchema()

root
 |-- age: double (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- defaulted: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact_no: string (nullable = true)
 |-- month_name: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- campaign: double (nullable = true)
 |-- pdays: double (nullable = true)
 |-- previous: double (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp_var_rate: double (nullable = true)
 |-- cons_price_idx: double (nullable = true)
 |-- cons_conf_idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr_employed: double (nullable = true)
 |-- y: string (nullable = true)



### Reading data from a JDBC database connection

In [73]:
// Load the required JDBC driver library from the public Maven repository, if not already present in the application:

// this uses the IBM DB2 connector to read from a DB2 table
import $ivy.`com.ibm.db2.jcc:db2jcc:db2jcc4`;

//import com.ibm.db2.jcc.DB2Driver;

import $ivy.$                               ;

//import com.ibm.db2.jcc.DB2Driver;

In [13]:
import java.util.Properties

val connProp = new Properties()
connProp.setProperty("driver", "com.ibm.db2.jcc.DB2Driver")
connProp.put("url", "jdbc:db2://localhost:50000/testdb")
connProp.put("user", "db2inst1")
connProp.put("password", "abcdefgh")

val tableName = "DB2INST1.BANK_TELEMKT"

import java.util.Properties

connProp: Properties = {password=abcdefgh, driver=com.ibm.db2.jcc.DB2Driver, user=db2inst1, url=jdbc:db2://localhost:50000/testdb}
res12_2: Object = null
res12_3: Object = null
res12_4: Object = null
res12_5: Object = null
tableName: String = "DB2INST1.BANK_TELEMKT"

In [75]:
import spark.implicits._

val bank_telemkt_df = spark.read
    .format("jdbc")
    .option("driver", connProp.getProperty("driver"))
    .option("connectionProvider","db2")
    .option("url", connProp.getProperty("url"))
    .option("dbtable", tableName) // <- to use a SQL query instead, pass it as dbtable: "(select * from xyz) dtq"
    .option("user", connProp.getProperty("user"))
    .option("password", connProp.getProperty("password"))
    .option("fetchsize", 1000)
    .option("numPartitions", 6)
    .option("partitionColumn", "duration") // <- use a numeric, date or timestamp column
    .option("lowerBound", 0)
    .option("upperBound", 5000)
    .load()

import spark.implicits._

bank_telemkt_df: DataFrame = [AGE: double, JOB: string ... 19 more fields]

In [76]:
println(
      "Loaded data from database table \'%s\' into a dataframe with %d partitions".format(
        tableName,
        bank_telemkt_df.rdd.getNumPartitions)
    );

Loaded data from database table 'DB2INST1.BANK_TELEMKT' into a dataframe with 6 partitions
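
How the `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` options interact can be sketched in plain Scala. The function below, `partitionPredicates`, is a hypothetical helper that approximates the per-partition WHERE clauses Spark generates; it is a simplification, and the exact boundaries can differ slightly between Spark versions. The bounds only determine the stride: rows outside the range are not filtered out, they end up in the first or last partition.

```scala
// Simplified approximation of how a partitioned JDBC read is split into predicates.
def partitionPredicates(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  require(numPartitions >= 2, "this sketch covers the partitioned case only")
  require(lower < upper, "lowerBound must be smaller than upperBound")
  val stride = upper / numPartitions - lower / numPartitions
  // Interior boundaries between consecutive partitions.
  val cuts = (1 until numPartitions).map(i => lower + i * stride)
  // The first partition also receives NULLs and values below lowerBound.
  val first = s"$column < ${cuts.head} OR $column IS NULL"
  // The last partition also receives values above upperBound.
  val last = s"$column >= ${cuts.last}"
  val middle = cuts.sliding(2).collect { case Seq(a, b) => s"$column >= $a AND $column < $b" }.toSeq
  first +: middle :+ last
}

// The settings used in the cell above: six partitions over duration in [0, 5000).
partitionPredicates("duration", 0L, 5000L, 6).foreach(println)
```

Because call durations are concentrated at the low end of the range, partitions built this way can be quite uneven in size; a more evenly distributed column would give better balance.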


In [77]:
bank_telemkt_df.show(2)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| AGE|      JOB|MARITAL|  EDUCATION|DEFAULTED|HOUSING|LOAN|CONTACT_NO|MONTH_NAME|DAY_OF_WEEK|DURATION|CAMPAIGN|PDAYS|PREVIOUS|   POUTCOME|EMP_VAR_RATE|CONS_PRICE_IDX|CONS_CONF_IDX|EURIBOR3M|NR_EMPLOYED|  Y|
+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|56.0|housemaid|married|   basic.4y|       no|     no|  no| telephone|       may|        mon|   261.0|     1.0|999.0|     0.0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|57.0| services|married|high.school|  unknown|     no|  no| telephone|       may|        mon|   149.0|     1.0|999.0|     0.0|nonexistent|         1.1|        93.994|      

In [79]:
// convert all column names to lowercase for consistency:

// first, create new column names which are lowercase of existing column names:
val newCols = bank_telemkt_df.columns.map(x => x.toLowerCase() )

// next, create a new dataframe with these new column names:
val inputDF = bank_telemkt_df.toDF(newCols: _*)

newCols: Array[String] = Array(
  "age",
  "job",
  "marital",
  "education",
  "defaulted",
  "housing",
  "loan",
  "contact_no",
  "month_name",
  "day_of_week",
  "duration",
  "campaign",
  "pdays",
  "previous",
  "poutcome",
  "emp_var_rate",
  "cons_price_idx",
  "cons_conf_idx",
  "euribor3m",
  "nr_employed",
  "y"
)
inputDF: DataFrame = [age: double, job: string ... 19 more fields]

---

## Explore the Data

Show top 4 rows of the DataFrame loaded from this source.

In [138]:
inputDF.show(4)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|56.0|housemaid|married|   basic.4y|       no|     no|  no| telephone|       may|        mon|   261.0|     1.0|999.0|     0.0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
|57.0| services|married|high.school|  unknown|     no|  no| telephone|       may|        mon|   149.0|     1.0|999.0|     0.0|nonexistent|         1.1|        93.994|      

### Run SQL queries on the DataFrame

First, register the DataFrame as a temporary view.

It is then available to query using SQL statements.

In [94]:
inputDF.count()

res93: Long = 41188L

In [95]:
inputDF.createOrReplaceTempView("input")

Now that the DataFrame 'inputDF' is registered as the view 'input', it can be queried with a simple SQL statement:

In [96]:
spark.sql("SELECT job, count(1) from input group by job"
         ).show();

+-------------+--------+
|          job|count(1)|
+-------------+--------+
|   management|    2924|
|      retired|    1720|
|      unknown|     330|
|self-employed|    1421|
|      student|     875|
|  blue-collar|    9254|
| entrepreneur|    1456|
|       admin.|   10422|
|   technician|    6743|
|     services|    3969|
|    housemaid|    1060|
|   unemployed|    1014|
+-------------+--------+



Let's write a slightly more complicated SQL query.

Notice that the usual SQL expressions are available in Spark SQL.

In [97]:
val queryResultDF = spark.sql("SELECT job, count(1) as Total_Count, " + 
          "100.0*sum(case when y='yes' then 1 else 0 end)/count(1) as Outcome_Yes" +
          " from input group by job"
         )

queryResultDF: DataFrame = [job: string, Total_Count: bigint ... 1 more field]

In [98]:
queryResultDF.collect().foreach(
    curr_row => println(
        "Total count for %s = %d, of which %2.1f %% converted".format(curr_row(0), curr_row(1), curr_row(2) )
    )
)

Total count for management = 2924, of which 11.2 % converted
Total count for retired = 1720, of which 25.2 % converted
Total count for unknown = 330, of which 11.2 % converted
Total count for self-employed = 1421, of which 10.5 % converted
Total count for student = 875, of which 31.4 % converted
Total count for blue-collar = 9254, of which 6.9 % converted
Total count for entrepreneur = 1456, of which 8.5 % converted
Total count for admin. = 10422, of which 13.0 % converted
Total count for technician = 6743, of which 10.8 % converted
Total count for services = 3969, of which 8.1 % converted
Total count for housemaid = 1060, of which 10.0 % converted
Total count for unemployed = 1014, of which 14.2 % converted
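
What the `case when ... end` expression inside `sum` computes can be sketched on a few in-memory rows. This is plain Scala, not Spark; the `Contact` case class, the hand-made sample rows and `conversionByJob` are illustrative assumptions.

```scala
// Hypothetical rows; only the two columns used by the query are modelled.
final case class Contact(job: String, y: String)

val sample = Seq(
  Contact("student", "yes"), Contact("student", "no"),
  Contact("retired", "yes"), Contact("retired", "no"),
  Contact("retired", "no"), Contact("retired", "no")
)

// Per job: (total count, percentage of rows with y = "yes").
def conversionByJob(rows: Seq[Contact]): Map[String, (Int, Double)] =
  rows.groupBy(_.job).map { case (job, group) =>
    // sum(case when y='yes' then 1 else 0 end) is a count of the matching rows
    val yes = group.count(_.y == "yes")
    (job, (group.size, 100.0 * yes / group.size))
  }
```

Multiplying by `100.0` rather than `100` keeps the division in floating point, which is the same reason the SQL query uses `100.0`.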


In [100]:
inputDF.where(inputDF("y") === "yes").describe("duration").show()

+-------+------------------+
|summary|          duration|
+-------+------------------+
|  count|              4640|
|   mean| 553.1911637931034|
| stddev|401.17187076598645|
|    min|              37.0|
|    max|            4199.0|
+-------+------------------+



In [101]:
inputDF.where(inputDF("y") === "no").describe("duration").show()

+-------+------------------+
|summary|          duration|
+-------+------------------+
|  count|             36548|
|   mean|220.84480682937507|
| stddev|207.09629330889348|
|    min|               0.0|
|    max|            4918.0|
+-------+------------------+



In [102]:
import org.apache.spark.ml.stat.KolmogorovSmirnovTest
import org.apache.spark.ml.stat.ANOVATest
import org.apache.spark.ml.stat.ChiSquareTest

import org.apache.spark.ml.stat.KolmogorovSmirnovTest
import org.apache.spark.ml.stat.ANOVATest
import org.apache.spark.ml.stat.ChiSquareTest

### K-S Test

The two-sample K-S test measures the largest difference between the empirical CDFs (cumulative distribution functions) of two samples.
However, Spark MLlib does not provide a two-sample K-S test function.

As a workaround, the one-sample K-S test is applied here to each sample in turn, comparing it against the CDF of a normal distribution
with a given mean and standard deviation. The cells below use the mean and standard deviation of duration for the y = "no" class (220.844 and 207.096) as the reference for both samples.
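
For samples small enough to fit in driver memory, the two-sample statistic itself is simple to compute. The sketch below is plain Scala, not a Spark API; `ksTwoSample` is a hypothetical helper that returns only the statistic (the largest gap between the two empirical CDFs) and no p-value.

```scala
// Two-sample K-S statistic: the largest absolute difference between two empirical CDFs.
def ksTwoSample(a: Seq[Double], b: Seq[Double]): Double = {
  require(a.nonEmpty && b.nonEmpty, "both samples must be non-empty")
  // Empirical CDF: the fraction of the sample that is <= x.
  def ecdf(sample: Seq[Double], x: Double): Double = sample.count(_ <= x).toDouble / sample.size
  // The gap can only change at observed values, so checking those points is enough.
  (a ++ b).distinct.map(x => math.abs(ecdf(a, x) - ecdf(b, x))).max
}
```

This version is quadratic in the sample size and is meant only to illustrate the definition; a statistic of 0.0 means identical empirical distributions and 1.0 means the samples do not overlap at all.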


In [103]:
val ksTestResult = KolmogorovSmirnovTest.test(inputDF.where(inputDF("y") === "yes"), "duration", "norm", 220.844, 207.096).take(1)(0)
val ksPvalue = ksTestResult.getDouble(0)
val ksStatistic = ksTestResult.getDouble(1)

ksTestResult: Row = [6.496780891040999E-11,0.36740306637984005]
ksPvalue: Double = 6.496780891040999E-11
ksStatistic: Double = 0.36740306637984005

In [104]:
val ksTestResult = KolmogorovSmirnovTest.test(inputDF.where(inputDF("y") === "no"), "duration", "norm", 220.844, 207.096).take(1)(0)
val ksPvalue = ksTestResult.getDouble(0)
val ksStatistic = ksTestResult.getDouble(1)

ksTestResult: Row = [8.158118625090083E-11,0.15181821622812208]
ksPvalue: Double = 8.158118625090083E-11
ksStatistic: Double = 0.15181821622812208

### Convenience function: compare a continuous variable across the target variable's two classes

In [106]:
import org.apache.spark.sql.Dataset

/**
 * Convenience function to compare statistics of a numerical column across the target values.
 * It takes two samples of a continuous variable, one for each of the target variable's two classes,
 * and appends a one-sample K-S statistic for each sample, computed against a normal distribution
 * with the mean and standard deviation of the label1 sample.
 */
def numericalBivariateTable(inputData: Dataset[_],
                            labelColName: String,
                            colToAnalyse: String,
                            label1: String,
                            label2: String): Dataset[_] = {

    import org.apache.spark.ml.stat.KolmogorovSmirnovTest

    val sample1 = inputData.where(inputData(labelColName) === label1)
    val sample2 = inputData.where(inputData(labelColName) === label2)

    val statsDF1 = sample1.describe(colToAnalyse)
        .withColumnRenamed("summary", "statistics_1")
        .withColumnRenamed(colToAnalyse, colToAnalyse + "_" + labelColName + "_" + label1)

    val statsDF2 = sample2.describe(colToAnalyse)
        .withColumnRenamed("summary", "statistics_2")
        .withColumnRenamed(colToAnalyse, colToAnalyse + "_" + labelColName + "_" + label2)

    val joinedDF = statsDF1.join(statsDF2, statsDF1("statistics_1") === statsDF2("statistics_2"), "inner")
        .drop("statistics_2")
        .withColumnRenamed("statistics_1", "Statistic")

    // Look up the mean and stddev of the label1 sample by name rather than by row position,
    // because a join does not guarantee the order of its output rows.
    val stats1 = joinedDF.collect().map(r => r.getString(0) -> r.getString(1)).toMap
    val meanVal = stats1("mean").toDouble
    val sdVal = stats1("stddev").toDouble

    // The test result row is laid out as (pValue, statistic); only the statistic is reported.
    val ksStatistic1 = KolmogorovSmirnovTest.test(sample1, colToAnalyse, "norm", meanVal, sdVal).head.getDouble(1)
    val ksStatistic2 = KolmogorovSmirnovTest.test(sample2, colToAnalyse, "norm", meanVal, sdVal).head.getDouble(1)

    // Seq(...).toDF relies on spark.implicits._, imported in an earlier cell.
    joinedDF.union(Seq(("K-S Statistic", ksStatistic1, ksStatistic2)).toDF)
}

import org.apache.spark.sql.Dataset

defined function numericalBivariateTable

In [107]:
val duration_stats = numericalBivariateTable(inputDF, "y", "duration", "no", "yes")

duration_stats.show(10)

+-------------+-------------------+------------------+
|    Statistic|      duration_y_no|    duration_y_yes|
+-------------+-------------------+------------------+
|        count|              36548|              4640|
|         mean| 220.84480682937507| 553.1911637931034|
|       stddev| 207.09629330889348|401.17187076598645|
|          min|                0.0|              37.0|
|          max|             4918.0|            4199.0|
|K-S Statistic|0.15181978657453288|0.3674019979283104|
+-------------+-------------------+------------------+



duration_stats: Dataset[_] = [Statistic: string, duration_y_no: string ... 1 more field]

In [108]:
val age_stats = numericalBivariateTable(inputDF, "y", "age", "no", "yes")

age_stats.show(10)

+-------------+-------------------+------------------+
|    Statistic|           age_y_no|         age_y_yes|
+-------------+-------------------+------------------+
|        count|              36548|              4640|
|         mean| 39.911185290576775| 40.91314655172414|
|       stddev|   9.89813179527967|13.837476239030554|
|          min|               17.0|              17.0|
|          max|               95.0|              98.0|
|K-S Statistic|0.09278012965860277|0.1403293216252272|
+-------------+-------------------+------------------+



age_stats: Dataset[_] = [Statistic: string, age_y_no: string ... 1 more field]

In [109]:
val pdays_stats = numericalBivariateTable(inputDF, "y", "pdays", "no", "yes")

pdays_stats.show(10)

+-------------+------------------+------------------+
|    Statistic|        pdays_y_no|       pdays_y_yes|
+-------------+------------------+------------------+
|        count|             36548|              4640|
|         mean| 984.1138776403634| 792.0355603448276|
|       stddev|120.65686774517023| 403.4071808397044|
|          min|               0.0|               0.0|
|          max|             999.0|             999.0|
|K-S Statistic| 0.534101209799114|0.4509048096821161|
+-------------+------------------+------------------+



pdays_stats: Dataset[_] = [Statistic: string, pdays_y_no: string ... 1 more field]

In [110]:
val euribor3m_stats = numericalBivariateTable(inputDF, "y", "euribor3m", "no", "yes")

euribor3m_stats.show(10)

+-------------+-------------------+------------------+
|    Statistic|     euribor3m_y_no|   euribor3m_y_yes|
+-------------+-------------------+------------------+
|        count|              36548|              4640|
|         mean|  3.811491162306951| 2.123135129310318|
|       stddev| 1.6381874709419575|1.7425979218386416|
|          min|              0.634|             0.634|
|          max|              5.045|             5.045|
|K-S Statistic|0.36483589053153886|0.6084414888935559|
+-------------+-------------------+------------------+



euribor3m_stats: Dataset[_] = [Statistic: string, euribor3m_y_no: string ... 1 more field]

In [111]:
val cons_price_idx_stats = numericalBivariateTable(inputDF, "y", "cons_price_idx", "no", "yes")

cons_price_idx_stats.show(10)

+-------------+-------------------+--------------------+
|    Statistic|cons_price_idx_y_no|cons_price_idx_y_yes|
+-------------+-------------------+--------------------+
|        count|              36548|                4640|
|         mean|  93.60375705925262|   93.35438599138075|
|       stddev| 0.5589929413581802|  0.6766438009043876|
|          min|             92.201|              92.201|
|          max|             94.767|              94.767|
|K-S Statistic|0.22440582351312138| 0.30829070578904105|
+-------------+-------------------+--------------------+



cons_price_idx_stats: Dataset[_] = [Statistic: string, cons_price_idx_y_no: string ... 1 more field]

---
## Transform the data

  - Convert text columns into indexed data representing categorical variables
  - Apply one-hot encoding to categorical variables
  - Scale numerical variables using min-max values
  - 'Assemble' feature columns together for ML algorithms to use for training
  - Others: A wide variety of transformations are available out-of-the-box
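
The min-max scaling step in the list above rescales each value linearly so that the column minimum maps to 0 and the maximum to 1. A plain Scala sketch of the formula follows; `minMaxScale` is a hypothetical helper, and mapping a constant column to the midpoint of the target range is an assumption modelled on the behaviour described in Spark's MinMaxScaler documentation.

```scala
// Rescale values linearly into [lo, hi]: (v - min) / (max - min) * (hi - lo) + lo
def minMaxScale(values: Seq[Double], lo: Double = 0.0, hi: Double = 1.0): Seq[Double] = {
  require(values.nonEmpty, "cannot scale an empty column")
  val (minV, maxV) = (values.min, values.max)
  if (maxV == minV) values.map(_ => 0.5 * (lo + hi)) // constant column: no spread to rescale
  else values.map(v => (v - minV) / (maxV - minV) * (hi - lo) + lo)
}
```

Scaling matters here because the numerical columns live on very different ranges (for example, age in tens versus nr_employed in thousands).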

In [112]:
import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.linalg

import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.linalg

In [113]:
// First of all, index the binary label column:
val labelIndexer = new StringIndexer()
      .setInputCol(originalLabelColname)
      .setOutputCol("label")

labelIndexer: StringIndexer = strIdx_648d3b0473d8

After declaring the indexer, we "fit" it on the data.

In [114]:
val fittedIndexer = labelIndexer.fit(inputDF)

fittedIndexer: ml.feature.StringIndexerModel = StringIndexerModel: uid=strIdx_648d3b0473d8, handleInvalid=error

Now, this "fitted" transformer is ready to be used. We run the transform on our dataset to get the required result.

In this case, the transform converts the target variable from text ("yes"/"no") into a numeric label index.

In [115]:
val transformedDF = fittedIndexer.transform(inputDF)

transformedDF: DataFrame = [age: double, job: string ... 20 more fields]

In [116]:
transformedDF.show(4)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|
+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+
|56.0|housemaid|married|   basic.4y|       no|     no|  no| telephone|       may|        mon|   261.0|     1.0|999.0|     0.0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|  0.0|
|57.0| services|married|high.school|  unknown|     no|  no| telephone|       may|        mon|   149.0|     1.0|999.0|     0.0|nonexistent|         1
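
The index assignment seen in the label column can be sketched in plain Scala. By default, StringIndexer orders values by descending frequency, so the most frequent value ("no" in this dataset) receives index 0.0. `fitIndex` below is a hypothetical stand-in for the fitting step; breaking ties alphabetically is an assumption of this sketch.

```scala
// "Fit": learn a value -> index mapping, most frequent value first.
def fitIndex(values: Seq[String]): Map[String, Double] =
  values.groupBy(identity).toSeq
    .sortBy { case (value, occurrences) => (-occurrences.size, value) }
    .zipWithIndex
    .map { case ((value, _), index) => value -> index.toDouble }
    .toMap

// "Transform": apply the learned mapping to a column of values.
val labelIndex = fitIndex(Seq("no", "no", "yes", "no"))
val labels = Seq("yes", "no").map(labelIndex)
```

The split into a fitting step that learns the mapping and a transform step that applies it is the same pattern the fitted indexer follows above.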

----

### Create data transformation Pipeline

Instead of applying transformations one by one, let us collect them into a pipeline programmatically and apply them all at once.

In the code below, we'll use an ArrayBuffer to dynamically collect all transformations.
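
The idea of fitting a chain of stages can be sketched without Spark. Everything in the block below (`Rows`, `Stage`, `Center`, `Scale`, `fitPipeline`) is a hypothetical toy model rather than the Spark ML API; it only illustrates that each stage is fitted on the output of the stages before it, and that the fitted stages are then applied in the same order.

```scala
type Rows = Seq[Map[String, Double]]

// A stage learns what it needs from the data ("fit") and returns a transform function.
trait Stage { def fit(data: Rows): Rows => Rows }

// Toy stage: subtract the column mean learned during fit.
final case class Center(col: String) extends Stage {
  def fit(data: Rows): Rows => Rows = {
    val mean = data.map(_(col)).sum / data.size
    rows => rows.map(r => r.updated(col, r(col) - mean))
  }
}

// Toy stage with nothing to learn: multiply a column by a constant.
final case class Scale(col: String, factor: Double) extends Stage {
  def fit(data: Rows): Rows => Rows = rows => rows.map(r => r.updated(col, r(col) * factor))
}

// Fit the stages in order; each stage sees the data as transformed by the earlier stages.
def fitPipeline(stages: Seq[Stage], data: Rows): Rows => Rows =
  stages.foldLeft(((rows: Rows) => rows, data)) { case ((fitted, current), stage) =>
    val next = stage.fit(current)
    (fitted.andThen(next), next(current))
  }._1

val training: Rows = Seq(Map("x" -> 1.0), Map("x" -> 3.0))
val model = fitPipeline(Seq(Center("x"), Scale("x", 2.0)), training)
```

Applying `model` to new rows reuses the mean learned from the training rows, which is why a fitted pipeline can later be applied unchanged to test data.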

In [117]:
// this buffer "xforms" accumulates all our transformations until we're ready to put them in a pipeline
var xforms = scala.collection.mutable.ArrayBuffer.empty[PipelineStage];

xforms: collection.mutable.ArrayBuffer[PipelineStage] = ArrayBuffer()

### Step 1: Index the binary label column.

In [118]:
val labelIndexer = new StringIndexer()
  .setInputCol(originalLabelColname)
  .setOutputCol("label")

// add this to the array buffer:
xforms += labelIndexer;

labelIndexer: StringIndexer = strIdx_b92509fa5ad4
res117_1: collection.mutable.ArrayBuffer[PipelineStage] = ArrayBuffer(
  strIdx_b92509fa5ad4
)

In [119]:
xforms.length

res118: Int = 1

### Step 2: Next, add a string indexer for each categorical column.

Notice how the column name is used to set the input column name and to create the output column name, prefixed with "idx_".

In [120]:
categorical_features.foreach(
    x => 
    xforms += new StringIndexer().setInputCol(x).setOutputCol("idx_" + x)
    )

println(s"Indexing categorical variables. Count of transformations at this point is now = ${xforms.length}")

Indexing categorical variables. Count of transformations at this point is now = 9

Note: the counts printed in this and the following steps (9, 17, 18, 19, 20) come from a run in which categorical_features held eight columns, as the eight indexers and eight encoders in the listings below show. With the nine columns declared earlier in this notebook, each count is correspondingly higher.


### Step 3: Next, apply one-hot encoding to all categorical variables:

In [121]:
categorical_features.foreach(x => xforms += new OneHotEncoder().setInputCol("idx_" + x).setOutputCol("vec_idx_" + x))

println(s"One-hot encoding all categorical variables. Count of transformations at this point is now = ${xforms.length}")

One-hot encoding all categorical variables. Count of transformations at this point is now = 17
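
One-hot encoding turns a category index into a vector that is all zeros except for a single 1.0. The plain Scala sketch below uses a hypothetical `oneHot` helper; its `dropLast = true` default mirrors Spark's OneHotEncoder, where the last category is represented by an all-zero vector. Spark stores the result as a sparse vector, while this sketch uses a dense one for readability.

```scala
// Encode a category index as a one-hot vector.
// With dropLast = true the vector has numCategories - 1 slots and the last category is all zeros.
def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
  require(index >= 0 && index < numCategories, "index out of range")
  val size = if (dropLast) numCategories - 1 else numCategories
  Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
}
```

Dropping the last category avoids a redundant column, since its value is implied when all other slots are zero.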


At this point, let us gather all column names; these will be used in the vector assembler later:

In [122]:
var allColNames = scala.collection.mutable.ArrayBuffer.empty[String]

categorical_features.foreach(x => allColNames += "vec_idx_%s".format(x))

In [123]:
// gather all numerical variables to assemble into a vector for applying scaling:
var numericalColNames = scala.collection.mutable.ArrayBuffer.empty[String]

numerical_features.foreach(y => numericalColNames += y)

### Important Concept

Note: The vector assembler combines a given list of columns (here, all numerical variables) into a single vector column. This vector is then used as input for other transformations.

Almost all ML algorithms operate on a vector column of independent variables (features). Hence, the vector assembler is a necessity in most situations.
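
Conceptually, assembling is just concatenation. The sketch below is plain Scala with a hypothetical `assemble` helper and made-up values for a single row; it is not the VectorAssembler API.

```scala
// Each input column contributes either a single number or an already-built vector;
// a single number is represented here as a one-element sequence.
def assemble(columns: Seq[Seq[Double]]): Vector[Double] = columns.flatten.toVector

// One row: two scaled numerical values, then two one-hot encoded categorical columns.
val scaledFeatures = Seq(0.48, 0.05)
val vecJob = Seq(0.0, 1.0, 0.0)
val vecMarital = Seq(1.0, 0.0)

val features = assemble(Seq(scaledFeatures, vecJob, vecMarital))
```

The order of the input columns fixes the position of each value in the resulting vector, so it must stay the same between training and prediction.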

### Step 4: Assemble all numerical features.

In [124]:
val assembler1 = new VectorAssembler()
  .setInputCols(numerical_features.toArray)
  .setOutputCol("numericalfeatures")

xforms += assembler1;

println(s"Assembled together all numerical variables.\n Count of transformations at this point is now = ${xforms.length}")

Assembled together all numerical variables.
 Count of transformations at this point is now = 18


assembler1: VectorAssembler = VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8
res123_1: collection.mutable.ArrayBuffer[PipelineStage] = ArrayBuffer(
  strIdx_b92509fa5ad4,
  strIdx_81f85490b36c,
  strIdx_ce69f1f871b5,
  strIdx_c015f894853f,
  strIdx_32f039b5508e,
  strIdx_aeb9100a7da0,
  strIdx_df5472840729,
  strIdx_c594c9129ba0,
  strIdx_2994d75552c7,
  oneHotEncoder_a2281c834864,
  oneHotEncoder_2e9a416be7af,
  oneHotEncoder_dd81b0e3ca9f,
  oneHotEncoder_2edcf25b0c71,
  oneHotEncoder_0c3912d2c3ba,
  oneHotEncoder_051edbcf4b3a,
  oneHotEncoder_456c6d53b3ae,
  oneHotEncoder_49ca0d8f1c3d,
  VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8
)

### Step 5: Apply a min-max scaler for the numerical features:

In [125]:
xforms += new MinMaxScaler().setInputCol("numericalfeatures").setOutputCol("scaledfeatures");
allColNames += "scaledfeatures"

println(s"Scaled all numerical variables by min-max scaler.\n Count of transformations at this point is now = ${xforms.length}")

Scaled all numerical variables by min-max scaler.
 Count of transformations at this point is now = 19


[36mres124_0[39m: [32mcollection[39m.[32mmutable[39m.[32mArrayBuffer[39m[[32mPipelineStage[39m] = [33mArrayBuffer[39m(
  strIdx_b92509fa5ad4,
  strIdx_81f85490b36c,
  strIdx_ce69f1f871b5,
  strIdx_c015f894853f,
  strIdx_32f039b5508e,
  strIdx_aeb9100a7da0,
  strIdx_df5472840729,
  strIdx_c594c9129ba0,
  strIdx_2994d75552c7,
  oneHotEncoder_a2281c834864,
  oneHotEncoder_2e9a416be7af,
  oneHotEncoder_dd81b0e3ca9f,
  oneHotEncoder_2edcf25b0c71,
  oneHotEncoder_0c3912d2c3ba,
  oneHotEncoder_051edbcf4b3a,
  oneHotEncoder_456c6d53b3ae,
  oneHotEncoder_49ca0d8f1c3d,
  VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8,
  minMaxScal_d34452318ea3
)
res124_1: collection.mutable.ArrayBuffer[String] = ArrayBuffer(
  "vec_idx_job",
  "vec_idx_marital",
  "vec_idx_education",
  "vec_idx_defaulted",
  "vec_idx_housing",
  "vec_idx_loan",
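
To make the scaling step concrete, here is a minimal plain-Scala sketch of min-max rescaling to the range [0, 1], which is the default target range of Spark's `MinMaxScaler`. The object `MinMaxSketch` and its `minMaxScale` helper are hypothetical names used only for illustration (they are not part of Spark), and the sample values are made up.

```scala
// Illustrative only: MinMaxSketch is a hypothetical helper, not a Spark API.
object MinMaxSketch {
  // Rescale every column to [0, 1] using x' = (x - min) / (max - min).
  // A constant column (max == min) is mapped to 0.5, the midpoint of the range.
  def minMaxScale(rows: Seq[Array[Double]]): Seq[Array[Double]] = {
    require(rows.nonEmpty, "need at least one row")
    val numCols = rows.head.length
    val mins = Array.tabulate(numCols)(j => rows.map(_(j)).min)
    val maxs = Array.tabulate(numCols)(j => rows.map(_(j)).max)
    rows.map { row =>
      Array.tabulate(numCols) { j =>
        val range = maxs(j) - mins(j)
        if (range == 0.0) 0.5 else (row(j) - mins(j)) / range
      }
    }
  }
}

// Made-up sample: column 0 varies, column 1 is constant.
val sample = Seq(Array(20.0, 5.0), Array(40.0, 5.0), Array(60.0, 5.0))
MinMaxSketch.minMaxScale(sample).foreach(r => println(r.mkString(", ")))
```

The minimum and maximum are learned from the data during fitting, which is why the scaler is added to the pipeline as a stage that must be fitted before it can transform anything.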


### Step 6: Finally, collect all columns into the "features" column. This vector column holds all the independent variables (predictors) to be used for model training.

In [126]:
val assembler2 = new VectorAssembler()
  .setInputCols(allColNames.toArray)
  .setOutputCol("features")

xforms += assembler2;

println(s"Collect all scaled numerical variables and the categorical variables together.")
println(s"Count of transformations at this point is now = ${xforms.length}")

Collect all scaled numerical variables and the categorical variables together.
Count of transformations at this point is now = 20


assembler2: VectorAssembler = VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9
res125_1: collection.mutable.ArrayBuffer[PipelineStage] = ArrayBuffer(
  strIdx_b92509fa5ad4,
  strIdx_81f85490b36c,
  strIdx_ce69f1f871b5,
  strIdx_c015f894853f,
  strIdx_32f039b5508e,
  strIdx_aeb9100a7da0,
  strIdx_df5472840729,
  strIdx_c594c9129ba0,
  strIdx_2994d75552c7,
  oneHotEncoder_a2281c834864,
  oneHotEncoder_2e9a416be7af,
  oneHotEncoder_dd81b0e3ca9f,
  oneHotEncoder_2edcf25b0c71,
  oneHotEncoder_0c3912d2c3ba,
  oneHotEncoder_051edbcf4b3a,
  oneHotEncoder_456c6d53b3ae,
  oneHotEncoder_49ca0d8f1c3d,
  VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8,
  minMaxScal_d34452318ea3,
  VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9
)

In [127]:
// print out all the transformations (foreach avoids building a useless Array[Unit] result)
xforms.foreach(println)

strIdx_b92509fa5ad4
strIdx_81f85490b36c
strIdx_ce69f1f871b5
strIdx_c015f894853f
strIdx_32f039b5508e
strIdx_aeb9100a7da0
strIdx_df5472840729
strIdx_c594c9129ba0
strIdx_2994d75552c7
oneHotEncoder_a2281c834864
oneHotEncoder_2e9a416be7af
oneHotEncoder_dd81b0e3ca9f
oneHotEncoder_2edcf25b0c71
oneHotEncoder_0c3912d2c3ba
oneHotEncoder_051edbcf4b3a
oneHotEncoder_456c6d53b3ae
oneHotEncoder_49ca0d8f1c3d
VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8
minMaxScal_d34452318ea3
VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9

### Step 7: Next, create a model pipeline with all these transformations

In [128]:
// print one transformation per line so the stage names stay readable
println("Assembling pipeline with the following transformations:\n" + xforms.mkString("\n"))

val xformPipeline = new Pipeline()
  .setStages(xforms.toArray)

Assembling pipeline with the following transformations:
strIdx_b92509fa5ad4
strIdx_81f85490b36c
strIdx_ce69f1f871b5
strIdx_c015f894853f
strIdx_32f039b5508e
strIdx_aeb9100a7da0
strIdx_df5472840729
strIdx_c594c9129ba0
strIdx_2994d75552c7
oneHotEncoder_a2281c834864
oneHotEncoder_2e9a416be7af
oneHotEncoder_dd81b0e3ca9f
oneHotEncoder_2edcf25b0c71
oneHotEncoder_0c3912d2c3ba
oneHotEncoder_051edbcf4b3a
oneHotEncoder_456c6d53b3ae
oneHotEncoder_49ca0d8f1c3d
VectorAssembler: uid=vecAssembler_2e7df15b8426, handleInvalid=error, numInputCols=8
minMaxScal_d34452318ea3
VectorAssembler: uid=vecAssembler_352e32d6ca96, handleInvalid=error, numInputCols=9


xformPipeline: Pipeline = pipeline_ba250921493c

### Step 8: Fit the pipeline to create the transformer object. 

In [129]:
val dataTransformPipelineFitted = xformPipeline.fit(inputDF);

dataTransformPipelineFitted: PipelineModel = pipeline_ba250921493c
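
As a rough mental model of what fitting a pipeline does, the plain-Scala sketch below fits each stage on the data as transformed by the stages before it, and then returns a single transformer that replays the fitted stages in order. Everything in `PipelineSketch` (the `Estimator` and `Transformer` traits, `Center`, `MaxAbsScale`, `fitPipeline`) is a hypothetical toy written for this illustration; Spark's real classes are richer, and in Spark a stage may also be a plain transformer that needs no fitting.

```scala
// Illustrative only: a toy model of estimator/transformer chaining, not Spark's classes.
object PipelineSketch {
  type Record = Map[String, Double]

  trait Transformer { def transform(data: Seq[Record]): Seq[Record] }
  trait Estimator { def fit(data: Seq[Record]): Transformer }

  // Subtracts the column mean learned during fit.
  final case class Center(in: String, out: String) extends Estimator {
    def fit(data: Seq[Record]): Transformer = {
      val mean = data.map(_(in)).sum / data.size
      new Transformer {
        def transform(d: Seq[Record]): Seq[Record] =
          d.map(r => r + (out -> (r(in) - mean)))
      }
    }
  }

  // Divides by the largest absolute value learned during fit.
  final case class MaxAbsScale(in: String, out: String) extends Estimator {
    def fit(data: Seq[Record]): Transformer = {
      val maxAbs = data.map(r => math.abs(r(in))).max
      new Transformer {
        def transform(d: Seq[Record]): Seq[Record] =
          d.map(r => r + (out -> (r(in) / maxAbs)))
      }
    }
  }

  // Fit each stage on the data as transformed by all earlier stages,
  // then return one transformer that applies the fitted stages in order.
  def fitPipeline(stages: Seq[Estimator], data: Seq[Record]): Transformer = {
    val (fitted, _) = stages.foldLeft((Vector.empty[Transformer], data)) {
      case ((models, current), stage) =>
        val model = stage.fit(current)
        (models :+ model, model.transform(current))
    }
    new Transformer {
      def transform(d: Seq[Record]): Seq[Record] =
        fitted.foldLeft(d)((acc, t) => t.transform(acc))
    }
  }
}

import PipelineSketch._
// Made-up training data; the second stage depends on the column produced by the first.
val toyTrain = Seq(Map("x" -> 2.0), Map("x" -> 4.0), Map("x" -> 6.0))
val toyModel = fitPipeline(Seq(Center("x", "xc"), MaxAbsScale("xc", "xs")), toyTrain)
println(toyModel.transform(Seq(Map("x" -> 8.0))))
```

The same ordering constraint applies to the real pipeline above: the one-hot encoders can only be fitted after the string indexers have produced their `idx_` columns.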

In [130]:
println("Now saving the transformation pipeline to disk at: /tmp/dataTransformPipeline")
dataTransformPipelineFitted.write.overwrite().save("/tmp/dataTransformPipeline")

Now saving the transformation pipeline to disk at: /tmp/dataTransformPipeline


23/01/30 12:13:14 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
23/01/30 12:13:14 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:14 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 12:13:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:15 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 12:13:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:15 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 12:13:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 12:13:15 INFO FileOutputCommitter: Fil

### Step 9: Use the fitted transformer to apply the transformations on the dataset

In [55]:
// Run the transformation pipeline on the dataset to prepare the data for model building
val preparedDF: org.apache.spark.sql.DataFrame = dataTransformPipelineFitted.transform(inputDF);

println("Completed transforming data using the pipeline.")

23/01/29 21:36:20 INFO SparkMLDemo: Completed transforming data using the pipeline.


preparedDF: DataFrame = [age: double, job: string ... 39 more fields]

In [56]:
preparedDF.show(4)

23/01/29 21:36:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+-------+-----------+-------------+-------------+-----------+--------+---------------+------------+--------------+---------------+-----------------+-----------------+---------------+-------------+-------------------+----------------+--------------------+--------------------+--------------------+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_job|idx_marital|idx_education|idx_defaulted|idx_housing|idx_loan|idx_day_of_week|idx_poutcome|   vec_idx_job|vec_idx_marital|vec_idx_education|vec_idx_defaulted|vec_idx_housing| vec_idx_loan|vec_idx_day_of_week|vec_idx_poutcome|   numericalfeatures|      scaledfeatures|      

### Define a convenience function to create the feature transformation pipeline programmatically.

In [139]:
import org.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}

/**
* Transforms the raw dataset columns to a form usable for training the models -
* e.g. string to categorical variables, one-hot encoding, scaling of continuous variables, etc.
*
* @param inputDF Input raw dataset
* @param labelColname Name of the label column
* @param categoricalFeatures List of column names which are categorical features
* @param numericalFeatures List of column names which are numerical features
* @return The fitted transformation pipeline
*/
def firTransformDataPipeline(inputDF: Dataset[ModelDataRecord],
                labelColname: String,
                categoricalFeatures: Array[String],
                numericalFeatures: Array[String]): PipelineModel = {

    // the buffer "xforms" accumulates all our transformations until we are ready to put them in a pipeline
    val xforms = scala.collection.mutable.ArrayBuffer.empty[PipelineStage]

    // first of all, index the binary label column:
    val labelIndexer = new StringIndexer()
      .setInputCol(labelColname)
      .setOutputCol("label")
    xforms += labelIndexer;

    // add a column indexer for each categorical column:
    categoricalFeatures.foreach(x => xforms += new StringIndexer().setInputCol(x).setOutputCol("idx_" + x))
    logger.info("Indexing categorical variables.")

    categoricalFeatures.foreach(x => xforms += new OneHotEncoder().setInputCol("idx_" + x).setOutputCol("vec_idx_" + x))
    logger.info("On-hot encoding all categorical variables.")

    // gather all column names, these will be used in the vector assembler later:
    val allColNames = scala.collection.mutable.ArrayBuffer.empty[String]
    categoricalFeatures.foreach(x => allColNames += "vec_idx_" + x)

    // assemble all numerical variables into a single vector for scaling
    val assembler1 = new VectorAssembler()
      .setInputCols(numericalFeatures)
      .setOutputCol("numericalfeatures")
    xforms += assembler1
    logger.info("Assembled together all numerical variables.")

    // apply a min-max scaler for the numerical features:
    xforms += new MinMaxScaler().setInputCol("numericalfeatures").setOutputCol("scaledfeatures");
    allColNames += "scaledfeatures"
    logger.info("Scaled all numerical variables by min-max scaler.")

    // finally, collect all columns into the "features" column, this is a vector object
    val assembler2 = new VectorAssembler()
      .setInputCols(allColNames.toArray)
      .setOutputCol("features")
    xforms += assembler2;

    logger.info("Assembling pipeline with the following transformations: \n" + xforms.mkString(" \n"))
    val xformPipeline = new Pipeline()
      .setStages(xforms.toArray);

    val xformFitted = xformPipeline.fit(inputDF)
    logger.info("Completed fitting the pipeline")

    // the last expression is the return value
    xformFitted
}

import org.apache.spark.ml.{Model, Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}
defined function firTransformDataPipeline

In [144]:
categorical_features

res143: Array[String] = Array(
  "job",
  "marital",
  "education",
  "defaulted",
  "housing",
  "loan",
  "day_of_week",
  "poutcome"
)

In [146]:
val xformPipeline = firTransformDataPipeline(inputDF, originalLabelColname, categorical_features, numerical_features)

23/01/30 13:48:52 INFO SparkMLDemo: Indexing categorical variables.
23/01/30 13:48:52 INFO SparkMLDemo: On-hot encoding all categorical variables.
23/01/30 13:48:52 INFO SparkMLDemo: Assembled together all numerical variables.
23/01/30 13:48:52 INFO SparkMLDemo: Scaled all numerical variables by min-max scaler.
23/01/30 13:48:52 INFO SparkMLDemo: Assembling pipeline with the following transformations: 
strIdx_387de59ef340 
strIdx_da034cc60575 
strIdx_cd96b3a27cc3 
strIdx_cdb9382316bb 
strIdx_533d8e9f698e 
strIdx_710fcc1c2f06 
strIdx_583318018d1f 
strIdx_bd235a3f2b37 
strIdx_17810b26410a 
strIdx_f9e59d5bb2d9 
oneHotEncoder_23267482a5e1 
oneHotEncoder_9a097096c385 
oneHotEncoder_1743522806e8 
oneHotEncoder_606c90442245 
oneHotEncoder_86b7ed7edbcc 
oneHotEncoder_7cff57b6ffd5 
oneHotEncoder_7cbb64f05541 
oneHotEncoder_246deca3c0cb 
oneHotEncoder_631138749c83 
VectorAssembler: uid=vecAssembler_5dfd2d2c6bb9, handleInvalid=error, numInputCols=8 
minMaxScal_e1a93ba7a235 
VectorAssembler: uid=v

xformPipeline: PipelineModel = pipeline_09def6d18db0

### Investigate the fitted pipeline and its stages:

In [193]:
// the fitted model keeps a reference to the Pipeline estimator that produced it
val pipelineStages = xformPipeline.parent.asInstanceOf[Pipeline].getStages

pipelineStages: Array[PipelineStage] = Array(
  strIdx_387de59ef340,
  strIdx_da034cc60575,
  strIdx_cd96b3a27cc3,
  strIdx_cdb9382316bb,
  strIdx_533d8e9f698e,
  strIdx_710fcc1c2f06,
  strIdx_583318018d1f,
  strIdx_bd235a3f2b37,
  strIdx_17810b26410a,
  strIdx_f9e59d5bb2d9,
  oneHotEncoder_23267482a5e1,
  oneHotEncoder_9a097096c385,
  oneHotEncoder_1743522806e8,
  oneHotEncoder_606c90442245,
  oneHotEncoder_86b7ed7edbcc,
  oneHotEncoder_7cff57b6ffd5,
  oneHotEncoder_7cbb64f05541,
  oneHotEncoder_246deca3c0cb,
  oneHotEncoder_631138749c83,
  VectorAssembler: uid=vecAssembler_5dfd2d2c6bb9, handleInvalid=error, numInputCols=8,
  minMaxScal_e1a93ba7a235,
  VectorAssembler: uid=vecAssembler_72339199a57d, handleInvalid=error, numInputCols=10
)

Print the parameters of all the stages:

In [207]:
// print index, class, uid and parameter map of every stage
pipelineStages.zipWithIndex.foreach { case (stage, i) =>
  println(s"$i:${stage.getClass}: $stage: ${stage.extractParamMap}")
}

0:class org.apache.spark.ml.feature.StringIndexer: strIdx_387de59ef340: {
	strIdx_387de59ef340-handleInvalid: error,
	strIdx_387de59ef340-inputCol: y,
	strIdx_387de59ef340-outputCol: label,
	strIdx_387de59ef340-stringOrderType: frequencyDesc
}
1:class org.apache.spark.ml.feature.StringIndexer: strIdx_da034cc60575: {
	strIdx_da034cc60575-handleInvalid: error,
	strIdx_da034cc60575-inputCol: job,
	strIdx_da034cc60575-outputCol: idx_job,
	strIdx_da034cc60575-stringOrderType: frequencyDesc
}
2:class org.apache.spark.ml.feature.StringIndexer: strIdx_cd96b3a27cc3: {
	strIdx_cd96b3a27cc3-handleInvalid: error,
	strIdx_cd96b3a27cc3-inputCol: marital,
	strIdx_cd96b3a27cc3-outputCol: idx_marital,
	strIdx_cd96b3a27cc3-stringOrderType: frequencyDesc
}
3:class org.apache.spark.ml.feature.StringIndexer: strIdx_cdb9382316bb: {
	strIdx_cdb9382316bb-handleInvalid: error,
	strIdx_cdb9382316bb-inputCol: education,
	strIdx_cdb9382316bb-outputCol: idx_education,
	strIdx_cdb9382316bb-stringOrderType: frequenc

### Now, transform the data to a form usable for model training and inference

In [147]:
val preparedDF: org.apache.spark.sql.DataFrame = xformPipeline.transform(inputDF)

preparedDF: DataFrame = [age: double, job: string ... 41 more fields]

In [148]:
preparedDF.show(4)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+-------+-----------+-------------+-------------+-----------+--------+---------------+------------+--------------+--------------+---------------+-----------------+-----------------+---------------+-------------+-------------------+----------------+------------------+--------------------+--------------------+--------------------+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_job|idx_marital|idx_education|idx_defaulted|idx_housing|idx_loan|idx_day_of_week|idx_poutcome|idx_month_name|   vec_idx_job|vec_idx_marital|vec_idx_education|vec_idx_defaulted|vec_idx_housing| vec_idx_loan|vec_idx_day_of_week|vec_idx_poutcome

### Examine how the categorical features have transformed

In [212]:
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row}
import org.apache.spark.sql.functions.{col, udf, _}

import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row}
import org.apache.spark.sql.functions.{col, udf, _}

In [211]:
val month_crosstab = inputDF.stat.crosstab(originalLabelColname, "month_name")

month_crosstab.show()

+------------+----+----+---+----+----+---+-----+----+---+---+
|y_month_name| apr| aug|dec| jul| jun|mar|  may| nov|oct|sep|
+------------+----+----+---+----+----+---+-----+----+---+---+
|         yes| 539| 655| 89| 649| 559|276|  886| 416|315|256|
|          no|2093|5523| 93|6525|4759|270|12883|3685|403|314|
+------------+----+----+---+----+----+---+-----+----+---+---+



month_crosstab: DataFrame = [y_month_name: string, apr: bigint ... 9 more fields]

In [221]:
preparedDF.where(col("month_name") === "may").select(col("month_name"), col("idx_month_name"), col("vec_idx_month_name")).show(2)

+----------+--------------+------------------+
|month_name|idx_month_name|vec_idx_month_name|
+----------+--------------+------------------+
|       may|           0.0|     (9,[0],[1.0])|
|       may|           0.0|     (9,[0],[1.0])|
+----------+--------------+------------------+
only showing top 2 rows



In [222]:
preparedDF.where(col("month_name") === "jul").select(col("month_name"), col("idx_month_name"), col("vec_idx_month_name")).show(2)

+----------+--------------+------------------+
|month_name|idx_month_name|vec_idx_month_name|
+----------+--------------+------------------+
|       jul|           1.0|     (9,[1],[1.0])|
|       jul|           1.0|     (9,[1],[1.0])|
+----------+--------------+------------------+
only showing top 2 rows



In [223]:
preparedDF.where(col("month_name") === "aug").select(col("month_name"), col("idx_month_name"), col("vec_idx_month_name")).show(2)

+----------+--------------+------------------+
|month_name|idx_month_name|vec_idx_month_name|
+----------+--------------+------------------+
|       aug|           2.0|     (9,[2],[1.0])|
|       aug|           2.0|     (9,[2],[1.0])|
+----------+--------------+------------------+
only showing top 2 rows



In [224]:
preparedDF.where(col("month_name") === "jun").select(col("month_name"), col("idx_month_name"), col("vec_idx_month_name")).show(2)

+----------+--------------+------------------+
|month_name|idx_month_name|vec_idx_month_name|
+----------+--------------+------------------+
|       jun|           3.0|     (9,[3],[1.0])|
|       jun|           3.0|     (9,[3],[1.0])|
+----------+--------------+------------------+
only showing top 2 rows



In [209]:
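import org.apache.spark.ml.stat.ChiSquareTest

// test each one-hot encoded month indicator against the label; the result is a single row
val chisqTestResult = ChiSquareTest.test(preparedDF, "vec_idx_month_name", labelColname).take(1)(0)
// the result row has 3 parts:
// - pValues
// - degrees of freedom
// - test statistics
val chisqpValues = chisqTestResult(0).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
val chisqTestStatistics = chisqTestResult(2).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
// month names are the crosstab columns after the first (label) column
val colnames = month_crosstab.columns.slice(1, 11)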

month_crosstab: DataFrame = [y_month_name: string, apr: bigint ... 9 more fields]
chisqTestResult: Row = [[0.0,6.109124317532633E-11,0.07369442107074264,0.06240103278223397,0.01666732886397504,0.0,0.0,0.0,0.0],WrappedArray(1, 1, 1, 1, 1, 1, 1, 1, 1),[482.83247762738677,42.78524486317104,3.1987534332197667,3.4723915059566437,5.731069508518859,238.75663393025457,777.1928013995296,654.6000147318687,854.2406435112696]]
chisqpValues: linalg.DenseVector = [0.0,6.109124317532633E-11,0.07369442107074264,0.06240103278223397,0.01666732886397504,0.0,0.0,0.0,0.0]
chisqTestStatistics: linalg.DenseVector = [482.83247762738677,42.78524486317104,3.1987534332197667,3.4723915059566437,5.731069508518859,238.75663393025457,777.1928013995296,654.6000147318687,854.2406435112696]
colnames: Array[String] = Array(
  "apr",
  "aug",
  "dec",
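
For intuition about these test statistics, the sketch below computes Pearson's chi-square statistic for a single 2x2 table in plain Scala: the indicator "month is may" against the label, with counts read off the month crosstab shown earlier (the totals 4640 and 36548 are the row sums of that table). Assuming no continuity correction is applied, the result should be close to the first entry of the test statistics, about 482.83. `ChiSquareSketch` is a hypothetical helper written for this illustration, not a Spark API.

```scala
// Illustrative only: ChiSquareSketch is a hypothetical helper, not a Spark API.
object ChiSquareSketch {
  // Pearson chi-square for a 2x2 table [[a, b], [c, d]] (one degree of freedom):
  // chi2 = n * (a*d - b*c)^2 / ((a+b) * (c+d) * (a+c) * (b+d))
  def chiSquare2x2(a: Double, b: Double, c: Double, d: Double): Double = {
    val n = a + b + c + d
    val diff = a * d - b * c
    n * diff * diff / ((a + b) * (c + d) * (a + c) * (b + d))
  }
}

// Counts from the crosstab: may has 886 "yes" and 12883 "no";
// all months together have 4640 "yes" and 36548 "no".
val (mayYes, mayNo) = (886.0, 12883.0)
val (otherYes, otherNo) = (4640.0 - mayYes, 36548.0 - mayNo)
println(ChiSquareSketch.chiSquare2x2(mayYes, mayNo, otherYes, otherNo))
```

Each one-hot position is a binary indicator, which is consistent with the degrees of freedom reported as 1 for every entry.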
  

As can be seen from the examples above and below, the string indexer assigns index 0 to the most frequent value, index 1 to the next most frequent value, and so on;
the one-hot encoder then sets the vector position that matches this index.

The matching one-hot encoded values can be easily identified in this example.
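
The two encoding steps can be imitated in a few lines of plain Scala. This is only a sketch: `IndexerSketch` and its methods are hypothetical names, ties are broken alphabetically as an assumption of the sketch, and the month counts are the column totals (yes plus no) of the month crosstab shown earlier. The one-hot helper drops the last category, which matches the size-9 vectors seen above for 10 distinct months.

```scala
// Illustrative only: IndexerSketch is a hypothetical helper, not a Spark API.
object IndexerSketch {
  // Assign index 0 to the most frequent label, 1 to the next, and so on.
  // Ties are broken alphabetically (an assumption of this sketch).
  def frequencyDescIndex(counts: Map[String, Long]): Map[String, Int] =
    counts.toSeq
      .sortBy { case (label, n) => (-n, label) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // One-hot encode with the last category dropped: the vector has
  // numCategories - 1 positions and the last index maps to all zeros.
  def oneHotDropLast(index: Int, numCategories: Int): Array[Double] = {
    val v = Array.fill(numCategories - 1)(0.0)
    if (index < numCategories - 1) v(index) = 1.0
    v
  }
}

// Column totals of the month crosstab (yes + no per month).
val monthCounts = Map(
  "apr" -> 2632L, "aug" -> 6178L, "dec" -> 182L, "jul" -> 7174L, "jun" -> 5318L,
  "mar" -> 546L, "may" -> 13769L, "nov" -> 4101L, "oct" -> 718L, "sep" -> 570L)
val monthIndex = IndexerSketch.frequencyDescIndex(monthCounts)
Seq("may", "jul", "aug", "jun").foreach { m =>
  val vec = IndexerSketch.oneHotDropLast(monthIndex(m), monthCounts.size)
  println(s"$m -> index ${monthIndex(m)}, vector ${vec.mkString("[", ", ", "]")}")
}
```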

In [225]:
preparedDF.stat.crosstab(originalLabelColname, "education").show()

+-----------+--------+--------+--------+-----------+----------+-------------------+-----------------+-------+
|y_education|basic.4y|basic.6y|basic.9y|high.school|illiterate|professional.course|university.degree|unknown|
+-----------+--------+--------+--------+-----------+----------+-------------------+-----------------+-------+
|         no|    3748|    2104|    5572|       8484|        14|               4648|            10498|   1480|
|        yes|     428|     188|     473|       1031|         4|                595|             1670|    251|
+-----------+--------+--------+--------+-----------+----------+-------------------+-----------------+-------+



In [227]:
preparedDF.stat.crosstab(labelColname, "idx_education").show()

+-------------------+-----+----+----+----+----+----+----+---+
|label_idx_education|  0.0| 1.0| 2.0| 3.0| 4.0| 5.0| 6.0|7.0|
+-------------------+-----+----+----+----+----+----+----+---+
|                0.0|10498|8484|5572|4648|3748|2104|1480| 14|
|                1.0| 1670|1031| 473| 595| 428| 188| 251|  4|
+-------------------+-----+----+----+----+----+----+----+---+



In [228]:
categorical_features.map( x => preparedDF.stat.crosstab(originalLabelColname, x).show())

+-----+------+-----------+------------+---------+----------+-------+-------------+--------+-------+----------+----------+-------+
|y_job|admin.|blue-collar|entrepreneur|housemaid|management|retired|self-employed|services|student|technician|unemployed|unknown|
+-----+------+-----------+------------+---------+----------+-------+-------------+--------+-------+----------+----------+-------+
|  yes|  1352|        638|         124|      106|       328|    434|          149|     323|    275|       730|       144|     37|
|   no|  9070|       8616|        1332|      954|      2596|   1286|         1272|    3646|    600|      6013|       870|    293|
+-----+------+-----------+------------+---------+----------+-------+-------------+--------+-------+----------+----------+-------+

+---------+--------+-------+------+-------+
|y_marital|divorced|married|single|unknown|
+---------+--------+-------+------+-------+
|       no|    4136|  22396|  9948|     68|
|      yes|     476|   2532|  1620|     12|

res227: Array[Unit] = Array((), (), (), (), (), (), (), (), ())

## Select subset of features

Let us create an additional model which uses a selected subset of features and evaluate this model's performance vs. the one with all the features.

In [229]:
val best_subset_categorical = Array(
    "campaign", "month_name", "job", "day_of_week", "education", "marital"
)

val best_subset_numerical = Array(
    "duration", "pdays", "euribor3m", "cons_price_idx", "age"
)

best_subset_categorical: Array[String] = Array(
  "campaign",
  "month_name",
  "job",
  "day_of_week",
  "education",
  "marital"
)
best_subset_numerical: Array[String] = Array(
  "duration",
  "pdays",
  "euribor3m",
  "cons_price_idx",
  "age"
)

Fit a new transformation pipeline with this subset of features.

In [230]:
val xformPipeline2 = firTransformDataPipeline(inputDF, originalLabelColname, best_subset_categorical, best_subset_numerical)

23/01/30 15:48:32 INFO SparkMLDemo: Indexing categorical variables.
23/01/30 15:48:32 INFO SparkMLDemo: On-hot encoding all categorical variables.
23/01/30 15:48:32 INFO SparkMLDemo: Assembled together all numerical variables.
23/01/30 15:48:32 INFO SparkMLDemo: Scaled all numerical variables by min-max scaler.
23/01/30 15:48:32 INFO SparkMLDemo: Assembling pipeline with the following transformations: 
strIdx_137ba1783896 
strIdx_b09b13da7231 
strIdx_cfb343f416ed 
strIdx_7bb6f881461d 
strIdx_8ecf1d33f435 
strIdx_06062ff464e4 
strIdx_68c715321a64 
oneHotEncoder_70bc9bd017bc 
oneHotEncoder_2794034dd822 
oneHotEncoder_f07f9736b2e7 
oneHotEncoder_3ef22556dc1c 
oneHotEncoder_eff9887e73d9 
oneHotEncoder_3ab2df8bc876 
VectorAssembler: uid=vecAssembler_60128f25a3ca, handleInvalid=error, numInputCols=5 
minMaxScal_e7ce508fa355 
VectorAssembler: uid=vecAssembler_5566e14c9193, handleInvalid=error, numInputCols=7
23/01/30 15:48:35 INFO SparkMLDemo: Completed fitting the pipeline


xformPipeline2: PipelineModel = pipeline_c49d59177ceb

In [231]:
val preparedDF2 = xformPipeline2.transform(inputDF)

preparedDF2: DataFrame = [age: double, job: string ... 35 more fields]

In [232]:
preparedDF2.show(4)

+----+---------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+------------+--------------+-------+---------------+-------------+-----------+----------------+------------------+--------------+-------------------+-----------------+---------------+--------------------+--------------------+--------------------+
| age|      job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_campaign|idx_month_name|idx_job|idx_day_of_week|idx_education|idx_marital|vec_idx_campaign|vec_idx_month_name|   vec_idx_job|vec_idx_day_of_week|vec_idx_education|vec_idx_marital|   numericalfeatures|      scaledfeatures|            features|
+----+---------+-------+-----------+---------+-------+----+----------+----------+-

## Split data into Test-Train sets

Randomly select records to split the prepared data into train-test datasets.

Here a ratio of 90% training and 10% testing has been specified.

In [241]:
val Array(trainingDF, testDF) = preparedDF.randomSplit(Array(0.9, 0.1))

trainingDF: Dataset[Row] = [age: double, job: string ... 41 more fields]
testDF: Dataset[Row] = [age: double, job: string ... 41 more fields]
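
The split above is random, so the two sets differ from run to run unless a seed is fixed; Spark's `randomSplit` also has an overload that takes a seed as a second argument. The plain-Scala sketch below illustrates the idea of a seeded random split on a made-up range of integers. `SplitSketch` is a hypothetical helper for illustration only and does not reproduce Spark's sampling algorithm.

```scala
import scala.util.Random

// Illustrative only: SplitSketch is a hypothetical helper, not a Spark API.
object SplitSketch {
  // Draw one uniform number per record; records below trainFraction go to train.
  // The same seed always yields the same split.
  def randomSplit[A](records: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    val rng = new Random(seed)
    val tagged = records.toVector.map(r => (r, rng.nextDouble() < trainFraction))
    (tagged.collect { case (r, true) => r }, tagged.collect { case (r, false) => r })
  }
}

// Made-up data: the integers 1 to 1000, split roughly 90% / 10%.
val (trainIds, testIds) = SplitSketch.randomSplit(1 to 1000, trainFraction = 0.9, seed = 42L)
println(s"train = ${trainIds.size}, test = ${testIds.size}")
```

Note that the fractions are only approximate: each record is assigned independently, so the set sizes fluctuate around the requested ratio.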

In [242]:
// At this point, these two datasets may be "cached" for improving Spark performance:
trainingDF.cache()
testDF.cache()

res241_0: Dataset[Row] = [age: double, job: string ... 41 more fields]
res241_1: Dataset[Row] = [age: double, job: string ... 41 more fields]

In [243]:
trainingDF.show(5)

+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+-------+-----------+-------------+-------------+-----------+--------+---------------+------------+--------------+---------------+---------------+-----------------+-----------------+---------------+-------------+-------------------+----------------+------------------+--------------------+--------------------+--------------------+
| age|    job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_job|idx_marital|idx_education|idx_defaulted|idx_housing|idx_loan|idx_day_of_week|idx_poutcome|idx_month_name|    vec_idx_job|vec_idx_marital|vec_idx_education|vec_idx_defaulted|vec_idx_housing| vec_idx_loan|vec_idx_day_of_week|vec_idx_poutcome|v

Repeat the train-test split for the best-subset data as well:

In [233]:
val Array(trainingDF2, testDF2) = preparedDF2.randomSplit(Array(0.9, 0.1))

trainingDF2: Dataset[Row] = [age: double, job: string ... 35 more fields]
testDF2: Dataset[Row] = [age: double, job: string ... 35 more fields]

In [234]:
trainingDF2.cache()
testDF2.cache()

res233_0: Dataset[Row] = [age: double, job: string ... 35 more fields]
res233_1: Dataset[Row] = [age: double, job: string ... 35 more fields]

In [235]:
trainingDF2.show(5)

+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+------------+--------------+-------+---------------+-------------+-----------+----------------+------------------+---------------+-------------------+-----------------+---------------+--------------------+--------------------+--------------------+
| age|    job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_campaign|idx_month_name|idx_job|idx_day_of_week|idx_education|idx_marital|vec_idx_campaign|vec_idx_month_name|    vec_idx_job|vec_idx_day_of_week|vec_idx_education|vec_idx_marital|   numericalfeatures|      scaledfeatures|            features|
+----+-------+-------+-----------+---------+-------+----+----------+----------+-----

---
## Train a Logistic Regression Model

Use the Spark MLlib library to train different machine learning models on this dataset.

In [236]:
import org.apache.spark.ml.classification.{GBTClassifier, LogisticRegression, LogisticRegressionModel, RandomForestClassifier}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder, TrainValidationSplit, TrainValidationSplitModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.spark.ml.classification.{GBTClassifier, LogisticRegression, LogisticRegressionModel, RandomForestClassifier}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder, TrainValidationSplit, TrainValidationSplitModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

In [237]:
val lr = new LogisticRegression()

lr: LogisticRegression = logreg_c3d52860028e

Set parameters using setter methods.

In [238]:
lr.setMaxIter(100)
    .setFamily("binomial")
    .setFitIntercept(true)
    .setThreshold(0.35)
    .setLabelCol(labelColname);

res237: LogisticRegression = logreg_c3d52860028e
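
To see what the threshold of 0.35 set above means, the plain-Scala sketch below turns a linear score into a probability with the logistic function and predicts class 1 when that probability exceeds the threshold. `ThresholdSketch`, the single weight, and the input value are made up for illustration and are not taken from the fitted model.

```scala
// Illustrative only: ThresholdSketch is a hypothetical helper, not a Spark API.
object ThresholdSketch {
  // Logistic (sigmoid) function: maps any real score to a probability in (0, 1).
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Predict 1 when the estimated probability of class 1 exceeds the threshold.
  def predict(weights: Array[Double], intercept: Double,
              x: Array[Double], threshold: Double): Int = {
    val margin = weights.zip(x).map { case (w, xi) => w * xi }.sum + intercept
    if (sigmoid(margin) > threshold) 1 else 0
  }
}

// Made-up example: one feature with weight 1.0, no intercept, input -0.5.
val p = ThresholdSketch.sigmoid(-0.5)
println(f"probability of class 1 = $p%.3f")
println(ThresholdSketch.predict(Array(1.0), 0.0, Array(-0.5), threshold = 0.35))
println(ThresholdSketch.predict(Array(1.0), 0.0, Array(-0.5), threshold = 0.5))
```

Lowering the threshold below 0.5 makes the model predict the positive class more readily, which can be a reasonable choice when the positive class is rare, as the "yes" label is in this dataset.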

In [239]:
println(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial. (default: auto, current: binomial)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true, current: true)
labelCol: label column name (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. (undefined)
maxBlockSizeInMB: Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a pa

### List all parameters of the model:

In [240]:
lr.extractParamMap().toSeq.foreach(
    x => println(
        "Parameter %s = %s".format(
            x.param.name,   // parameter name without the "<uid>__" prefix
            x.value)
    )
)

Parameter threshold = 0.35
Parameter elasticNetParam = 0.0
Parameter predictionCol = prediction
Parameter tol = 1.0E-6
Parameter labelCol = label
Parameter maxIter = 100
Parameter featuresCol = features
Parameter aggregationDepth = 2
Parameter regParam = 0.0
Parameter family = binomial
Parameter rawPredictionCol = rawPrediction
Parameter maxBlockSizeInMB = 0.0
Parameter probabilityCol = probability
Parameter fitIntercept = true
Parameter standardization = true


In [244]:
// Fit a LogisticRegression model on the training data. This uses the parameters stored in lr.
val lrModel1 = lr.fit(trainingDF);

23/01/30 15:54:15 INFO LBFGS: Step Size: 3.574
23/01/30 15:54:15 INFO LBFGS: Val and Grad Norm: 0.316549 (rel: 0.104) 0.308898
23/01/30 15:54:15 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:15 INFO LBFGS: Val and Grad Norm: 0.245978 (rel: 0.223) 0.0965974
23/01/30 15:54:15 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:15 INFO LBFGS: Val and Grad Norm: 0.232709 (rel: 0.0539) 0.0500178
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.222595 (rel: 0.0435) 0.0367947
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.214510 (rel: 0.0363) 0.0243518
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.211601 (rel: 0.0136) 0.0194000
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.210507 (rel: 0.00517) 0.00815788
23/01/30 15:54:16 INFO LBFGS: Step Size: 1.000
23/01/30 15:54:16 INFO LBFGS: Val and Grad Norm: 0.

lrModel1: LogisticRegressionModel = LogisticRegressionModel: uid=logreg_c3d52860028e, numClasses=2, numFeatures=50

### Understand the fitted model

Extract the parameters and the training-set performance metrics of the fitted model from its summary.

In [245]:
print("Prediction labels: ")
lrModel1.summary.labels.foreach( x => print(" " + x))
print("\nTrue Positive Rate By Label: ")
lrModel1.summary.truePositiveRateByLabel.foreach( x => print(" " + x))
print("\nRecall By Label: ")
lrModel1.summary.recallByLabel.foreach( x => print(" " + x))
print("\nPrecision By Label: ")
lrModel1.summary.precisionByLabel.foreach( x => print(" " + x))
print("\nFalse Positive Rate By Label: ")
lrModel1.summary.falsePositiveRateByLabel.foreach( x => print(" " + x))
print("\nF-Measure By Label: ")
lrModel1.summary.fMeasureByLabel.foreach( x => print(" " + x))
println("\nAccuracy: " + lrModel1.summary.accuracy)
println("Total no of Iterations: " + lrModel1.summary.totalIterations)

Prediction labels:  0.0 1.0
True Positive Rate By Label:  0.9540030441400305 0.5675482487491065
Recall By Label:  0.9540030441400305 0.5675482487491065
Precision By Label:  0.9452554744525548 0.611867454405343
False Positive Rate By Label:  0.43245175125089347 0.04599695585996956
F-Measure By Label:  0.9496091145991152 0.5888751545117429
Accuracy: 0.910222150241585
Total no of Iterations: 100
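
As a sanity check on the numbers above, the F-measure is the harmonic mean of precision and recall. The sketch below is plain Scala with no Spark dependency; the helper name `fMeasure` and its `beta` parameter are illustrative assumptions, not part of the Spark API.

```scala
// F-beta score: weighted harmonic mean of precision and recall.
// beta = 1.0 gives the usual F1 score.
def fMeasure(precision: Double, recall: Double, beta: Double = 1.0): Double = {
  val b2 = beta * beta
  val denom = b2 * precision + recall
  // Guard against 0/0 when both precision and recall are zero.
  if (denom == 0.0) 0.0 else (1.0 + b2) * precision * recall / denom
}

// Class 1 precision and recall, copied from the summary output above.
val f1Class1 = fMeasure(0.611867454405343, 0.5675482487491065)
println(f"F1 for class 1 = $f1Class1%.6f")
```

The result should agree with the class 1 value in the "F-Measure By Label" line, which suggests that the reported F-measure uses beta = 1.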


### Wrap these commands into a convenience function:

In [246]:
/**
Extract model fit performance from a binary classification model.
The summary holds training-set metrics. Single-valued metrics (accuracy, area under ROC)
are reported in the Class_1 column, with 0.0 as a placeholder in Class_0.
Returns null when the model is not binary.
*/
def getModelFitSummary(sc:SparkContext, fittedModel:LogisticRegressionModel):DataFrame = {
    if( fittedModel.numClasses == 2){
        val summaryDF = sc.parallelize(
            Array(
              ("Prediction labels", fittedModel.summary.labels(0), fittedModel.summary.labels(1) )
            , ("True Positive Rate", fittedModel.summary.truePositiveRateByLabel(0), fittedModel.summary.truePositiveRateByLabel(1) )
            , ("Recall", fittedModel.summary.recallByLabel(0), fittedModel.summary.recallByLabel(1) )
            , ("Precision", fittedModel.summary.precisionByLabel(0), fittedModel.summary.precisionByLabel(1) )
            , ("False Positive Rate", fittedModel.summary.falsePositiveRateByLabel(0), fittedModel.summary.falsePositiveRateByLabel(1) )
            , ("F-measure", fittedModel.summary.fMeasureByLabel(0), fittedModel.summary.fMeasureByLabel(1) )
            , ("Total Accuracy", 0.0, fittedModel.summary.accuracy )
            , ("Area Under ROC", 0.0, fittedModel.binarySummary.areaUnderROC)
            )
            ).toDF(Array("Metric", "Class_0", "Class_1"): _*)

        return summaryDF
    }
    return null
}

defined function getModelFitSummary

In [247]:
val lrmodel1_summary = getModelFitSummary(spark.sparkContext, lrModel1)

lrmodel1_summary.show(10)

+-------------------+-------------------+-------------------+
|             Metric|            Class_0|            Class_1|
+-------------------+-------------------+-------------------+
|  Prediction labels|                0.0|                1.0|
| True Positive Rate| 0.9540030441400305| 0.5675482487491065|
|             Recall| 0.9540030441400305| 0.5675482487491065|
|          Precision| 0.9452554744525548|  0.611867454405343|
|False Positive Rate|0.43245175125089347|0.04599695585996956|
|          F-measure| 0.9496091145991152| 0.5888751545117429|
|     Total Accuracy|                0.0|  0.910222150241585|
|     Area Under ROC|                0.0| 0.9366489871543373|
+-------------------+-------------------+-------------------+

lrmodel1_summary: DataFrame = [Metric: string, Class_0: double ... 1 more field]

In [248]:
lrModel1.coefficients

res247: linalg.Vector = [0.08929699811858023,-0.12991105334224565,0.10660142417505587,0.0035108312116281085,0.05015596724370776,0.4334691081349534,-0.11275155612992703,-0.08362680602814758,0.0022494988198624158,0.1832846920962329,0.2533553293269838,0.3790928746135225,0.4567786711449594,0.32985034825169035,-0.2674248674989839,-0.4256658042471482,-0.49222255172482904,-0.3990814004052108,-0.47235513585234873,-0.3995417988274074,-0.2903718016139603,0.1902187433847383,-0.12245630399105265,0.02796183082914988,0.04008613501539039,0.08763729127949141,0.032406713072771494,0.08293763583846984,-0.10119501477903015,0.20084202716608857,0.11343769207386381,-0.5396409612701998,-1.0112351384318294,-0.8447128285517715,0.10421454370594475,0.6135613168062243,-0.6207939396904261,-0.5378100985337787,-0.25508736139086624,0.045457782736203954,0.05287103483647745,1.5511249151553137,0.07652248872014075,23.072170029479025,-0.8945340833066275,-6.686053732452271,3.819435183913021,0.0

In [249]:
println(lrModel1.extractParamMap())

{
	logreg_c3d52860028e-aggregationDepth: 2,
	logreg_c3d52860028e-elasticNetParam: 0.0,
	logreg_c3d52860028e-family: binomial,
	logreg_c3d52860028e-featuresCol: features,
	logreg_c3d52860028e-fitIntercept: true,
	logreg_c3d52860028e-labelCol: label,
	logreg_c3d52860028e-maxBlockSizeInMB: 0.0,
	logreg_c3d52860028e-maxIter: 100,
	logreg_c3d52860028e-predictionCol: prediction,
	logreg_c3d52860028e-probabilityCol: probability,
	logreg_c3d52860028e-rawPredictionCol: rawPrediction,
	logreg_c3d52860028e-regParam: 0.0,
	logreg_c3d52860028e-standardization: true,
	logreg_c3d52860028e-threshold: 0.35,
	logreg_c3d52860028e-tol: 1.0E-6
}


In [250]:
lrModel1.numFeatures

res249: Int = 50

In [251]:
lrModel1.numClasses

res250: Int = 2

In [253]:
lrModel1.write.overwrite().save("/tmp/lr_model1")

23/01/30 15:55:36 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 15:55:36 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 15:55:37 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 15:55:37 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 15:55:37 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 15:55:37 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false


### Hyperparameter Tuning

We can use grid-search to find the best set of parameters for this model.

In [255]:
// We use a ParamGridBuilder to construct a grid of hyper-parameters to search over.
// CrossValidator will try all combinations of values and determine the best model using
// the evaluator.
val paramGridLR = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.0025, 0.005))
  .addGrid(lr.elasticNetParam, Array(0.0075, 0.01))
  .build()

paramGridLR: Array[ml.param.ParamMap] = Array(
  {
	logreg_c3d52860028e-elasticNetParam: 0.0075,
	logreg_c3d52860028e-regParam: 0.0025
},
  {
	logreg_c3d52860028e-elasticNetParam: 0.01,
	logreg_c3d52860028e-regParam: 0.0025
},
  {
	logreg_c3d52860028e-elasticNetParam: 0.0075,
	logreg_c3d52860028e-regParam: 0.005
},
  {
	logreg_c3d52860028e-elasticNetParam: 0.01,
	logreg_c3d52860028e-regParam: 0.005
}
)
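
The grid above is the Cartesian product of the candidate values. A plain-Scala sketch (no Spark) of that product and of the resulting training cost follows. It assumes k-fold cross-validation with k = 10 and one final refit of the best combination on the full training set; the names `regParams`, `elasticNetParams` and `numFolds` are illustrative.

```scala
// Candidate values, copied from the ParamGridBuilder call above.
val regParams = Seq(0.0025, 0.005)
val elasticNetParams = Seq(0.0075, 0.01)

// Every (regParam, elasticNetParam) combination.
val grid = for {
  reg <- regParams
  en  <- elasticNetParams
} yield (reg, en)

grid.foreach { case (reg, en) => println(s"regParam=$reg elasticNetParam=$en") }

// Assumed number of folds for k-fold cross-validation.
val numFolds = 10
// One fit per fold per combination, plus one refit of the best combination.
val totalFits = grid.size * numFolds + 1
println(s"combinations=${grid.size} totalFits=$totalFits")
```

Under these assumptions, doubling the number of values for either parameter roughly doubles the training time, which is worth keeping in mind before widening the grid.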

Define the performance metric that the grid search uses to identify the best-performing model.

Here, the metric is the area under the precision-recall curve (areaUnderPR).

In [256]:
val binaryEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol(labelColname)
  .setMetricName("areaUnderPR");

binaryEvaluator: BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_a664d6d37f22, metricName=areaUnderPR, numBins=1000

In [257]:
val xfoldValidator = new CrossValidator()
  .setEstimator(lr)
  .setNumFolds(10)
  .setEvaluator(binaryEvaluator)
  .setEstimatorParamMaps(paramGridLR)
  .setCollectSubModels(false)
  .setParallelism(2)

xfoldValidator: CrossValidator = cv_310b21052e7d

In [258]:
// Run 10-fold cross-validation over the parameter grid, and choose the best set of parameters.
val cvmodel1 = xfoldValidator.fit(trainingDF2)

23/01/30 15:57:03 INFO OWLQN: Step Size: 0.9854
23/01/30 15:57:03 INFO OWLQN: Val and Grad Norm: 0.307416 (rel: 0.126) 0.176688
23/01/30 15:57:03 INFO OWLQN: Step Size: 0.9854
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.307426 (rel: 0.126) 0.176672
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.240222 (rel: 0.219) 0.0721266
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.240240 (rel: 0.219) 0.0721294
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.229298 (rel: 0.0455) 0.0377960
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.229317 (rel: 0.0455) 0.0377921
23/01/30 15:57:04 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:04 INFO OWLQN: Val and Grad Norm: 0.221291 (rel: 0.0349) 0.0268653
23/01/30 15:57:05 INFO OWLQN: Step Size: 1.000
23/01/30 15:57:05 INFO OWLQN: Val and Grad Norm: 0.221

cvmodel1: CrossValidatorModel = CrossValidatorModel: uid=cv_310b21052e7d, bestModel=LogisticRegressionModel: uid=logreg_c3d52860028e, numClasses=2, numFeatures=80, numFolds=10

In [259]:
val lrModel2 = cvmodel1.bestModel

lrModel2: Model[_] = LogisticRegressionModel: uid=logreg_c3d52860028e, numClasses=2, numFeatures=80

In [260]:
// we can view the hyper-parameters for the best model selected by grid-search.
// This prints the parameter (name: value) pairs, where names are unique IDs for this instance.

println(s"---Cross-fold validated Logistic Regression Model was fit using parameters:---${lrModel2.extractParamMap}")

---Cross-fold validated Logistic Regression Model was fit using parameters:---{
	logreg_c3d52860028e-aggregationDepth: 2,
	logreg_c3d52860028e-elasticNetParam: 0.01,
	logreg_c3d52860028e-family: binomial,
	logreg_c3d52860028e-featuresCol: features,
	logreg_c3d52860028e-fitIntercept: true,
	logreg_c3d52860028e-labelCol: label,
	logreg_c3d52860028e-maxBlockSizeInMB: 0.0,
	logreg_c3d52860028e-maxIter: 100,
	logreg_c3d52860028e-predictionCol: prediction,
	logreg_c3d52860028e-probabilityCol: probability,
	logreg_c3d52860028e-rawPredictionCol: rawPrediction,
	logreg_c3d52860028e-regParam: 0.0025,
	logreg_c3d52860028e-standardization: true,
	logreg_c3d52860028e-threshold: 0.35,
	logreg_c3d52860028e-tol: 1.0E-6
}


In [261]:
val lrmodel2_summary = getModelFitSummary(spark.sparkContext, lrModel2.asInstanceOf[LogisticRegressionModel])

lrmodel2_summary.show(10)

+-------------------+------------------+-------------------+
|             Metric|           Class_0|            Class_1|
+-------------------+------------------+-------------------+
|  Prediction labels|               0.0|                1.0|
| True Positive Rate| 0.957126830599436| 0.5419262098706277|
|             Recall| 0.957126830599436| 0.5419262098706277|
|          Precision|0.9428895725678784| 0.6153427638737758|
|False Positive Rate|0.4580737901293723|0.04287316940056396|
|          F-measure|0.9499548600662052| 0.5763057324840765|
|     Total Accuracy|               0.0| 0.9104831112905396|
|     Area Under ROC|               0.0| 0.9326249601072021|
+-------------------+------------------+-------------------+

lrmodel2_summary: DataFrame = [Metric: string, Class_0: double ... 1 more field]

In [262]:
lrmodel1_summary.createOrReplaceTempView("m1perf");
lrmodel2_summary.createOrReplaceTempView("m2perf");

spark.sql(
    """SELECT m1.Metric,
    m1.Class_0 as Model1_Class0, m1.Class_1 as Model1_Class1,
    m2.Class_0 as Model2_Class0, m2.Class_1 as Model2_Class1
    from m1perf m1
    inner join m2perf m2 on m2.Metric=m1.Metric"""
    ).show();

+-------------------+-------------------+-------------------+------------------+-------------------+
|             Metric|      Model1_Class0|      Model1_Class1|     Model2_Class0|      Model2_Class1|
+-------------------+-------------------+-------------------+------------------+-------------------+
|     Area Under ROC|                0.0| 0.9366489871543373|               0.0| 0.9326249601072021|
|          F-measure| 0.9496091145991152| 0.5888751545117429|0.9499548600662052| 0.5763057324840765|
|False Positive Rate|0.43245175125089347|0.04599695585996956|0.4580737901293723|0.04287316940056396|
|          Precision| 0.9452554744525548|  0.611867454405343|0.9428895725678784| 0.6153427638737758|
|  Prediction labels|                0.0|                1.0|               0.0|                1.0|
|             Recall| 0.9540030441400305| 0.5675482487491065| 0.957126830599436| 0.5419262098706277|
|     Total Accuracy|                0.0|  0.910222150241585|               0.0| 0.91048311

---
## Evaluate model performance on test set

In [263]:
// Define a convenience function to calculate log-loss:
//   logloss = -mean( y * log(p) + (1 - y) * log(1 - p) )
def logScoringMetric(inputDF: DataFrame, predictProb:String="p1", labelCol:String = "label"): Double = {
    import org.apache.spark.sql.functions.{avg, col, lit, log}

    // Use the column names passed in rather than hard-coded ones.
    val p = col(predictProb)
    val y = col(labelCol)

    // Per-row log-loss. Both terms are negated; p must lie strictly between 0 and 1.
    val testResultsLoglossDF = inputDF.withColumn(
      "logloss",
      (y * log(p) + (lit(1.0) - y) * log(lit(1.0) - p)) * -1.0
    )

    // Average over all rows to get the final metric.
    testResultsLoglossDF.select(avg("logloss")).first().getDouble(0)
}

defined function logScoringMetric
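
For reference, binary log-loss is the negated mean of y * log(p) + (1 - y) * log(1 - p) over all rows. The following plain-Scala sketch (no Spark) can be used to check the metric by hand on a few rows. The helper name `logLoss`, the clipping constant `eps` and the sample values are assumptions made for illustration only.

```scala
// Binary log-loss over parallel sequences of labels (0.0 or 1.0) and
// predicted class-1 probabilities.
def logLoss(labels: Seq[Double], probs: Seq[Double], eps: Double = 1e-15): Double = {
  require(labels.length == probs.length && labels.nonEmpty, "inputs must be non-empty and aligned")
  val perRow = labels.zip(probs).map { case (y, p) =>
    // Clip p away from 0 and 1 so that log() stays finite.
    val q = math.min(math.max(p, eps), 1.0 - eps)
    -(y * math.log(q) + (1.0 - y) * math.log(1.0 - q))
  }
  perRow.sum / perRow.length
}

// Hypothetical labels and probabilities, not taken from the tutorial's data.
val sampleLoss = logLoss(Seq(1.0, 0.0, 1.0), Seq(0.9, 0.2, 0.6))
println(s"log-loss on the sample = $sampleLoss")
```

An uninformative prediction of 0.5 for every row gives a log-loss of ln 2, which is a handy baseline when reading the metric.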

In [264]:
// Define a convenience function to extract the class 1 probability from the probability vector column
def addBinaryProbabilities(inputDF: org.apache.spark.sql.DataFrame, probCol:String = "probability"): DataFrame = {

    import org.apache.spark.sql.functions.{col, udf}

    // Create a UDF to convert VectorUDT to ArrayType
    val vecToArray = udf((xs: linalg.Vector) => xs.toArray)
    // Add an ArrayType column, PredictProbabArr, built from the column named by probCol
    val dfProbArr = inputDF.withColumn("PredictProbabArr", vecToArray(col(probCol)))
    // Element 1 of the array is the probability of class "1"; expose it as column "p1"
    val testResultWithProbsDF = dfProbArr
      .withColumn("p1", col("PredictProbabArr").getItem(1))
      .drop("PredictProbabArr")

    return testResultWithProbsDF
}

defined function addBinaryProbabilities

In [266]:
// use specific column name to store class1 probabilities
val class1ProbColName = "p1"

class1ProbColName: String = "p1"

In [271]:
val binEvalPR = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderPR")
  .setLabelCol(labelColname)

binEvalPR: BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_81eac8a5b98c, metricName=areaUnderPR, numBins=1000

In [272]:
// No metric name is set, so the evaluator uses its default, areaUnderROC.
val binEvalROC = new BinaryClassificationEvaluator().setLabelCol(labelColname)

binEvalROC: BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_2a47427bae4b, metricName=areaUnderROC, numBins=1000

### Evaluate Logistic Regression Models

In [275]:
// Make predictions on test data using the Transformer.transform() method.
// Note: model.transform will only use the 'features' column.
println("Generating inferences on test set for -> Logistic Regression models:")

val testResultLR1 = lrModel1.transform(testDF);
val testResultLR2 = lrModel2.transform(testDF2);

Generating inferences on test set for -> Logistic Regression models:

testResultLR1: DataFrame = [age: double, job: string ... 44 more fields]
testResultLR2: DataFrame = [age: double, job: string ... 38 more fields]

In [273]:
println("Logistic Regression Model 1: For test set, Area under Precision-Recall curve is: " + binEvalPR.evaluate(testResultLR1).doubleValue());

println("Logistic Regression Model 1: For test set, Area under ROC curve is: " + binEvalROC.evaluate(testResultLR1).doubleValue());

Logistic Regression Model 1: For test set, Area under Precision-Recall curve is: 0.5875903105939474
Logistic Regression Model 1: For test set, Area under ROC curve is: 0.9295516336693499


In [274]:
println("Logistic Regression Model 2: For test set, Area under Precision-Recall curve is: " + binEvalPR.evaluate(testResultLR2).doubleValue());

println("Logistic Regression Model 2: For test set, Area under ROC curve is: " + binEvalROC.evaluate(testResultLR2).doubleValue());

Logistic Regression Model 2: For test set, Area under Precision-Recall curve is: 0.5851839562542677
Logistic Regression Model 2: For test set, Area under ROC curve is: 0.9287183059783829
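
For intuition about the ROC figures, the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Scala sketch below computes that pairwise estimate on a tiny made-up sample. It is not how Spark computes the metric (the evaluator output above shows it working from a curve with numBins=1000); `rocAuc` and the sample scores are hypothetical.

```scala
// Pairwise estimate of ROC AUC from (score, label) pairs.
// A positive that outranks a negative counts 1, a tie counts 0.5.
def rocAuc(scored: Seq[(Double, Double)]): Double = {
  val pos = scored.collect { case (score, label) if label == 1.0 => score }
  val neg = scored.collect { case (score, label) if label == 0.0 => score }
  require(pos.nonEmpty && neg.nonEmpty, "need at least one example of each class")
  val wins = for (p <- pos; n <- neg) yield
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (pos.length.toLong * neg.length)
}

// Hypothetical (score, label) pairs, not taken from the tutorial's data.
val sample = Seq((0.9, 1.0), (0.8, 0.0), (0.7, 1.0), (0.1, 0.0))
println(s"AUC on the sample = ${rocAuc(sample)}")
```

This comparison is quadratic in the number of rows, so it only suits small samples; it is meant as a reading aid for the metric rather than a replacement for the evaluator.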


---
## Train a Gradient Boosted Trees Model

In [276]:
val gbt = new GBTClassifier()
      .setLabelCol(labelColname)
      .setFeaturesCol("features")
      .setMaxIter(10)
      .setStepSize(0.4)
      .setMaxBins(32)
      .setFeatureSubsetStrategy("auto")

gbt: GBTClassifier = gbtc_78e536cb4af2

In [277]:
// Print out the parameters, documentation, and any default values.
println(s"---Gradient Boosted Decision Trees parameters for training:---\n ${gbt.explainParams()}\n")

---Gradient Boosted Decision Trees parameters for training:---
 cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: all, current: auto)
featuresCol: features column name (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: variance (default: variance)
labelCol: label column name (default: label, current: label)
leafC

In [278]:
val gbdtModel1 = gbt.fit(trainingDF)

gbdtModel1: ml.classification.GBTClassificationModel = GBTClassificationModel: uid = gbtc_78e536cb4af2, numTrees=10, numClasses=2, numFeatures=50

In [279]:
gbdtModel1.numClasses

res278: Int = 2

In [280]:
val gbdtParamGrid = new ParamGridBuilder()
  .addGrid(gbt.minInstancesPerNode, Array(4,8))
  .addGrid(gbt.maxDepth, Array(4, 5))
  .build()

gbdtParamGrid: Array[ml.param.ParamMap] = Array(
  {
	gbtc_78e536cb4af2-maxDepth: 4,
	gbtc_78e536cb4af2-minInstancesPerNode: 4
},
  {
	gbtc_78e536cb4af2-maxDepth: 5,
	gbtc_78e536cb4af2-minInstancesPerNode: 4
},
  {
	gbtc_78e536cb4af2-maxDepth: 4,
	gbtc_78e536cb4af2-minInstancesPerNode: 8
},
  {
	gbtc_78e536cb4af2-maxDepth: 5,
	gbtc_78e536cb4af2-minInstancesPerNode: 8
}
)

In [281]:
val binaryEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol(labelColname)
  .setMetricName("areaUnderPR");

binaryEvaluator: BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_181899c3ccfa, metricName=areaUnderPR, numBins=1000

In [282]:
val xfoldValidator = new CrossValidator()
  .setEstimator(gbt)
  .setNumFolds(10)
  .setEvaluator(binaryEvaluator)
  .setEstimatorParamMaps(gbdtParamGrid)
  .setCollectSubModels(false)
  .setParallelism(2)

xfoldValidator: CrossValidator = cv_5d3faabb4550

In [291]:
// Run 10-fold cross-validation over the parameter grid, and choose the best set of parameters.
logger.info("Started training GBDT model via x-fold cross validation")
val gbdtModelCV = xfoldValidator.fit(trainingDF2)
logger.info("Finished training the GBDT model.")

23/01/30 16:28:58 INFO SparkMLDemo: Started training GBDT model via x-fold cross validation
23/01/30 16:35:12 INFO SparkMLDemo: Finished training the GBDT model.


gbdtModelCV: CrossValidatorModel = CrossValidatorModel: uid=cv_5d3faabb4550, bestModel=GBTClassificationModel: uid = gbtc_78e536cb4af2, numTrees=10, numClasses=2, numFeatures=80, numFolds=10

In [292]:
val gbdtModel2 = gbdtModelCV.bestModel

gbdtModel2: Model[_] = GBTClassificationModel: uid = gbtc_78e536cb4af2, numTrees=10, numClasses=2, numFeatures=80

In [293]:
// This prints the parameter (name: value) pairs, where names are unique IDs for this instance.
println( gbdtModel2.extractParamMap )

{
	gbtc_78e536cb4af2-cacheNodeIds: false,
	gbtc_78e536cb4af2-checkpointInterval: 10,
	gbtc_78e536cb4af2-featureSubsetStrategy: auto,
	gbtc_78e536cb4af2-featuresCol: features,
	gbtc_78e536cb4af2-impurity: variance,
	gbtc_78e536cb4af2-labelCol: label,
	gbtc_78e536cb4af2-leafCol: ,
	gbtc_78e536cb4af2-lossType: logistic,
	gbtc_78e536cb4af2-maxBins: 32,
	gbtc_78e536cb4af2-maxDepth: 4,
	gbtc_78e536cb4af2-maxIter: 10,
	gbtc_78e536cb4af2-maxMemoryInMB: 256,
	gbtc_78e536cb4af2-minInfoGain: 0.0,
	gbtc_78e536cb4af2-minInstancesPerNode: 8,
	gbtc_78e536cb4af2-minWeightFractionPerNode: 0.0,
	gbtc_78e536cb4af2-predictionCol: prediction,
	gbtc_78e536cb4af2-probabilityCol: probability,
	gbtc_78e536cb4af2-rawPredictionCol: rawPrediction,
	gbtc_78e536cb4af2-seed: -1287390502,
	gbtc_78e536cb4af2-stepSize: 0.4,
	gbtc_78e536cb4af2-subsamplingRate: 1.0,
	gbtc_78e536cb4af2-validationTol: 0.01
}


### Evaluate Gradient Boosted Decision Trees Model

In [294]:
val testResultGBDT1 = gbdtModel1.transform(testDF);
val testResultGBDT2 = gbdtModel2.transform(testDF2);

testResultGBDT1: DataFrame = [age: double, job: string ... 44 more fields]
testResultGBDT2: DataFrame = [age: double, job: string ... 38 more fields]

In [289]:
println(
    "Gradient Boosted Decision Trees Model 1: For test set, Area under Precision-Recall curve is: " + binEvalPR.evaluate(testResultGBDT1).doubleValue()
);

println(
    "Gradient Boosted Decision Trees Model 1: For test set, Area under ROC curve is: " + binEvalROC.evaluate(testResultGBDT1).doubleValue()
);

Gradient Boosted Decision Trees Model 1: For test set, Area under Precision-Recall curve is: 0.6252967079198892
Gradient Boosted Decision Trees Model 1: For test set, Area under ROC curve is: 0.9387421301490524


In [295]:
println("Gradient Boosted Decision Trees Model 2: For test set, Area under Precision-Recall curve is: " + binEvalPR.evaluate(testResultGBDT2).doubleValue());

println("Gradient Boosted Decision Trees Model 2: For test set, Area under ROC curve is: " + binEvalROC.evaluate(testResultGBDT2).doubleValue());

Gradient Boosted Decision Trees Model 2: For test set, Area under Precision-Recall curve is: 0.6671679471854852
Gradient Boosted Decision Trees Model 2: For test set, Area under ROC curve is: 0.9450175728633119


## Persisting Models to Storage

All models and pipelines can be saved to disk.

Spark ML stores model metadata as JSON and the model data as Parquet, which Spark compresses with Snappy by default.

In [278]:
println("Now writing fitted LR model 1 to disk at: /tmp/lrmodel1")
lrModel1.write.overwrite().save("/tmp/lrmodel1")

Now writing fitted LR model 1 to disk at: /tmp/lrmodel1


23/01/30 01:21:31 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:31 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 01:21:32 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:32 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 01:21:32 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:32 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false


In [279]:
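println("Now writing fitted LR model 2 to disk at: /tmp/lrmodel2")
// bestModel is typed as Model[_], so cast it back to LogisticRegressionModel to access write
lrModel2.asInstanceOf[LogisticRegressionModel].write.overwrite().save("/tmp/lrmodel2")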

Now writing fitted LR model 2 to disk at: /tmp/lrmodel2


23/01/30 01:21:33 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:33 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 01:21:33 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:33 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/01/30 01:21:33 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:33 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false


In [296]:
// First, remove the vector columns (and the intermediate index columns), since vectors cannot be written to a CSV file:
val droppedTestDF = testResultLR1.drop("features", "numericalfeatures", "scaledfeatures",
      "rawPrediction", "probability", "idx_job", "idx_marital", "idx_education", "idx_defaulted",
      "idx_housing", "idx_loan", "idx_day_of_week", "idx_poutcome", "vec_idx_job", "vec_idx_marital",
      "vec_idx_education", "vec_idx_defaulted", "vec_idx_housing", "vec_idx_loan", "vec_idx_day_of_week",
      "vec_idx_poutcome")

droppedTestDF.show(4)

+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+--------------+------------------+----------+
| age|    job|marital|  education|defaulted|housing|loan|contact_no|month_name|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|label|idx_month_name|vec_idx_month_name|prediction|
+----+-------+-------+-----------+---------+-------+----+----------+----------+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+-----+--------------+------------------+----------+
|18.0|student| single|   basic.4y|       no|    yes| yes|  cellular|       apr|        thu|   184.0|     2.0|999.0|     0.0|nonexistent|        -1.8|        93.075|        -47.1|    1.365|     5099.1| no|  0.0|           5.0|     (9

In [281]:
// coalesce(1) yields a single part file; Spark still writes a directory at the given path
droppedTestDF.coalesce(numPartitions = 1)
      .write.option("header", value = true)
      .mode(saveMode = "overwrite")
      .csv("/tmp/lr1_predictions.csv");

23/01/30 01:21:42 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/01/30 01:21:42 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false


---
## Apache Spark GraphX Demo


In [63]:
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
// GraphX and RDD types used throughout this demo
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

In [61]:
// define convenience function to print all edges of a graph:
def printAllEdges( graph: Graph[(String, String), String] ): Unit = {
    val facts: RDD[String] = graph.triplets.map(triplet => 
      "(" + triplet.srcAttr._1 + ") -- " + triplet.attr + " -- (" + triplet.dstAttr._1 + ")" );

    facts.collect.foreach(println(_))
}

defined function printAllEdges

In [62]:
def printGraphProperties(graph: Graph[(String, String), String] ): Unit = {
    // graph operators:
    println( "Num of edges = " + graph.numEdges )
    println( "Num of vertices = " + graph.numVertices )
    // inDegrees, outDegrees and degrees only contain vertices that have at least one such edge,
    // so the counts below are numbers of vertices, not sums of degrees.
    println( "Num of inDegrees = " + graph.inDegrees.count() )
    println( "Num of outDegrees = " + graph.outDegrees.count() )
    println( "Num of degrees = " + graph.degrees.count() )
}

defined function printGraphProperties

### Sample Graph

In [64]:
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize( Array(
        (3L, ("rxin", "student"))
      , (7L, ("jgonzal", "postdoc"))
      , (1L, ("somebody", "postdoc"))
      , (5L, ("franklin", "prof"))
      , (2L, ("istoica", "prof"))
      , (10L, ("hoityToity", "student"))
     )
   )

users: RDD[(VertexId, (String, String))] = ParallelCollectionRDD[123] at parallelize at cmd63.sc:2

In [65]:
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(
      Array(
      Edge(3L, 7L, "collab")
      , Edge(5L, 3L, "advisor")
      , Edge(2L, 5L, "colleague")
      , Edge(5L, 7L, "pi")
      , Edge(10L, 5L, "friend")
      , Edge(10L, 1L, "friend")
      )
    )

relationships: RDD[Edge[String]] = ParallelCollectionRDD[124] at parallelize at cmd64.sc:2

In [66]:
// Define a default user in case a relationship refers to a missing user
val defaultUser = ("John Doe", "Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

defaultUser: (String, String) = ("John Doe", "Missing")
graph: Graph[(String, String), String] = org.apache.spark.graphx.impl.GraphImpl@5ae6fd3f

In [67]:
println("Caching the graph:")
graph.cache()

Caching the graph:

res66_1: Graph[(String, String), String] = org.apache.spark.graphx.impl.GraphImpl@5ae6fd3f

In [71]:
// print out the graph:
printAllEdges( graph )

(rxin) -- collab -- (jgonzal)
(franklin) -- advisor -- (rxin)
(istoica) -- colleague -- (franklin)
(franklin) -- pi -- (jgonzal)
(hoityToity) -- friend -- (franklin)
(hoityToity) -- friend -- (somebody)


In [72]:
// print out basic properties:
printGraphProperties(graph)

Num of edges = 6
Num of vertices = 6
Num of inDegrees = 4
Num of outDegrees = 4
Num of degrees = 6
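
The degree counts above are counts of vertices that have at least one matching edge, which is why they differ from the edge count. A plain-Scala sketch (no Spark or GraphX) that reproduces them from the edge list is shown below; the names `edgePairs`, `withIncoming`, `withOutgoing` and `withAnyEdge` are illustrative.

```scala
// (srcId, dstId) pairs copied from the relationships RDD above.
val edgePairs = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L), (10L, 5L), (10L, 1L))

val withIncoming = edgePairs.map(_._2).distinct   // vertices that appear as a destination
val withOutgoing = edgePairs.map(_._1).distinct   // vertices that appear as a source
val withAnyEdge  = (withIncoming ++ withOutgoing).distinct

println(s"vertices with in-edges  = ${withIncoming.size}")
println(s"vertices with out-edges = ${withOutgoing.size}")
println(s"vertices with any edge  = ${withAnyEdge.size}")
```

The three sizes correspond to the inDegrees, outDegrees and degrees counts printed by printGraphProperties.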


In [68]:
println("Count all users which are postdocs:")
println( graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count )

Count all users which are postdocs:
2


In [69]:
println( "Count all the edges where src > dst")
println( graph.edges.filter(e => e.srcId > e.dstId).count )

Count all the edges where src > dst
3


In [129]:
spark.stop()