In [39]:
%load_ext autoreload
%autoreload 2

import datetime
import joblib
import json
import keras
import numpy as np
import operator
import os
import pandas as pd
from es_pandas import es_pandas
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import TopicPartition
from keras.preprocessing.sequence import TimeseriesGenerator
from sklearn.preprocessing import MinMaxScaler

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [45]:
# Name of the trained model; it is also the name of the model's directory
# below mods_models/models, so the file paths further down are derived from it.
MODEL_NAME = '20210315-6m1m-conn-ssh'

# Model settings read by the cells below:
#   sequence_length  - contributes to the computed context length
#   prediction_steps - lag passed to transform(); added to the context length
#                      when 'differential' is true
#   transformation   - key into the `transformations` table
#   data.in / out    - names of the input columns and of the predicted columns
MODEL_CFG = {
    'sequence_length': 12,
    'prediction_steps': 1,
    'transformation': 'diff',
    'differential': True,
    'data': {
        'in': {
            'index':['ts'],
            'columns':[
                'conn_count_uid_in',
                'ssh_count_uid_in'
            ]
        },
        'out': [
            'conn_count_uid_in',
            'ssh_count_uid_in'
        ]
    }
}

# Build both paths from MODEL_NAME so they cannot drift apart from it
# (the resulting strings are identical to the previously hard-coded ones).
MODEL_DIR = 'mods_models/models/%s' % MODEL_NAME
MODEL_FILE = '%s/data_train-seq-12.h5' % MODEL_DIR
SCALER_FILE = '%s.minmaxscaler' % MODEL_FILE

# NOTE(review): load_model() is defined in a later cell of this notebook, so
# this cell fails on a fresh kernel unless that cell has been run first.
MODEL,SCALER = load_model(MODEL_FILE, SCALER_FILE)

In [46]:
def _log_forward(x, y):
    """Forward 'log' transformation: difference of the natural logs of x and y."""
    return np.log(x)-np.log(y)


def _log_inverse(x, y):
    """Inverse 'log' transformation: scale x by the exponential of y."""
    return np.exp(y)*x


# Table of available transformations. Each entry maps:
#   'f'   -> forward function, called as f(current, previous)
#   'f-1' -> inverse function, called as f-1(previous, predicted)
transformations = {
    name: {'f': forward, 'f-1': inverse}
    for name, forward, inverse in (
        ('diff', operator.sub, operator.add),
        ('perc', operator.truediv, operator.mul),
        ('log', _log_forward, _log_inverse),
    )
}


def transform(df, k, t=transformations['diff']):
    """Apply the forward transformation between each row and the row k steps earlier.

    Args:
        df: pandas DataFrame or any sliceable sequence/array of observations.
        k: lag, in rows, between the two operands.
        t: entry of `transformations`; its 'f' function is used.

    Returns:
        t['f'](df[k:], df[:-k]). For a DataFrame the lagged operand is passed
        as raw `.values` (presumably so the operation is positional rather than
        index-aligned).
    """
    forward = t['f']
    current = df[k:]
    lagged = df[:-k]
    if isinstance(df, pd.DataFrame):
        lagged = lagged.values
    return forward(current, lagged)


def inverse_transform(pred, prev, t=transformations['diff']):
    """Undo a transformation by combining predictions with the previous values.

    Args:
        pred: predicted (transformed) values.
        prev: previous observations the prediction is relative to; either a
            pandas DataFrame or an array-like.
        t: entry of `transformations`; its 'f-1' function is used.

    Returns:
        t['f-1'](prev, pred); a DataFrame `prev` is passed as raw `.values`.
    """
    inverse = t['f-1']
    # The type check must look at the `prev` argument: the previous code tested
    # an unrelated global `df`, which raised NameError or picked the wrong branch.
    if isinstance(prev, pd.DataFrame):
        return inverse(prev.values, pred)
    return inverse(prev, pred)


def normalize(df, scaler, fit=False, features=None):
    """Scale data with `scaler`, optionally fitting it first.

    Args:
        df: pandas DataFrame or array-like to scale.
        scaler: object exposing `transform` and `fit_transform`.
        fit: when true use `scaler.fit_transform`, otherwise `scaler.transform`.
        features: optional list of column names; only used for DataFrames.

    Returns:
        - DataFrame with `features`: the SAME DataFrame, with those columns
          overwritten in place by their scaled values.
        - DataFrame without `features`: a new DataFrame holding the scaled
          values, with the original index and column labels.
        - anything else: whatever the scaler returns for the input.
    """
    fn = scaler.fit_transform if fit else scaler.transform
    if not isinstance(df, pd.DataFrame):
        return fn(df)
    if features is not None:
        # In-place column update (kept for backward compatibility with callers).
        df[features] = fn(df[features])
        return df
    # Scale exactly once and return the labelled frame. The previous code built
    # this frame and then returned fn(df), i.e. it scaled (and, with fit=True,
    # re-fitted) a second time and returned a bare array.
    return pd.DataFrame(fn(df), index=df.index, columns=df.columns)


def inverse_normalize(df, scaler, features=None):
    """Map scaled data back to the original range with `scaler.inverse_transform`.

    Args:
        df: pandas DataFrame or array-like of scaled values.
        scaler: object exposing `inverse_transform`.
        features: optional list of column names; only used for DataFrames.
            Defaults to None, matching `normalize`.

    Returns:
        - DataFrame with `features`: the SAME DataFrame, with those columns
          overwritten in place by their inverse-scaled values.
        - DataFrame without `features`: a new DataFrame holding the
          inverse-scaled values, with the original index and column labels.
        - anything else: whatever the scaler returns for the input.
    """
    fn = scaler.inverse_transform
    if not isinstance(df, pd.DataFrame):
        # Plain arrays have no `.values`; the previous unconditional
        # `df.values` raised AttributeError for them. Objects that do expose
        # `.values` are still unwrapped as before.
        return fn(getattr(df, 'values', df))
    if features is not None:
        # In-place column update (kept for backward compatibility with callers).
        df[features] = fn(df[features].values)
        return df
    # Invert exactly once and return the labelled frame. The previous code
    # built this frame and then returned fn(df), applying the inverse twice
    # and returning a bare array.
    return pd.DataFrame(fn(df), index=df.index, columns=df.columns)


def load_model(model_file, scaler_file):
    """Load a Keras model and its companion scaler from disk.

    Args:
        model_file: path handed to `keras.models.load_model`.
        scaler_file: path handed to `joblib.load`.

    Returns:
        Tuple `(model, scaler)`.

    Note:
        `joblib.load` is pickle-based, so only load scaler files from a
        trusted source.
    """
    return keras.models.load_model(model_file), joblib.load(scaler_file)

In [47]:
# Column names the model consumes and the ones it predicts.
data_cfg = MODEL_CFG['data']
features = data_cfg['in']['columns']
features_predicted = data_cfg['out']

# A differential model needs `prediction_steps` extra rows of context on top of
# the sequence itself; otherwise the sequence length alone is the context.
if MODEL_CFG['differential']:
    extra_context = MODEL_CFG['prediction_steps']
else:
    extra_context = 0
context_length = MODEL_CFG['sequence_length'] + extra_context

print('features: %s' % features)
print('features_predicted: %s' % features_predicted)
print('context_length: %s' % context_length)
print('sequence_length: %s' % MODEL_CFG['sequence_length'])
print('sequence_length: %s' % MODEL_CFG['sequence_length'])

features: ['conn_count_uid_in', 'ssh_count_uid_in']
features_predicted: ['conn_count_uid_in', 'ssh_count_uid_in']
context_length: 13
sequence_length: 12


In [80]:
# Hand-written sample window of raw observations. The timestamp strings are
# parsed to datetimes and become the index of the frame.
df = (
    pd.DataFrame(
        [
            ['2021-03-15 13:10:00', 17646.0, 1545.0],
            ['2021-03-15 13:20:00', 18395.0, 2701.0],
            ['2021-03-15 13:30:00', 18695.0, 2226.0],
            ['2021-03-15 13:40:00', 17120.0, 539.0],
            ['2021-03-15 13:50:00', 24205.0, 633.0],
            ['2021-03-15 14:00:00', 16657.0, 455.0],
            ['2021-03-15 14:10:00', 16324.0, 565.0],
            ['2021-03-15 14:20:00', 16369.0, 592.0],
            ['2021-03-15 14:30:00', 17659.0, 567.0],
            ['2021-03-15 14:40:00', 17099.0, 585.0],
            ['2021-03-15 14:50:00', 15265.0, 652.0],
            ['2021-03-15 15:00:00', 15450.0, 535.0],
            ['2021-03-15 15:10:00', 15343.0, 544.0]
        ],
        columns=['ts', 'conn_count_uid_in', 'ssh_count_uid_in']
    )
    .assign(ts=lambda frame: pd.to_datetime(frame['ts']))
    .set_index('ts')
)
display(df)

Unnamed: 0_level_0,conn_count_uid_in,ssh_count_uid_in
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-03-15 13:10:00,17646.0,1545.0
2021-03-15 13:20:00,18395.0,2701.0
2021-03-15 13:30:00,18695.0,2226.0
2021-03-15 13:40:00,17120.0,539.0
2021-03-15 13:50:00,24205.0,633.0
2021-03-15 14:00:00,16657.0,455.0
2021-03-15 14:10:00,16324.0,565.0
2021-03-15 14:20:00,16369.0,592.0
2021-03-15 14:30:00,17659.0,567.0
2021-03-15 14:40:00,17099.0,585.0


In [81]:
# Pick the transformation named in the model configuration.
t = transformations[MODEL_CFG['transformation']]
prediction_steps = MODEL_CFG['prediction_steps']

# transform() returns len(df) - prediction_steps rows, so the target frame must
# skip the same number of leading rows. The previous hard-coded `df[1:]` only
# matched when prediction_steps == 1.
df_x = df[prediction_steps:].copy(deep=True)
df_x[:] = transform(df.to_numpy(), prediction_steps, t=t)
display(df_x)

Unnamed: 0_level_0,conn_count_uid_in,ssh_count_uid_in
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-03-15 13:20:00,749.0,1156.0
2021-03-15 13:30:00,300.0,-475.0
2021-03-15 13:40:00,-1575.0,-1687.0
2021-03-15 13:50:00,7085.0,94.0
2021-03-15 14:00:00,-7548.0,-178.0
2021-03-15 14:10:00,-333.0,110.0
2021-03-15 14:20:00,45.0,27.0
2021-03-15 14:30:00,1290.0,-25.0
2021-03-15 14:40:00,-560.0,18.0
2021-03-15 14:50:00,-1834.0,67.0


In [82]:
# Scale the transformed values with the loaded scaler and write them back into
# df_x, keeping its index and column labels.
# NOTE(review): this overwrites df_x in place, so re-running the cell scales
# already-scaled data; re-run the transform cell first.
scaled_values = normalize(df_x.to_numpy(), SCALER)
df_x[:] = scaled_values
display(df_x)

Unnamed: 0_level_0,conn_count_uid_in,ssh_count_uid_in
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-03-15 13:20:00,0.486044,0.509928
2021-03-15 13:30:00,0.485772,0.496206
2021-03-15 13:40:00,0.484637,0.486009
2021-03-15 13:50:00,0.489881,0.500993
2021-03-15 14:00:00,0.48102,0.498704
2021-03-15 14:10:00,0.485389,0.501127
2021-03-15 14:20:00,0.485618,0.500429
2021-03-15 14:30:00,0.486372,0.499992
2021-03-15 14:40:00,0.485252,0.500353
2021-03-15 14:50:00,0.48448,0.500766


In [87]:
# Prepend a batch axis so the model receives a single sequence:
# (rows, columns) -> (1, rows, columns).
# NOTE(review): the recorded output of this cell is a cuDNN
# "Fail to find the dnn implementation" error; presumably a GPU/cuDNN
# environment problem rather than a code problem, to be confirmed.
model_input = df_x.to_numpy()[np.newaxis]
pred = MODEL.predict(model_input)
display(pred)

UnknownError:    Fail to find the dnn implementation.
	 [[{{node CudnnRNN}}]]
	 [[model/lstm/PartitionedCall]] [Op:__inference_predict_function_5109]

Function call stack:
predict_function -> predict_function -> predict_function
