In [1]:
import tensorflow as tf
import tensorboard
import tensorflow.keras as K
import numpy as np
import pandas as pd
from tensorflow.keras import backend
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras import activations
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.regularizers import l1
from tensorflow.keras.models import model_from_json
import scipy.io
import h5py as h5py

In [None]:
# Custom Metrics [Can be changed]
import tensorflow.keras.backend as K

# Percentage error: AMPE for one output
def AMPE(y_true, y_pred):  
    return 100*K.abs(K.mean((y_pred-y_true)/(1+y_true)))
    #return AME(y_true, y_pred)

# Percentage error: AMPE_all for all outputs
# y= [.1y_12, .2y_18, .3y_24, .4y_30, .5y_36]
def AMPE_all(y_true, y_pred):
    loss = AMPE(y_true[:,0],y_pred[:,0])
    for i in np.arange(y_pred.shape[1]):
        loss = loss + AMPE(y_true[:,i],y_pred[:,i])
    return loss

# Weighted MSE
def WMSE(y_true, y_pred):
    loss = K.mean(K.square((y_pred[:,0]-y_true[:,0])))
    for i in np.arange(y_pred.shape[1]):
        loss = loss + (i+1)*K.mean(K.square((y_pred[:,i]-y_true[:,i])))
    return loss

# Centered Percentage error: CMAPE
def CMAPE(y_true, y_pred):
    return K.sqrt(WMSE(y_true,y_pred)) + .1*AMPE_all(y_true,y_pred)

# RMSE at 36
def RMSE_36(y_true, y_pred):
    return K.sqrt(K.mean(K.square((y_pred[:,7]-y_true[:,7]))))

# AMPE at 36
def AMPE_36(y_true, y_pred):
    return AMPE(y_true[:,4],y_pred[:,7])


# Weighted MAE
def WMAE(y_true, y_pred):
    loss = K.mean(K.abs((y_pred[:,0]-y_true[:,0])))
    for i in np.arange(y_pred.shape[1]):
        loss = loss + (i+1)*K.mean(K.square((y_pred[:,i]-y_true[:,i])))
    return loss

# MSE
def WMSE_yichao(y_true, y_pred):
    for i in np.arange(y_pred.shape[1]):
        if i == 0:
            loss = K.mean(K.square((y_pred[:,0]-y_true[:,0])))
        else:
            loss = loss + K.mean(K.square((y_pred[:,i]-y_true[:,i])))
    return loss

In [None]:
# A way to look at the epoch results
class NBatchLogger(tf.keras.callbacks.Callback):

    def __init__(self, display):
        self.step = 0
        self.display = display
        self.metric_cache = {}
    
    def on_epoch_end(self, epoch, logs={}):
        self.step += 1
        for k in logs.keys():
            if k in logs:
                self.metric_cache[k] = self.metric_cache.get(k, 0) + logs[k]
        if self.step % self.display == 0:
            metrics_log = ''
            for (k, v) in self.metric_cache.items():
                val = v / self.display
                if abs(val) > 1e-3:
                    metrics_log += ' - %s: %.1f' % (k, val)
                else:
                    metrics_log += ' - %s: %.4e' % (k, val)
            print('Epoch {}/{}{}'.format(self.step,
                                          self.params['epochs'],
                                          metrics_log))            
            self.metric_cache.clear()

In [None]:
# Callbacks while training the neural network

# 1. Stop when the validation loss is not improving [give a buffer of 100 epochs]
# Restore the best weights with the best validation loss when the training is over
earlyStop = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=100,
    restore_best_weights=True)

# 2. Tensorboard validation and training performance
plotLogs = tf.keras.callbacks.TensorBoard(log_dir="./logs-chehade2/", histogram_freq=0, write_graph=False, write_images=True,profile_batch = 100000000)

In [2]:
# Load data
import scipy.io
mat = scipy.io.loadmat('ECE5831_dataset.mat')
X_tr = mat['X_tr']
X_tst = mat['X_tst']
Y_tr = mat['Y_tr']
Y_tst = mat['Y_tst']


X_tst = X_tst[(Y_tst[:,4] < 2000),:]
# print(X_tst.shape)
Y_tst = Y_tst[(Y_tst[:,4] < 2000),:]


In [4]:
# # Deep Autoencded Warranty Features
nn_dest = 'Models_2022'

from random import randrange
import matplotlib.pyplot as plt
import io

# Y_tr_0 = Y_tr/(.001+X_tr[:,18].reshape(X_tr.shape[0],1))
# X_tr_0 = X_tr[:,:18]/(.001+X_tr[:,18].reshape(X_tr.shape[0],1))

# Y_tst_0 = Y_tst/(.001+X_tst[:,18].reshape(X_tst.shape[0],1))
# X_tst_0 = X_tst[:,:18]/(.001+X_tst[:,18].reshape(X_tst.shape[0],1))

Y_tr_0 = Y_tr.copy()
X_tr_0 = X_tr[:,:19]
Y_tst_0 = Y_tst.copy()
X_tst_0 = X_tst[:,:19]

from tensorflow.keras import backend as K


# Define a custom layer for using the neurones of a hidden layer as parameters of the Weibull CDF or general CDF
class Lay(layers.Layer):
    
    # Standard function
    def init(self):
        super(Lay,self).__init__()
        
    # Standard function
    def build(self,inputShape):
        super(Lay,self).build(inputShape)
        
    # Custom function
    def call(self,x):
        
        # Saturate the parameters of the Weibull CDF to avoid unstable solutions
        x = K.relu(x, max_value=1000)
        
        # Return the Weibull CDF estimates as survival estimates [bounded between 0 and 1]
        # First given neuron x[:,0:1] is the scale parameter
        # Second given neuron x[:,1:2] is the shape parameter
        # Thus, the input layer to the our custom layer should have at least two neurons [best to be exactly two neurons]
        return (1-K.exp(-K.pow((K.constant([[1/36,1/36,2/36,13/36,19/36,25/36,31/36,37/36]]))*x[:,0:1],x[:,1:2])))


from random import randrange
import matplotlib.pyplot as plt
import io


# Create an empty prediction matrix to store the predictions from the "N" neural networks of maturation level "t"
N = 15 # 15 networks with different with initializations
Y_tr_pred = np.empty([Y_tr_0.shape[0], Y_tr_0.shape[1], N])

# Start training networks for maturation level "t"
for j in np.arange(N):

    # Construct names for the weights and model file names
    w_name = nn_dest + '\\DNN_weights_'  + '_' + str(j) + '.h5'
    mdl_name = nn_dest + '\\DNN_'  + '_' + str(j) + '.json'

    # Create the neural network architecture
    input_tensor = layers.Input(shape=(X_tr_0.shape[1],))

    h1 = layers.Dense(units=128, activation='relu')(input_tensor)
    #h1 = layers.Dropout(0.2)(h1)
    h2 = layers.Dense(units=64, activation='relu')(h1)
    #h2 = layers.Dropout(0.2)(h2)
    h3 = layers.Dense(units=32, activation='relu')(h2)
    #h3 = layers.Dropout(0.2)(h3)

    # Define a custom output = Sigmoid(h3 nodes) * Linear (h3 nodes)
    # This is equivalent of 
    y11 = layers.Dense(units=Y_tr_0.shape[1], activation='sigmoid')(h3)
    y12 = layers.Dense(units=Y_tr_0.shape[1], activation='linear')(h3)

    output_tensor2 = layers.Multiply()([y11,y12])    

    # Define the network inputs and outputs
    network = models.Model(inputs=[input_tensor],outputs=[output_tensor2])

    # Compile the model
    network.compile(loss=WMSE_yichao,
                #optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), # Optimization algorithm
                #optimizer = tf.optimizers.Adam(amsgrad=True),
                optimizer = 'Adam',
                #metrics=[AME,AMPE,MAPE,RMSE,RMSE_36,INCR_trend]) 
                metrics=[RMSE_36,AMPE_36]) 

    # Train the model
    out_batch = NBatchLogger(display=10)
    history = network.fit(X_tr_0,# Features
                    Y_tr_0, # Target vector
                    epochs=10000, # Number of epochs
                    verbose=0, # No output
                    batch_size=np.ceil(Y_tr_0.shape[0]/10).astype(int), # Number of observations per batch
                    #batch_size = 16,
                    validation_split = 0.3,
                    callbacks=[out_batch, earlyStop])#, plotLogs, plot_fig])

    # Save the model
    # serialize model to JSON
    model_json = network.to_json()
    with open(mdl_name, "w") as json_file:
        json_file.write(model_json)
    # serialize weights to HDF5
    network.save_weights(w_name)
    print("Saved " + mdl_name + " to disk.")
    
