# Preprocessing Pipeline

In [36]:
from typing import Union, List, Literal, Annotated, Callable, Dict, Any, Tuple

from humps import camel
from pydantic import BaseModel, Field

import numpy as np
import pandas as pd
import sklearn.preprocessing
import sklearn.datasets
import torch.nn

from fleet.model_builder import splitters
from fleet import data_types
from fleet.utils.data import dataset_topo_sort, get_default_data_type_featurizer
from fleet.yaml_model import YAML_Model
from fleet.dataset_schemas import DatasetConfig, ColumnConfig
from fleet.model_builder.featurizers import DNASequenceFeaturizer
from fleet.model_builder.utils import get_class_from_path_string

In [37]:
! [ ! -f SAMPL.csv ] && wget https://deepchemdata.s3-us-west-1.amazonaws.com/datasets/SAMPL.csv

In [38]:
# Load the two example datasets: SAMPL from the local CSV, Iris from scikit-learn.
sampl_df = pd.read_csv('SAMPL.csv')
iris_df = sklearn.datasets.load_iris(as_frame=True)

classes = iris_df.target_names


def get_class(item):
    """Index the module-level ``classes`` array with ``item``."""
    return classes[item]


# Bare column lookup; its value is discarded.
iris_df.data['sepal length (cm)']
# Add a `species` column by mapping the target values through `classes`, then
# keep only the data frame (which now carries `species` as well).
iris_df.data['species'] = np.apply_along_axis(get_class, 0, iris_df.target)
iris_df = iris_df.data

# Split each dataset only when it has no `step` column yet.
# NOTE(review): presumably `apply_split_indexes` adds `step` in place — confirm.
if 'step' not in sampl_df.columns:
    splitters.apply_split_indexes(
        sampl_df,
        split_type="scaffold",
        split_column="smiles",
        split_target="80-10-10")
    # Save the SAMPL frame right after splitting it, so the CSV read on the
    # next run reflects the split. This save used to sit under the Iris branch.
    sampl_df.to_csv('SAMPL.csv', index=False)
if 'step' not in iris_df.columns:
    splitters.apply_split_indexes(
        iris_df,
        split_type="random",
        split_target="60-20-20")

# Some utility classes:

class CamelCaseModel(BaseModel):
    """
    Subclass this class to work with camel case serialization of the model.

    Field aliases are generated from the Python field names by ``camel.case``.
    """
    class Config:
        # Aliases are derived from the field names with `camel.case`.
        alias_generator = camel.case
        # Lets callers populate the model by field name as well as by alias.
        allow_population_by_field_name = True
        # NOTE(review): looks like an older spelling of the option above —
        # confirm whether it is still needed.
        allow_population_by_alias = True
        underscore_attrs_are_private = True

class CreateFromType(BaseModel):
    """
    Adds a method to instantiate a class from its class path (type) and constructor_args.

    Attributes:
        type (str): The class path of the class that will be instantiated.
        constructor_args (BaseModel): The constructor arguments passed to the class.
    """
    type: str
    constructor_args:  Union[None, BaseModel]

    def create(self):
        """
        Instantiates the class found at ``self.type``.

        Returns:
            The new instance, built with the keyword arguments from
            ``constructor_args.dict()`` when ``constructor_args`` is truthy
            and with no arguments otherwise.
        """
        target_class = get_class_from_path_string(self.type)
        kwargs = self.constructor_args.dict() if self.constructor_args else {}
        return target_class(**kwargs)

In [39]:
# Featurizers

class FPVecFilteredTransformerConstructorArgs(BaseModel):
    """
    Models the constructor arguments of a FPVecFilteredTransformer.

    Both arguments default to ``None``.
    """
    # Annotated as optional because the default is None.
    del_invariant: Union[None, bool] = None
    length: Union[None, int] = None


class FPVecFilteredTransformerConfig(CamelCaseModel, CreateFromType):
    """
    Models the usage of FPVecFilteredTransformer.

    ``type`` is fixed to the transformer's class path and doubles as the
    discriminator value in the ``Featurizer`` union.
    """
    name: str
    # Defaults to a constructor-args model whose values are all None.
    constructor_args: FPVecFilteredTransformerConstructorArgs = FPVecFilteredTransformerConstructorArgs()
    type: Literal['molfeat.trans.fp.FPVecFilteredTransformer'] = 'molfeat.trans.fp.FPVecFilteredTransformer'
    forward_args: Union[None, dict] = None
    
    
class DNASequenceFeaturizerConfig(CamelCaseModel, CreateFromType):
    """
    Models the usage of DNASequenceFeaturizer.

    Inherits ``create()`` from CreateFromType, matching
    FPVecFilteredTransformerConfig. PreprocessingPipeline calls ``create()``
    on every featurizer config, so a config without it cannot be instantiated
    there. ``constructor_args`` is inherited and stays ``None`` unless given,
    in which case ``create()`` calls the class with no arguments.
    """
    name: str
    type: Literal["fleet.model_builder.featurizers.DNASequenceFeaturizer"] = "fleet.model_builder.featurizers.DNASequenceFeaturizer"
    forward_args: Union[None, dict] = None

# Union of the supported featurizer configs, discriminated by each member's
# `type` literal.
Featurizer = Annotated[Union[
        FPVecFilteredTransformerConfig,
        DNASequenceFeaturizerConfig,
    ], Field(discriminator="type")]
class FeaturizerConfig(CamelCaseModel):
    """
    Root model around ``Featurizer``; the parsed config is read from ``__root__``.
    """
    __root__: Featurizer



In [40]:
# Transforms

class LabelEncoderConfig(CreateFromType, CamelCaseModel):
    """
    Models the usage of sklearn.preprocessing.LabelEncoder.

    No constructor-args model is declared here, so ``constructor_args`` keeps
    the definition inherited from CreateFromType.
    """
    type: Literal['sklearn.preprocessing.LabelEncoder'] = 'sklearn.preprocessing.LabelEncoder'
    name: str
    # Maps argument names to string references such as '$species'.
    forward_args: Dict[str, str]


class StandardScalerConstructorArgs(BaseModel):
    """
    Models the constructor arguments of a StandardScaler; both flags default to True.
    """
    with_mean: bool = True
    with_std: bool = True


class StandardScalerConfig(CreateFromType, CamelCaseModel):
    """
    Models the usage of sklearn.preprocessing.StandardScaler.
    """
    type: Literal['sklearn.preprocessing.StandardScaler'] = 'sklearn.preprocessing.StandardScaler'
    # Defaults to StandardScalerConstructorArgs() (with_mean=True, with_std=True).
    constructor_args: StandardScalerConstructorArgs = StandardScalerConstructorArgs()
    name: str
    # Maps argument names to string references such as '$sepal_width'.
    forward_args: Dict[str, str]

# Union of the supported transform configs, discriminated by each member's
# `type` literal.
Transformer = Annotated[Union[
        StandardScalerConfig,
        LabelEncoderConfig,
    ], Field(discriminator="type")]

class TransformConfig(CamelCaseModel):
    """
    Root model around ``Transformer``; the parsed config is read from ``__root__``.
    """
    __root__: Transformer



In [41]:
# Parse a scaler description through the transform union, then instantiate it.
TransformConfig.parse_obj(dict(
    name='transformer',
    type='sklearn.preprocessing.StandardScaler',
    forward_args=dict(X='$sepal_width'),
)).__root__.create()

In [64]:
class ColumnConfigWithPreprocessing(CamelCaseModel):
    """
    Describes one dataset column together with its own preprocessing steps.

    Attributes:
        name: The column name.
        data_type: The column's data type description.
        transforms: Optional list of transforms declared for this column.
        featurizers: Optional list of featurizers declared for this column.
    """
    name: str
    data_type: data_types.DataType
    transforms: Union[None, List[CreateFromType]] = None
    featurizers: Union[None, List[CreateFromType]] = None
    
    
class DatasetConfig(CamelCaseModel):
    """
    Flat dataset description: the columns plus dataset-level featurizers and transforms.

    NOTE(review): this definition rebinds the name ``DatasetConfig`` that is
    also imported from ``fleet.dataset_schemas`` — confirm that is intended.
    """
    name: str
    feature_columns: List[ColumnConfig]
    target_columns: List[ColumnConfig]
    featurizers: List[Featurizer]
    transforms: List[Transformer]
    
    
class DatasetConfigWithPreprocessing(CamelCaseModel, YAML_Model):
    """
    Dataset description in which each column declares its own featurizers and
    transforms. ``to_dataset_config`` flattens it into a ``DatasetConfig``.
    """
    name: str
    target_columns: List[ColumnConfigWithPreprocessing]
    feature_columns: List[ColumnConfigWithPreprocessing]

    def to_dataset_config(self) -> DatasetConfig:
        """
        Flattens the per-column preprocessing steps into dataset-level lists.

        Each step is named ``<column>-feat-<idx>`` or
        ``<column>-transformer-<idx>``. Its ``forward_args['X']`` references
        the previous step of the same kind on that column, or the column
        itself for the first step.

        Returns:
            A ``DatasetConfig`` with plain columns plus the flattened
            featurizers and transforms.
        """
        def chain(column_name, steps, label, root_model):
            # Parse `steps` with `root_model`, wiring each step's input to the
            # step before it.
            parsed = []
            source = column_name
            for position, step in enumerate(steps):
                step_name = f'{column_name}-{label}-{position}'
                fields = {
                    **step.dict(),
                    'name': step_name,
                    'forward_args': {
                        'X': f'${source}'
                    },
                }
                # A None value is dropped so the target model's own default applies.
                if fields["constructor_args"] is None:
                    del fields["constructor_args"]
                parsed.append(root_model.parse_obj(fields))
                source = step_name
            return parsed

        def plain_columns(columns):
            # Strip the preprocessing information from the columns.
            return [
                ColumnConfig(name=column.name, data_type=column.data_type)
                for column in columns
            ]

        featurizers = []
        transforms = []
        for col in self.feature_columns + self.target_columns:
            if col.featurizers:
                featurizers.extend(
                    chain(col.name, col.featurizers, 'feat', FeaturizerConfig)
                )
            if col.transforms:
                transforms.extend(
                    chain(col.name, col.transforms, 'transformer', TransformConfig)
                )

        return DatasetConfig(
            name=self.name,
            target_columns=plain_columns(self.target_columns),
            feature_columns=plain_columns(self.feature_columns),
            featurizers=[parsed.__root__ for parsed in featurizers],
            transforms=[parsed.__root__ for parsed in transforms],
        )
            
# YAML description of the Iris dataset: one label-encoded target and four
# standard-scaled numeric features.
iris_yaml = """
name: Iris
targetColumns:
  - name: species
    dataType:
      domainKind: categorical
      classes: {}
    transforms:
      - type: sklearn.preprocessing.LabelEncoder
featureColumns:
  - name: sepal length (cm)
    dataType:
      domainKind: numeric
      unit: cm
    transforms:
      - type: sklearn.preprocessing.StandardScaler
  - name: sepal width (cm)
    dataType:
      domainKind: numeric
      unit: cm
    transforms:
      - type: sklearn.preprocessing.StandardScaler
  - name: petal length (cm)
    dataType:
      domainKind: numeric
      unit: cm
    transforms:
      - type: sklearn.preprocessing.StandardScaler
  - name: petal width (cm)
    dataType:
      domainKind: numeric
      unit: cm
    transforms:
      - type: sklearn.preprocessing.StandardScaler
"""

# YAML description of the SAMPL dataset: a featurized SMILES column and a
# standard-scaled numeric target.
sampl_yaml = """
name: SAMPL
featureColumns:
  - name: smiles
    dataType:
      domainKind: smiles
    featurizers:
      - type: molfeat.trans.fp.FPVecFilteredTransformer
targetColumns:
  - name: expt
    dataType:
      domainKind: numeric
    transforms:
      - type: sklearn.preprocessing.StandardScaler
"""

iris_config = DatasetConfigWithPreprocessing.from_yaml_str(iris_yaml).to_dataset_config()
sampl_config = DatasetConfigWithPreprocessing.from_yaml_str(sampl_yaml).to_dataset_config()

for flattened_config in (iris_config, sampl_config):
    print(flattened_config)

name='Iris' feature_columns=[ColumnConfig(name='sepal length (cm)', data_type=NumericDataType(domain_kind='numeric')), ColumnConfig(name='sepal width (cm)', data_type=NumericDataType(domain_kind='numeric')), ColumnConfig(name='petal length (cm)', data_type=NumericDataType(domain_kind='numeric')), ColumnConfig(name='petal width (cm)', data_type=NumericDataType(domain_kind='numeric'))] target_columns=[ColumnConfig(name='species', data_type=CategoricalDataType(domain_kind='categorical', classes={}))] featurizers=[] transforms=[StandardScalerConfig(type='sklearn.preprocessing.StandardScaler', constructor_args=StandardScalerConstructorArgs(with_mean=True, with_std=True), name='sepal length (cm)-transformer-0', forward_args={'X': '$sepal length (cm)'}), StandardScalerConfig(type='sklearn.preprocessing.StandardScaler', constructor_args=StandardScalerConstructorArgs(with_mean=True, with_std=True), name='sepal width (cm)-transformer-0', forward_args={'X': '$sepal width (cm)'}), StandardScalerCo

In [67]:
# Collect the configured column names and show the feature part of the frame.
target_cols = [column.name for column in iris_config.target_columns]
feature_cols = [column.name for column in iris_config.feature_columns]
print(feature_cols)
iris_df.loc[:, feature_cols]

['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)']


Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm)
0,5.1,3.5,1.4,0.2
1,4.9,3.0,1.4,0.2
2,4.7,3.2,1.3,0.2
3,4.6,3.1,1.5,0.2
4,5.0,3.6,1.4,0.2
...,...,...,...,...
145,6.7,3.0,5.2,2.3
146,6.3,2.5,5.0,1.9
147,6.5,3.0,5.2,2.0
148,6.2,3.4,5.4,2.3


In [93]:
from pprint import pformat
class PreprocessingPipeline:
    """
    Instantiates and fits the featurizers and transforms of a dataset config.

    Args:
        dataset_config: The object describing the columns, featurizers and
            transforms of the dataset.
    """

    def __init__(
        self,
        dataset_config: "DatasetConfig",
    ):
        """
        Creates the pipeline steps without executing them.

        Args:
            dataset_config: The object describing the pipeline.
        """
        self.dataset_config = dataset_config
        # Step configs and their instantiated objects, both keyed by step name.
        self.featurizers_config = {
            feat.name: feat for feat in dataset_config.featurizers
        }
        self.transforms_config = {
            transform.name: transform for transform in dataset_config.transforms
        }
        self.featurizers = {
            feat.name: feat.create() for feat in dataset_config.featurizers
        }
        self.transforms = {
            transform.name: transform.create() for transform in dataset_config.transforms
        }
        self._fitted = False

    def _prepare_transform(self, func: Any):
        """
        Returns ``func`` as an object usable as a transformer.

        Args:
            func: A ``sklearn.base.TransformerMixin`` instance or a callable.

        Returns:
            ``func`` itself when it is a ``TransformerMixin``, otherwise a
            ``FunctionTransformer`` wrapping the callable.

        Raises:
            ValueError: If ``func`` is neither a transformer nor a callable.
        """
        # Imported here because the notebook imports only the preprocessing
        # and datasets submodules of sklearn.
        from sklearn.base import TransformerMixin

        if isinstance(func, TransformerMixin):
            return func
        if callable(func):
            return sklearn.preprocessing.FunctionTransformer(func)
        raise ValueError(
            'func must be one of %r' % (
                ['sklearn.base.TransformerMixin', 'Callable']
            )
        )

    def get_X_and_y(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Splits ``df`` into the configured feature and target columns.

        Args:
            df: The frame holding at least every configured column.

        Returns:
            A ``(features, targets)`` tuple of frames selected from ``df``.
        """
        feature_columns = [
            col.name
            for col in self.dataset_config.feature_columns
        ]
        target_columns = [
            col.name
            for col in self.dataset_config.target_columns
        ]
        return df.loc[:, feature_columns], df.loc[:, target_columns]

    def _as_frames(
        self,
        X: Union[pd.DataFrame, np.ndarray],
        y: Union[pd.DataFrame, np.ndarray, None],
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Wraps non-DataFrame inputs in frames named after the configured columns."""
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X, columns=[col.name for col in self.dataset_config.feature_columns])

        if not isinstance(y, pd.DataFrame):
            y = pd.DataFrame(y, columns=[col.name for col in self.dataset_config.target_columns])

        return X, y

    def _run_steps(
        self,
        X: Union[pd.DataFrame, np.ndarray],
        y: Union[pd.DataFrame, np.ndarray, None],
        method_name: str,
    ) -> None:
        """
        Calls ``method_name`` on every featurizer, then on every transform.

        Steps run in the order given by ``dataset_topo_sort``. Each step gets
        the whole ``X`` and ``y`` frames; ``forward_args`` are not consulted.
        """
        feats, transforms = map(list, dataset_topo_sort(self.dataset_config))
        X, y = self._as_frames(X, y)

        for configs, steps in ((feats, self.featurizers), (transforms, self.transforms)):
            for config in configs:
                step = steps[config.name]
                try:
                    method = getattr(step, method_name)
                    if 'LabelEncoder' in config.type:  # special case for steps that take only y
                        method(y=y)
                    else:
                        method(X=X, y=y)
                except Exception as exc:
                    # Chain the cause explicitly; KeyboardInterrupt and
                    # SystemExit are no longer wrapped.
                    raise RuntimeError('Failed to fit %r' % pformat(config)) from exc
        self._fitted = True

    def fit(
        self,
        X: Union[pd.DataFrame, np.ndarray],
        y: Union[pd.DataFrame, np.ndarray, None] = None
    ):
        """
        Fits the featurizers and transforms to the data.

        Args:
            X: Feature data; non-DataFrame input is wrapped using the
                configured feature column names.
            y: Target data; non-DataFrame input (including ``None``) is
                wrapped using the configured target column names.

        Raises:
            RuntimeError: If fitting any step fails.
        """
        self._run_steps(X, y, 'fit')

    def fit_transform(
        self,
        X: Union[pd.DataFrame, np.ndarray],
        y: Union[pd.DataFrame, np.ndarray, None] = None
    ):
        """
        Calls ``fit_transform`` on every featurizer and transform.

        The transformed outputs are discarded and nothing is returned.

        Args:
            X: Feature data, handled as in :meth:`fit`.
            y: Target data, handled as in :meth:`fit`.

        Raises:
            RuntimeError: If any step fails.
        """
        self._run_steps(X, y, 'fit_transform')

    def transform(self, data: Union[pd.DataFrame, np.ndarray]):
        """Placeholder: not implemented yet, returns ``None``."""
        ...


# Build the Iris pipeline, split the frame into features and targets, then
# fit every configured step on them.
iris_preprocessing = PreprocessingPipeline(iris_config)
X, y = iris_preprocessing.get_X_and_y(iris_df)
iris_preprocessing.fit_transform(X, y)

Node: {'type': 'sklearn.preprocessing.StandardScaler', 'constructorArgs': {'with_mean': True, 'with_std': True}, 'name': 'sepal length (cm)-transformer-0', 'forwardArgs': {'X': '$sepal length (cm)'}}
Node: {'type': 'sklearn.preprocessing.StandardScaler', 'constructorArgs': {'with_mean': True, 'with_std': True}, 'name': 'sepal width (cm)-transformer-0', 'forwardArgs': {'X': '$sepal width (cm)'}}
Node: {'type': 'sklearn.preprocessing.StandardScaler', 'constructorArgs': {'with_mean': True, 'with_std': True}, 'name': 'petal length (cm)-transformer-0', 'forwardArgs': {'X': '$petal length (cm)'}}
Node: {'type': 'sklearn.preprocessing.StandardScaler', 'constructorArgs': {'with_mean': True, 'with_std': True}, 'name': 'petal width (cm)-transformer-0', 'forwardArgs': {'X': '$petal width (cm)'}}
Node: {'type': 'sklearn.preprocessing.LabelEncoder', 'constructorArgs': None, 'name': 'species-transformer-0', 'forwardArgs': {'X': '$species'}}


RuntimeError: Failed to fit "LabelEncoderConfig(type='sklearn.preprocessing.LabelEncoder', constructor_args=None, name='species-transformer-0', forward_args={'X': '$species'})"