In [None]:
import json
import numpy as np
import pandas as pd
import pickle

import mrmr
import time

from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType, DoubleType 
import pyspark.sql.functions as F

from sklearn.feature_selection import SelectKBest, mutual_info_classif, chi2,VarianceThreshold
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.ensemble import RandomForestClassifier
import sklearn.metrics

from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.tools.tools import add_constant

from boruta import BorutaPy
from random import sample
from functools import partial
from hyperopt import hp, fmin, tpe, Trials, STATUS_OK

import tensorflow as tf
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.constraints import MaxNorm
from tensorflow.keras.utils import to_categorical

In [None]:
# Make the shared utilities directory (which provides `spark_setup`) importable.
import sys
sys.path.append('/opt/deep learning/utilities')

from spark_setup import SetupEnvironment
# Bootstrap the project environment and create the handles used by the rest of
# the notebook: `spark` (used as `spark.read.parquet`), `dcRead` (used as
# `dcRead(<env>).loadDataFrame(...)`) and `s3` (used via `project_home`,
# `open`, `put` and `get` by the persistence helpers).
# NOTE(review): the exact types returned by SetupEnvironment are defined in
# spark_setup — confirm there.
environment = SetupEnvironment (conda_env='my_env')
spark = environment.setup_spark()
dcRead = environment.setup_DataCatalog()
s3 = environment.setup_s3()

In [None]:
import warnings
# Suppress FutureWarning messages so they do not clutter the cell output.
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
# Put the prototype package root on the import path so that
# `push_training.common` (imported in the next cell) can be found.
sys.path.append('/repos/mme010/DS_Artifacts/prototypes/V2')

In [None]:
from push_training.common import mrp_pipeline , preprocess

In [None]:
# --- Feature-reduction settings (passed to ReduceFeatureSet by make_feature_reduction_step) ---
# Cut-off on min-max-scaled variance for the low-variance filter
# (that filter is currently commented out in ReduceFeatureSet.main).
VARIANCE_THRESHOLD = 0.0001
# Features whose (max - min) on the training split is <= this are treated as quasi-constant.
FLOATING_POINT_ERROR = 0.000001
# Continuous features with a variance inflation factor >= this are dropped.
VIF_THRESHOLD = 10
# Absolute correlation at or above which continuous features count as highly correlated.
CORRELATION_THRESHOLD = 0.95 
# `method` argument for pandas.DataFrame.corr.
CORRELATION_METHOD = "pearson"
# --- Settings not used in the visible cells (presumably for later tuning / selection steps) ---
MAX_EVALS_TUNE_TRAIN = 300
# 16 candidate feature-subset sizes, geometrically spaced from 10 to 500 and rounded to ints.
N_FEATURES_TO_TRY = np.round(np.geomspace (10, 500, 16)).astype(int)
MAX_EVALS = 50
FEATURE_SELECTION_METHOD = "modified_mrmr" #boruta
# Identifier / label / split columns, i.e. everything that is not a model feature.
NON_FEATURE_COLS = ['id', 'id_secondary', 'LABEL', 'tdate', 'pttrn_trade_dt_all' , 'split']
# Maps a substring of a tsfresh column name to that feature's datatype; only
# 'continuous' features enter the correlation and VIF filters.
# NOTE(review): unlike ReduceFeatureSet's built-in default this omits
# 'standard_deviation', so those columns stay 'not specified' and skip the
# continuous-only filters — confirm that is intended.
SOURCE_TSFRESH_DATATYPE_DICTIONARY = {'length': 'discrete', 'maximum': 'continuous', 'mean':'continuous', 'median':'continuous','minimum':'continuous','quantile':'continuous'}

In [None]:
def pickle_put(data, name, s3):
    """Pickle ``data`` and store it at ``<s3.project_home>/<name>``."""
    destination = f'{s3.project_home}/{name}'
    with s3.open(destination, 'wb') as handle:
        pickle.dump(data, handle)

def pickle_get(name, s3):
    """Load and return the pickled object stored at ``<s3.project_home>/<name>``.

    Only use on artifacts this project wrote: unpickling can execute code.
    """
    source = f'{s3.project_home}/{name}'
    with s3.open(source, 'rb') as handle:
        obj = pickle.load(handle)
    return obj

def keras_put(model, name, s3):
    """Save ``model`` to a local HDF5 file, then upload it to ``<s3.project_home>/<name>``."""
    local_file = '/tmp/tmp_model.h5'
    model.save(local_file)
    remote_file = f'{s3.project_home}/{name}'
    s3.put(local_file, remote_file)

def keras_get(name, s3):
    """Download ``<s3.project_home>/<name>`` and load it as a Keras model.

    The local temp path is the same ``.h5`` file ``keras_put`` writes. The
    ``.h5`` extension tells Keras to read the file as HDF5; the previous path
    ``/tmp/tmp_model/h5`` lacked it and pointed into a directory that may not
    exist.

    Returns
    -------
    The model returned by ``tf.keras.models.load_model``.
    """
    tmp_path = '/tmp/tmp_model.h5'
    s3.get(f'{s3.project_home}/{name}', tmp_path)
    model = tf.keras.models.load_model(tmp_path)
    return model

def json_put(data, name, s3):
    """Serialize ``data`` as JSON text and write it to ``<s3.project_home>/<name>``."""
    destination = f'{s3.project_home}/{name}'
    with s3.open(destination, 'w') as handle:
        json.dump(data, handle)

def json_get(name, s3):
    """Read ``<s3.project_home>/<name>`` as text and return the parsed JSON value."""
    source = f'{s3.project_home}/{name}'
    with s3.open(source, 'r') as handle:
        parsed = json.load(handle)
    return parsed

------------------------------------------------------------

In [None]:
class GetTsfreshSimplifiedMapping(mrp_pipeline.MRPStep):
    """Pipeline step: map MTC push tsfresh column names to simplified tsfresh names.

    A base mapping is built from the feature-metadata entries whose
    ``column_name`` starts with ``'ASK_PR__'`` (prefix stripped). It is then
    replicated once per source time-series prefix.
    """

    name = "tsfesh_simplifed_mapping"

    def get_map_without_prepends(self, metadata):
        """Return {column name: tsfresh feature name} with the ``'ASK_PR__'`` prefix removed.

        Only metadata entries whose ``column_name`` starts with that prefix are
        used. The same number of leading characters is cut from
        ``tsfresh_feature_name``. Later entries override earlier ones with the
        same key.
        """
        prefix = 'ASK_PR__'
        cut = len(prefix)
        mapping = {}
        for entry in metadata:
            column_name = entry['column_name']
            if not column_name.startswith(prefix):
                continue
            mapping[column_name[cut:]] = entry['tsfresh_feature_name'][cut:]
        return mapping

    def prepend_to_map(self, to_prepend, mapping):
        """Return a copy of ``mapping`` with ``to_prepend`` added in front of every key and value."""
        prefixed = {}
        for key, value in mapping.items():
            prefixed[to_prepend + key] = to_prepend + value
        return prefixed

    def get_full_map(self, no_prepends_map):
        """Combine prefixed copies of ``no_prepends_map`` for all three source time series.

        Prefixes are applied in the order 'Price_f__', 'cumul_qty_f_prtcp_',
        'PRICE_f_prtcp_'; a later prefix wins on any key collision.
        """
        prefixes = ('Price_f__', 'cumul_qty_f_prtcp_', 'PRICE_f_prtcp_')
        return {
            key: value
            for prefix in prefixes
            for key, value in self.prepend_to_map(prefix, no_prepends_map).items()
        }

    def main(self, mi_feature_metadata):
        """Build the full simplified mapping from the feature metadata and return it in a dict."""
        base_map = self.get_map_without_prepends(mi_feature_metadata)
        full_simplified_tsfresh_map = self.get_full_map(base_map)

        print('\n tsfesh_simplifed_mapping > full_simplified_tsfresh_map:', full_simplified_tsfresh_map, '\n')

        return {'full_simplified_tsfresh_map': full_simplified_tsfresh_map}

In [None]:
def make_tsfresh_simplified_mapping_step(dcRead, s3, spark):
    """Build the step that creates the simplified tsfresh name mapping.

    Input: the momentum-ignition feature-metadata JSON file on local disk.
    Output: the mapping, pickled to ``full_simplified_tsfresh_map.pickle``
    under ``s3.project_home``. ``dcRead`` and ``spark`` are not used here;
    they keep the factory signature uniform with the other steps.
    """
    # NOTE(review): sys.path is extended with '.../prototypes/V2' (capital V),
    # but this path uses lowercase 'v2'. On a case-sensitive filesystem these
    # are different directories — confirm which one is correct.
    metadata_path = '/repos/mme010/DS_Artifacts/prototypes/v2/push_training/momentum_ignition_feature_metadata.json'

    def ts_mapping_load_inputs():
        # `with` closes the file; json.load(open(...)) leaked the handle.
        with open(metadata_path, 'r') as f:
            return {'mi_feature_metadata': json.load(f)}

    def ts_mapping_handle_outputs(full_simplified_tsfresh_map):
        pickle_put(
            full_simplified_tsfresh_map,
            'full_simplified_tsfresh_map.pickle',
            s3)

    step_ts_mapping = GetTsfreshSimplifiedMapping(
        input_loader=ts_mapping_load_inputs,
        output_handler=ts_mapping_handle_outputs)

    return step_ts_mapping


tsfresh_simplified_mapping_step = make_tsfresh_simplified_mapping_step(dcRead, s3, spark)
tsfresh_simplified_mapping_step.run()

In [None]:
#pickle_get('full_simplified_tsfresh_map.pickle',s3)

------------------------------------------------------------

In [None]:
class GetMapping(mrp_pipeline.MRPStep):
    """Pipeline step: map positional ``a{n}_{ts}`` column names to tsfresh column names.

    The tsfresh names are the keys of the JSON object stored in the
    ``PRDCN_FATRS_TX`` column of one production row, in key order. The keys
    are assumed to form three consecutive blocks of ``n_features_per_series``
    names, one per source time series: ts 1 -> ``price_cols``,
    ts 2 -> ``cumul_qty_prtcp_cols``, ts 3 -> ``price_prtcp_cols``.
    """

    name = "get_a_tsfresh_mapping"

    # Number of tsfresh features per source time series (the JSON holds three blocks).
    n_features_per_series = 779

    def expand_json(self, df, json_col, schema, prefix='EXPANDED_'):
        """Expand a JSON string column into one DoubleType column per schema field.

        Parameters
        ----------
        df: pyspark.sql.DataFrame
        json_col: str
            Column holding JSON strings.
        schema: StructType
            Schema passed to ``F.from_json``.
        prefix: str
            Prepended to every expanded column name.

        Returns
        -------
        DataFrame with every original column except ``json_col``, followed by
        the expanded columns.
        """
        # Compare the column *name* (the old code compared the F.col function,
        # which never matched, so json_col was never excluded).
        non_json_cols = [c for c in df.columns if c != json_col]
        struct_col = json_col + '_unpacked_struct'
        df = df.withColumn(struct_col, F.from_json(F.col(json_col), schema))
        new_cols_to_expand = df.schema[struct_col].dataType.names
        # getField() (not 'struct.field' strings) so names containing dots,
        # such as tsfresh quantile features, are not parsed as nested access.
        expanded = [
            F.col(struct_col).getField(c).cast(DoubleType()).alias(prefix + c)
            for c in new_cols_to_expand
        ]
        return df.select(non_json_cols + expanded)

    def get_a_tsfresh_mapping(self, push_events):
        """Return {``a{n}_{ts}``: tsfresh column name} for ``n < n_features_per_series`` and ts 1..3.

        Raises
        ------
        ValueError
            If ``push_events`` has no rows, or a time-series block has fewer
            than ``n_features_per_series`` names.
        """
        json_col = 'PRDCN_FATRS_TX'
        rows = push_events.limit(1).collect()
        if not rows:
            raise ValueError('push_events is empty; cannot read tsfresh column names')
        # json.loads keeps the object's key order, which defines the column order.
        tsfresh_cols = list(json.loads(rows[0][json_col]).keys())

        n = self.n_features_per_series
        series_blocks = {
            '1': tsfresh_cols[:n],          # price_cols
            '2': tsfresh_cols[n:2 * n],     # cumul_qty_prtcp_cols
            '3': tsfresh_cols[2 * n:],      # price_prtcp_cols
        }
        a_to_tsfresh = {}
        for ts, block_cols in series_blocks.items():
            if len(block_cols) < n:
                raise ValueError(
                    f'Time-series {ts} has only {len(block_cols)} tsfresh columns, expected at least {n}'
                )
            for i_col in range(n):
                a_to_tsfresh[f'a{i_col}_{ts}'] = block_cols[i_col]
        return a_to_tsfresh

    def main(self, mtc_prdcn_table):
        """Compute the a_? -> tsfresh mapping once, print it, and return it in a dict."""
        a_to_tsfresh = self.get_a_tsfresh_mapping(mtc_prdcn_table)
        print('\n get_a_tsfresh_mapping > a_to_tsfresh:', a_to_tsfresh, '\n')
        return {
            'a_to_tsfresh': a_to_tsfresh
        }

In [None]:
def make_mapping_step(dcRead, s3, spark):
    """Build the step producing the a_? -> tsfresh column-name mapping.

    Input: the 2021-08-05 snapshot of MARKING_THE_CLOSE_PUSH_PRDCN from the
    'prod' catalog. Output: the mapping, pickled to ``a_to_tsfresh.pickle``.
    ``spark`` is not used here.
    """
    def mapping_load_inputs():
        prod_catalog = dcRead('prod')
        mtc_prdcn_table = prod_catalog.loadDataFrame(
            'MRP', 'MARKING_THE_CLOSE_PUSH_PRDCN', 'PRC', 'BZ', '2021-08-05'
        )
        return {'mtc_prdcn_table': mtc_prdcn_table}

    def mapping_handle_outputs(a_to_tsfresh):
        pickle_put(a_to_tsfresh, 'a_to_tsfresh.pickle', s3)

    return GetMapping(
        input_loader=mapping_load_inputs,
        output_handler=mapping_handle_outputs,
    )


mapping_step = make_mapping_step(dcRead, s3, spark)
mapping_step.run()

In [None]:
#pickle_get('a_to_tsfresh.pickle',s3)

------------------------------------------------------------

In [None]:
class ConstructObservations(mrp_pipeline.MRPStep):
    """Pipeline step: build one-row-per-observation training data with a train/val/test split.

    The input has one row per (id, id_secondary). Rows for id_secondary
    1, 2 and 3 are joined side by side, each row gets a symbol-based split
    label, and the positional ``a?_?`` feature columns are renamed to tsfresh
    names.
    """

    name = "construct_observations"

    def widen_data(self, df):
        '''Transform data from one row per (id, id_secondary) to one row per id.

        Rows with id_secondary 1, 2 and 3 are inner-joined on ``id``. Every
        feature column gets a ``_1``/``_2``/``_3`` suffix. The identifier and
        label columns are kept from the id_secondary == 1 rows only.
        '''
        meta_cols = ['id', 'id_secondary', 'LABEL', 'tdate', 'pttrn_trade_dt_all']
        features = [c for c in df.columns if c not in meta_cols]

        def rows_for(secondary, extra_cols):
            # Rows of one id_secondary, with features renamed to '<name>_<secondary>'.
            subset = df.filter(F.col('id_secondary') == secondary)
            return subset.select(
                [F.col(c).alias(f'{c}_{secondary}') for c in features] + extra_cols
            )

        id_1 = rows_for(1, meta_cols)
        id_2 = rows_for(2, ['id'])
        id_3 = rows_for(3, ['id'])

        return id_1.alias('a').join(
            id_2.alias('b').join(id_3.alias('c'), 'id', 'inner'),
            'id',
            'inner'
        )

    def train_val_test_split(self, df, val_percent=0.1, test_percent=0.1):
        """Add a 'split' column ('train'/'val'/'test'), assigned alphabetically by symbol.

        The symbol is the second '_'-separated token of ``id``. Distinct
        symbols are sorted; the first ``int(test_percent * n)`` go to test,
        the next ``int(val_percent * n)`` to val and the rest to train. No
        symbol appears in more than one split.

        Raises
        ------
        ValueError
            If a percentage is negative or they sum to more than 1.
        """
        if val_percent < 0 or test_percent < 0 or val_percent + test_percent > 1:
            raise ValueError(
                f'Invalid split fractions: val_percent={val_percent}, test_percent={test_percent}'
            )

        df = df.withColumn('symbol', F.split(F.col('id'), '_')[1])
        # One Spark job: collect the distinct symbols once and count them locally.
        sorted_symbols = sorted(df.select('symbol').distinct().toPandas()['symbol'])
        n_symbols = len(sorted_symbols)
        n_test_symbols = int(test_percent * n_symbols)
        n_val_symbols = int(val_percent * n_symbols)
        n_train_symbols = n_symbols - n_test_symbols - n_val_symbols

        test_symbols = sorted_symbols[:n_test_symbols]
        val_symbols = sorted_symbols[n_test_symbols:n_test_symbols + n_val_symbols]
        train_symbols = sorted_symbols[n_test_symbols + n_val_symbols:]
        assert len(train_symbols) == n_train_symbols
        assert len(val_symbols) == n_val_symbols
        assert len(test_symbols) == n_test_symbols

        df = df.withColumn(
            'split',
            F.when(F.col('symbol').isin(test_symbols), 'test')
            .when(F.col('symbol').isin(val_symbols), 'val')
            .otherwise('train')
        )
        return df.drop('symbol')

    def rename_a_columns_to_close_tsfresh(self, split_data, a_column_mapping):
        '''Rename a_? format feature columns to names close to the tsfresh names.

        ``a_column_mapping`` may be a dict {a_? name: tsfresh name} (what the
        pipeline loads from ``a_to_tsfresh.pickle``) or a callable taking the
        old name. Identifier, label and split columns are kept unchanged and
        placed after the features.

        Raises
        ------
        KeyError
            If a dict mapping lacks an entry for one of the feature columns.
        '''
        non_feature_cols = ['id', 'id_secondary', 'tdate', 'LABEL', 'pttrn_trade_dt_all', 'split']
        feature_cols = [c for c in split_data.columns if c not in non_feature_cols]
        # Dicts are not callable: calling the pickled mapping raised TypeError.
        rename = a_column_mapping if callable(a_column_mapping) else a_column_mapping.__getitem__
        renamed_data = split_data.select(
            [F.col(c).alias(rename(c)) for c in feature_cols] + non_feature_cols
        )
        return renamed_data

    def main(self, mtc_push_table, a_column_mapping):
        """Widen, split and rename the push table; return ``{'observations': DataFrame}``."""
        wide_data = self.widen_data(mtc_push_table)
        split_data = self.train_val_test_split(wide_data)
        observations = self.rename_a_columns_to_close_tsfresh(split_data, a_column_mapping)
        # Spark's DataFrame.columns is already a list (it has no .tolist()).
        print('\n construct_observations > observations:', observations.head(), '\n', observations.columns, '\n')
        return {
            'observations': observations
        }

In [None]:
def make_co_step(dcRead, s3, spark):
    """Build the construct-observations step.

    Inputs: the MTC tsfresh training table from the 'prody-dev' catalog and
    the pickled a_? -> tsfresh mapping. Output: the observations DataFrame,
    written as parquet to ``<project_home>/observations.parquet``
    (overwriting). ``spark`` is not used here.
    """
    def co_load_inputs():
        catalog = dcRead('prody-dev')
        mtc_push_table = catalog.loadDataFrame(
            "MRP", "MTC_tsfresh_data_training_data", "PRC", "ORC", None
        )
        a_column_mapping = pickle_get('a_to_tsfresh.pickle', s3)
        return dict(mtc_push_table=mtc_push_table, a_column_mapping=a_column_mapping)

    def co_handle_outputs(observations):
        output_path = f'{s3.project_home}/observations.parquet'
        observations.write.parquet(output_path, mode='overwrite')

    return ConstructObservations(
        input_loader=co_load_inputs,
        output_handler=co_handle_outputs,
    )


co_step = make_co_step(dcRead, s3, spark)
co_step.run()

------------------------------------------------------------

In [None]:
class ReduceFeatureSet(mrp_pipeline.MRPStep):
    """Unsupervised feature reduction on the training split of the observations.

    A bookkeeping dataframe with one row per feature column records:
    - the feature's datatype;
    - whether it is still in the feature set (``in_current_feature_set``:
      1 = kept, 0 = removed);
    - why it was removed (``reason_removed``).

    Each ``remove_*`` method only considers features still in the set and
    returns an updated copy of the bookkeeping dataframe. ``X_train`` itself is
    never modified.
    """

    name = "feature_reduction"
    parameters = {
        "variance_threshold": 0.0001,
        "floating_point_error": 0.000001,
        "vif_threshold": 10,
        "correlation_threshold": 0.95,
        "correlation_method": "pearson",
        "soure_tsfresh_datatype_dictionary" : {'length': 'discrete', 'maximum': 'continuous', 'mean': 'continuous', 'median': 'continuous', 'minimum': 'continuous', 'quantile':'continuous','standard_deviation': 'continuous'}
    }

    # Misspelled parameter keys that callers may still pass. They are checked
    # only after the correctly spelled key, so existing callers keep working.
    _PARAMETER_ALIASES = {
        "floating_point_error": ("floating point_error",),
        "correlation_threshold": ("correlation_thershold",),
    }

    def _get_parameter(self, key):
        """Return ``self.parameters[key]``, falling back to a legacy spelling of ``key``.

        Raises
        ------
        KeyError
            If neither ``key`` nor any of its aliases is present.
        """
        for candidate in (key,) + self._PARAMETER_ALIASES.get(key, ()):
            if candidate in self.parameters:
                return self.parameters[candidate]
        raise KeyError(key)

    @staticmethod
    def _current_features(tsfresh_feature_reduction_dataframe, continuous_only=False):
        """List the features still in the feature set, optionally only the continuous ones."""
        df = tsfresh_feature_reduction_dataframe
        mask = df['in_current_feature_set'] == 1
        if continuous_only:
            mask = mask & (df['datatype'] == 'continuous')
        return df.loc[mask, 'tsfresh_column'].tolist()

    @staticmethod
    def _mark_removed(tsfresh_feature_reduction_dataframe, features_to_remove, reason):
        """Return a copy with ``features_to_remove`` flagged as removed for ``reason``.

        Only rows still in the current feature set change, so a feature keeps
        the reason given by the first step that removed it.
        """
        df = tsfresh_feature_reduction_dataframe.copy()
        mask = (
            df['tsfresh_column'].isin(list(features_to_remove))
            & (df['in_current_feature_set'] == 1)
        )
        df.loc[mask, 'in_current_feature_set'] = 0
        df.loc[mask, 'reason_removed'] = reason
        return df

    def prep_data(self, split_data):
        """Return the training-split feature matrix from ``preprocess.preprocess_pre_scale``.

        NOTE(review): the result is used with pandas methods below, so it is
        presumably a pandas DataFrame — confirm in ``preprocess``.
        """
        train_data = split_data.filter(F.col('split') == 'train')
        train_data = train_data.drop('split')

        X_train, _ = preprocess.preprocess_pre_scale(train_data)

        return X_train

    def create_tsfresh_feature_reduction_dataframe(self, X_train):
        """Create the bookkeeping dataframe with one row per column of ``X_train``.

        Columns:
        - ``tsfresh_column``: the feature name;
        - ``datatype``: from the datatype dictionary, default ``'not specified'``;
        - ``in_current_feature_set``: initially 1;
        - ``reason_removed``: initially ``'None'``.
        """
        n_features = len(X_train.columns)
        tsfresh_feature_reduction_dataframe = pd.DataFrame(
            {
                'tsfresh_column': list(X_train.columns),
                'datatype': pd.Series('not specified', index=range(n_features)),
                'in_current_feature_set': pd.Series(1, index=range(n_features)),
                'reason_removed': pd.Series('None', index=range(n_features)),
            },
            index=range(n_features),
        )

        # Datatypes come from a substring match on the column name; later
        # dictionary entries win when several substrings match. .loc is used
        # because chained assignment (df[col][i] = v) is not guaranteed to
        # write back to the frame.
        names = tsfresh_feature_reduction_dataframe['tsfresh_column'].astype(str)
        datatype_dictionary = self._get_parameter('soure_tsfresh_datatype_dictionary')
        for tsfresh_feature, datatype in datatype_dictionary.items():
            matches = names.str.contains(tsfresh_feature, regex=False)
            tsfresh_feature_reduction_dataframe.loc[matches, 'datatype'] = datatype

        return tsfresh_feature_reduction_dataframe

    def remove_duplicate_features(self, X_train, tsfresh_feature_reduction_dataframe):
        """Remove features whose training values exactly duplicate an earlier feature."""
        relevant = self._current_features(tsfresh_feature_reduction_dataframe)
        # Transpose so each feature is a row; drop_duplicates keeps the first
        # feature of every group of identical ones.
        kept = set(X_train[relevant].T.drop_duplicates().index)
        duplicates = [c for c in relevant if c not in kept]
        return self._mark_removed(tsfresh_feature_reduction_dataframe, duplicates, 'duplicate_feature')

    def remove_constant_features(self, X_train, tsfresh_feature_reduction_dataframe):
        """Remove features with zero variance on the training split."""
        relevant = self._current_features(tsfresh_feature_reduction_dataframe)
        if not relevant:
            # VarianceThreshold cannot be fitted on zero features.
            return tsfresh_feature_reduction_dataframe.copy()
        X_train_subset = X_train[relevant]
        support = VarianceThreshold(threshold=0).fit(X_train_subset).get_support()
        constant = X_train_subset.columns[~support]
        return self._mark_removed(
            tsfresh_feature_reduction_dataframe, constant, 'constant feature: variance = 0'
        )

    def remove_quasi_constant_features(self, X_train, tsfresh_feature_reduction_dataframe):
        """Remove features whose (max - min) on the training split is <= floating_point_error."""
        relevant = self._current_features(tsfresh_feature_reduction_dataframe)
        X_train_subset = X_train[relevant]
        value_range = X_train_subset.max() - X_train_subset.min()
        tolerance = self._get_parameter('floating_point_error')
        # NaN ranges (all-NaN columns) compare False and are kept.
        quasi_constant = value_range.index[value_range <= tolerance]
        return self._mark_removed(
            tsfresh_feature_reduction_dataframe, quasi_constant, 'quasi-constant feature'
        )

    def remove_low_variance_features(self, X_train, tsfresh_feature_reduction_dataframe):
        """Remove features whose min-max-scaled variance is at or below variance_threshold."""
        relevant = self._current_features(tsfresh_feature_reduction_dataframe)
        if not relevant:
            # MinMaxScaler / VarianceThreshold cannot be fitted on zero features.
            return tsfresh_feature_reduction_dataframe.copy()
        X_train_subset = X_train[relevant]
        scaled = pd.DataFrame(
            MinMaxScaler().fit(X_train_subset).transform(X_train_subset),
            columns=X_train_subset.columns,
        )
        threshold = self._get_parameter('variance_threshold')
        support = VarianceThreshold(threshold=threshold).fit(scaled).get_support()
        low_variance = X_train_subset.columns[~support]
        reason = 'low variance feature: min-max scaled variance <= ' + str(threshold)
        return self._mark_removed(tsfresh_feature_reduction_dataframe, low_variance, reason)

    def remove_highly_correlated_features(self, X_train, tsfresh_feature_reduction_dataframe):
        """Remove continuous features highly correlated with an earlier continuous feature.

        A feature is dropped when its absolute correlation with any feature to
        its left is >= correlation_threshold. The first feature of each
        correlated group is therefore kept.
        """
        relevant = self._current_features(tsfresh_feature_reduction_dataframe, continuous_only=True)
        method = self._get_parameter('correlation_method')
        threshold = self._get_parameter('correlation_threshold')

        corr_matrix = X_train[relevant].corr(method=method).abs()
        # Keep only the strict upper triangle: each pair is looked at once,
        # from the later column's point of view.
        upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
        to_drop = [column for column in upper.columns if (upper[column] >= threshold).any()]

        reason = f'highly correlated feature: {method} correlation >= {threshold}'
        return self._mark_removed(tsfresh_feature_reduction_dataframe, to_drop, reason)

    def find_highest_vif_feature(self, X, over=None):
        '''Find the feature with the maximum VIF (Variance Inflation Factor).

        Parameters
        ----------
        X: pandas.DataFrame
            DataFrame whose only columns are continuous features.
        over: list[str] (optional)
            Features over which to take the maximum. VIFs are still computed
            relative to all other features in X, but the maximum is taken
            only over this list.

        Returns
        -------
        (feature_name, vif): tuple[str, float]
            Name of the feature with the maximum VIF, and that VIF.

        Raises
        ------
        ValueError
            If there are no candidate features.
        '''
        # Standardize. Columns that become NaN (e.g. 0/0 for a constant
        # column) are dropped.
        X = ((X - X.mean()) / X.std()).dropna(axis=1)

        # Add a constant column (add_constant skips this if one already exists).
        X = add_constant(X)
        cols = list(X.columns)

        # For standardized columns, X^T X / (n - 1) is the correlation matrix.
        # The diagonal of its inverse gives each column's VIF.
        X_np = X.to_numpy()
        vifs = np.diagonal(np.linalg.pinv(X_np.T @ X_np)) * (X.shape[0] - 1)
        vifs_dict = dict(zip(cols, vifs))

        if over:
            # Only consider features in `over` when taking the maximum.
            over_set = set(over)
            vifs_to_consider = [(col, vif) for col, vif in vifs_dict.items() if col in over_set]
        else:
            # Ignore the synthetic 'const' column when taking the maximum.
            vifs_to_consider = [(col, vif) for col, vif in vifs_dict.items() if col != 'const']

        if not vifs_to_consider:
            raise ValueError('No features to find VIF for!')
        # sorted() is stable, so on ties the last candidate in column order wins.
        return sorted(vifs_to_consider, key=lambda x: x[1])[-1]

    def remove_high_vif_features(self, X_train, tsfresh_feature_reduction_dataframe):
        """Repeatedly drop the continuous feature with the highest VIF while it is >= vif_threshold.

        VIFs are recomputed after every drop. The loop stops at the first
        highest-VIF feature that is below the threshold.
        """
        relevant_tsfresh_features = self._current_features(
            tsfresh_feature_reduction_dataframe, continuous_only=True
        )
        print(f'REMOVING HIGH VIF FEATURES (evaluating {len (relevant_tsfresh_features)} features for removal)')

        vif_threshold = self._get_parameter('vif_threshold')
        X_train_subset = X_train[relevant_tsfresh_features]
        columns_to_maybe_drop = list(relevant_tsfresh_features)  # copy so the original is not modified
        high_vif_features = []

        while columns_to_maybe_drop:
            then = time.time()
            highest_vif_feature, current_max_vif = self.find_highest_vif_feature(
                X_train_subset, over=columns_to_maybe_drop
            )
            print(f' Current highest VIF is {current_max_vif} for feature {highest_vif_feature}')
            # `not >=` (rather than `<`) keeps a NaN VIF on the "keep" branch.
            if not current_max_vif >= vif_threshold:
                print(f'Keeping feature {highest_vif_feature}')
                break
            print(f'Dropping feature {highest_vif_feature}')
            high_vif_features.append(highest_vif_feature)
            X_train_subset = X_train_subset.drop(columns=highest_vif_feature)
            # Removed exactly once (a second remove() raised ValueError).
            columns_to_maybe_drop.remove(highest_vif_feature)
            now = time.time()
            print(f'Features Evaluated: {len (relevant_tsfresh_features) - len(columns_to_maybe_drop)}/{len (relevant_tsfresh_features)}', end='')
            print(' Time elapsed:', now - then)

        reason = "high vif: vif >= " + str(vif_threshold)
        return self._mark_removed(tsfresh_feature_reduction_dataframe, high_vif_features, reason)

    def main(self, split_data):
        """Run all reduction filters in order and return the bookkeeping dataframe in a dict."""
        X_train = self.prep_data(split_data)

        tsfresh_feature_reduction_dataframe = self.create_tsfresh_feature_reduction_dataframe(X_train)
        tsfresh_feature_reduction_dataframe = self.remove_constant_features(X_train, tsfresh_feature_reduction_dataframe)
        tsfresh_feature_reduction_dataframe = self.remove_duplicate_features(X_train, tsfresh_feature_reduction_dataframe)
        tsfresh_feature_reduction_dataframe = self.remove_quasi_constant_features(X_train, tsfresh_feature_reduction_dataframe)
        # Low-variance filtering is intentionally disabled.
        #tsfresh_feature_reduction_dataframe = self.remove_low_variance_features(X_train, tsfresh_feature_reduction_dataframe)
        tsfresh_feature_reduction_dataframe = self.remove_highly_correlated_features(X_train, tsfresh_feature_reduction_dataframe)

        # VIF takes ~4 hrs to run on 1650 features.
        tsfresh_feature_reduction_dataframe = self.remove_high_vif_features(X_train, tsfresh_feature_reduction_dataframe)
        print('\n feature_reduction > tsfresh_feature_reduction_dataframe:', tsfresh_feature_reduction_dataframe.head(), '\n', tsfresh_feature_reduction_dataframe.columns.tolist(), '\n')
        return {
            'tsfresh_feature_reduction_dataframe': tsfresh_feature_reduction_dataframe
        }

In [None]:
def make_feature_reduction_step(dcRead,
    s3,
    spark,
    variance_threshold = VARIANCE_THRESHOLD,
    floating_point_error = FLOATING_POINT_ERROR,
    vif_threshold = VIF_THRESHOLD,
    correlation_threshold = CORRELATION_THRESHOLD,
    correlation_method = CORRELATION_METHOD,
    soure_tsfresh_datatype_dictionary = SOURCE_TSFRESH_DATATYPE_DICTIONARY):
    """Build the feature-reduction step.

    Input: ``observations.parquet`` under ``s3.project_home``, read with
    ``spark``. Output: the bookkeeping dataframe, pickled to
    ``tsfresh_feature_reduction_dataframe.pickle``. The keyword arguments
    become the step's parameters. ``dcRead`` is not used here.
    """

    def feature_reduction_load_inputs():
        return {
            'split_data': spark.read.parquet(f'{s3.project_home}/observations.parquet')
        }

    def feature_reduction_handle_outputs(tsfresh_feature_reduction_dataframe):
        pickle_put(tsfresh_feature_reduction_dataframe, 'tsfresh_feature_reduction_dataframe.pickle', s3)

    step_feature_reduction = ReduceFeatureSet(
        parameters = {
            'variance_threshold': variance_threshold,
            # Key must match the one ReduceFeatureSet reads ('floating_point_error');
            # the old 'floating point_error' spelling was never read.
            'floating_point_error': floating_point_error,
            'vif_threshold': vif_threshold,
            'correlation_threshold': correlation_threshold,
            'correlation_method': correlation_method,
            'soure_tsfresh_datatype_dictionary': soure_tsfresh_datatype_dictionary
        },
        input_loader = feature_reduction_load_inputs,
        output_handler = feature_reduction_handle_outputs
    )
    return step_feature_reduction


feature_reduction_step = make_feature_reduction_step(dcRead, s3, spark)
feature_reduction_step.run()

In [None]:
#pickle_get('tsfresh_feature_reduction_dataframe.pickle',s3)

------------------------------------------------------------

In [None]:
class ModifiedMRMRFit(mrp_pipeline.MRPStep):
    """Rank the reduced tsfresh features separately for each datatype.

    Continuous features are ranked with mRMR. Discrete features are ranked with
    mutual information and binary features with chi2, both through ``SelectKBest``.
    Each ranking is left-merged onto the feature table as its own ``select_*``
    column (rank 1 = best). Features outside a ranking get NaN in that column.
    """

    name = "modified_mrmr_fit"

    def prep_data(self, split_data, scale = "minmax"):
        """Fit a scaler on ``split_data`` and return the scaled ``(X_train, y_train)``."""
        scaler = preprocess_fit.PreprocessFit().main(split_data, scale)['scaler']

        transformed = preprocess_transform.PreprocessTransform().main(split_data, scaler)
        X_train = transformed['X_train']
        y_train = transformed['y_train']

        return X_train, y_train

    def create_tsfresh_feature_selection_dataframe(self, tsfresh_feature_reduction_dataframe):
        """Start the selection table from the identifying columns of the reduction table."""
        # .copy() so later column additions never operate on a view of the reduction table.
        return tsfresh_feature_reduction_dataframe[['tsfresh_column', 'datatype', 'in_current_feature_set']].copy()

    @staticmethod
    def _active_features(tsfresh_feature_reduction_dataframe, datatype):
        """Names of features with ``in_current_feature_set == 1`` and the given ``datatype``."""
        reduction = tsfresh_feature_reduction_dataframe
        mask = (reduction.in_current_feature_set == 1) & (reduction.datatype == datatype)
        return reduction.loc[mask, 'tsfresh_column'].tolist()

    @staticmethod
    def _merge_ranking(tsfresh_feature_selection_dataframe, ranking, col):
        """Left-merge ``ranking[['tsfresh_column', col]]`` onto a copy of the selection table."""
        merged = tsfresh_feature_selection_dataframe.copy(deep = True)
        if ranking.empty:
            # Nothing of this datatype to rank: still add the (all-NaN) column so
            # downstream steps find it and simply select zero features from it.
            merged[col] = np.nan
            return merged
        return merged.merge(ranking[['tsfresh_column', col]], on = 'tsfresh_column', how = 'left')

    def select_mrmr_features(self, X_train, y_train, tsfresh_feature_reduction_dataframe, tsfresh_feature_selection_dataframe):
        """Rank the active continuous features with mRMR.

        Adds column ``select_mrmr_continuous_<n>``, where ``n`` is the number of
        active continuous features. Values are the mRMR selection order (1 = picked first).
        """
        relevant_tsfresh_features = self._active_features(tsfresh_feature_reduction_dataframe, 'continuous')
        # The number of features with in_current_feature_set == 1 is used as K,
        # i.e. every active continuous feature receives a rank.
        n_features = len(relevant_tsfresh_features)
        col = 'select_mrmr_continuous_' + str(n_features)

        if n_features == 0:
            ranking = pd.DataFrame(columns = ['tsfresh_column', col])
        else:
            # With return_scores=True the first element of the result is the list of
            # selected feature names in selection order.
            selected = mrmr.mrmr_classif(X_train[relevant_tsfresh_features], y_train, K = n_features, return_scores = True, n_jobs = -1)[0]
            selected = list(selected)
            # Size the rank column from what mrmr actually returned so the two columns always align.
            ranking = pd.DataFrame({'tsfresh_column': selected, col: range(1, len(selected) + 1)})

        return self._merge_ranking(tsfresh_feature_selection_dataframe, ranking, col)

    def select_k_best_features(self, X_train, y_train, tsfresh_feature_reduction_dataframe, tsfresh_feature_selection_dataframe, datatype = "discrete"):
        """Rank the active features of ``datatype`` with ``SelectKBest``.

        ``discrete`` features are scored with mutual information (treated as discrete),
        and ``binary`` features with chi2. Adds column ``select_k_best_<datatype>_<n>``
        holding the score rank, with 1 for the highest score and ties sharing the average rank.

        Raises:
            ValueError: if ``datatype`` is neither ``'discrete'`` nor ``'binary'``.
        """
        if datatype == 'discrete':
            score_func = partial(mutual_info_classif, discrete_features = True)
        elif datatype == 'binary':
            score_func = chi2
        else:
            # Reject unknown datatypes explicitly instead of failing later on an unbound score_func.
            raise ValueError(f"select_k_best_features supports datatype 'discrete' or 'binary', got {datatype!r}")

        relevant_tsfresh_features = self._active_features(tsfresh_feature_reduction_dataframe, datatype)
        n_features = len(relevant_tsfresh_features)
        col = 'select_k_best_' + str(datatype) + '_' + str(n_features)

        if n_features == 0:
            ranking = pd.DataFrame(columns = ['tsfresh_column', col])
        else:
            X_train_subset = X_train[relevant_tsfresh_features]
            kbest = SelectKBest(score_func = score_func, k = n_features)
            kbest.fit(X_train_subset, y_train)
            ranking = pd.DataFrame({'tsfresh_column': X_train_subset.columns, 'score': kbest.scores_})
            ranking[col] = ranking['score'].rank(ascending = False)

        return self._merge_ranking(tsfresh_feature_selection_dataframe, ranking, col)

    def main(self, split_data, tsfresh_feature_reduction_dataframe):
        """Produce the per-datatype ranking table consumed by ModifiedMRMRTuneNFeatures."""
        # Features are min-max scaled (fit on train), which keeps training values non-negative as chi2 requires.
        X_train_minmax_scaled, y_train = self.prep_data(split_data, scale = "minmax")

        tsfresh_feature_selection_dataframe = self.create_tsfresh_feature_selection_dataframe(tsfresh_feature_reduction_dataframe)

        # Continuous features: mRMR.
        tsfresh_feature_selection_dataframe = self.select_mrmr_features(X_train_minmax_scaled, y_train, tsfresh_feature_reduction_dataframe, tsfresh_feature_selection_dataframe)
        # Discrete features: mutual information.
        tsfresh_feature_selection_dataframe = self.select_k_best_features(X_train_minmax_scaled, y_train, tsfresh_feature_reduction_dataframe, tsfresh_feature_selection_dataframe, datatype = "discrete")
        # Binary features: chi2.
        tsfresh_feature_selection_dataframe = self.select_k_best_features(X_train_minmax_scaled, y_train, tsfresh_feature_reduction_dataframe, tsfresh_feature_selection_dataframe, datatype = "binary")

        print('\n modified_mrmr_fit > tsfresh_feature_selection_dataframe:', tsfresh_feature_selection_dataframe.head(),'\n',tsfresh_feature_selection_dataframe.columns.tolist(),'\n')
        return {
            'tsfresh_feature_selection_dataframe': tsfresh_feature_selection_dataframe
        }

In [None]:
def make_modified_mrmr_fit_step(dcRead, s3, spark):
    """Build the ModifiedMRMRFit step.

    The step reads the observations and the reduction table, and writes the
    per-datatype ranking table to ``tsfresh_feature_selection_dataframe.pickle``.
    """
    def modified_mrmr_fit_load_inputs():
        return {
            # Same observations location the other steps read (no stray spaces in the path).
            'split_data': spark.read.parquet(f'{s3.project_home}/observations.parquet'),
            'tsfresh_feature_reduction_dataframe': pickle_get('tsfresh_feature_reduction_dataframe.pickle', s3)}

    def modified_mrmr_fit_handle_outputs(tsfresh_feature_selection_dataframe):
        pickle_put(tsfresh_feature_selection_dataframe, 'tsfresh_feature_selection_dataframe.pickle', s3)

    step_modified_mrmr_fit = ModifiedMRMRFit(
        input_loader = modified_mrmr_fit_load_inputs,
        output_handler = modified_mrmr_fit_handle_outputs
    )
    return step_modified_mrmr_fit

modified_mrmr_fit_step = make_modified_mrmr_fit_step(dcRead, s3, spark)
# ModifiedMRMRFit and BorutaFit both write tsfresh_feature_selection_dataframe.pickle,
# so only run the fit that matches the configured selection method.
if FEATURE_SELECTION_METHOD == "modified_mrmr":
    modified_mrmr_fit_step.run()

In [None]:
#pickle_get('tsfresh_feature_reduction_dataframe.pickle',s3)

------------------------------------------------------------

In [None]:
class BorutaFit(mrp_pipeline.MRPStep):
    """Select features with Boruta wrapped around a random forest.

    Boruta runs on every feature still in the feature set (any datatype). The
    features it confirms are marked True in a ``select_Boruta_<n>`` column of the
    selection table, where ``n`` is the number of confirmed features. All other
    features get NaN.
    """
    name = "boruta_fit"

    def prep_data(self, split_data, scale = "minmax"):
        """Fit a scaler on ``split_data`` and return the scaled ``(X_train, y_train)``."""
        scaler = preprocess_fit.PreprocessFit().main(split_data, scale)['scaler']

        transformed = preprocess_transform.PreprocessTransform().main(split_data, scaler)
        X_train = transformed['X_train']
        y_train = transformed['y_train']
        return X_train, y_train

    def create_tsfresh_feature_selection_dataframe(self, tsfresh_feature_reduction_dataframe):
        """Start the selection table from the identifying columns of the reduction table."""
        # .copy() so later column additions never operate on a view of the reduction table.
        return tsfresh_feature_reduction_dataframe[['tsfresh_column', 'datatype', 'in_current_feature_set']].copy()

    def select_boruta_random_forest_features(self, X_train, y_train, tsfresh_feature_reduction_dataframe, tsfresh_feature_selection_dataframe):
        """Run Boruta over the active features and merge its confirmed set onto the selection table."""
        reduction = tsfresh_feature_reduction_dataframe
        relevant_tsfresh_features = reduction.loc[reduction.in_current_feature_set == 1, 'tsfresh_column'].tolist()
        X_train_subset = X_train[relevant_tsfresh_features]

        random_forest = RandomForestClassifier(n_jobs = -1, class_weight = 'balanced', max_depth = 5)

        boruta = BorutaPy(estimator = random_forest, n_estimators = 'auto', max_iter = 100, random_state = 50919)
        boruta.fit(np.array(X_train_subset), np.array(y_train))

        # support_ is a boolean mask aligned with the columns passed to fit.
        select_features_vector = X_train_subset.columns[boruta.support_].to_list()

        n_features = len(select_features_vector)
        col = 'select_Boruta_' + str(n_features)
        select_features_dataframe = pd.DataFrame({
            'tsfresh_column': select_features_vector,
            col: [True] * n_features,
        })

        return tsfresh_feature_selection_dataframe.copy(deep = True).merge(
            select_features_dataframe[['tsfresh_column', col]], on = 'tsfresh_column', how = 'left')

    def main(self, split_data, tsfresh_feature_reduction_dataframe):
        """Produce the Boruta selection table consumed by the boruta tune-n-features step."""
        # Features are min-max scaled using a scaler fit on the training split.
        X_train_minmax_scaled, y_train = self.prep_data(split_data, scale = "minmax")

        tsfresh_feature_selection_dataframe = self.create_tsfresh_feature_selection_dataframe(tsfresh_feature_reduction_dataframe)
        tsfresh_feature_selection_dataframe = self.select_boruta_random_forest_features(X_train_minmax_scaled, y_train, tsfresh_feature_reduction_dataframe, tsfresh_feature_selection_dataframe)
        print('\nboruta_fit outputs: tsfresh_feature_selection_dataframe:', tsfresh_feature_selection_dataframe.head() ,tsfresh_feature_selection_dataframe.columns.tolist(),'\n')
        return {
            'tsfresh_feature_selection_dataframe': tsfresh_feature_selection_dataframe
        }

In [None]:
def make_boruta_fit_step(dcRead, s3, spark):
    """Build the BorutaFit step.

    The step reads the observations and the reduction table, and writes the
    Boruta selection table to ``tsfresh_feature_selection_dataframe.pickle``.
    """
    def boruta_fit_load_inputs():
        return {
            # Same observations location the other steps read (no stray spaces in the path).
            'split_data': spark.read.parquet(f'{s3.project_home}/observations.parquet'),
            'tsfresh_feature_reduction_dataframe': pickle_get('tsfresh_feature_reduction_dataframe.pickle', s3)}

    def boruta_fit_handle_outputs(tsfresh_feature_selection_dataframe):
        pickle_put(tsfresh_feature_selection_dataframe, 'tsfresh_feature_selection_dataframe.pickle', s3)

    step_boruta_fit = BorutaFit(
        input_loader = boruta_fit_load_inputs,
        output_handler = boruta_fit_handle_outputs)

    return step_boruta_fit

boruta_fit_step = make_boruta_fit_step(dcRead, s3, spark)
# BorutaFit writes the same pickle as ModifiedMRMRFit; running it unconditionally would
# overwrite the mRMR rankings that the default 'modified_mrmr' path depends on.
if FEATURE_SELECTION_METHOD == "boruta":
    boruta_fit_step.run()

In [None]:
#pickle_get('tsfresh_feature_selection_dataframe.pickle',s3)

------------------------------------------------------------

In [None]:
class SelectNFeatures(mrp_pipeline.MRPStep):
    """Choose how many features to keep, by validation ROC-AUC.

    For each candidate ``n`` in ``n_features_to_try`` the step does the following:
    - picks ``n`` features with the configured selection method;
    - re-scales the reduced data;
    - tunes and trains a model with TuneTrain;
    - scores the model on train and validation.

    The ``n`` with the highest validation ROC-AUC is returned as ``n_feats_best``.
    """

    name = "select_n_features"

    parameters = {
        "n_features_to_try": np.round(np.geomspace(10, 500, 12)).astype (int),
        "max_evals": 50,
        "feature_selection_method" : 'modified_mrmr'
    }

    def _feature_selection_method(self):
        """Return the configured selection method, accepting both spellings of its key."""
        # The notebook factory passes 'feature_selection method' (with a space), while the
        # class default uses 'feature_selection_method'. Prefer the explicitly passed one.
        for key in ('feature_selection method', 'feature_selection_method'):
            if key in self.parameters:
                return self.parameters[key]
        raise KeyError('feature_selection_method')

    def prep_data(self, n, split_data, tsfresh_feature_selection_dataframe, feature_selection_method):
        """Select ``n`` features and return the scaled train/val/test splits for them.

        Raises:
            ValueError: if ``feature_selection_method`` is not 'modified_mrmr' or 'boruta'.
        """
        if feature_selection_method == "modified_mrmr":
            feature_names = modified_mrmr_tune_n_features.ModifiedMRMRTuneNFeatures().main(n, tsfresh_feature_selection_dataframe)['feature_names']
        elif feature_selection_method == "boruta":
            feature_names = boruta_tune_n_features.BorutaTuneNFeatures().main(n, tsfresh_feature_selection_dataframe)['feature_names']
        else:
            raise ValueError(f"feature_selection_method must be 'modified_mrmr' or 'boruta', got {feature_selection_method!r}")

        reduced_data = feature_selection_transform.FeatureSelectionTransform().main(split_data, feature_names)['reduced_data']
        # main() returns a dict, so the scaler is looked up by key.
        scaler = preprocess_fit.PreprocessFit().main(reduced_data, scale = "minmax")['scaler']
        return preprocess_transform.PreprocessTransform().main(reduced_data, scaler)

    def get_roc_auc_score(self, model, X, y):
        """Get model ROC-AUC score on features X and labels y (column 1 of predict = positive class)."""
        y_predict = model.predict(X)[:, 1]
        roc_auc = sklearn.metrics.roc_auc_score(y, y_predict)
        return {'roc_auc': roc_auc,
                'y_predict': y_predict
        }

    def main(self, split_data, tsfresh_feature_selection_dataframe):
        """Evaluate every candidate feature count and return the best one with its score history.

        Raises:
            ValueError: if ``n_features_to_try`` is empty.
        """
        n_features_to_try = self.parameters['n_features_to_try']
        if len(n_features_to_try) == 0:
            raise ValueError('n_features_to_try must contain at least one candidate')
        feature_selection_method = self._feature_selection_method()

        train_scores = []
        val_scores = []
        val_y_predictions = []

        for n in n_features_to_try:
            data = self.prep_data(n, split_data, tsfresh_feature_selection_dataframe, feature_selection_method)
            model = tune_train.TuneTrain(parameters = {'max_evals': self.parameters['max_evals']}).main(data['X_train'], data['y_train'], data['X_val'], data['y_val'])['model']

            train_scores.append(self.get_roc_auc_score(model, data['X_train'], data['y_train'])['roc_auc'])
            # Predict on the validation split once and keep both the AUC and the scores.
            val_result = self.get_roc_auc_score(model, data['X_val'], data['y_val'])
            val_scores.append(val_result['roc_auc'])
            val_y_predictions.append(val_result['y_predict'])

        n_feats_best = n_features_to_try[int(np.argmax(val_scores))]
        results = {
            'n_feats_best': n_feats_best,
            'n_features_tried': n_features_to_try,
            'train_scores': train_scores,
            'val_scores': val_scores,
            'val_y_predictions': val_y_predictions
        }
        print('\n select_n_features >', results, '\n')
        return results

In [None]:
def make_select_n_features_step(
    dcRead,
    s3,
    spark,
    max_evals = MAX_EVALS,
    n_features_to_try = N_FEATURES_TO_TRY,
    feature_selection_method = FEATURE_SELECTION_METHOD
    ):
    """Build the SelectNFeatures step.

    The step reads the observations and the selection table, and pickles the
    best feature count together with the per-candidate scores.
    """

    def select_n_features_load_inputs():
        return {
            # Same observations location the other steps read (no space before '/').
            'split_data': spark.read.parquet(f'{s3.project_home}/observations.parquet'),
            'tsfresh_feature_selection_dataframe': pickle_get('tsfresh_feature_selection_dataframe.pickle', s3)}

    def select_n_features_handle_outputs(n_feats_best, n_features_tried, train_scores, val_scores, val_y_predictions):
        pickle_put(n_feats_best, 'n_feats_best.pickle', s3)
        pickle_put(n_features_tried, 'select_n_feats_n_feats.pickle', s3)
        # NOTE(review): this object name contains a space; confirm before renaming, since
        # external readers may depend on it.
        pickle_put(train_scores, 'select_n_feats_train scores.pickle', s3)
        pickle_put(val_scores, 'select_n_feats_val_scores.pickle', s3)
        pickle_put(val_y_predictions, 'val_y_predictions.pickle', s3)

    step_select_n_features = SelectNFeatures(
        parameters = {
            'n_features_to_try': n_features_to_try,
            'max_evals': max_evals,
            'feature_selection method': feature_selection_method},
        input_loader=select_n_features_load_inputs,
        output_handler=select_n_features_handle_outputs
    )
    return step_select_n_features

select_n_features_step = make_select_n_features_step(dcRead, s3, spark)
select_n_features_step.run()

In [None]:
#pickle_get('n_feats_best.pickle',s3)
#pickle_get('select_n_feats_n_feats.pickle',s3)
#pickle_get('select_n_feats_train.pickle',s3)
#pickle_get('select_n_feats_val_scores.pickle',s3)
#pickle_get('val_y_predictions.pickle',s3)

------------------------------------------------------------

In [None]:
class ModifiedMRMRTuneNFeatures(mrp_pipeline.MRPStep):
    """Pick the top-``n`` feature names from the per-datatype rankings of ModifiedMRMRFit.

    The budget ``n`` is split across datatypes:
    - binary features: up to ``ceil(n * fraction_binary)``;
    - discrete features: up to ``ceil(n * fraction_discrete)``;
    - continuous (mRMR) features: the remainder.

    Each share is capped by how many ranked features of that type exist.
    """
    name = "modified_mrmr_tune_n_features"
    parameters = {
        "fraction_binary" : 0.09,
        "fraction_discrete": 0.04,
    }

    def prep_data(self, tsfresh_feature_selection_dataframe, regex_col):
        """Return feature names ranked in the ``select_<regex_col>*`` column, best first.

        Raises:
            ValueError: if no ``select_<regex_col>*`` column exists.
        """
        # The '|' alternation keeps both the rank column and 'tsfresh_column'. Without it,
        # the pattern would only match a literal 'select_<regex_col>tsfresh_column'.
        relevant_tsfresh_dataframe = tsfresh_feature_selection_dataframe.filter(regex = 'select_' + regex_col + '|tsfresh_column')
        rank_cols = [c for c in relevant_tsfresh_dataframe.columns if c != 'tsfresh_column']
        if not rank_cols:
            raise ValueError(f"no 'select_{regex_col}' ranking column in tsfresh_feature_selection_dataframe")
        # Unranked features (NaN) are dropped; rank 1 sorts first.
        ranked = relevant_tsfresh_dataframe.dropna().sort_values(rank_cols[0])
        return ranked['tsfresh_column'].tolist()

    def main(self, n, tsfresh_feature_selection_dataframe):
        """Return ``{'feature_names': [...]}``, ordered continuous, then discrete, then binary."""
        mrmr_numeric_feature_names = self.prep_data(tsfresh_feature_selection_dataframe, regex_col = 'mrmr')
        kbest_binary_feature_names = self.prep_data(tsfresh_feature_selection_dataframe, regex_col = 'k_best_binary')
        kbest_discrete_feature_names = self.prep_data(tsfresh_feature_selection_dataframe, regex_col = 'k_best_discrete')

        n = int(n)
        n_binary_features = min(int(np.ceil(n * self.parameters['fraction_binary'])), len(kbest_binary_feature_names))
        n_discrete_features = min(int(np.ceil(n * self.parameters['fraction_discrete'])), len(kbest_discrete_feature_names))
        # Clamp at 0: for a very small n the remainder can be negative, and a negative
        # slice bound would silently drop names from the end instead of selecting none.
        n_continuous_features = max(0, min(n - n_discrete_features - n_binary_features, len(mrmr_numeric_feature_names)))

        feature_names = (mrmr_numeric_feature_names[:n_continuous_features]
                         + kbest_discrete_feature_names[:n_discrete_features]
                         + kbest_binary_feature_names[:n_binary_features])
        print('\n modified_mrmr_tune_n_features > feature_names:', feature_names,'\n')
        return {
            'feature_names': feature_names
        }

In [None]:
def make_modified_mrmr_tune_n_features_step(dcRead, s3, spark):
    """Build the ModifiedMRMRTuneNFeatures step.

    The step reads the chosen feature count and the selection table from S3, and
    pickles the resulting feature names to ``feature_names.pickle``.
    """
    def modified_mrmr_tune_n_features_load_inputs():
        best_n = pickle_get('n_feats_best.pickle', s3)
        selection_table = pickle_get('tsfresh_feature_selection_dataframe.pickle', s3)
        return {'n': best_n, 'tsfresh_feature_selection_dataframe': selection_table}

    def modified_mrmr_tune_n_features_handle_outputs(feature_names):
        pickle_put(feature_names, 'feature_names.pickle', s3)

    return ModifiedMRMRTuneNFeatures(
        input_loader = modified_mrmr_tune_n_features_load_inputs,
        output_handler = modified_mrmr_tune_n_features_handle_outputs)

modified_mrmr_tune_n_features_step = make_modified_mrmr_tune_n_features_step(dcRead, s3, spark)
modified_mrmr_tune_n_features_step.run()

In [None]:
#pickle_get('feature_names.pickle',s3)

------------------------------------------------------------

In [None]:
class FeatureSelectionTransform(mrp_pipeline.MRPStep):
    """Keep only the selected feature columns plus the bookkeeping (non-feature) columns."""
    name = "feature_selection_transform"
    parameters = {
        # Matches NON_FEATURE_COLS; the column is spelled 'id_secondary' (underscore).
        "non_feature_cols": ['id', 'id_secondary', 'LABEL', 'tdate', 'pttrn_trade_dt_all', 'split']
    }

    def main(self, split_data, feature_names):
        """Return ``{'reduced_data': ...}``: ``split_data`` restricted to the features plus non-feature columns."""
        # list() so array/tuple inputs concatenate instead of, e.g., broadcasting a numpy '+'.
        columns = list(feature_names) + list(self.parameters['non_feature_cols'])
        reduced_data = split_data.select(columns)
        print('\n feature_selection_transform > reduced_data:', reduced_data,'\n')
        return {
            'reduced_data': reduced_data
        }

In [None]:
def make_feature_selection_transform_step(
    dcRead,
    s3,
    spark,
    non_feature_cols = NON_FEATURE_COLS):
    """Build the FeatureSelectionTransform step.

    The step reads the observations and the selected feature names, and writes
    the reduced data back to S3 as parquet.
    """

    def feature_selection_transform_load_inputs():
        observations = spark.read.parquet(f'{s3.project_home}/observations.parquet')
        selected_names = pickle_get('feature_names.pickle', s3)
        return {'split_data': observations, 'feature_names': selected_names}

    def feature_selection_transform_handle_outputs(reduced_data):
        # NOTE(review): the path has a space before '/'. The preprocess steps read the same
        # spelling, so any fix has to change writer and readers together.
        write_result = reduced_data.write.parquet(
            f'{s3.project_home} /reduced_data.parquet',
            mode='overwrite')
        return {'reduced_data': write_result}

    return FeatureSelectionTransform(
        parameters = {'non_feature_cols': non_feature_cols},
        input_loader=feature_selection_transform_load_inputs,
        output_handler=feature_selection_transform_handle_outputs
    )

feature_selection_transform_step = make_feature_selection_transform_step(dcRead, s3, spark)
feature_selection_transform_step.run()

------------------------------------------------------------

In [None]:
class PreprocessFit(mrp_pipeline.MRPStep):
    """Fit a feature scaler on the training split only."""

    name = "preprocess_fit"

    # Supported values of ``scale`` and the sklearn scaler each one fits.
    _SCALERS = {"minmax": MinMaxScaler, "standard": StandardScaler}

    def main(self, split_data, scale = "minmax"):
        """Return ``{'scaler': fitted_scaler}`` fit on rows where ``split == 'train'``.

        Raises:
            ValueError: if ``scale`` is not 'minmax' or 'standard'.
        """
        # Validate up front rather than failing afterwards on an unbound scaler.
        if scale not in self._SCALERS:
            raise ValueError(f"scale must be one of {sorted(self._SCALERS)}, got {scale!r}")

        train_data = split_data.filter(F.col('split') == 'train').drop('split')
        X_train, _ = preprocess.preprocess_pre_scale(train_data)
        scaler = self._SCALERS[scale]().fit(X_train)
        print('\n preprocess_fit > scaler:', scaler,'\n')
        return {
            'scaler': scaler
        }

In [None]:
def make_preprocess_fit_step(dcRead, s3, spark):
    """Build the PreprocessFit step.

    The step reads the reduced data and pickles the fitted scaler to
    ``min_max_scaler.pickle``.
    """
    reduced_data_path = f'{s3.project_home} /reduced_data.parquet'

    def pf_load_inputs():
        return {'split_data': spark.read.parquet(reduced_data_path)}

    def pf_handle_outputs(scaler):
        pickle_put(scaler, 'min_max_scaler.pickle', s3)

    return PreprocessFit(input_loader = pf_load_inputs, output_handler = pf_handle_outputs)

preprocess_fit_step = make_preprocess_fit_step(dcRead, s3, spark)
preprocess_fit_step.run()

In [None]:
#pickle_get('min_max_scaler.pickle',s3)

------------------------------------------------------------

In [None]:
class PreprocessTransform(mrp_pipeline.MRPStep):
    """Split rows by their ``split`` column and scale each split with an already-fitted scaler."""

    name = "preprocess_transform"

    def apply(self, data, scaler):
        """Return ``(X_scaled, y)`` for one split, keeping the feature column names."""
        X, y = preprocess.preprocess_pre_scale(data)
        X_scaled = pd.DataFrame(scaler.transform(X), columns=X.columns)
        return X_scaled, y

    def main(self, split_data, scaler):
        """Return X_/y_ entries for the 'train', 'val' and 'test' splits.

        Keys are ``X_train``, ``y_train``, ``X_val``, ``y_val``, ``X_test`` and ``y_test``.
        """
        outputs = {}
        for split in ('train', 'val', 'test'):
            split_frame = split_data.filter(F.col('split') == split).drop('split')
            X_split, y_split = self.apply(split_frame, scaler)
            # Keys use underscores ('X_val'), as expected by downstream consumers and the output handler.
            outputs[f'X_{split}'] = X_split
            outputs[f'y_{split}'] = y_split
        print('\n preprocess_transform >', {key: outputs[key] for key in ('X_val', 'y_val', 'X_test', 'y_test')}, '\n')
        return outputs


# Backward-compatible alias for the original (underscored) class name.
Preprocess_Transform = PreprocessTransform

In [None]:
def make_preprocess_transform_step(dcRead, s3, spark):
    """Build the PreprocessTransform step.

    The step reads the reduced data and the fitted scaler, and pickles the six
    scaled train/val/test arrays to S3.
    """
    def pt_load_inputs():
        return {
            'split_data': spark.read.parquet(f'{s3.project_home} /reduced_data.parquet'),
            # Must match the name written by the preprocess_fit step (no space before 'pickle').
            'scaler': pickle_get('min_max_scaler.pickle', s3)}

    def pt_handle_outputs(
        X_train,
        y_train,
        X_val,
        y_val,
        X_test,
        y_test):

        pickle_put(X_train, 'X_train.pickle', s3)
        pickle_put(y_train, 'y_train.pickle', s3)
        # Capital 'X' like the other feature pickles; the tune_train step loads 'X_val.pickle'
        # and S3 keys are case-sensitive.
        pickle_put(X_val, 'X_val.pickle', s3)
        pickle_put(y_val, 'y_val.pickle', s3)
        pickle_put(X_test, 'X_test.pickle', s3)
        pickle_put(y_test, 'y_test.pickle', s3)

    step_preprocess_transform = PreprocessTransform(
        input_loader=pt_load_inputs,
        output_handler=pt_handle_outputs)

    return step_preprocess_transform

preprocess_transform_step = make_preprocess_transform_step(dcRead, s3, spark)
preprocess_transform_step.run()

In [None]:
#pickle_get('X_train.pickle',s3)
#pickle_get('y_train.pickle',s3)
#pickle_get('x_val.pickle',s3)
#pickle_get('y_val.pickle',s3)
#pickle_get('X_test.pickle',s3)
#pickle_get('y_test.pickle',s3)

------------------------------------------------------------

In [None]:
class TuneTrain(mrp_pipeline.MRPStep):
    """Tune and train a dense Keras two-class classifier with hyperopt (TPE).

    Each trial builds the following network:
    - input dropout;
    - an intro Dense layer;
    - 1-3 extra hidden layers, set by the chosen ``number_layers`` branch;
    - a 2-unit softmax output.

    The network is trained with SGD and scored by validation AUC. ``main`` returns
    the model from the trial with the lowest loss, i.e. the highest validation AUC.
    """
    parameters = {
    'max_evals': 50
    }
    name = "tune_train"

    def define_search_space(self):
        """Define a hyperparameter search space for hyperopt to optimize over.

        ``number_layers`` is an ``hp.choice`` over three branch dicts. Each branch
        carries its own ``'number_layers'`` index (0, 1 or 2), its layer sizes and
        activations, and its ``'learning_rate'``.
        """
        search_space = {}
        search_space['intro_neurons'] = hp.quniform("intro_neurons", 500, 1000, 1)
        search_space['activations_intro'] = hp.choice("activations_intro", ['relu', 'elu', 'selu'])
        search_space['number_layers'] = hp.choice(
            'number_layers',
            [
                {
                    'number_layers': 0,
                    'num_neurons_1': hp.quniform("num_neurons_1", 1, 1000, 1),
                    "activations_1": hp.choice("activations_1", ['relu', 'elu', 'selu']),
                    'learning_rate': hp.loguniform('learning_rate_zero', np.log(0.1), np.log(1.4))
                },
                {
                    'number_layers': 1,
                    'num_neurons_second_1': hp.quniform('num_neurons_second_1', 1, 1000, 1),
                    'num_neurons_second_2': hp.quniform('num_neurons_second_2', 1, 1000, 1),
                    "activations_2_first": hp.choice("activations_2_first", ['relu', 'elu', 'selu']),
                    "activations_2_second": hp.choice("activations_2_second", ['relu', 'elu', 'selu']),
                    'learning_rate': hp.loguniform('learning_rate_one', np.log(0.1), np.log(1.4))
                },
                {
                    'number_layers': 2,
                    'num_neurons_3_1': hp.quniform("num_neurons_3_1", 1, 1000, 1),
                    'num_neurons_3_2': hp.quniform("num_neurons_3_2", 1, 800, 1),
                    'num_neurons_3_3': hp.quniform("num_neurons_3_3", 1, 1000, 10),
                    "activations_3_1": hp.choice("activations_3_1", ['relu', 'elu', 'selu']),
                    "activations_3_2": hp.choice("activations_3_2", ['relu', 'elu', 'selu']),
                    "activations_3_3": hp.choice("activations_3_3", ['relu', 'elu', 'selu']),
                    'learning_rate': hp.loguniform('learning_rate_two', np.log(0.1), np.log(1.4))
                },
            ]
        )
        return search_space

    @staticmethod
    def _units(value):
        """Convert a sampled ``quniform`` float to a valid Dense width (an int of at least 1)."""
        # quniform(1, 1000, 10) rounds to multiples of 10 and can therefore yield 0.
        return max(1, int(round(value)))

    @staticmethod
    def _hidden_layer_specs(branch):
        """Return ``(units, activation)`` pairs for the extra hidden layers of a sampled branch."""
        depth = branch['number_layers']
        if depth == 0:
            return [(branch['num_neurons_1'], branch['activations_1'])]
        if depth == 1:
            return [(branch['num_neurons_second_1'], branch['activations_2_first']),
                    (branch['num_neurons_second_2'], branch['activations_2_second'])]
        if depth == 2:
            return [(branch['num_neurons_3_1'], branch['activations_3_1']),
                    (branch['num_neurons_3_2'], branch['activations_3_2']),
                    (branch['num_neurons_3_3'], branch['activations_3_3'])]
        raise ValueError(f'unexpected number_layers branch index: {depth!r}')

    def objective(self, X_val, X_train, y_binary_val, y_binary_train, params):
        """Train one candidate network and return its hyperopt result dict.

        The loss is the negative validation AUC, so that minimizing it maximizes AUC.
        The trained model is stored under ``'model'``.
        """
        # params['number_layers'] is the sampled branch dict, not an int; the depth
        # index lives inside it under the same key.
        branch = params['number_layers']

        classifier = tf.keras.Sequential()
        # NOTE(review): patience (50) equals the epoch budget below, so these callbacks are
        # unlikely to stop training early — confirm that is intended.
        early_stop_auc = tf.keras.callbacks.EarlyStopping(
            monitor='val_auc',
            verbose=1, mode='max',
            patience=50,
            restore_best_weights=True
        )
        early_stop_loss = tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            verbose=1,
            mode='min',
            patience=50,
            restore_best_weights=True
        )

        # Input dropout, followed by the first (intro) hidden layer.
        classifier.add(tf.keras.layers.Dropout(0.2, input_shape=(X_train.shape[-1],)))
        classifier.add(tf.keras.layers.Dense(
            self._units(params['intro_neurons']),
            activation=params['activations_intro']
        ))
        for units, activation in self._hidden_layer_specs(branch):
            classifier.add(tf.keras.layers.Dropout(0.6))
            classifier.add(tf.keras.layers.Dense(
                self._units(units),
                activation=activation,
                kernel_constraint=MaxNorm(3)
            ))
        classifier.add(tf.keras.layers.Dense(
            2,
            activation='softmax',
            kernel_constraint=MaxNorm(3)
        ))

        optimizer = SGD(learning_rate=branch['learning_rate'], momentum=0.9)
        classifier.compile(
            loss=tf.keras.losses.CategoricalCrossentropy(),
            optimizer=optimizer,
            metrics=[tf.keras.metrics.AUC(name='auc')]
        )
        classifier.fit(
            X_train,
            y_binary_train,
            epochs=50,
            validation_data=(X_val, y_binary_val),
            verbose=2,
            batch_size=64,
            callbacks=[early_stop_auc, early_stop_loss]
        )
        # evaluate() returns [loss, auc] given the single compiled metric.
        score_2 = classifier.evaluate(X_val, y_binary_val)
        return {'loss': -score_2[1], 'status': STATUS_OK, 'model': classifier}

    def obj_with_data(self, X_val, X_train, y_binary_val, y_binary_train):
        """Bind the data so hyperopt's objective only receives ``params``."""
        return lambda params: self.objective(
            X_val, X_train, y_binary_val, y_binary_train, params
        )

    def getBestModelfromTrials(self, trials):
        """Return the model stored by the successful trial with the lowest loss.

        Raises:
            ValueError: if no trial finished with ``STATUS_OK``.
        """
        valid_trial_list = [
            trial for trial in trials if STATUS_OK == trial['result']['status']
        ]
        if not valid_trial_list:
            raise ValueError('no hyperopt trial finished with STATUS_OK')
        losses = [float(trial['result']['loss']) for trial in valid_trial_list]
        best_trial_obj = valid_trial_list[int(np.argmin(losses))]
        return best_trial_obj['result']['model']

    def main(self, X_train, y_train, X_val, y_val):
        """Run the hyperopt search and return ``{'model': best_model}``."""
        # One-hot labels to match the 2-unit softmax output and categorical cross-entropy.
        y_binary_train = to_categorical(y_train.values)
        y_binary_val = to_categorical(y_val.values)
        search_space = self.define_search_space()
        trials = Trials()
        fmin(
            self.obj_with_data(X_val, X_train, y_binary_val, y_binary_train),
            search_space,
            algo=tpe.suggest,
            trials=trials,
            max_evals=self.parameters['max_evals']
        )
        model = self.getBestModelfromTrials(trials)
        print('\n tune_train > model:', model,'\n')
        return {
            'model': model
        }

In [None]:
def make_tune_train_step(dcRead, s3, spark, max_evals=MAX_EVALS_TUNE_TRAIN):
    """Build the TuneTrain step.

    The step reads the pickled train/val splits and saves the best Keras model
    as ``model.h5``.
    """

    def tt_load_inputs():
        pickle_names = ('X_train', 'y_train', 'X_val', 'y_val')
        return {name: pickle_get(f'{name}.pickle', s3) for name in pickle_names}

    def tt_handle_outputs(model):
        keras_put(model, 'model.h5', s3)

    tune_parameters = {'max_evals': max_evals}
    return TuneTrain(
        parameters=tune_parameters,
        input_loader=tt_load_inputs,
        output_handler=tt_handle_outputs
    )

tune_train_step = make_tune_train_step(dcRead, s3, spark)
tune_train_step.run()

In [None]:
#keras_put(model, 'model.h5', s3)

------------------------------------------------------------

In [None]:
class Evaluate(mrp_pipeline.MRPStep):
    """Score a trained two-class model on the held-out test split."""

    name = "evaluate"

    def main(self, model, X_test, y_test):
        """Return recall, precision and ROC-AUC, each wrapped in a one-element list.

        Column 1 of ``model.predict`` is used as the positive-class score. Hard labels
        for recall and precision use a 0.5 threshold.
        """
        positive_scores = model.predict(X_test)[:, 1]
        hard_labels = (positive_scores > 0.5).astype(float)

        recall = sklearn.metrics.recall_score(y_test, hard_labels)
        precision = sklearn.metrics.precision_score(y_test, hard_labels)
        roc_auc = sklearn.metrics.roc_auc_score(y_test, positive_scores)

        metrics_dict = {'recall': [recall], 'precision': [precision], 'roc_auc': [roc_auc]}
        print('\n evaluate > ', metrics_dict,'\n')
        return metrics_dict

In [None]:
def make_evaluate_step(dcRead, s3, spark):
    """Build the ``Evaluate`` step that scores the stored model on the test split.

    ``dcRead`` and ``spark`` are unused here; they are accepted for parity with
    the other ``make_*_step`` factories. ``s3`` is the storage handle used to
    read the model and the pickled test data.
    """

    def load_model_and_test_split():
        trained_model = keras_get('model.h5', s3)
        features = pickle_get('X_test.pickle', s3)
        labels = pickle_get('y_test.pickle', s3)
        return {'model': trained_model, 'X_test': features, 'y_test': labels}

    def report_metrics(recall, precision, roc_auc):
        # One labelled line per metric returned by Evaluate.main.
        print(f'Recall: {recall}')
        print(f'Precision: {precision}')
        print(f'ROC-AUC: {roc_auc}')

    return Evaluate(
        input_loader=load_model_and_test_split,
        output_handler=report_metrics,
    )

evaluate_step = make_evaluate_step(dcRead, s3, spark)
evaluate_step.run()

------------------------------------------------------------

In [None]:
class CompileMetadata(mrp_pipeline.MRPStep):
    """Pipeline step that assembles the metadata dictionary shipped with the model.

    The result bundles three items:
    - the simplified-to-tsfresh name mapping,
    - the list of selected features,
    - per-feature min/max scaling values.
    """

    name = "compile_metadata"

    def main(self, simplified_tsfresh_map, selected_features, scaler):
        """Build the metadata dictionary.

        Parameters
        ----------
        simplified_tsfresh_map
            Mapping from simplified tsfresh names to tsfresh names; stored as-is.
        selected_features : sequence of str
            Selected feature names. Assumed to be in the same column order the
            scaler was fitted on (TODO: confirm against the feature-selection step).
        scaler
            Fitted scaler exposing ``data_min_`` / ``data_max_`` arrays
            (e.g. ``sklearn.preprocessing.MinMaxScaler``).

        Returns
        -------
        dict
            ``{'metadata': {...}}``. The inner dict has the keys
            ``simplified_tsfresh_to_tsfresh``, ``selected_features`` and
            ``scaling``, each holding ``{'description': str, 'data': ...}``.

        Raises
        ------
        IndexError
            If ``selected_features`` is longer than the scaler's min/max arrays.
        """
        # Why the float() casts: the metadata is written out as JSON.
        # np.float32 scalars (from a scaler fitted on float32 data) are not
        # JSON-serializable, so cast them to built-in floats.
        scaling = [
            {
                'feature': feature,
                'min': float(scaler.data_min_[i]),
                'max': float(scaler.data_max_[i]),
            }
            for i, feature in enumerate(selected_features)
        ]
        metadata = {
            'simplified_tsfresh_to_tsfresh': {
                'description': 'simplifed tsfresh name to tsfresh name mapping',
                'data': simplified_tsfresh_map,
            },
            'selected_features': {
                'description': 'Selected features in simplified tsfresh name format',
                'data': selected_features,
            },
            'scaling': {
                'description': 'Min and Max values for each selected feature with features in simplified tsfresh name format',
                'data': scaling,
            },
        }

        print('\n compile_metadata > metadata:', metadata, '\n')
        return {
            'metadata': metadata
        }

In [None]:
def make_compile_metadata_step(dcRead, s3, spark):
    """Build the ``CompileMetadata`` step backed by pickles in storage.

    ``dcRead`` and ``spark`` are unused here; they are accepted for parity with
    the other ``make_*_step`` factories. The compiled metadata is written to
    ``metadata.json`` via ``json_put``.
    """
    # Argument name expected by CompileMetadata.main -> pickle file holding it.
    pickle_files = {
        'simplified_tsfresh_map': 'full_simplified_tsfresh_map.pickle',
        'selected_features': 'feature_names.pickle',
        'scaler': 'min_max_scaler.pickle',
    }

    def load_metadata_inputs():
        return {arg: pickle_get(path, s3) for arg, path in pickle_files.items()}

    def save_metadata(metadata):
        json_put(metadata, 'metadata.json', s3)

    return CompileMetadata(
        input_loader=load_metadata_inputs,
        output_handler=save_metadata,
    )

compile_metadata_step = make_compile_metadata_step(dcRead, s3, spark)
compile_metadata_step.run()

In [None]:
#json_get('metadata.json', s3)

------------------------------------------------------------

In [None]:
class Stage(mrp_pipeline.MRPStep):
    """Pass-through step that forwards the final artifacts unchanged.

    ``main`` returns its inputs as-is. The ``output_handler`` supplied at
    construction is responsible for persisting them.
    """

    name = "stage"

    def main(self, split_data, metadata, model, run_info):
        # Log only the data handle and run info before forwarding everything.
        summary = {'split_data': split_data, 'run_info': run_info}
        print('\n stage >', summary, '\n')
        staged = dict(
            split_data=split_data,
            metadata=metadata,
            model=model,
            run_info=run_info,
        )
        return staged

In [None]:
def make_stage_step(dcRead, s3, spark, run_info_path='/mnt/metadata/run_information.json'):
    """Build the ``Stage`` step that copies the final artifacts under ``staged/``.

    Parameters
    ----------
    dcRead
        Unused here; accepted for parity with the other ``make_*_step`` factories.
    s3
        Storage handle. ``s3.project_home`` is the root of the parquet paths,
        and the handle is passed to the ``json_*`` / ``keras_*`` helpers.
    spark
        Spark session used to read the observations parquet.
    run_info_path : str
        Local JSON file describing the current run. Defaults to the location
        that was previously hard-coded.

    Returns
    -------
    Stage
        Step that reads the observations, metadata, model and run info, then
        writes them under ``staged/``.
    """

    def get_run_info():
        # JSON text is UTF-8; don't depend on the platform's default encoding.
        with open(run_info_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def stage_load_inputs():
        return {
            'split_data': spark.read.parquet(f'{s3.project_home}/observations.parquet'),
            'metadata': json_get('metadata.json', s3),
            'model': keras_get('model.h5', s3),
            'run_info': get_run_info(),
        }

    def stage_handle_outputs(split_data, metadata, model, run_info):
        # mode='overwrite' lets the cell be re-run without failing on an
        # existing staged parquet.
        split_data.write.parquet(
            f'{s3.project_home}/staged/split_data.parquet',
            mode='overwrite'
        )
        json_put(metadata, 'staged/metadata.json', s3)
        keras_put(model, 'staged/model.h5', s3)
        json_put(run_info, 'staged/run_info.json', s3)

    return Stage(
        input_loader=stage_load_inputs,
        output_handler=stage_handle_outputs
    )


stage_step = make_stage_step(dcRead, s3, spark)
stage_step.run()

In [None]:
#json_get('staged/metadata.json', s3)
#keras_put(model, 'model.h5', s3)
#json_get('staged/run_info.json', s3)