In [0]:
# Data processing
import pandas as pd
 
# Create synthetic dataset
from sklearn.datasets import make_regression
 
# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel



In [0]:
# Create a synthetic dataset
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)
 
# Convert the data from numpy array to a pandas dataframe
pdf = pd.DataFrame({'feature1': X[:, 0], 'feature2': X[:, 1], 'dependent_variable': y})
 
# Convert pandas dataframe to spark dataframe
sdf = spark.createDataFrame(pdf)
 
# Check data summary statistics
display(sdf.summary())

summary,feature1,feature2,dependent_variable
count,1000000.0,1000000.0,1000000.0
mean,-0.0006102200210227977,-0.0012327895979428,1.906778587875913
stddev,1.0014933873914778,0.9999968356171214,89.32555339793132
min,-4.644418774315494,-4.829436008511157,-396.3047194004824
25%,-0.676728422223492,-0.6754954830067165,-58.45186882930967
50%,-0.0008527737039159014,-0.00012074818883556866,1.9691443593719715
75%,0.6741941373134125,0.6732219446041146,62.18962778929686
max,4.827622729327414,4.526783893260273,409.8745926247189


In [0]:
# Train test split
trainDF, testDF = sdf.randomSplit([.8, .2], seed=42)
 
# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')

There are 799662 records in the training dataset.
There are 200338 records in the testing dataset.


In [0]:
# Linear regression expect a vector input
vecAssembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
 
# Take a look at the data
display(vecTrainDF)

feature1,feature2,dependent_variable,features
-4.435060898724634,0.7481186722693874,-336.0190978167691,"Map(vectorType -> dense, length -> 2, values -> List(-4.435060898724634, 0.7481186722693874))"
-4.374042798818833,-0.1044070942259938,-360.619329256344,"Map(vectorType -> dense, length -> 2, values -> List(-4.374042798818833, -0.10440709422599387))"
-4.164294602643378,-1.267004372717662,-384.60376689707607,"Map(vectorType -> dense, length -> 2, values -> List(-4.164294602643378, -1.267004372717662))"
-4.157733877399132,-0.7219988725181353,-364.87371288642674,"Map(vectorType -> dense, length -> 2, values -> List(-4.1577338773991315, -0.7219988725181353))"
-4.069621097018043,-1.8157531141967664,-395.349694083717,"Map(vectorType -> dense, length -> 2, values -> List(-4.069621097018043, -1.8157531141967662))"
-3.976841142750735,2.309187824565308,-243.79144512861603,"Map(vectorType -> dense, length -> 2, values -> List(-3.976841142750735, 2.3091878245653077))"
-3.914323671759687,-1.1330921991036622,-359.32849436302075,"Map(vectorType -> dense, length -> 2, values -> List(-3.914323671759687, -1.1330921991036622))"
-3.8325492238785817,-0.8129531560361218,-341.00465323254883,"Map(vectorType -> dense, length -> 2, values -> List(-3.8325492238785817, -0.8129531560361218))"
-3.827165377856318,2.382839964691455,-228.45123989242504,"Map(vectorType -> dense, length -> 2, values -> List(-3.827165377856318, 2.382839964691455))"
-3.818922004078393,0.4743250869163463,-295.3573919722635,"Map(vectorType -> dense, length -> 2, values -> List(-3.818922004078393, 0.47432508691634634))"


In [0]:
# Create linear regression
lr = LinearRegression(featuresCol="features", labelCol="dependent_variable")
 
# Fit the linear regresssion model
lrModel = lr.fit(vecTrainDF)
 
# Print model intercept and coefficients
print(f'The intercept of the model is {lrModel.intercept:.2f} and the coefficients of the model are {lrModel.coefficients[0]:.2f} and {lrModel.coefficients[1]:.2f}')

The intercept of the model is 2.00 and the coefficients of the model are 82.09 and 35.03


In [0]:
# Create pipeline
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)
 
# Fit the pipeline model
pipelineModel = pipeline.fit(trainDF)

In [0]:
# Make predictions on testing dataset
predDF = pipelineModel.transform(testDF)
 
# Take a look at the output
display(predDF.select("features", "dependent_variable", "prediction"))

features,dependent_variable,prediction
"Map(vectorType -> dense, length -> 2, values -> List(-4.368673045782893, -0.9128799643341927))",-388.24009268452994,-388.59567073275
"Map(vectorType -> dense, length -> 2, values -> List(-4.066894021062747, -0.21618931749243925))",-339.5096852803994,-339.41784635103244
"Map(vectorType -> dense, length -> 2, values -> List(-3.969358437333755, -0.6974418586710005))",-347.8019437391816,-348.2697225832952
"Map(vectorType -> dense, length -> 2, values -> List(-3.802631227728344, -0.8781062717292447))",-341.04649620036434,-340.9120820637722
"Map(vectorType -> dense, length -> 2, values -> List(-3.7519232892047705, 1.014927715371012))",-270.5955448087131,-270.436026714301
"Map(vectorType -> dense, length -> 2, values -> List(-3.7200606043665414, -1.4080359466533736))",-352.91660190734905,-352.6975815975568
"Map(vectorType -> dense, length -> 2, values -> List(-3.6770433667294666, -0.5767285683993942))",-320.26206192456345,-320.0454306470354
"Map(vectorType -> dense, length -> 2, values -> List(-3.625635847808651, 0.958426238606658))",-262.5163929035231,-262.0485580610854
"Map(vectorType -> dense, length -> 2, values -> List(-3.5861473869749982, -0.04075664488610765))",-294.1220213312634,-293.8086768578483
"Map(vectorType -> dense, length -> 2, values -> List(-3.5805308182256352, -0.03435579088607286))",-293.1150925108174,-293.12339807507215


In [0]:
# Create regression evaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="dependent_variable", metricName="rmse")
 
# RMSE
rmse = regressionEvaluator.evaluate(predDF)
print(f"The RMSE for the linear regression model is {rmse:0.2f}")
 
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(predDF)
print(f"The MSE for the linear regression model is {mse:0.2f}")
 
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"The R2 for the linear regression model is {r2:0.2f}")
 
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(predDF)
print(f"The MAE for the linear regression model is {mae:0.2f}")

The RMSE for the linear regression model is 0.30
The MSE for the linear regression model is 0.09
The R2 for the linear regression model is 1.00
The MAE for the linear regression model is 0.24


In [0]:
# Visualize the data
display(predDF.select("dependent_variable", "prediction"))

dependent_variable,prediction
-388.24009268452994,-388.59567073275
-339.5096852803994,-339.41784635103244
-347.8019437391816,-348.2697225832952
-341.04649620036434,-340.9120820637722
-270.5955448087131,-270.436026714301
-352.91660190734905,-352.6975815975568
-320.26206192456345,-320.0454306470354
-262.5163929035231,-262.0485580610854
-294.1220213312634,-293.8086768578483
-293.1150925108174,-293.12339807507215


In [0]:
# Path to save the model
pipelinePath = '/mnt/demo4tutorial/model/linear_regression_pipeline_model'
 
# Save the model to the path
pipelineModel.write().overwrite().save(pipelinePath)

In [0]:
dbutils.fs.ls('/mnt/demo4tutorial/model/linear_regression_pipeline_model')

Out[26]: [FileInfo(path='dbfs:/mnt/demo4tutorial/model/linear_regression_pipeline_model/metadata/', name='metadata/', size=0, modificationTime=1667510910000),
 FileInfo(path='dbfs:/mnt/demo4tutorial/model/linear_regression_pipeline_model/stages/', name='stages/', size=0, modificationTime=1667510910000)]

In [0]:
# Create a new synthetic dataset
X_new, y_new = make_regression(n_samples=1000, n_features=2, bias=2, noise=0.3, random_state=0)
 
# Convert the data from numpy array to a pandas dataframe
pdf_new = pd.DataFrame({'feature1': X_new[:, 0], 'feature2': X_new[:, 1], 'dependent_variable': y_new})
 
# Convert pandas dataframe to spark dataframe
sdf_new = spark.createDataFrame(pdf_new)
 
# Check data summary statistics
display(sdf_new.summary())


summary,feature1,feature2,dependent_variable
count,1000.0,1000.0,1000.0
mean,-0.0099032827888682,-0.0217364843851635,0.7087430813817325
stddev,0.982226788348624,0.9748727784260626,55.76774258077021
min,-3.046143054799926,-2.994612860227619,-159.07692656605417
25%,-0.6780954607862469,-0.6963266538610828,-39.70912792901036
50%,-0.0261922373442504,-0.0100564718690145,2.2161746425432725
75%,0.6103793791072052,0.6225191403049977,38.84658985699836
max,2.759355114021582,3.17097477329018,176.13380265435686


In [0]:
# Load the saved model
loadedPipelineModel = PipelineModel.load(pipelinePath)
 
# Make prediction for the new dataset
predDF_new = loadedPipelineModel.transform(sdf_new)
 
# Take a look at the data
display(predDF_new.select("features", "dependent_variable", "prediction"))

features,dependent_variable,prediction
"Map(vectorType -> dense, length -> 2, values -> List(0.49949823346865946, 1.2674116548186567))",73.54523642658489,87.40086650459135
"Map(vectorType -> dense, length -> 2, values -> List(-0.4779740040404867, -1.8430695501566485))",-91.9629865668512,-101.79929419993552
"Map(vectorType -> dense, length -> 2, values -> List(-0.13482245109435406, -1.62632193782683))",-68.1880325270124,-66.03780793869807
"Map(vectorType -> dense, length -> 2, values -> List(-0.46847604467972315, -0.4015575444993436))",-33.57876886244969,-50.52304364765236
"Map(vectorType -> dense, length -> 2, values -> List(0.3464944420067607, 1.1452621730192247))",62.17382193645796,70.56210168684026
"Map(vectorType -> dense, length -> 2, values -> List(-0.8889713580954499, -1.6812182154944335))",-101.69669128893716,-129.86770285733397
"Map(vectorType -> dense, length -> 2, values -> List(-0.39079799608101357, 0.25233143138452124))",-3.5783581539168674,-21.240660303996545
"Map(vectorType -> dense, length -> 2, values -> List(0.8024563957963952, -0.2680033709513804))",24.41545145604581,58.48418333282216
"Map(vectorType -> dense, length -> 2, values -> List(-0.6801782039968504, -0.11816404512856976))",-30.362408218587987,-57.97397564334459
"Map(vectorType -> dense, length -> 2, values -> List(2.4124536795437486, -0.5028167006425383))",80.58222468131213,182.42067510286088


In [0]:
display(predDF_new.select("dependent_variable", "prediction"))

dependent_variable,prediction
73.54523642658489,87.40086650459135
-91.9629865668512,-101.79929419993552
-68.1880325270124,-66.03780793869807
-33.57876886244969,-50.52304364765236
62.17382193645796,70.56210168684026
-101.69669128893716,-129.86770285733397
-3.5783581539168674,-21.240660303996545
24.41545145604581,58.48418333282216
-30.362408218587987,-57.97397564334459
80.58222468131213,182.42067510286088
