In [2]:
# Environment check: print the installed PyTorch version so the run can be reproduced.
import torch
print(torch.__version__)


2.5.1


In [11]:
import os
import scipy.io
import numpy as np
from scipy.signal.windows import hann
from scipy.fft import fft
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder



In [4]:
# STFT Parameters
# fs is presumably the sampling rate in Hz (the data folder is named "12k_...") — TODO confirm.
fs = 12000
# fi divides fs to give the base segment length; its physical meaning is not
# visible here (possibly a reference frequency in Hz) — TODO confirm.
fi = 100
# Three window lengths for the multi-resolution STFTs: 120, 60 and 30 samples.
Nseg1 = int(fs / fi)
Nseg2 = Nseg1 // 2
Nseg3 = Nseg1 // 4
# FFT length applied to every windowed segment.
Nfft = 256
# Fraction of each segment shared with the next one (hop = Nseg - int(Nseg * overlap_ratio)).
overlap_ratio = 0.9
# Fixed size every sample is cropped / zero-padded to:
# (frequency bins, time frames, delta-STFT channels).
target_shape = (128, 64, 3)


In [5]:
# Map (fault type, fault diameter, top-level folder) to a class label string
def get_class_label(fault_type, diameter, folder):
    """Return the class label for one recording.

    Args:
        fault_type: Fault family code; 'B', 'IR' and 'OR' are recognised
            (the comparison is case-sensitive).
        diameter: Fault diameter code as a string: '007', '014', '021' or '028'.
        folder: Top-level folder name; 'normal' in any letter case selects
            the healthy class.

    Returns:
        'c0' for the normal folder, 'c1'..'c12' for a recognised
        fault/diameter pair, otherwise 'Unknown'.
    """
    # The healthy class is decided by the folder alone and wins over everything else.
    if folder.lower() == 'normal':
        return 'c0'

    label_tables = (
        ('B', {'007': 'c1', '014': 'c2', '021': 'c3', '028': 'c10'}),
        ('IR', {'007': 'c4', '014': 'c5', '021': 'c6', '028': 'c11'}),
        ('OR', {'007': 'c7', '014': 'c8', '021': 'c9', '028': 'c12'}),
    )
    for code, by_diameter in label_tables:
        if fault_type == code:
            return by_diameter.get(diameter, 'Unknown')

    return 'Unknown'


In [6]:
# STFT computation function
def compute_stft(x, Nseg, Nfft, overlap_ratio=0.9):
    """Magnitude short-time Fourier transform of a 1-D signal.

    The signal is cut into Hann-windowed segments of ``Nseg`` samples that
    advance by ``Nseg - int(Nseg * overlap_ratio)`` samples, and each segment
    is transformed with an ``Nfft``-point FFT. Only complete segments are
    used; trailing samples that do not fill a segment are ignored.

    Args:
        x: 1-D sequence of samples.
        Nseg: Segment (window) length in samples.
        Nfft: FFT length. Segments shorter than ``Nfft`` are zero-padded by
            the FFT; longer ones are truncated by it.
        overlap_ratio: Fraction of ``Nseg`` shared by consecutive segments.

    Returns:
        Array of shape ``(Nfft, Nframe)`` holding ``|FFT|`` of every frame,
        covering all ``Nfft`` bins (both spectrum halves).

    Raises:
        ValueError: If the hop size is not positive, or if the signal is
            shorter than one segment.
    """
    Nover = int(Nseg * overlap_ratio)
    Nstep = Nseg - Nover
    if Nstep < 1:
        # Previously a bare ZeroDivisionError (or a nonsensical negative hop).
        raise ValueError(
            f"overlap_ratio={overlap_ratio} leaves no hop between segments "
            f"of length {Nseg}"
        )
    if len(x) < Nseg:
        # Previously an opaque "negative dimensions" ValueError from np.zeros.
        raise ValueError(
            f"signal length {len(x)} is shorter than the segment length {Nseg}"
        )

    # Count every complete segment. The "+ 1" accounts for the segment that
    # starts at sample 0; without it the last full window was dropped.
    Nframe = (len(x) - Nseg) // Nstep + 1

    X = np.zeros((Nframe, Nfft))
    window = hann(Nseg)
    for n in range(Nframe):
        start = n * Nstep
        segment = x[start:start + Nseg] * window
        X[n, :] = np.abs(fft(segment, Nfft))
    # Transpose so rows are frequency bins and columns are time frames.
    return X.T


In [7]:
# Function: crop and/or zero-pad a 3-D array to a fixed shape
def fix_tensor_shape(tensor, target_shape=(128, 64, 3)):
    """Return a copy of ``tensor`` forced to ``target_shape``.

    Every axis longer than its target is cut from the end; every axis shorter
    than its target is filled with trailing zeros.

    Args:
        tensor: 3-D numpy array.
        target_shape: Desired ``(H, W, C)`` output shape.

    Returns:
        A new zero-initialised array of shape ``target_shape`` (numpy's
        default float dtype) with the overlapping region copied from
        ``tensor``.
    """
    # Unpacking three values also rejects inputs that are not 3-D.
    rows, cols, chans = tensor.shape
    out_rows, out_cols, out_chans = target_shape

    # Region present in both the input and the output.
    cropped = tensor[:out_rows, :out_cols, :out_chans]
    keep_rows, keep_cols, keep_chans = cropped.shape

    result = np.zeros(target_shape)
    result[:keep_rows, :keep_cols, :keep_chans] = cropped
    return result


In [8]:


# Directory with .mat files
base_dir = '/home/rsimhadr/Downloads/CWRU-dataset/12k_Fan_End_Bearing_Fault_Data'
processed_data = []

# Fail early with a clear message: os.walk yields nothing for a missing
# directory, which would otherwise surface later as an empty dataset.
if not os.path.isdir(base_dir):
    raise FileNotFoundError(f"Dataset directory not found: {base_dir}")

print(" Starting delta STFT preprocessing...\n")

# Number of frequency rows kept from each delta map: the first half of the
# Nfft bins (128 for Nfft = 256), matching the frequency axis of target_shape.
n_freq_bins = Nfft // 2

# Traverse all .mat files in a fixed order. os.walk returns entries in
# filesystem order, so without sorting the sample order (and therefore the
# seeded train/test split downstream) can differ between machines.
for root, dirs, files in os.walk(base_dir):
    dirs.sort()  # sorting in place also fixes the order os.walk descends in
    for file in sorted(files):
        if not file.endswith('.mat'):
            continue

        mat_path = os.path.join(root, file)
        print(f" Processing file: {mat_path}")

        try:
            mat_data = scipy.io.loadmat(mat_path)
            # Keep only the time-series variables, skipping loadmat's "__" metadata keys.
            signal_keys = [k for k in mat_data.keys() if '_time' in k and not k.startswith('__')]
            print(f"     Found keys: {signal_keys}")

            # Expected layout: <fault_type>/<diameter>/.../<file>.mat relative to base_dir.
            rel_path = os.path.relpath(mat_path, base_dir)
            path_parts = rel_path.split(os.sep)

            if len(path_parts) >= 3:
                fault_type = path_parts[0]
                diameter = path_parts[1]
            else:
                fault_type = diameter = 'Unknown'

            folder = path_parts[0]
            class_label = get_class_label(fault_type, diameter, folder)
            print(f"     Fault: {fault_type}, Diameter: {diameter}, Label: {class_label}")

            # Do not let unlabeled recordings enter the dataset as a bogus
            # 'Unknown' class; report and skip them instead.
            if class_label == 'Unknown':
                print(f"     Skipping {mat_path}: could not determine class label")
                continue

            for key in signal_keys:
                signal = mat_data[key].squeeze()
                print(f"      Signal key: {key} | Length: {len(signal)}")

                # Compute STFTs at multiple resolutions
                X1 = compute_stft(signal, Nseg1, Nfft)
                X2 = compute_stft(signal, Nseg2, Nfft)
                X3 = compute_stft(signal, Nseg3, Nfft)

                # Align time axis: shorter windows produce more frames, so
                # trim all three to the common frame count.
                min_frames = min(X1.shape[1], X2.shape[1], X3.shape[1])
                X1, X2, X3 = X1[:, :min_frames], X2[:, :min_frames], X3[:, :min_frames]

                # Compute delta STFTs (pairwise differences between resolutions)
                D1 = X1 - X2
                D2 = X1 - X3
                D3 = X2 - X3

                # Stack the three delta maps as channels, then crop/pad to the fixed shape.
                raw_tensor = np.stack(
                    [D1[:n_freq_bins], D2[:n_freq_bins], D3[:n_freq_bins]], axis=-1
                )
                delta_stft_tensor = fix_tensor_shape(raw_tensor, target_shape=target_shape)

                # Store sample
                processed_data.append({
                    "tensor": delta_stft_tensor,
                    "label": class_label,
                    "file_path": mat_path,
                    "key": key
                })

        except Exception as e:
            # Best effort: report the failing file and continue with the rest.
            print(f" Error processing {mat_path}: {e}")

print(f"\n Finished preprocessing. Total delta STFT samples prepared: {len(processed_data)}")


 Starting delta STFT preprocessing...

 Processing file: /home/rsimhadr/Downloads/CWRU-dataset/12k_Fan_End_Bearing_Fault_Data/B/021/290_0.mat
     Found keys: ['X290_DE_time', 'X290_FE_time', 'X290_BA_time']
     Fault: B, Diameter: 021, Label: c3
      Signal key: X290_DE_time | Length: 121351
      Signal key: X290_FE_time | Length: 121351
      Signal key: X290_BA_time | Length: 121351
 Processing file: /home/rsimhadr/Downloads/CWRU-dataset/12k_Fan_End_Bearing_Fault_Data/B/021/292_2.mat
     Found keys: ['X292_DE_time', 'X292_FE_time', 'X292_BA_time']
     Fault: B, Diameter: 021, Label: c3
      Signal key: X292_DE_time | Length: 121535
      Signal key: X292_FE_time | Length: 121535
      Signal key: X292_BA_time | Length: 121535
 Processing file: /home/rsimhadr/Downloads/CWRU-dataset/12k_Fan_End_Bearing_Fault_Data/B/021/291_1.mat
     Found keys: ['X291_DE_time', 'X291_FE_time', 'X291_BA_time']
     Fault: B, Diameter: 021, Label: c3
      Signal key: X291_DE_time | Length: 12135

In [9]:
# STEP 1: Gather the per-sample tensors and their string labels into arrays
X = np.array([sample["tensor"] for sample in processed_data])  # expected shape: (N, 128, 64, 3)
y = np.array([sample["label"] for sample in processed_data])   # expected shape: (N,)

print(f" Raw dataset shape → X: {X.shape}, y: {y.shape}")

# STEP 2: Turn the string labels into integer class ids
le = LabelEncoder()
le.fit(y)
y_encoded = le.transform(y)
print(f" Encoded labels: {np.unique(y_encoded)} | Total classes: {len(le.classes_)}")

# STEP 3: Stratified 80/20 train/test split with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y_encoded,
    random_state=42,
    stratify=y_encoded,
    test_size=0.2,
)

# STEP 4: Report the resulting split sizes
print(f" Train data shape: X_train = {X_train.shape}, y_train = {y_train.shape}")
print(f" Test  data shape: X_test  = {X_test.shape}, y_test  = {y_test.shape}")

 Raw dataset shape → X: (135, 128, 64, 3), y: (135,)
 Encoded labels: [0 1 2 3 4 5 6 7 8] | Total classes: 9
 Train data shape: X_train = (108, 128, 64, 3), y_train = (108,)
 Test  data shape: X_test  = (27, 128, 64, 3), y_test  = (27,)


In [10]:
# List the distinct class labels present in the prepared samples
labels = [sample["label"] for sample in processed_data]
unique_classes = list(set(labels))
unique_classes.sort()

print(f" Unique class labels: {unique_classes}")
print(f" Total number of unique fault classes: {len(unique_classes)}")



 Unique class labels: ['c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9']
 Total number of unique fault classes: 9


In [13]:

# Build float32 inputs in channels-first layout (N, C, H, W) and int64 targets
X_train_tensor = torch.tensor(X_train, dtype=torch.float).movedim(3, 1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float).movedim(3, 1)
y_train_tensor = torch.tensor(y_train, dtype=torch.int64)
y_test_tensor = torch.tensor(y_test, dtype=torch.int64)

# Pair inputs with targets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Batch the data; only the training loader reshuffles each epoch
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

print(" DataLoaders are ready.")


 DataLoaders are ready.


In [14]:
# Sanity check: pull the first batch from each loader and show its shapes
X_batch, y_batch = train_batch = next(iter(train_loader))

print(" Train Loader Batch Shape:")
print("X_batch shape:", X_batch.shape)  # expected (batch_size, 3, 128, 64)
print("y_batch shape:", y_batch.shape)  # expected (batch_size,)

X_test_batch, y_test_batch = test_batch = next(iter(test_loader))

print("\n Test Loader Batch Shape:")
print("X_batch shape:", X_test_batch.shape)
print("y_batch shape:", y_test_batch.shape)


 Train Loader Batch Shape:
X_batch shape: torch.Size([32, 3, 128, 64])
y_batch shape: torch.Size([32])

 Test Loader Batch Shape:
X_batch shape: torch.Size([27, 3, 128, 64])
y_batch shape: torch.Size([27])


In [11]:
# Quick inspection: how many samples were prepared and which fields each record carries.
print(f"Total samples: {len(processed_data)}")
print(processed_data[0].keys())


Total samples: 135
dict_keys(['tensor', 'label', 'file_path', 'key'])


In [12]:
# Shape of the first stored tensor; fix_tensor_shape should make this equal to target_shape.
# NOTE(review): the saved output below shows a width of 10102, which does not match
# target_shape = (128, 64, 3) — presumably a stale result from an earlier run; re-run to confirm.
print(processed_data[0]['tensor'].shape)


(128, 10102, 3)


In [13]:
# Count and list the distinct labels across all prepared samples
labels = [record['label'] for record in processed_data]
unique_labels = sorted({*labels})
print(f"Number of unique labels: {len(unique_labels)}")
print("Labels:", unique_labels)


Number of unique labels: 9
Labels: ['c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9']
