<a href="https://colab.research.google.com/github/vnlvih/Estudos-PySpark/blob/main/04_Ensembles_%26_Pipelines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [23]:
!pip install pyspark



In [24]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [25]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# **Pipeline**
A pipeline chains a sequence of stages (transformers and estimators) into a single object.
You could apply each operation individually, or you could fit and apply the whole pipeline in one step.
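The mechanics can be sketched in plain Python. Spark's `Pipeline.fit()` goes through the stages in order. It calls `fit()` on each estimator to get a fitted model and uses `transform()` to pass data on to the next stage. The result is a `PipelineModel` that applies every fitted stage. The classes below (`ToyIndexer`, `ToyScaler`, `ToyPipeline`, ...) are hypothetical stand-ins, not PySpark classes. Rows are plain dicts rather than a DataFrame, and this toy version transforms after every stage for simplicity.

```python
# Toy, pure-Python stand-ins for Spark ML stages (hypothetical classes, not
# part of PySpark). Rows are plain dicts instead of a DataFrame.


class ToyIndexerModel:
    """Fitted transformer: replaces each label with its learned index."""

    def __init__(self, mapping, input_col, output_col):
        self.mapping = mapping
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        return [{**r, self.output_col: self.mapping[r[self.input_col]]} for r in rows]


class ToyIndexer:
    """Estimator: fit() learns a label -> index mapping and returns a model.

    Labels are indexed alphabetically here for simplicity; Spark's
    StringIndexer orders them by frequency by default.
    """

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        labels = sorted({r[self.input_col] for r in rows})
        mapping = {label: float(i) for i, label in enumerate(labels)}
        return ToyIndexerModel(mapping, self.input_col, self.output_col)


class ToyScaler:
    """Transformer: needs no fitting, just multiplies a column by a constant."""

    def __init__(self, input_col, output_col, factor):
        self.input_col = input_col
        self.output_col = output_col
        self.factor = factor

    def transform(self, rows):
        return [{**r, self.output_col: r[self.input_col] * self.factor} for r in rows]


class ToyPipelineModel:
    """Applies every fitted stage in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Fits estimators in order; each stage's output is the next stage's input."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)  # an estimator becomes a fitted transformer
            rows = stage.transform(rows)  # pass the transformed data downstream
            fitted.append(stage)
        return ToyPipelineModel(fitted)


train = [{"org": "ORD", "mile": 316.0}, {"org": "SFO", "mile": 337.0}]
model = ToyPipeline([ToyIndexer("org", "org_idx"), ToyScaler("mile", "km", 1.60934)]).fit(train)
test_rows = model.transform([{"org": "SFO", "mile": 100.0}])
print(test_rows)
```

Fitting happens once, on the training rows. After that, the same fitted stages can be applied to any new data.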

In [26]:
## Data Preparation ##

schema = StructType([
                     StructField("mon", IntegerType()),
                     StructField("dom", IntegerType()),
                     StructField("dow", IntegerType()),
                     StructField("carrier", StringType()),
                     StructField("flight", StringType()),
                     StructField("org", StringType()),
                     StructField("mile", DoubleType()),
                     StructField("depart", DoubleType()),
                     StructField("duration", DoubleType()),
                     StructField("delay", DoubleType()),
])

flights = spark.read.csv("/content/drive/MyDrive/Data Science & Afins/DATACAMP/03. Big Data with PySpark/05. Machine Learning with PySpark/00. DataSets/flights.csv", header=True, schema=schema)
flights.show(5)

+---+---+---+-------+------+---+------+------+--------+-----+
|mon|dom|dow|carrier|flight|org|  mile|depart|duration|delay|
+---+---+---+-------+------+---+------+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153.0|  9.48|   351.0| null|
|  0| 22|  2|     UA|  1107|ORD| 316.0| 16.33|    82.0| 30.0|
|  2| 20|  4|     UA|   226|SFO| 337.0|  6.17|    82.0| -8.0|
|  9| 13|  1|     AA|   419|ORD|1236.0| 10.33|   195.0| -5.0|
|  4|  2|  5|     AA|   325|ORD| 258.0|  8.92|    65.0| null|
+---+---+---+-------+------+---+------+------+--------+-----+
only showing top 5 rows



In [27]:
## Filtering out missing data

#Drop records with missing values in the delay column
flights = flights.filter("delay IS NOT NULL")

"""
- Derive a new km column from the mile column,
  rounding to zero decimal places. One mile is 1.60934 km.
  Remove the mile column.
"""

# Import the required function
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column
flights = flights.withColumn('km', round(flights.mile * 1.60934, 0)) \
                    .drop('mile')

In [28]:
flights.show(5)

+---+---+---+-------+------+---+------+--------+-----+------+
|mon|dom|dow|carrier|flight|org|depart|duration|delay|    km|
+---+---+---+-------+------+---+------+--------+-----+------+
|  0| 22|  2|     UA|  1107|ORD| 16.33|    82.0| 30.0| 509.0|
|  2| 20|  4|     UA|   226|SFO|  6.17|    82.0| -8.0| 542.0|
|  9| 13|  1|     AA|   419|ORD| 10.33|   195.0| -5.0|1989.0|
|  5|  2|  1|     UA|   704|SFO|  7.98|   102.0|  2.0| 885.0|
|  7|  2|  6|     AA|   380|ORD| 10.83|   135.0| 54.0|1180.0|
+---+---+---+-------+------+---+------+--------+-----+------+
only showing top 5 rows
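A quick arithmetic check of the conversion reproduces the `km` values shown above. One assumption is needed to mimic Spark in plain Python: `pyspark.sql.functions.round` uses HALF_UP rounding, while Python's built-in `round()` rounds halves to the nearest even number. For that reason the sketch uses `floor(x + 0.5)`, which matches HALF_UP for non-negative distances.

```python
import math

MILE_TO_KM = 1.60934


def miles_to_km(mile):
    """Convert miles to km, rounded to 0 decimals with halves rounded up.

    pyspark.sql.functions.round uses HALF_UP rounding, while Python's built-in
    round() rounds halves to the nearest even number, so floor(x + 0.5) is used
    to mimic Spark for non-negative distances.
    """
    return float(math.floor(mile * MILE_TO_KM + 0.5))


for mile in (316.0, 337.0, 1236.0):
    print(mile, "->", miles_to_km(mile))

# The two rounding rules only disagree on exact halves:
print(round(2.5), math.floor(2.5 + 0.5))  # 2 3
```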



## Categorical columns

In the flights data there are two columns, carrier and org, which hold categorical data. You need to transform those columns into indexed numerical values.

In [29]:
from pyspark.ml.feature import StringIndexer

"""
Create an indexer to convert the 'org' column into an indexed column called 'org_idx'.
"""

indexer = StringIndexer(inputCol="org", outputCol='org_idx').fit(flights)
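By default `StringIndexer` uses `stringOrderType="frequencyDesc"`, so the most frequent label gets index 0.0. Recent Spark versions document that ties are broken alphabetically. The sketch below mimics that ordering in plain Python. The airport list is a made-up sample, not counts from the flights data.

```python
from collections import Counter


def string_index(values):
    """Label -> index mapping, most frequent label first (frequencyDesc).

    Ties are broken alphabetically, which is how recent Spark versions
    document StringIndexer's behaviour for equal frequencies.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


orgs = ["ORD", "SFO", "ORD", "JFK", "SFO", "ORD", "LGA"]  # made-up sample
print(string_index(orgs))  # {'ORD': 0.0, 'SFO': 1.0, 'JFK': 2.0, 'LGA': 3.0}
```

With the default `handleInvalid="error"` (visible in the fitted model printed further down), a label not seen during `fit()` raises an error at transform time.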

## One-hot encoding


In [30]:
# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoder
"""
Create a one-hot encoder to convert the 'org_idx' and 'dow'
 columns into dummy variable columns called 'org_dummy' and 'dow_dummy'.

"""
# Create an instance of the one hot encoder
onehot = OneHotEncoder(inputCols=["org_idx","dow"], outputCols=["org_dummy","dow_dummy"])
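`OneHotEncoder` keeps `dropLast=True` by default, as the fitted model printed later confirms. With that setting, a column with *n* categories becomes a vector of length *n - 1*, and the last category is encoded as all zeros. The dense sketch below assumes `dow` takes the values 0 to 6 (7 categories). Spark itself stores the result as a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense equivalent of the sparse vector OneHotEncoder produces."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:  # with drop_last, the last category is all zeros
        vector[int(index)] = 1.0
    return vector


# Assuming 'dow' takes the values 0..6: 7 categories -> vectors of length 6
print(one_hot(0, 7))  # [1.0, 0.0, 0.0, 0.0, 0.0, 0.0]
print(one_hot(6, 7))  # [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
print(one_hot(6, 7, drop_last=False))  # [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
```

Dropping the last level avoids a redundant column: if every other dummy is 0, the value must be the last category.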

## Assembling columns

In [31]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

"""
Create an assembler which will combine the 'km' column with the 
two dummy variable columns. The output column should be called 'features'.
"""

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    "km","org_dummy","dow_dummy"
], outputCol='features')
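`VectorAssembler` concatenates its input columns, in the order listed, into one flat feature vector. Scalar columns contribute one slot each, and vector columns contribute all of their slots. Below is a plain-Python sketch of that flattening. The short dummy vectors are hypothetical and chosen only to keep the output readable.

```python
def assemble(row, input_cols):
    """Flatten scalar and vector columns, in order, into one feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: all slots
        else:
            features.append(float(value))  # scalar column: one slot
    return features


row = {"km": 509.0, "org_dummy": [1.0, 0.0, 0.0], "dow_dummy": [0.0, 1.0]}  # hypothetical
print(assemble(row, ["km", "org_dummy", "dow_dummy"]))  # [509.0, 1.0, 0.0, 0.0, 0.0, 1.0]
```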

In [32]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

"""
Create a linear regression object to predict flight duration.
"""

# Create a regression object (it is trained later as a pipeline stage)
regression = LinearRegression(labelCol = "duration")

In [33]:
# Split into training and testing sets in an 80:20 ratio
flights_train, flights_test = flights.randomSplit([.8,.2], seed=17)

- Import the class for creating a pipeline.
- Create a pipeline object and specify the indexer, onehot, assembler and regression stages, in this order.
- Train the pipeline on the training data.
- Make predictions on the testing data.

In [34]:
# Import class for creating a pipeline
from pyspark.ml import Pipeline

# Construct a pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Train the pipeline on the training data (fit() returns a PipelineModel)
pipeline_model = pipeline.fit(flights_train)

# Make predictions on the testing data
predictions = pipeline_model.transform(flights_test)

### **SMS spam pipeline**
You haven't looked at the SMS data for quite a while. Last time we did the following:

- split the text into tokens
- removed stop words
- applied the hashing trick
- converted the data from counts to IDF and
- trained a logistic regression model.

Each of these steps was done independently. This seems like a great application for a pipeline!

The Pipeline class was imported earlier in this notebook, and LogisticRegression is imported in the cells below.
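The first two text steps can be sketched in plain Python. Spark's `Tokenizer` lowercases the text and splits it on whitespace, so punctuation stays attached to words. If you want to split on non-word characters instead, use `RegexTokenizer` with a suitable `pattern`. `StopWordsRemover` drops stop words and is case-insensitive by default. The stop-word set and the message below are small made-up stand-ins; Spark's default English list is much longer.

```python
STOP_WORDS = {"a", "an", "the", "to", "you", "i", "and"}  # tiny stand-in list


def tokenize(text):
    """Roughly what Spark's Tokenizer does: lowercase, then split on whitespace."""
    return text.lower().split()


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop stop words, ignoring case (StopWordsRemover's default)."""
    return [t for t in tokens if t.lower() not in stop_words]


tokens = tokenize("Win a FREE prize, call now")  # made-up message
print(tokens)  # ['win', 'a', 'free', 'prize,', 'call', 'now']
print(remove_stop_words(tokens))  # ['win', 'free', 'prize,', 'call', 'now']
```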

In [35]:
# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
sms = spark.read.csv('/content/drive/MyDrive/Data Science & Afins/DATACAMP/03. Big Data with PySpark/05. Machine Learning with PySpark/00. DataSets/sms.csv', sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [36]:
sms.show(5)

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry, I'll call ...|    0|
|  2|Dont worry. I gue...|    0|
|  3|Call FREEPHONE 08...|    1|
|  4|Win a 1000 cash p...|    1|
|  5|Go until jurong p...|    0|
+---+--------------------+-----+
only showing top 5 rows



In [37]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

In [38]:
# Lowercase the text and break it into tokens at whitespace
"""
Create an object for splitting text into tokens.

"""

tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words

"""
Create an object to remove stop words. 
Rather than explicitly giving the input column name, use the getOutputCol() 
method on the previous object.
"""
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')

# Apply the hashing trick and transform to TF-IDF

"""
Create objects for applying the hashing trick and 
transforming the data into a TF-IDF. 
Use the getOutputCol() method again.
"""

hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol="hash")
idf = IDF(inputCol=hasher.getOutputCol(), outputCol="features")

# Create a logistic regression object and add everything to a pipeline

"""
Create a pipeline which wraps all of the above 
steps as well as an object to create a Logistic Regression model.
"""
logistic = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, logistic])
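The hashing trick and the IDF step can be sketched as follows. `HashingTF` maps each term to a column index with a hash modulo `numFeatures`, so distinct terms may collide in the same slot. With `binary=True` it records presence (1.0) instead of counts. Spark's `IDF` documents the weight as log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term. Assumption: MD5 stands in for the MurmurHash3 that Spark actually uses, so real column indices will differ. The documents are made up.

```python
import hashlib
import math
from collections import Counter


def bucket(term, num_features):
    """Map a term to a column index. MD5 is a stand-in; Spark uses MurmurHash3."""
    digest = hashlib.md5(term.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_features


def hashing_tf(terms, num_features=16, binary=False):
    """Sparse term-frequency vector as {index: value}; colliding terms share a slot."""
    counts = Counter(bucket(t, num_features) for t in terms)
    if binary:  # binary=True records presence (1.0) instead of counts
        return {i: 1.0 for i in counts}
    return {i: float(c) for i, c in counts.items()}


def fit_idf(tf_vectors):
    """IDF weights using Spark's documented formula log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    doc_freq = Counter(i for vec in tf_vectors for i in vec)  # each doc counted once
    return {i: math.log((m + 1) / (df + 1)) for i, df in doc_freq.items()}


def tf_idf(tf_vector, idf):
    """Scale each term frequency by its IDF weight."""
    return {i: value * idf[i] for i, value in tf_vector.items()}


docs = [["win", "free", "prize"], ["call", "later"], ["free", "call", "free"]]  # made up
tf_vectors = [hashing_tf(d) for d in docs]
idf = fit_idf(tf_vectors)
print([tf_idf(v, idf) for v in tf_vectors])
```

A term that appears in every document gets weight log(1) = 0, so it contributes nothing to the features.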

## **Cross validating simple flight duration model**

You've already built a few models for predicting flight duration and evaluated them with a simple train/test split. However, cross-validation gives a more reliable estimate of model performance, because every training record is used for both fitting and validation across the folds.

In this exercise you're going to train a simple model for flight duration using cross-validation. Travel time is usually strongly correlated with distance, so using the km column alone should give a decent model.

In [39]:
# Create an assembler object
assembler = VectorAssembler(inputCols=[
    "km"
], outputCol='features')

# Consolidate predictor columns
flights_train = assembler.transform(flights_train)
flights_test = assembler.transform(flights_test)

In [40]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Create an empty parameter grid
params = ParamGridBuilder().build()

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol = "duration")
evaluator = RegressionEvaluator(labelCol="duration")

# Create a cross validator
cv = CrossValidator(estimator=regression, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

# Train and test model on multiple folds of the training data
cv = cv.fit(flights_train)

# NOTE: Since cross-validation builds multiple models, the fit() method can take a little while to complete.
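A simplified k-fold loop shows what `CrossValidator` does with each parameter combination. Each fold is held out once, a model is trained on the remaining folds, and the validation metric is averaged over the folds. That average is one entry of `avgMetrics`. Assumptions: the shuffling scheme is a plain-Python stand-in for Spark's own fold assignment, the model is a one-variable least-squares line, and the km/duration values are synthetic rather than taken from the flights data.

```python
import math
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them into k disjoint folds."""
    indices = list(range(n))
    random.Random(seed).shuffle(indices)
    return [indices[i::k] for i in range(k)]


def fit_line(xs, ys):
    """Ordinary least squares for y = intercept + slope * x."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope


def rmse(labels, predictions):
    """Root mean squared error, RegressionEvaluator's default metric."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels))


def cross_validate(xs, ys, k=5, seed=0):
    """Average validation RMSE over k folds (one entry of avgMetrics, in Spark terms)."""
    scores = []
    for fold in k_fold_indices(len(xs), k, seed):
        held_out = set(fold)
        train = [i for i in range(len(xs)) if i not in held_out]
        intercept, slope = fit_line([xs[i] for i in train], [ys[i] for i in train])
        predictions = [intercept + slope * xs[i] for i in fold]
        scores.append(rmse([ys[i] for i in fold], predictions))
    return sum(scores) / len(scores)


# Synthetic km/duration pairs, not taken from the flights data
km = [100.0 * i for i in range(1, 11)]
duration = [30.0 + 0.12 * d + (5.0 if i % 2 else -5.0) for i, d in enumerate(km)]
print(cross_validate(km, duration))
```

After choosing the best combination, Spark refits it once on the full training data. That refit is the `bestModel`.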

## **Cross validating flight duration model pipeline**
The cross-validated model that you just built was simple, using km alone to predict duration.

Another important predictor of flight duration is the origin airport. Flights generally take longer to get into the air from busy airports. Let's see if adding this predictor improves the model!

In this exercise you'll add the org field to the model. However, since org is categorical, there's more work to be done before it can be included: it must first be transformed to an index and then one-hot encoded before being assembled with km and used to build the regression model. We'll wrap these operations up in a pipeline.

The following objects have already been created:

- params — an empty parameter grid
- evaluator — a regression evaluator
- regression — a LinearRegression object with labelCol='duration'.
- The StringIndexer, OneHotEncoder, VectorAssembler, Pipeline and CrossValidator classes have already been imported.

In [41]:
#Create an indexer for the org field
indexer = StringIndexer(inputCol="org", outputCol='org_idx')

#Create an instance of the one hot encoder
onehot = OneHotEncoder(inputCols=["org_idx"], outputCols=["org_dummy"])

#Create an assembler object
assembler = VectorAssembler(inputCols=[
    "km","org_dummy"
], outputCol='features')

#Create an empty parameter grid
params = ParamGridBuilder().build()

#Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol = "duration")
evaluator = RegressionEvaluator(labelCol="duration")

# Create a pipeline and cross-validator.
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(estimator=pipeline,
          estimatorParamMaps=params,
          evaluator=evaluator)

## **Grid Search**

### **Optimizing flights linear regression**
Up until now you've been using the default hyper-parameters when building your models. In this exercise you'll use cross validation to choose an optimal (or close to optimal) set of model hyper-parameters.

The following have already been created:

- regression — a LinearRegression object
- pipeline — a pipeline with string indexer, one-hot encoder, vector assembler and linear regression and
- evaluator — a RegressionEvaluator object.

In [42]:
# Create parameter grid
params = ParamGridBuilder()

# Add grids for two parameters
params = params.addGrid(regression.regParam, [0.01,0.1,1.0,10.0]) \
               .addGrid(regression.elasticNetParam, [0.0,0.5,1.0])

# Build the parameter grid
params = params.build()
print('Number of models to be tested: ', len(params))

# Create cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

Number of models to be tested:  12
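`ParamGridBuilder.build()` returns the Cartesian product of all grid values, so the grid size is the product of the list lengths. Cross-validation multiplies that by the number of folds and adds one final refit of the best combination. The sketch also shows the elastic-net penalty that `regParam` (λ) and `elasticNetParam` (α) control, following the form in Spark's documentation: λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²). α = 0 gives a pure L2 (ridge) penalty and α = 1 a pure L1 (lasso) penalty. The helper names are hypothetical.

```python
from itertools import product


def grid_size(*value_lists):
    """Number of parameter combinations ParamGridBuilder.build() produces."""
    return len(list(product(*value_lists)))


def total_fits(num_combinations, num_folds):
    """One fit per combination per fold, plus a final refit of the best combination."""
    return num_combinations * num_folds + 1


def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """regParam * (alpha * L1 + (1 - alpha) / 2 * squared L2), alpha = elasticNetParam."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)


flight_grid = grid_size([0.01, 0.1, 1.0, 10.0], [0.0, 0.5, 1.0])
sms_grid = grid_size([1024, 4096, 16384], [True, False], [0.01, 0.1, 1.0, 10.0], [0.0, 0.5, 1.0])
print(flight_grid, total_fits(flight_grid, 5))  # 12 61
print(sms_grid)  # 72
```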


## **Dissecting the best flight duration model**
You just set up a CrossValidator to find good parameters for the linear regression model predicting flight duration.

The model pipeline has multiple stages (a StringIndexerModel, a OneHotEncoderModel, a VectorAssembler and a LinearRegressionModel), which operate in sequence. The stages are available as the stages attribute on the pipeline model. They are stored in a list and executed in the order in which they appear.

Now you're going to take a closer look at the pipeline, split out the stages and use it to make predictions on the testing data.

The following objects have already been created:

- cv — a trained CrossValidatorModel object and
- evaluator — a RegressionEvaluator object.


The flights data have been randomly split into flights_train and flights_test.

In [43]:
# Split into training and testing sets in an 80:20 ratio
flights_train, flights_test = flights.randomSplit([.8,.2], seed=17)

In [48]:
# Get the best model from cross validation

# Fit the model

cv = cv.setSeed(13).fit(flights_train)

print(cv.avgMetrics)

"""
Retrieve the best model.
"""
best_model = cv.bestModel 


# Look at the stages in the best model
print(best_model.stages)

# Get the parameters for the LinearRegression object in the best model
"""
Isolate the linear regression stage and extract its parameters.
"""
print(best_model.stages[3].extractParamMap())

# Generate predictions on testing data using the best model then calculate RMSE
"""
Use the best model to generate predictions on the testing data and calculate the RMSE.
"""
predictions = best_model.transform(flights_test)
print("RMSE =", evaluator.evaluate(predictions))

[StringIndexerModel: uid=StringIndexer_43b4b64e4bf3, handleInvalid=error, OneHotEncoderModel: uid=OneHotEncoder_8e4b5240feae, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1, VectorAssembler_6013c13ea5de, LinearRegressionModel: uid=LinearRegression_60cc1261f65a, numFeatures=8]
RMSE = 11.075754750996959
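`cv.avgMetrics` holds one averaged metric per parameter combination, in grid order. The cross-validator keeps the combination with the best average, and the evaluator's `isLargerBetter()` decides the direction: lower is better for RMSE, higher is better for AUC. Below is a minimal sketch of that choice. The metric values are hypothetical, not results from this notebook.

```python
def best_index(avg_metrics, larger_is_better):
    """Index of the parameter combination a cross-validator would keep."""
    choose = max if larger_is_better else min
    return avg_metrics.index(choose(avg_metrics))


# Hypothetical averaged metrics for four parameter combinations
rmse_scores = [11.4, 11.1, 11.8, 12.3]  # RMSE: lower is better
auc_scores = [0.66, 0.69, 0.64, 0.61]  # AUC: higher is better
print(best_index(rmse_scores, larger_is_better=False))  # 1
print(best_index(auc_scores, larger_is_better=True))  # 1
```

Because RMSE is measured in the units of the label, the test RMSE above is an error in the same units as `duration`.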


### SMS spam optimized
The pipeline you built earlier for the SMS spam model used the default parameters for all of the elements in the pipeline. It's very unlikely that these parameters will give a particularly good model though.

In this exercise you'll set up a parameter grid which can be used with cross validation to choose a good set of parameters for the SMS spam classifier.

In [50]:
# Create parameter grid
params = ParamGridBuilder()

# Add grid for hashing trick parameters
"""
Add grid points for numFeatures and binary parameters to the HashingTF object, 
giving values 1024, 4096 and 16384, and True and False, respectively.
"""

params = params.addGrid(hasher.numFeatures, (1024,4096,16384))\
              .addGrid(hasher.binary, (True, False))

# Add grid for logistic regression parameters
"""
Add grid points for regParam and elasticNetParam parameters to 
the LogisticRegression object, giving values of 0.01, 0.1, 1.0 
and 10.0, and 0.0, 0.5, and 1.0 respectively.
"""
params = params.addGrid(logistic.regParam, (0.01, 0.1, 1.0, 10.0))\
               .addGrid(logistic.elasticNetParam,(0.0,0.5,1.0))

# Build parameter grid
params = params.build()

print('Number of models to be tested: ', len(params))

Number of models to be tested:  72


## Ensemble
- collection of models
- **Wisdom of the Crowd**: the collective opinion of a group is often better than that of a single expert
- Random Forest
    - an ensemble of Decision Trees
    - Creating model diversity
        - each tree trained on random subset of data
        - random subset of features used for splitting at each node
    - No two trees in the forest should be the same
- Gradient-Boosted Trees
    - Iterative boosting algorithm:
        1. Build a Decision Tree and add to ensemble
        2. Predict label for each training instance using ensemble
        3. Compare predictions with known labels
        4. Emphasize training instances with incorrect predictions
        5. Return to step 1.
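The boosting loop above can be sketched as gradient boosting with squared loss. Here "emphasizing" the wrongly predicted instances means fitting each new tree to the residuals (label minus current ensemble prediction), so the instances with the largest errors drive the next split. Assumptions: the trees are depth-1 stumps on a single numeric feature, the task is regression, and the step size is 0.5. Spark's `GBTClassifier` uses deeper trees (default `maxDepth=5`) and log loss for classification, so this is only an illustration of the idea.

```python
def fit_stump(xs, targets):
    """Depth-1 regression tree: the single threshold split that best fits targets."""
    best = None
    for threshold in sorted(set(xs))[:-1]:
        left = [t for x, t in zip(xs, targets) if x <= threshold]
        right = [t for x, t in zip(xs, targets) if x > threshold]
        left_value = sum(left) / len(left)
        right_value = sum(right) / len(right)
        sse = sum((t - left_value) ** 2 for t in left) + sum((t - right_value) ** 2 for t in right)
        if best is None or sse < best[0]:
            best = (sse, threshold, left_value, right_value)
    _, threshold, left_value, right_value = best
    return lambda x: left_value if x <= threshold else right_value


def gradient_boost(xs, ys, num_trees=20, step_size=0.5):
    """Gradient boosting with squared loss: each new stump fits the current residuals."""
    ensemble = []

    def predict(x):
        return sum(step_size * tree(x) for tree in ensemble)

    for _ in range(num_trees):
        # Predict with the current ensemble and compare with the known labels
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        # Fit the next tree to what the ensemble still gets wrong, then add it
        ensemble.append(fit_stump(xs, residuals))
    return predict


xs = [1, 2, 3, 4, 5, 6]
ys = [1.0, 1.0, 1.0, 5.0, 5.0, 5.0]
model = gradient_boost(xs, ys)
print([round(model(x), 4) for x in xs])
```

With a step size of 0.5 on this toy data, each round halves the remaining error, which shows how later trees concentrate on what earlier trees missed.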

### Delayed flights with Gradient-Boosted Trees
You've previously built a classifier for flights likely to be delayed using a Decision Tree. In this exercise you'll compare a Decision Tree model to a Gradient-Boosted Trees model.

In [54]:
assembler = VectorAssembler(inputCols=["mon","depart","duration"], outputCol="features")
flights = assembler.transform(flights.drop("features"))
flights = flights.withColumn("label", (flights.delay >= 15).cast("integer"))
flights = flights.select("mon","depart","duration","features","label")
flights = flights.dropna()

flights.show(5)

+---+------+--------+-----------------+-----+
|mon|depart|duration|         features|label|
+---+------+--------+-----------------+-----+
|  0| 16.33|    82.0| [0.0,16.33,82.0]|    1|
|  2|  6.17|    82.0|  [2.0,6.17,82.0]|    0|
|  9| 10.33|   195.0|[9.0,10.33,195.0]|    0|
|  5|  7.98|   102.0| [5.0,7.98,102.0]|    0|
|  7| 10.83|   135.0|[7.0,10.83,135.0]|    1|
+---+------+--------+-----------------+-----+
only showing top 5 rows



In [55]:
"""
Import the classes required to create Decision Tree and Gradient-Boosted Tree classifiers.
"""
#Import the classes required
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pprint import pprint

flights_train, flights_test = flights.randomSplit([.8,.2])


"""
Create Decision Tree and Gradient-Boosted Tree classifiers. Train on the training data.
"""
#Create model objects and train on training data
tree = DecisionTreeClassifier().fit(flights_train)
gbt = GBTClassifier().fit(flights_train)

"""
Create an evaluator and calculate AUC on testing data for both classifiers. Which model performs better?
"""
# Compare AUC on test data
evaluator = BinaryClassificationEvaluator()
print("Decision Tree AUC:", evaluator.evaluate(tree.transform(flights_test)))
print("Gradient-Boosted Trees AUC:", evaluator.evaluate(gbt.transform(flights_test)))

"""
For the Gradient-Boosted Tree classifier print the number of trees and the relative importance of features.
"""
#Find the number of trees and the relative importance of features
pprint(gbt.trees)
#print(len(gbt.trees))
print(gbt.featureImportances)

[DecisionTreeRegressionModel: uid=dtr_0ef27271b109, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_013ab6c475b9, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_b2ee00b5ffe4, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_dc5192677f5a, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_9ff8a9835383, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_e5b6caa4ad18, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_639b802e98d0, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_2d41911bdc9a, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_2bbb623d1380, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_a5ded268c70a, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_2a2f8eb0c70d, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressi

### Delayed flights with a Random Forest
In this exercise you'll bring together cross validation and ensemble methods. You'll be training a Random Forest classifier to predict delayed flights, using cross validation to choose the best values for model parameters.

You'll find good values for the following parameters:

- `featureSubsetStrategy` — the number of features to consider for splitting at each node and
- `maxDepth` — the maximum number of splits along any branch.

***Building this model takes a long time (12 parameter combinations, each trained on 5 folds), so the .fit() call on the cross-validator below may take a while to complete.***
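The `featureSubsetStrategy` values translate into a number of candidate features per split. The sketch below follows the rules in Spark's documentation (all, one third, square root, log2 of the number of features). Assumption: counts are rounded up and `log2` is floored at 1, which is how Spark's implementation behaves to the best of my understanding. With the 3 features used here (`mon`, `depart`, `duration`), the choices differ only slightly.

```python
import math


def features_per_node(strategy, num_features):
    """Candidate features considered at each split (rounding up, as assumed above)."""
    if strategy == "all":
        return num_features
    if strategy == "onethird":
        return math.ceil(num_features / 3)
    if strategy == "sqrt":
        return math.ceil(math.sqrt(num_features))
    if strategy == "log2":
        return max(1, math.ceil(math.log2(num_features)))
    raise ValueError(f"unsupported strategy: {strategy}")


for strategy in ["all", "onethird", "sqrt", "log2"]:
    print(strategy, features_per_node(strategy, 3))
```

Considering fewer features per split makes the trees more different from one another, which is the source of diversity in a Random Forest.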

In [58]:
from pyspark.ml.classification import RandomForestClassifier

#Create a random forest classifier
forest = RandomForestClassifier()

"""
Create a parameter grid builder object. Add grid points for the featureSubsetStrategy and maxDepth parameters.
"""
params = ParamGridBuilder()\
        .addGrid(forest.featureSubsetStrategy, ["all","onethird","sqrt","log2"])\
        .addGrid(forest.maxDepth, [2,5,10])\
        .build()

#Create a binary classification evaluator
evaluator = BinaryClassificationEvaluator()

"""
Create a cross-validator object, specifying the estimator, parameter grid and evaluator. Choose 5-fold cross validation
"""
cv = CrossValidator(estimator=forest, estimatorParamMaps=params,
                    evaluator=evaluator, numFolds=5)

### Evaluating Random Forest
In this final exercise you'll be evaluating the results of cross-validation on a Random Forest model.

In [61]:
# Fit the model
cvModel = cv.fit(flights_train)

In [66]:
import numpy as np



In [69]:
# Average AUC for each parameter combination in grid
avg_auc = cvModel.avgMetrics

# Average AUC for the best model
best_model_auc = np.max(avg_auc)

# What's the optimal parameter value?
opt_max_depth = cvModel.bestModel.explainParam('maxDepth')
opt_feat_substrat = cvModel.bestModel.explainParam('featureSubsetStrategy')

# AUC for best model on test data
best_auc = evaluator.evaluate(cvModel.transform(flights_test))
print(best_auc)

0.6797834961822065
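`BinaryClassificationEvaluator` reports the area under the ROC curve by default (`metricName="areaUnderROC"`). This equals the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counting one half. A value of 0.5 is no better than random guessing, and 1.0 is a perfect ranking. The pairwise sketch below is a hypothetical helper for small inputs. Spark computes the area from the ROC curve directly and may bin scores on large data, so results can differ slightly.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(positives) * len(negatives))


print(roc_auc([1, 0, 1, 0], [0.9, 0.1, 0.4, 0.6]))  # 0.75
```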
