In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *

# This cell relies on a pre-existing `spark` session (e.g. the one a Databricks notebook provides).
# To run elsewhere, create one first -- note the builder method is `appName`, not `setApp`:
# spark = SparkSession.builder.master("local").appName("Linear Regression Example").getOrCreate()

# Load the CSV, letting Spark infer the column types and use the first row as the header.
df=spark.read.format("csv").options(inferSchema="true",header="true").load("/FileStore/tables/Linear_regression_dataset.csv")
# show() prints the rows itself and returns None; wrapping it in print() only adds a stray "None".
df.show()
print("Getting the Shape of the Data")
print("Num of Rows:{0} Num of Columns:{1}".format(df.count(), len(df.columns)))
print("\nViewing the Schema of the Data")
# printSchema has to be called; printing the attribute only shows the bound-method repr.
df.printSchema()


In [2]:
print("\nViewing the Statistics of the Table")
# describe().show() prints the summary table itself and returns None, so no print() wrapper.
df.describe().show()

print("Determing Co-Relation between Input and Output Columns")
# df.stat.corr returns the Pearson correlation as a Python float, so the value can be
# formatted into the message. The previous `.show()` call returned None, which made every
# line read "... : None".
for feature_col in ['var_1', 'var_2', 'var_3', 'var_4', 'var_5']:
    print("Correlation between {0} and label : {1}".format(feature_col, df.stat.corr(feature_col, 'label')))

In [3]:
# Feature Engineering
# This is the part where we combine all our input features into a Single Vector using Spark's VectorAssembler
# It creates only a single feature that captures the input values for that Row
# Hence instead of 5 Input Columns it essentially merges all input columns into a Single Feature Vector Column

from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Only the columns listed in `inputCols` are assembled into the vector, so the label column
# is deliberately left out of this list.
print(df.columns)

# Input feature columns to merge into a single vector column.
feature_cols=['var_1','var_2','var_3','var_4','var_5']

# Creating a VectorAssembler object by passing the required Input Features
vec_assembler=VectorAssembler(inputCols=feature_cols,outputCol='features')
# Applying the created vec_assembler over our Input DataFrame by calling transform function
features_df=vec_assembler.transform(df)
# printSchema() and show() print their output themselves and return None, so they are
# called directly instead of being wrapped in print() (which would add a stray "None").
features_df.printSchema()
features_df.show(truncate=False)
# One additional column named "features" now holds a single vector with all of the inputs
features_df.select("features").show(truncate=False)




      

In [4]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

# Keep only the assembled "features" vector column and the "label" output column;
# this two-column DataFrame is what the model is trained and evaluated on.
model_df=features_df.select("features","label")
# show() prints the rows itself and returns None, so it is not wrapped in print().
model_df.show(truncate=False)
print("Checking the Shape of the Model DataSet")
print("Num. of Rows: {0} Num. of Columns: {1}".format(model_df.count(), len(model_df.columns)))

# STEP_5 Splitting the DataSet (Apply randomSplit() function)
# The data is split roughly 70% / 30% into training and test sets, so the model is fitted
# on one part and its performance is measured on data it has not seen.
# A fixed seed makes the split -- and therefore every metric computed later -- repeatable
# across notebook runs; without it each run trains and tests on different rows.
SPLIT_SEED=42
train_df,test_df=model_df.randomSplit([0.7,0.3],seed=SPLIT_SEED)

print("Printing Shape of Training and test DataSets")
print("Training DataSet")
print("Num of Rows: {0} Num. of Columns: {1}".format(train_df.count(),len(train_df.columns)))
print("Test DataSet")
print("Num of Rows: {0} Num. of Columns: {1}".format(test_df.count(),len(test_df.columns)))

In [5]:
# STEP_6: Build and Train Linear Regression Model
#   lin_Reg.fit(train_df)                -> fit the model on the training DataFrame
#   lr_model.evaluate(train_df)          -> apply the fitted model to the training DataFrame
#   training_predictions.r2              -> R^2 (coefficient of determination) on the training data
#   lr_model.evaluate(test_df)           -> apply the fitted model to the test DataFrame
#   test_predictions.r2                  -> R^2 on the test data
#   test_predictions.meanSquaredError    -> mean squared error on the test data
#
# In this step we build and train the Linear Regression Model using the "features" vector
# as input and the "label" column as output.
# We can fetch the coefficients (B1, B2, B3, B4, B5), one per input feature, and the
# intercept (B0) of the fitted model.
# We also evaluate the fit on the training data using R^2. Note that R^2 is a
# goodness-of-fit measure, not an accuracy percentage.

from pyspark.ml.regression import LinearRegression
# featuresCol is spelled out for readability; "features" is also the estimator's default.
lin_Reg=LinearRegression(featuresCol="features",labelCol="label")

lr_model=lin_Reg.fit(train_df)
print(lr_model.coefficients)
print(lr_model.intercept)
# Evaluating the Model on the Training DataSet
training_predictions=lr_model.evaluate(train_df)
# R^2 of the Model on the Training Data
print(training_predictions.r2)

# STEP_7: Evaluate Linear Regression Model on Test Data
# Evaluating the Model on Test DataSet
test_predictions=lr_model.evaluate(test_df)
# R^2 of the Model on the Test DataSet
print(test_predictions.r2)
# Compute the mean Squared Error
print("Mean Squared Error: {0}".format(test_predictions.meanSquaredError))