In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('Linear Regression')
    .getOrCreate()
)

In [3]:
# Echo the session object so the notebook renders its repr.
spark

In [4]:
# Load the tips dataset: the first row is the header and column types are inferred.
df = spark.read.csv(
    'tips.csv',
    header=True,
    inferSchema=True,
)
# Peek at the first three rows.
df.head(3)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3)]

In [5]:
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [6]:
# Handling categorical features
from pyspark.ml.feature import StringIndexer

In [7]:
# List the column names before choosing the categorical ones to index.
df.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [8]:
# Single-column form: inputCol / outputCol each take one column name.
indexer = StringIndexer(inputCol="sex", outputCol="sex_indexer")
# Fit the label -> index mapping on df, then append the indexed column to it.
df_r = (
    indexer
    .fit(df)
    .transform(df)
)
df_r.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexer|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [9]:
# Multi-column form: inputCols / outputCols take parallel lists of names.
#
# The indexer is fitted on the original `df` and "sex" is included in the list,
# so this cell no longer feeds df_r back into itself. Previously a second run
# failed because the *_indexer output columns already existed in df_r, and the
# cell only worked if the single-column cell had been run exactly once before.
# The resulting columns (sex_indexer, smoker_indexer, day_indexer,
# time_indexer) and their order are unchanged.
categorical_cols = ["sex", "smoker", "day", "time"]
indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=[f"{col_name}_indexer" for col_name in categorical_cols],
)
df_r = indexer.fit(df).transform(df)
df_r.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexer|smoker_indexer|day_indexer|time_indexer|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [10]:
# Confirm that the four *_indexer columns were appended to the original ones.
df_r.columns

['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_indexer',
 'smoker_indexer',
 'day_indexer',
 'time_indexer']

In [11]:
# Then we need to group the independent features together into a single vector column.
# VectorAssembler, which ships with PySpark, does exactly that.

from pyspark.ml.feature import VectorAssembler
# Pack the numeric and indexed columns into one vector column per row.
outputassemble = VectorAssembler(
    inputCols=[
        'tip',
        'size',
        'sex_indexer',
        'smoker_indexer',
        'day_indexer',
        'time_indexer',
    ],
    outputCol="independent_feature",
)
output = outputassemble.transform(df_r)

# The order of the names in inputCols is the order of the values in the assembled vector.
# All the grouped columns are placed in outputCol -> independent_feature

In [12]:
# Preview the assembled DataFrame, including the new independent_feature column.
output.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexer|smoker_indexer|day_indexer|time_indexer| independent_feature|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|[3.31,2.0,0.0,0.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|[3.61,4.0,1.0,0.0...|
|     25.29|4.71|  Male|    No|S

In [20]:
# This is what we want: the grouped features, stored as a single vector (UDT) column.
output.select('independent_feature').show()

+--------------------+
| independent_feature|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [17]:
# Keep only what the model needs: the feature vector (independent) and
# total_bill (dependent).
finalize_data = output.select(
    'independent_feature',
    'total_bill',
)
finalize_data.show()

+--------------------+----------+
| independent_feature|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [18]:
# apply Linear Regression
from pyspark.ml.regression import LinearRegression

In [19]:
# Train/test split: roughly 75% of the rows for training, 25% for testing.
# A fixed seed makes the split repeatable for the same input data, so the
# fitted coefficients and the evaluation metrics no longer change on every
# run. Without it each execution draws a different random split.
train_data, test_data = finalize_data.randomSplit([0.75, 0.25], seed=42)

# Estimator: features come from the assembled vector column, the label is total_bill.
regressor = LinearRegression(featuresCol='independent_feature', labelCol='total_bill')

# NOTE: `regressor` is rebound here from the unfitted estimator to the fitted
# model; the following cells read .coefficients, .intercept and .evaluate()
# from this fitted model.
regressor = regressor.fit(train_data)

In [21]:
# Fitted coefficients: one per entry of the assembled feature vector.
regressor.coefficients

DenseVector([3.2972, 2.8469, -1.4536, 1.8463, -0.6497, -0.3606])

In [22]:
# Fitted intercept term of the linear model.
regressor.intercept

3.3555736098200093

In [23]:
# Evaluate the fitted model on the held-out test split; the returned summary
# exposes the predictions and the metrics read in the following cells.
pred_result = regressor.evaluate(test_data)

In [25]:
# Let's see the result: the test rows together with the model's predictions
pred_result.predictions.show()
# Compare the actual total_bill column against the prediction column

+--------------------+----------+------------------+
| independent_feature|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[2.24,3.0])|     16.04|19.281893235878833|
|(6,[0,1],[2.31,3.0])|     18.69|19.512694842285054|
| (6,[0,1],[2.5,4.0])|     18.35|22.986045752407563|
|(6,[0,1],[2.72,2.0])|     13.28|18.017643415644454|
| (6,[0,1],[3.0,2.0])|      14.0| 18.94084984126934|
|(6,[0,1],[3.18,2.0])|     19.82|19.534339686313913|
|(6,[0,1],[3.39,2.0])|     11.61| 20.22674450553258|
|(6,[0,1],[3.76,2.0])|     18.24|21.446695853679753|
|(6,[0,1],[4.08,2.0])|     17.92|22.501788911536767|
|[1.17,2.0,0.0,1.0...|     32.83|14.753320766743915|
|[1.25,2.0,1.0,0.0...|      8.51|10.057099849064226|
|[1.44,2.0,0.0,0.0...|      7.56|12.137194167297922|
|[1.44,2.0,0.0,1.0...|      7.74| 15.64355553431077|
|[1.64,2.0,0.0,1.0...|     15.36|16.302988695471406|
|[1.66,3.0,0.0,0.0...|     10.34| 16.71979927927025|
|[1.71,2.0,0.0,0.0...|     10.27|14.0377681625

In [26]:
# Performance metrics on the test split: R^2, mean absolute error, mean squared error
pred_result.r2,pred_result.meanAbsoluteError,pred_result.meanSquaredError

(0.5450669785881137, 4.687339741293766, 43.12042819496724)