# Logistic Regression

In [1]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.log4j._
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,VectorIndexer,OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.evaluation.MulticlassMetrics

Intitializing Scala interpreter ...

Spark Web UI available at http://10.188.57.59:4040
SparkContext available as 'sc' (version = 2.2.2, master = local[*], app id = local-1539609147853)
SparkSession available as 'spark'


import org.apache.spark.ml.classification.LogisticRegression
import org.apache.log4j._
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.evaluation.MulticlassMetrics


In [2]:
Logger.getLogger("org").setLevel(Level.ERROR)
val file ="Machine_Learning_Sections/Classification/advertising.csv"
val df = spark.read.option("header","true").option("inferSchema","true").format("csv").load(file)
df.printSchema

root
 |-- Daily Time Spent on Site: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Area Income: double (nullable = true)
 |-- Daily Internet Usage: double (nullable = true)
 |-- Ad Topic Line: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Clicked on Ad: integer (nullable = true)



file: String = Machine_Learning_Sections/Classification/advertising.csv
df: org.apache.spark.sql.DataFrame = [Daily Time Spent on Site: double, Age: int ... 8 more fields]


In [3]:
val df1 = df.withColumn("Hour",hour(df("Timestamp")))

df1: org.apache.spark.sql.DataFrame = [Daily Time Spent on Site: double, Age: int ... 9 more fields]


In [4]:
val alldata = df1.select(df1("Clicked on Ad").as("label"),
                        $"Daily Time Spent on Site",
                        $"Age",
                        $"Area Income",
                        $"Daily Internet Usage",
                        $"Male",
                        $"Hour")
val data = alldata.na.drop()
data.printSchema

root
 |-- label: integer (nullable = true)
 |-- Daily Time Spent on Site: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Area Income: double (nullable = true)
 |-- Daily Internet Usage: double (nullable = true)
 |-- Male: integer (nullable = true)
 |-- Hour: integer (nullable = true)



alldata: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 5 more fields]
data: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 5 more fields]


In [5]:
val assembler = new VectorAssembler().setInputCols(Array("label",
                        "Daily Time Spent on Site",
                        "Age",
                        "Area Income",
                        "Daily Internet Usage",
                        "Male",
                        "Hour")).setOutputCol("features")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_ec34f35828b8


In [6]:
val Array(training_data, test_data) = data.randomSplit(Array(0.74, 0.26), seed = 12345)
training_data.count

training_data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, Daily Time Spent on Site: double ... 5 more fields]
test_data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, Daily Time Spent on Site: double ... 5 more fields]
res2: Long = 724


In [7]:
import org.apache.spark.ml.Pipeline
val lr = new LogisticRegression()
val pipeline = new Pipeline().setStages(Array(assembler, lr))

import org.apache.spark.ml.Pipeline
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_41afdc2a3782
pipeline: org.apache.spark.ml.Pipeline = pipeline_75e2eb005093


In [8]:
val model = pipeline.fit(training_data)
val results = model.transform(test_data)

model: org.apache.spark.ml.PipelineModel = pipeline_75e2eb005093
results: org.apache.spark.sql.DataFrame = [label: int, Daily Time Spent on Site: double ... 9 more fields]


In [9]:
results.count

res3: Long = 276


In [10]:
results.select($"prediction",$"label").show(5)

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
+----------+-----+
only showing top 5 rows



In [11]:
val prediction = results.select($"prediction",$"label").as[(Double, Double)].rdd
val metrics = new MulticlassMetrics(prediction)

prediction: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[70] at rdd at <console>:43
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@72a09b05


In [12]:
// confusion Matrix
metrics.confusionMatrix

res5: org.apache.spark.mllib.linalg.Matrix =
132.0  0.0
0.0    144.0


In [14]:
//accuracy
metrics.accuracy

res7: Double = 1.0
