In [1]:
import pyspark

In [2]:
import numpy as np

In [3]:
import pandas as pd



In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [6]:
flights= spark.read.csv('../Datasets/flights.csv', header=True, inferSchema=True, nullValue='NA')

In [7]:
flights= flights.dropna()

In [8]:
from pyspark.ml import Pipeline

In [9]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [10]:
from pyspark.ml.regression import LinearRegression

In [11]:
flights.show(2)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 2 rows



In [12]:
flights= flights.withColumn('duration', flights.arr_time - flights.dep_time)

In [13]:
flights_train, flights_test = flights.randomSplit([0.8,0.2])

In [14]:
from pyspark.ml.feature import Bucketizer

In [15]:
buckets = Bucketizer(splits= [0,300,600,900,1200,1500,1800,2100,2400], inputCol='dep_time', outputCol='dep_buckets')

In [16]:
indexer = StringIndexer(inputCols=['carrier', 'origin', 'dest'], outputCols=['carrier_idx', 'origin_idx', 'dest_idx'])

In [17]:
ohe = OneHotEncoder(inputCols=['carrier_idx', 'origin_idx', 'dest_idx', 'dep_buckets'], outputCols=['carrier_dum', 'origin_dum', 'dest_dum', 'dep_dum'])

In [18]:
vec = VectorAssembler(inputCols=['carrier_dum', 'origin_dum', 'dest_dum', 'dep_dum'], outputCol='features')

In [19]:
reg = LinearRegression(labelCol='duration', elasticNetParam=0, regParam=0.3)

In [20]:
pipeline= Pipeline(stages=[buckets, indexer, ohe, vec, reg])

In [21]:
pipeline = pipeline.fit(flights_train)

In [22]:
predictions_train = pipeline.transform(flights_train)

In [23]:
predictions_test = pipeline.transform(flights_test)

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator

In [25]:
evaluator = RegressionEvaluator(labelCol='duration')

In [26]:
evaluator.evaluate(predictions_train)

322.63233716189836

In [27]:
evaluator.evaluate(predictions_test)

326.91956762533727

In [28]:
predictions_train.select('duration', 'prediction').show()

+--------+------------------+
|duration|        prediction|
+--------+------------------+
|     287| 356.6275394307031|
|     242|  317.974009513096|
|     233|350.14798256285576|
|     203|250.83578780999505|
|     790|297.16176336147896|
|     577|  415.000785817959|
|     217|325.73372148232943|
|     524|504.27511894422946|
|     587| 514.7129464539712|
|     518|477.70166063251577|
|     319|290.21766760468336|
|     271| 322.2243953137813|
|     139|234.39709279008275|
|     402| 312.6875414240783|
|     591| 359.4451378901922|
|      92| 537.9190281506415|
|     192|249.66001002536382|
|     233| 310.4056380232687|
|     779| 407.2388066171709|
|     193|227.46757295545876|
+--------+------------------+
only showing top 20 rows



In [29]:
regression = pipeline.stages[4]

In [30]:
regression

LinearRegressionModel: uid=LinearRegression_cad21c4d0d4e, numFeatures=86

In [31]:
regression.coefficients

DenseVector([22.968, -58.8595, -26.964, 23.4379, -38.0493, 122.6801, -34.3289, -110.6835, -57.6441, -42.8743, 10.4378, -234.1747, -232.8787, -125.2893, -225.119, -197.1508, -330.8921, -113.9517, -224.4783, -135.852, -267.9967, -129.0101, -133.7735, -200.7048, 40.8993, -60.8623, 6.8119, -258.2506, 28.1962, -185.1624, -289.1352, -182.2359, -260.503, -372.2523, -186.846, -230.5728, -19.1954, -1.1029, -6.9329, -197.6518, 131.0651, -80.3162, -314.1933, -402.2105, -443.4846, 132.847, 209.2955, 230.0224, 34.3368, -101.8134, -164.8622, -232.1643, -258.399, -242.9527, 8.6764, -280.64, -165.535, 61.4345, 325.6595, 284.2582, 49.8963, 288.3599, -5.8348, 61.7894, -36.1573, 150.833, -318.5249, -608.4833, 39.2999, 88.4508, 271.5047, 145.9919, 80.7117, 1275.2043, -313.0876, 571.2039, -420.6939, -279.3364, -179.8256, 1786.8216, 1690.1397, 1660.3565, 1664.6069, 1654.3556, 1585.6113, 1553.0856])

In [32]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [33]:
reg2 = LinearRegression(labelCol='duration')

In [34]:
pipeline2= Pipeline(stages=[buckets, indexer, ohe, vec, reg2])

In [35]:
params = ParamGridBuilder()

In [36]:
params = params.addGrid(reg2.fitIntercept, [True, False]).addGrid(reg2.elasticNetParam, [0,1]).addGrid(reg2.regParam, [0.1,0.25,0.35,0.5,0.75]).build()

In [37]:
cv = CrossValidator(estimator=pipeline2, estimatorParamMaps= params, evaluator=evaluator, numFolds=3)

In [54]:
scores = cv.fit(flights_train)

In [61]:
scores.avgMetrics

[328.756758848803,
 328.784000780241,
 328.79874190184194,
 328.82071188861937,
 328.86298693825404,
 328.80076526050937,
 328.77753979423966,
 328.78273787425917,
 328.7947480611527,
 328.8191979170631,
 329.56509411761033,
 329.5812080104038,
 329.619787123241,
 329.70670555011566,
 329.90338515651183,
 330.7015792035514,
 331.0961897255177,
 331.38780739867593,
 332.0293661820819,
 333.33018004707696]

In [43]:
best_pipeline =scores.bestModel

In [53]:
best_pipeline.stages[4].extractParamMap()

{Param(parent='LinearRegression_3152f90d9989', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_3152f90d9989', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LinearRegression_3152f90d9989', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_3152f90d9989', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LinearRegression_3152f90d9989', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_3152f90d9989', name='labelCol', doc='label column name.'): 'duration',
 Param(parent='LinearRegression_3152f90d9989', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredEr

In [57]:
pred_test = best_pipeline.transform(flights_test)

In [59]:
evaluator.evaluate(pred_test)

326.92127625291727