# Collaborative RUL Estimation of Turbofan Engines using NCMAPSS

In [34]:
# Datasets
import h5py
import math
import pandas as pd
from pandas import DataFrame
# Helper libraries
import numpy as np
from numpy.lib.stride_tricks import as_strided
from itertools import chain
import os
import random
import time
import itertools
import pyarrow as pa
import pyarrow.parquet as pq
# Import TensorFlow
import tensorflow as tf
#Keras
from keras.models import Sequential,load_model
from keras.models import model_from_json
from keras import optimizers
import keras.backend as K
from tensorflow import keras
from keras.optimizers import SGD
from keras import layers
from keras import models
from keras import regularizers
from tensorflow.keras import layers
from tensorflow import  keras
import keras
import keras.backend as K
#Matplot
import matplotlib.pyplot as plt
import matplotlib
from matplotlib import gridspec
#sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LinearRegression
from itertools import product
from sklearn.decomposition import PCA
from sklearn.utils import shuffle


## Create a strategy to distribute the variables and the graph

In [None]:
strategy = tf.distribute.MirroredStrategy()

In [None]:
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
tf.compat.v1.enable_eager_execution()

## Auxiliar Functions

In [None]:
def split_sequences(input_data, sequence_length, stride = 1, option = None):
    """
     
    """
    X = list()
    
    for i in range(0,len(input_data),stride):
        # find the end of this pattern
        end_ix = i + sequence_length
        
        # check if we are beyond the dataset
        if end_ix > len(input_data):
            break
        
        # gather input and output parts of the pattern
        if option=='last':
            seq_x = input_data[end_ix-1, :]
        elif option=='next':
            seq_x = input_data[end_ix, :]
        else:
            seq_x = input_data[i:end_ix, :]
        X.append(seq_x)
    
    return np.array(X)
def sequence_generator(input_data, units, cycles, sequence_length=10,stride = 1, option=None):
    """
     # Generates dataset with windows of sequence_length      
    """  
    X = list()
    unit_num=[]
    c_num =[]
    for i, elem_u in enumerate(list(np.unique(units))):
        mask = np.ravel(units==elem_u)
        c_mask = cycles[mask]
        x_unit = input_data[mask]
        for j in np.unique(c_mask):
            mask = np.ravel(c_mask==j)
            seq_x_u = split_sequences(x_unit[mask],sequence_length, stride, option)
            X.append(seq_x_u)
            unit_num.extend(np.ones(len(seq_x_u),dtype = int)*elem_u)
            c_num.extend(np.ones(len(seq_x_u),dtype = int)*j)
    
    return np.vstack(X),np.array(unit_num).reshape(-1,1),np.array(c_num).reshape(-1,1)

In [45]:
def labelDistribution(NODES):
    """
        This function is used to distribute the indexes of Training and Eval datasets
    """
    global SHUFFLE_IDX
    global TRAINING_IDX
    global VAL_IDX
    global M_LABELS
    global MAX_DATASET_SIZE
    global WINDOW_LEN
    global STRIDE
    
    for FC in range(0, NODES):
        with h5py.File("FC"+str(FC+1)+"/FC"+str(FC+1)+'_dev'+".h5", 'r') as hdf:
            Y_train = np.array(hdf.get('Y_dev'), dtype='float32')
            A_train = np.array(hdf.get('A_dev'), dtype='float32')
            
            units_train=A_train[:,0].reshape(-1,1)
            cycles_train=A_train[:,1].reshape(-1,1)
            
            # Create Windows for Labels
            Y_windows,_,_=sequence_generator(Y_train,units_train,cycles_train,sequence_length=WINDOW_LEN,option='last',stride = STRIDE)
            
            shuffle_idx = np.random.permutation(Y_windows.shape[0]) 
            TRAINING_IDX[FC], VAL_IDX[FC] = shuffle_idx[Y_windows.shape[0]//10:],shuffle_idx[:Y_windows.shape[0]//10]
            M_LABELS[FC] = Y_train[TRAINING_IDX[FC]]
            if (MAX_DATASET_SIZE<Y_train[TRAINING_IDX[FC]].size):
                MAX_DATASET_SIZE=Y_train[TRAINING_IDX[FC]].size

## Splitting data by Flight Class

In [6]:
# Hyperparameters
NODES = 3
WINDOW_LEN = 50
STRIDE = 5
BATCH_SIZE = 128

In [36]:
LAYERS = [64,32]
LEARNING_RATE = 0.001
BATCH_SIZE = 128
EPOCHS = 100

In [7]:
# Global Scaler Variables
SCALER_X = MinMaxScaler()
SCALER_W = MinMaxScaler()
SCALER_Y = MinMaxScaler(feature_range=(0,1))

In [8]:
# Global Sync Variables
M_LABELS=[*range(0,NODES)]
SHUFFLE_IDX = {}
TRAINING_IDX = {}
VAL_IDX = {}
MAX_DATASET_SIZE=0

### Distribute Indexes (Training and Eval) and Labeles per Replica

In [9]:
labelDistribution(NODES)

In [10]:
M=np.empty((MAX_DATASET_SIZE,NODES))
M[:] = np.nan
for x in range(0,NODES):
    M[:len(M_LABELS[x]),x]=list(chain.from_iterable(M_LABELS[x]))
M

array([[13., 22., 56.],
       [94., 27., 12.],
       [31., 36.,  6.],
       ...,
       [nan, nan, 58.],
       [nan, nan, 64.],
       [nan, nan, 66.]])

In [46]:
def fn_data_partition(NODES, evaluation=False):
    
    # Global Variables
    global SCALER_X
    global SCALER_W
    global SCALER_Y
    global WINDOW_LEN
    global STRIDE
    global BATCH_SIZE
    global TRAINING_IDX
    global VAL_IDX
    
    distributedData=[]
    for FC in range(0, NODES):
        DATA_REPLICA_ID = []
        # Load data DEV
        with h5py.File("FC"+str(FC+1)+"/FC"+str(FC+1)+'_dev'+".h5", 'r') as hdf:
            # Development set
            W_train = np.array(hdf.get('W_dev'), dtype='float32')             # W
            X_s_train = np.array(hdf.get('X_s_dev'), dtype='float32')         # X_s
            X_v_train = np.array(hdf.get('X_v_dev'), dtype='float32')         # X_v
            T_train = np.array(hdf.get('T_dev'), dtype='float32')             # T
            Y_train = np.array(hdf.get('Y_dev'), dtype='float32')             # RUL                  
            A_train = np.array(hdf.get('A_dev'), dtype='float32')

            # Varnams
            W_var = np.array(hdf.get('W_var'))
            X_s_var = np.array(hdf.get('X_s_var'))  
            X_v_var = np.array(hdf.get('X_v_var')) 
            T_var = np.array(hdf.get('T_var'))
            A_var = np.array(hdf.get('A_var'))

            # from np.array to list dtype U4/U5
            W_var = list(np.array(W_var, dtype='U20'))
            X_s_var = list(np.array(X_s_var, dtype='U20'))  
            X_v_var = list(np.array(X_v_var, dtype='U20')) 
            T_var = list(np.array(T_var, dtype='U20'))
            A_var = list(np.array(A_var, dtype='U20'))

            units_train=A_train[:,0].reshape(-1,1)
            cycles_train=A_train[:,1].reshape(-1,1)
            fc_train = A_train[:,2].reshape(-1,1)
            hi_train = A_train[:,-1]
                
            X_s_train = SCALER_X.fit_transform(X_s_train)
            W_train = SCALER_W.fit_transform(W_train)
            Y_train = SCALER_Y.fit_transform(Y_train)
                
            X_windows, U_windows, C_windows=sequence_generator(X_s_train,units_train,cycles_train,sequence_length=WINDOW_LEN,stride = STRIDE)
            W_windows,_,_=sequence_generator(W_train,units_train,cycles_train,sequence_length=WINDOW_LEN,stride = STRIDE)
            Y_windows,_,_=sequence_generator(Y_train,units_train,cycles_train,sequence_length=WINDOW_LEN,option='last',stride = STRIDE)
            
            if evaluation:
                DATA_REPLICA_ID = tf.data.Dataset.from_tensor_slices(((X_windows[VAL_IDX[FC]],W_windows[VAL_IDX[FC]]),Y_windows[VAL_IDX[FC]]))
                DATA_REPLICA_ID = DATA_REPLICA_ID.batch(BATCH_SIZE)
            else: 
                DATA_REPLICA_ID = tf.data.Dataset.from_tensor_slices(((X_windows[TRAINING_IDX[FC]],W_windows[TRAINING_IDX[FC]]),Y_windows[TRAINING_IDX[FC]]))
                DATA_REPLICA_ID = DATA_REPLICA_ID.batch(BATCH_SIZE) 
        
        distributedData.append(DATA_REPLICA_ID)
    return distributedData

In [47]:
distributedDataTrain = fn_data_partition(NODES,False)
distributedDataTest = fn_data_partition(NODES,True)

In [48]:
def value_fn_train(ctx):
    return distributedDataTrain[ctx.replica_id_in_sync_group]
def value_fn_test(ctx):
    return distributedDataTest[ctx.replica_id_in_sync_group]

In [49]:
distributed_values_train = strategy.experimental_distribute_values_from_function(value_fn_train)
distributed_values_test = strategy.experimental_distribute_values_from_function(value_fn_test)

In [50]:
local_result_train = strategy.experimental_local_results(distributed_values_train)[0]
local_result_test = strategy.experimental_local_results(distributed_values_test)[0]

# Create Model

In [51]:
#RUL MODEL
def predictor(t=64,
      feature_X_in=14,
      feature_W_in=4,
      feature_H_in=1,
      feature_out_size=1,
      activation='relu',
      filter = [10,10,1],
      filter_size = 10,
      useH=True):
    
    '''
    useH: if True, use H as input
        [X,W,H] -> Y 
    else:
        [X,W] -> Y
    '''

    x_in=layers.Input(shape=(t,feature_X_in),name="X_in")
    w_in = layers.Input(shape=(t,feature_W_in),name="W_in")
    
    
    if useH:
      h_in = layers.Input(shape=(t,feature_H_in),name="H_in")
      # h_in = layers.Input(shape=(1,1),name="H_in")
      x = tf.concat([x_in,w_in, h_in],-1)
    else: 
      x = tf.concat([x_in,w_in],-1)
      
    for i in filter:
      x = layers.Conv1D(i,filter_size,1,padding='same',activation = activation)(x)
      # x = layers.BatchNormalization()(x)
      
    x = layers.Flatten()(x)
    y = layers.Dense(50,activation = activation)(x)
    y = layers.Dense(feature_out_size,activation = 'linear')(y)

    if useH:
      model = models.Model([x_in,w_in,h_in], y)
    else:
      model = models.Model([x_in,w_in], y)

    return model

In [52]:
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

## Define the loss function

Recall that the loss function consists of one or two parts:

  * The **prediction loss** measures how far off the model's predictions are from the training labels for a batch of training examples. It is computed for each labeled example and then reduced across the batch by computing the average value.
  * Optionally, **regularization loss** terms can be added to the prediction loss, to steer the model away from overfitting the training data. A common choice is L2 regularization, which adds a small fixed multiple of the sum of squares of all model weights, independent of the number of examples. The model above uses L2 regularization to demonstrate its handling in the training loop below.

For training on a single machine with a single GPU/CPU, this works as follows:

  * The prediction loss is computed for each example in the batch, summed across the batch, and then divided by the batch size.
  * The regularization loss is added to the prediction loss.
  * The gradient of the total loss is computed w.r.t. each model weight, and the optimizer updates each model weight from the corresponding gradient.

With `tf.distribute.Strategy`, the input batch is split between replicas.
For example, let's say you have 4 GPUs, each with one replica of the model. One batch of 256 input examples is distributed evenly across the 4 replicas, so each replica gets a batch of size 64: We have `256 = 4*64`, or generally `GLOBAL_BATCH_SIZE = num_replicas_in_sync * BATCH_SIZE_PER_REPLICA`.

Each replica computes the loss from the training examples it gets and computes the gradients of the loss w.r.t. each model weight. The optimizer takes care that these **gradients are summed up across replicas** before using them to update the copies of the model weights on each replica.

*So, how should the loss be calculated when using a `tf.distribute.Strategy`?*

  * Each replica computes the prediction loss for all examples distributed to it, sums up the results and divides them by `num_replicas_in_sync * BATCH_SIZE_PER_REPLICA`, or equivently, `GLOBAL_BATCH_SIZE`.
  * Each replica compues the regularization loss(es) and divides them by
  `num_replicas_in_sync`.

Compared to non-distributed training, all per-replica loss terms are scaled down by a factor of `1/num_replicas_in_sync`. On the other hand, all loss terms -- or rather, their gradients -- are summed across that number of replicas before the optimizer applies them. In effect, the optimizer on each replica uses the same gradients as if a non-distributed computation with `GLOBAL_BATCH_SIZE` had happened. This is consistent with the distributed and undistributed behavior of Keras `Model.fit`. See the [Distributed training with Keras](./keras.ipynb) tutorial on how a larger gloabl batch size enables to scale up the learning rate.

In [53]:
with strategy.scope():
    def compute_loss_batch(labels, predictions, model_losses):
        per_example_loss = (labels - predictions)**2  # Sample error
        loss = tf.math.sqrt(tf.nn.compute_average_loss(per_example_loss)) # Batch Error
        return loss

In [54]:
with strategy.scope():
    test_mae = tf.keras.metrics.MeanAbsoluteError()
    train_rmse = tf.keras.metrics.RootMeanSquaredError()
    test_rmse = tf.keras.metrics.RootMeanSquaredError()

In [55]:
# A model, an optimizer, and a checkpoint must be created under `strategy.scope`.
with strategy.scope():
    model = predictor(t=50,useH=False,filter = LAYERS)
    optimizer = SGD(lr=LEARNING_RATE)
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

  super().__init__(name, **kwargs)


## Auxiliar Funcions for Training

In [56]:
def train_step_batch(inputs):
    input_signals, labels = inputs
    with tf.GradientTape() as tape:
        predictions = model(input_signals, training=True)
        loss = compute_loss_batch(labels, predictions, model.losses) # Batch Error
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_rmse.update_state(labels, predictions)
    return loss
def train_step_sample(inputs):
    input_signals, labels = inputs
    with tf.GradientTape() as tape:
        predictions = model(input_signals, training=True)
    return predictions

def compute_loss_fedLabSync(inputs, collaborativePredictions):
    input_signals, labels = inputs
    with tf.GradientTape() as tape:
        predictions = model(input_signals, training=True)
        loss = compute_loss_batch(labels, (collaborativePredictions+predictions)/2, model.losses) # Batch Error
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_rmse.update_state(labels, (collaborativePredictions+predictions)/2)
    return loss
    
def test_step(inputs):
    input_signals, labels = inputs
    predictions = model(input_signals, training=False)
    
    t_loss = tf.math.abs(labels-predictions)
    test_mae.update_state(labels, predictions)
    test_rmse.update_state(labels, predictions)

In [57]:
# with the distributed input.
@tf.function
def distributed_train_step_batch(dataset_inputs):
    per_replica_losses = strategy.run(train_step_batch, args=(dataset_inputs,)) 
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)
@tf.function
def collaborative_predictions(dataset_inputs):
    per_replica_predictions= strategy.run(train_step_sample, args=(dataset_inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_predictions,  
                         axis=None)
@tf.function
def local_collaborative_loss(collaborative_predictions, dataset_inputs):
    return strategy.run(compute_loss_fedLabSync, args=(dataset_inputs, collaborative_predictions,))

@tf.function
def distributed_test_step_batch(dataset_inputs):
    return strategy.run(test_step, args=(dataset_inputs,))

# Training Procedure

In [58]:
for epoch in range(EPOCHS):
    # TRAIN LOOP
    total_loss = 0.0
    num_batches = 0
    for x in local_result_train:
        colaboratePrediction = collaborative_predictions(x)
        total_loss += local_collaborative_loss(colaboratePrediction, x)
        #total_loss += distributed_train_step_batch(x)
        num_batches += 1
    train_loss = total_loss / num_batches
    
    # TEST LOOP
    for x in local_result_test:
        distributed_test_step_batch(x)

    if epoch % 2 == 0:
        checkpoint.save(checkpoint_prefix)

    template = ("Epoch {}, Train_RMSE: {}, Test MAE: {}, "
              "Test_RMSE: {}")
    print(template.format(epoch + 1, train_rmse.result(), test_mae.result(),
                         test_rmse.result()))
    test_mae.reset_states()
    train_rmse.reset_states()
    test_rmse.reset_states()

Epoch 1, Train_RMSE: 0.15882033109664917, Test MAE: 0.0967913568019867, Test_RMSE: 0.1195615828037262
Epoch 2, Train_RMSE: 0.11171253025531769, Test MAE: 0.08590613305568695, Test_RMSE: 0.10475905984640121
Epoch 3, Train_RMSE: 0.10062603652477264, Test MAE: 0.08051487803459167, Test_RMSE: 0.0972210168838501
Epoch 4, Train_RMSE: 0.09497213363647461, Test MAE: 0.07765178382396698, Test_RMSE: 0.09344189614057541


KeyboardInterrupt: 