In [None]:
# File location and type
file_location = "/FileStore/tables/tips-1.csv"
file_type = "csv"

# Read through the generic reader so that `file_type` actually selects the format
# (it was previously assigned but never used).
# The applied options are for CSV files. For other file types, these will be ignored.
#   header      -> take column names from the first line of the file
#   inferSchema -> let Spark derive column types instead of reading everything as string
df = (
    spark.read.format(file_type)
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_location)
)

In [None]:
# Preview the loaded data; show() prints the leading rows as a text table.
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [None]:
# Check the column types that inferSchema derived from the CSV.
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [None]:
# Column names of the raw DataFrame.
df.columns

Out[4]: ['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [None]:
## Handling Categorical Features
from pyspark.ml.feature import StringIndexer

# Each string-valued column gets a numeric companion column with an `_indexed` suffix.
categorical_cols = ['sex', 'smoker', 'day', 'time']
indexed_cols = ['sex_indexed', 'smoker_indexed', 'day_indexed', 'time_indexed']

indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)
indexer_model = indexer.fit(df)       # learn the label -> index mapping from df
df_r = indexer_model.transform(df)    # append the *_indexed columns to the original ones
df_r.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [None]:
# Column list after indexing: the originals plus the four *_indexed columns.
df_r.columns

Out[11]: ['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'day_indexed',
 'time_indexed']

In [None]:
from pyspark.ml.feature import VectorAssembler

# Pack the numeric predictors and the indexed categoricals into a single vector column.
feature_cols = [
    'tip', 'size',
    'sex_indexed', 'smoker_indexed', 'day_indexed', 'time_indexed',
]
featureassembler = VectorAssembler(inputCols=feature_cols, outputCol='Independent Features')
output = featureassembler.transform(df_r)

In [None]:
# Inspect the assembled feature vectors (long cells are truncated in the printout).
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [None]:
# Keep only what the regression needs: the feature vector and the total_bill target.
model_columns = ['Independent Features', 'total_bill']
final_data = output.select(*model_columns)

In [None]:
# Preview the modelling table: feature vector plus the total_bill label.
final_data.show()

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [None]:
from pyspark.ml.regression import LinearRegression

# Train/test split (70% / 30%). A fixed seed keeps the split -- and therefore the
# coefficients and metrics in the following cells -- repeatable across re-runs.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.70, 0.30], seed=SPLIT_SEED)

# Keep the unfitted estimator under its own name instead of rebinding one
# variable from estimator to fitted model.
lr_estimator = LinearRegression(featuresCol = 'Independent Features', labelCol = 'total_bill')

# `regressor` is the fitted model; later cells read its coefficients/intercept
# and call evaluate() on it.
regressor = lr_estimator.fit(train_data)

In [None]:
# Fitted weights, one per entry of the 'Independent Features' vector
# (presumably in VectorAssembler inputCols order -- confirm).
regressor.coefficients

Out[17]: DenseVector([3.0235, 3.7654, -1.1066, 2.2053, -0.2249, -1.1792])

In [None]:
# Fitted intercept (bias) term of the linear model.
regressor.intercept

Out[18]: 1.25642176762118

In [None]:
# Predictions: evaluate the fitted model on the held-out test split.
# The returned summary exposes .predictions and the metrics read in later cells.
pred_results = regressor.evaluate(test_data)

In [None]:
# Actual data vs predicted data: total_bill next to the model's prediction column.
pred_results.predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.25,2.0])|     10.07| 12.56659942267595|
|(6,[0,1],[1.45,2.0])|      9.55|13.171293294965556|
|(6,[0,1],[2.34,4.0])|     17.81|23.393021979899036|
| (6,[0,1],[2.5,4.0])|     18.35| 23.87677707773072|
|(6,[0,1],[3.15,3.0])|     20.08|22.076611686049574|
|(6,[0,1],[3.18,2.0])|     19.82| 18.40189529027065|
|(6,[0,1],[3.35,3.0])|     20.65| 22.68130555833918|
| (6,[0,1],[3.6,3.0])|     24.06| 23.43717289870119|
|(6,[0,1],[7.58,4.0])|     39.42| 39.23600143388672|
|[1.5,2.0,1.0,0.0,...|      8.35|10.586868779136037|
|[1.63,2.0,1.0,0.0...|     11.87| 10.97991979612428|
|[1.71,2.0,0.0,0.0...|     10.27|13.732482716588482|
|[1.73,2.0,0.0,0.0...|      9.78|12.388818750548545|
|[1.92,1.0,0.0,1.0...|      8.58| 11.17829445390769|
|[2.0,2.0,0.0,0.0,...|      8.77| 14.60928883140841|
|[2.0,2.0,0.0,0.0,...|     13.13| 14.609288831

In [None]:
# Performance metrics on the test split, displayed as a tuple: (R^2, MAE, MSE).
pred_results.r2, pred_results.meanAbsoluteError, pred_results.meanSquaredError

Out[21]: (0.5474622139239447, 3.6930651582418146, 25.708378393048324)

In [None]:
# Saving the model using Pickle
# NOTE(review): `model` is not defined in any cell shown here; the fitted model
# is bound to `regressor`. Spark ML models are normally persisted through their
# own API (e.g. `regressor.write().overwrite().save(path)`) rather than
# pickle/joblib -- confirm before enabling either snippet below.

#import pickle
#with open('model_pickle', 'wb') as f:
#    pickle.dump(model, f)

# Saving the model using Joblib
#import joblib
#joblib.dump(model, "model_joblib.pkl")