In [50]:
// Step 1 Choose model
// Step 2 Format Data-frame into two columns : label column & Array of features columns
// Lable column if its supervised learning 
// if unsupervised learning - only one column : array of features
// convert all the columns of features into a single column consisting of an array of all those features - 

In [51]:
// Pipeline object allows to easily set up a pipeline of data transformations, model selections and parameter tuning 
// It makes syntax much cleaner 

## Regression 

In [52]:
// To see less warnings
import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)

import org.apache.log4j._


In [53]:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}


In [54]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.regression.LinearRegression

val spark = SparkSession.builder()
                        .appName("LinearRegExample")
                        .config("spark.master", "local")
                        .getOrCreate()

import spark.implicits._

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.regression.LinearRegression
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@22e6da4f
import spark.implicits._


In [55]:
// Prepare training and test data.
val data = spark.read.option("header","true").option("inferSchema","true").format("csv").load("USA_Housing.csv")

// Check out the Data
data.printSchema()

root
 |-- Avg Area Income: double (nullable = true)
 |-- Avg Area House Age: double (nullable = true)
 |-- Avg Area Number of Rooms: double (nullable = true)
 |-- Avg Area Number of Bedrooms: double (nullable = true)
 |-- Area Population: double (nullable = true)
 |-- Price: double (nullable = true)



data: org.apache.spark.sql.DataFrame = [Avg Area Income: double, Avg Area House Age: double ... 4 more fields]


In [56]:
// Setting Up DataFrame for Machine Learning 

In [57]:
// This will allow us to join multiple feature columns
// into a single column of an array of feautre values
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors


In [58]:
// Rename Price to label column for naming convention.
// Grab only numerical columns from the data
var df = data.select($"Price", $"Avg Area Income", $"Avg Area House Age", $"Avg Area Number of Rooms", $"Area Population")

df: org.apache.spark.sql.DataFrame = [Price: double, Avg Area Income: double ... 3 more fields]


In [59]:
// Convert to double 
df = df.withColumn("Avg Area Income", df("Avg Area Income").cast("double"))
df = df.withColumn("Avg Area House Age", df("Avg Area Income").cast("double"))
df = df.withColumn("Avg Area Number of Rooms", df("Avg Area Number of Rooms").cast("double"))
df = df.withColumn("Area Population", df("Area Population").cast("double"))

df = df.withColumn("Price", df("Price").cast("double"))

df: org.apache.spark.sql.DataFrame = [Price: double, Avg Area Income: double ... 3 more fields]
df: org.apache.spark.sql.DataFrame = [Price: double, Avg Area Income: double ... 3 more fields]
df: org.apache.spark.sql.DataFrame = [Price: double, Avg Area Income: double ... 3 more fields]
df: org.apache.spark.sql.DataFrame = [Price: double, Avg Area Income: double ... 3 more fields]
df: org.apache.spark.sql.DataFrame = [Price: double, Avg Area Income: double ... 3 more fields]


In [60]:
df = df.withColumnRenamed("Price", "label")

df: org.apache.spark.sql.DataFrame = [label: double, Avg Area Income: double ... 3 more fields]


In [61]:
// An assembler converts the input values to a vector
// A vector is what the ML algorithm reads to train a model

// Set the input columns from which we are supposed to read the values
// Set the name of the column where the vector will be stored
val assembler = new VectorAssembler().setInputCols(Array("Avg Area Income","Avg Area House Age","Avg Area Number of Rooms","Area Population")).setOutputCol("features")

// Use the assembler to transform our DataFrame to the two columns
val training_df = assembler.transform(df).select($"label", $"features")

assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_23f0051ac170, handleInvalid=error, numInputCols=4
training_df: org.apache.spark.sql.DataFrame = [label: double, features: vector]


In [62]:
// Create a Linear Regression Model object
val lr = new LinearRegression()

lr: org.apache.spark.ml.regression.LinearRegression = linReg_493169b9e0e7


In [63]:
var lrModel = lr.fit(training_df)

lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_493169b9e0e7, numFeatures=4


In [64]:
// print coefficients & intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

Coefficients: [10.772761541740392,10.772761541740392,120060.55511403256,14.88564417519396] Intercept: -1622861.7041208127


In [65]:
// Summarize the model over the training set and print out some metrics

val trainingSummary = lrModel.summary

println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: ${trainingSummary.objectiveHistory.toList}")

numIterations: 7
objectiveHistory: List(0.5, 0.31300164069390934, 0.15343625620611956, 0.14968401606775306, 0.14912193049893396, 0.1491218893453672, 0.14912188933015624, 0.14912188933013049)


trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@4411b751


In [66]:
trainingSummary.totalIterations

res24: Int = 7


In [67]:
trainingSummary.objectiveHistory.toList

res25: List[Double] = List(0.5, 0.31300164069390934, 0.15343625620611956, 0.14968401606775306, 0.14912193049893396, 0.1491218893453672, 0.14912188933015624, 0.14912188933013049)


In [68]:
 trainingSummary.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|-217142.16841915203|
|  15190.99821847654|
| -209669.7105461515|
|  337196.3682716482|
| -372041.7669418452|
|-167507.52370156976|
|-152736.92664057482|
| 169323.43214833923|
| -74453.53638159565|
|-174533.87658723188|
| 286500.92047974723|
| -211273.4675923239|
| 370409.07444726955|
|  137103.1896799663|
| -7430.190298326546|
| -24089.82271954883|
|-115775.43270880845|
|-161023.04202414374|
|-120707.78718792007|
|-204347.53636440332|
+-------------------+
only showing top 20 rows



In [69]:
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

RMSE: 192824.25387277597
r2: 0.7017562213397575


In [71]:
spark.stop()