# Linear Regression with PySpark.

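The goal of this notebook is to get familiar with PySpark's machine-learning package (`pyspark.ml`) by fitting a linear regression model that predicts a customer's yearly amount spent.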

In [1]:
from pyspark.sql import SparkSession

Create the Spark session.

In [2]:
# Reuse a running Spark session if one exists; otherwise start one named 'LinearRegression'.
spark = (
    SparkSession.builder
    .appName('LinearRegression')
    .getOrCreate()
)

Downloading the dataset

In [3]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/Ecommerce_Customers.csv >> Ecommerce_Customers.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 86871  100 86871    0     0   130k      0 --:--:-- --:--:-- --:--:--  130k


In [5]:
# Load the customers CSV: the first line is the header and column types are inferred from the data.
df = spark.read.csv(
    'docs/Ecommerce_Customers.csv',
    header=True,
    inferSchema=True,
)

In [6]:
# Print the inferred schema to confirm which columns were parsed as numeric.
df.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [7]:
# Peek at the first row to sanity-check the parsed values.
df.head()

Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005)

## Setup dataframe for ML

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [9]:
# List the column names so the numeric ones can be picked as model features.
df.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [10]:
# Pack the four numeric predictor columns into a single vector column named 'features'.
assembler = VectorAssembler(
    inputCols=[
        'Avg Session Length',
        'Time on App',
        'Time on Website',
        'Length of Membership',
    ],
    outputCol='features',
)

In [11]:
# Apply the assembler to df, adding the 'features' vector column to each row.
output=assembler.transform(df)

In [13]:
# Keep only the feature vector and the label column; nothing else is needed for training.
final_df=output.select('features', 'Yearly Amount Spent')

In [14]:
# 70/30 train/test split. The seed is fixed so that the split, and every metric
# computed from it below, can be reproduced when the notebook is re-run.
train_data, test_data = final_df.randomSplit([0.7, 0.3], seed=42)

In [16]:
# Summary statistics of the label column in the training split.
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                357|
|   mean|  496.2043447343574|
| stddev|   82.3535565831607|
|    min| 256.67058229005585|
|    max|  744.2218671047146|
+-------+-------------------+



In [17]:
# Summary statistics of the label column in the test split, for comparison with the training split.
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                143|
|   mean|  507.0773990148931|
| stddev|  70.84127600809234|
|    min| 308.52774655803336|
|    max|  765.5184619388373|
+-------+-------------------+



In [18]:
from pyspark.ml.regression import LinearRegression

In [19]:
# Linear regression of 'Yearly Amount Spent' on the assembled vector column.
# featuresCol is spelled out for readability; 'features' is also the estimator's default.
lm = LinearRegression(
    featuresCol='features',
    labelCol='Yearly Amount Spent',
)

In [20]:
# Fit the regression on the training split.
model = lm.fit(train_data)

In [21]:
import pandas as pd

In [22]:
# Show one coefficient per feature. The row labels come from the assembler's own
# input columns, so they cannot drift out of sync with the feature order if the
# column list is edited; toArray() hands pandas a plain NumPy array.
pd.DataFrame(
    {"Coefficients": model.coefficients.toArray()},
    index=assembler.getInputCols(),
)

Unnamed: 0,Coefficients
Avg Session Length,25.697673
Time on App,38.450908
Time on Website,0.021401
Length of Membership,61.770985


In [23]:
# Evaluate the fitted model on the held-out test split; `res` exposes the residuals and metrics used below.
res = model.evaluate(test_data)

In [24]:
# Show the first 20 residuals on the test split.
res.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -4.072083963788089|
|-3.2380514658606785|
| -6.206416037537622|
| -6.831774257770576|
| 11.130070175274568|
| 19.770281393793596|
|  4.278108557898008|
|  3.732941589343625|
|-5.4444720515215295|
| -7.958913133816338|
|   5.27521410547007|
| 18.403860375693966|
|  7.618650827461522|
| -6.441889262210566|
|-1.3074797653404744|
|-5.9007853239253905|
|-3.8496372105240084|
| -11.66548658868237|
|  7.600489334197334|
|  8.315836591191612|
+-------------------+
only showing top 20 rows



In [25]:
# Drop the label column to mimic new, unlabeled data.
unlabeled_data = test_data.select('features')

In [26]:
# Score the unlabeled rows; the result carries a 'prediction' column next to 'features'.
predictions = model.transform(unlabeled_data)

In [27]:
# Show the first 20 predictions alongside their feature vectors.
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[30.8364326747734...| 471.5739843907777|
|[30.8794843441274...|493.44465145071536|
|[31.0613251567161...| 493.7618740954392|
|[31.1280900496166...| 564.0844610048252|
|[31.1695067987115...|416.22646062701824|
|[31.3123495994443...|  443.821136634147|
|[31.3584771924370...| 490.8978418915774|
|[31.4459724827577...|481.14402334578494|
|[31.5257524169682...| 449.4100988614034|
|[31.5261978982398...|417.05343932615415|
|[31.5316044825729...| 431.2403916238925|
|[31.6005122003032...|460.76899111540297|
|[31.6548096756927...|  467.644772900087|
|[31.7207699002873...| 545.2168227402335|
|[31.7216523605090...| 349.0844063972131|
|[31.7242025238451...| 509.2886726118859|
|[31.7656188210424...|500.40371884613114|
|[31.8093003166791...| 548.4373859515235|
|[31.8209982016720...|  417.074791679016|
|[31.8512531286083...| 464.6764100756068|
+--------------------+------------------+

In [28]:
# Report the test-set error metrics and goodness of fit, one per line.
metric_rows = [
    ("MAE: ", res.meanAbsoluteError),
    ("MSE: ", res.meanSquaredError),
    ("RMSE: ", res.rootMeanSquaredError),
    ("R2: ", res.r2),
    ("Adj R2: ", res.r2adj),
]
for metric_label, metric_value in metric_rows:
    print(metric_label, metric_value)

MAE:  8.227879172405556
MSE:  110.51137241068629
RMSE:  10.512438937310709
R2:  0.9778240662737375
Adj R2:  0.9771812855860198


Source: <https://www.youtube.com/watch?v=2m9xI4gs3HM>