## Import Statements

In [1]:
import os
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import optuna

## Load Datasets

In [64]:
# define the directory where the pickle files are stored
folder_path = '../batched_data_pickle_files/'

# define the filenames for the pickle files
file_names = ['processed_train_data.pkl', 'processed_train_targets.pkl', 'processed_test_data.pkl', 'true_rul.pkl']

# loop through each file and load its contents as arrays
for file_name in file_names:
    file_path = os.path.join(folder_path, file_name)
    
    # read the pickle file
    with open(file_path, 'rb') as file:
        data = pickle.load(file)
    
    # ensure data is a numpy array, if not convert it
    if isinstance(data, np.ndarray):
        globals()[file_name.replace('.pkl', '')] = data
    else:
        globals()[file_name.replace('.pkl', '')] = np.array(data)

print("Processed Train Data Shape:", processed_train_data.shape)
print("Processed Train Target Shape:", processed_train_targets.shape)
print("Processed Test Data Shape:", processed_test_data.shape)
print("True RUL Shape:", true_rul.shape)


Processed Train Data Shape: (17731, 30, 14)
Processed Train Target Shape: (17731,)
Processed Test Data Shape: (100, 30, 14)
True RUL Shape: (100,)


## Process Datasets and Build SDAE

In [65]:
# first, we assign the processed data to X and y
# X contains the features (input data) and y contains the target variable (processed_train_targets)
X = processed_train_data
y = processed_train_targets

# convert the numpy arrays to PyTorch tensors, since PyTorch models expect inputs as tensors
X_tensor = torch.FloatTensor(X)
y_tensor = torch.FloatTensor(y)

# split the data into training and validation sets
# here, we use sklearn's train_test_split to create a training set and a validation set
# 80% of the data goes into the training set, and 20% goes into the validation set
X_train, X_val, y_train, y_val = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=42)

# create DataLoader objects to handle batching during training and evaluation
# DataLoader is used to load data in batches during training, which is more memory-efficient and faster
# here, we create datasets using TensorDataset, which combines input (X) and target (y) tensors into a dataset
# then, we create the DataLoader objects, specifying batch sizes and whether to shuffle the data

train_dataset = TensorDataset(X_train, y_train)  # create training dataset
val_dataset = TensorDataset(X_val, y_val)  # create validation dataset

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)  # shuffle for training set to ensure randomness
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)  # no shuffling for validation set

# define the Stacked Denoising Autoencoder (SDAE) model class
# SDAE is a type of autoencoder that adds noise to the input during training and forces the network to denoise it
# this helps the model learn more robust features and avoid overfitting

class SDAE(nn.Module):
    def __init__(self, input_dim, hidden_dims, dropout_rate):
        super(SDAE, self).__init__()

        # input_dim is the number of features in the input data
        # hidden_dims is a list of integers representing the size of each hidden layer in the encoder/decoder
        # dropout_rate is the dropout probability to prevent overfitting

        self.input_dim = input_dim  # save input dimension for reference
        self.hidden_dims = hidden_dims  # save hidden layer dimensions for reference

        # initialize the encoder layers
        # the encoder maps the input data to a smaller representation (latent space)
        encoder_layers = []
        for i in range(len(hidden_dims)):
            # the first layer will take the input data, the rest will take the output of the previous layer
            if i == 0:
                encoder_layers.append(nn.Linear(input_dim, hidden_dims[i]))  # fully connected layer
            else:
                encoder_layers.append(nn.Linear(hidden_dims[i-1], hidden_dims[i]))  # another fully connected layer
            encoder_layers.append(nn.ReLU())  # apply ReLU activation after each layer
            encoder_layers.append(nn.Dropout(dropout_rate))  # apply dropout after each layer for regularization

        # define the encoder as a sequence of layers
        self.encoder = nn.Sequential(*encoder_layers)

        # initialize the decoder layers
        # the decoder takes the encoded (compressed) representation and reconstructs the input data
        decoder_layers = []
        for i in range(len(hidden_dims)-1, -1, -1):
            # reverse the encoder layers to reconstruct the data
            if i == 0:
                decoder_layers.append(nn.Linear(hidden_dims[i], input_dim))  # final layer to map to input dimension
            else:
                decoder_layers.append(nn.Linear(hidden_dims[i], hidden_dims[i-1]))  # intermediate layers
            decoder_layers.append(nn.ReLU())  # ReLU activation after each layer
            decoder_layers.append(nn.Dropout(dropout_rate))  # dropout for regularization

        # define the decoder as a sequence of layers
        self.decoder = nn.Sequential(*decoder_layers)

        # define the predictor (regressor)
        # the predictor will take the output of the encoder and map it to a single value (the predicted RUL)
        self.predictor = nn.Linear(hidden_dims[-1], 1)  # output layer with a single unit (for regression)

    # the forward method defines the forward pass of the network
    def forward(self, x):
        # flatten the input data to a 2D tensor of shape [batch_size, input_dim]
        # this is necessary because the Linear layers expect 2D input (batch_size x features)
        x = x.view(x.size(0), -1)  # flatten the input (keeping the batch size dimension intact)

        # pass the input through the encoder to get the encoded (latent) representation
        encoded = self.encoder(x)

        # pass the encoded representation through the decoder to reconstruct the input (for autoencoding)
        decoded = self.decoder(encoded)

        # pass the encoded representation through the predictor to get the predicted RUL
        prediction = self.predictor(encoded)

        # return both the reconstructed input (for training the autoencoder) and the predicted RUL
        return decoded, prediction.squeeze()  # squeeze to remove the extra dimension from the output

# define a function to add noise to the input data
# this is part of the denoising autoencoder approach
# the idea is to train the model to reconstruct the original input from noisy data, making it more robust
def add_noise(x, noise_factor=0.3):
    # generate random noise with the same shape as the input x
    noise = torch.randn_like(x) * noise_factor
    # add the noise to the input data and return the noisy version
    return x + noise

## Build Model

In [20]:
# rmse loss function
# this function calculates rmse (root mean square error) between the predicted and true values
# we use this for regression tasks, like predicting the remaining useful life (rul)
def rmse_loss(predictions, targets):
    return torch.sqrt(nn.MSELoss()(predictions, targets))  # mse loss is calculated first, then we take the square root

# train function for training the model with noisy inputs
# this function will be used during optuna's hyperparameter optimization
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device):
    model.to(device)  # move the model to the selected device (GPU or CPU)
    
    for epoch in range(num_epochs):
        model.train()  # set model to training mode
        train_loss = 0.0  # we'll accumulate the training loss here
        
        # loop through the training batches
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)  # move data to device
            batch_x_noisy = add_noise(batch_x)  # add noise to the input data for the autoencoder

            optimizer.zero_grad()  # reset gradients from the previous step
            decoded, prediction = model(batch_x_noisy)  # forward pass through the model

            # loss calculation: we have two parts - reconstruction loss and prediction loss
            loss = criterion(decoded, batch_x.view(batch_x.size(0), -1)) + criterion(prediction, batch_y)
            
            loss.backward()  # backpropagate the gradients
            optimizer.step()  # update the model weights
            train_loss += loss.item()  # accumulate the training loss

        # validation phase to evaluate performance on the validation set
        model.eval()  # set the model to evaluation mode (disables dropout, etc.)
        val_loss = 0.0  # we'll accumulate the validation loss here
        
        with torch.no_grad():  # no need to compute gradients during validation
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)  # move data to device
                decoded, prediction = model(batch_x)  # forward pass through the model

                # calculate the loss for both the reconstruction and prediction parts
                loss = criterion(decoded, batch_x.view(batch_x.size(0), -1)) + criterion(prediction, batch_y)
                val_loss += loss.item()  # accumulate the validation loss

        val_loss /= len(val_loader)  # average the validation loss over all batches
    return val_loss  # return the validation loss to Optuna

# objective function for optuna's optimization loop
# this is what optuna uses to evaluate different hyperparameter combinations
def objective(trial):
    # define the hyperparameter search space
    hidden_dims = [trial.suggest_int('hidden_dim_1', 64, 256, step=32),
                   trial.suggest_int('hidden_dim_2', 32, 128, step=32),
                   trial.suggest_int('hidden_dim_3', 16, 128, step=32)]
    
    # suggest a dropout rate between 0 and 0.3 for regularization
    dropout_rate = trial.suggest_uniform('dropout_rate', 0.0, 0.3)
    
    # suggest a learning rate using a log-uniform distribution
    # this explores learning rates from 1e-5 to 1e-3
    learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-3)

    # create the model using the suggested hyperparameters
    model = SDAE(input_dim=30 * 14, hidden_dims=hidden_dims, dropout_rate=dropout_rate)

    # use rmse loss and adam optimizer
    criterion = rmse_loss
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # select the device (GPU or CPU)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # train the model and get the validation loss
    val_loss = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=20, device=device)

    # return the validation loss to Optuna so it can decide whether this trial is good or not
    return val_loss

# create an Optuna study to minimize the validation loss
study = optuna.create_study(direction='minimize')

# optimize for 100 trials (this will try different hyperparameter combinations)
study.optimize(objective, n_trials=100)

# print the best hyperparameters and the validation loss of the best trial
print("Best Hyperparameters:", study.best_params)
print("Best Validation RMSE:", study.best_value)

# once the best trial is found, extract the best hyperparameters
best_trial = study.best_trial
best_hidden_dims = [best_trial.params['hidden_dim_1'], best_trial.params['hidden_dim_2'], best_trial.params['hidden_dim_3']]
best_dropout_rate = best_trial.params['dropout_rate']
best_lr = best_trial.params['learning_rate']

# create a new model with the best hyperparameters
best_model = SDAE(input_dim=30 * 14, hidden_dims=best_hidden_dims, dropout_rate=best_dropout_rate)

# create the optimizer and loss function for the final model
optimizer = optim.Adam(best_model.parameters(), lr=best_lr)
criterion = rmse_loss

# train the final model on the full training data with the best hyperparameters
train_model(best_model, train_loader, val_loader, criterion, optimizer, num_epochs=20, device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))

# save the best model to a file so we can load it later
torch.save(best_model.state_dict(), 'best_sdae_model_optuna_second.pth')
print("Best model saved to 'best_sdae_model_optuna_second.pth'")


[I 2024-11-14 04:04:56,892] A new study created in memory with name: no-name-3b6a0f5b-ac39-493f-a284-b8b3457e4fe7
  dropout_rate = trial.suggest_uniform('dropout_rate', 0.0, 0.3)
  learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-3)
[I 2024-11-14 04:05:13,336] Trial 0 finished with value: 16.609278917312622 and parameters: {'hidden_dim_1': 160, 'hidden_dim_2': 64, 'hidden_dim_3': 80, 'dropout_rate': 0.15922720170060026, 'learning_rate': 0.00022075549335688925}. Best is trial 0 with value: 16.609278917312622.
[I 2024-11-14 04:05:31,304] Trial 1 finished with value: 15.158706222261701 and parameters: {'hidden_dim_1': 224, 'hidden_dim_2': 96, 'hidden_dim_3': 48, 'dropout_rate': 0.09451900214052883, 'learning_rate': 0.0005878405836620545}. Best is trial 1 with value: 15.158706222261701.
[I 2024-11-14 04:05:46,922] Trial 2 finished with value: 34.661446707589285 and parameters: {'hidden_dim_1': 64, 'hidden_dim_2': 96, 'hidden_dim_3': 112, 'dropout_rate': 0.1998339468504805

Best Hyperparameters: {'hidden_dim_1': 64, 'hidden_dim_2': 64, 'hidden_dim_3': 80, 'dropout_rate': 0.05155521210111333, 'learning_rate': 0.0009057370211032963}
Best Validation RMSE: 14.186381884983607
Best model saved to 'best_sdae_model_optuna_second.pth'


## Evaluate Model

In [89]:
# test data (processed test data and true rul values)
X_test = processed_test_data  # processed test data
y_test = true_rul  # true remaining useful life values

# convert the test data and labels to pytorch tensors
X_test_tensor = torch.FloatTensor(X_test)  # convert test data to float tensor
y_test_tensor = torch.FloatTensor(y_test)  # convert true rul values to float tensor

# create a DataLoader for the test data (used for batch processing)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)  # combine the features and labels into a dataset
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)  # create the DataLoader for test data (no shuffling)

# define the best hyperparameters found from optuna
best_hidden_dims = [64, 64, 80] 
best_dropout_rate = 0.05155521210111333 

# instantiate the model with the best hyperparameters
best_model = SDAE(input_dim=30 * 14, hidden_dims=best_hidden_dims, dropout_rate=best_dropout_rate)

# load the best model weights saved after training with optuna
best_model.load_state_dict(torch.load('best_sdae_model_optuna_second.pth', map_location='cpu'))  # use 'cuda' if GPU use

# define a function to get predictions from the model
def get_predictions(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # use GPU if available, otherwise use CPU
    model.to(device)  # move the model to the selected device (GPU/CPU)
    model.eval()  # set the model to evaluation mode (disables dropout layers, etc.)
    
    all_predictions = []  # this will hold the predictions from all batches
    
    with torch.no_grad():  # turn off gradient computation since we are just doing inference
        for batch_x, _ in test_loader:  # loop through the batches in the test_loader
            batch_x = batch_x.to(device)  # move input batch to the same device as the model
            _, predictions = model(batch_x)  # forward pass through the model to get predictions
            all_predictions.append(predictions.cpu().numpy())  # store predictions (move them to CPU and convert to numpy)

    # concatenate all predictions into a single array (flatten list of arrays)
    all_predictions = np.concatenate(all_predictions, axis=0)
    
    return all_predictions  # return the final array of predictions

# get the model's predictions on the test set
predictions = get_predictions(best_model, test_loader)

# print or inspect the predictions
print(predictions[:10])  # print the first 10 predictions


[108.270035 123.95801   46.380363  89.536026 101.55478  105.04238
  88.46287   98.11394  117.86893   71.83512 ]


  best_model.load_state_dict(torch.load('best_sdae_model_optuna_second.pth', map_location='cpu'))  # Use 'cuda' if running on GPU


In [90]:
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error

def evaluate_rul_metrics(true, predicted):
    
    true = np.array(true)
    predicted = np.array(predicted)
    
    mae = float(mean_absolute_error(true, predicted))
    mse = float(mean_squared_error(true, predicted))
    rmse = float(np.sqrt(mse))
    mape = float(np.mean(np.abs((true - predicted) / true)) * 100)
    
    return {
        "MAE": mae,
        "MSE": mse,
        "RMSE": rmse,
        "MAPE": mape
    }
    
metrics = evaluate_rul_metrics(true_rul, predictions)
metrics

{'MAE': 10.44703010559082,
 'MSE': 205.7282444988759,
 'RMSE': 14.343229918636734,
 'MAPE': 16.14549992434262}