#**Machine Remaining Useful Life Estimation Using Deep CNN**

Predicting the Machine Remaining Useful Life using Deep Convolutional Neural Network (DCNN)

##Imports:

Imoprt the necessary files and packages.

In [None]:
from google.colab import files
from google.colab import drive
drive.mount('/content/drive')

In [None]:

from PreProcessing import *
from model import *
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.metrics import mean_squared_error
import torch.nn as nn
import matplotlib.pyplot as plt

device = "cuda:0" if torch.cuda.is_available() else "cpu"
device = torch.device(device)


##Load N-CMAPSS Dataset

In [None]:
#X  will be in the form pandas DataFrame, and the target Y will be a numpy array
(X_dev,y_dev),(X_test,y_test) = load_NCMAPSS_data() 


In [None]:
#Normalize the data
X_dev_normalized,X_test_normalized = data_normalization(X_dev,X_test)

##Prepare the Data for training

In [None]:

# Implement a custom Dataset  for NCMAPPS Dataset :

class NCMAPPS(Dataset):

    def __init__(self,x,y,history_len):
        # Initialize data
        '''
        parameters:
        x : dataframe that from which we took the sensors measurments only
        y : target rul
        history_len : the length of the sequennce to be considered for each sample
        '''
        self.history_len = history_len
        c_sensors = x.columns[4:-1] # Extract the sonsors reading
        # the coloumns with indecies [0-3] are auxiliary variables

        self.x = x[c_sensors]
        self.x= torch.tensor(self.x.to_numpy()).float() # Convert to numpy array for easier handling and then for a tensor
        self.y= torch.tensor(y)                                                     
        self.n_samples = x.shape[0]
        
    # support indexing such that dataset[i] can be used to get i-th sample
    def __getitem__(self, index):
        # return the sequence from the index up to the index + history_len 
        # and the target will be the rul at time == index + history_len

        return self.x[index: index+self.history_len, :], self.y[index+self.history_len, :]

    # we can call len(dataset) to return the size
    def __len__(self):
        return self.n_samples-self.history_len



def train_validation_split(X_dev,y_dev):
  '''
  parameters:
  X_dev : Development data set(features matrix) which has to be splitted into train and validation sets
  y_dev : Development targets to be spliited into train and validation sets

  this function took the development set and split it into train and validation
  it return the data for two random units as a validation set and the remaining units for the train
  '''
  units = np.unique(X_dev.unit) # units within the dataset

  i,j = np.random.randint(1,np.max(units)+1,2) # generate two random numbers within the range(1,unit with the maximum number)

  index = (X_dev.unit==i) + (X_dev.unit == j) # index to selects a two random units as a validation set
  
  #Train set
  X_train = X_dev[~index]
  y_train = y_dev[~index]

  #Validation
  X_validation = X_dev[index]
  y_validation = y_dev[index]

  return (X_train,y_train),(X_validation,y_validation)

In [None]:
(X_train,y_train),(X_val,y_val) = train_validation_split(X_dev_normalized,y_dev)
train_set = NCMAPPS(X_train,y_train,50)
val_set = NCMAPPS(X_val,y_val,50)
batch_size= 1024
# Load whole dataset with DataLoader
train_loader = DataLoader(dataset=train_set,
                          batch_size=batch_size,
                          shuffle=False)

val_loader = DataLoader(dataset=val_set,
                          batch_size=batch_size,
                          shuffle=False)

## Define and Initiate the model

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class DCNN(nn.Module):
    
    def __init__(self,num_features,conv1_o,conv1_kh,conv2_o,conv2_kh,fc1_o):

      '''
      Parameters:
      num_features : number of the input features of the data
      conv1_o : number of the output channels of the first convolutional layer
      conv2_o : // //         //                //    second  //
      conv1_kh : the height of the kerenel for the first convolutional layer
      conv2_kh : //   //    //                  // seconnd //
      fc1_o : the number of the output features of the 1st fully connected layer


      '''
      super(DCNN, self).__init__()
      self.num_features = num_features 
      self.conv1_o = conv1_o 
      self.conv1_kh = conv1_kh 
      self.conv2_o =conv2_o  
      self.conv2_kh = conv2_kh
        
      self.dropout = nn.Dropout(p = 0.2)\

      self.conv1 = nn.Sequential(
                                   nn.Conv2d(1, self.conv1_o, 
                                                kernel_size=(self.conv1_kh, self.num_features)),
                                   nn.ReLU(), 
                                    nn.Conv2d(self.conv1_o, self.conv2_o, 
                                                kernel_size=(self.conv2_kh, 1)) 
                                   )
      self.max_pooling_layer= nn.MaxPool2d(4,stride=2, padding= 2)
        
      
      self.output_features = self.conv2_o
        
      self.output = nn.Sequential(
                                    nn.Linear(self.output_features, fc1_o),
                                    nn.ReLU(), 
                                    nn.Linear(fc1_o, 1)
                                    ) 
                                    
        
        
    def forward(self, X):
        """
        Parameters:
        X (tensor) [batch_size, time_steps, num_features] # Shaped like image
        """
        batch_size = X.size(0)
        
        # Convolutional Layer
        
        out = X.unsqueeze(1) # [batch_size, num_channels=1, time_steps, num_features]
        
        out = F.relu(self.conv1(out)) # [batch_size, conv2_out_channels, shrinked_time_steps, 1]
        
        out= self.max_pooling_layer(out)
        
        out = self.dropout(out)

        out = torch.squeeze(out, 3) #
        
        
        out = out.permute(0, 2, 1) # 
        

        out= F.relu(out)
        
        out = out[:, -1, :] 
        out = self.dropout(out)
        
        
         
        # Output Layer
        output = F.relu(self.output(out)) 
        
        
        return output

In [None]:
# intiate an object from the model
model = DCNN(num_features=27,conv1_o=16,conv2_o=32,conv1_kh=14,conv2_kh=14,fc1_o=16).to(device)


## Train the model

In [None]:
# The function that performs the training
def training_loop(n_epochs, optimiser, model, loss_fn,batch_size, train_loader,val_loader ):
  '''
  Parameters:
  n_epochs :  number of epochs
  optimiser : optimiser used for training
  model : the model to be trained 
  loss_fn : loss function used to train the model
  batch_size : the batch size 
  train_loader : dataloader object to load the training data during the training
  val_loader : dataloader object to load the validation data during the training
  '''
    
  train_curve = [] # list to save the mean of the training loss for each epoch
  #the elements of this list will be the value of the training loss for each epoch

  val_curve = [] # list to save the mean of the validation loss for each epoch 
  

  for epoch in range(0, n_epochs):

    train_loss = [] #list to save the mean of the training loss for each iteration
    #the elements of this list will be the value of the training loss for iteration
  
    val_loss = [] # list to save the mean of the training loss for each iteration
    
    model.train(True)
    for i ,(features,targets) in enumerate(train_loader):
      features= features.to(device)
      targets= targets.to(device)
      output_train = model.forward(features) # forwards pass
      targets = torch.tensor(targets,dtype=torch.float32)
      loss_train = loss_fn(output_train, targets) # calculate the loss
      optimiser.zero_grad() # set gradients to zero
      loss_train.backward() # backwards pass
      optimiser.step() # update model parameters
      
      train_loss.append(loss_train.item()) #append the value of the training loss of the i-th iteration

    model.train(False) # to test the model on the validation set

    for i ,(features,targets) in enumerate(val_loader):
        features= features.to(device)
        targets= targets.to(device)
        output_val = model.forward(features) # forwards pass
        targets = torch.tensor(targets,dtype=torch.float32)
        loss_val = loss_fn(output_val, targets) # calculate loss
        val_loss.append(loss_val.item()) #append the value of the validation loss of the i-th iteration

        
        
    print(f"Epoch {epoch}, Training loss {np.mean(train_loss):.4f},"
                  f" Validation loss {np.mean(val_loss):.4f}")
    
    val_curve.append(np.mean(val_loss)) #append the value of the training loss of the epoch 
    train_curve.append(np.mean(train_loss)) # #append the value of the validation loss of the epoch
    

    
  return (train_curve,val_curve)

In [None]:
#Define the parameters for training 
batch_size= 1024
n_epochs = 2
loss_fn = nn.MSELoss()
optim = torch.optim.Adam(model.parameters())

#DataLoaders
train_loader = DataLoader(dataset=train_set,
                          batch_size=batch_size,
                          shuffle=False)

val_loader = DataLoader(dataset=val_set,
                          batch_size=batch_size,
                          shuffle=False)

In [None]:
#Train the model

(training_curve,validation_curve) = training_loop(n_epochs=n_epochs,optimiser=optim,model=model,
                                      loss_fn=loss_fn,batch_size=batch_size,train_loader=train_loader,val_loader=val_loader)

## Prediction and Evaluation


In [None]:
def predict(model,data_set):
  '''
  function to predict the rul 
  parameter:
  model : the trained model to predcit the output
  data_set : data_set object to predict the ouput for it
  '''
  batch_size = 1024
  y_pred = np.zeros((len(data_set)+data_set.history_len,1)) # vector to save the predicted output

  data_loader = DataLoader(dataset = data_set,batch_size = batch_size,shuffle = False)
  for i,(x,y) in enumerate(data_loader):
    model.train(False)
    x = x.to(device)
    y_p = model.forward(x)
    y_pred[i*batch_size:(i*batch_size+y_p.shape[0])] = y_p.cpu().detach().numpy()
  
  return y_pred

In [None]:
#------------First Evaluate on the Devlopment set (train and validation)-------------#
dev_set = NCMAPPS(X_dev_normalized,y_dev,100)
y_pred1 = predict(model1,dev_set)

mse_train = mean_squared_error(y_dev,y_pred)
rmse_train = mean_squared_error(y_dev,y_pred,squared=False)

print(f'The mean squared error for train set      :  {np.mean(mse_train):.4f}')
print(f'The root mean squared error for train set :  {np.mean(rmse_train):.4f}')

In [None]:
#Plot the prediction for a specific unit

unit = 8 # For the dev set it range from 1 to 9 
index = X_dev_normalized.unit==unit


plt.plot(y_dev[index],label='True Rul')
plt.plot(y_pred1[index],label='Predicted Rul')
plt.legend()
plt.xlabel('Cycles')
plt.ylabel('RUL')
plt.title('Predcited Vs True Rul')
plt.show()

In [None]:
#---------------Evalute on Test set -----------------------#
test_set = NCMAPPS(X_test_normalized,y_test,100)
y_pred = predict(model,test_set)

mse_test = mean_squared_error(y_test,y_pred)
rmse_test = mean_squared_error(y_test,y_pred,squared=False)

print(f'The mean squared error for test set      :  {np.mean(mse_test):.4f}')
print(f'The root mean squared error for test set :  {np.mean(rmse_test):.4f}')

In [None]:
#Plot the prediction for a specific unit

unit = 10 # For test set it range from 10 to 15 
index = X_test_normalized.unit==unit

plt.plot(y_test[index],label='True Rul')
plt.plot(y_pred[index],label='Predicted Rul')
plt.xlabel('Cycles')
plt.ylabel('RUL')
plt.title('Predcited Vs True Rul')
plt.show()