# Final Project - Linear Regression

Predict `total_bill` from the independent features (`tip`, `size`, `sex`, `smoker`, `day`, `time`) of a (relatively) large data set, `tips.csv`, using a Spark ML linear regression model.


In [2]:
# Start (or reuse) a Spark session, then read tips.csv into a DataFrame,
# treating the first row as the header and letting Spark infer column types.

from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName("Lesson 8 - Linear Regression")
    .getOrCreate()
)

data_frame = (
    spark_session.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('tips.csv')
)
data_frame.show()

22/11/03 19:02:09 WARN Utils: Your hostname, dev1 resolves to a loopback address: 127.0.1.1; using 192.168.0.59 instead (on interface wlp2s0)
22/11/03 19:02:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/03 19:02:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner

In [3]:
# Inspect what was loaded: print the inferred schema as a tree, then display
# the list of column names as the cell's result.
data_frame.printSchema()
data_frame.columns

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [4]:
# Handling categorical features
# Index string categories and enumerate them: sex, smoker, day, time
#
# StringIndexer maps each distinct string of the input column to a numeric
# index that is stored in a new output column.

from pyspark.ml.feature import StringIndexer

# Index sex
indexer = StringIndexer(inputCol="sex", outputCol="sex_indexed")
# Remove a "sex_indexed" column left over from an earlier run of this cell.
# Without this, re-running the cell fails because the output column already
# exists; on a fresh kernel the drop is a no-op.
data_frame = data_frame.drop("sex_indexed")
data_frame = indexer.fit(data_frame).transform(data_frame)
data_frame.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [5]:
# Index smoker, day and time
# One multi-column StringIndexer adds "<name>_indexed" for each input column.
# NOTE(review): the indices are category labels, not magnitudes; for a linear
# model a multi-valued column such as `day` may be better one-hot encoded —
# confirm whether that matters for this project.
columns = ['smoker', 'day', 'time']
indexed_columns = ["{}_indexed".format(c) for c in columns]
indexer = StringIndexer(inputCols=columns, outputCols=indexed_columns)
# Remove output columns left over from an earlier run of this cell so it can
# be re-executed; dropping columns that do not exist is a no-op.
data_frame = data_frame.drop(*indexed_columns)
data_frame = indexer.fit(data_frame).transform(data_frame)
data_frame.show()


+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [6]:
# Group independent features with VectorAssembler
# The listed numeric columns are packed, in this order, into a single vector
# column named 'Independent Features', which the regression cell uses as its
# features column.

from pyspark.ml.feature import VectorAssembler

independent_columns = ['tip', 'size', 'sex_indexed',
                       'smoker_indexed', 'day_indexed', 'time_indexed']
featur_asm = VectorAssembler(inputCols=independent_columns,
                             outputCol='Independent Features')
# Remove a vector column left over from an earlier run of this cell.
# Without this, re-running the cell fails because the output column already
# exists; on a fresh kernel the drop is a no-op.
data_frame = data_frame.drop('Independent Features')
data_frame = featur_asm.transform(data_frame)
data_frame.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|Independent Features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|[3.31,2.0,0.0,0.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|[3.61,4.0,1.0,0.0...|
|     25.29|4.71|  Male|    No|S

In [7]:
# Keep only what the regression needs: the assembled feature vector (input)
# and total_bill (the value to predict).
finalized_data = data_frame.select(['Independent Features', 'total_bill'])
finalized_data.show()


+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [8]:
# Create and train a linear regression model

from pyspark.ml.regression import LinearRegression

# Fixed seed so the 75/25 train/test split, and therefore the fitted
# coefficients and metrics shown in later cells, are the same on every run
# over the same input data.
SPLIT_SEED = 42
train_data, test_data = finalized_data.randomSplit([0.75, 0.25],
                                                   seed=SPLIT_SEED)
print(train_data.count())
print(test_data.count())
# The unfitted estimator gets its own name; `regressor` is only ever the
# fitted model, which is what the following cells read from.
estimator = LinearRegression(
    featuresCol='Independent Features', labelCol='total_bill')
regressor = estimator.fit(train_data)

178
66
22/11/03 19:02:31 WARN Instrumentation: [30225f0f] regParam is zero, which might cause numerical instability and overfitting.


In [9]:
# Coefficients
# One fitted weight per entry of the 'Independent Features' vector, in the
# VectorAssembler input order: tip, size, sex_indexed, smoker_indexed,
# day_indexed, time_indexed.
regressor.coefficients

DenseVector([3.04, 3.1195, -1.1612, 1.6494, -0.2981, -1.0246])

In [10]:
# Intercept
# Constant term of the fitted linear model, i.e. the predicted total_bill
# when every feature value is zero.
regressor.intercept

2.886605395147833

In [11]:
# Compare results to test data
# evaluate() scores the fitted model on the held-out split. The returned
# summary exposes a `predictions` DataFrame holding the feature vector, the
# actual total_bill and the model's `prediction` side by side.
results = regressor.evaluate(test_data)
results.predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.25,2.0])|     10.07|12.925687551372441|
|(6,[0,1],[1.45,2.0])|      9.55|13.533690954681852|
| (6,[0,1],[2.0,2.0])|     12.69|15.205700313782735|
|(6,[0,1],[2.31,3.0])|     18.69|19.267636031682713|
| (6,[0,1],[2.5,4.0])|     18.35|22.964769707597046|
|(6,[0,1],[2.72,2.0])|     13.28|17.394512565696616|
| (6,[0,1],[3.0,2.0])|      14.0|18.245717330329793|
| (6,[0,1],[3.0,4.0])|     20.45|24.484778215870577|
|(6,[0,1],[7.58,4.0])|     39.42|38.408056151656105|
|[1.25,2.0,1.0,0.0...|      8.51|10.143538281870102|
|[1.5,2.0,0.0,0.0,...|     12.46|12.791265621999216|
|[1.5,2.0,1.0,0.0,...|     26.41|12.524452738921102|
|[1.5,2.0,1.0,0.0,...|     11.17|10.903542536006867|
|[1.56,2.0,0.0,0.0...|      9.94|13.569950765332035|
|[1.57,2.0,0.0,0.0...|     15.42|13.600350935497506|
|[1.66,3.0,0.0,0.0...|     10.34| 16.993482909

In [12]:
# Performance metrics
# Displayed as a tuple: (R^2, mean absolute error, mean squared error), all
# taken from the summary of the test-split evaluation.
results.r2, results.meanAbsoluteError, results.meanSquaredError

(0.6287647835874317, 4.129968253846933, 33.53216533772697)

In [19]:
# Disabled alternative, kept for reference: write the predictions with
# Spark's own DataFrame writer instead of going through pandas.
# results.predictions.write.mode('overwrite').save('./bill_regression_results')

# Save prediction results to .csv
# toPandas() collects the whole predictions DataFrame to the driver before
# pandas writes the file.
# NOTE(review): to_csv is called with its defaults, so the pandas row index is
# presumably written as an extra unnamed first column — confirm that is wanted.
results.predictions.toPandas().to_csv('bill_regression_results.csv')

In [22]:
# Save linear regression model
# Go through the model writer with overwrite() so that re-running this cell
# replaces an existing 'bill_regressor.spark' directory; a plain save() fails
# when the target path already exists.
regressor.write().overwrite().save('bill_regressor.spark')