# Test notebook for the tunable model: classification and regression tests using Keras and TensorFlow

We first import the necessary packages.

In [1]:
import numpy as np
import math
import tensorflow as tf
import sys

# Extend the import search path with the projects folder (presumably the folder
# that holds the `ann_framework` package imported further down -- confirm).
# The location can be overridden with the ANN_FRAMEWORK_ROOT environment
# variable; when it is unset the original machine-specific path is used, so the
# behaviour on the author's machine is unchanged.
import os
sys.path.append(os.environ.get(
    'ANN_FRAMEWORK_ROOT',
    '/Users/davidlaredorazo/Documents/University_of_California/Research/Projects'))

from sklearn.preprocessing import StandardScaler, MinMaxScaler

import keras
from keras.models import Sequential, Model
from keras.layers import Dense, Input, Dropout, Reshape, Conv2D, Flatten, MaxPooling2D, LSTM
from keras.optimizers import Adam, SGD
from keras.callbacks import LearningRateScheduler
from keras import backend as K
from keras import regularizers

from numpy.random import seed
from tensorflow import set_random_seed

#Use the data handler to manage all the data preprocessing.
from ann_framework.data_handlers.data_handler_CMAPSS import CMAPSSDataHandler
from ann_framework.data_handlers.data_handler_MNIST import MNISTDataHandler

#Import the tunable model classes
from ann_framework.tunable_model.tunable_model import SequenceTunableModelRegression, SequenceTunableModelClassification

#Import aux functions
import ann_framework.aux_functions as aux_functions


# L2 kernel-regularization coefficient read by the Keras and TensorFlow
# regression example models defined in this notebook.
l2_lambda_regularization = 0.20

Using TensorFlow backend.


### Define Keras example models

In [2]:
def keras_test_model_regression(input_shape, output_size=1):
    """Build the (uncompiled) Keras example network used for regression.

    The network has one 20-unit ReLU hidden layer named 'fc1' followed by a
    linear output layer named 'out'. Both layers use Glorot-normal weight
    initialisation and an L2 kernel penalty weighted by the module-level
    ``l2_lambda_regularization``.

    Parameters
    ----------
    input_shape : value forwarded to the first Dense layer as ``input_dim``.
    output_size : number of units in the output layer (default 1).

    Returns
    -------
    The assembled ``Sequential`` model (not compiled).
    """
    hidden_layer = Dense(20,
                         input_dim=input_shape,
                         activation='relu',
                         kernel_initializer='glorot_normal',
                         kernel_regularizer=regularizers.l2(l2_lambda_regularization),
                         name='fc1')
    output_layer = Dense(output_size,
                         activation='linear',
                         kernel_initializer='glorot_normal',
                         kernel_regularizer=regularizers.l2(l2_lambda_regularization),
                         name='out')

    # Stack the layers in order: hidden first, then output.
    network = Sequential()
    for layer in (hidden_layer, output_layer):
        network.add(layer)

    return network


def keras_test_model_classification(input_shape, output_size=1):
    """Build the (uncompiled) Keras example network used for classification.

    The network has one 80-unit tanh hidden layer named 'fc1' followed by a
    softmax output layer named 'out'. Both layers use Glorot-normal weight
    initialisation; no regularizer is attached.

    Parameters
    ----------
    input_shape : value forwarded to the first Dense layer as ``input_dim``.
    output_size : number of units in the softmax output layer (default 1).

    Returns
    -------
    The assembled ``Sequential`` model (not compiled).
    """
    # Passing the layer list to the constructor adds the layers in list order.
    return Sequential([
        Dense(80,
              input_dim=input_shape,
              activation='tanh',
              kernel_initializer='glorot_normal',
              name='fc1'),
        Dense(output_size,
              activation='softmax',
              kernel_initializer='glorot_normal',
              name='out'),
    ])


def get_keras_compiled_model(input_shape, model_type, output_size=1):
    """Create and compile one of the Keras example models.

    Parameters
    ----------
    input_shape : input dimension forwarded to the model builder.
    model_type : either "regression" or "classification".
    output_size : number of output units forwarded to the model builder.

    Returns
    -------
    The compiled Keras model. Regression models use the mean squared error
    loss with the "mse" metric; classification models use categorical
    cross-entropy with the "accuracy" metric. Both use Adam(lr=0.001,
    beta_1=0.5).

    Raises
    ------
    ValueError
        If ``model_type`` is not one of the two supported values. Previously
        any unrecognised value (e.g. a typo) silently produced a classifier.
    """
    # Validate first so a bad argument does not reset the backend session.
    if model_type not in ("regression", "classification"):
        raise ValueError(
            "model_type must be 'regression' or 'classification', got %r" % (model_type,))

    K.clear_session()

    #Shared parameters for the models
    optimizer = Adam(lr=0.001, beta_1=0.5)

    if model_type == "regression":

        #Regression model
        lossFunction = "mean_squared_error"
        metrics = ["mse"]
        model = keras_test_model_regression(input_shape, output_size)
    else:

        #Classification model
        lossFunction = "categorical_crossentropy"
        metrics = ["accuracy"]
        model = keras_test_model_classification(input_shape, output_size)

    model.compile(optimizer=optimizer, loss=lossFunction, metrics=metrics)

    return model

### Define TensorFlow example models

In [None]:
def create_placeholders(input_shape, output_shape):
    """Create the feed placeholders for the TensorFlow example models.

    Parameters
    ----------
    input_shape : size of the second dimension of the input placeholder "X";
        its first dimension is left as None.
    output_shape : NOTE(review): accepted but currently unused -- the target
        placeholder "y" is created with shape=None. Confirm whether "y" should
        be constrained with this value.

    Returns
    -------
    Tuple ``(X, y)`` of float32 placeholders named "X" and "y".
    """
    inputs_ph = tf.placeholder(tf.float32, shape=(None, input_shape), name="X")
    targets_ph = tf.placeholder(tf.float32, shape=None, name="y")
    return inputs_ph, targets_ph


def tf_model_classification(X, output_size=1):
    """Build the TensorFlow example network used for classification.

    An 80-unit tanh dense layer ("fc1") feeds a dense layer with no activation
    ("fc2") that produces the logits; a softmax is then applied to the logits.
    Both dense layers use the non-uniform Xavier initializer.

    Parameters
    ----------
    X : input tensor fed to the first dense layer.
    output_size : number of units in the logits layer (default 1).

    Returns
    -------
    Tuple ``(logits, probabilities)`` where ``probabilities`` is
    ``tf.nn.softmax(logits)``.
    """
    hidden = tf.layers.dense(
        X,
        80,
        activation=tf.nn.tanh,
        kernel_initializer=tf.contrib.layers.xavier_initializer(uniform=False),
        name="fc1")

    logits = tf.layers.dense(
        hidden,
        output_size,
        activation=None,
        kernel_initializer=tf.contrib.layers.xavier_initializer(uniform=False),
        name="fc2")

    return logits, tf.nn.softmax(logits)


def tf_model_regression(X, output_size=1):
    """Build the TensorFlow example network used for regression.

    A 20-unit ReLU dense layer ("fc1") feeds a dense output layer with no
    activation ("out"). Both layers use the non-uniform Xavier initializer and
    an L2 kernel regularizer scaled by the module-level
    ``l2_lambda_regularization``.

    Parameters
    ----------
    X : input tensor fed to the first dense layer.
    output_size : number of units in the output layer (default 1).

    Returns
    -------
    The output tensor of the "out" layer.
    """
    def _kernel_options():
        # Fresh initializer/regularizer objects for every layer, as in the
        # layer-by-layer construction this replaces.
        return dict(
            kernel_initializer=tf.contrib.layers.xavier_initializer(uniform=False),
            kernel_regularizer=tf.contrib.layers.l2_regularizer(l2_lambda_regularization))

    hidden = tf.layers.dense(X, 20, activation=tf.nn.relu, name="fc1", **_kernel_options())
    prediction = tf.layers.dense(hidden, output_size, activation=None, name="out", **_kernel_options())

    return prediction


def get_tf_compiled_model(num_features, model_type, output_size=1):
    """Build one of the TensorFlow example graphs together with its training op.

    The default graph is reset, placeholders are created, the requested network
    is built, and an Adam optimizer (learning_rate=0.001, beta1=0.5) is set up
    to minimise the data cost plus the graph's regularization loss.

    Parameters
    ----------
    num_features : size of the feature dimension of the input placeholder.
    model_type : either "regression" or "classification".
    output_size : number of output units of the network.

    Returns
    -------
    dict with the keys 'X_placeholder', 'y_placeholder', 'y_pred', 'cost'
    (data term only), 'total_cost' (data term + regularization) and
    'optimizer' (the minimise op).

    Raises
    ------
    ValueError
        If ``model_type`` is not one of the two supported values. Previously
        any unrecognised value (e.g. a typo) silently produced a classifier.
    """
    # Validate first so a bad argument does not wipe the current default graph.
    if model_type not in ("regression", "classification"):
        raise ValueError(
            "model_type must be 'regression' or 'classification', got %r" % (model_type,))

    tf.reset_default_graph()

    X, y = create_placeholders(num_features, output_size)

    if model_type == "regression":

        #Regression model
        y_pred = tf_model_regression(X, output_size)
        # tf.losses.mean_squared_error takes (labels, predictions); pass them by
        # keyword so the targets and the network output cannot be mixed up.
        cost = tf.losses.mean_squared_error(labels=y, predictions=y_pred)
    else:

        #Classification model
        logits, y_pred = tf_model_classification(X, output_size)
        cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(labels=y, logits=logits))

    # Sum of the kernel regularizers registered by the layers (if any).
    reg_cost = tf.losses.get_regularization_loss()
    total_cost = cost + reg_cost

    optimizer = tf.train.AdamOptimizer(learning_rate=0.001, beta1=0.5).minimize(total_cost)

    return {'X_placeholder':X, 'y_placeholder':y, 'y_pred':y_pred, 'cost':cost, 'total_cost':total_cost, 'optimizer':optimizer}

## Create data handlers

In [3]:
# --- CMAPSS data handler ---

# Full list of CMAPSS channel names; the subset used below was selected as per the CNN paper.
features = ['T2', 'T24', 'T30', 'T50', 'P2', 'P15', 'P30', 'Nf', 'Nc', 'epr', 'Ps30', 'phi', 'NRf', 'NRc',
            'BPR', 'farB', 'htBleed', 'Nf_dmd', 'PCNfR_dmd', 'W31', 'W32']

# Positions are 1-based, hence the "- 1" when indexing into `features`.
selected_indices = np.array([2, 3, 4, 7, 8, 9, 11, 12, 13, 14, 15, 17, 20, 21])
selected_features = [features[position] for position in selected_indices - 1]

data_folder = '../../NASA_RUL_(CMAPS)/CMAPSSData'

# Windowing and target-capping parameters handed to the CMAPSS handler.
window_size = 30
window_stride = 1
max_rul = 125

# NOTE(review): the literal 1 is an opaque positional argument here --
# presumably the CMAPSS sub-dataset number; confirm against CMAPSSDataHandler.
dHandler_cmaps = CMAPSSDataHandler(
    data_folder, 1, selected_features, max_rul, window_size, window_stride)

# Scaler instance that later cells attach to the tunable models.
min_max_scaler = MinMaxScaler()

# --- MNIST data handler ---
dHandler_mnist = MNISTDataHandler()

## Test TunableModel with regression

### Test TunableModel Keras

In [4]:
# With unroll=True each window is fed as one flat vector, so the input width
# is (number of selected features) x (window length).
num_features = len(selected_features)
shape = num_features * window_size

# Compiled Keras regression network with a single output unit
model = get_keras_compiled_model(shape, "regression", output_size=1)

tModel = SequenceTunableModelRegression(
    'ModelRUL_Keras',
    model,
    lib_type='keras',
    data_handler=dHandler_cmaps)

# Clear the handler's own scaler and attach the MinMaxScaler to the tunable model instead
tModel.data_handler.data_scaler = None
tModel.data_scaler = min_max_scaler

In [16]:
# Load the CMAPSS data through the tunable model (unrolled windows,
# cross_validation_ratio=0.2) and print shapes plus the first rows.
# NOTE(review): reload_data=False together with the recorded
# "Using previously loaded data" output suggests this cell was last run on a
# kernel that already held the data -- confirm it also works after
# Restart & Run All.
tModel.load_data(unroll=True, verbose=1, cross_validation_ratio=0.2, reload_data=False)
tModel.print_data()

# Train for 10 epochs, then evaluate with the 'rhs' and 'rmse' metrics
# (round=2 presumably rounds the reported scores -- confirm in the framework).
tModel.epochs = 10
tModel.train_model(verbose=1)
tModel.evaluate_model(['rhs', 'rmse'], round=2)
print("\nTest Scores")
print(tModel.scores)

Using previously loaded data
Printing shapes

Training data (X, y)
(14400, 420)
(14400, 1)
Cross-Validation data (X, y)
(20, 420)
(20, 1)
Testing data (X, y)
(100, 420)
(100, 1)
Printing first 5 elements

Training data (X, y)
[[0.22299652 0.33979294 0.1771978  ... 0.2        0.82051282 0.71623349]
 [0.19860627 0.42500664 0.21350078 ... 0.2        0.88034188 0.74023576]
 [0.10452962 0.45872047 0.32790424 ... 0.1        0.70940171 0.71921602]
 [0.14982578 0.3480223  0.27138932 ... 0.3        0.74358974 0.78000284]
 [0.16724739 0.21210512 0.39776295 ... 0.1        0.62393162 0.7994603 ]]
[[125.]
 [125.]
 [125.]
 [125.]
 [125.]]
Cross-Validation data (X, y)
[[0.49477352 0.73161667 0.75706436 ... 0.8        0.17094017 0.38488851]
 [0.45644599 0.54977436 0.57339089 ... 0.3        0.54700855 0.36940775]
 [0.51567944 0.54473055 0.66365777 ... 0.8        0.31623932 0.34995029]
 [0.31010453 0.54207592 0.44525118 ... 0.4        0.70940171 0.75770487]
 [0.41114983 0.55428723 0.43524333 ... 0.3    

### Test TunableModel Tensorflow

In [None]:
# With unroll=True each window is fed as one flat vector, so the input width
# is (number of selected features) x (window length).
num_features = len(selected_features)
shape = num_features * window_size

#Get tensorflow model
model = get_tf_compiled_model(shape, "regression", output_size=1)


tModel = SequenceTunableModelRegression('ModelRUL_tensorflow', model, lib_type='tensorflow', data_handler=dHandler_cmaps)

# Clear the handler's own scaler and attach the MinMaxScaler to the tunable model instead
tModel.data_handler.data_scaler = None
tModel.data_scaler = min_max_scaler

tModel.load_data(unroll=True, verbose=1, cross_validation_ratio=0)
tModel.print_data()

# Train and evaluate inside a context-managed session so the session is closed
# (and its resources released) when the cell finishes, instead of staying open
# for the lifetime of the kernel.
with tf.Session() as sess:

    #Train the model
    tModel.epochs = 10
    tModel.train_model(tf_session=sess, get_minibatches_function_handle=aux_functions.get_minibatches, verbose=1)
    tModel.evaluate_model(['rhs', 'rmse'], tf_session=sess, round=2)

print("\nTest Scores")
print(tModel.scores)

## Test TunableModel with classification

### Test TunableModel Keras

In [17]:
# Width of one flattened MNIST sample; matches the (60000, 784) training
# shape printed by this notebook.
input_dim = 784

# Compiled Keras classifier with 10 output units (the labels are 10-wide one-hot rows)
model = get_keras_compiled_model(input_dim, "classification", output_size=10)

tModel = SequenceTunableModelClassification(
    'ModelMNIST_Keras',
    model,
    lib_type='keras',
    data_handler=dHandler_mnist)

# No scaler is attached in this cell; the assignments used by the regression
# cells are left disabled here:
#tModel.data_handler.data_scaler = None
#tModel.data_scaler = min_max_scaler

In [19]:
# Load the MNIST data through the tunable model (unrolled, no cross-validation
# split) and print shapes plus the first rows.
# NOTE(review): reload_data=False together with the recorded
# "Using previously loaded data" output suggests this cell was last run on a
# kernel that already held the data -- confirm it also works after
# Restart & Run All.
tModel.load_data(unroll=True, verbose=1, cross_validation_ratio=0, reload_data=False)
tModel.print_data()

# Train for 10 epochs, then evaluate with the framework's default metrics
# (no metric list is passed here).
tModel.epochs = 10
tModel.train_model(verbose=1)
tModel.evaluate_model()
print("\nTest Scores")
print(tModel.scores)

Using previously loaded data
Printing shapes

Training data (X, y)
(60000, 784)
(60000, 10)
Testing data (X, y)
(10000, 784)
(10000, 10)
Printing first 5 elements

Training data (X, y)
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
[[0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]
Testing data (X, y)
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]]
training without cv
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10

Test Scores
{'loss': 0.09461064505837857, 'score_1': 0.9706}


### Test TunableModel Tensorflow

In [None]:
# Width of one flattened MNIST sample; matches the (60000, 784) training
# shape printed by this notebook.
input_dim = 784

#Get tensorflow model
model = get_tf_compiled_model(input_dim, "classification", output_size=10)


tModel = SequenceTunableModelClassification('ModelMNIST_Tensorflow', model, lib_type='tensorflow', data_handler=dHandler_mnist)

# NOTE(review): the Keras classification cell leaves these two scaler
# assignments commented out, so the two MNIST runs are not configured
# identically -- confirm which setting is intended.
tModel.data_handler.data_scaler = None
tModel.data_scaler = min_max_scaler

tModel.load_data(unroll=True, verbose=1, cross_validation_ratio=0)
tModel.print_data()

# Train and evaluate inside a context-managed session so the session is closed
# (and its resources released) when the cell finishes, instead of staying open
# for the lifetime of the kernel.
with tf.Session() as sess:

    #Train the model
    tModel.epochs = 10
    tModel.train_model(tf_session=sess, get_minibatches_function_handle=aux_functions.get_minibatches, verbose=1)
    tModel.evaluate_model(tf_session=sess, metrics=['accuracy'])

print("\nTest Scores")
print(tModel.scores)