In [None]:
import os
import numpy as np
import scipy.io
import torch
import torchaudio.transforms as T
import matplotlib.pyplot as plt
from torchvision import transforms
from PIL import Image


In [None]:
def load_mat_file(file_path):
    """Load vibration signal from a .mat file, extracting from the correct field."""
    mat_data = scipy.io.loadmat(file_path, struct_as_record=False, squeeze_me=True)
    if 'bearing' in mat_data:
        structured_array = mat_data['bearing']
        if hasattr(structured_array, 'gs') and isinstance(structured_array.gs, np.ndarray):
            signal = structured_array.gs.squeeze()
            if signal.ndim == 1 and signal.size > 0:
                return signal
    return None

def generate_spectrogram(signal, sample_rate=10000, n_fft=256, hop_length=128):
    """Convert a 1D vibration signal to a 2D spectrogram using STFT."""
    spectrogram_transform = T.Spectrogram(n_fft=n_fft, hop_length=hop_length, power=2)
    signal_tensor = torch.tensor(signal, dtype=torch.float32).unsqueeze(0)
    spectrogram = spectrogram_transform(signal_tensor)
    return spectrogram.squeeze().numpy()

def preprocess_spectrogram(spectrogram):
    """Normalize and resize spectrogram to 224x224 pixels."""
    spectrogram = (spectrogram - spectrogram.min()) / (spectrogram.max() - spectrogram.min() + 1e-6)
    spectrogram = (spectrogram * 255).astype(np.uint8)
    image = Image.fromarray(spectrogram)
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    return transform(image)

def process_dataset(dataset_path):
    """Process all .mat files in the dataset directory and convert them to spectrograms."""
    spectrograms = []
    labels = []
    label_map = {'1 - Three Baseline Conditions': 0, 
                '2 - Three Outer Race Fault Conditions': 1, 
                '3 - Seven More Outer Race Fault Conditions': 2, 
                '4 - Seven Inner Race Fault Conditions': 3}  # Mapping fault conditions 

    
    for label, label_id in label_map.items():
        folder_path = os.path.join(dataset_path, label)
        if not os.path.exists(folder_path):
            print(f"Warning: Folder {folder_path} not found.")
            continue
        
        for file in os.listdir(folder_path):
            if file.endswith('.mat'):
                file_path = os.path.join(folder_path, file)
                signal = load_mat_file(file_path)
                if signal is not None:
                    spec = generate_spectrogram(signal)
                    spec_tensor = preprocess_spectrogram(spec)
                    spectrograms.append(spec_tensor)
                    labels.append(label_id)
                else:
                    print(f"Warning: No valid signal found in {file_path}")
    
    if not spectrograms:
        raise RuntimeError("No valid data found in the dataset. Ensure .mat files contain the correct 'bearing' structure.")
    
    return torch.stack(spectrograms), torch.tensor(labels)

In [None]:
dataset_path = "data"
try:
    x_data, y_labels = process_dataset(dataset_path)
    print("Processed Data Shape:", x_data.shape, "Labels Shape:", y_labels.shape)
except RuntimeError as e:
    print("Error:", e)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split, Dataset
import os
import numpy as np
from PIL import Image


In [None]:
# Hybrid Model: ResNet-50 + EfficientNet-B0
class HybridResNetEfficientNet(nn.Module):
    def __init__(self, num_classes=4):
        super(HybridResNetEfficientNet, self).__init__()
        
        # Load pre-trained ResNet-50 as feature extractor
        resnet = models.resnet50(pretrained=True)
        self.resnet_features = nn.Sequential(*list(resnet.children())[:-2])  # Remove last FC layer
        for param in self.resnet_features.parameters():
            param.requires_grad = False  # Freeze ResNet weights
        
        # Transition layer to align feature dimensions
        self.transition = nn.Conv2d(in_channels=2048, out_channels=1280, kernel_size=1)
        
        # Load pre-trained EfficientNet-B0 as classifier
        efficientnet = models.efficientnet_b0(pretrained=True)
        self.efficientnet_features = efficientnet.features  # Extract feature layers
        
        # Custom classifier head
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(1280, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )
        
    def forward(self, x):
        x = self.resnet_features(x)  # Feature extraction via ResNet-50
        x = self.transition(x)  # Dimension alignment
        x = self.efficientnet_features(x)  # Feature extraction via EfficientNet-B0
        x = self.classifier(x)  # Classification head
        return x

# Custom Dataset Class
class VibrationDataset(Dataset):
    def __init__(self, data_tensors, labels):
        self.data_tensors = data_tensors
        self.labels = labels
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        image = self.data_tensors[idx]
        label = self.labels[idx]
        return image, label

# Load and Split Dataset
def prepare_dataloaders(data, labels, batch_size=16):
    dataset = VibrationDataset(data, labels)
    train_size = int(0.8 * len(dataset))
    val_size = int(0.1 * len(dataset))
    test_size = len(dataset) - train_size - val_size
    train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])
    
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)
    
    return train_loader, val_loader, test_loader

# Training Function
def train_model(model, train_loader, val_loader, num_epochs=20, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    best_val_loss = float('inf')
    early_stop_count = 0
    
    for epoch in range(num_epochs):
        model.train()
        train_loss, correct, total = 0, 0, 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            correct += (outputs.argmax(1) == labels).sum().item()
            total += labels.size(0)
        
        train_acc = correct / total
        val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)
        print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stop_count = 0
            torch.save(model.state_dict(), "best_model.pth")
        else:
            early_stop_count += 1
            if early_stop_count >= 5:
                print("Early stopping triggered.")
                break

# Evaluation Function
def evaluate_model(model, data_loader, criterion, device):
    model.eval()
    total_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            correct += (outputs.argmax(1) == labels).sum().item()
            total += labels.size(0)
    
    return total_loss / len(data_loader), correct / total

In [None]:
# Hybrid Model: ResNet-50 + EfficientNet-B0
class HybridResNetEfficientNet(nn.Module):
    def __init__(self, num_classes=4):
        super(HybridResNetEfficientNet, self).__init__()
        
        # Load pre-trained ResNet-50 as feature extractor
        resnet = models.resnet50(pretrained=True)
        self.resnet_features = nn.Sequential(*list(resnet.children())[:-2])  # Remove last FC layer
        for param in self.resnet_features.parameters():
            param.requires_grad = False  # Freeze ResNet weights
        
        # Transition layer to align feature dimensions
        self.transition = nn.Conv2d(in_channels=2048, out_channels=1280, kernel_size=1)
        
        # Load pre-trained EfficientNet-B0 as classifier
        efficientnet = models.efficientnet_b0(pretrained=True)
        self.efficientnet_features = efficientnet.features  # Extract feature layers
        
        # Custom classifier head
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(1280, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )
        
    def forward(self, x):
        x = self.resnet_features(x)  # Feature extraction via ResNet-50
        x = self.transition(x)  # Dimension alignment
        x = self.efficientnet_features(x)  # Feature extraction via EfficientNet-B0
        x = self.classifier(x)  # Classification head
        return x

# Custom Dataset Class
class VibrationDataset(Dataset):
    def __init__(self, data_tensors, labels):
        self.data_tensors = data_tensors
        self.labels = labels
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        image = self.data_tensors[idx]
        label = self.labels[idx]
        return image, label

# Load and Split Dataset
def prepare_dataloaders(data, labels, batch_size=16):
    dataset = VibrationDataset(data, labels)
    train_size = int(0.8 * len(dataset))
    val_size = int(0.1 * len(dataset))
    test_size = len(dataset) - train_size - val_size
    train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])
    
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)
    
    return train_loader, val_loader, test_loader

# Training Function
def train_model(model, train_loader, val_loader, num_epochs=20, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    best_val_loss = float('inf')
    early_stop_count = 0
    
    for epoch in range(num_epochs):
        model.train()
        train_loss, correct, total = 0, 0, 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            correct += (outputs.argmax(1) == labels).sum().item()
            total += labels.size(0)
        
        train_acc = correct / total
        val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)
        print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stop_count = 0
            torch.save(model.state_dict(), "best_model.pth")
        else:
            early_stop_count += 1
            if early_stop_count >= 5:
                print("Early stopping triggered.")
                break

# Evaluation Function
def evaluate_model(model, data_loader, criterion, device):
    model.eval()
    total_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            correct += (outputs.argmax(1) == labels).sum().item()
            total += labels.size(0)
    
    return total_loss / len(data_loader), correct / total

In [None]:
train_loader, val_loader, test_loader = prepare_dataloaders(x_data, y_labels, batch_size=16)
        
model = HybridResNetEfficientNet(num_classes=4)
train_model(model, train_loader, val_loader, num_epochs=20, lr=0.001)

print("Training complete! Evaluating on test set...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.load_state_dict(torch.load("best_model.pth"))
model.to(device)
criterion = nn.CrossEntropyLoss()
test_loss, test_acc = evaluate_model(model, test_loader, criterion, device)