In [1]:
# All the libraries to be imported are defined here
import numpy as np
import pandas as pd
import os
import json
import copy
import sys
from datetime import datetime
from sklearn.preprocessing import KBinsDiscretizer, OneHotEncoder
from sklearn.base import TransformerMixin

# from .supporting_functions import process_stage_nosrom
# from upset_flagger.upset.upset_scoring import score_upset_subevent
from functools import partial

### Prepare for model parameters

In [12]:
# Location of the upset_flagging repository. Set the UPSET_FLAGGER_PATH environment
# variable instead of editing this machine-specific default. The trailing separator is
# enforced because later cells build file paths by string concatenation.
upset_flagger_path = os.path.join(
    os.environ.get('UPSET_FLAGGER_PATH',
                   'C:/Users/XiaoShuoCui/Documents/GitHub/suncor/upset_flagging/'),
    '')
# Make the `upset_flagger` package (in the folder above) importable. Guard so that
# re-running this cell does not append the same entry to sys.path again.
if upset_flagger_path not in sys.path:
    sys.path.append(upset_flagger_path)

In [13]:
from upset_flagger.upset.upset_scoring import score_upset_subevent

In [3]:
# Load the upset-flagging configuration; the context manager guarantees the file
# handle is closed (json.load(open(...)) leaves it open until garbage collection).
with open(os.path.join(upset_flagger_path, 'upset_config.json'), 'r') as config_file:
    config_data = json.load(config_file)

In [4]:
# Upset subevent whose flagging rule (looked up below) defines the prediction target.
target_se = "HPW Tank Temperature"

In [5]:
# Get the upset-flagging rule for the target subevent.
# Take the FIRST match across all upset types. A nested loop with a single `break`
# only leaves the inner loop, so a later upset type listing the same subevent would
# silently overwrite it. Fail loudly when nothing matches, so a stale `se_config`
# from an earlier run of this cell can never be reused.
se_config = next(
    (copy.copy(se)
     for ut in config_data['upset_types']
     for se in ut['flags']
     if se['upset_subevent'] == target_se),
    None)
if se_config is None:
    raise KeyError('Upset subevent not found in upset_config.json: {}'.format(target_se))
upset_flag_config = se_config
upset_flag_config

{'asset_id': 'a6e127fe-e718-4f78-a483-67f75b69d43d',
 'upset_subevent': 'HPW Tank Temperature',
 'rule_type': 'special hpw tank temperature',
 'lookback': 11,
 'resolution': 5,
 'robustness': 1,
 'tag_names': ['82TI867'],
 'window_size': 36,
 'ui_subevent_label': 'Tank Temperature Drop',
 'ui_threshold_label': ' -2 F/hr',
 'options': [],
 'persistence_type': 'sustained increase',
 'persistence_robustness': 3,
 'persistence_threshold': 0.01,
 'threshold_change': -1.1111,
 'threshold_temp': 82.2222}

In [6]:
## parameter preparation
''' New configuration which leverages FPA functionality

    hw1: window sizes for computing differences (x(t) - x(t-w)), where w is the window size.
    hw2: window sizes for fetching previous values to include in the current prediction record.
        This is essentially a form of flattening that incorporates prior values of features.
        A window size of w means you get x(t-w).
    dead period: how far in advance to force the predictor to anticipate the actual upset
        ('dead_period' below).
    window size: size of the prediction window (prior to the dead period) in which positive
        samples are generated ('failure_detection_window' below).
    special functions: list of special functions, which need to be defined in base/base_upset.py
        (in this notebook: the SpecialFunctions class). These functions will be expanded and
        their input tags added to the list of tags (tags_extended). The special function
        name should not appear in the tags list.
    tags: list of tags on which feature generation will occur; should include categorical
        tags and weather tags.
    categorical_tags: list of tags which should be handled as categorical (one-hot encoded).
    weather_tags: tags which come from weather data (do we need this distinction?)

'''

# Window offsets, largest first: 55min, 50min, ..., 5min for differences and
# 25min, 20min, ..., 5min for past values.
hw1 = ["{}min".format(k) for k in range(55, 0, -5)]
hw2 = ["{}min".format(k) for k in range(25, 0, -5)]

modeling_params = {
    'timestamp_column': 'timestamp_utc',
    'upset_flag_config': upset_flag_config,
    'special_functions': ['87FI510_60F+4FI_58_60F'],
    'tags': ['BOILER12_STAT', '25TI982', '35FY510A', '39TI317', '31FY2', '31PIC19', '35FY110A', 
             'BOILER4_STAT', 'BOILER13_STAT', '39TI402', 'BOILER2_STAT', 'P3_3FC311_TOT', 
             '31FY3', '35FY210A', '31PIC18', '31-FIT-1', 'BOILER3_STAT', '31PI1A', '82LC860', 
             '39PIC573', '82TI867', 'P3_3E1_HPW_TEMP', 'BOILER1_STAT', '25FI29', 
             'BOILER14_STAT', '39FI104', 'P39_GTG5_STATUS', '4MKC10FE902', 'P16_PEW_TEMP_TOT', 
             '3MKC10FE902', '31FIC172', 'temperature', '35FY410A', '57FC1261', '39FI204', 
             '57TI1502', 'PEW1_HPW_FLOW', 'BOILER15_STAT'
            ], # list of regular tags to include
    'categorical_tags': ['BOILER14_STAT', 'BOILER12_STAT', 'BOILER2_STAT', 'BOILER3_STAT', 
                         'BOILER13_STAT', 'P39_GTG5_STATUS', 'BOILER1_STAT', 'BOILER4_STAT', 
                         'BOILER15_STAT'],
    'conversions': { 
        'null_to_zero_tags': [],
    },
    'fpa_params': {
        'failure_detection_window': '30min', # default is 5 min
        'dead_period': '0min',
        'stages': [
                   # stage 1: differences x(t) - x(t-w) on numeric tags only
                   { 'window_sizes': hw1,
                     'simple_functions': [None]*len(hw1),
                     # one independent list per window: `[[...]]*n` would repeat the SAME
                     # inner list object n times, so mutating one entry would change all
                     'advanced_functions': [['delta_diff'] for _ in hw1],
                     'tags': 'numeric',
                   },
                   # stage 2: past values x(t-w) on all (numeric + categorical) tags
                   { 'window_sizes': hw2,
                     'simple_functions': [None]*len(hw2),
                     'advanced_functions': [['past_value'] for _ in hw2],
                     'tags': 'all'
                   }
                  ]
    },
    'debug': True}

In [7]:
class SeriesOneHotEncoder(TransformerMixin):
    '''One-hot encode a single pandas Series, encoding NaN rows as all-zero rows.

    Categories are learned from the non-NaN values seen in `fit` / `fit_transform`.
    Categories not seen during fitting also encode to all zeros
    (``handle_unknown='ignore'``). Output columns are named
    ``onehot_name_format.format(series_name, category)``. The names produced by the
    most recent (fit_)transform call are stored in ``self.colnames``.
    '''
    def __init__(self, onehot_name_format="{}_cat={}"):
        self.onehot_nf = onehot_name_format
        self.colnames = None   # output column names of the last (fit_)transform call
        self.encoder = None    # fitted sklearn OneHotEncoder

    @staticmethod
    def _make_encoder():
        '''Create a dense-output OneHotEncoder across scikit-learn versions.'''
        # scikit-learn 1.2 renamed `sparse` to `sparse_output` and 1.4 removed `sparse`.
        # Try the new keyword first and fall back for older releases.
        try:
            return OneHotEncoder(handle_unknown='ignore', sparse_output=False)
        except TypeError:
            return OneHotEncoder(handle_unknown='ignore', sparse=False)

    def _check_input(self, X):
        '''Raise ValueError unless X is a pandas Series.'''
        if not isinstance(X, pd.Series):
            raise ValueError("Data passed to one-hot encoding function is not a pandas.Series")

    def _transform(self, v, mask, index, colname):
        '''Encode the rows of `v` selected by `mask`; unselected rows stay all-zero.'''
        categories = self.encoder.categories_[0]
        v_enc_final = np.zeros(shape=(mask.shape[0], len(categories)))
        # sklearn rejects zero-sample input, so only encode when there is something to encode
        if mask.any():
            v_enc_final[mask] = self.encoder.transform(v[mask].reshape(-1, 1))

        names = [self.onehot_nf.format(colname, af) for af in categories]
        self.colnames = names
        return pd.DataFrame(v_enc_final, index=index, columns=names)

    def fit(self, X, y=None, **kwargs):
        '''Learn the categories from the non-NaN values of Series `X`; returns self.'''
        self._check_input(X)
        v = X.values
        mask = ~np.isnan(v)
        self.encoder = self._make_encoder()
        self.encoder.fit(v[mask].reshape(-1, 1))
        return self

    def fit_transform(self, X, y=None, **kwargs):
        '''Fit on Series `X` and return its one-hot DataFrame (same index as X).'''
        self.fit(X)
        v = X.values
        return self._transform(v, ~np.isnan(v), X.index, X.name)

    def transform(self, X, y=None, **kwargs):
        '''Return the one-hot DataFrame of Series `X` using the fitted categories.'''
        self._check_input(X)
        v = X.values
        mask = ~np.isnan(v)
        return self._transform(v, mask, X.index, X.name)


In [8]:
class SpecialFunctions(object):
    '''Registry of derived ("special") columns computed from raw tag columns.

    Attributes
    ----------
    specials : dict
        name -> {'tags': raw input tags, 'function': callable(df_in, consts=None)}.
    consts : dict
        Constants passed to every function as the `consts` keyword.
    functions : dict
        name -> callable(df_in), i.e. the special function with `consts` pre-bound
        via functools.partial.
    tag_map : dict
        name -> list of raw input tags required by that special function.
    '''
    def __init__(self):
        # NOTE(review): the NHT1/NHT2/NHT3/KHT/GOHT1/GOHT2/DHT entries previously listed
        # their PROD_PLAN tag twice; each tag is now listed once. The functions only read
        # one PROD_PLAN column each, so confirm no second, distinct tag was intended.
        self.specials = {
            '87FI510_60F+4FI_58_60F': {
                'tags': ['87FI510_60F', '4FI_58_60F'],
                'function': self.ips_flow,
            },
            '4VOL_13CHG - (300FC5 + 86FIC613)': {
                'tags': ['4VOL_13CHG', '300FC5', '86FIC613'],
                'function': self.vol_change,
            },
            '87FI122-4FI_70': {
                'tags': ['87FI122', '4FI_70'],
                'function': self.froth_flow,
            },
            '87_FI_SUM': {
                'tags': ['87FI205', '87FI235', '87FI265', '87FI1205', '87FI305', '87FI335', '87FI365', '87FI1305'],
                'function': self.feed_87,
            },
            'NHT1': {
                'tags': ['7FIC3', 'PROD_PLAN_2W_UPG_NHT1_PROD'],
                'function': self.nht1,
            },
            'NHT2': {
                'tags': ['P55_NHU_CHG_TOT', 'PROD_PLAN_2W_UPG_NHT2_PROD'],
                'function': self.nht2,
            },
            'NHT3': {
                'tags': ['P64_NHU_CHG_TOT', 'PROD_PLAN_2W_UPG_NHT3_PROD'],
                'function': self.nht3,
            },
            'KHT': {
                'tags': ['7FIC20', 'PROD_PLAN_2W_UPG_KHT_PROD'],
                'function': self.kht,
            },
            'GOHT1': {
                'tags': ['7FIC_39_40_41', 'PROD_PLAN_2W_UPG_GOTH1_PROD'],
                'function': self.goht1,
            },
            'GOHT2': {
                'tags': ['P55_GOHU_CHG_TOT', 'PROD_PLAN_2W_UPG_GOTH2_PROD'],
                'function': self.goht2,
            },
            'DHT': {
                'tags': ['P55_DHU_CHG_TOT', 'PROD_PLAN_2W_UPG_DTH1_PROD'],
                'function': self.dht,
            },
            'MILL_OILSAND_MASS_1HOUR_AVG': {
                'tags': ['MILL_OILSAND_MASS'],
                'function': self.mill_oilsand_mass_1hour_avg,
            }
        }
        self.consts = {'bbl_per_m3': 6.29}
        # bind the shared constants so callers only pass the data frame
        self.functions = {k: partial(v['function'], consts=self.consts) for k, v in self.specials.items()}
        self.tag_map = {k: v['tags'] for k, v in self.specials.items()}

    @staticmethod
    def ips_flow(df_in, consts=None):
        '''Sum of 87FI510_60F and 4FI_58_60F.'''
        return df_in['87FI510_60F'] + df_in['4FI_58_60F']

    @staticmethod
    def vol_change(df_in, consts=None):
        '''4VOL_13CHG divided by consts['bbl_per_m3'], minus (300FC5 + 86FIC613).'''
        return df_in['4VOL_13CHG']/consts['bbl_per_m3'] - (df_in['300FC5'] + df_in['86FIC613'])

    @staticmethod
    def froth_flow(df_in, consts=None):
        '''Difference 87FI122 - 4FI_70.'''
        return df_in['87FI122']-df_in['4FI_70']

    @staticmethod
    def feed_87(df_in, consts=None):
        '''Sum of the eight 87FI*05/*35/*65 flow tags.'''
        return df_in['87FI205'] + df_in['87FI235'] + df_in['87FI265'] + df_in['87FI1205'] + df_in['87FI305'] + df_in['87FI335'] + df_in['87FI365'] + df_in['87FI1305']

    # The next seven functions return the relative difference (a - plan) / plan between a
    # tag and its PROD_PLAN tag; a zero plan value yields +/-inf or NaN.

    @staticmethod
    def nht1(df_in, consts=None):
        return (df_in['7FIC3'] - df_in['PROD_PLAN_2W_UPG_NHT1_PROD']) / df_in['PROD_PLAN_2W_UPG_NHT1_PROD']

    @staticmethod
    def nht2(df_in, consts=None):
        return (df_in['P55_NHU_CHG_TOT'] - df_in['PROD_PLAN_2W_UPG_NHT2_PROD']) / df_in['PROD_PLAN_2W_UPG_NHT2_PROD']

    @staticmethod
    def nht3(df_in, consts=None):
        return (df_in['P64_NHU_CHG_TOT'] - df_in['PROD_PLAN_2W_UPG_NHT3_PROD']) / df_in['PROD_PLAN_2W_UPG_NHT3_PROD']

    @staticmethod
    def kht(df_in, consts=None):
        return (df_in['7FIC20'] - df_in['PROD_PLAN_2W_UPG_KHT_PROD']) / df_in['PROD_PLAN_2W_UPG_KHT_PROD']

    @staticmethod
    def goht1(df_in, consts=None):
        return (df_in['7FIC_39_40_41'] - df_in['PROD_PLAN_2W_UPG_GOTH1_PROD']) / df_in['PROD_PLAN_2W_UPG_GOTH1_PROD']

    @staticmethod
    def goht2(df_in, consts=None):
        return (df_in['P55_GOHU_CHG_TOT'] - df_in['PROD_PLAN_2W_UPG_GOTH2_PROD']) / df_in['PROD_PLAN_2W_UPG_GOTH2_PROD']

    @staticmethod
    def dht(df_in, consts=None):
        return (df_in['P55_DHU_CHG_TOT'] - df_in['PROD_PLAN_2W_UPG_DTH1_PROD']) / df_in['PROD_PLAN_2W_UPG_DTH1_PROD']

    @staticmethod
    def mill_oilsand_mass_1hour_avg(df_in, consts=None):
        '''12-row rolling mean of MILL_OILSAND_MASS (NaN for the first 11 rows).

        NOTE(review): equals a 1-hour average only if rows are 5 minutes apart -- confirm.
        '''
        return (df_in['MILL_OILSAND_MASS'].rolling(12).mean())

#### Notes:
Here we learned about the `@staticmethod` decorator:
1. What: 
`@staticmethod` is a decorator that turns a method into a plain function that lives inside the class only for organization and readability. It receives no implicit first argument (neither the instance `self` nor the class `cls`), so it can be called without instantiating the class first.
2. Access: 
Static methods can also be accessed via class instances or objects.
3. vs. class methods: 
Class methods (`@classmethod`) receive the class itself as their implicit first argument, so they are defined as `class_method(cls, param1, param2)` rather than with `self`. They can be called from the class or from an instance; in both cases `cls` is bound to the class, not to the instance. Like static methods, they can therefore be called from both a class instance and the class.
4. vs. instance methods:
Instance methods are defined in the form `instance_method(self, param1, param2)`, where `self` is the class instance the method is called on. They are normally called from an instance; calling one through the class requires passing the instance explicitly (`ClassName.instance_method(obj, ...)`). A `@staticmethod`, by contrast, can be called from both a class instance and the class, with no implicit argument.

REF: https://www.askpython.com/python/staticmethod-in-python

The partial function from functool:
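1. What:
`functools.partial` wraps an existing function and pre-binds some of its arguments to fixed values, producing a new callable called a *partial function*. Because the partial function has fewer remaining arguments than the original, it is simpler to call. Example from this notebook: `partial(self.ips_flow, consts=self.consts)` yields a function that only needs `df_in`.
2. Formula:
partial_func = partial(func, *args, **kwargs)
where `func` is the original function being wrapped, and `*args` and `**kwargs` receive the positional and keyword arguments to bind.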

REF:http://c.biancheng.net/view/5674.html AND https://zhuanlan.zhihu.com/p/47124891

In [9]:
def prep_base_data(df,
                   mode='deploy',
                   timestamp_column='timestamp_utc',
                   upset_flag_config=None,
                   special_functions=None,
                   tags=None,
                   categorical_tags=None,
                   fpa_params=None,
                   debug=False,
                   modeling_params = None,
                   trained_params=None,
                   **kwargs):
    '''Basic data preparation ahead of FPA feature generation.

    Steps:
    1. Parse the timestamp column and index by it.
    2. Join the upset flags produced by `score_upset_subevent`.
    3. Add special-function columns, dropping their raw input tags unless they are
       also listed in `tags`.
    4. Apply null conversions.
    5. One-hot encode the categorical tags.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw data, either indexed by `timestamp_column` or containing it as a column.
        String timestamps are parsed with ``utc=True``. The caller's frame is not
        modified.
    mode : {'train', 'deploy'}
        'train' fits one SeriesOneHotEncoder per categorical tag and stores them in
        ``trained_params['categorical_transformers']``. 'deploy' reuses those encoders.
    timestamp_column : str
        Name of the time column.
    upset_flag_config : dict
        Upset subevent config passed to `score_upset_subevent`.
    special_functions : list of str, optional
        Keys of ``SpecialFunctions().functions`` to compute as new columns.
    tags : list of str, optional
        Regular tags to keep. Special-function inputs not listed here are dropped.
    categorical_tags : list of str, optional
        Tags to one-hot encode. Each original column is replaced by its encoding.
    fpa_params, modeling_params :
        Accepted for call-site compatibility (e.g. ``**modeling_params``); unused here.
    debug : bool
        Print a message for each null conversion.
    trained_params : dict, optional
        Container for fitted artefacts. It is filled in 'train' mode; pass your own dict
        to keep the encoders. In 'deploy' mode it must hold 'categorical_transformers'
        when `categorical_tags` is non-empty.
    **kwargs :
        'conversions' -> {'null_to_zero_tags': [...]}: fill NaN with 0 for those tags.
        'null_conversions' -> dict (or iterable of (tag, value) pairs): fill NaN in each
        tag with its value.

    Returns
    -------
    pandas.DataFrame indexed by `timestamp_column`.

    Raises
    ------
    ValueError
        For an unknown mode, or for missing trained encoders in 'deploy' mode.
    '''
    if mode not in ('train', 'deploy'):
        raise ValueError('Unknown mode: {}'.format(mode))
    special_functions = special_functions or []
    tags = tags or []
    categorical_tags = categorical_tags or []
    if trained_params is None:
        trained_params = {}

    # work on a copy so the assignments below never mutate the caller's frame
    df = df.copy()

    # convert string dates to real dates
    if (timestamp_column in df.columns and len(df) > 0
            and isinstance(df[timestamp_column].iloc[0], str)):
        df[timestamp_column] = pd.to_datetime(df[timestamp_column], utc=True)

    # set the index
    if df.index.name != timestamp_column:
        df = df.set_index(timestamp_column)

    # get the flag_all / flag_filtered based on the upset config
    df_s = score_upset_subevent(df, upset_flag_config, remove_post_processing=True, remove_persistence=True)
    df = df_s.join(df, how='outer')

    # Special functions: compute every derived column first (inputs may be shared),
    # then drop the raw input tags that were not requested in `tags`. A local
    # instance avoids depending on a global defined in a later notebook cell.
    special = SpecialFunctions()
    for s in special_functions:
        print("Adding column: {}".format(s))
        df[s] = special.functions[s](df)
    tags_to_drop = []
    for s in special_functions:
        for t in special.tag_map[s]:
            if t not in tags and t not in tags_to_drop:
                tags_to_drop.append(t)
    df = df.drop(columns=tags_to_drop)

    # handle conversions/recoding
    conversions = kwargs.get('conversions') or {}
    for t in conversions.get('null_to_zero_tags', []):
        if t in df.columns:
            if debug:
                print('Converting null to 0 for {}'.format(t))
            df[t] = df[t].fillna(0)

    # iterating a dict directly yields only keys, so use .items() for mappings
    null_conversions = kwargs.get('null_conversions') or {}
    pairs = null_conversions.items() if isinstance(null_conversions, dict) else null_conversions
    for t, v in pairs:
        if t in df.columns:
            if debug:
                print('{}: converting null to {}'.format(t, v))
            df[t] = df[t].fillna(v)

    # Categorical variables: in training, fit and save one encoder per tag so the same
    # categories (and column names) are reproduced at deployment.
    if mode == 'train':
        transformers = dict()
        trained_params['categorical_transformers'] = transformers
        for c in categorical_tags:
            encoder = SeriesOneHotEncoder()
            encoded = encoder.fit_transform(df[c])
            transformers[c] = encoder
            # replace the raw column by its one-hot columns (appended at the end)
            df = pd.concat([df.drop(columns=[c]), encoded], axis=1)
    else:  # mode == 'deploy'
        transformers = trained_params.get('categorical_transformers')
        if categorical_tags and transformers is None:
            raise ValueError("trained_params['categorical_transformers'] is required in deploy mode")
        for c in categorical_tags:
            if c not in transformers:
                raise ValueError('No trained one-hot encoder for categorical tag: {}'.format(c))
            encoded = transformers[c].transform(df[c])
            df = pd.concat([df.drop(columns=[c]), encoded], axis=1)

    return df

In [11]:
## find extra tags if any, named as tag_extended
tags_extended = copy.copy(modeling_params['tags'])
spfunc = SpecialFunctions()
for s in modeling_params['special_functions']:
    for t in spfunc.tag_map[s]:
        if t not in tags_extended:
            tags_extended.append(t)    

In [None]:
def fpa_sensors_and_target_preparation(self,
                                       df,
                                       mode='test',
                                       debug=False,
                                       fpa_params=None,
                                       timestamp_column='timestamp_utc',
                                       **kwargs):

    '''
    Build the FPA feature table (and, in training, the failure targets) from a
    prepared data frame.

    Steps:
    1. Forward-fill all columns (train mode only).
    2. Build a failure table from rows where `flag_filtered == 1`.
    3. Add an 'active_failure' feature (the previous row's `flag_all`).
    4. Split the columns into numeric and categorical sets.
    5. Run every configured FPA stage over both sets and merge the results.
    6. Either attach failure targets ('train') or return a column-ordered feature
       matrix ('deploy').

    NOTE(review): this is defined at module level but takes `self`. It reads and writes
    `self.trained_params`, so it must be called with an object carrying a
    `trained_params` dict (presumably it was extracted from a class method).
    NOTE(review): it calls `process_stage_nosrom`, `merge_features` and
    `generate_failure_targets_new`. None of these is defined or imported in this
    notebook as shown; the import at the top is commented out.

    Parameters
    ----------
    self : object with a `trained_params` dict.
        Always reads trained_params['categorical_transformers'], whose values need a
        `.colnames` list. In 'deploy' mode it also reads trained_params['final_columns'];
        in 'train' mode it writes trained_params['final_columns'].
    df : DataFrame indexed by a time index named `timestamp_column`.
        Must contain the 'flag_all' and 'flag_filtered' columns plus the sensor columns.
    mode : 'train' or 'deploy'.
        Any other value, including the default 'test', raises ValueError, but only
        after all features have been computed.
    debug : bool
        Print diagnostic information.
    fpa_params : dict, required.
        Holds 'stages', 'failure_detection_window' and 'dead_period'.
    timestamp_column : str
        Name of the time index.
    **kwargs : accepted and ignored.

    Returns
    -------
    'train' : DataFrame of failure targets ('target_label') outer-merged with all
        features on ['asset_id', 'datetime'].
    'deploy' : DataFrame restricted to trained_params['final_columns'], with rows
        containing nulls dropped.

    Raises
    ------
    ValueError : fpa_params is missing, a stage has an invalid 'tags' directive, or the
        mode is unknown.
    RuntimeError : ('deploy' only) expected columns are missing, or the most recent row
        contains a null.
    AssertionError : the sensor table and the failure-feature table have mismatched
        timestamps.
    '''
    # need to handle both millennium and steepbank

    flag_all = 'flag_all'
    flag_filtered = 'flag_filtered'
    # flag_all feeds the 'active_failure' feature; flag_filtered selects failure events
    flag_column = flag_all
    
    ## separate info in fpa_param and store in variable 
    if fpa_params is None:
        raise ValueError('Must supply appropriate FPA parameters')
    else:
        # Feature-generation stages; each has window_sizes, simple_functions,
        # advanced_functions and a 'tags' directive ('all' / 'numeric' / 'categorical')
        stages = fpa_params['stages']
        # Future prediction window size
        failure_detection_window = fpa_params['failure_detection_window']
        dead_period = fpa_params['dead_period']

    if debug:
        print('Mode: {}'.format(mode))
        print(df[flag_filtered].value_counts())
        # print(df.target_num.describe())
        print(df.columns.tolist())

    ## 0. Input dataframe
    # The input dataframe holds all related sensors (raw values) and the upset flag columns.
    # reset_index() moves the timestamp index back into a regular column.

    sensor_table = df.copy().reset_index()

    ### 1. Loading the Sensor Data

    # Create the appropriate sensor data frame to agree with FPA flow. The sensor dataframe should only consist of sensor readings and timestamp and asset identifier columns.

    # User needs to specify the asset_id column name is specified in the variable 'sensor_asset_id' and the date column name is specified in the variable 'sensor_date'. Finally, the format of the date needs to specified using [python's datetime format.](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior)
    # to do: don't forward fill the target_num_2

    # NOTE(review): this forward-fills every column of sensor_table, including
    # flag_all / flag_filtered (which later reach the output via sensors_to_add).
    if mode == 'train':
        if debug:
            print('* Forward filling missing values *')
        sensor_table = sensor_table.fillna(method='ffill')

    # single asset: every row gets asset id 1
    sensor_table['asset_id'] = 1

    # sensor_table.describe()
    # sensor_table.head(10)

    # input_manager = InputDataManager("object_storage", params=dict(credentials=credentials, filename="sensor_data.csv"))
    # sensor_table = input_manager.get_pandas_data_frame()

    sensor_asset_id = 'asset_id'
    sensor_date = timestamp_column
    sensor_date_format = '%Y-%m-%d %H:%M:%S'

    # Drop duplicates is problematic because of daylight savings time issue -- changed to UTC properly
    # sensor_table = sensor_table.drop_duplicates(subset=[sensor_asset_id, sensor_date])
    sensor_table[sensor_date] = pd.to_datetime(sensor_table[sensor_date], format=sensor_date_format)

    # # # # #
    # Create failure table and associated features
    # One row per timestamp where flag_filtered == 1. Its columns are
    # [timestamp, flag_all, asset_id] before the rename below.
    failure_table = df[df[flag_filtered] == 1].reset_index()[[timestamp_column, flag_all]].copy()
    failure_table['asset_id'] = 1
    # rename columns to agree with FPA framework -- we have only one failure ID (1)
    failure_table.columns = [timestamp_column, 'failure_id', 'asset_id']

    failure_asset_id = 'asset_id'
    failure_date = timestamp_column
    failure_id = 'failure_id'
    failure_date_format = '%Y-%m-%d %H:%M:%S'

    failure_table[failure_date] = pd.to_datetime(failure_table[failure_date], format=failure_date_format)
    # for suncor all failures are the same type
    failure_table[failure_id] = 1

    failure_table = failure_table.drop_duplicates(subset=[failure_asset_id, failure_date])
    failure_table = failure_table.dropna()

    '''
    ### Define failure-specific features
    Define some features directly related to frequency and time since last failure event
    '''

    # 'active_failure' is the previous row's flag (shift by one row), so the feature
    # does not contain the current row's flag value
    failure_feature = df.reset_index()[[timestamp_column, flag_column]].copy()
    failure_feature['asset_id'] = 1
    failure_feature['active_failure'] = failure_feature[flag_column].shift(1)
    failure_feature_names = ['active_failure']

    # there is a problem with the binning of failure intervals
    # NOTE: `if False:` disables the time-since-failure binning below; kept for reference
    if False:
        failure_feature = df.reset_index()[[timestamp_column, flag_column]].copy()
        failure_feature['asset_id'] = 1
        failure_feature.columns = [timestamp_column, flag_column, 'asset_id']
        failure_feature[failure_date] = pd.to_datetime(failure_feature[failure_date], format=failure_date_format)
        failure_feature['active_failure'] = failure_feature[flag_column].shift(1)

        failure_feature['cumsum'] = failure_feature['active_failure'].cumsum()
        failure_feature['tmp'] = 1 - failure_feature['active_failure']
        failure_feature['time_since_failure'] = failure_feature.groupby(['cumsum'])['tmp'].cumsum()
        # failure_feature = failure_feature.drop(columns=['tmp', 'cumsum', 'Flag_all'])

        failure_interarrival = failure_feature.groupby(['cumsum'])['time_since_failure'].max()
        failure_interarrival_raw = failure_interarrival[failure_interarrival > 0].values.reshape(-1, 1)

        failure_feature_names = ['active_failure']

        if mode=='train':
            n_bins = 4
            kbd = KBinsDiscretizer(n_bins=n_bins, strategy='quantile', encode='onehot-dense')
            kbd.fit(failure_interarrival_raw)
            self.trained_params['time_since_failure_bin_model'] = kbd
        else:
            # assume pre-trained kbd in trained_params
            kbd = self.trained_params['time_since_failure_bin_model']

        n_bins = kbd.n_bins

        out = np.zeros(shape=(failure_feature.shape[0], n_bins))*np.NaN
        out[~failure_feature['time_since_failure'].isna(), ] = kbd.transform(
            failure_feature['time_since_failure'][~failure_feature['time_since_failure'].isna()].values.reshape(-1,1))

        for b in range(n_bins):
            label = "time_since_failure__bin_{}".format(b)
            failure_feature[label] = out[:, b]
            failure_feature_names.append(label)

    # merge features to create overall sensor table
    # Both frames come from df.reset_index(), so rows line up positionally;
    # concat(axis=1) aligns them on their shared default RangeIndex.
    assert (sensor_table[timestamp_column] == failure_feature[timestamp_column]).all(), "Mismatch in timestamp column"
    sensor_table = pd.concat([sensor_table, failure_feature[failure_feature_names]], axis=1)

    # # # # #
    # Final feature generation on the sensors tables
    # sensor_variables = every column except the time, asset id and flag columns
    sensor_variables = sensor_table.columns.tolist()
    sensor_variables.remove(timestamp_column)
    sensor_variables.remove('asset_id')
    sensor_variables.remove(flag_all)
    sensor_variables.remove(flag_filtered)

    # create list of all categorical columns, using expanded names
    categorical_columns = ['active_failure']
    for x in self.trained_params['categorical_transformers'].values():
        categorical_columns.extend(x.colnames)

    # remove categorical from list of all sensors
    # (list.remove raises ValueError if an encoder column is absent from sensor_table)
    for c in categorical_columns:
        sensor_variables.remove(c)

    stage_sensor_table = sensor_table.copy()
    # downstream FPA helpers use 'datetime' as the date column name
    sensor_date = 'datetime'
    stage_sensor_table = stage_sensor_table.rename(columns={timestamp_column: 'datetime'})

    # sensor to add at end -- these are only available in training mode
    sensors_to_add = stage_sensor_table[[sensor_date, sensor_asset_id, flag_all, flag_filtered]].copy()

    # split categorical and numeric dataframes
    # (both keep the datetime, asset id and flag columns; only the other set's columns are dropped)
    if len(categorical_columns) > 0:
        stage_sensor_table_num = stage_sensor_table.drop(columns=categorical_columns)
    else:
        stage_sensor_table_num = stage_sensor_table.copy()

    if len(sensor_variables) > 0:
        stage_sensor_table_cat = stage_sensor_table.drop(columns=sensor_variables)
    else:
        stage_sensor_table_cat = stage_sensor_table.copy()
    stage_variables_num = copy.copy(sensor_variables)
    stage_variables_cat = copy.copy(categorical_columns)

    if debug:
        print('Numerical variables:')
        print(stage_variables_num)

        print('Categorical variables:')
        print(stage_variables_cat)

    # Each stage consumes the previous stage's output table/variable list, so stages
    # are applied in order (e.g. past values of the previous stage's deltas).
    for stage in stages:
        features_num = []
        features_cat = []
        tags = stage['tags']
        if tags not in ['all', 'numeric', 'categorical']:
            raise ValueError('Invalid tags processing directive in FPA configuration')

        for hws, agg_simple, agg_adv in zip(stage['window_sizes'],
                                            stage['simple_functions'],
                                            stage['advanced_functions']):

            if debug:
                print("Generating features: {}".format(hws))
                print("Simple aggregtions list: ", agg_simple)
                print("Advanced aggregtions list: ", agg_adv)

            if len(stage_variables_num) > 0 and tags in ['all', 'numeric']:
                if debug:
                    print("Processing {} numeric variables...".format(len(stage_variables_num)))

                features_num_t = process_stage_nosrom(
                    agg_simple, agg_adv, stage_sensor_table_num, hws,
                    stage_variables_num, sensor_asset_id, sensor_date,
                    sensor_date_format, debug=debug)
                features_num.extend(features_num_t)

            if len(stage_variables_cat) > 0 and tags in ['all', 'categorical']:
                if debug:
                    print("Processing {} categorical variables...".format(len(stage_variables_cat)))

                features_cat_t = process_stage_nosrom(
                    agg_simple, agg_adv, stage_sensor_table_cat, hws,
                    stage_variables_cat, sensor_asset_id, sensor_date,
                    sensor_date_format, debug=debug)
                features_cat.extend(features_cat_t)

        # numeric feature merge
        if len(features_num) > 0:
            stage_sensor_table_num = merge_features(features_num, stage_variables_num, sensor_asset_id, sensor_date)
            stage_variables_num = stage_sensor_table_num.columns.tolist()
            if debug:
                print(stage_variables_num)
            stage_variables_num.remove(sensor_asset_id)
            stage_variables_num.remove(sensor_date)

        # categorical feature merge
        if len(features_cat) > 0:
            stage_sensor_table_cat = merge_features(features_cat, stage_variables_cat, sensor_asset_id, sensor_date)
            stage_variables_cat = stage_sensor_table_cat.columns.tolist()
            if debug:
                print(stage_variables_cat)
            stage_variables_cat.remove(sensor_asset_id)
            stage_variables_cat.remove(sensor_date)

    # final merge
    sensor_features_all = pd.merge(stage_sensor_table_num,
                                   stage_sensor_table_cat,
                                   on=[sensor_asset_id, sensor_date],
                                   how='left')
    sensor_features_all = pd.merge(sensor_features_all,
                                   sensors_to_add,
                                   on=[sensor_asset_id, sensor_date],
                                   how='left')

    if mode == 'train':
        # # # # #
        # Generate failure keys and failure targets
        # only needed if we are training a new model

        failure_keys = sensor_features_all[['asset_id', 'datetime']].copy()
        # NOTE(review): no-op expression statement, leftover from interactive use
        failure_keys.shape

        # Failure window starts before dead period, so failure window runs from:
        # t=-(failure_detection_window+dead_period) to t=-dead_period
        # assuming failure is at time t=0
        #
        # the target_label in the dead period is assigned a value of -1

        failure_target_table = generate_failure_targets_new(failure_table,
                                                        failure_keys,
                                                        failure_detection_window,
                                                        failure_asset_id,
                                                        failure_date,
                                                        failure_id,
                                                        dead_period=dead_period)

        # Note outer join, this is equivalent of a union in set operations
        # Where there are no matching keys, you get NaN values
        FPA_table = pd.merge(failure_target_table, sensor_features_all, on=['asset_id', 'datetime'], how='outer')
        # let's look at some values where we have flagged a failure

        # Model input columns = everything except keys, target and the flag columns;
        # saved so deploy mode can reproduce the same columns in the same order.
        final_columns = FPA_table.columns.tolist()
        final_columns.remove('asset_id')
        final_columns.remove('datetime')
        final_columns.remove('target_label')
        for x in sensors_to_add.columns:
            if x in final_columns:
                final_columns.remove(x)
        self.trained_params['final_columns'] = final_columns
        if debug:
            print(FPA_table.head())
            print(FPA_table.target_label.value_counts())
            print(FPA_table.shape)

        # need to also return column list, and any other parameters to save the model json
        # no -- leave it to the notebook
        return FPA_table
    elif mode == 'deploy':
        # in deploy mode we return a matrix and we further insure order of columns
        final_columns = self.trained_params['final_columns']

        missing_cols = (set(final_columns) - set(sensor_features_all.columns.tolist()))
        if len(missing_cols) > 0:
            raise RuntimeError('Columns produced in feature engineering do not match expected, missing: {}'.format(" ".join(list(missing_cols))))

        sensor_features_all = sensor_features_all.loc[:, final_columns]

        idx_null = (sensor_features_all.isnull().sum(axis=1) > 0)

        # The most recent row is the one being scored, so a null there is fatal;
        # older rows with nulls are just dropped.
        # NOTE(review): `.iloc[-1, :]` raises IndexError if the frame has no rows.
        if sensor_features_all.iloc[-1,:].isnull().sum() > 0:
            raise RuntimeError('Most recent row of output has at least one null value')
        if idx_null.sum() > 0:
            print('Dropping {} row(s) with at least one null value'.format(idx_null.sum()))
            sensor_features_all.dropna(inplace=True)

        return sensor_features_all
    else:
        # NOTE(review): the default mode='test' ends up here, after all the work above
        raise ValueError('Unknown mode: {}'.format(mode))