# Linear Regression using PySpark

In [1]:
# Notebook-wide display configuration: pandas table rendering, matplotlib
# font sizes, and warning suppression.
import warnings

import matplotlib
import pandas as pd

# Render wide frames with every column and without wrapping across lines.
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)

# Larger tick, axis-label and title fonts so figures stay readable.
matplotlib.rcParams.update({
    'xtick.labelsize': 15,
    'ytick.labelsize': 15,
    'axes.labelsize': 20,
    'axes.titlesize': 25,
    'figure.titlesize': 25,
})

# NOTE(review): this silences *all* Python warnings for the whole session,
# which can hide genuine problems; consider narrowing the filter.
warnings.filterwarnings("ignore")

In [2]:
# Create (or reuse, if one is already running) the SparkSession that every
# later cell depends on.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('lin_reg')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/11 21:31:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/11 21:31:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/01/11 21:31:15 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/01/11 21:31:15 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
22/01/11 21:31:15 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [3]:
#import Linear Regression from spark's MLlib
from pyspark.ml.regression import LinearRegression

In [4]:
# Load the CSV: the first row supplies column names and Spark infers the
# column types from the data.
DATA_PATH = 'Linear_regression_dataset.csv'
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

                                                                                

In [5]:
# Dataset dimensions as (rows , columns); count() triggers a Spark job.
print(f'The shape of given dataset is ({df.count()} , {len(df.columns)})')

The shape of given dataset is (1232 , 6)


In [6]:
# Statistical summary (count, mean, stddev, min, max) of every column.
summary_stats = df.describe()
summary_stats.show()

+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|            var_1|            var_2|             var_3|               var_4|               var_5|             output|
+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|             1232|             1232|              1232|                1232|                1232|               1232|
|   mean|715.0819805194806|715.0819805194806| 80.90422077922078|  0.3263311688311693| 0.25927272727272715|0.39734172077922014|
| stddev| 91.5342940441652|93.07993263118064|11.458139049993724|0.015012772334166148|0.012907228928000298|0.03326689862173776|
|    min|              463|              472|                40|               0.277|               0.214|              0.301|
|    max|             1009|             1103|               116|               0.373|               0.294|     

In [7]:
# Inspect the column names and the types Spark inferred when loading (inferSchema=True).
df.printSchema()

root
 |-- var_1: integer (nullable = true)
 |-- var_2: integer (nullable = true)
 |-- var_3: integer (nullable = true)
 |-- var_4: double (nullable = true)
 |-- var_5: double (nullable = true)
 |-- output: double (nullable = true)



In [8]:
# Repeats the earlier describe() summary. describe() yields only five rows,
# so the 10-row display limit never cuts anything off.
stats_df = df.describe()
stats_df.show(n=10, truncate=True)

+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|            var_1|            var_2|             var_3|               var_4|               var_5|             output|
+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|             1232|             1232|              1232|                1232|                1232|               1232|
|   mean|715.0819805194806|715.0819805194806| 80.90422077922078|  0.3263311688311693| 0.25927272727272715|0.39734172077922014|
| stddev| 91.5342940441652|93.07993263118064|11.458139049993724|0.015012772334166148|0.012907228928000298|0.03326689862173776|
|    min|              463|              472|                40|               0.277|               0.214|              0.301|
|    max|             1009|             1103|               116|               0.373|               0.294|     

In [9]:
# Peek at the first three records as Row objects.
df.take(3)

[Row(var_1=734, var_2=688, var_3=81, var_4=0.328, var_5=0.259, output=0.418),
 Row(var_1=700, var_2=600, var_3=94, var_4=0.32, var_5=0.247, output=0.389),
 Row(var_1=712, var_2=705, var_3=93, var_4=0.311, var_5=0.247, output=0.417)]

In [10]:
#import corr function from pyspark functions
from pyspark.sql.functions import corr

In [11]:
# Pearson correlation between the first feature and the target column.
corr_df = df.select(corr('var_1', 'output'))
corr_df.show()

+-------------------+
|corr(var_1, output)|
+-------------------+
| 0.9187399607627283|
+-------------------+



In [12]:
#import vectorassembler to create dense vectors
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [13]:
# List the column names to choose the model inputs from.
[field.name for field in df.schema.fields]

['var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'output']

In [14]:
# Combine the five predictor columns into one vector column named 'features'.
# (The variable name's spelling is kept because the transform cell uses it.)
feature_cols = ['var_1', 'var_2', 'var_3', 'var_4', 'var_5']
vec_assmebler = VectorAssembler(inputCols=feature_cols, outputCol='features')

In [15]:
# Append the assembled 'features' vector column alongside the original columns.
features_df = vec_assmebler.transform(dataset=df)

In [16]:
# Confirm the assembler appended a 'features' column of vector type.
features_df.printSchema()

root
 |-- var_1: integer (nullable = true)
 |-- var_2: integer (nullable = true)
 |-- var_3: integer (nullable = true)
 |-- var_4: double (nullable = true)
 |-- var_5: double (nullable = true)
 |-- output: double (nullable = true)
 |-- features: vector (nullable = true)



In [17]:
# Show the first five assembled vectors at full width (no truncation).
feature_vectors = features_df.select('features')
feature_vectors.show(n=5, truncate=False)

+------------------------------+
|features                      |
+------------------------------+
|[734.0,688.0,81.0,0.328,0.259]|
|[700.0,600.0,94.0,0.32,0.247] |
|[712.0,705.0,93.0,0.311,0.247]|
|[734.0,806.0,69.0,0.315,0.26] |
|[613.0,759.0,61.0,0.302,0.24] |
+------------------------------+
only showing top 5 rows



In [18]:
# Keep only what the estimator needs: the feature vector and the label.
model_df = features_df.select(['features', 'output'])

In [19]:
# Preview the modelling frame with untruncated vectors.
model_df.show(n=10, truncate=False)

+------------------------------+------+
|features                      |output|
+------------------------------+------+
|[734.0,688.0,81.0,0.328,0.259]|0.418 |
|[700.0,600.0,94.0,0.32,0.247] |0.389 |
|[712.0,705.0,93.0,0.311,0.247]|0.417 |
|[734.0,806.0,69.0,0.315,0.26] |0.415 |
|[613.0,759.0,61.0,0.302,0.24] |0.378 |
|[748.0,676.0,85.0,0.318,0.255]|0.422 |
|[669.0,588.0,97.0,0.315,0.251]|0.411 |
|[667.0,845.0,68.0,0.324,0.251]|0.381 |
|[758.0,890.0,64.0,0.33,0.274] |0.436 |
|[726.0,670.0,88.0,0.335,0.268]|0.422 |
+------------------------------+------+
only showing top 10 rows



In [20]:
# Modelling-frame dimensions as (rows , columns); count() triggers a Spark job.
print(f'The shape of given dataset is ({model_df.count()} , {len(model_df.columns)})')

The shape of given dataset is (1232 , 2)


### Split Data - Train & Test sets


In [21]:
# Split the data roughly 70/30 into train and test sets (randomSplit weights
# are approximate, not exact row counts). A fixed seed makes the split, and
# therefore every metric reported below, reproducible on Restart & Run All.
# With seed=None, PySpark picks a fresh random seed on each execution.
SPLIT_SEED = 42
train_df, test_df = model_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [22]:
# Report the (rows , columns) shape of each split, separated by two blank lines.
print(f'The shape of train dataset is ({train_df.count()} , {len(train_df.columns)})')
print()
print()
print(f'The shape of test  dataset is ({test_df.count()} , {len(test_df.columns)})')

The shape of train dataset is (842 , 2)


The shape of test  dataset is (390 , 2)


In [23]:
# Summary statistics of the training split (only the numeric 'output' column is summarised).
train_summary = train_df.describe()
train_summary.show()

+-------+--------------------+
|summary|              output|
+-------+--------------------+
|  count|                 842|
|   mean|  0.3977339667458425|
| stddev|0.033546042835984516|
|    min|               0.301|
|    max|               0.491|
+-------+--------------------+



## Build Linear Regression Model 

In [24]:
# Linear-regression estimator that predicts 'output' from the assembled
# 'features' column ('features' is also the default featuresCol).
lin_Reg = LinearRegression(featuresCol='features', labelCol='output')

In [25]:
# Fit on the training split only; the returned model holds the learned weights.
lr_model = lin_Reg.fit(dataset=train_df)

22/01/11 21:31:31 WARN Instrumentation: [aa0275be] regParam is zero, which might cause numerical instability and overfitting.
22/01/11 21:31:31 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/11 21:31:31 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/01/11 21:31:31 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [26]:
import numpy as np

# Intercept followed by the coefficients for var_1 ... var_5, as one array.
model_params = np.concatenate(([lr_model.intercept], lr_model.coefficients.toArray()))
print(model_params)

[ 1.70766333e-01  3.29569912e-04  5.17359371e-05  1.51448947e-04
 -5.91191628e-01  5.17034270e-01]


In [27]:
# Evaluate on the training split. Despite its name, this variable holds a
# metrics summary (MSE, R², residuals, ...), not raw predictions.
training_predictions = lr_model.evaluate(dataset=train_df)

In [28]:
# Training mean squared error.
train_mse = training_predictions.meanSquaredError
train_mse

0.00014458019229341882

In [29]:
# Training coefficient of determination (R²).
train_r2 = training_predictions.r2
train_r2

0.8713699912097836

In [30]:
# Score the fitted model on the held-out test split.
test_results = lr_model.evaluate(dataset=test_df)

In [31]:
# First ten test-set residuals from the evaluation summary.
test_residuals = test_results.residuals
test_residuals.show(n=10)

+--------------------+
|           residuals|
+--------------------+
|0.009681496978953541|
|0.007428594668491162|
|-0.00418684561471...|
|-6.90801651783812...|
|0.002146774005926555|
|-0.00153811602671...|
|-0.00543891245928...|
|0.001607577891647...|
|-0.01095239631638...|
|0.009676027530104658|
+--------------------+
only showing top 10 rows



In [32]:
# Test-set coefficient of determination (R²).
test_r2 = test_results.r2
test_r2

0.8624849270710128

In [33]:
# Test-set root mean squared error.
test_rmse = test_results.rootMeanSquaredError
test_rmse

0.012104181079393072

In [34]:
# Test-set mean squared error.
test_mse = test_results.meanSquaredError
test_mse

0.00014651119960273725