### Using pipelines

Instead of doing each step manually, we can also define a pipeline. This streamlines the process and makes it easy to inspect which steps the data has gone through.

Furthermore, we will show how to use grid search with cross-validation to tune the model's parameters.


In [2]:
# Create a pyspark session

from pyspark.sql import SparkSession


# Reuse an existing session if there is one; otherwise start a local one named 'pipeline'
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('pipeline')
    .getOrCreate()
)

In [3]:
# Read the data: the first row is the header, column types are inferred,
# and the string "NA" is treated as a null value
flights = (
    spark.read
    .options(sep=',', header=True, inferSchema=True, nullValue="NA")
    .csv('./data/flights.csv')
)

# Split into training and testing sets in an 80:20 ratio (fixed seed for repeatable splits)
flights_train, flights_test = flights.randomSplit(weights=[0.8, 0.2], seed=17)

In [4]:
# Preview the data; 20 rows is the default of show(), spelled out here
flights.show(n=20)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
|  5|  2|  1|     UA|   704|SFO| 550|  7.98|     102|    2|
|  7|  2|  6|     AA|   380|ORD| 733| 10.83|     135|   54|
|  1| 16|  6|     UA|  1477|ORD|1440|   8.0|     232|   -7|
|  1| 22|  5|     UA|   620|SJC|1829|  7.98|     250|  -13|
| 11|  8|  1|     OO|  5590|SFO| 158|  7.77|      60|   88|
|  4| 26|  1|     AA|  1144|SFO|1464| 13.25|     210|  -10|
|  4| 25|  0|     AA|   321|ORD| 978| 13.75|     160|   31|
|  8| 30|  2|     UA|   646|ORD| 719| 13.28|     151|   16|
|  3| 16|  3|     UA|   107|ORD|1745|   

In [5]:
### First we show that we can create a custom transformer
# We have already seen some transformers, such as StringIndexer, OneHotEncoder and VectorAssembler.
# But we can create our own!

from pyspark.ml.pipeline import Transformer
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

class MileToKm(Transformer):
    """Custom pipeline stage that derives a kilometre column from a mile column.

    The stage reads ``inputCol``, multiplies it by ``KM_PER_MILE``, rounds the
    result to zero decimal places and stores it in ``outputCol``.
    """

    # Conversion factor applied to the input column (kilometres per mile)
    KM_PER_MILE = 1.60934

    def __init__(self, inputCol='mile', outputCol='km'):
        """Remember which column to read and which column to write.

        Args:
            inputCol: name of the existing column holding the distance in miles.
            outputCol: name of the column that will hold the distance in km.
        """
        super(MileToKm, self).__init__()
        self.inputCol = inputCol    # the name of your input column
        self.outputCol = outputCol  # the name of your output column

    def check_input_type(self, schema):
        """Validate that the input column has an integer data type.

        Args:
            schema: schema of the DataFrame that is about to be transformed.

        Raises:
            TypeError: if the data type of ``inputCol`` is not ``IntegerType``.
        """
        field = schema[self.inputCol]
        # Only integer-typed columns are accepted
        if field.dataType != IntegerType():
            # TypeError is still an Exception, so existing `except Exception` handlers keep working
            raise TypeError('MileToKm input type %s did not match input type IntType' % field.dataType)

    ## Need to define this function. (Can also define a fit function)
    def _transform(self, df):
        """Return ``df`` with the rounded kilometre column added.

        Args:
            df: DataFrame containing ``inputCol``.

        Returns:
            A DataFrame with ``outputCol`` added (or replaced if it already exists).

        Raises:
            TypeError: if ``inputCol`` is not an integer column.
        """
        self.check_input_type(df.schema)
        km = F.round(df[self.inputCol] * self.KM_PER_MILE, 0)
        return df.withColumn(self.outputCol, km)

In [6]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
# Add a 'km' column computed from 'mile' with the custom transformer
km_transform = MileToKm()

# Convert the categorical 'org' strings to index values
indexer = StringIndexer(outputCol='org_idx', inputCol='org')

# One-hot encode the index values ('org_idx' and 'dow')
onehot = OneHotEncoder(
    outputCols=['org_one_hot', 'dow_one_hot'],
    inputCols=['org_idx', 'dow'],
)

# Assemble all predictors into the single 'features' vector column
assembler = VectorAssembler(
    outputCol='features',
    inputCols=['km', 'org_one_hot', 'dow_one_hot'],
)

# A linear regression object that predicts 'duration'
regression = LinearRegression(labelCol='duration')

In [7]:
# Import class for creating a pipeline
from pyspark.ml import Pipeline

# Construct a pipeline; the stages are applied in the order listed
pipeline_regression = Pipeline(stages=[km_transform, indexer, onehot, assembler, regression])

# Train the pipeline on the training data.
# The fitted model gets its own name, so `pipeline_regression` keeps referring
# to the unfitted Pipeline instead of being overwritten by a different kind of object.
pipeline_model = pipeline_regression.fit(flights_train)

# Make predictions on the testing data
predictions = pipeline_model.transform(flights_test)
predictions.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+------+-------+-------------+-------------+--------------------+------------------+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|    km|org_idx|  org_one_hot|  dow_one_hot|            features|        prediction|
+---+---+---+-------+------+---+----+------+--------+-----+------+-------+-------------+-------------+--------------------+------------------+
|  0|  1|  2|     AA|     3|JFK|2475|  12.0|     370|   11|3983.0|    2.0|(7,[2],[1.0])|(6,[2],[1.0])|(14,[0,3,10],[398...|364.38375267209113|
|  0|  1|  2|     AA|   254|OGG|2486| 15.33|     310|  173|4001.0|    7.0|    (7,[],[])|(6,[2],[1.0])|(14,[0,10],[4001....| 313.2461800798374|
|  0|  1|  2|     AA|   336|ORD| 733| 21.58|     115|   55|1180.0|    0.0|(7,[0],[1.0])|(6,[2],[1.0])|(14,[0,1,10],[118...| 131.7211766912025|
|  0|  1|  2|     AA|   678|SFO| 337| 16.25|      80|  139| 542.0|    1.0|(7,[1],[1.0])|(6,[2],[1.0])|(14,[0,2,10],[542...| 76.35027806659579|

In [8]:
# Evaluate
from pyspark.ml.evaluation import RegressionEvaluator

# Calculate the RMSE on testing data, comparing 'prediction' with the true 'duration'
rmse_evaluator = RegressionEvaluator(labelCol='duration')
rmse_evaluator.evaluate(predictions)

11.163418774419267

In [9]:
### Next we will use cross-validation and grid search to find the best parameters of our model

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Build the parameter grid: every combination of the listed regParam and
# elasticNetParam values is tried (4 x 3 = 12 candidate settings)
params = (
    ParamGridBuilder()
    .addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0])
    .addGrid(regression.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

evaluator = RegressionEvaluator(labelCol='duration')

# Construct a pipeline
pipeline_regression = Pipeline(stages=[km_transform, indexer, onehot, assembler, regression])

# The seed fixes the random assignment of rows to folds, so the search gives
# the same result on every run (same seed as the train/test split)
cv = CrossValidator(
    estimator=pipeline_regression,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
    seed=17,
)

# Train and test model on multiple folds of the training data.
# NOTE: `cv` is rebound to the fitted model here; the following cells rely on that name.
cv = cv.fit(flights_train)

In [12]:
# Create the predictions and score them. cv.transform uses the best model it found
predictions = cv.transform(dataset=flights_test)
evaluator.evaluate(dataset=predictions)

11.163705452721718

In [13]:
# Look at the first five predictions made by the best model
predictions.show(n=5)

+---+---+---+-------+------+---+----+------+--------+-----+------+-------+-------------+-------------+--------------------+------------------+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|    km|org_idx|  org_one_hot|  dow_one_hot|            features|        prediction|
+---+---+---+-------+------+---+----+------+--------+-----+------+-------+-------------+-------------+--------------------+------------------+
|  0|  1|  2|     AA|     3|JFK|2475|  12.0|     370|   11|3983.0|    2.0|(7,[2],[1.0])|(6,[2],[1.0])|(14,[0,3,10],[398...|364.35938675547595|
|  0|  1|  2|     AA|   254|OGG|2486| 15.33|     310|  173|4001.0|    7.0|    (7,[],[])|(6,[2],[1.0])|(14,[0,10],[4001....|313.35805172817106|
|  0|  1|  2|     AA|   336|ORD| 733| 21.58|     115|   55|1180.0|    0.0|(7,[0],[1.0])|(6,[2],[1.0])|(14,[0,1,10],[118...|131.72115830804273|
|  0|  1|  2|     AA|   678|SFO| 337| 16.25|      80|  139| 542.0|    1.0|(7,[1],[1.0])|(6,[2],[1.0])|(14,[0,2,10],[542...|  76.3575801752061|

In [14]:
# We can inspect the best model and see which stages were used
best_pipeline = cv.bestModel
print(best_pipeline, best_pipeline.stages, sep='\n')

PipelineModel_3744c088514a
[MileToKm_b98e3249739c, StringIndexerModel: uid=StringIndexer_6918cc853d66, handleInvalid=error, OneHotEncoderModel: uid=OneHotEncoder_97d53c79f3bb, dropLast=true, handleInvalid=error, numInputCols=2, numOutputCols=2, VectorAssembler_86d2f4b508b2, LinearRegressionModel: uid=LinearRegression_98338991af29, numFeatures=14]


In [15]:
# Check which params were used by the last stage of the best pipeline
final_stage = cv.bestModel.stages[-1]
params = final_stage.extractParamMap()

In [17]:
# Print each parameter and then its value, one per line
for param, setting in params.items():
    print(param, setting, sep='\n')

LinearRegression_98338991af29__aggregationDepth
2
LinearRegression_98338991af29__elasticNetParam
0.0
LinearRegression_98338991af29__featuresCol
features
LinearRegression_98338991af29__fitIntercept
True
LinearRegression_98338991af29__labelCol
duration
LinearRegression_98338991af29__predictionCol
prediction
LinearRegression_98338991af29__solver
auto
LinearRegression_98338991af29__standardization
True
LinearRegression_98338991af29__epsilon
1.35
LinearRegression_98338991af29__loss
squaredError
LinearRegression_98338991af29__maxIter
100
LinearRegression_98338991af29__regParam
0.01
LinearRegression_98338991af29__tol
1e-06


In [20]:
# We can again check the coefficients of the last stage of the best pipeline
best_regression = cv.bestModel.stages[-1]
best_regression.coefficients

DenseVector([0.0743, 28.0131, 20.0642, 52.3391, 46.2005, 15.2115, 17.8247, 17.1911, 0.4594, 0.0933, -0.1281, 0.2236, 0.2601, 0.1102])