# Spark ML - Linear Regression (Advanced Example)
Now that you've completed the documentation example, it's worth working through something a little more complex. This tutorial has been adapted from Susan Li's work. Find more information here:
- https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a
- https://github.com/susanli2016/PySpark-and-MLlib

Objective: we'll use a Boston housing dataset from Kaggle to predict house prices, specifically the median value of owner-occupied homes (medv). We'll explore and summarise the data, assemble the features into a single vector, build a linear regression model and evaluate it.

In [9]:
# Section must be included at the beginning of each new notebook. Remember to change the app name.
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linear_regression_adv').getOrCreate()

# If you get an error about numpy, run 'pip3 install --user numpy' in a terminal.
# For any other missing package, run 'pip3 install --user PACKAGENAME',
# replacing PACKAGENAME with the relevant package (such as pandas).
from pyspark.ml.regression import LinearRegression

# First, import the data. CSV files carry no type information, so inferSchema=True asks Spark to
# infer each column's type, and header=True takes the column names from the first row.
df = spark.read.csv("Datasets/boston_housing_data.csv", inferSchema=True, header=True)

In [10]:
# Let's explore. Here's the first row of the data.
print(df.head())

# And the schema: each column's name and inferred type.
df.printSchema()

Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, b=396.9, lstat=4.98, medv=24.0)
root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



## Feature Information
- CRIM: per capita crime rate by town.
- ZN: proportion of residential land zoned for lots over 25,000 sq.ft.
- INDUS: proportion of non-retail business acres per town.
- CHAS: Charles River dummy variable (= 1 if tract bounds river; 0 otherwise).
- NOX: nitrogen oxides concentration (parts per 10 million).
- RM: average number of rooms per dwelling.
- AGE: proportion of owner-occupied units built prior to 1940.
- DIS: weighted mean of distances to five Boston employment centres.
- RAD: index of accessibility to radial highways.
- TAX: full-value property-tax rate per \$10,000.
- PTRATIO: pupil-teacher ratio by town.
- B: 1000(Bk − 0.63)², where Bk is the proportion of Black residents by town.
- LSTAT: lower status of the population (percent).
- MEDV: median value of owner-occupied homes in $1000s. This is the target variable.
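The B feature is a transformed quantity rather than a raw proportion, so its values are hard to read directly. You can see why by evaluating the formula with a small helper. The name `b_from_bk` is hypothetical and is not part of any library. The formula is not monotonic in Bk:

- It peaks at 396.9 when Bk = 0.
- It falls to 0 at Bk = 0.63.
- It climbs back to 136.9 at Bk = 1.

So the first row's b value of 396.9 corresponds to Bk = 0. The other root, 1.26, is not a valid proportion.

```python
def b_from_bk(bk):
    """Hypothetical helper: the B feature as defined above, 1000 * (Bk - 0.63)**2."""
    return 1000 * (bk - 0.63) ** 2

# Evaluate the formula at a few proportions to show its U shape.
for bk in (0.0, 0.3, 0.63, 1.0):
    print(f"Bk = {bk:.2f} -> B = {b_from_bk(bk):.1f}")
```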

In [11]:
# Now that we understand the features, let's convert Spark's summary statistics to a pandas DataFrame
# and transpose it so each feature is a row. toPandas() requires pandas to be installed.
import pandas as pd
df.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| crim | 506 | 3.6135235573122535 | 8.601545105332491 | 0.00632 | 88.9762 |
| zn | 506 | 11.363636363636363 | 23.32245299451514 | 0.0 | 100.0 |
| indus | 506 | 11.136778656126504 | 6.860352940897589 | 0.46 | 27.74 |
| chas | 506 | 0.0691699604743083 | 0.2539940413404101 | 0 | 1 |
| nox | 506 | 0.5546950592885372 | 0.11587767566755584 | 0.385 | 0.871 |
| rm | 506 | 6.284634387351787 | 0.7026171434153232 | 3.561 | 8.78 |
| age | 506 | 68.57490118577078 | 28.148861406903595 | 2.9 | 100.0 |
| dis | 506 | 3.795042687747034 | 2.10571012662761 | 1.1296 | 12.1265 |
| rad | 506 | 9.549407114624506 | 8.707259384239366 | 1 | 24 |
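Spark's `describe()` reports the count, mean, standard deviation, minimum and maximum of each column. Its stddev is the sample standard deviation, which divides by n − 1 rather than n.

The sketch below computes the same five statistics for a single column using only the standard library. It uses a hypothetical `describe_column` helper and the medv values of the first three rows of the dataset (24.0, 21.6, 34.7).

```python
import statistics

def describe_column(values):
    """Hypothetical helper mirroring the five statistics in Spark's describe() for one column."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation (n - 1 denominator)
        "min": min(values),
        "max": max(values),
    }

stats = describe_column([24.0, 21.6, 34.7])
print(stats)
```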


## Setting Up a DataFrame for Machine Learning (MLlib)
We need to do a few things before Spark can accept this data for machine learning. First of all, MLlib expects the data as two columns: a label column and a features column holding a vector. Unlike the documentation example, each feature here sits in its own column, so we need to combine all of them into a single vector column. VectorAssembler simplifies this process.
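Conceptually, VectorAssembler works row by row. It reads the listed input columns in order and concatenates their values into one numeric vector. Integer columns such as chas, rad and tax become doubles in the process.

The plain-Python sketch below mimics that for a single row. It uses a hypothetical `assemble()` helper and the dataset's first row written as a dict. This is an illustration only, not Spark's implementation; the real VectorAssembler also accepts vector-typed input columns, for example.

```python
# Feature columns in the same order as the inputCols used in the next cell; medv (the label) is excluded.
FEATURE_COLS = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis',
                'rad', 'tax', 'ptratio', 'b', 'lstat']

def assemble(row, input_cols):
    """Hypothetical helper: concatenate the named numeric columns, in order, into a list of floats."""
    return [float(row[col]) for col in input_cols]

# The dataset's first row, as printed by df.head() earlier.
first_row = {'crim': 0.00632, 'zn': 18.0, 'indus': 2.31, 'chas': 0, 'nox': 0.538, 'rm': 6.575,
             'age': 65.2, 'dis': 4.09, 'rad': 1, 'tax': 296, 'ptratio': 15.3, 'b': 396.9,
             'lstat': 4.98, 'medv': 24.0}

features = assemble(first_row, FEATURE_COLS)
print(features)
```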

In [12]:
# Import VectorAssembler.
from pyspark.ml.feature import VectorAssembler

# inputCols lists the feature columns to combine, and outputCol names the new vector column.
# medv is the label, so it is deliberately left out of inputCols.
vector_assembler = VectorAssembler(
    inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'],
    outputCol='features')

# Now that we've created the assembler, let's actually transform the data.
vector_output = vector_assembler.transform(df)

# printSchema() shows that the features column has been added.
vector_output.printSchema()

# The features column is a DenseVector that combines the individual features in inputCols order.
vector_output.head(1)

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)
 |-- features: vector (nullable = true)



[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, b=396.9, lstat=4.98, medv=24.0, features=DenseVector([0.0063, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2, 4.09, 1.0, 296.0, 15.3, 396.9, 4.98]))]

In [13]:
# Because the features have been combined into one vector, we no longer need the individual columns.
# Keep only the features vector and the label (medv).
vector_output = vector_output.select(['features', 'medv'])

# You can see that the dataframe now only contains two columns. 
print(vector_output.head(1))
vector_output.show(3)

[Row(features=DenseVector([0.0063, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2, 4.09, 1.0, 296.0, 15.3, 396.9, 4.98]), medv=24.0)]
+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
+--------------------+----+
only showing top 3 rows



In [14]:
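# Let's do a randomised 70/30 split. Remember, you should explain why you chose a particular split.
# No seed is set, so each run gives a different split (and slightly different results below);
# pass, for example, seed=42 to randomSplit if you need reproducible numbers.
train_data, test_data = vector_output.randomSplit([0.7, 0.3])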

# Let's see our training data.
train_data.describe().show()

# And our testing data.
test_data.describe().show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               347|
|   mean|22.520461095100888|
| stddev| 9.055746543025418|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               159|
|   mean|22.559748427672957|
| stddev| 9.527396336301386|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+
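`randomSplit` does not cut the data at exactly 70/30. Each row is assigned to a split at random, so the split sizes only approximate the weights. Here the result is 347 and 159 of 506 rows, roughly 69/31.

The stdlib sketch below is a simplified stand-in for that behaviour, not Spark's actual algorithm. `random_split` is a hypothetical helper. Its `seed` argument plays the role of the optional `seed` parameter of `DataFrame.randomSplit`: with a fixed seed the split is reproducible.

```python
import random

def random_split(rows, weights, seed=None):
    """Hypothetical helper: assign each row independently to a split with probability
    proportional to its weight. A simplified analogue of DataFrame.randomSplit."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last bucket also catches any floating-point rounding at the top end.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(506))  # stand-in for the 506 rows of the Boston data
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 70/30, but not exact
```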



## Linear Regression
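Now we can create a LinearRegression model instance. Our features column is named 'features', which is the default for featuresCol, so strictly we don't need to specify it (the code below does so anyway for clarity). The label column, however, isn't named 'label' (the default labelCol), so we have to set its name explicitly: labelCol='medv'.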

In [15]:
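# Import the LinearRegression class (already imported in the first cell; repeated so this cell is self-contained).
from pyspark.ml.regression import LinearRegression

# Create the estimator, telling it which columns hold the features and the label.
lr = LinearRegression(featuresCol='features', labelCol='medv')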

# Fit the training data.
lr_model = lr.fit(train_data)

# Print the coefficients.
print("Coefficients: " + str(lr_model.coefficients))

# Print the intercept.
print("Intercept: " + str(lr_model.intercept) + "\n")

# lr_model.summary holds evaluation metrics computed on the training data.
training_summary = lr_model.summary

# Print RMSE. 
print("RMSE: " + str(training_summary.rootMeanSquaredError))

# Print R2.
print("R2: " + str(training_summary.r2))

Coefficients: [-0.08768171995287691,0.04156810439822088,0.03485554793084419,4.877901500391077,-18.10717650016799,3.396530513243429,-0.003485201766661757,-1.3459790348524527,0.24829628108813098,-0.009987236990479436,-0.9576820003039361,0.005596340757275696,-0.5303137617612333]
Intercept: 39.84588018015844

RMSE: 4.78101882094623
R2: 0.7204588386054254
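A fitted linear regression predicts medv as the intercept plus the dot product of the coefficients with the feature vector. The coefficients are in the same order as VectorAssembler's inputCols.

As a hand check, the sketch below plugs the first row's features into the coefficients printed above. It gives a prediction of roughly 29.75, against an actual medv of 24.0. `lr_model.transform()` applies this same formula when it adds its prediction column. The numbers are copied from this particular run, so a different random split will give different coefficients.

```python
# Values copied from the notebook output above (one particular random split).
intercept = 39.84588018015844
coefficients = [-0.08768171995287691, 0.04156810439822088, 0.03485554793084419,
                4.877901500391077, -18.10717650016799, 3.396530513243429,
                -0.003485201766661757, -1.3459790348524527, 0.24829628108813098,
                -0.009987236990479436, -0.9576820003039361, 0.005596340757275696,
                -0.5303137617612333]
# First row's features in inputCols order:
# crim, zn, indus, chas, nox, rm, age, dis, rad, tax, ptratio, b, lstat.
first_row_features = [0.00632, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2, 4.09, 1.0, 296.0, 15.3, 396.9, 4.98]

# Linear model: intercept + dot(coefficients, features).
prediction = intercept + sum(c * x for c, x in zip(coefficients, first_row_features))
print(f"Predicted medv: {prediction:.2f} (actual medv: 24.0)")
```

The raw coefficients are not directly comparable, because the features are on different scales. nox has the largest coefficient in magnitude (about −18.1), but it only ranges from 0.385 to 0.871 in the summary table, so moving across that full range shifts the prediction by roughly 8.8. rm has a coefficient of only about 3.4, but it spans 3.561 to 8.78, a shift of roughly 17.7.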


In [16]:
# Show the training label's statistics again to put the RMSE in context.
train_data.describe().show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               347|
|   mean|22.520461095100888|
| stddev| 9.055746543025418|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



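RMSE (root mean squared error) is the square root of the average squared difference between predicted and actual values, so it is in the same units as medv (thousands of dollars). On its own it means little until we compare it with the scale of medv itself, using statistics such as the mean, standard deviation, min and max shown above. Our training RMSE of about 4.78 is small next to the mean of about 22.5 and roughly half the standard deviation of about 9.06, so the fit looks reasonable.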
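Because errors are squared before averaging, RMSE weights large misses more heavily than the mean absolute error (MAE) does. Spark's training summary also exposes MAE as `meanAbsoluteError`.

The stdlib sketch below uses hypothetical toy values, not the Boston data. It shows two sets of predictions with the same MAE but different RMSE.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error: square root of the mean squared difference."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

def mae(actual, predicted):
    """Mean absolute error, for comparison."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

actual = [10.0, 20.0, 30.0, 40.0]
spread_out = [11.0, 21.0, 31.0, 41.0]    # every prediction off by 1
one_big_miss = [10.0, 20.0, 30.0, 44.0]  # three exact, one off by 4

for name, predicted in [("spread_out", spread_out), ("one_big_miss", one_big_miss)]:
    print(name, "MAE =", mae(actual, predicted), "RMSE =", rmse(actual, predicted))
# spread_out: MAE = 1.0, RMSE = 1.0; one_big_miss: MAE = 1.0, RMSE = 2.0
```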

R2 indicates that our model explains about 72% of the variability in median house value (medv) on the training data.
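R² compares the model's squared error with the squared error of always predicting the mean: R² = 1 − SS_res / SS_tot. Two identities connect this to numbers the notebook already prints:

- RMSE² = SS_res / n.
- The sample variance s² reported by `describe()` equals SS_tot / (n − 1).

Together they let R² be recovered from the RMSE, the stddev of medv and the row count. The helper name `r2_from_rmse` is hypothetical. With the training values (RMSE 4.781, stddev 9.056, n = 347), the sketch below reproduces the reported R² of about 0.7205. The test-set values reported further down (RMSE 4.627, stddev 9.527, n = 159) reproduce the test R² of about 0.7627.

```python
def r2_from_rmse(rmse, sample_std, n):
    """Hypothetical helper: R^2 = 1 - SS_res / SS_tot,
    with SS_res = n * rmse**2 and SS_tot = (n - 1) * sample_std**2."""
    ss_res = n * rmse ** 2
    ss_tot = (n - 1) * sample_std ** 2
    return 1 - ss_res / ss_tot

# Values printed in this notebook's training and test outputs.
train_r2 = r2_from_rmse(4.78101882094623, 9.055746543025418, 347)
test_r2 = r2_from_rmse(4.626537055363597, 9.527396336301386, 159)
print(f"train R2 ~ {train_r2:.4f}, test R2 ~ {test_r2:.4f}")
```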

### Evaluating the Model using the Testing Set

In [17]:
# Let's evaluate the model against the test data.
test_results = lr_model.evaluate(test_data)

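# And print the RMSE and R2. You might expect slightly worse scores on unseen data, but in this run
# the test scores are slightly better (see the output); with a small random split this can happen by chance.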
print("RMSE on test data: " + str(test_results.rootMeanSquaredError))
print("R2 on test data: " + str(test_results.r2))

RMSE on test data: 4.626537055363597
R2 on test data: 0.7626967626458543
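The test scores here are slightly better than the training scores. That is not necessarily a problem. With only about 159 test rows, the random choice of which rows land in each split can noticeably move both metrics.

The stdlib simulation below illustrates the effect on hypothetical synthetic data (y = 3 + 2x + noise), not the Boston dataset. It repeats a 70/30 split with several seeds, fits a one-feature least-squares line on the training part, and prints train and test RMSE for each seed. The gap between train and test RMSE varies from seed to seed, and on some seeds the test RMSE may come out lower.

```python
import random
import statistics

# Hypothetical synthetic data, NOT the Boston dataset: y = 3 + 2x + Gaussian noise.
data_rng = random.Random(0)
xs = [data_rng.uniform(0, 10) for _ in range(150)]
points = [(x, 3 + 2 * x + data_rng.gauss(0, 1)) for x in xs]

def fit_simple_ols(pairs):
    """Ordinary least squares for y = a + b * x with a single feature."""
    mx = statistics.fmean([x for x, _ in pairs])
    my = statistics.fmean([y for _, y in pairs])
    sxy = sum((x - mx) * (y - my) for x, y in pairs)
    sxx = sum((x - mx) ** 2 for x, _ in pairs)
    b = sxy / sxx
    return my - b * mx, b

def rmse(pairs, a, b):
    """RMSE of the line y = a + b * x on the given (x, y) pairs."""
    return (sum((y - (a + b * x)) ** 2 for x, y in pairs) / len(pairs)) ** 0.5

def train_test_rmse(seed, train_fraction=0.7):
    """Randomly split the points, fit on the training part, and score both parts."""
    rng = random.Random(seed)
    train, test = [], []
    for p in points:
        (train if rng.random() < train_fraction else test).append(p)
    a, b = fit_simple_ols(train)
    return rmse(train, a, b), rmse(test, a, b)

results = [train_test_rmse(seed) for seed in range(5)]
for seed, (train_rmse, test_rmse) in enumerate(results):
    print(f"seed={seed}: train RMSE={train_rmse:.3f}, test RMSE={test_rmse:.3f}")
```

Re-running the notebook's split with different `seed` values would likely show similar variation in the Boston metrics.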


Good job on finishing! But there's still a huge amount of complexity yet to be discussed. When you use PySpark with your own dataset, be sure to look through the documentation, understand which parameters you can use and do your best to figure out what they actually do. 

These tutorials are designed to be finished within a few hours, but it will take much more time, practice and patience before you can call yourself an expert!