In [25]:
# load dependencies
import pandas as pd
import numpy as np

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, FeatureUnion
from warnings import filterwarnings
from joblib import dump, load

#pd.options.display.max_columns = 100

In [2]:
# Silences *all* warnings for the rest of the kernel session (no category or module filter).
# NOTE(review): this also hides pandas deprecation / SettingWithCopy warnings that later
# cells may trigger; consider narrowing it to a specific category so real problems stay visible.
filterwarnings('ignore')

In [3]:
# set constants
# Paths (relative to the notebook's working directory) of the tab-separated train/test files.
# Despite the `_DIR` suffix these point at individual files, not directories.
TRAIN_DATA_DIR = "python-dev-test/data/train.tsv"
TEST_DATA_DIR = "python-dev-test/data/test.tsv"

In [4]:
def read(filepath, sep = '\t', chunksize = 10000):
    """Load a delimited jobs file and split its ``features`` column by feature type.

    Parameters
    ----------
    filepath : str or path-like
        File passed to ``pandas.read_csv``; it must contain ``id_job`` and
        ``features`` columns.
    sep : str, default tab
        Column separator.
    chunksize : int, default 10000
        Rows per chunk while reading.

    Returns
    -------
    dict[str, pandas.DataFrame]
        ``{'feature_type_<key>': frame}`` in order of first appearance, where
        ``<key>`` is the first comma-separated value of ``features``. Each frame
        holds the remaining values (int16, columns labelled 1..N) indexed by
        ``id_job``.
    """
    chunks = pd.read_csv(filepath, sep = sep, chunksize = chunksize, engine = 'python')
    # A single concat replaces DataFrame.append per chunk: append re-copied the
    # accumulated frame every time (quadratic), and it was removed in pandas 2.0.
    data = pd.concat(list(chunks), ignore_index = True)

    # NOTE(review): int16 assumes every feature value fits in [-32768, 32767]; confirm.
    features_split = data['features'].str.split(',', expand = True).astype(np.int16)
    features_split.index = data['id_job']

    # groupby(sort=False) yields one independent frame per key in first-appearance
    # order (same order as .unique()). This replaces the mask + in-place drop on a
    # filtered slice, which triggered SettingWithCopy warnings.
    return {
        'feature_type_{i}'.format(i = feature_type): group.drop(columns = 0)
        for feature_type, group in features_split.groupby(0, sort = False)
    }

In [29]:
class PipelineBuilder:
    """Collect named transformers and assemble them into an sklearn pipeline.

    Steps are kept as ``(title, transformer)`` tuples in insertion order.
    ``build_transformer`` places all of them in one ``FeatureUnion`` (each
    transformer receives the same input and their outputs are concatenated
    column-wise), then wraps that union in a single-step ``Pipeline``.
    """

    def __init__(self):
        # ordered list of (step_title, transformer_obj) tuples
        self.steps = []

    def add_step(self, step_title, transformer_obj):
        """Append ``(step_title, transformer_obj)``, print the step list, and return ``self``."""
        self.steps.append((step_title, transformer_obj))
        print("Pipeline steps: ", self.steps)
        return self

    def remove_step(self, step_title):
        """Drop every step whose title contains ``step_title`` (substring match).

        Prints the remaining steps and returns ``self``. The list is rebuilt
        instead of popped while enumerating it. Popping shifted the next item
        into the current slot, so a matching step directly after a removed one
        was never checked.
        """
        self.steps = [step for step in self.steps if step_title not in step[0]]
        print("Pipeline steps: ", self.steps)
        return self

    def build_transformer(self, return_final_pipeline = True, n_jobs = -1):
        """Build ``Pipeline([('feature_unions', FeatureUnion(steps))])``.

        Parameters
        ----------
        return_final_pipeline : bool, default True
            When False, nothing is built and ``self`` is returned.
        n_jobs : int or None, default -1
            Forwarded to ``FeatureUnion``. -1 is the previously hard-coded value
            and means all processors. Pass 1 to run the transformers in-process,
            e.g. while debugging transformers defined in a notebook.

        The union receives a copy of the step list, so later ``add_step`` calls
        do not alter an already-built pipeline. The union is also kept on
        ``self.feature_unions``.
        """
        if not return_final_pipeline:
            return self
        self.feature_unions = FeatureUnion(list(self.steps), n_jobs = n_jobs)
        return Pipeline([('feature_unions', self.feature_unions)])

    def add_model_to_pipe(self, step_title, model):
        """Placeholder: appending an estimator to the pipeline is not supported yet."""
        raise NotImplementedError("Adding sklearn model's to pipeline not implemented yet.")
    
    

class Z_Score(BaseEstimator, TransformerMixin):
    """Standardise each column as ``(X - mean) / std``.

    ``fit`` stores the per-column mean (``mean_``) and standard deviation
    (``std_``; for a DataFrame this is pandas' ``std``, i.e. ddof=1) of its input.
    ``transform`` reuses those values, so data transformed after fitting (e.g. a
    test set) is scaled with the training statistics rather than its own. An
    unfitted instance falls back to the statistics of the ``X`` it is given,
    which was the previous behaviour. Columns whose standard deviation is 0 are
    divided by 1, so constant columns become 0 instead of NaN.
    """

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Learn the per-column mean and standard deviation of ``X``; return ``self``."""
        self.mean_ = X.mean(axis = 0)
        self.std_ = X.std(axis = 0)
        return self

    def transform(self, X, y=None):
        """Return a standardised copy of ``X``; ``X`` itself is not modified."""
        if hasattr(self, 'mean_'):
            mu, sigma = self.mean_, self.std_
        else:
            mu, sigma = X.mean(axis = 0), X.std(axis = 0)
        # Avoid 0/0 -> NaN for constant columns. Series.mask keeps label alignment
        # with X's columns; plain arrays fall back to np.where.
        if isinstance(sigma, pd.Series):
            sigma = sigma.mask(sigma == 0, 1)
        else:
            sigma = np.where(sigma == 0, 1, sigma)
        return (X - mu) / sigma

class MaxFeatureIndex(BaseEstimator, TransformerMixin):
    """Report, for each row, the position of its largest feature value."""

    def __init__(self):
        pass

    def transform(self, X, y = None):
        """Return a one-column frame ``max_feature_index`` aligned to ``X.index``.

        The value is the 0-based column *position* (not the column label) of the
        row maximum, as given by ``numpy.argmax``; ties resolve to the first
        occurrence. ``X`` is not modified. The previous version wrote a
        ``max_feature_index`` column into the caller's frame, which could leak
        into other code sharing that object (e.g. a sibling transformer given
        the same frame).
        """
        return pd.DataFrame(
            {'max_feature_index': X.values.argmax(axis = 1)},
            index = X.index,
        )

    def fit(self, X, y=None):
        """Stateless: nothing is learned; return ``self``."""
        return self

class MaxFeatureAbsMeanDiff(BaseEstimator, TransformerMixin):
    """For each row, ``|row max - mean of the column that holds the row max|``.

    ``fit`` stores the per-column means of its input (``feature_means_``, in
    column-position order), so data transformed later (e.g. a test set) is
    compared against the training means. An unfitted instance falls back to the
    column means of the ``X`` it is given, which was the previous behaviour.
    NOTE(review): assumes transform-time ``X`` has the same column layout as
    the data seen by ``fit``.
    """

    def __init__(self):
        pass

    def fit(self, X, y = None):
        """Learn the per-column means of ``X`` (positional order); return ``self``."""
        self.feature_means_ = np.asarray(X.mean(axis = 0), dtype = float)
        return self

    def transform(self, X, y = None):
        """Return a one-column frame ``diff`` aligned to ``X.index``; ``X`` is not modified.

        Fixes relative to the earlier version:
        - The row maximum is taken over the feature values only. Previously it
          included the helper ``max_feat_idx`` column added to ``X``, so a large
          column position could be reported as the maximum.
        - The mean lookup is by array position. Previously ``squeeze()`` turned
          it into a scalar when only one distinct argmax existed, breaking ``.map``.
        - ``X`` is no longer mutated.
        """
        values = X.values
        max_idx = values.argmax(axis = 1)
        max_values = values.max(axis = 1)
        means = getattr(self, 'feature_means_', None)
        if means is None:
            means = np.asarray(X.mean(axis = 0), dtype = float)
        return pd.DataFrame(
            {'diff': np.abs(max_values - means[max_idx])},
            index = X.index,
        )

class CustomNormalizer(BaseEstimator, TransformerMixin):
    """Stateless transformer that delegates to a user-supplied callable.

    Parameters
    ----------
    custom_normalization_func : callable
        Invoked as ``custom_normalization_func(X)`` on every ``transform``;
        its return value is passed through unchanged.
    """

    def __init__(self, custom_normalization_func):
        # stored under the constructor argument's own name so sklearn's
        # get_params / clone can find it
        self.custom_normalization_func = custom_normalization_func

    def fit(self, X, y = None):
        """Nothing is learned; return ``self``."""
        return self

    def transform(self, X, y = None):
        """Apply the wrapped callable to ``X`` and return its result."""
        normalize = self.custom_normalization_func
        return normalize(X)
        
    
def save_transformer_state(pipe_obj, filepath_name):
    """Serialise ``pipe_obj`` to ``filepath_name`` with ``joblib.dump``.

    Objects whose classes are defined in the notebook (``__main__``) are pickled
    by reference. Pickling can fail with "not the same object as __main__.X" if
    the defining cell was re-run after the object was created. Importing those
    classes from a .py module avoids this.
    """
    # Local import: the notebook only imports the names `dump`/`load`, not the
    # module itself, so the previous `joblib.dump` raised NameError.
    import joblib

    joblib.dump(pipe_obj, filepath_name)


def load_transformer(filepath_name):
    """Load and return an object previously written with ``joblib.dump``.

    Security: joblib.load unpickles the file, which can execute arbitrary code.
    Only load files from trusted sources.
    """
    # Local import: the notebook only imports the names `dump`/`load`, not the
    # module itself, so the previous `joblib.load` raised NameError.
    import joblib

    return joblib.load(filepath_name)

        

In [23]:
def transformsss(X):
    """Column-wise z-score of ``X``: ``(X - X.mean(axis=0)) / X.std(axis=0)``.

    The statistics come from ``X`` itself. For a pandas object, ``std`` uses
    pandas' default ddof=1; for a NumPy array, NumPy's default ddof=0 applies.
    Columns whose standard deviation is 0 are divided by 1 instead, so constant
    columns become 0 rather than NaN (0/0). ``X`` itself is not modified.
    """
    mu = X.mean(axis = 0)
    sigma = X.std(axis = 0)
    # Positional replacement is safe here: sigma is computed from X, so its
    # order matches X's columns exactly.
    sigma = np.where(sigma == 0, 1, sigma)
    return (X - mu) / sigma

In [30]:
class Data:
    """Reader for a delimited jobs file with ``id_job`` and ``features`` columns.

    Parameters
    ----------
    filepath : str or path-like
        File passed to ``pandas.read_csv``.
    separator : str, default tab
        Column separator.
    batch_size : int, default 10000
        Rows per chunk while reading.
    """

    def __init__(self, filepath, separator = '\t', batch_size = 10000):
        self.filepath = filepath
        self.separator = separator
        self.batch_size = batch_size

    def read(self):
        """Load the file and return its feature matrices grouped by feature type.

        Each ``features`` value is split on ',' and cast to int16. The first
        value is used as the feature-type key; the remaining values become
        columns labelled 1..N. Rows are indexed by ``id_job``. A summary line
        is printed.

        Returns
        -------
        dict[str, pandas.DataFrame]
            ``{'feature_type_<key>': frame}`` in order of first appearance.
        """
        chunks = pd.read_csv(
            filepath_or_buffer = self.filepath,
            sep = self.separator,
            chunksize = self.batch_size,
            engine = 'python')
        # A single concat replaces DataFrame.append per chunk: append re-copied the
        # accumulated frame every time (quadratic), and it was removed in pandas 2.0.
        data = pd.concat(list(chunks), ignore_index = True)

        # NOTE(review): int16 assumes every feature value fits in [-32768, 32767]; confirm.
        features_split = data['features'].str.split(',', expand = True).astype(np.int16)
        features_split.index = data['id_job']

        # groupby(sort=False) keeps first-appearance order (same as .unique()) and
        # yields independent frames, so no in-place drop on a filtered slice is needed.
        data_dict = {
            'feature_type_{i}'.format(i = feature_type): group.drop(columns = 0)
            for feature_type, group in features_split.groupby(0, sort = False)
        }

        print("{path} - successfuly loaded. Avaliable feature types: {f_types}".format(path = self.filepath, f_types = list(data_dict.keys())))
        return data_dict

In [31]:
# Load the training file: a dict {'feature_type_<key>': DataFrame indexed by id_job}.
train = Data(TRAIN_DATA_DIR).read()
# NOTE(review): loading the test set is disabled; re-enable it to apply the fitted pipeline to test data.
#test = Data(TEST_DATA_DIR).read()

python-dev-test/data/train.tsv - successfuly loaded. Avaliable feature types: ['feature_type_2']


In [32]:
# Collect the three feature transformers. The FeatureUnion built from them gives
# each one the same input and concatenates their outputs column-wise.
builder = (
    PipelineBuilder()
    .add_step('Z_Score', Z_Score())
    .add_step('MaxFeatureIndex', MaxFeatureIndex())
    .add_step('MaxFeatureAbsMeanDiff', MaxFeatureAbsMeanDiff())
)

# `piper` is the sklearn Pipeline used by later cells; the builder keeps its own
# name instead of being overwritten by an object of a different kind.
piper = builder.build_transformer()

piper.fit(train['feature_type_2'])

Pipeline steps:  [('Z_Score', Z_Score())]
Pipeline steps:  [('Z_Score', Z_Score()), ('MaxFeatureIndex', MaxFeatureIndex())]
Pipeline steps:  [('Z_Score', Z_Score()), ('MaxFeatureIndex', MaxFeatureIndex()), ('MaxFeatureAbsMeanDiff', MaxFeatureAbsMeanDiff())]


Pipeline(memory=None,
         steps=[('feature_unions',
                 FeatureUnion(n_jobs=-1,
                              transformer_list=[('Z_Score', Z_Score()),
                                                ('MaxFeatureIndex',
                                                 MaxFeatureIndex()),
                                                ('MaxFeatureAbsMeanDiff',
                                                 MaxFeatureAbsMeanDiff())],
                              transformer_weights=None, verbose=False))],
         verbose=False)

In [34]:
# Persist the fitted pipeline with joblib.
# NOTE(review): this cell raised PicklingError ("not the same object as __main__.Z_Score").
# pickle stores __main__ classes by reference and checks that __main__.Z_Score is exactly
# the instance's class. That check fails if the class cell was re-executed after the
# pipeline was built (execution counts jump from 32 to 34, so possibly In[33] did this).
# Re-run the notebook top to bottom, or move the transformer classes into a .py module and import them.
dump(piper,'some.joblib')

PicklingError: Can't pickle <class '__main__.Z_Score'>: it's not the same object as __main__.Z_Score