# Machine Learning Pipelines

At the core of the `pyspark.ml` module are the `Transformer` and `Estimator` classes. Almost every other class in the module behaves similarly to these two basic classes.

`Transformer` classes have a `.transform()` method that takes a DataFrame and returns a new DataFrame, usually the original one with a new column appended. For example, you might use the `Bucketizer` class to create discrete bins from a continuous feature, or the `PCA` class to reduce the dimensionality of your dataset using principal component analysis.

`Estimator` classes all implement a `.fit()` method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a `StringIndexerModel` for including categorical data saved as strings in your models, or a random forest model (`RandomForestClassificationModel` or `RandomForestRegressionModel`) that uses the random forest algorithm for classification or regression.
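
To make the two contracts concrete, here is a minimal plain-Python sketch. It is an illustration only, not how `pyspark.ml` is implemented: rows are dicts rather than a DataFrame, and `ToyBucketizer`, `ToyMaxScaler`, and `ToyMaxScalerModel` are hypothetical names invented for this example.

```python
from bisect import bisect_right


class ToyBucketizer:
    """Transformer-like: nothing to learn, so it exposes transform() directly."""

    def __init__(self, splits, input_col, output_col):
        self.splits = splits
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        # Bucket i covers [splits[i], splits[i + 1]); the result is a new column.
        return [
            {**row, self.output_col: bisect_right(self.splits, row[self.input_col]) - 1}
            for row in rows
        ]


class ToyMaxScalerModel:
    """Model object returned by fit(); it is itself Transformer-like."""

    def __init__(self, largest, input_col, output_col):
        self.largest = largest
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        # Apply the value learned during fit() to every row.
        return [{**row, self.output_col: row[self.input_col] / self.largest} for row in rows]


class ToyMaxScaler:
    """Estimator-like: fit() learns from the data and returns a model object."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        largest = max(row[self.input_col] for row in rows)
        return ToyMaxScalerModel(largest, self.input_col, self.output_col)


# Made-up sample rows.
rows = [{"air_time": 30}, {"air_time": 60}, {"air_time": 200}]
bucketed = ToyBucketizer([0, 60, 120, 1000], "air_time", "air_time_bin").transform(rows)
scaled = ToyMaxScaler("air_time", "air_time_scaled").fit(rows).transform(rows)
print([row["air_time_bin"] for row in bucketed])
print([row["air_time_scaled"] for row in scaled])
```

Note that neither call modifies `rows`; each returns new rows with one extra column, which mirrors how `.transform()` returns a new DataFrame.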

- First, rename the year column of planes to plane_year to avoid duplicate column names.
- Create a new DataFrame called model_data by joining the flights table with planes using the tailnum column as the key.

In [1]:
# # Rename year column
# planes = planes.withColumnRenamed("year","plane_year")

# # Join the DataFrames
# model_data = flights.join(planes, on="tailnum", how="leftouter")
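
If the effect of the rename and the left outer join is unclear, the following plain-Python sketch mimics it on dicts. The tail numbers and years are made-up sample values, and the sketch assumes `tailnum` is unique in `planes`.

```python
# Made-up sample data: the second flight has no matching plane.
flights = [
    {"tailnum": "N101", "year": 2014},
    {"tailnum": "N999", "year": 2014},
]
planes = [{"tailnum": "N101", "year": 1998}]

# Rename the clashing column first, mirroring withColumnRenamed("year", "plane_year").
planes_renamed = [
    {("plane_year" if key == "year" else key): value for key, value in plane.items()}
    for plane in planes
]

# Index planes by the join key (assumes tailnum is unique in planes).
planes_by_tail = {plane["tailnum"]: plane for plane in planes_renamed}

# Left outer join: every flight is kept; unmatched flights get None for plane columns.
model_data = []
for flight in flights:
    plane = planes_by_tail.get(flight["tailnum"], {})
    model_data.append({**flight, "plane_year": plane.get("plane_year")})

print(model_data)
```

Without the rename, both tables would contribute a column called `year`, and the flight year and the manufacture year could no longer be told apart by name.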

# Data Types

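It's important to know that Spark's machine learning routines only handle numeric data, so every column used in a model must be an integer or a decimal (a "double" in Spark). Unfortunately, Spark doesn't always guess column types correctly when data is imported, and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.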

To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.

The only argument you need to pass to .cast() is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer" and for decimal numbers you'll use "double".

You can put this call to .cast() inside a call to .withColumn() to overwrite the already existing column.
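
One behavior worth keeping in mind: when ANSI mode is off, Spark's cast returns null for a string it cannot parse, and with ANSI mode on it raises an error instead. The plain-Python sketch below imitates only the lenient behavior; `cast_to_integer` is a hypothetical helper, the sample values are made up, and it does not reproduce every rule of Spark's cast.

```python
def cast_to_integer(value):
    """Rough imitation of a lenient string -> integer cast: None when parsing fails."""
    if value is None:
        return None
    try:
        return int(value.strip())
    except ValueError:
        return None


# Made-up sample column: two numeric strings, one placeholder string, one missing value.
raw_arr_delay = ["11", "-4", "NA", None]
arr_delay = [cast_to_integer(value) for value in raw_arr_delay]
print(arr_delay)
```

This is why a cast is usually followed by a filter on missing values: entries that could not be parsed end up as nulls.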

- Spark needs numeric values (doubles or integers) to do machine learning.

- Use the method .withColumn() to .cast() the following columns to type "integer". Access the columns using the df.col notation:
  - model_data.arr_delay
  - model_data.air_time
  - model_data.month
  - model_data.plane_year

In [2]:
# # Cast the columns to integers
# model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
# model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
# model_data = model_data.withColumn("month", model_data.month.cast("integer"))
# model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

- Create the column plane_age using the .withColumn() method and subtracting the year of manufacture (column plane_year) from the year (column year) of the flight.

In [3]:
# # Create the column plane_age
# model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

- Use the .withColumn() method to create the column is_late. This column is equal to model_data.arr_delay > 0.
- Convert this column to an integer column so that you can use it in your model and name it label (this is the default name for the response variable in Spark's machine learning routines).
- Filter out missing values (this has been done for you).

In [4]:
# # Create is_late
# model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# # Convert to an integer
# model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

# # Remove missing values
# model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")
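
The three steps above (derive `plane_age`, derive `label`, drop incomplete rows) can be sketched in plain Python as follows. This is an illustration on made-up rows, not Spark code; `null_safe` is a hypothetical helper that mimics how SQL arithmetic and comparisons yield null when an input is null.

```python
# Made-up sample rows; the third one is missing the manufacture year.
rows = [
    {"year": 2014, "plane_year": 1998, "arr_delay": 11},
    {"year": 2014, "plane_year": 2005, "arr_delay": -4},
    {"year": 2014, "plane_year": None, "arr_delay": 7},
]


def null_safe(func, *values):
    """Apply func unless any input is None, mirroring SQL null propagation."""
    return None if any(value is None for value in values) else func(*values)


for row in rows:
    row["plane_age"] = null_safe(lambda year, built: year - built, row["year"], row["plane_year"])
    row["is_late"] = null_safe(lambda delay: delay > 0, row["arr_delay"])
    row["label"] = null_safe(int, row["is_late"])  # True -> 1, False -> 0

# Keep only rows with no missing values in the columns of interest.
complete = [
    row for row in rows
    if row["arr_delay"] is not None and row["plane_year"] is not None
]
print(complete)
```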

# Strings and Factors

Spark requires numeric data for modeling. But the model will also use the airline (`carrier`) and the destination (`dest`) as features; these are coded as strings, and there isn't any obvious way to convert them to a numeric data type.

Fortunately, PySpark has functions for handling this task in the `pyspark.ml.feature` submodule. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1).

The first step to encoding your categorical feature is to create a StringIndexer. Members of this class are Estimators that take a DataFrame with a column of strings and map each unique string to a number. Then, the Estimator returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a OneHotEncoder. This works exactly the same way as the StringIndexer by creating an Estimator and then a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!
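
The two steps can be sketched in plain Python. This is an illustration rather than Spark's implementation, and it assumes the default settings as I understand them: `StringIndexer` orders labels by descending frequency, and `OneHotEncoder` drops the last category, which is why a vector can be all zeros ("at most one" element set). `fit_string_index` and `one_hot` are hypothetical helpers, the carrier codes are made up, and Spark emits sparse vectors where this sketch uses plain lists.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: index for index, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if position == index else 0.0 for position in range(size)]


# Made-up carrier column.
carriers = ["UA", "AA", "UA", "DL", "UA", "AA"]

# Step 1: "fit" learns the string -> index mapping.
mapping = fit_string_index(carriers)

# Step 2: encode each index as a one-hot vector.
encoded = [one_hot(mapping[carrier], len(mapping)) for carrier in carriers]

for carrier, vector in zip(carriers, encoded):
    print(carrier, mapping[carrier], vector)
```

Dropping the last category loses no information, because a row of all zeros can only mean the one category that has no slot of its own.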


Why do you have to encode a categorical feature as a one-hot vector?
- Spark can only model numeric features.

- Create a StringIndexer called carr_indexer by calling StringIndexer() with inputCol="carrier" and outputCol="carrier_index".
- Create a OneHotEncoder called carr_encoder by calling OneHotEncoder() with inputCol="carrier_index" and outputCol="carrier_fact".

In [5]:
# # Import the feature classes
# from pyspark.ml.feature import StringIndexer, OneHotEncoder

# # Create a StringIndexer
# carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# # Create a OneHotEncoder
# carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

- Create a StringIndexer called dest_indexer by calling StringIndexer() with inputCol="dest" and outputCol="dest_index".
- Create a OneHotEncoder called dest_encoder by calling OneHotEncoder() with inputCol="dest_index" and outputCol="dest_fact".

In [7]:
# # Create a StringIndexer
# dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# # Create a OneHotEncoder
# dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

- Create a VectorAssembler by calling VectorAssembler() with the inputCols names as a list and the outputCol name "features".
- The list of columns should be ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"]

In [8]:
# # Make a VectorAssembler
# from pyspark.ml.feature import VectorAssembler
# vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")
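
Conceptually, the assembler concatenates scalar columns and vector columns into one flat feature vector per row. A minimal plain-Python sketch of that idea follows; `assemble` is a hypothetical helper and the row values are made up, with short lists standing in for the one-hot vectors.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector columns contribute all of their elements, in order.
            features.extend(float(item) for item in value)
        else:
            # Scalar columns contribute a single element.
            features.append(float(value))
    return features


# Made-up row with two one-hot columns of length 2 and 3.
row = {
    "month": 3,
    "air_time": 120,
    "carrier_fact": [1.0, 0.0],
    "dest_fact": [0.0, 0.0, 1.0],
    "plane_age": 16,
}
features = assemble(row, ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"])
print(features)
```

The order of `inputCols` fixes the order of the elements in the output, so the same list has to be used for every row.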

- Import Pipeline from pyspark.ml.
- Call the Pipeline() constructor with the keyword argument stages to create a Pipeline called flights_pipe.
- stages should be a list holding all the stages you want your data to go through in the pipeline. Here this is just: [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]

In [9]:
# # Import Pipeline
# from pyspark.ml import Pipeline

# # Make the pipeline
# flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])
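
What a pipeline does with its stages can be sketched in plain Python: stages run in order, each Estimator-like stage is fitted on the output of the stages before it, and the fitted result is a model that replays the same steps. The classes below (`ToyPipeline`, `ToyPipelineModel`, `AddConstant`, `CenterOnMean`) are hypothetical and only illustrate the idea; they are not part of `pyspark.ml`.

```python
class AddConstant:
    """Transformer-like stage: adds a fixed amount to one column."""

    def __init__(self, col, amount):
        self.col = col
        self.amount = amount

    def transform(self, rows):
        return [{**row, self.col: row[self.col] + self.amount} for row in rows]


class CenterOnMean:
    """Estimator-like stage: fit() learns the mean and returns a transformer."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(row[self.col] for row in rows) / len(rows)
        return AddConstant(self.col, -mean)


class ToyPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted, current = [], rows
        for stage in self.stages:
            # Fit Estimator-like stages on the data produced so far; use others as-is.
            model = stage.fit(current) if hasattr(stage, "fit") else stage
            fitted.append(model)
            current = model.transform(current)
        return ToyPipelineModel(fitted)


# Made-up sample rows.
rows = [{"air_time": 100}, {"air_time": 200}, {"air_time": 300}]
pipe = ToyPipeline(stages=[AddConstant("air_time", 10), CenterOnMean("air_time")])
piped = pipe.fit(rows).transform(rows)
print(piped)
```

The ordering matters for the same reason it does in `flights_pipe`: each encoder must come after its indexer, and the assembler must come last, because every stage consumes columns created by earlier ones.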

# Test vs. Train

After you've cleaned your data and gotten it ready for modeling, one of the most important steps is to split the data into a test set and a train set. After that, don't touch your test data until you think you have a good model! As you're building models and forming hypotheses, you can test them on your training data to get an idea of their performance.

Once you've got your favorite model, you can see how well it predicts the new data in your test set. This never-before-seen data will give you a much more realistic idea of your model's performance in the real world when you're trying to predict or classify new data.

In Spark it's important to make sure you split the data after all the transformations. This is because operations like StringIndexer don't always produce the same index even when given the same list of strings.
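
A small plain-Python sketch shows how the index can differ. It assumes a frequency-based indexing rule (most frequent string gets index 0), which is my understanding of the `StringIndexer` default; `fit_string_index` is a hypothetical helper and the carrier codes are made up.

```python
from collections import Counter


def fit_string_index(values):
    """Index distinct strings by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: index for index, value in enumerate(ordered)}


# Made-up carrier columns for two halves of one dataset.
first_half = ["UA", "UA", "AA"]
second_half = ["AA", "AA", "UA"]

# Fitting separately: the same string gets a different index in each half.
first_mapping = fit_string_index(first_half)
second_mapping = fit_string_index(second_half)
print(first_mapping, second_mapping)

# Fitting once on everything: one mapping shared by all rows.
shared_mapping = fit_string_index(first_half + second_half)
print(shared_mapping)
```

If the two halves were indexed separately, a model trained on one half would read the other half's codes with the wrong meaning.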


Why is it important to use a test set in model evaluation?
- By evaluating your model with a test set you can get a good idea of performance on new data.

- Create the DataFrame piped_data by calling the Pipeline methods .fit() and .transform() in a chain. Both of these methods take model_data as their only argument.

In [10]:
# # Fit and transform the data
# piped_data = flights_pipe.fit(model_data).transform(model_data)

- Use the DataFrame method .randomSplit() to split piped_data into two pieces, training with 60% of the data, and test with 40% of the data by passing the list [.6, .4] to the .randomSplit() method.

In [11]:
# # Split the data into training and test sets
# training, test = piped_data.randomSplit([.6, .4])
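
Two practical points about this kind of split: the resulting sizes are only approximately 60% and 40%, and the split changes from run to run unless a seed is fixed (`.randomSplit()` accepts an optional `seed` argument). The plain-Python sketch below illustrates both; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    total = sum(weights)
    cumulative, running = [], 0.0
    for weight in weights:
        running += weight / total  # normalize so the weights need not sum to 1
        cumulative.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(cumulative):
            # The last split also catches any floating-point rounding at the boundary.
            if draw < bound or index == len(cumulative) - 1:
                splits[index].append(row)
                break
    return splits


rows = list(range(1000))
training, test = random_split(rows, [0.6, 0.4], seed=42)
print(len(training), len(test))
```

Because each row is assigned independently, the two pieces never overlap and together contain every row, but their sizes vary around the requested proportions.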