# Linear Regression using Spark and Scala
## Predict the profits of a food truck

Read the input file, which contains the population of each city and the food truck's profit there.

In [1]:
// The code was removed by DSX for sharing.




In [2]:

// bmos: object-storage helper, presumably created in the cell whose code was removed above
val df1 = spark.
    read.format("csv").
    option("header", "true").      // first line holds the column names
    option("inferSchema", "true"). // parse numeric columns as doubles
    load(bmos.url("AnalyticsEngineTest", "ex1data1.csv"))
df1.show(5)


+----------+------+
|Population|Profit|
+----------+------+
|    6.1101|17.592|
|    5.5277|9.1302|
|    8.5186|13.662|
|    7.0032|11.854|
|    5.8598|6.8233|
+----------+------+
only showing top 5 rows
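For intuition, `header = true` and `inferSchema = true` amount to the following for this particular two-column file: skip the first line as column names and parse every remaining field as a `Double`. The sketch below is a hypothetical plain-Scala illustration (the object and method names are made up); Spark's CSV reader is far more general (quoting, nulls, type widening).

```scala
// Hypothetical sketch: parse a two-column numeric CSV the way header=true and
// inferSchema=true would treat this file. Not Spark's implementation.
object ParseProfitCsv {
  // Returns the header names and one (Population, Profit) pair per data line.
  def parse(lines: Seq[String]): (Seq[String], Seq[(Double, Double)]) = {
    val header = lines.head.split(",").map(_.trim).toList
    val rows = lines.tail
      .filter(_.trim.nonEmpty) // ignore blank lines
      .map { line =>
        val Array(pop, profit) = line.split(",").map(_.trim)
        (pop.toDouble, profit.toDouble)
      }
    (header, rows)
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq("Population,Profit", "6.1101,17.592", "5.5277,9.1302")
    val (header, rows) = parse(sample)
    println(header) // List(Population, Profit)
    rows.foreach(println)
  }
}
```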



In [3]:
df1.printSchema()
df1.describe("Profit").show()
df1.schema

root
 |-- Population: double (nullable = true)
 |-- Profit: double (nullable = true)

+-------+-----------------+
|summary|           Profit|
+-------+-----------------+
|  count|               97|
|   mean| 5.83913505154639|
| stddev|5.510262255231546|
|    min|          -2.6807|
|    max|           24.147|
+-------+-----------------+



StructType(StructField(Population,DoubleType,true), StructField(Profit,DoubleType,true))
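The `describe("Profit")` table reports count, mean, standard deviation, min and max. As a rough illustration of what those five numbers are, here is a plain-Scala sketch; it assumes the `stddev` row is the sample standard deviation (divisor n - 1), which is what Spark's `stddev` aggregate computes. The object name is hypothetical.

```scala
// Sketch of the five statistics DataFrame.describe reports for a numeric column.
// stddev uses the sample (n - 1) denominator, like Spark's stddev aggregate.
object DescribeSketch {
  final case class Summary(count: Long, mean: Double, stddev: Double, min: Double, max: Double)

  def describe(xs: Seq[Double]): Summary = {
    require(xs.size >= 2, "need at least two values for a sample stddev")
    val n = xs.size
    val mean = xs.sum / n
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / (n - 1)
    Summary(n.toLong, mean, math.sqrt(variance), xs.min, xs.max)
  }

  def main(args: Array[String]): Unit = {
    // First five Profit values from the table above (not the full 97 rows).
    println(describe(Seq(17.592, 9.1302, 13.662, 11.854, 6.8233)))
  }
}
```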

In [4]:
// LinearRegression reads its target from a column named "label" by default
val df2 = df1.select(df1("Profit").as("label"), df1("Population").as("features1"))
df2.show()

+-------+---------+
|  label|features1|
+-------+---------+
| 17.592|   6.1101|
| 9.1302|   5.5277|
| 13.662|   8.5186|
| 11.854|   7.0032|
| 6.8233|   5.8598|
| 11.886|   8.3829|
| 4.3483|   7.4764|
|   12.0|   8.5781|
| 6.5987|   6.4862|
| 3.8166|   5.0546|
| 3.2522|   5.7107|
| 15.505|   14.164|
| 3.1551|    5.734|
| 7.2258|   8.4084|
|0.71618|   5.6407|
| 3.5129|   5.3794|
| 5.3048|   6.3654|
|0.56077|   5.1301|
| 3.6518|   6.4296|
| 5.3893|   7.0708|
+-------+---------+
only showing top 20 rows



In [5]:
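import org.apache.spark.ml.regression._
import org.apache.spark.ml.feature.VectorAssembler
import spark.implicits._ // enables the $"column" syntax (often pre-imported in notebooks)

// LinearRegression expects all features in one vector column, named "features" by default
val assembler = new VectorAssembler().setInputCols(Array("features1")).setOutputCol("features")
val df3 = assembler.transform(df2).select($"label", $"features")
df2.show(5)
df3.show(5)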

+------+---------+
| label|features1|
+------+---------+
|17.592|   6.1101|
|9.1302|   5.5277|
|13.662|   8.5186|
|11.854|   7.0032|
|6.8233|   5.8598|
+------+---------+
only showing top 5 rows

+------+--------+
| label|features|
+------+--------+
|17.592|[6.1101]|
|9.1302|[5.5277]|
|13.662|[8.5186]|
|11.854|[7.0032]|
|6.8233|[5.8598]|
+------+--------+
only showing top 5 rows
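The idea behind `VectorAssembler` is simple: for each row, the values of the chosen input columns are concatenated, in order, into a single feature vector. The sketch below models rows as `Map[String, Double]`, an assumption made only for illustration (the real transformer also handles vector-typed inputs and nulls); the object name is hypothetical.

```scala
// Plain-Scala sketch of what VectorAssembler does conceptually:
// concatenate the listed input columns, in order, into one vector per row.
object AssemblerSketch {
  type Row = Map[String, Double] // illustrative row model, not Spark's Row

  def assemble(rows: Seq[Row], inputCols: Seq[String]): Seq[Vector[Double]] =
    rows.map(row => inputCols.map(row).toVector)

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Map("label" -> 17.592, "features1" -> 6.1101),
      Map("label" -> 9.1302, "features1" -> 5.5277)
    )
    // One input column gives one-element vectors, like [6.1101] in the table above.
    assemble(rows, Seq("features1")).foreach(println)
  }
}
```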



In [6]:
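// Defaults: regParam = 0.0 (plain least squares), fitIntercept = true,
// labelCol = "label", featuresCol = "features"
val lr = new LinearRegression()
val lrModel = lr.fit(df3)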
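For reference, with one feature and the default `regParam = 0.0`, fitting minimizes the mean squared error; the Spark documentation writes this objective with a 1/(2n) factor, which does not change the minimizer. The optimum has the standard closed form below, where x is Population, y is Profit, w is the reported coefficient (slope) and b the intercept.

$$
\hat{y}_i = b + w\,x_i,
\qquad
(w, b) = \arg\min_{w,\,b}\ \frac{1}{2n}\sum_{i=1}^{n}\left(y_i - w\,x_i - b\right)^2
$$

$$
w = \frac{\sum_{i=1}^{n}(x_i - \bar{x})(y_i - \bar{y})}{\sum_{i=1}^{n}(x_i - \bar{x})^2},
\qquad
b = \bar{y} - w\,\bar{x}
$$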

In [7]:
println(s"Slope/Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

val trainingSummary = lrModel.summary

println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: ${trainingSummary.objectiveHistory.toList}")

trainingSummary.residuals.show()

println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"MSE: ${trainingSummary.meanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

Slope/Coefficients: [1.1930336441895963] Intercept: -3.895780878311882
numIterations: 1
objectiveHistory: List(0.0)
+--------------------+
|           residuals|
+--------------------+
|  14.198226008949028|
|    6.43124880332505|
|   7.394804476918388|
|  7.3947276613233015|
|  3.7281423300896854|
|   5.780699142434916|
| -0.6755158591072163|
|   5.661818975089107|
|  2.7562260553693223|
|  1.6820730203911491|
| 0.33492364643835426|
|  2.5026523420104407|
| 0.21002596252873662|
|  1.0900767845080797|
|  -2.117583998468374|
|  0.9908756927583671|
|  1.6064445195874253|
| -1.6638310197451653|
|-0.12314824036954564|
|  0.8493785869760853|
+--------------------+
only showing top 20 rows

RMSE: 2.9923139460876023
MSE: 8.953942751950358
r2: 0.70203155378414
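The summary shows `numIterations: 1` and `objectiveHistory: List(0.0)`, which is consistent with Spark having solved the least-squares problem directly rather than iteratively (with the default `solver = "auto"` and a single feature, the normal-equation solver is eligible). The sketch below reproduces that computation in plain Scala and derives the same kinds of metrics: residual = label - prediction (the sign convention of the residuals column above), MSE divided by n, RMSE = sqrt(MSE), and r2 = 1 - SS_res / SS_tot. These are the usual definitions, assumed here rather than taken from Spark's source; the object and method names are hypothetical.

```scala
// Minimal sketch: closed-form ordinary least squares for one feature,
// plus RMSE, MSE and r2 computed from the residuals.
object OlsSketch {
  final case class Fit(slope: Double, intercept: Double)

  def fit(xs: Seq[Double], ys: Seq[Double]): Fit = {
    require(xs.size == ys.size && xs.size >= 2, "need matching inputs, n >= 2")
    val n = xs.size
    val xBar = xs.sum / n
    val yBar = ys.sum / n
    val sxy = xs.zip(ys).map { case (x, y) => (x - xBar) * (y - yBar) }.sum
    val sxx = xs.map(x => (x - xBar) * (x - xBar)).sum
    val slope = sxy / sxx
    Fit(slope, yBar - slope * xBar)
  }

  // Residual = label - prediction.
  def residuals(f: Fit, xs: Seq[Double], ys: Seq[Double]): Seq[Double] =
    xs.zip(ys).map { case (x, y) => y - (f.intercept + f.slope * x) }

  // Returns (RMSE, MSE, r2).
  def metrics(f: Fit, xs: Seq[Double], ys: Seq[Double]): (Double, Double, Double) = {
    val ssRes = residuals(f, xs, ys).map(r => r * r).sum
    val mse = ssRes / ys.size
    val yBar = ys.sum / ys.size
    val ssTot = ys.map(y => (y - yBar) * (y - yBar)).sum
    (math.sqrt(mse), mse, 1.0 - ssRes / ssTot)
  }

  def main(args: Array[String]): Unit = {
    // First five (Population, Profit) rows only; results differ from the 97-row fit.
    val xs = Seq(6.1101, 5.5277, 8.5186, 7.0032, 5.8598)
    val ys = Seq(17.592, 9.1302, 13.662, 11.854, 6.8233)
    val f = fit(xs, ys)
    val (rmse, mse, r2) = metrics(f, xs, ys)
    println(s"slope=${f.slope} intercept=${f.intercept} RMSE=$rmse MSE=$mse r2=$r2")
  }
}
```

As a consistency check on the printed summary, squaring the reported RMSE (about 2.9923) gives about 8.954, matching the reported MSE.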


In [8]:
val df4 = lrModel.transform(df3)
df4.show(4)


+------+--------+------------------+
| label|features|        prediction|
+------+--------+------------------+
|17.592|[6.1101]|3.3937739910509706|
|9.1302|[5.5277]|  2.69895119667495|
|13.662|[8.5186]| 6.267195523081613|
|11.854|[7.0032]| 4.459272338676698|
+------+--------+------------------+
only showing top 4 rows
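Each prediction is just intercept + coefficient × population. Plugging the coefficient and intercept printed in cell 7 into that formula by hand reproduces the first rows of the `prediction` column, and label minus prediction gives the first residual shown earlier. The object name below is hypothetical; the two constants are copied from the output above.

```scala
// Recompute predictions by hand from the printed model parameters:
// prediction = intercept + coefficient * population.
object PredictionCheck {
  val coefficient = 1.1930336441895963 // from "Slope/Coefficients" above
  val intercept = -3.895780878311882   // from "Intercept" above

  def predict(population: Double): Double = intercept + coefficient * population

  def main(args: Array[String]): Unit = {
    val p = predict(6.1101)
    println(p)          // about 3.3938, the first prediction above
    println(17.592 - p) // about 14.1982, the first residual above
  }
}
```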



In [9]:
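import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vector
import spark.implicits._ // encoders needed by map and toDF
df4.printSchema

// Unpack the one-element feature vector back into a plain Population column for plotting
val df5 = df4.map { case Row(label: Double, features: Vector, prediction: Double) =>
  (label, features(0), prediction)
}.toDF("Profit", "Population", "Prediction")
df5.show(4)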

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)

+------+----------+------------------+
|Profit|Population|        Prediction|
+------+----------+------------------+
|17.592|    6.1101|3.3937739910509706|
|9.1302|    5.5277|  2.69895119667495|
|13.662|    8.5186| 6.267195523081613|
|11.854|    7.0032| 4.459272338676698|
+------+----------+------------------+
only showing top 4 rows



Normally, `%AddDeps org.vegas-viz vegas_2.11 0.3.11 --transitive` would download all the dependent JARs. That did not work here, so each dependency is added individually.

In [10]:
%AddDeps org.vegas-viz vegas_2.11 0.3.11
%AddDeps org.vegas-viz vegas-spark_2.11 0.3.11 
%AddDeps org.typelevel cats-core_2.11 0.9.0
%AddDeps org.typelevel cats-kernel_2.11 0.9.0
%AddDeps org.typelevel cats-macros_2.11 0.9.0
%AddDeps io.circe circe-core_2.11 0.7.0
%AddDeps io.circe circe-generic_2.11 0.7.0
%AddDeps io.circe circe-jawn_2.11 0.7.0
%AddDeps io.circe circe-numbers_2.11 0.7.0
%AddDeps io.circe circe-parser_2.11 0.7.0
%AddDeps org.spire-math jawn-parser_2.11 0.10.4
%AddDeps org.typelevel machinist_2.11 0.6.1
%AddDeps org.typelevel macro-compat_2.11 1.1.1
%AddDeps com.github.julien-truffaut monocle-core_2.11 1.1.0
%AddDeps com.github.julien-truffaut monocle-macro_2.11 1.1.0
%AddDeps org.scalafx scalafx_2.11 8.0.92-R10
%AddDeps org.scalaz scalaz-core_2.11 7.1.1
%AddDeps com.chuusai shapeless_2.11 2.3.2
%AddDeps com.github.mpilquist simulacrum_2.11 0.10.0
%AddDeps org.webjars.bower vega 3.0.0-rc4
%AddDeps org.webjars.bower vega-lite 1.2.0
%AddDeps org.vegas-viz vegas-macros_2.11 0.3.11


Import Vegas, a Scala visualization library that can plot Spark DataFrames directly.

In [11]:
import vegas._
import vegas.data.External._
import vegas.sparkExt._

In [12]:
// Scatter plot of the raw data (df1) overlaid with the fitted regression line (df5)
Vegas.layered("Linear Regression", width = 500, height = 500).
  withLayers(
    Layer().
      withDataFrame(df1).
      mark(Point).
      encodeX("Population", Quant).
      encodeY("Profit", Quant),
    Layer().
      withDataFrame(df5).
      mark(Line).
      encodeX("Population", Quant).
      encodeY("Prediction", Quant)
  ).
  show