## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
#Problem 1
#1.	Load “linearRegressionData.csv” to Databricks environment.
file_location = "/FileStore/tables/linearRegressionData_csv___Sheet1.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
lin_reg = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(lin_reg)

# linearRegressionData_csv___Sheet1.csv was uploaded to the Databricks file system, read from that path, and stored in the 'lin_reg' DataFrame.

dvs1,ivs1,ivs2,ivs3
34.63,5.53,5.58,5.41
40.89,3.89,6.48,6.97
37.25,5.07,4.5,6.5
45.09,5.81,5.71,8.59
39.4,5.61,5.79,6.77


In [0]:
# Create a temporary view named 'lin_reg' over the 'linearRegressionData.csv' data

temp_table_name = "lin_reg"

lin_reg.createOrReplaceTempView(temp_table_name)

In [0]:
#Import all the required modules
import dbldatagen as dg
from pyspark.sql.types import StructType, StructField,  StringType
import pandas as pd
import numpy as np
from pyspark.sql.functions import corr
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.regression import RidgeRegressionWithSGD as ridgeSGD
from pyspark.mllib.regression import LassoWithSGD as lassoSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs

In [0]:
#2. Generate data that matches the 'linearRegressionData.csv' schema and its minimum and maximum values, using the Databricks Labs data generator (dbldatagen).
#   The 'dbldatagen.whl' Python wheel must be installed on the 'Data_Engineering' cluster.
shuffle_partitions_requested = 8
partitions_requested = 8
data_rows = 500000


table_schema = spark.table("lin_reg").schema

print(table_schema)
  
dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested,
                  randomSeedMethod="hash_fieldname")
            .withSchema(table_schema))

dataspec = (dataspec
                .withColumnSpec("dvs1",random=True,minValue=37.25,maxValue=45.09,randomSeedMethod="hash_fieldname")                                       
                .withColumnSpec("ivs1",random=True,minValue=3.89,maxValue=5.81,randomSeedMethod="hash_fieldname") 
                .withColumnSpec("ivs2",random=True,minValue=4.5,maxValue=6.48,randomSeedMethod="hash_fieldname")       
                .withColumnSpec("ivs3",random=True,minValue=5.41,maxValue=8.59,randomSeedMethod="hash_fieldname")
           )
df1 = dataspec.build()



StructType(List(StructField(dvs1,DoubleType,true),StructField(ivs1,DoubleType,true),StructField(ivs2,DoubleType,true),StructField(ivs3,DoubleType,true)))
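
The minimum and maximum values passed to `withColumnSpec` can be derived from the data rather than typed by hand. The sketch below is plain Python and assumes only the five preview rows shown earlier; `preview_rows` and `column_ranges` are hypothetical names, not part of dbldatagen. On those rows the dvs1 minimum comes out as 34.63, whereas the cell above uses 37.25, so the hard-coded bounds are worth checking against the full file.

```python
# Five preview rows of linearRegressionData.csv, copied from the display() output.
columns = ["dvs1", "ivs1", "ivs2", "ivs3"]
preview_rows = [
    (34.63, 5.53, 5.58, 5.41),
    (40.89, 3.89, 6.48, 6.97),
    (37.25, 5.07, 4.5, 6.5),
    (45.09, 5.81, 5.71, 8.59),
    (39.4, 5.61, 5.79, 6.77),
]


def column_ranges(names, rows):
    """Return {column name: (min, max)} for row tuples given in column order."""
    by_column = zip(*rows)  # transpose rows into columns
    return {name: (min(values), max(values)) for name, values in zip(names, by_column)}


ranges = column_ranges(columns, preview_rows)
for name, (low, high) in ranges.items():
    print(f"{name}: minValue={low}, maxValue={high}")
```
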


In [0]:
#Verify that the required number of records has been generated
df1.count()

Out[5]: 500000

In [0]:
#3.	Load “autoMPGDataModified.csv” to Databricks environment.
file_location = "/FileStore/tables/autoMPGDataModified___Sheet1.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
mpg_data = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(mpg_data)

# autoMPGDataModified___Sheet1.csv was uploaded to the Databricks file system, read from that path, and stored in the 'mpg_data' DataFrame.

mpg,displacement,horsepower,weight,accelaration
18,307,18,3504,12.0
15,350,36,3693,11.5
18,318,30,3436,11.0
16,304,30,3433,12.0
17,302,25,3449,10.5


In [0]:
# Create a temporary view named 'mpg_data' over the 'autoMPGDataModified.csv' data

temp_table_name = "mpg_data"

mpg_data.createOrReplaceTempView(temp_table_name)

In [0]:
#4. Generate data that matches the 'autoMPGDataModified___Sheet1.csv' schema and its minimum and maximum values, using the Databricks Labs data generator (dbldatagen).
#   The 'dbldatagen.whl' Python wheel must be installed on the 'Data_Engineering' cluster.
shuffle_partitions_requested = 8
partitions_requested = 8
data_rows = 500000


table_schema = spark.table("mpg_data").schema

print(table_schema)
  
dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested,
                  randomSeedMethod="hash_fieldname")
            .withSchema(table_schema))

dataspec = (dataspec
                .withColumnSpec("mpg",random=True,minValue=15,maxValue=18,randomSeedMethod="hash_fieldname")                                       
                .withColumnSpec("displacement",random=True,minValue=302,maxValue=350,randomSeedMethod="hash_fieldname") 
                .withColumnSpec("horsepower",random=True,minValue=18,maxValue=36,randomSeedMethod="hash_fieldname")       
                .withColumnSpec("weight",random=True,minValue=3430,maxValue=3693,randomSeedMethod="hash_fieldname")
                .withColumnSpec("accelaration",random=True,minValue=10.5,maxValue=12,randomSeedMethod="hash_fieldname")
           )
df2 = dataspec.build()


StructType(List(StructField(mpg,IntegerType,true),StructField(displacement,IntegerType,true),StructField(horsepower,IntegerType,true),StructField(weight,IntegerType,true),StructField(accelaration,DoubleType,true)))


In [0]:
#Check generated data
df2.show()

+---+------------+----------+------+------------+
|mpg|displacement|horsepower|weight|accelaration|
+---+------------+----------+------+------------+
| 16|         305|        28|  3561|        10.5|
| 16|         325|        36|  3573|        10.5|
| 17|         314|        35|  3680|        10.5|
| 17|         331|        22|  3645|        11.5|
| 18|         326|        27|  3479|        10.5|
| 17|         310|        26|  3588|        10.5|
| 17|         316|        34|  3550|        11.5|
| 16|         339|        22|  3552|        10.5|
| 16|         334|        27|  3460|        10.5|
| 18|         336|        30|  3581|        10.5|
| 16|         303|        20|  3503|        10.5|
| 18|         346|        35|  3533|        10.5|
| 18|         307|        24|  3491|        11.5|
| 16|         322|        28|  3580|        10.5|
| 15|         316|        32|  3506|        11.5|
| 15|         317|        27|  3461|        11.5|
| 18|         311|        34|  3662|        10.5|


In [0]:
#Problem 2 
#1.	For data “linearRegressionData.csv”, transform Dataframe  to RDD.
lin_reg_rdd = df1.rdd

In [0]:
#2.	Create an RDD of the labeled point.
lin_reg_rdd_labelpoint = lin_reg_rdd.map(lambda data : LabeledPoint(data[0],data[1:4]))
lin_reg_rdd_labelpoint.take(10)

#dvs1 is the target (dependent variable); ivs1, ivs2, and ivs3 are the features (independent variables).

Out[37]: [LabeledPoint(41.25, [4.890000000000001,5.5,6.41]),
 LabeledPoint(40.25, [3.89,5.5,5.41]),
 LabeledPoint(43.25, [3.89,4.5,6.41]),
 LabeledPoint(37.25, [4.890000000000001,4.5,8.41]),
 LabeledPoint(39.25, [4.890000000000001,5.5,7.41]),
 LabeledPoint(42.25, [3.89,4.5,6.41]),
 LabeledPoint(38.25, [4.890000000000001,5.5,8.41]),
 LabeledPoint(41.25, [4.890000000000001,4.5,6.41]),
 LabeledPoint(39.25, [4.890000000000001,4.5,6.41]),
 LabeledPoint(43.25, [4.890000000000001,4.5,6.41])]

In [0]:
#3.	Divide the data into training and testing sets (70/30) using the randomSplit function
lin_reg_rdd_labelPointSplit = lin_reg_rdd_labelpoint.randomSplit([0.7,0.3])

In [0]:
#Continuation of question 3 ....
lin_reg_rdd_labelPointSplitTrainData = lin_reg_rdd_labelPointSplit[0]
lin_reg_rdd_labelPointSplitTestData = lin_reg_rdd_labelPointSplit[1]
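
A weighted random split assigns each record to a split independently, so the resulting sizes are only approximately 70/30. The following plain-Python sketch illustrates that idea; it is not Spark's implementation, and `random_split` is a hypothetical helper. In Spark itself, `randomSplit` also accepts a `seed` argument, which makes the split reproducible across runs.

```python
import random


def random_split(items, weights, seed=None):
    """Assign each item to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            # The last split catches any draw not claimed earlier (guards rounding).
            if draw < upper or index == len(bounds) - 1:
                splits[index].append(item)
                break
    return splits


train, test = random_split(range(10000), [0.7, 0.3], seed=42)
print(len(train), len(test))
```
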

In [0]:
#4.	Create a linear regression model.
from pyspark.mllib.regression import LinearRegressionWithSGD as lrSGD
LinearRegression = lrSGD.train(data = lin_reg_rdd_labelPointSplitTrainData,iterations =200,step = 0.1,intercept = True)

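#The step size and iteration count were adjusted to maximize the R^2 value.
#Note: this assignment rebinds the name LinearRegression, hiding the pyspark.ml.regression class imported earlier.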

In [0]:
#Check the intercept value generated by model
LinearRegression.intercept

Out[40]: 1.8427682174394642

In [0]:
#Check the weights generated by model
LinearRegression.weights

Out[41]: DenseVector([2.5951, 2.9593, 1.8133])

In [0]:
#Regression Model is as below  : 
#dvs1 = 1.84 + 2.60*ivs1 + 2.96*ivs2 + 1.81*ivs3
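
As a sanity check, the fitted equation can be evaluated by hand. The sketch below is plain Python using the intercept and the (rounded) weights printed above; `predict` is a hypothetical helper, not the MLlib method. For the generated point with features [3.89, 5.5, 5.41] it gives a value close to the 38.02 that the notebook reports for a test point with label 40.25.

```python
# Intercept and weights as printed by the model (weights are rounded to 4 places).
intercept = 1.8427682174394642
weights = [2.5951, 2.9593, 1.8133]


def predict(features):
    """Linear model: intercept plus the dot product of weights and features."""
    return intercept + sum(w * x for w, x in zip(weights, features))


# Features of LabeledPoint(40.25, [3.89, 5.5, 5.41]) from the earlier output.
prediction = predict([3.89, 5.5, 5.41])
print(round(prediction, 2))
```
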


In [0]:
#5. Save the model 
LinearRegression.save(sc, '/home/pysparkbook/LinearRegression')

In [0]:
#6.	Predict data using the saved 'LinearRegression' model.
LinearRegressionPredictedData = lin_reg_rdd_labelPointSplitTestData.map(lambda data : (float(data.label) ,float(LinearRegression.predict(data.features))))

In [0]:
#Continuation of question 6....Check the predicted data 
LinearRegressionPredictedData.take(5)

Out[44]: [(40.25, 38.0242394395811),
 (43.25, 36.878207098655686),
 (42.25, 36.878207098655686),
 (38.25, 46.0593056491882),
 (39.25, 39.47334880173755)]

In [0]:
#7.	Evaluate the created model and check its accuracy using RMSE 
LinearRegressionModelMetrics = rmtrcs(LinearRegressionPredictedData)
LinearRegressionModelMetrics.rootMeanSquaredError

#An RMSE of 3.35 is not small here: dvs1 was generated between 37.25 and 45.09, a span of 7.84, so the typical error is over 40% of that span. RMSE should be read together with the R^2 value below.

Out[45]: 3.350951055850866

In [0]:
#Continuation of question 7.....
LinearRegressionModelMetrics.r2

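#A negative R^2 means the fitted line predicts worse than a horizontal line at the mean of the target.
#That is plausible here because every column was generated independently at random, so dvs1 has no real linear relationship with ivs1, ivs2, or ivs3.
#Note also that RegressionMetrics expects (prediction, observation) pairs, while the pairs built above are (label, prediction); RMSE is unaffected by the order, but R^2 is.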

Out[46]: -0.6219044010035226
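
The two metrics can be reproduced with the textbook definitions. The sketch below is plain Python, not Spark's code, and uses only the five (label, prediction) pairs printed by `take(5)`, so its numbers differ from the full-test-set values. It shows that RMSE is the same whichever column is treated as the observation, while R^2 changes because the total sum of squares is computed from the observations.

```python
import math


def rmse(predictions, observations):
    """Root mean squared error."""
    n = len(observations)
    return math.sqrt(sum((p - o) ** 2 for p, o in zip(predictions, observations)) / n)


def r2(predictions, observations):
    """Coefficient of determination: 1 - SS_err / SS_tot."""
    mean_obs = sum(observations) / len(observations)
    ss_err = sum((o - p) ** 2 for p, o in zip(predictions, observations))
    ss_tot = sum((o - mean_obs) ** 2 for o in observations)
    return 1.0 - ss_err / ss_tot


# (label, prediction) pairs copied from the take(5) output.
pairs = [
    (40.25, 38.0242394395811),
    (43.25, 36.878207098655686),
    (42.25, 36.878207098655686),
    (38.25, 46.0593056491882),
    (39.25, 39.47334880173755),
]
labels = [label for label, _ in pairs]
preds = [pred for _, pred in pairs]

print("RMSE:", rmse(preds, labels), "swapped:", rmse(labels, preds))
print("R^2 :", r2(preds, labels), "swapped:", r2(labels, preds))
```
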

In [0]:
#Problem 3
#1.	For data “autoMPGDataModified.csv”, transform Dataframe to RDD.
mpg_rdd = df2.rdd.map(list)

In [0]:
#2.	Create an RDD of the labeled point
mpg_rdd_LabelPoint = mpg_rdd.map(lambda data : LabeledPoint(data[0],[data[1]/10,data[2],float(data[3])/100,data[4]]))

#Displacement is divided by 10 and weight by 100 to bring the features onto comparable scales.
#mpg is the target (dependent variable); displacement, horsepower, weight, and accelaration are the features (independent variables).

In [0]:
#Verify the labeled points
mpg_rdd_LabelPoint.take(5)

Out[23]: [LabeledPoint(16.0, [30.5,28.0,35.61,10.5]),
 LabeledPoint(16.0, [32.5,36.0,35.73,10.5]),
 LabeledPoint(17.0, [31.4,35.0,36.8,10.5]),
 LabeledPoint(17.0, [33.1,22.0,36.45,11.5]),
 LabeledPoint(18.0, [32.6,27.0,34.79,10.5])]
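
The row-to-labeled-point mapping can be checked without Spark. This sketch is plain Python; `to_labeled_row` is a hypothetical stand-in for the lambda above, and it returns a plain (label, features) tuple rather than a `LabeledPoint`. Applied to the first generated row, it reproduces the first labeled point in the output.

```python
def to_labeled_row(row):
    """Split a row into (label, features), rescaling displacement and weight."""
    mpg, displacement, horsepower, weight, accelaration = row
    label = float(mpg)
    features = [
        displacement / 10,    # roughly 30 to 35 instead of 302 to 350
        float(horsepower),
        weight / 100,         # roughly 34 to 37 instead of 3430 to 3693
        float(accelaration),  # column name spelled as in the source data
    ]
    return label, features


# First row of the df2.show() output.
label, features = to_labeled_row((16, 305, 28, 3561, 10.5))
print(label, features)
```
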

In [0]:
#3.Divide the data into a training and testing set. - 70-30 percent 
mpg_rdd_LabelPoint_Split = mpg_rdd_LabelPoint.randomSplit([0.7,0.3])
mpg_rdd_LabelPoint_Train = mpg_rdd_LabelPoint_Split[0]
mpg_rdd_LabelPoint_Test = mpg_rdd_LabelPoint_Split[1]
mpg_rdd_LabelPoint_Train.take(5)

Out[24]: [LabeledPoint(16.0, [30.5,28.0,35.61,10.5]),
 LabeledPoint(16.0, [32.5,36.0,35.73,10.5]),
 LabeledPoint(17.0, [31.4,35.0,36.8,10.5]),
 LabeledPoint(17.0, [33.1,22.0,36.45,11.5]),
 LabeledPoint(18.0, [32.6,27.0,34.79,10.5])]

In [0]:
#4.Create a ridge regression model.
#Ridge regression extends linear regression by adding an L2 penalty on the coefficients to the squared-error loss, which discourages large weights; the penalty strength is set by the additional parameter 'regParam'.
Ridge_Model = ridgeSGD.train(data = mpg_rdd_LabelPoint_Train,iterations = 350,step = 0.0006,regParam = 0.05,intercept = True)
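
To make the role of `regParam` concrete, the sketch below evaluates a ridge objective in plain Python. It assumes the form 1/(2n) * ||Xw + b - y||^2 + (regParam/2) * ||w||^2, which is how the L2-regularized least-squares problem is commonly written; treat the exact scaling as an assumption rather than a statement about MLlib internals. The function names are hypothetical. The two rows are the first labeled points shown earlier, and the weights and intercept are the values the ridge model reports in the cells that follow.

```python
def squared_error_term(weights, intercept, rows):
    """Half the mean squared error over (label, features) rows."""
    total = 0.0
    for label, features in rows:
        prediction = intercept + sum(w * x for w, x in zip(weights, features))
        total += (prediction - label) ** 2
    return total / (2 * len(rows))


def ridge_objective(weights, intercept, rows, reg_param):
    """Squared-error term plus an L2 penalty on the weights (not the intercept)."""
    l2_penalty = sum(w * w for w in weights)
    return squared_error_term(weights, intercept, rows) + (reg_param / 2) * l2_penalty


rows = [
    (16.0, [30.5, 28.0, 35.61, 10.5]),
    (16.0, [32.5, 36.0, 35.73, 10.5]),
]
ridge_weights = [0.1598, 0.127, 0.175, 0.0541]
ridge_intercept = 1.004820131988767

unpenalized = ridge_objective(ridge_weights, ridge_intercept, rows, reg_param=0.0)
penalized = ridge_objective(ridge_weights, ridge_intercept, rows, reg_param=0.05)
print(unpenalized, penalized)
```

With weights this small, the penalty adds very little to the objective, which is one reason a regParam of 0.05 has only a mild effect here.
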



In [0]:
#Check the intercept value generated by ridge model
Ridge_Model.intercept

Out[26]: 1.004820131988767

In [0]:
#Check the weights generated by ridge model
Ridge_Model.weights


Out[27]: DenseVector([0.1598, 0.127, 0.175, 0.0541])

In [0]:
#5.Train and save the model
Ridge_Model.save(sc, '/home/pysparkbook/Ridge_Model')

#The model was saved on an earlier run; the error below appears because the path already exists and this cell tried to save to it again.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-801468090244901> in <module>
      1 #5.     Train and save the model
----> 2 Ridge_Model.save(sc, '/home/pysparkbook/Ridge_Model')

/databricks/spark/python/pyspark/mllib/regression.py in save(self, sc, path)
    533         java_model = sc._jvm.org.apache.spark.mllib.regression.RidgeRegressionModel(
    534             _py2java(sc, self._coeff), self.intercept)
--> 535         java_model.save(sc._jsc.sc(), path

In [0]:
#6.	Predict data using the saved 'Ridge_Model' model
mpg_PredictedData = mpg_rdd_LabelPoint_Test.map(lambda data :[float(data.label) , float(Ridge_Model.predict(data.features))])
mpg_PredictedData.take(5)


Out[29]: [[17.0, 17.20473930455295],
 [18.0, 17.01655446868039],
 [16.0, 15.082827761725197],
 [18.0, 17.266725966839196],
 [18.0, 16.186684710752324]]

In [0]:
#7.	Evaluate the created model and check its accuracy using RMSE 
RidgeModelMetrics = rmtrcs(mpg_PredictedData)
RidgeModelMetrics.rootMeanSquaredError

#An RMSE of 1.193 has to be judged against the spread of mpg, which was generated between 15 and 18; on its own it does not show that the model fits the data well.

Out[30]: 1.1926152480536567
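
A useful yardstick for any RMSE is the error of a model that always predicts the mean of the target. The sketch below computes that baseline in plain Python under the assumption that the generated mpg values are spread evenly over the integers 15 to 18; the real generated distribution may differ, and `rmse_of_mean_predictor` is a hypothetical helper.

```python
import math


def rmse_of_mean_predictor(values):
    """RMSE obtained by always predicting the mean of the values."""
    mean = sum(values) / len(values)
    return math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))


# Assumed: mpg takes the values 15, 16, 17, 18 equally often.
baseline = rmse_of_mean_predictor([15, 16, 17, 18])
print(round(baseline, 3))  # prints 1.118
```

Under that assumption the baseline is about 1.12, so an RMSE near 1.19 is no better than predicting the mean.
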

In [0]:
#8.	Create a Lasso regression model.
mpg_Lasso = lassoSGD.train(data = mpg_rdd_LabelPoint_Train,iterations = 400, step = 0.0005,regParam = 0.05, intercept = True)



In [0]:
#Check the intercept value generated by Lasso Regression model
mpg_Lasso.intercept

Out[32]: 1.00482478838847

In [0]:
#Check the weights generated by Lasso Regression model 
mpg_Lasso.weights

Out[33]: DenseVector([0.1594, 0.1277, 0.1745, 0.0539])

In [0]:
#9.Train and save the model
mpg_Lasso.save(sc, '/home/pysparkbook/Lasso_Model')

#The model was saved on an earlier run; the error below appears because the path already exists and this cell tried to save to it again.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-3440197099547882> in <module>
      1 #9.Train and save the model
----> 2 mpg_Lasso.save(sc, '/home/pysparkbook/Lasso_Model')

/databricks/spark/python/pyspark/mllib/regression.py in save(self, sc, path)
    379         java_model = sc._jvm.org.apache.spark.mllib.regression.LassoModel(
    380             _py2java(sc, self._coeff), self.intercept)
--> 381         java_model.save(sc._jsc.sc(), path)

In [0]:
#10.Predict data using the saved 'mpg_Lasso' model.
mpg_LassoPredictedData = mpg_rdd_LabelPoint_Test.map(lambda data : (float(data.label) , float(mpg_Lasso.predict(data.features))))
mpg_LassoPredictedData.take(5)

Out[34]: [(17.0, 17.199932131483312),
 (18.0, 17.008139428546237),
 (16.0, 15.068568930204963),
 (18.0, 17.26178745952062),
 (18.0, 16.172879888490634)]

In [0]:
#11.Evaluate the created model and check its accuracy
from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs
mpg_LassoModelMetrics = rmtrcs(mpg_LassoPredictedData)
mpg_LassoModelMetrics.rootMeanSquaredError

#An RMSE of 1.195 has to be judged against the spread of mpg, which was generated between 15 and 18; on its own it does not show that the model fits the data well.

Out[35]: 1.194930606846984

In [0]:
#Problem 4 
#Compare the lasso regression and ridge regression models and provide your commentary.


# Ridge Regression: Ridge regression is an extension of linear regression; it is a regularized linear regression model with an L2 penalty (the sum of squared coefficients). It pushes the beta coefficients toward smaller values but does not force them to zero. That is, it does not remove irrelevant features but reduces their impact on the trained model.

# Lasso Regression: Lasso is another regularized extension of linear regression. The difference from Ridge regression is that the regularization term is an L1 penalty (the sum of the absolute values of the coefficients) instead of the sum of their squares. Lasso addresses the limitation of Ridge regression by not only penalizing large beta coefficients but also setting some of them exactly to zero when they contribute little, so the model can end up with fewer features.

# For the 'MPG' data, the Ridge and Lasso models perform almost identically: their RMSE values are nearly equal (Ridge: 1.193; Lasso: 1.195), and so are their fitted weights. A likely reason is that the dataset has only four features and both models use the same small regParam (0.05), so neither penalty drives a coefficient to zero. A clearer difference between the two models would be expected on a dataset with many more independent variables.
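
The difference between the two penalties is easiest to see on a single coefficient. The sketch below is plain Python and assumes the simplified one-dimensional problem of minimizing 0.5*(w - b)^2 plus the penalty, where b is the unregularized estimate; it is an illustration of the standard closed-form solutions, not what the SGD trainers compute. The function names are hypothetical.

```python
def ridge_shrink(b, lam):
    """Minimizer of 0.5*(w - b)**2 + (lam/2)*w**2: scales b toward zero."""
    return b / (1.0 + lam)


def lasso_shrink(b, lam):
    """Minimizer of 0.5*(w - b)**2 + lam*abs(w): soft-thresholding."""
    magnitude = max(abs(b) - lam, 0.0)
    return magnitude if b >= 0 else -magnitude


lam = 0.05
for b in [0.5, 0.03, -0.5]:
    print(b, ridge_shrink(b, lam), lasso_shrink(b, lam))
```

A small estimate such as 0.03 stays nonzero under the ridge penalty but becomes exactly zero under the lasso penalty, which is the feature-selection effect described above.
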



