In [1]:
from pyspark.sql import SparkSession

In [5]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark_session = (
    SparkSession.builder
    .appName("linear_regression")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/09 08:19:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/09 08:19:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/09 08:19:49 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/10/09 08:19:49 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [6]:
# Bare expression: show the session object as this cell's output.
spark_session

In [7]:
# Load the tips dataset from CSV. The first row supplies the column names and
# Spark infers each column's type from the data.
tips = spark_session.read.csv(
    "./data/tips.csv",
    header=True,
    inferSchema=True,
)
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [8]:
# Print the column names, inferred types and nullability of the tips dataset
tips.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



### Handling Categorical Features

In [10]:
from pyspark.ml.feature import StringIndexer

In [12]:
# Encode one categorical column: fit a StringIndexer on "sex" and append the
# resulting numeric index to the tips data as "sex_encoded".
indexer = StringIndexer(inputCol="sex", outputCol="sex_encoded")
sex_indexer_model = indexer.fit(tips)
df = sex_indexer_model.transform(tips)
df.show()

                                                                                

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_encoded|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [13]:
# Encode several categorical columns in a single StringIndexer pass.
categorical_cols = ["smoker", "day", "time"]
encoded_cols = ["smoker_encoded", "day_encoded", "time_encoded"]
indexer = StringIndexer(inputCols=categorical_cols, outputCols=encoded_cols)

# This cell reassigns `df`, so on a second execution `df` already carries the
# encoded columns and StringIndexer refuses to write to an output column that
# exists. Dropping them first keeps the cell re-runnable; dropping a column
# that is not present is a no-op, so the first run is unaffected.
df_to_encode = df.drop(*encoded_cols)
df = indexer.fit(df_to_encode).transform(df_to_encode)
df.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_encoded|smoker_encoded|day_encoded|time_encoded|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [14]:
# List the column names to confirm the *_encoded columns were added.
df.columns

['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_encoded',
 'smoker_encoded',
 'day_encoded',
 'time_encoded']

### Feature Assembly and Train-Test Split

In [17]:
from pyspark.ml.feature import VectorAssembler

# Pack the numeric and index-encoded predictor columns into one vector column.
# total_bill is deliberately not included: it is the value being predicted.
feature_columns = [
    "tip",
    "size",
    "sex_encoded",
    "smoker_encoded",
    "day_encoded",
    "time_encoded",
]
features_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="independent_input_features",
)
df_intermediate = features_assembler.transform(df)
                

In [18]:
# Preview the frame with the assembled feature-vector column appended.
df_intermediate.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_encoded|smoker_encoded|day_encoded|time_encoded|independent_input_features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|      [1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|      [1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|      [3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|      [3.31,2.0,0.0,0.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|      [3.61,

In [19]:
# Keep only what the regressor needs: the assembled feature vector (input)
# and total_bill (output).
model_columns = ["independent_input_features", "total_bill"]
df_final = df_intermediate.select(*model_columns)
df_final.show()

+--------------------------+----------+
|independent_input_features|total_bill|
+--------------------------+----------+
|      [1.01,2.0,1.0,0.0...|     16.99|
|      [1.66,3.0,0.0,0.0...|     10.34|
|      [3.5,3.0,0.0,0.0,...|     21.01|
|      [3.31,2.0,0.0,0.0...|     23.68|
|      [3.61,4.0,1.0,0.0...|     24.59|
|      [4.71,4.0,0.0,0.0...|     25.29|
|      [2.0,2.0,0.0,0.0,...|      8.77|
|      [3.12,4.0,0.0,0.0...|     26.88|
|      [1.96,2.0,0.0,0.0...|     15.04|
|      [3.23,2.0,0.0,0.0...|     14.78|
|      [1.71,2.0,0.0,0.0...|     10.27|
|      [5.0,4.0,1.0,0.0,...|     35.26|
|      [1.57,2.0,0.0,0.0...|     15.42|
|      [3.0,4.0,0.0,0.0,...|     18.43|
|      [3.02,2.0,1.0,0.0...|     14.83|
|      [3.92,2.0,0.0,0.0...|     21.58|
|      [1.67,3.0,1.0,0.0...|     10.33|
|      [3.71,3.0,0.0,0.0...|     16.29|
|      [3.5,3.0,1.0,0.0,...|     16.97|
|      (6,[0,1],[3.35,3.0])|     20.65|
+--------------------------+----------+
only showing top 20 rows



In [20]:
# Train-test split
# Roughly 80:20 -- randomSplit assigns rows at random according to the weights,
# so the resulting sizes are approximate rather than exact.
# A fixed seed makes the split (and every metric computed from it) repeatable
# across kernel restarts; without it each run trains and tests on different rows.
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

### Train Model

In [23]:
from pyspark.ml.regression import LinearRegression

In [29]:
# Fit a linear regression of total_bill on the assembled feature vector.
# maxIter is left at its default of 100, which is an upper bound on optimizer
# iterations rather than a number of epochs.
regressor = LinearRegression(
    featuresCol="independent_input_features",
    labelCol="total_bill",
)
model = regressor.fit(train_data)

23/10/09 08:42:02 WARN Instrumentation: [454e5520] regParam is zero, which might cause numerical instability and overfitting.


In [33]:
# Learned coefficients, one per entry of the independent feature vector
model.coefficients

DenseVector([3.0236, 3.8157, -0.7053, 1.9901, -0.1141, -1.7917])

In [34]:
# Learned intercept (bias) term of the fitted model
model.intercept

1.1277017403394627

In [40]:
# Name of the loss function configured on the model
model.getLoss()

'squaredError'

### Evaluation

In [35]:
# Evaluate the fitted model on the held-out test split; the returned summary
# exposes the predictions and the metrics inspected in the following cells.
results = model.evaluate(test_data)

In [36]:
# Show test rows with the actual total_bill next to the model's prediction.
results.predictions.show()

+--------------------------+----------+------------------+
|independent_input_features|total_bill|        prediction|
+--------------------------+----------+------------------+
|      (6,[0,1],[1.25,2.0])|     10.07|12.538452469799129|
|       (6,[0,1],[2.0,3.0])|     16.31|18.621772773290736|
|      (6,[0,1],[3.18,2.0])|     19.82|18.373922324280883|
|      (6,[0,1],[3.39,2.0])|     11.61|19.008869821400666|
|      (6,[0,1],[5.92,3.0])|     29.03|30.474126052859894|
|       (6,[0,1],[9.0,4.0])|     48.33|43.602340014870926|
|      [1.0,2.0,0.0,1.0,...|      12.6|13.772706031229564|
|      [1.25,2.0,1.0,0.0...|      8.51| 9.813274158429595|
|      [1.5,2.0,0.0,0.0,...|     19.08|11.274500151336149|
|      [1.5,2.0,0.0,1.0,...|     11.59|15.284485786276653|
|      [1.57,2.0,0.0,0.0...|     15.42|13.391904954237534|
|      [1.71,2.0,0.0,0.0...|     10.27|13.815203285650718|
|      [1.92,1.0,0.0,1.0...|      8.58|10.604801354816974|
|      [2.0,2.0,0.0,1.0,...|     14.48|16.68217898253200

### Performance Metrics

In [37]:
# R-squared on the test split
results.r2 

0.5714243193288067

In [38]:
# Mean absolute error on the test split (same units as total_bill)
results.meanAbsoluteError

4.319487635795117

In [39]:
# Mean squared error on the test split
results.meanSquaredError

29.80768188172504

### Save Model

In [41]:
# Persist the fitted model to disk. Going through the writer with overwrite()
# keeps this cell re-runnable: a plain model.save(path) raises an error once
# the target directory exists from an earlier run.
model.write().overwrite().save("./models/tips_regression_model")

                                                                                

### Train with maxIter=1000 and Evaluate Results

In [42]:
# Refit with the same feature and label columns, raising the iteration cap
# (maxIter) from its default to 1000. This rebinds `regressor` and `model`.
regressor = LinearRegression(
    maxIter=1000,
    featuresCol="independent_input_features",
    labelCol="total_bill",
)
model = regressor.fit(train_data)

23/10/09 08:54:38 WARN Instrumentation: [42b2613d] regParam is zero, which might cause numerical instability and overfitting.


In [43]:
# Re-evaluate the refitted model on the test split.
# Not satisfactory: the predictions shown are identical to those of the earlier
# fit, so raising maxIter made no difference here.
results = model.evaluate(test_data)
results.predictions.show()

+--------------------------+----------+------------------+
|independent_input_features|total_bill|        prediction|
+--------------------------+----------+------------------+
|      (6,[0,1],[1.25,2.0])|     10.07|12.538452469799129|
|       (6,[0,1],[2.0,3.0])|     16.31|18.621772773290736|
|      (6,[0,1],[3.18,2.0])|     19.82|18.373922324280883|
|      (6,[0,1],[3.39,2.0])|     11.61|19.008869821400666|
|      (6,[0,1],[5.92,3.0])|     29.03|30.474126052859894|
|       (6,[0,1],[9.0,4.0])|     48.33|43.602340014870926|
|      [1.0,2.0,0.0,1.0,...|      12.6|13.772706031229564|
|      [1.25,2.0,1.0,0.0...|      8.51| 9.813274158429595|
|      [1.5,2.0,0.0,0.0,...|     19.08|11.274500151336149|
|      [1.5,2.0,0.0,1.0,...|     11.59|15.284485786276653|
|      [1.57,2.0,0.0,0.0...|     15.42|13.391904954237534|
|      [1.71,2.0,0.0,0.0...|     10.27|13.815203285650718|
|      [1.92,1.0,0.0,1.0...|      8.58|10.604801354816974|
|      [2.0,2.0,0.0,1.0,...|     14.48|16.68217898253200

In [44]:
# Mean squared error of the refitted model on the test split, for comparison
# with the value reported for the first fit.
results.meanSquaredError

29.80768188172504