In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier, GBTClassifier}
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.ml.feature.StandardScaler

import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession.builder().getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://MSI:4040
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1605214207529)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier, GBTClassifier}
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.ml.feature.StandardScaler
import org.apache.log4j._
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7d5bdaa


In [2]:
val df = (spark.read.option("header","true").option("inferSchema","true")
          .option("multiline","true").format("csv")
          .load("../../data/ml_scala/advertising.csv"))

df: org.apache.spark.sql.DataFrame = [Daily Time Spent on Site: double, Age: int ... 8 more fields]


In [3]:
df.printSchema()
df.describe().show()
df.show(5)

// Print a row to better visualize
val colnames = df.columns
val firstrow = df.head(1)(0)
println("\n")
println("Example Data Row")
for(ind <- Range(1,colnames.length)){
  println(colnames(ind))
  println(firstrow(ind))
  println("\n")
}

root
 |-- Daily Time Spent on Site: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Area Income: double (nullable = true)
 |-- Daily Internet Usage: double (nullable = true)
 |-- Ad Topic Line: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Clicked on Ad: integer (nullable = true)

+-------+------------------------+-----------------+------------------+--------------------+--------------------+----------+-------------------+-----------+-------------------+------------------+
|summary|Daily Time Spent on Site|              Age|       Area Income|Daily Internet Usage|       Ad Topic Line|      City|               Male|    Country|          Timestamp|     Clicked on Ad|
+-------+------------------------+-----------------+------------------+--------------------+--------------------+----------+-------------------+-----------+---------------

colnames: Array[String] = Array(Daily Time Spent on Site, Age, Area Income, Daily Internet Usage, Ad Topic Line, City, Male, Country, Timestamp, Clicked on Ad)
firstrow: org.apache.spark.sql.Row = [68.95,35,61833.9,256.09,Cloned 5thgeneration orchestration,Wrightburgh,0,Tunisia,2016-03-27 00:53:11,0]


In [4]:
// Add an hour column from the Timestamp
val df_time = df.withColumn("Hour", hour(df("Timestamp")))
df_time.show(5)

+------------------------+---+-----------+--------------------+--------------------+--------------+----+----------+-------------------+-------------+----+
|Daily Time Spent on Site|Age|Area Income|Daily Internet Usage|       Ad Topic Line|          City|Male|   Country|          Timestamp|Clicked on Ad|Hour|
+------------------------+---+-----------+--------------------+--------------------+--------------+----+----------+-------------------+-------------+----+
|                   68.95| 35|    61833.9|              256.09|Cloned 5thgenerat...|   Wrightburgh|   0|   Tunisia|2016-03-27 00:53:11|            0|   0|
|                   80.23| 31|   68441.85|              193.77|Monitored nationa...|     West Jodi|   1|     Nauru|2016-04-04 01:39:02|            0|   1|
|                   69.47| 26|   59785.94|               236.5|Organic bottom-li...|      Davidton|   0|San Marino|2016-03-13 20:35:42|            0|  20|
|                   74.15| 29|   54806.18|              245.89|Triple-

df_time: org.apache.spark.sql.DataFrame = [Daily Time Spent on Site: double, Age: int ... 9 more fields]


In [5]:
// Drop rows with null
df_time.na.drop()

res2: org.apache.spark.sql.DataFrame = [Daily Time Spent on Site: double, Age: int ... 9 more fields]


In [6]:
df_time.columns

res3: Array[String] = Array(Daily Time Spent on Site, Age, Area Income, Daily Internet Usage, Ad Topic Line, City, Male, Country, Timestamp, Clicked on Ad, Hour)


In [7]:
// Select relevant features and label
val df_detail = (df_time.select(df_time("Clicked on Ad").as("label"), $"Daily Time Spent on Site",
                           $"Age",$"Area Income", $"Daily Internet Usage",$"Male",$"Hour"))

// Remove rows with duplicates
val df_clean = df_detail.na.drop()

// Show dataframe
df_clean.show(5)

+-----+------------------------+---+-----------+--------------------+----+----+
|label|Daily Time Spent on Site|Age|Area Income|Daily Internet Usage|Male|Hour|
+-----+------------------------+---+-----------+--------------------+----+----+
|    0|                   68.95| 35|    61833.9|              256.09|   0|   0|
|    0|                   80.23| 31|   68441.85|              193.77|   1|   1|
|    0|                   69.47| 26|   59785.94|               236.5|   0|  20|
|    0|                   74.15| 29|   54806.18|              245.89|   1|   2|
|    0|                   68.37| 35|   73889.99|              225.58|   0|   3|
+-----+------------------------+---+-----------+--------------------+----+----+
only showing top 5 rows



df_detail: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 5 more fields]
df_clean: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 5 more fields]


In [8]:
// Assemble everything together to be ("label","features") format
val assembler = (new VectorAssembler()
                  .setInputCols(Array("Daily Time Spent on Site", "Age", "Area Income",
                                      "Daily Internet Usage","Male","Hour"))
                  .setOutputCol("features"))

// Create scaler for random forest and gb classifiers
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Create train/test data sets
val Array(training, test) = df_clean.randomSplit(Array(0.7,0.3))

assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_58eb290b3784, handleInvalid=error, numInputCols=6
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_64d9ab586d3d
training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, Daily Time Spent on Site: double ... 5 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, Daily Time Spent on Site: double ... 5 more fields]


In [9]:
// Create classifier
val clf_lr = new LogisticRegression()
val clf_rf = new RandomForestClassifier()
val clf_gb = new GBTClassifier()

// Create pipelines
val log_pipeline = new Pipeline().setStages(Array(assembler,  clf_lr))
val rf_pipeline = new Pipeline().setStages(Array(assembler, scaler, clf_rf))
val gb_pipeline = new Pipeline().setStages(Array(assembler, scaler, clf_gb))

// Fit models
val log_model = log_pipeline.fit(training)
val rf_model = rf_pipeline.fit(training)
val gb_model =  gb_pipeline.fit(training)

clf_lr: org.apache.spark.ml.classification.LogisticRegression = logreg_da1f93e812c1
clf_rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_9c69ba19c931
clf_gb: org.apache.spark.ml.classification.GBTClassifier = gbtc_634fb516776d
log_pipeline: org.apache.spark.ml.Pipeline = pipeline_6bbeaf2aa945
rf_pipeline: org.apache.spark.ml.Pipeline = pipeline_d316856f7c38
gb_pipeline: org.apache.spark.ml.Pipeline = pipeline_1dd369a83bdb
log_model: org.apache.spark.ml.PipelineModel = pipeline_6bbeaf2aa945
rf_model: org.apache.spark.ml.PipelineModel = pipeline_d316856f7c38
gb_model: org.apache.spark.ml.PipelineModel = pipeline_1dd369a83bdb


In [10]:
// Model Results
val log_results = log_model.transform(test)
val rf_results = rf_model.transform(test)
val gb_results = gb_model.transform(test)

// Convert to RDD to use the MulticlassMetrics class
val log_pred = log_results.select($"prediction",$"label").as[(Double, Double)].rdd
val rf_pred = rf_results.select($"prediction",$"label").as[(Double, Double)].rdd
val gb_pred = gb_results.select($"prediction",$"label").as[(Double, Double)].rdd

// Create new metrics object
val log_metrics = new MulticlassMetrics(log_pred)
val rf_metrics = new MulticlassMetrics(rf_pred)
val gb_metrics = new MulticlassMetrics(gb_pred)

// Print out confusion matrices
println("Confusion Matrix for Logistic Model:")
println(log_metrics.confusionMatrix)
println("Confusion Matrix for Random Forest Model:")
println(rf_metrics.confusionMatrix)
println("Confusion Matrix for Gradient Boosted Model:")
println(gb_metrics.confusionMatrix)

Confusion Matrix for Logistic Model:
156.0  1.0    
9.0    144.0  
Confusion Matrix for Random Forest Model:
156.0  1.0    
13.0   140.0  
Confusion Matrix for Gradient Boosted Model:
154.0  3.0    
11.0   142.0  


log_results: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 9 more fields]
rf_results: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 10 more fields]
gb_results: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 10 more fields]
log_pred: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[616] at rdd at <console>:46
rf_pred: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[622] at rdd at <console>:47
gb_pred: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[628] at rdd at <console>:48
log_metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@29c097cf
rf_metrics: org.apache.spark.ml...


In [11]:
spark.stop()