In [None]:
#| include: false
%load_ext autoreload
%autoreload 2

In [None]:
#| default_exp model_pipeline

## Overview

The functionality below uses the `NumerFrame`, `PreProcessor`, `Model` and `PostProcessor` objects to easily propagate
data, generate predictions and postprocess them in one go.

Specifically, this section introduces two objects:
1. `ModelPipeline`: Run all preprocessing, models and postprocessing that you define and return a `NumerFrame`.
2. `ModelPipelineCollection`: Manage and run multiple `ModelPipeline` objects.

In [None]:
#| include: false
from nbdev.showdoc import *

In [None]:
#| export
import uuid
import pandas as pd
from tqdm.auto import tqdm
from typeguard import typechecked
from typing import List, Union, Dict
from rich import print as rich_print

from numerblox.numerframe import NumerFrame, create_numerframe
from numerblox.preprocessing import BaseProcessor, CopyPreProcessor, GroupStatsPreProcessor, FeatureSelectionPreProcessor
from numerblox.model import BaseModel, ConstantModel, RandomModel
from numerblox.postprocessing import Standardizer, MeanEnsembler, FeatureNeutralizer

2023-01-05 14:39:25.029495: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-01-05 14:39:25.168902: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-01-05 14:39:25.168928: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-01-05 14:39:25.857640: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-

## 1. ModelPipeline

`ModelPipeline` handles all preprocessing, model prediction and postprocessing. It returns a `NumerFrame` with the preprocessed data, metadata and postprocessed prediction columns.

In [None]:
#| export
@typechecked
class ModelPipeline:
    """
    Execute all preprocessing, prediction and postprocessing for a given setup.

    :param models: Initialized numerai-blocks Models (Objects inheriting from BaseModel) \n
    :param preprocessors: List of initialized Preprocessors, applied in list order. \n
    Defaults to no preprocessors. \n
    :param postprocessors: List of initialized Postprocessors, applied in list order. \n
    Defaults to no postprocessors. \n
    :param copy_first: Whether to copy the NumerFrame as a first preprocessing step. \n
    Highly recommended in order to avoid surprise behaviour by manipulating the original dataset. \n
    :param standardize: Whether to apply a Standardizer as the first postprocessing step. \n
    :param pipeline_name: Unique name for pipeline. Used for display purposes and as the
    lookup key when the pipeline is placed in a ModelPipelineCollection. \n
    A random hex identifier is generated when no name is given.
    """
    def __init__(self,
                 models: List[BaseModel],
                 preprocessors: Union[List[BaseProcessor], None] = None,
                 postprocessors: Union[List[BaseProcessor], None] = None,
                 copy_first = True,
                 standardize = True,
                 pipeline_name: Union[str, None] = None):
        self.pipeline_name = pipeline_name if pipeline_name else uuid.uuid4().hex
        self.models = models
        self.copy_first = copy_first
        self.standardize = standardize
        # Create a fresh list per instance. A shared `[]` default would be the same
        # object for every pipeline, so appending to one would silently affect all.
        self.preprocessors = preprocessors if preprocessors is not None else []
        self.postprocessors = postprocessors if postprocessors is not None else []

    def preprocess(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """ Run all preprocessing steps. Copies input by default. """
        if self.copy_first:
            dataf = CopyPreProcessor()(dataf)
        # No trailing colon in `desc`: tqdm appends its own separator.
        for preprocessor in tqdm(self.preprocessors,
                                 desc=f"{self.pipeline_name} Preprocessing",
                                 position=0):
            rich_print(f":construction: Applying preprocessing: '[bold]{preprocessor.__class__.__name__}[/bold]' :construction:")
            dataf = preprocessor(dataf)
        return NumerFrame(dataf)

    def postprocess(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """ Run all postprocessing steps. Standardizes model prediction by default. """
        if self.standardize:
            dataf = Standardizer()(dataf)
        for postprocessor in tqdm(self.postprocessors,
                                  desc=f"{self.pipeline_name} Postprocessing",
                                  position=0):
            rich_print(f":construction: Applying postprocessing: '[bold]{postprocessor.__class__.__name__}[/bold]' :construction:")
            dataf = postprocessor(dataf)
        return NumerFrame(dataf)

    def process_models(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """ Run all models in list order, each one receiving the output of the previous one. """
        for model in tqdm(self.models,
                          desc=f"{self.pipeline_name} Model prediction",
                          position=0):
            rich_print(f":robot: Generating model predictions with '[bold]{model.__class__.__name__}[/bold]'. :robot:")
            dataf = model(dataf)
        return NumerFrame(dataf)

    def pipeline(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """ Process full pipeline (preprocess -> models -> postprocess) and return resulting NumerFrame. """
        preprocessed_dataf = self.preprocess(dataf)
        prediction_dataf = self.process_models(preprocessed_dataf)
        processed_prediction_dataf = self.postprocess(prediction_dataf)
        # Markup tags are closed with `[/...]` so the styles end where intended.
        rich_print(f":checkered_flag: [green]Finished pipeline:[/green] [bold blue]'{self.pipeline_name}'[/bold blue]! :checkered_flag:")
        return processed_prediction_dataf

    def __call__(self, dataf: Union[pd.DataFrame, NumerFrame]) -> NumerFrame:
        """ Shorthand for `pipeline(dataf)`. """
        return self.pipeline(dataf)

Example using several preprocessors, dummy models and postprocessors:

In [None]:
# Names of the two dummy models used in this example.
model_names = ["test_0.5", "test_0.8"]

dataf = create_numerframe("test_assets/mini_numerai_version_1_data.csv")

# Preprocessing steps, applied in list order.
preprocessors = [
    GroupStatsPreProcessor(),
    FeatureSelectionPreProcessor(
        feature_cols=['feature_intelligence_mean', 'feature_intelligence_std']
    ),
]

# One constant-output model per name.
models = [
    ConstantModel(constant=constant, model_name=name)
    for constant, name in zip((0.5, 0.8), model_names)
]

# Postprocessing steps, applied in list order: ensemble the model columns, then neutralize the ensemble.
postprocessors = [
    MeanEnsembler(
        cols=[f"prediction_{name}" for name in model_names],
        final_col_name='prediction_ensembled',
    ),
    FeatureNeutralizer(
        feature_names=['feature_intelligence_mean', 'feature_intelligence_std'],
        pred_name='prediction_ensembled',
        proportion=0.8,
    ),
]

In [None]:
# Build the pipeline from the components defined above; standardization is switched off here.
test_pipeline = ModelPipeline(
    models=models,
    preprocessors=preprocessors,
    postprocessors=postprocessors,
    standardize=False,
    pipeline_name="test_pipeline",
)
# Calling the pipeline runs preprocessing, model prediction and postprocessing in sequence.
processed_dataf = test_pipeline(dataf)

test_pipeline Preprocessing::   0%|          | 0/2 [00:00<?, ?it/s]

test_pipeline Model prediction:   0%|          | 0/2 [00:00<?, ?it/s]

test_pipeline Postprocessing:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
# Sanity checks: the pipeline output keeps the input's metadata and is a NumerFrame.
assert processed_dataf.meta == dataf.meta
assert isinstance(processed_dataf, NumerFrame)
processed_dataf.head(2)

Unnamed: 0,feature_intelligence_mean,feature_intelligence_std,target,id,era,data_type,prediction_test_0.5,prediction_test_0.8,prediction_ensembled,prediction_ensembled_neutralized_0.8
0,0.333333,0.246183,0.5,n000315175b67977,era1,train,0.5,0.8,0.65,0.0
1,0.208333,0.234359,0.25,n0014af834a96cdd,era1,train,0.5,0.8,0.65,0.36088


## 2. ModelPipelineCollection

`ModelPipelineCollection` can be used to manage and run multiple `ModelPipeline` objects.

`ModelPipelineCollection` simply takes a list of `ModelPipeline` objects as input.

In [None]:
#| export
@typechecked
class ModelPipelineCollection:
    """
    Execute multiple initialized ModelPipelines in a sequence.

    :param pipelines: List of initialized ModelPipelines. \n
    Pipeline names must be unique, because pipelines and their results are keyed by `pipeline_name`.
    """
    def __init__(self, pipelines: List[ModelPipeline]):
        self.pipelines = {}
        for pipe in pipelines:
            # A repeated name would overwrite the earlier pipeline in the dict and
            # silently drop it (and its results), so fail loudly instead.
            if pipe.pipeline_name in self.pipelines:
                raise ValueError(
                    f"Duplicate pipeline name '{pipe.pipeline_name}'. "
                    "Pipeline names must be unique within a ModelPipelineCollection."
                )
            self.pipelines[pipe.pipeline_name] = pipe
        self.pipeline_names = list(self.pipelines.keys())

    def process_all_pipelines(self, dataf: Union[pd.DataFrame, NumerFrame]) -> Dict[str, NumerFrame]:
        """ Process all pipelines and return Dictionary mapping pipeline names to resulting NumerFrames. """
        result_datafs = dict()
        # Iterating the dict yields the pipeline names; each pipeline gets the same input.
        for name in tqdm(self.pipelines,
                         desc="Processing Pipeline Collection"):
            result_datafs[name] = self.process_single_pipeline(dataf, name)
        return result_datafs

    def process_single_pipeline(self, dataf: Union[pd.DataFrame, NumerFrame], pipeline_name: str) -> NumerFrame:
        """ Run full model pipeline for given name in collection. """
        rich_print(f":construction_worker: [bold green]Processing model pipeline:[/bold green] '{pipeline_name}' :construction_worker:")
        pipeline = self.get_pipeline(pipeline_name)
        dataf = pipeline(dataf)
        return NumerFrame(dataf)

    def get_pipeline(self, pipeline_name: str) -> ModelPipeline:
        """
        Retrieve model pipeline for given name.

        Raises an AssertionError if no pipeline with that name is in the collection.
        """
        available_pipelines = self.pipeline_names
        assert pipeline_name in available_pipelines, f"Requested pipeline '{pipeline_name}', but only the following pipelines are in the collection: '{available_pipelines}'."
        return self.pipelines[pipeline_name]

    def __call__(self, dataf: Union[pd.DataFrame, NumerFrame]) -> Dict[str, NumerFrame]:
        """ Shorthand for `process_all_pipelines(dataf)`. """
        return self.process_all_pipelines(dataf=dataf)

We introduce a different pipeline with no custom preprocessors or postprocessors, only a `RandomModel`. Note that the default steps (`copy_first=True` and `standardize=True`) still run.

In [None]:
# No preprocessors/postprocessors are passed; `copy_first` and `standardize` keep their defaults (True).
test_pipeline2 = ModelPipeline(models=[RandomModel()], pipeline_name="test_pipeline2")

We process two `ModelPipeline`s with different characteristics on the same data.

In [None]:
# Pipelines are stored under their `pipeline_name`, so they can be retrieved by name.
collection = ModelPipelineCollection([test_pipeline, test_pipeline2])
assert collection.get_pipeline("test_pipeline2").pipeline_name == 'test_pipeline2'

In [None]:
# Calling the collection runs every pipeline on the same input and returns a dict keyed by pipeline name.
result_datasets = collection(dataf=dataf)

Processing Pipeline Collection:   0%|          | 0/2 [00:00<?, ?it/s]

test_pipeline Preprocessing::   0%|          | 0/2 [00:00<?, ?it/s]

test_pipeline Model prediction:   0%|          | 0/2 [00:00<?, ?it/s]

test_pipeline Postprocessing:   0%|          | 0/2 [00:00<?, ?it/s]

test_pipeline2 Preprocessing:: 0it [00:00, ?it/s]

test_pipeline2 Model prediction:   0%|          | 0/1 [00:00<?, ?it/s]

test_pipeline2 Postprocessing: : 0it [00:00, ?it/s]

The `ModelPipelineCollection` returns a dictionary mapping pipeline names to `NumerFrame` objects, retaining all metadata and added prediction columns for each. Note that in this example, the 1st `NumerFrame` had a feature selection step, so it did not retain all columns. However, the second dataset retained all feature columns, because no preprocessing was done.

In [None]:
# The keys of the result dict are the pipeline names.
result_datasets.keys()

dict_keys(['test_pipeline', 'test_pipeline2'])

In [None]:
# First two rows of the result for `test_pipeline`.
result_datasets['test_pipeline'].head(2)

Unnamed: 0,feature_intelligence_mean,feature_intelligence_std,target,id,era,data_type,prediction_test_0.5,prediction_test_0.8,prediction_ensembled,prediction_ensembled_neutralized_0.8
0,0.333333,0.246183,0.5,n000315175b67977,era1,train,0.5,0.8,0.65,0.0
1,0.208333,0.234359,0.25,n0014af834a96cdd,era1,train,0.5,0.8,0.65,0.36088


In [None]:
# First two rows of the result for `test_pipeline2`.
result_datasets['test_pipeline2'].head(2)

Unnamed: 0,id,era,data_type,feature_intelligence1,feature_intelligence2,feature_intelligence3,feature_intelligence4,feature_intelligence5,feature_intelligence6,feature_intelligence7,...,feature_wisdom39,feature_wisdom40,feature_wisdom41,feature_wisdom42,feature_wisdom43,feature_wisdom44,feature_wisdom45,feature_wisdom46,target,prediction_random
0,n000315175b67977,era1,train,0.0,0.5,0.25,0.0,0.5,0.25,0.25,...,1.0,0.75,0.5,0.75,0.5,1.0,0.5,0.75,0.5,0.5
1,n0014af834a96cdd,era1,train,0.0,0.0,0.0,0.25,0.5,0.0,0.0,...,1.0,0.0,0.0,0.75,0.25,0.0,0.25,1.0,0.25,0.1


-----------------------------------------------------------------------------