## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
#importing essential libraries
import pyspark
from pyspark import SparkFiles
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_date
from pyspark.sql.functions import randn #For generating dataframe 
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import corr

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
import pandas as pd
import numpy as np

<h1><b>Problem 1</b></h1>
<p>1. Load “linearRegressionData.csv” to Databricks environment.</p>
<p>2. Create a dataframe with 500k simulated data records from “linearRegressionData.csv”</p>
<p>3. Load “autoMPGDataModified.csv” to Databricks environment</p>
<p>4. Create a dataframe with 500k simulated data records from “autoMPGDataModified.csv”</p>

In [0]:
# File location and type
file_location1="dbfs:/FileStore/shared_uploads/h20210811@pilani.bits-pilani.ac.in/autoMPGDataModified___Sheet1-2.csv"
file_location2 ="dbfs:/FileStore/shared_uploads/h20210811@pilani.bits-pilani.ac.in/linearRegressionData_csv___Sheet1-1.csv"

# CSV options
auto = spark.read.csv(file_location1, inferSchema=True, header=True)
linreg=spark.read.csv(file_location2, inferSchema=True, header=True)
 
display(auto)
display(linreg)
# The applied options are for CSV files. For other file types, these will be ignored.

# Create temporary table called table1
#auto.createOrReplaceTempView('table1')

mpg,displacement,horsepower,weight,accelaration
18,307,18,3504,12.0
15,350,36,3693,11.5
18,318,30,3436,11.0
16,304,30,3433,12.0
17,302,25,3449,10.5


dvs1,ivs1,ivs2,ivs3
34.63,5.53,5.58,5.41
40.89,3.89,6.48,6.97
37.25,5.07,4.5,6.5
45.09,5.81,5.71,8.59
39.4,5.61,5.79,6.77


In [0]:
print("The correlation value between 'dvs1','ivs1' is:")
linreg.select(corr('dvs1','ivs1')).show()
print("The correlation value between ivs3, ivs1 is:")
linreg.select(corr('ivs3','ivs1')).show()
print("The correlation value between (ivs2, ivs1) is:")
linreg.select(corr('ivs2','ivs1')).show()

The correlation value between 'dvs1','ivs1' is:
+--------------------+
|    corr(dvs1, ivs1)|
+--------------------+
|0.018862790027389265|
+--------------------+

The correlation value between ivs3, ivs1 is:
+-------------------+
|   corr(ivs3, ivs1)|
+-------------------+
|0.12495881433301012|
+-------------------+

The correlation value between (ivs2, ivs1) is:
+-------------------+
|   corr(ivs2, ivs1)|
+-------------------+
|-0.3956898622806565|
+-------------------+



In [0]:
print("The correlation value between mpg and hp is:")
auto.select(corr('displacement','mpg')).show()
print("The correlation value between ivs3, ivs1 is:")
auto.select(corr('mpg','horsepower')).show()
print("The correlation value between (ivs2, ivs1) is:")
auto.select(corr('mpg','weight')).show()

The correlation value between mpg and hp is:
+-----------------------+
|corr(displacement, mpg)|
+-----------------------+
|     -0.605712761555994|
+-----------------------+

The correlation value between ivs3, ivs1 is:
+---------------------+
|corr(mpg, horsepower)|
+---------------------+
|  -0.7472185104389009|
+---------------------+

The correlation value between (ivs2, ivs1) is:
+------------------+
| corr(mpg, weight)|
+------------------+
|-0.655365824425268|
+------------------+



In [0]:
# Checking Schema of thre DataFrames
auto.printSchema()
linreg.printSchema()

root
 |-- mpg: integer (nullable = true)
 |-- displacement: integer (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- accelaration: double (nullable = true)

root
 |-- dvs1: double (nullable = true)
 |-- ivs1: double (nullable = true)
 |-- ivs2: double (nullable = true)
 |-- ivs3: double (nullable = true)



<b>Simulating the values for each column  based on given dataset using random function

In [0]:

N = 500000   

# For autompg.csv
mpgmean = auto1.agg({'mpg': 'mean'})
mpgstd = auto1['mpg'].std()
random_mpgmean = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)

displacementmean = auto1.agg({'displacement': 'mean'})
displacementstd = auto1['displacement'].std()
random_displacement = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)

horsepowermean = auto1.agg({'horsepower': 'mean'})
horsepowerstd = auto1['horsepower'].std()
random_horsepower = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)
 
weightmean = auto1.agg({'weight': 'mean'})
weightstd = auto1['weight'].std()
random_weight = np.random.normal(loc=mpgmean, scale=mpgstd, size=N)
 
accelarationmean = auto1.agg({'accelaration': 'mean'})
accelarationstd = auto1['accelaration'].std()
random_acceleration = np.random.normal(loc=accelarationmean, scale=accelarationstd, size=N)

# For linearregression.csv
dvs1mean = Lin1.agg({'dvs1': 'mean'})
dvs1std = Lin1['dvs1'].std()
random_dvs1mean = np.random.normal(loc=dvs1mean, scale=dvs1std, size=N)

ivs1mean = Lin1.agg({'ivs1': 'mean'})
ivs1std = Lin1['ivs1'].std()
random_ivs1mean = np.random.normal(loc=ivs1mean, scale=ivs1std, size=N)

ivs2mean = Lin1.agg({'ivs2': 'mean'})
ivs2std = Lin1['ivs2'].std()
random_ivs2mean = np.random.normal(loc=ivs2mean, scale=ivs2std, size=N)
 
ivs3mean = Lin1.agg({'ivs3': 'mean'})
ivs3std = Lin1['ivs3'].std()
random_ivs3mean = np.random.normal(loc=ivs3mean, scale=ivs3std, size=N)

In [0]:
# For autompg
autompg_f = pd.DataFrame()
autompg_f['mpg'] = random_mpgmean.tolist()
autompg_f['displacement'] = random_displacement.tolist()
autompg_f['horsepower'] = random_horsepower.tolist()
autompg_f['weight'] = random_weight.tolist()
autompg_f['accelaration'] = random_acceleration.tolist()

# For linearregression
linerregression = pd.DataFrame()
linerregression['dvs1'] = random_dvs1mean.tolist()
linerregression['ivs1'] = random_ivs1mean.tolist()
linerregression['ivs2'] = random_ivs2mean.tolist()
linerregression['ivs3'] = random_ivs3mean.tolist()

In [0]:
print(linerregression)
print(autompg_f)

             dvs1      ivs1      ivs2      ivs3
0       41.308862  5.051648  5.809135  7.063589
1       38.288574  5.049448  5.922033  8.061691
2       41.434754  4.910453  4.474090  7.196875
3       41.020711  3.785745  5.139271  6.177507
4       34.725896  4.960531  5.388795  7.820159
...           ...       ...       ...       ...
499995  40.677272  4.516008  6.194588  5.157014
499996  48.003026  4.750632  5.805241  8.373799
499997  35.141439  4.700952  4.942809  7.245192
499998  36.616068  5.881182  5.741033  6.942923
499999  33.022735  4.262840  5.613307  4.972120

[500000 rows x 4 columns]
              mpg  displacement  horsepower     weight  accelaration
0       17.589479     15.245794   14.259709  17.195711     11.549342
1       18.709555     15.795997   18.298434  16.740048     11.744508
2       13.846541     16.897805   16.549683  14.806752     11.199649
3       17.712052     14.829168   17.031808  16.957512     11.990440
4       17.684473     16.395498   16.363340  19.4052

##Perform the following operations

For data “linearRegressionData.csv”, transform Dataframe  to RDD.

Create an RDD of the labeled point.

Divide the data into a training and testing set.

Create a linear regression model.

Train and save the model.

Predict data using the saved model.

Evaluate the created model and check its accuracy.

In [0]:
# Created RDD FILE
linerregression = spark.createDataFrame(linerregression)
from pyspark.ml.linalg import Vectors

In [0]:
mmScaler = MinMaxScaler(outputCol="scaled")
mmScaler.setInputCol("dvs1","ivs1","ivs2","ivs3")

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<command-2193568496500103>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0mmmScaler[0m [0;34m=[0m [0mMinMaxScaler[0m[0;34m([0m[0moutputCol[0m[0;34m=[0m[0;34m"scaled"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mmmScaler[0m[0;34m.[0m[0msetInputCol[0m[0;34m([0m[0;34m"dvs1"[0m[0;34m,[0m[0;34m"ivs1"[0m[0;34m,[0m[0;34m"ivs2"[0m[0;34m,[0m[0;34m"ivs3"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mTypeError[0m: setInputCol() takes 2 positional arguments but 5 were given

In [0]:
###RDD
regressionDataRDDDict = linerregression.rdd
regressionDataRDDDict.take(10)

Out[357]: [Row(dvs1=41.30886179984029, ivs1=5.051648255362939, ivs2=5.809135109945622, ivs3=7.0635890079200845),
 Row(dvs1=38.288573809407, ivs1=5.049448082823586, ivs2=5.922032800949869, ivs3=8.061690892048752),
 Row(dvs1=41.43475368485305, ivs1=4.910452535797273, ivs2=4.474090200950141, ivs3=7.196875492577794),
 Row(dvs1=41.0207107610983, ivs1=3.785744530957665, ivs2=5.139271171696292, ivs3=6.177507405098462),
 Row(dvs1=34.72589585034545, ivs1=4.960531249584766, ivs2=5.388795296274207, ivs3=7.820159201672228),
 Row(dvs1=39.71556578495135, ivs1=5.589221695064517, ivs2=5.472722697465352, ivs3=6.846412951993103),
 Row(dvs1=37.51964911392747, ivs1=5.257326644452297, ivs2=4.896186569556468, ivs3=5.245213052329007),
 Row(dvs1=40.895646907309235, ivs1=5.5212266598867314, ivs2=5.503206268108367, ivs3=4.780619181385862),
 Row(dvs1=43.850969631273, ivs1=6.443967857794298, ivs2=4.480905244802052, ivs3=7.575843161918518),
 Row(dvs1=38.11345737370924, ivs1=4.233944778089338, ivs2=4.93613739181346

In [0]:
###label point
from pyspark.mllib.regression import LabeledPoint
regressionDataLabelPoint = regressionDataRDDDict.map(lambda data : LabeledPoint((data[0]),data[1:4]))

In [0]:
regressionDataLabelPoint.take(10)

Out[343]: [LabeledPoint(45.52613317661517, [5.4871654128925,6.271137800300416,5.987197818911166]),
 LabeledPoint(43.39159163397081, [5.228766882805268,5.337471031723521,5.015986474540695]),
 LabeledPoint(41.35450579425902, [4.992433591029207,5.786327174132791,3.786640438685884]),
 LabeledPoint(41.44553667698284, [6.062628140269364,6.455863625503648,7.345205612834677]),
 LabeledPoint(38.56278480186328, [5.975876623321357,5.647243382923713,6.043684617039238]),
 LabeledPoint(37.66173092487132, [5.4224659604107766,5.6366313216024775,6.400513678883467]),
 LabeledPoint(39.7423363984811, [5.228566735375911,5.503357544330988,6.954763525752146]),
 LabeledPoint(39.85377395715628, [4.973115171772099,6.569755198352541,6.863701189180316]),
 LabeledPoint(38.054356453288584, [4.895466728884977,6.581231749518742,6.619040328583982]),
 LabeledPoint(37.06286522071685, [5.436500191268203,4.878856592745898,6.636976186329192])]

In [0]:
####Random splitting the data
regressionLabelPointSplit = regressionDataLabelPoint.randomSplit([0.7,0.3])

In [0]:
regressionLabelPointTrainData = regressionLabelPointSplit[0]
regressionLabelPointTrainData.take(5)

Out[345]: [LabeledPoint(43.39159163397081, [5.228766882805268,5.337471031723521,5.015986474540695]),
 LabeledPoint(41.44553667698284, [6.062628140269364,6.455863625503648,7.345205612834677]),
 LabeledPoint(38.56278480186328, [5.975876623321357,5.647243382923713,6.043684617039238]),
 LabeledPoint(37.66173092487132, [5.4224659604107766,5.6366313216024775,6.400513678883467]),
 LabeledPoint(39.7423363984811, [5.228566735375911,5.503357544330988,6.954763525752146])]

In [0]:
regressionLabelPointTrainData.count()

Out[346]: 349772

In [0]:
regressionLabelPointTestData = regressionLabelPointSplit[1]
regressionLabelPointTestData.take(5)

Out[347]: [LabeledPoint(45.52613317661517, [5.4871654128925,6.271137800300416,5.987197818911166]),
 LabeledPoint(41.35450579425902, [4.992433591029207,5.786327174132791,3.786640438685884]),
 LabeledPoint(38.054356453288584, [4.895466728884977,6.581231749518742,6.619040328583982]),
 LabeledPoint(38.671294878776685, [5.669195485436321,6.667702930332037,8.902547853238811]),
 LabeledPoint(44.7942028110455, [5.492802963852535,6.339059163007353,7.203373116696601])]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression()
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter: max number of iterations (>= 0). (default: 100)
predic

In [0]:
####Applying SGD regression on the data
from pyspark.mllib.regression import LinearRegressionWithSGD as lrSGD
LinearRegression = lrSGD.train(data = regressionLabelPointTrainData,iterations = 1000,step = 0.02,intercept = True)



In [0]:
LinearRegression.intercept

Out[309]: 1.4232074864824948

In [0]:
LinearRegression.weights

Out[310]: DenseVector([1.9575, 2.157, 2.2595])

In [0]:
LinearRegression.save(sc, '/home/pysparkbook/LinearRegression_3')

In [0]:
from pyspark.mllib.regression import LinearRegressionModel as linearRegressModel
LinearRegressionReloaded = linearRegressModel.load(sc, '/home/pysparkbook/LinearRegression_3')

In [0]:
LinearRegressionReloaded.intercept

Out[312]: 1.0290428021676348

In [0]:
LinearRegressionReloaded.weights

Out[313]: DenseVector([0.1451, 0.1578, 0.1833])

In [0]:
actualDataandLinearRegressionPredictedData = regressionLabelPointTestData.map(lambda data : (float(data.label) , float(LinearRegression.predict(data.features))))

In [0]:
actualDataandLinearRegressionPredictedData.take(10)

Out[315]: [(37.659682132579626, 39.10676478032987),
 (37.3802089198305, 35.40517312136872),
 (35.7289119035477, 41.120827301800134),
 (44.0247493800571, 37.776378566595525),
 (44.34218698683625, 33.089477262347984),
 (31.422709145392837, 42.488093265558746),
 (44.566997877452565, 39.77660767676995),
 (41.14516234885478, 35.25032158731995),
 (37.12529888338562, 36.2942239506412),
 (40.10142752460363, 35.8456567805374)]

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs
ourLinearRegressionModelMetrics = rmtrcs(actualDataandLinearRegressionPredictedData)
ourLinearRegressionModelMetrics.rootMeanSquaredError

Out[316]: 5.211336441549443

In [0]:
###the metrics showing negative value as the correlation got degrded due to the random values used in the data.
ourLinearRegressionModelMetrics.r2

Out[317]: -1.4032059646838118

In [0]:
ourModelWithLinearRegression = lrSGD.train(data = regressionLabelPointTrainData,iterations = 10000,step = 0.02,intercept = True)
actualDataandLinearRegressionPredictedData =regressionLabelPointTestData.map(lambda data : (float(data.label) ,float(ourModelWithLinearRegression.predict(data.features))))

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs
ourLinearRegressionModelMetrics = rmtrcs(actualDataandLinearRegressionPredictedData)

In [0]:
ourLinearRegressionModelMetrics.rootMeanSquaredError

Out[319]: 5.211336441549443

In [0]:
ourLinearRegressionModelMetrics.r2

Out[320]: -1.4032059646838118

####Problem #3
Perform the following operations
1.	For data “autoMPGDataModified.csv”, transform Dataframe to RDD.
2.	Create an RDD of the labeled point.
3.	Divide the data into a training and testing set.
4.	Create a ridge regression model.
5.	Train and save the model.
6.	Predict data using the saved model.
7.	Evaluate the created model and check its accuracy.
8.	Create a ridge regression model.
9.	Train and save the model.
10.	Predict data using the saved model.
11.	Evaluate the created model and check its accuracy.

In [0]:
# Created RDD FILE
autompg_f = spark.createDataFrame(autompg_f)
from pyspark.ml.linalg import Vectors

In [0]:
autoDataRDDDict = autompg_f.rdd
autoDataRDDDict.take(5)

Out[245]: [Row(mpg=16.400277607578637, displacement=15.717580072508866, horsepower=16.14825428416662, weight=17.604465830908296, accelaration=10.773802105742542),
 Row(mpg=18.449786274825613, displacement=19.3164428198157, horsepower=15.65470612224941, weight=16.81203463709437, accelaration=11.474701386426034),
 Row(mpg=16.052393009390876, displacement=17.019027128223403, horsepower=15.648787350588735, weight=18.164677259947545, accelaration=12.025407741016389),
 Row(mpg=16.088306069394047, displacement=16.95935180388593, horsepower=15.92318842111331, weight=15.57610543105493, accelaration=10.87496158224014),
 Row(mpg=15.548039536692208, displacement=17.613661399550505, horsepower=19.61606938655108, weight=14.836433126260381, accelaration=10.850254285957636)]

In [0]:
###label point
from pyspark.mllib.regression import LabeledPoint
autoDataLabelPoint = autoDataRDDDict.map(lambda data :LabeledPoint(data[0],
[data[1],data[2],float(data[3]),data[4]]))

In [0]:
autoDataLabelPoint.take(5)

Out[250]: [LabeledPoint(16.400277607578637, [15.717580072508866,16.14825428416662,17.604465830908296,10.773802105742542]),
 LabeledPoint(18.449786274825613, [19.3164428198157,15.65470612224941,16.81203463709437,11.474701386426034]),
 LabeledPoint(16.052393009390876, [17.019027128223403,15.648787350588735,18.164677259947545,12.025407741016389]),
 LabeledPoint(16.088306069394047, [16.95935180388593,15.92318842111331,15.57610543105493,10.87496158224014]),
 LabeledPoint(15.548039536692208, [17.613661399550505,19.61606938655108,14.836433126260381,10.850254285957636])]

In [0]:
autoDataLabelPointSplit = autoDataLabelPoint.randomSplit([0.7,0.3])
autoDataLabelPointTrain = autoDataLabelPointSplit[0]
autoDataLabelPointTest = autoDataLabelPointSplit[1]
autoDataLabelPointTrain.take(5)

Out[251]: [LabeledPoint(16.400277607578637, [15.717580072508866,16.14825428416662,17.604465830908296,10.773802105742542]),
 LabeledPoint(16.052393009390876, [17.019027128223403,15.648787350588735,18.164677259947545,12.025407741016389]),
 LabeledPoint(16.088306069394047, [16.95935180388593,15.92318842111331,15.57610543105493,10.87496158224014]),
 LabeledPoint(15.548039536692208, [17.613661399550505,19.61606938655108,14.836433126260381,10.850254285957636]),
 LabeledPoint(17.34677876520053, [19.760204847217093,14.363984557596742,17.27450846174418,10.798155815653963])]

In [0]:
autoDataLabelPointTest.count()

Out[252]: 150688

In [0]:
autoDataLabelPointTrain.count()

Out[253]: 349312

In [0]:
from pyspark.mllib.regression import RidgeRegressionWithSGD as ridgeSGD
ModelWithRidge = ridgeSGD.train(data = autoDataLabelPointTrain,
iterations = 400,
step = 0.0005,
regParam = 0.05,
intercept = True
)



In [0]:
ModelWithRidge.intercept

Out[255]: 1.0157678339588603

In [0]:
ModelWithRidge.weights

Out[256]: DenseVector([0.2668, 0.2667, 0.2667, 0.1816])

In [0]:
actualDataandRidgePredictedData = autoDataLabelPointTest.map(lambda data
: [float(data.label) , float(ModelWithRidge.predict(data.features))])
actualDataandRidgePredictedData.take(5)

Out[257]: [[18.449786274825613, 16.912671558777774],
 [18.59413583609658, 16.24627344523812],
 [16.586192355761025, 16.846769616552407],
 [16.32172710344836, 17.792892958015802],
 [20.072272946623205, 16.312225399558994]]

In [0]:
RidgeModelMetrics =rmtrcs(actualDataandRidgePredictedData)

RidgeModelMetrics.rootMeanSquaredError


Out[258]: 1.4628787808056007

In [0]:
from pyspark.mllib.regression import LassoWithSGD as lassoSGD
ModelWithLasso = lassoSGD.train(data = autoDataLabelPointTrain,
iterations = 500, step = 0.001,regParam = 0.04, intercept = True)



In [0]:
ModelWithLasso.intercept

Out[260]: 1.0160538615645032

In [0]:
ModelWithLasso.weights

Out[261]: DenseVector([0.2707, 0.2706, 0.2706, 0.184])

In [0]:
actualDataandLassoPredictedData = autoDataLabelPointTest.map(lambda data: (float(data.label) , float(ModelWithLasso.predict(data.features))))
actualDataandLassoPredictedData.take(5)

Out[262]: [(18.449786274825613, 17.141850785929908),
 (18.59413583609658, 16.46582711464482),
 (16.586192355761025, 17.07516201934041),
 (16.32172710344836, 18.035279762833795),
 (20.072272946623205, 16.532990895964)]

In [0]:
from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs
LassoModelMetrics = rmtrcs(actualDataandLassoPredictedData)
LassoModelMetrics.rootMeanSquaredError

Out[263]: 1.4425550730421637

In [0]:
LassoModelMetrics.r2

Out[264]: -4.3732987967434385

%md
##The lassoregression gave a little less root mean squred error than the ridge regression.

Let’s recall, both in ridge and lasso we added a penalty term, but the term was different in both cases. In ridge, we used the squares of theta while in lasso we used absolute value of theta. 

Actually, there are different possible choices of regularization with different choices of order of the parameter in the regularization term. This is more generally known as Lp regularizer.

Migh be use of some other pregularisaton parameter can give better result.