In [1]:
'''
Created on 30 July 2019 for Dish IOT demo

Regression models: How many more cycles an in-service device will last before it fails?

'''
import pandas as pd
import numpy as np
import matplotlib
# matplotlib.use("Agg")
import matplotlib.pyplot as plt
import os
from sklearn import preprocessing

# Setting seed for reproducibility - do this before importing keras.models.Sequential
np.random.seed(20190801)
PYTHONHASHSEED = 8012019
import keras
import keras.backend as K
from keras.layers.core import Activation
from keras.models import Sequential,load_model
from keras.layers import Dense, Dropout, LSTM




# define path to save model
# model_path = '../Output/regression_model.h5'


In [2]:
# Mount the data location
dbutils.fs.unmount("/mnt/t3diiotdatastorage")

In [3]:
dbutils.fs.mount(
  source = "wasbs://production@t3diiotdatastorage.blob.core.windows.net",
  mount_point = "/mnt/t3diiotdatastorage",
  extra_configs = {"fs.azure.account.key.t3diiotdatastorage.blob.core.windows.net":dbutils.secrets.get(scope = "azure-key-vault-secrets", key = "processedtraindataaccountkey")})

In [4]:
dbutils.fs.ls("/mnt/t3diiotdatastorage")

In [6]:
# spark_df = spark.read.csv("/mnt/t3diiotdatastorage/Device Data with Timestamps.csv", inferSchema=True, header=True)
spark_df = spark.read.csv("/mnt/t3diiotdatastorage/predict-transform-output.csv", inferSchema=True, header=True)

In [8]:
##################################
# Data Ingestion
##################################

# train_df = pd.read_csv('../Dataset/predict-transform-output.csv')
df = spark_df.toPandas()
df.columns = ['unit_number', 'device_timestamp', 'latency', 'voltage', 'availBw', 'vlanId', 'erroredSeconds',
                     'cpu', 'unique_sequence', 'rul']
df = train_df.sort_values(['unit_number','unique_sequence'])
df.head()

In [9]:
# 
from sklearn.model_selection import KFold
k = 5
kf = KFold(n_splits=k, random_state=201809, shuffle=True)
unit_number = df.unit_number.unique()
kf.get_n_splits(unit_number)

fold_indices = []
for train_indices, test_indices in kf.split(unit_number):
    fold_indices.append((train_indices, test_indices))

# just build for one fold first; then we can put in a loop
train_ids = unit_number[fold_indices[0][0]]
test_ids = unit_number[fold_indices[0][1]]

train_df = df[df['unit_number'].isin(train_ids)]
test_df = df[df['unit_number'].isin(test_ids)]

train_df = train_df.sort_values(['unit_number', 'unique_sequence'])
test_df = test_df.sort_values(['unit_number', 'unique_sequence'])

In [10]:
# Lets check number of unique devices - Train set
print(train_df.shape)
print(len(train_df['unit_number'].unique()))
print(train_df['unit_number'].unique())
print(train_df.head())

# Lets check number of unique devices - Test set
print(test_df.shape)
print(len(test_df['unit_number'].unique()))
print(test_df['unit_number'].unique())
print(test_df.head())

In [11]:
##################################
# Data Preprocessing
##################################

#######
# TRAIN
#######

# generate label columns for training data just in case we need to create binary classification model
# answer the question: is a specific engine going to fail within w1 cycles?
w1 = 30
w0 = 15
train_df['label1'] = np.where(train_df['rul'] <= w1, 1, 0 )
train_df['label2'] = train_df['label1']
train_df.loc[train_df['rul'] <= w0, 'label2'] = 2
train_df.head()

In [12]:
# generate label columns w0 and w1 for test data
test_df['label1'] = np.where(test_df['rul'] <= w1, 1, 0 )
test_df['label2'] = test_df['label1']
test_df.loc[test_df['rul'] <= w0, 'label2'] = 2

In [13]:
# MinMax normalization (from 0 to 1)
train_df['unique_sequence_norm'] = train_df['unique_sequence']
cols_normalize = train_df.columns.difference(['unit_number', 'device_timestamp', 'unique_sequence', 'rul','label1','label2'])

min_max_scaler = preprocessing.MinMaxScaler()
norm_train_df = pd.DataFrame(min_max_scaler.fit_transform(train_df[cols_normalize]), 
                             columns=cols_normalize, 
                             index=train_df.index)
join_df = train_df[train_df.columns.difference(cols_normalize)].join(norm_train_df)
train_df = join_df.reindex(columns = train_df.columns)
train_df.head()

In [14]:
# MinMax normalization (from 0 to 1) for test data
test_df['unique_sequence_norm'] = test_df['unique_sequence']
norm_test_df = pd.DataFrame(min_max_scaler.transform(test_df[cols_normalize]), 
                            columns=cols_normalize, 
                            index=test_df.index)
test_join_df = test_df[test_df.columns.difference(cols_normalize)].join(norm_test_df)
test_df = test_join_df.reindex(columns = test_df.columns)
test_df = test_df.reset_index(drop=True)
test_df.head()

In [15]:
# pick a large window size of 100 cycles
sequence_length = 100

# function to reshape features into (samples, time steps, features) 
def gen_sequence(id_df, seq_length, seq_cols):
    """ Only sequences that meet the window-length are considered, no padding is used. This means for testing
    we need to drop those which are below the window-length. An alternative would be to pad sequences so that
    we can use shorter ones """
    # for one id I put all the rows in a single matrix
    data_matrix = id_df[seq_cols].values
    num_elements = data_matrix.shape[0]
    # Iterate over two lists in parallel.
    # For example id1 have 192 rows and sequence_length is equal to 50
    # so zip iterate over two following list of numbers (0,112),(50,192)
    # 0 50 -> from row 0 to row 50
    # 1 51 -> from row 1 to row 51
    # 2 52 -> from row 2 to row 52
    # ...
    # 111 191 -> from row 111 to 191
    for start, stop in zip(range(0, num_elements-seq_length), range(seq_length, num_elements)):
        yield data_matrix[start:stop, :]

In [16]:
# pick the feature columns 
sequence_cols = ['latency', 'voltage', 'availBw', 'vlanId', 'erroredSeconds', 'cpu', 'unique_sequence_norm']
sequence_cols

In [17]:
# TODO for debug 
# val is a list of 192 - 50 = 142 bi-dimensional array (50 rows x 25 columns)
# val=list(gen_sequence(train_df[train_df['unit_number']==1], sequence_length, sequence_cols))
# print(len(val))

# generator for the sequences
# transform each id of the train dataset in a sequence
seq_gen = (list(gen_sequence(train_df[train_df['unit_number']==id], sequence_length, sequence_cols)) 
           for id in train_df['unit_number'].unique())

# generate sequences and convert to numpy array
seq_array = np.concatenate(list(seq_gen)).astype(np.float32)
print(seq_array.shape)

In [18]:
# function to generate labels
def gen_labels(id_df, seq_length, label):
    """ Only sequences that meet the window-length are considered, no padding is used. This means for testing
    we need to drop those which are below the window-length. An alternative would be to pad sequences so that
    we can use shorter ones """
    # For one id I put all the labels in a single matrix.
    # For example:
    # [[1]
    # [4]
    # [1]
    # [5]
    # [9]
    # ...
    # [200]] 
    data_matrix = id_df[label].values
    num_elements = data_matrix.shape[0]
    # I have to remove the first seq_length labels
    # because for one id the first sequence of seq_length size have as target
    # the last label (the previus ones are discarded).
    # All the next id's sequences will have associated step by step one label as target.
    return data_matrix[seq_length:num_elements, :]

# generate labels
label_gen = [gen_labels(train_df[train_df['unit_number']==id], sequence_length, ['rul']) 
             for id in train_df['unit_number'].unique()]

label_array = np.concatenate(label_gen).astype(np.float32)
label_array.shape

In [19]:
# generator for the sequences in test set
# transform each id of the test dataset in a sequence
seq_gen_test = (list(gen_sequence(test_df[test_df['unit_number']==id], sequence_length, sequence_cols)) 
           for id in test_df['unit_number'].unique())

# generate sequences and convert to numpy array
seq_array_test = np.concatenate(list(seq_gen_test)).astype(np.float32)
print(seq_array_test.shape)
# generate labels
label_gen_test = [gen_labels(test_df[test_df['unit_number']==id], sequence_length, ['rul']) 
             for id in test_df['unit_number'].unique()]

label_array_test = np.concatenate(label_gen_test).astype(np.float32)
print(label_array_test.shape)

In [20]:
def r2_keras(y_true, y_pred):
    """Coefficient of Determination 
    """
    SS_res =  K.sum(K.square( y_true - y_pred ))
    SS_tot = K.sum(K.square( y_true - K.mean(y_true) ) )
    return ( 1 - SS_res/(SS_tot + K.epsilon()) )

In [21]:
##################################
# Modeling
##################################

# Next, we build a deep network. 
# The first layer is an LSTM layer with 100 units followed by another LSTM layer with 50 units. 
# Dropout is also applied after each LSTM layer to control overfitting. 
# Final layer is a Dense output layer with single unit and linear activation since this is a regression problem.
nb_features = seq_array.shape[2]
nb_out = label_array.shape[1]

model = Sequential()
model.add(LSTM(
         input_shape=(sequence_length, nb_features),
         units=100,
         return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(
          units=50,
          return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(units=nb_out))
model.add(Activation("linear"))
model.compile(loss='mean_squared_error', optimizer='rmsprop',metrics=['mae',r2_keras])

print(model.summary())

# fit the network
# history = model.fit(seq_array, label_array, epochs=100, batch_size=100, validation_split=0.1, verbose=2,
#           callbacks = [keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='min'),
#                        keras.callbacks.ModelCheckpoint(model_path,monitor='val_loss', save_best_only=True, mode='min', verbose=0)]
#           )

# history = model.fit(seq_array, label_array, epochs=100, batch_size=100, validation_split=0.1, verbose=2,
#           callbacks = [keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='min')]
#           )

history = model.fit(seq_array, label_array, epochs=100, batch_size=100, validation_data=(seq_array_test, label_array_test), verbose=2,
          callbacks = [keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='min')]
          )

# list all data in history
print(history.history.keys())

In [22]:
# define path to save model
model_path = "/mnt/t3diiotdatastorage/rul_model.h5"
model_path =  
dbutils.fs.ls("/FileStore/tables/rul_model.h5")
model.save(model_path)

In [23]:
model_path = "rul_model.h5"
model.save(model_path)

In [24]:
dbutils.fs.ls("/mnt/t3diiotdatastorage")

In [25]:
# training metrics
train_scores = model.evaluate(seq_array, label_array, verbose=1, batch_size=200)

In [26]:
# training metrics
print('\nTrain MAE: {}'.format(train_scores[1]))
print('\nTrain R^2: {}'.format(train_scores[2]))

In [27]:
# testing metrics
test_scores = model.evaluate(seq_array_test, label_array_test, verbose=1, batch_size=200)

In [28]:
# testing metrics
print('\nTest MAE: {}'.format(test_scores[1]))
print('\nTest R^2: {}'.format(test_scores[2]))

In [29]:
dbutils.fs.ls("/mnt/t3diiotdatastorage")

In [30]:
dbutils.fs.ls("/FileStore/tables/")