# SKLearn Pipelines

## Overview

SKLearn Pipelines let you package the final implementation of a model, from data ingestion to inference, into a single object that is easy to deploy. The first few cells largely replicate the 95% solution for the Backblaze data and build a pipeline from it. The code that saves the pipeline is commented out because the saved pipeline is already provided.

The more interesting code is at the end, where we load the saved pipeline and use it to make predictions on test data.
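For readers new to the API, here is a minimal, self-contained sketch of the same idea on toy data. The data, step names, and file name are made up for illustration. A `FunctionTransformer` wraps a preprocessing function, the last step is the model, and the fitted pipeline is saved and reloaded with `joblib` as one object. NumPy's `np.log1p` is used as the preprocessing function here because it is importable from NumPy, which sidesteps the pickling issue discussed later.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

# Toy data standing in for the drive table: two non-negative count features.
rng = np.random.default_rng(0)
X = rng.integers(0, 50, size=(200, 2)).astype(float)
y = (X[:, 0] > 25).astype(int)  # label depends only on the first feature

# Each step is a (name, estimator) pair; all steps except the last transform
# the data, and the last step is the predictive model.
pipe = Pipeline(steps=[
    ("log counts", FunctionTransformer(np.log1p)),
    ("model", GradientBoostingClassifier(n_estimators=20, random_state=0)),
])
pipe.fit(X, y)

# Persist and reload the whole pipeline (preprocessing + model) as one object.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_pipeline.joblib")
    joblib.dump(pipe, path)
    reloaded = joblib.load(path)

# The reloaded pipeline applies the same preprocessing before predicting.
same = np.array_equal(pipe.predict(X), reloaded.predict(X))
print(same)
```

Callers only ever see `predict()`; the preprocessing travels with the model instead of being re-implemented wherever the model is used.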

In [None]:
%%time
# Backblaze data
import numpy as np
import polars as pl
import glob

# Bring back our accuracy function
def accuracy(y_hat, y):
    accuracy = 0
    correct = [y_hat[i] == y[i] for i in range(len(y_hat))]
    correct = np.array([1 if i else 0 for i in correct])
    accuracy = correct.sum()/len(y_hat)
    print(f'Overall accuracy: {accuracy * 100.0}%')
    return accuracy

    
# Get the list of files
csv_files = glob.glob('../data/Day 2/BackBlaze/data_Q4_2020/*.csv')

# Create the LazyFrame
lazy_df = pl.scan_csv(csv_files)

# Polars doesn't have a simple way to track categories, so we'll collect them and
# make some dictionaries for lookups.
models_df = lazy_df.select(pl.col('model')).cast(pl.Categorical).unique().collect(engine='streaming')
model_to_number = {model:index for index,model in enumerate(models_df['model'])}
number_to_model = {index:model for index,model in enumerate(models_df['model'])}

# Begin the query plan by excluding some features
# Manually reordering the columns so that the labels are first, making them easy to slice off later.
# This also makes the correlation matrix easier to work with... more on that soon.
lazy_df = lazy_df.select(pl.col('failure'), pl.col('model'), pl.col('^smart.*raw$'))

# Make the model column categorical and then cast it to an integer. Cast every column to Int64 and replace nulls with -1.
lazy_df = lazy_df.with_columns(pl.col("model").cast(pl.Categorical).cast(pl.UInt32))
lazy_df = lazy_df.with_columns(pl.all().cast(pl.Int64))
lazy_df = lazy_df.fill_null(-1)

# Add an index column to each row in a temporary LazyFrame named df_with_index
df_with_index = lazy_df.with_row_index(name="index")

# Collect 10 failed and 10 healthy drives into two different materialized dataframes.
# Apply .limit(10) before .collect() so Polars only materializes the rows it needs.
failed_df = df_with_index.filter(pl.col('failure') == 1).limit(10).collect(engine='streaming')
not_failed_df = df_with_index.filter(pl.col('failure') == 0).limit(10).collect(engine='streaming')

# Concatenate the two materialized dataframes into a dataframe named test_df
test_df = pl.concat([failed_df, not_failed_df])

# Record the original row indices of the test rows
test_indices = test_df['index'].to_numpy()

# Create a new LazyFrame that adds to the current query plan, filtering out the test rows
train_df = df_with_index.filter(~pl.col("index").is_in(test_indices))

# Drop the 'index' column from test_df now, and add the same drop to the train_df query plan
test_df = test_df.drop('index')
train_df = train_df.drop('index')

# Only perform the correlation if we have the memory for it
import psutil

memory = psutil.virtual_memory()
gigs_total = memory.total // 1024**3

# If the system has more than 16 GB of RAM in total, we can likely load the entire dataset
# and compute the correlation matrix. Otherwise, we fall back to a hard-coded list of
# columns rather than selecting them dynamically, which saves RAM and time.
if gigs_total > 16:
    print(f'Since your system has {gigs_total:,} gigabytes total, we will calculate the correlation matrix live.')
    corr = lazy_df.collect(engine='streaming').corr()
    print("Generated correlation")
    # Row 0 holds each column's correlation with 'failure'; keep columns with |corr| > 0.005
    features_to_preserve = (np.abs(corr[0].to_numpy()) > 0.005)[0]
    features_to_preserve[1] = True # preserve drive model
    # Use `collect_schema()` to perform a lazy scan rather than materializing everything
    relevant_columns = [col for i,col in enumerate(lazy_df.collect_schema().names()) if features_to_preserve[i]]
else:
    print("While your system meets the course requirements, this dataset is still too large. Preselecting relevant features...")
    relevant_columns = ['failure', 'model', 'smart_5_raw', 'smart_184_raw', 'smart_187_raw', 'smart_197_raw', 'smart_198_raw']
print(f'Relevant features: {relevant_columns}')

# Create the query plan for the training and testing dataframes
train_df = train_df.select(relevant_columns)
test_df = test_df.select(relevant_columns)

# Load the dataframes and report on memory usage
y_train = train_df.select('failure').collect(engine='streaming').to_numpy()
x_train = train_df.drop('failure').collect(engine='streaming').to_numpy()
x_test = test_df.drop('failure').to_numpy()
y_test = test_df.select('failure').to_numpy()
print(f'x_train is {x_train.nbytes:,} bytes in memory, y_train is {y_train.nbytes:,} bytes in memory.')
print(f'Shape: x_train = {x_train.shape}, y_train = {y_train.shape}, x_test = {x_test.shape}, y_test = {y_test.shape}')

# Flatten the label arrays from shape (n, 1) to (n,)
y_train = y_train.reshape(-1)
y_test = y_test.reshape(-1)

# Generate synthetic minority-class (failed drive) samples with SMOTE. This time we will try to use all of them!
from imblearn.over_sampling import SMOTE

smote = SMOTE()
x_smote, y_smote = smote.fit_resample(x_train, y_train)
np.bincount(y_smote)
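`SMOTE` balances the classes by creating new minority-class rows rather than duplicating existing ones. The sketch below is a simplified NumPy illustration of that interpolation idea, not imbalanced-learn's implementation: for each synthetic row it picks a random minority sample, picks one of its `k` nearest minority neighbours, and places the new point at a random position on the segment between them. The function name `smote_sketch` and the toy rows are hypothetical.

```python
import numpy as np

def smote_sketch(x_minority, n_new, k=5, seed=0):
    """Create n_new synthetic rows by interpolating between minority samples
    and their k nearest minority-class neighbours (simplified SMOTE idea)."""
    rng = np.random.default_rng(seed)
    # Pairwise Euclidean distances between all minority samples.
    diffs = x_minority[:, None, :] - x_minority[None, :, :]
    dists = np.sqrt((diffs ** 2).sum(axis=-1))
    np.fill_diagonal(dists, np.inf)  # a sample is never its own neighbour
    neighbours = np.argsort(dists, axis=1)[:, :k]

    synthetic = np.empty((n_new, x_minority.shape[1]))
    for j in range(n_new):
        i = rng.integers(len(x_minority))           # random minority sample
        nn = x_minority[rng.choice(neighbours[i])]  # one of its k neighbours
        gap = rng.random()                          # random position in [0, 1)
        synthetic[j] = x_minority[i] + gap * (nn - x_minority[i])
    return synthetic

# Toy minority-class rows (two features each), for illustration only.
minority = np.array([[1.0, 2.0], [2.0, 3.0], [3.0, 3.0],
                     [2.0, 1.0], [1.5, 2.5], [2.5, 2.0]])
new_rows = smote_sketch(minority, n_new=10, k=3)
print(new_rows.shape)
```

Because every synthetic point lies between two real minority points, it stays inside the region the minority class already occupies. imbalanced-learn's `SMOTE()` defaults to `k_neighbors=5` and, with the default `sampling_strategy='auto'`, oversamples the minority class until it matches the majority.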

In [None]:
%%time
from sklearn.ensemble import GradientBoostingClassifier
gbc = GradientBoostingClassifier(n_estimators=200, subsample=1.0, 
                                 max_features='sqrt', verbose=1, learning_rate=0.003)
gbc.fit(x_smote, y_smote)
predictions = gbc.predict(x_test)
accuracy(predictions, y_test)

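# Wrap the preprocessing function and the trained model in a single pipeline.
# preprocess_data must come from an importable module (pipeutils.py) so pickle can find it.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
import joblib
from pipeutils import preprocess_data

preprocess_step = FunctionTransformer(preprocess_data)
pipeline = Pipeline(steps=[
    ('preprocess dataframe', preprocess_step),
    ('predictive model', gbc)])

# joblib.dump(pipeline, 'GBC_backblaze_95.joblib')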
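The `accuracy()` helper compares predictions one element at a time in a Python loop. For large arrays, the same number can be computed with a single vectorized comparison, and scikit-learn's `accuracy_score` returns it directly. A small sketch with made-up labels:

```python
import numpy as np
from sklearn.metrics import accuracy_score

# Made-up labels and predictions, for illustration only.
y_true = np.array([1, 1, 0, 0, 1])
y_pred = np.array([1, 0, 0, 0, 1])

# Element-wise == yields a boolean array; its mean is the fraction correct.
vectorized = np.mean(y_pred == y_true)
library = accuracy_score(y_true, y_pred)
print(f'Overall accuracy: {vectorized * 100.0}% (accuracy_score: {library})')
```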

## Explanation

The code above replicates the final solution from the RandomForest/GBC lab in section 3. The only addition to the data loading is code that builds dictionaries mapping drive model names to numbers (and back) from Polars' categorical processing. Unfortunately, there isn't a clean way to get Polars to do this directly, so this was the most expedient solution. We build the dictionaries first and then load the data; since both come from the same dataset, the categories will be identical.
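A minimal sketch of the lookup-table approach in plain Python, assuming the list of model names fits in memory. One caveat: Polars' `unique()` does not guarantee output order unless `maintain_order=True`, so sorting the names, as done here, is one way to make the numbering reproducible between runs. The function name `build_model_maps` and the example model strings are illustrative.

```python
def build_model_maps(model_names):
    """Return (model_to_number, number_to_model) for the unique names.

    Sorting makes the numbering deterministic, so the same mapping can be
    rebuilt later (for example, when the saved pipeline is loaded).
    """
    unique_models = sorted(set(model_names))
    model_to_number = {model: index for index, model in enumerate(unique_models)}
    number_to_model = {index: model for model, index in model_to_number.items()}
    return model_to_number, number_to_model

# Example drive model names, for illustration only.
names = ["ST4000DM000", "HGST HMS5C4040BLE640", "ST4000DM000", "ST12000NM0007"]
m2n, n2m = build_model_maps(names)

encoded = [m2n[name] for name in names]    # names -> integers
decoded = [n2m[code] for code in encoded]  # integers -> names (round trip)
print(encoded, decoded == names)
```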

After the pipeline is created, we save it with `joblib`. This has already been done. Note that if you use a `FunctionTransformer` to handle things like feature preprocessing (which we need here, to map the model name to a number and extract only the relevant features), you really need to define that function in a separate file and import it from there. `joblib` relies on pickle, and pickle stores a function by reference (its module and name) rather than by its code. A function defined only in the notebook cannot be found when the pipeline is loaded in a fresh session, so pickle gets upset. The `preprocess_data()` function is in `pipeutils.py`.
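The by-reference behaviour can be seen with the standard library alone. In this sketch, a throwaway module named `demo_helpers` (hypothetical, standing in for `pipeutils.py`) is created at runtime. Pickling its function stores only the module and function names, so loading succeeds while the module is importable and fails once it is gone. A function defined only in a notebook cell hits the analogous problem in a fresh kernel, where pickle looks for it in `__main__` and cannot find it.

```python
import pickle
import sys
import types

# Create a throwaway module at runtime; it stands in for a helper file
# such as pipeutils.py (the module name "demo_helpers" is hypothetical).
helpers = types.ModuleType("demo_helpers")
exec("def preprocess(rows):\n    return [r * 2 for r in rows]\n", helpers.__dict__)
sys.modules["demo_helpers"] = helpers

# pickle records only a reference: the module name plus the function name.
payload = pickle.dumps(helpers.preprocess)
print(b"demo_helpers" in payload, b"preprocess" in payload)

# Loading works while the module can be imported...
restored = pickle.loads(payload)
restored_output = restored([1, 2, 3])

# ...but fails once the module can no longer be found.
del sys.modules["demo_helpers"]
load_error = None
try:
    pickle.loads(payload)
except ImportError as exc:  # ModuleNotFoundError is a subclass of ImportError
    load_error = exc
print(restored_output, type(load_error).__name__)
```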

You must also import this function (as done below) before loading the pipeline. As you can see, the pipeline makes the loading and prediction code very clean, and a model packaged this way is easy to serve with FastAPI or another model-serving platform.
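FastAPI itself is not shown here. The sketch below covers only the framework-agnostic core that a prediction endpoint would wrap: parse a JSON list of records into a pandas DataFrame, pass it through a fitted pipeline, and return JSON. The toy pipeline, the two-column feature list, and the `handle_request` and `select_features` names are hypothetical stand-ins for the saved Backblaze pipeline and `preprocess_data()`.

```python
import json

import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

FEATURES = ["smart_5_raw", "smart_187_raw"]  # illustrative subset of columns

def select_features(df):
    # Hypothetical preprocessing step: keep the expected columns, fill gaps with -1.
    return df[FEATURES].fillna(-1).to_numpy()

# Toy training frame standing in for the real drive data.
train = pd.DataFrame({
    "smart_5_raw":   [0, 0, 8, 120, 0, 56],
    "smart_187_raw": [0, 1, 0, 40, 0, 12],
    "failure":       [0, 0, 0, 1, 0, 1],
})
pipe = Pipeline([
    ("preprocess", FunctionTransformer(select_features)),
    ("model", GradientBoostingClassifier(n_estimators=10, random_state=0)),
])
pipe.fit(train.drop(columns="failure"), train["failure"])

def handle_request(body: str) -> str:
    """Body of a prediction endpoint: JSON records in, JSON predictions out."""
    records = json.loads(body)   # list of {column: value} dicts
    frame = pd.DataFrame(records)
    predictions = pipe.predict(frame)
    return json.dumps({"predictions": [int(p) for p in predictions]})

response = handle_request(json.dumps([
    {"smart_5_raw": 150, "smart_187_raw": 50},
    {"smart_5_raw": 0, "smart_187_raw": 0},
]))
print(response)
```

In a real service, `pipe` would be the object returned by `joblib.load`, loaded once at startup, and `handle_request` would sit inside whatever route the framework provides.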

In [1]:
# To truly test this, restart the kernel, clear all outputs, and then run from here.
# When you save a pipeline that contains a FunctionTransformer, the function (or class)
# it wraps must be imported from a module; otherwise you will end up fighting very
# strange errors about objects being redefined. It is also critical that the loading
# "environment" matches the saving one, which here means importing preprocess_data
# so that it is defined. Notice that we do not need to import GBC or anything else:
# pickle imports scikit-learn's classes automatically when the model is loaded.

from sklearn import pipeline
from sklearn.preprocessing import FunctionTransformer
import joblib
import pandas as pd
from pipeutils import preprocess_data

model = joblib.load('GBC_backblaze_95.joblib')

In [2]:
test_data = pd.read_csv('drives.csv')
model.predict(test_data)



array([1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])