In [52]:
# import os
# os.environ['HADOOP_HOME'] = ''
# os.environ['hadoop.home.dir'] = ''

In [1]:
from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName('regression_model').config("spark.hadoop.fs.file.impl.disable.cache", "true").getOrCreate()
spark = SparkSession.builder.appName('regression_model').getOrCreate()

In [2]:
df = spark.read.csv('tips.csv',header=True,inferSchema=True) 
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [3]:
df.printSchema() 

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [4]:
df.columns 

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [6]:
### Handling categorical Features 
from pyspark.ml.feature import StringIndexer 
indexer=StringIndexer(inputCols=["sex","smoker","day","time"],outputCols=["sex_indexed","smoker_indexed","day_indexed"
 ,"time_indexed"]) 
df_r=indexer.fit(df).transform(df) 
df_r.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [7]:
df_r.columns 

['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'day_indexed',
 'time_indexed']

In [8]:
from pyspark.ml.feature import VectorAssembler 
feature_assembler = VectorAssembler(inputCols=['tip','size','sex_indexed', 'smoker_indexed', 
'day_indexed', 'time_indexed'], 
                outputCol="Independent Features") 
output = feature_assembler.transform(df_r) 
output.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|Independent Features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|[3.31,2.0,0.0,0.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|[3.61,4.0,1.0,0.0...|
|     25.29|4.71|  Male|    No|S

In [9]:
finalized_data =  output.select("Independent Features","total_bill") 
finalized_data.show() 

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [10]:
from pyspark.ml.regression import LinearRegression 
## train test split 
train_data,test_data =  finalized_data.randomSplit([0.75,0.25]) 
regressor = LinearRegression(featuresCol='Independent Features',labelCol='total_bill') 
regressor = regressor.fit(train_data)

In [11]:
regressor.coefficients

DenseVector([3.1696, 3.3242, -0.9086, 2.6954, -0.177, -0.8186])

In [12]:
regressor.intercept

1.2991524105901768

In [13]:
### Predicions 
pred_results = regressor.evaluate(test_data) 

In [14]:
## Final Comparison 
pred_results.predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.97,2.0])|     12.02|14.191768792942673|
| (6,[0,1],[2.0,2.0])|     13.37|14.286857754862398|
|(6,[0,1],[2.24,3.0])|     16.04|18.371790058365438|
|(6,[0,1],[2.34,4.0])|     17.81|22.012973872909747|
|(6,[0,1],[3.15,3.0])|     20.08| 21.25615523659714|
| (6,[0,1],[3.6,3.0])|     24.06|22.682489665393046|
|(6,[0,1],[4.08,2.0])|     17.92| 20.87969244796345|
| (6,[0,1],[5.0,3.0])|     31.27| 27.11997455498029|
|(6,[0,1],[5.92,3.0])|     29.03|30.036036053851905|
|(6,[0,1],[6.73,4.0])|     48.27| 35.92765863382974|
|[1.0,1.0,1.0,1.0,...|      3.07| 9.579789835598707|
|[1.01,2.0,1.0,0.0...|     16.99|10.063289187148879|
|[1.1,2.0,1.0,1.0,...|      12.9|13.220973650143018|
|[1.36,3.0,1.0,0.0...|     18.64|13.501219607305579|
|[1.5,2.0,0.0,0.0,...|     19.08|11.529340040341518|
|[1.5,2.0,0.0,0.0,...|     12.46|12.1709209064

In [15]:
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError 

(0.5256631126918461, 4.204333086458442, 34.84928498776763)

In [16]:
from pyspark.ml.linalg import Vectors 
input_vector = Vectors.dense([2.0, 2.5, 0.0, 0.0, 1.0, 0.0]) 
regressor.predict(input_vector)

15.771927786794908

In [21]:
#!pip install scikit-learn
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

regressor.write().overwrite().save('regressor_model')
model_in = PipelineModel.load('regressor_model')

ModuleNotFoundError: No module named 'distutils'