### Linear Regression

Initialize a Spark session and set the app name to "Linear Regression".

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session for this notebook, registered under the app name
# "Linear Regression".
spark = (
    SparkSession.builder
    .appName("Linear Regression")
    .getOrCreate()
)

Read the CSV file and load it into a Spark DataFrame.

In [4]:
import os

# Location of the training CSV. The default is the original author's local
# path; set the LINEAR_REGRESSION_CSV environment variable to run the notebook
# on another machine without editing this cell.
csv_path = os.environ.get(
    "LINEAR_REGRESSION_CSV",
    "/Users/jessu/Documents/Spark ML/Linear_regression.csv",
)

# header=True takes the column names from the first row; inferSchema=True asks
# Spark to derive column types from the data.
df = spark.read.csv(csv_path, header=True, inferSchema=True)

In [5]:
# Sanity check: the reader should hand back a Spark DataFrame.
type(df)

pyspark.sql.dataframe.DataFrame

In [7]:
# Preview the first five rows of the raw data.
df.show(n=5)

+-----+-----+-----+-----+-----+------+
|var_1|var_2|var_3|var_4|var_5|output|
+-----+-----+-----+-----+-----+------+
|  734|  688|   81|0.328|0.259| 0.418|
|  700|  600|   94| 0.32|0.247| 0.389|
|  712|  705|   93|0.311|0.247| 0.417|
|  734|  806|   69|0.315| 0.26| 0.415|
|  613|  759|   61|0.302| 0.24| 0.378|
+-----+-----+-----+-----+-----+------+
only showing top 5 rows



This can be utilized to get the size of the DataFrame as (rows, columns).

In [11]:
# Report the DataFrame size as a (rows, columns) tuple.
row_count = df.count()
column_count = len(df.columns)
print((row_count, column_count))

(1232, 6)


In [12]:
# Column names and the types Spark inferred while reading the CSV.
df.printSchema()

root
 |-- var_1: integer (nullable = true)
 |-- var_2: integer (nullable = true)
 |-- var_3: integer (nullable = true)
 |-- var_4: double (nullable = true)
 |-- var_5: double (nullable = true)
 |-- output: double (nullable = true)



In [14]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|            var_1|            var_2|             var_3|               var_4|               var_5|             output|
+-------+-----------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|             1232|             1232|              1232|                1232|                1232|               1232|
|   mean|715.0819805194806|715.0819805194806| 80.90422077922078|  0.3263311688311693| 0.25927272727272715|0.39734172077922014|
| stddev| 91.5342940441652|93.07993263118064|11.458139049993724|0.015012772334166148|0.012907228928000298|0.03326689862173776|
|    min|              463|              472|                40|               0.277|               0.214|              0.301|
|    max|             1009|             1103|               116|               0.373|               0.294|     

In [15]:
# First three records, returned as a list of Row objects.
df.head(n=3)

[Row(var_1=734, var_2=688, var_3=81, var_4=0.328, var_5=0.259, output=0.418),
 Row(var_1=700, var_2=600, var_3=94, var_4=0.32, var_5=0.247, output=0.389),
 Row(var_1=712, var_2=705, var_3=93, var_4=0.311, var_5=0.247, output=0.417)]

Importing necessary functions for calculating the correlation between any two variables

In [17]:
from pyspark.sql.functions import corr

In [21]:
# Correlation between the var_1 predictor and the output label.
var_1_vs_output = corr("var_1", "output")
df.select(var_1_vs_output).show()

+-------------------+
|corr(var_1, output)|
+-------------------+
| 0.9187399607627283|
+-------------------+



### Feature Engineering

Importing all required packages to transform the given data into a vector of input variables and to divide the whole data into train and test sets

In [23]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [37]:
# Start from every column name; the label column is removed in the next cell.
cols = df.columns

In [38]:
# Drop the label so only predictors remain. Look the column up by name rather
# than by a hard-coded position, so a reordered or widened CSV cannot silently
# remove a feature instead of the label. pop() still returns the removed name.
cols.pop(cols.index("output"))

'output'

In [39]:
# The remaining names are the model's input features.
cols

['var_1', 'var_2', 'var_3', 'var_4', 'var_5']

In [40]:
# Assembler that packs the predictor columns into a single vector column
# named "features".
vec_Assembly = VectorAssembler(
    inputCols=cols,
    outputCol="features",
)

In [41]:
# Adds the assembled `features` vector column alongside the original columns.
feature_df = vec_Assembly.transform(df)

In [42]:
# The assembler's transform() yields another Spark DataFrame.
type(feature_df)

pyspark.sql.dataframe.DataFrame

In [43]:
# Verify that the new vector-typed `features` column is present.
feature_df.printSchema()

root
 |-- var_1: integer (nullable = true)
 |-- var_2: integer (nullable = true)
 |-- var_3: integer (nullable = true)
 |-- var_4: double (nullable = true)
 |-- var_5: double (nullable = true)
 |-- output: double (nullable = true)
 |-- features: vector (nullable = true)



In [44]:
# Peek at two rows; the long feature vectors are truncated in this view.
feature_df.show(n=2)

+-----+-----+-----+-----+-----+------+--------------------+
|var_1|var_2|var_3|var_4|var_5|output|            features|
+-----+-----+-----+-----+-----+------+--------------------+
|  734|  688|   81|0.328|0.259| 0.418|[734.0,688.0,81.0...|
|  700|  600|   94| 0.32|0.247| 0.389|[700.0,600.0,94.0...|
+-----+-----+-----+-----+-----+------+--------------------+
only showing top 2 rows



In [46]:
# Keep only what the estimator needs: the feature vector and the label.
# NOTE(review): the source column is named `output` (lowercase) but is selected
# here as "Output"; the resulting column is displayed as `Output`, and the
# LinearRegression cell below uses labelCol='Output' to match. This presumably
# relies on Spark's default case-insensitive column resolution -- confirm
# before changing the casing in either place.
model_df = feature_df.select("features","Output")

In [49]:
# Show four rows with truncation disabled so the full feature vectors are visible.
model_df.show(n=4, truncate=False)

+------------------------------+------+
|features                      |Output|
+------------------------------+------+
|[734.0,688.0,81.0,0.328,0.259]|0.418 |
|[700.0,600.0,94.0,0.32,0.247] |0.389 |
|[712.0,705.0,93.0,0.311,0.247]|0.417 |
|[734.0,806.0,69.0,0.315,0.26] |0.415 |
+------------------------------+------+
only showing top 4 rows



Splitting the data into Train and Test Data

In [51]:
# Split roughly 70% / 30% into train and test sets. A fixed seed makes the
# partition (and therefore every metric computed below) repeatable across
# kernel restarts; without it each run yields a different split. The split
# sizes are approximate, and row counts may differ from an unseeded run.
train_df, test_df = model_df.randomSplit([0.7, 0.3], seed=42)

In [56]:
# Total rows before the split, for comparison with the two partitions.
model_df.count()

1232

In [58]:
# Rows that landed in the training split.
train_df.count()

873

In [55]:
# Rows that landed in the test split.
test_df.count()

359

In [63]:
# (rows, columns) of the training split.
train_shape = (train_df.count(), len(train_df.columns))
print(train_shape)

(873, 2)


In [62]:
# (rows, columns) of the test split.
test_shape = (test_df.count(), len(test_df.columns))
print(test_shape)

(359, 2)


### Building and training the model

In [64]:
from pyspark.ml.regression import LinearRegression

In [67]:
# Configure the estimator: read inputs from the assembled `features` column
# (the estimator's default, spelled out here) and use `Output` as the label.
lin_Reg = LinearRegression(
    featuresCol="features",
    labelCol="Output",
)

In [68]:
# Fit on the training split only; test_df is held out for evaluation below.
lr_model = lin_Reg.fit(train_df)

In [69]:
# One fitted coefficient per assembled feature (five here); presumably in the
# same order as the assembler's input columns -- confirm before interpreting.
print(lr_model.coefficients)

[0.00033679848250219697,4.938869021560889e-05,0.00016070548637789843,-0.6391322355540211,0.5245061054157909]


In [81]:
# Constant (bias) term of the fitted linear model.
print(lr_model.intercept)

0.180783336259407


In [82]:
# Despite the variable name, evaluate() yields a summary object of metrics
# (its `r2` is read in the next cell), not row-level predictions.
train_prediction = lr_model.evaluate(train_df)

In [83]:
# R-squared on the data the model was fitted to.
print(train_prediction.r2)

0.8683709968290028


In [72]:
# Metrics summary on the held-out split (again a summary object, not
# row-level predictions); `r2` and `meanSquaredError` are read below.
test_prediction = lr_model.evaluate(test_df)

In [79]:
# R-squared on held-out data; compare with the training value to gauge overfitting.
print(test_prediction.r2)

0.8710955796624411


In [84]:
# Mean squared error on the test split, in squared units of the label.
print(test_prediction.meanSquaredError)

0.00014359597161056826
