In [0]:
# Path of the tips CSV that is passed to spark.read.csv in the next cell.
# NOTE(review): presumably a Databricks FileStore upload location — confirm it exists in the target workspace.
data = '/FileStore/tables/tips.csv'

In [0]:
import pyspark 
from pyspark.sql import SparkSession 
#SparkSession is now the entry point of Spark 
#SparkSession can also be construed as gateway to spark libraries 
  
# Obtain the SparkSession for this notebook (reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName('tips_regression_model')
    .getOrCreate()
)

# Read the CSV: the first row supplies column names and column types are inferred.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data)
)
df.display()

total_bill,tip,sex,smoker,day,time,size
16.99,1.01,Female,No,Sun,Dinner,2
10.34,1.66,Male,No,Sun,Dinner,3
21.01,3.5,Male,No,Sun,Dinner,3
23.68,3.31,Male,No,Sun,Dinner,2
24.59,3.61,Female,No,Sun,Dinner,4
25.29,4.71,Male,No,Sun,Dinner,4
8.77,2.0,Male,No,Sun,Dinner,2
26.88,3.12,Male,No,Sun,Dinner,4
15.04,1.96,Male,No,Sun,Dinner,2
14.78,3.23,Male,No,Sun,Dinner,2


In [0]:
# Show the column names and the types that inferSchema=True assigned when the CSV was read.
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [0]:
from pyspark.ml.feature import StringIndexer
# Map each categorical string column to a numeric index column named "<column>_indexed".
categorical_cols = ['sex', 'smoker', 'day', 'time']
indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=[f"{name}_indexed" for name in categorical_cols],
)

# Learn the label-to-index mapping from df, then append the indexed columns to it.
indexer_model = indexer.fit(df)
df1 = indexer_model.transform(df)
df1.display()


total_bill,tip,sex,smoker,day,time,size,sex_indexed,smoker_indexed,day_indexed,time_indexed
16.99,1.01,Female,No,Sun,Dinner,2,1.0,0.0,1.0,0.0
10.34,1.66,Male,No,Sun,Dinner,3,0.0,0.0,1.0,0.0
21.01,3.5,Male,No,Sun,Dinner,3,0.0,0.0,1.0,0.0
23.68,3.31,Male,No,Sun,Dinner,2,0.0,0.0,1.0,0.0
24.59,3.61,Female,No,Sun,Dinner,4,1.0,0.0,1.0,0.0
25.29,4.71,Male,No,Sun,Dinner,4,0.0,0.0,1.0,0.0
8.77,2.0,Male,No,Sun,Dinner,2,0.0,0.0,1.0,0.0
26.88,3.12,Male,No,Sun,Dinner,4,0.0,0.0,1.0,0.0
15.04,1.96,Male,No,Sun,Dinner,2,0.0,0.0,1.0,0.0
14.78,3.23,Male,No,Sun,Dinner,2,0.0,0.0,1.0,0.0


In [0]:
# List the column names after indexing to confirm the four *_indexed columns were appended.
df1.columns

Out[13]: ['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'day_indexed',
 'time_indexed']

In [0]:
from pyspark.ml.feature import VectorAssembler
# Columns packed, in this order, into the single feature vector the regressor consumes.
feature_columns = [
    'tip',
    'size',
    'sex_indexed',
    'smoker_indexed',
    'day_indexed',
    'time_indexed',
]
feature_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="Independent Features",
)



In [0]:
 # Append the assembled "Independent Features" vector column to the indexed DataFrame.
 # NOTE(review): this cell carries a stray one-space indent; it appears to rely on the notebook tolerating it.
 output = feature_assembler.transform(df1)

In [0]:
# Preview the assembled vectors; show() prints only the top 20 rows and truncates long cells.
# Rows printed as (6,[indices],[values]) list only some positions of the 6-entry vector.
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [0]:
# Keep only what the model needs: the feature vector and the label to predict.
model_columns = ["Independent Features", "total_bill"]
finalized_data = output.select(*model_columns)
finalized_data.show()


+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [0]:
from pyspark.ml.regression import LinearRegression
# Hold out roughly 25% of the rows for evaluation (the weights are proportions,
# not exact row counts). The fixed seed makes the split repeatable, so the
# coefficients and metrics in the following cells stay stable across
# Restart & Run All instead of changing on every execution.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

# Keep the unfitted estimator under its own name; `regressor` is the fitted
# model that the later cells read coefficients from and evaluate.
lr_estimator = LinearRegression(featuresCol='Independent Features', labelCol="total_bill")
regressor = lr_estimator.fit(train_data)

In [0]:

# Fitted weights, one per assembler input column and in the same order:
# tip, size, sex_indexed, smoker_indexed, day_indexed, time_indexed.
regressor.coefficients

Out[22]: DenseVector([3.3339, 3.3873, -1.4003, 1.9765, -0.0861, -0.885])

In [0]:
# Fitted intercept: the predicted total_bill when every feature value is zero.
regressor.intercept

Out[23]: 1.0131177560300106

In [0]:
# Score the held-out rows; the returned summary holds both the metrics and per-row predictions.
pred_results = regressor.evaluate(test_data)

# Show label and prediction side by side for a quick visual check.
predictions_df = pred_results.predictions
predictions_df.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.47,2.0])|     10.77|12.688534976980426|
|(6,[0,1],[2.01,2.0])|     20.23|14.488837543166415|
|(6,[0,1],[2.31,3.0])|     18.69|18.876302419769573|
| (6,[0,1],[3.0,4.0])|     20.45| 24.56398581639595|
|(6,[0,1],[3.15,3.0])|     20.08|21.676773078281112|
| (6,[0,1],[3.6,3.0])|     24.06| 23.17702521676944|
| (6,[0,1],[5.0,3.0])|     31.27|27.844476314288674|
|(6,[0,1],[7.58,4.0])|     39.42| 39.83321869256602|
|[1.25,2.0,1.0,0.0...|      8.51|  9.49757712847054|
|[1.44,2.0,0.0,1.0...|      7.74|14.565061736135716|
|[1.5,2.0,0.0,0.0,...|     19.08|11.731329802814487|
|[1.5,2.0,0.0,0.0,...|     12.46|12.530256571092288|
|[1.5,2.0,0.0,1.0,...|     15.69|14.678996949560597|
|[1.5,2.0,1.0,0.0,...|      8.35|10.331050538741833|
|[1.5,2.0,1.0,0.0,...|     11.17|10.331050538741833|
|[1.57,2.0,0.0,0.0...|     15.42|12.9358259360

In [0]:
# Performance metrics on the held-out split, displayed as a tuple:
# (R^2, mean absolute error, mean squared error).
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError

Out[25]: (0.4872235624387542, 4.210143785580853, 38.42361102931222)

In [0]:
# Report the test-set error in the label's units (RMSE) and in squared units (MSE).
rmse_value = pred_results.rootMeanSquaredError
mse_value = pred_results.meanSquaredError
print(f"RMSE: {rmse_value}")
print(f"MSE: {mse_value}")

RMSE: 6.198678167909044
MSE: 38.42361102931222
