# **Zero-fault-shot learning for bearing spall type classification by hybrid approach**

In this notebook the spall type of the processed signals is predicted using a fully connected feed-forward neural network. The notebook loads the signals after the physical domain adaptaion, where they were processed using the "main" matlab code.

In the first code block the notebook imports several libraries where in the second block new functions are defined. In the third block the data is loaded. In the fourth block the fully connected feed-forward neural network is set and its training function is presented. The training phase and the test pashe are presented in the fifth and six code blocks.  

In [1]:
# First code block - Import Python libraries

import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Function
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import random
import matplotlib.pyplot as plt
import warnings
from sklearn.decomposition import PCA
from sklearn.utils import shuffle
import scipy.io
import pdb

import os
from PIL import Image
from torchvision.datasets import VisionDataset
from torchvision.datasets.utils import download_and_extract_archive

warnings.filterwarnings('ignore')

In [2]:
# Second code block - functions

def classification_acc(y_pred, y_true):
    y_pred_tag = torch.softmax(y_pred, dim=1).argmax(dim=1)
    y_true_tag = torch.softmax(y_true, dim=1).argmax(dim=1)

    correct_results_sum = (y_pred_tag == y_true_tag).float().sum()
    acc = correct_results_sum/float(y_true.size(0))*100
    return acc
    
class SimpleDataset(Dataset):
    
    def __init__(self, X_data, y_data):
        #pdb.set_trace()
        #self.X_data = torch.unsqueeze(X_data, dim=1)
        self.X_data = X_data
        self.y_data = y_data
        
    def __getitem__(self, index):
        return self.X_data[index], self.y_data[index]
        
    def __len__ (self):
        return len(self.X_data)

def convert_X_y_mat2py(X, y):
    
    X = X.transpose()
    y = y.transpose()
    y = y.squeeze()
    
    return X, y

In [3]:
# Third code block - data loading
BATCH_SIZE = 64
test_set_name = 'MFPT' # 'MFPT', 'PU', 'XJTU_SY', 'PRONOSTIA-FEMTO', 'NBSWT'
datasets_names = ['CWRU', 'MFPT', 'PU', 'XJTU_SY', 'PRONOSTIA-FEMTO', 'NBSWT']
data_path = 'E:\\data\\papers\\zero_fault_shot_learning\\datasets'
processed_data = scipy.io.loadmat(data_path + '\\processed_data.mat')

X_training = processed_data["X_training"]
y_training = processed_data["y_training"]
X_training, y_training = convert_X_y_mat2py(X_training, y_training)

X_CWRU = processed_data["X_CWRU"]
y_CWRU = processed_data["y_CWRU"]
X_CWRU, y_CWRU = convert_X_y_mat2py(X_CWRU, y_CWRU)

X_MFPT = processed_data["X_MFPT"]
y_MFPT = processed_data["y_MFPT"]
X_MFPT, y_MFPT = convert_X_y_mat2py(X_MFPT, y_MFPT)

X_PU = processed_data["X_PU"]
y_PU = processed_data["y_PU"]
X_PU, y_PU = convert_X_y_mat2py(X_PU, y_PU)

X_XJTU_SY = processed_data["X_XJTU_SY"]
y_XJTU_SY = processed_data["y_XJTU_SY"]
X_XJTU_SY, y_XJTU_SY = convert_X_y_mat2py(X_XJTU_SY, y_XJTU_SY)

X_PRONOSTIA_FEMTO = processed_data["X_PRONOSTIA_FEMTO"]
y_PRONOSTIA_FEMTO = processed_data["y_PRONOSTIA_FEMTO"]
X_PRONOSTIA_FEMTO, y_PRONOSTIA_FEMTO = convert_X_y_mat2py(X_PRONOSTIA_FEMTO, y_PRONOSTIA_FEMTO)

X_NBSWT = processed_data["X_NBSWT"]
y_NBSWT = processed_data["y_NBSWT"]
X_NBSWT, y_NBSWT = convert_X_y_mat2py(X_NBSWT, y_NBSWT)

X_training, y_training = shuffle(X_training, y_training, random_state=0)
X_training, X_val, y_training, y_val = train_test_split(X_training, y_training, 
                                                        test_size=0.3, random_state=42)

training_data = SimpleDataset(torch.FloatTensor(X_training), torch.FloatTensor(y_training))
val_data = SimpleDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val))
CWRU_data = SimpleDataset(torch.FloatTensor(X_CWRU), torch.FloatTensor(y_CWRU))
MFPT_data = SimpleDataset(torch.FloatTensor(X_MFPT), torch.FloatTensor(y_MFPT))
PU_data = SimpleDataset(torch.FloatTensor(X_PU), torch.FloatTensor(y_PU))
XJTU_SY_data = SimpleDataset(torch.FloatTensor(X_XJTU_SY), torch.FloatTensor(y_XJTU_SY))
PRONOSTIA_FEMTO_data = SimpleDataset(torch.FloatTensor(X_PRONOSTIA_FEMTO), torch.FloatTensor(y_PRONOSTIA_FEMTO))
NBSWT_data = SimpleDataset(torch.FloatTensor(X_NBSWT), torch.FloatTensor(y_NBSWT))

datasets_list = [CWRU_data, MFPT_data, PU_data, XJTU_SY_data, PRONOSTIA_FEMTO_data, NBSWT_data]
val_datasets_data_list = []
val_datasets_names = []
for ii in range(len(datasets_list)):
    if datasets_names[ii] != test_set_name:
        val_datasets_data_list.append(datasets_list[ii])
        val_datasets_names.append(datasets_names[ii])
    else:
        test_data = datasets_list[ii]

## Creating dataloaders
training_loader = DataLoader(dataset=training_data, batch_size=BATCH_SIZE, shuffle=False)
val_loader = DataLoader(dataset=val_data, batch_size=BATCH_SIZE, shuffle=False)

val_datasets_loader_list = []
for ii in range(len(val_datasets_data_list)):
    val_datasets_loader_list.append(DataLoader(dataset=val_datasets_data_list[ii], 
                                       batch_size=BATCH_SIZE, shuffle=False))
    
test_loader = DataLoader(dataset=test_data, batch_size=BATCH_SIZE, shuffle=False)

print("Trainning / Validation sizes: " + str(len(training_data)) + "/" + str(len(val_data)))
for ii in range(len(val_datasets_data_list)):
    print("Validation dataset " + val_datasets_names[ii] + " size: " + 
          str(len(val_datasets_data_list[ii])))
print("Test size: " + str(len(test_data)))

Trainning / Validation sizes: 10126/4340
Validation dataset CWRU size: 181
Validation dataset PU size: 238
Validation dataset XJTU_SY size: 1563
Validation dataset PRONOSTIA-FEMTO size: 43
Validation dataset NBSWT size: 50
Test size: 17


In [4]:
# Fourth code block - fully connected feed-forward neural network 
class classModel(nn.Module):

    def __init__(self, input_dim):
        super(classModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 10)  # input is (x,y)
        self.fc2 = nn.Linear(10, 10)  # input is (x,y)
        self.fc3 = nn.Linear(10, 10)  # input is (x,y)
        self.fc4 = nn.Linear(10, 4)

        #self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=1.0)
            if module.bias is not None:
                module.bias.data.zero_()
                
    def forward(self, x):
        # Feature Extractor part
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        pred = F.softmax(self.fc4(x))
        
        return pred

class Trainer():
    def __init__(self, class_model, LEARNING_RATE, device="cpu"):
        self.class_model = class_model
        self.criterion = nn.CrossEntropyLoss()
        self.LEARNING_RATE = LEARNING_RATE
        self.optimizer = optim.Adam(self.class_model.parameters(), lr=LEARNING_RATE)
        self.device = device
        
    def train(self, data_loader, eval_data_loader, EPOCHS):   

        self.class_model.zero_grad()
        self.class_model.train()
        
        ## Training Epochs
        for e in range(1, EPOCHS+1):
            epoch_bce_loss = 0
            epoch_acc = 0

            ## Mini-bacth Handling
            for X, y in data_loader:
                X, y = X.to(self.device), y.to(self.device)
                y = nn.functional.one_hot(y.long(), num_classes=4).float()
                self.optimizer.zero_grad()

                # Forward pass
                pred = self.class_model(X)

                # Loss calculations
                bce_loss = self.criterion(pred, y)                        
                loss = bce_loss
                acc = classification_acc(pred, y)

                # Back-propagation
                loss.backward()
                self.optimizer.step()

                # Metrics update
                epoch_bce_loss += bce_loss.item()
                epoch_acc += acc.item()

            print(f'Epoch {e+0:03}: | BCE Loss: {epoch_bce_loss/len(data_loader):.5f} | Train Acc: {epoch_acc/len(data_loader):.3f}')

            ## Evaluating on Target
            self.evaluate(eval_data_loader, data_type="Val")

    def evaluate(self, data_loader, data_type="Val"):
        test_loss = 0
        test_acc = 0
        self.class_model.eval()
        batch_sizes = []
        
        ## Iterate over all dataset
        with torch.no_grad():
            for X_batch, y_batch in data_loader:

                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                y_batch = nn.functional.one_hot(y_batch.long(), num_classes=4).float()
                
                y_pred = self.class_model(X_batch)
                loss = self.criterion(y_pred, y_batch)
                acc = classification_acc(y_pred, y_batch)
                
                batch_sizes.append(len(y_batch))
                
                test_loss += (len(y_batch) / batch_sizes[0]) * loss.item()
                test_acc += (len(y_batch) / batch_sizes[0]) * acc.item()
                
        nrml_factor = sum(batch_sizes) / batch_sizes[0]

        print(data_type + f' | Loss: {test_loss/nrml_factor:.5f} | Acc: {test_acc/nrml_factor:.3f}')
        
        return test_acc/nrml_factor

    def predict(self, data_loader):
        data = torch.empty((1,2))
        #pdb.set_trace()
        preds = torch.empty((0))
        y_real = torch.empty((0))
        self.class_model.eval()
        
        ## Iterate over all dataset
        with torch.no_grad():
            for X_batch, y_batch in data_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device)
                
                y_pred = self.class_model(X_batch)

                y_pred_tag = torch.softmax(y_pred, dim=1).argmax(dim=1)

                #data = torch.cat((data, X_batch))
                preds = torch.cat((preds, y_pred_tag))
                y_real = torch.cat((y_real, y_batch))

        return preds, y_real

In [5]:
# Fifth code block - training phase
torch.manual_seed(10)

## Train parameters
EPOCHS = 20
LEARNING_RATE = 0.001
device = 'cpu'

## Creating model
input_dim = X_training.shape[1]
class_model = classModel(input_dim)
class_model.to(device)

## Training model
trainer = Trainer(class_model, LEARNING_RATE)
trainer.train(training_loader, val_loader, EPOCHS)

print()
for ii in range(len(val_datasets_loader_list)):
    trainer.evaluate(val_datasets_loader_list[ii], val_datasets_names[ii])

Epoch 001: | BCE Loss: 1.22814 | Train Acc: 80.130
Val | Loss: 0.83989 | Acc: 100.000
Epoch 002: | BCE Loss: 0.76254 | Train Acc: 100.000
Val | Loss: 0.74727 | Acc: 100.000
Epoch 003: | BCE Loss: 0.74594 | Train Acc: 100.000
Val | Loss: 0.74495 | Acc: 100.000
Epoch 004: | BCE Loss: 0.74467 | Train Acc: 100.000
Val | Loss: 0.74433 | Acc: 100.000
Epoch 005: | BCE Loss: 0.74425 | Train Acc: 100.000
Val | Loss: 0.74408 | Acc: 100.000
Epoch 006: | BCE Loss: 0.74405 | Train Acc: 100.000
Val | Loss: 0.74395 | Acc: 100.000
Epoch 007: | BCE Loss: 0.74394 | Train Acc: 100.000
Val | Loss: 0.74387 | Acc: 100.000
Epoch 008: | BCE Loss: 0.74387 | Train Acc: 100.000
Val | Loss: 0.74382 | Acc: 100.000
Epoch 009: | BCE Loss: 0.74383 | Train Acc: 100.000
Val | Loss: 0.74379 | Acc: 100.000
Epoch 010: | BCE Loss: 0.74380 | Train Acc: 100.000
Val | Loss: 0.74377 | Acc: 100.000
Epoch 011: | BCE Loss: 0.74377 | Train Acc: 100.000
Val | Loss: 0.74375 | Acc: 100.000
Epoch 012: | BCE Loss: 0.74375 | Train Acc: 

In [6]:
# six code block - test phase

trainer.evaluate(test_loader, "Test")

Test | Loss: 0.74372 | Acc: 100.000


100.0

In [41]:
y_test = y_NBSWT # ['CWRU', 'MFPT', 'PU', 'XJTU_SY', 'PRONOSTIA-FEMTO', 'NBSWT']

In [42]:
y_prd = trainer.predict(val_datasets_loader_list[4]) # val_datasets_loader_list test_loader
y_prd = y_prd[0]

In [43]:
y_prd

tensor([2., 2., 2., 2., 2., 2., 2., 3., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2.,
        2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2.,
        2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 2.])

In [44]:
#y_test = [1, 0, 1, 1, 0, 1, 0, 0]
#y_prd = [1, 0, 1, 1, 0, 0, 1, 0]

from sklearn.metrics import confusion_matrix

# Calculate confusion matrix
conf_matrix = confusion_matrix(y_test, y_prd)

print("Confusion Matrix:")
print(conf_matrix)

Confusion Matrix:
[[49  1]
 [ 0  0]]
