Các lưu ý: 
 - Set random state để có thể tái tạo kết quả 
 - Chia file thành các phần nhỏ 

In [1]:
import numpy as np 
import os
import scipy.io
from collections import Counter
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn 
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset 
from torchinfo import summary

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.manifold import TSNE

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Number of consecutive signal points that form one sample.
SAMPLE_LENGTH = 1024

# Data selection passed to import_data below.
SENSOR = 'all'              # 'DE', 'FE', 'BA', or 'all' to concatenate every sensor channel
LOAD_LEVELS = [0,1]         # motor load levels to load (import_file documents 0, 1 and 2 HP)
FAULT_TYPE = list(range(10))  # class labels 0..9; import_file maps 0 to the 'Normal/' folder
NUM_CLASSES = len(FAULT_TYPE)

# Each sample is reshaped into a SIZE_INPUT x SIZE_INPUT grid by import_data,
# so SAMPLE_LENGTH is expected to be a perfect square (1024 -> 32).
SIZE_INPUT = int(SAMPLE_LENGTH**(1/2))
BATCH_SIZE = 32
# NUM_EPOCHS and LEARNING_RATE are not referenced in the visible part of the
# file; presumably they are consumed by the training loop -- TODO confirm.
NUM_EPOCHS = 50
LEARNING_RATE = 0.001

# Imbalance settings passed to create_imbalanced_split: the normal class (0)
# is capped at NORMAL_SAMPLES_USED training samples and every other class is
# sized relative to it. With 800 normal samples and a 16:1 ratio each fault
# class keeps 800 * 1 / 16 = 50 training samples.
NORMAL_SAMPLES_USED = 800
CLASS_RATIOS = {0: 16, 
        1: 1,
        2: 1,
        3: 1,
        4: 1,
        5: 1,
        6: 1,
        7: 1,
        8: 1,
        9: 1}

# DATA HANDLING
def import_file(bearing = 'DE', fault_type = 0, load_level = 0, sensor = 'all', print_infor = False, sample_length = 1024, base_path = 'CWRU-dataset-main'):
    """
    Load the signal(s) of one recording as a flat array.

    The recording is the first file (in sorted name order) inside the folder
    selected by ``bearing`` and ``fault_type`` whose name contains the
    load-level suffix ('_0', '_1' or '_2'). Every returned channel is
    truncated to a whole number of ``sample_length`` windows.

    Args:
        bearing (str): 'DE' selects the drive-end folder, anything else the fan-end folder
        fault_type (int): label from 0 to 9 (0 maps to the 'Normal/' folder)
        load_level (int): 0 HP, 1 HP, 2 HP
        sensor (str): 'DE', 'FE', 'BA', or 'all' to concatenate every channel
            whose variable name contains 'DE', 'FE' or 'BA'
        print_infor (bool): print a short report for every imported channel
        sample_length (int): window length used for truncation (must be > 0)
        base_path (str): Base path to dataset

    Returns:
        numpy array: 1-D signal. An empty array is returned (after printing a
        message) when the folder cannot be listed, the load level is not
        supported, no file matches, or no variable matches ``sensor``.

    Raises:
        KeyError: if ``fault_type`` is not one of the known labels.
        ValueError: if ``sample_length`` is not positive.
    """
    if sample_length <= 0:
        raise ValueError(f'sample_length must be a positive integer, got {sample_length}')

    if bearing == 'DE':
        base_data_directory = os.path.join(base_path, '12k_Drive_End_Bearing_Fault_Data/')
    else:
        base_data_directory = os.path.join(base_path, '12k_Fan_End_Bearing_Fault_Data/')

    fault_dict = {
        0 : 'Normal/',
        1 : 'B/007/',
        2 : 'B/014/',
        3 : 'B/021/',
        4 : 'IR/007/',
        5 : 'IR/014/',
        6 : 'IR/021/',
        7 : 'OR/007/@6/',
        8 : 'OR/014/',
        9 : 'OR/021/@6/'
    }
    load_level_dict = {
        0 : '_0',
        1 : '_1',
        2 : '_2',
    }

    file_path = os.path.join(base_data_directory, fault_dict[fault_type])

    # Resolve the suffix up front so an unknown load level is reported as such
    # instead of being mistaken for a missing directory.
    load_suffix = load_level_dict.get(load_level)
    if load_suffix is None:
        print(f'Unsupported load_level={load_level}; expected one of {sorted(load_level_dict)}')
        return np.array([])

    try:
        # Sorted so the selected file does not depend on the OS listing order.
        candidates = sorted(os.listdir(file_path))
    except OSError as e:
        print(f'File path not exists: {e}')
        return np.array([])

    full_file_path = None
    for file_name in candidates:
        candidate = os.path.join(file_path, file_name)
        if load_suffix in file_name and os.path.isfile(candidate):
            full_file_path = candidate
            break

    if full_file_path is None:
        print(f'No file found for bearing={bearing}, fault_type={fault_type}, load_level={load_level}')
        return np.array([])

    def _report(key, n_values):
        # Shared report for both sensor modes.
        print('================= Import data =====================')
        print(f' - Process file: {full_file_path}')
        print(f' - Key: {key}')
        print(f' - Data length: {n_values/sample_length}')
        print('===================================================')

    mat_data = scipy.io.loadmat(full_file_path)
    # Skip the '__header__' / '__version__' / '__globals__' metadata entries.
    signal_keys = [key for key in mat_data if not key.startswith('__')]

    if sensor == 'all':
        # Collect the channels and join them once; the empty float array seed
        # keeps the result dtype promotion identical to repeated np.append.
        chunks = [np.array([])]
        total = 0
        for key in signal_keys:
            if 'DE' in key or 'FE' in key or 'BA' in key:
                signal = mat_data[key].flatten()
                usable = (len(signal) // sample_length) * sample_length
                chunks.append(signal[:usable])
                total += usable
                if print_infor:
                    _report(key, total)  # cumulative length across channels
        return np.concatenate(chunks)

    # Single-sensor mode: only the first matching variable is used.
    for key in signal_keys:
        if sensor in key:
            signal = mat_data[key].flatten()
            usable = (len(signal) // sample_length) * sample_length
            signal = signal[:usable]
            if print_infor:
                _report(key, len(signal))
            return signal

    return np.array([])

def import_data(bearing = 'DE', load_level = None, fault_type = None, sensor = 'all', sample_length = 1024, base_path = 'CWRU-dataset-main'):
    """
    Load every requested (load level, fault type) recording as labelled samples.

    Each recording returned by ``import_file`` is cut into rows of
    ``sample_length`` points, labelled with its fault type, and every row is
    reshaped into a single-channel square grid.

    Args:
        bearing (str): forwarded to import_file ('DE' or 'FE')
        load_level (iterable of int | None): load levels to load; None means 0, 1, 2
        fault_type (iterable of int | None): fault labels to load; None means 0..9
        sensor (str): forwarded to import_file
        sample_length (int): points per sample; must be a perfect square
        base_path (str): Base path to dataset

    Returns:
        (X, Y): X has shape (n_samples, 1, side, side) with
        side * side == sample_length; Y has shape (n_samples,).

    Raises:
        ValueError: if sample_length is not a perfect square, or if no
            recording could be loaded.
    """
    # None replaces the former mutable list defaults; the values are the same.
    if load_level is None:
        load_level = range(3)
    if fault_type is None:
        fault_type = range(10)

    # Derive the grid side from the argument rather than a module-level
    # constant, so the reshape always agrees with how the rows were cut.
    side = int(round(sample_length ** 0.5))
    if side * side != sample_length:
        raise ValueError(
            f"sample_length must be a perfect square to form a square input, got {sample_length}"
        )

    X_data = []
    Y_data = []

    for load in load_level:
        for fault_type_label in fault_type:
            data = import_file(bearing, fault_type_label, load, sensor, False, sample_length, base_path)
            if len(data) > 0:
                data_reshaped = data.reshape(-1, sample_length)
                labels = np.full(data_reshaped.shape[0], fault_type_label)
                X_data.append(data_reshaped)
                Y_data.append(labels)

    if not X_data:
        raise ValueError("No data was loaded. Please check your parameters and file paths.")

    final_X_data = np.concatenate(X_data, axis=0)
    final_Y_data = np.concatenate(Y_data, axis=0)

    final_X_data = np.reshape(final_X_data, (-1, 1, side, side))

    print('Original data shape:', final_X_data.shape)
    print('Original labels shape:', final_Y_data.shape)

    return final_X_data, final_Y_data

def create_imbalanced_split(X_data, Y_data, normal_samples=None, class_ratios=None, test_size=0.3, random_state=42):
    """
    Create a balanced test set and an (optionally) imbalanced training set.

    Every class contributes the same number of test samples,
    ``int(min_class_count * test_size)``. The remaining samples form the
    training pool, which is then subsampled according to ``class_ratios``.
    All random draws (per-class split, subsampling, final shuffles) are
    driven by ``random_state``, so a fixed seed reproduces the same split.

    Args:
        X_data: Input features (first axis indexes samples)
        Y_data: Labels
        normal_samples: Number of normal class (label 0) samples to use (None = use all)
        class_ratios: Dictionary defining ratio between classes
                     e.g., {0: 4, 4: 1, 1: 2, 7: 1} means normal:IR007:B007:OR007 = 4:1:2:1.
                     Classes missing from the dictionary (other than the
                     normal class) are dropped from the training set.
                     None keeps the whole training pool.
        test_size: Fraction of the smallest class used per class for the test set (default 0.3)
        random_state: Random seed (int), a numpy RandomState, or None

    Returns:
        X_train, X_test, y_train, y_test

    Raises:
        ValueError: if there is no data, if the test share rounds down to
            zero samples per class, or if the ratios select no training sample.
    """
    # One generator for the draws that previously used the unseeded global
    # numpy RNG, which made the training subset and ordering irreproducible.
    if isinstance(random_state, np.random.RandomState):
        rng = random_state
    else:
        rng = np.random.RandomState(random_state)

    # Get class distribution
    unique_classes, class_counts = np.unique(Y_data, return_counts=True)
    if len(unique_classes) == 0:
        raise ValueError("Cannot split an empty dataset.")

    # Step 1: Create balanced test set, sized from the rarest class
    test_samples_per_class = int(class_counts.min() * test_size)
    if test_samples_per_class < 1:
        raise ValueError(
            f"test_size={test_size} leaves no test samples for the smallest class "
            f"({class_counts.min()} samples)."
        )

    X_test_list = []
    y_test_list = []
    X_train_temp_list = []
    y_train_temp_list = []

    # Split each class separately to ensure balanced test set
    for cls in unique_classes:
        cls_mask = Y_data == cls
        X_cls_train, X_cls_test, y_cls_train, y_cls_test = train_test_split(
            X_data[cls_mask], Y_data[cls_mask], test_size=test_samples_per_class,
            random_state=random_state, stratify=None
        )

        X_test_list.append(X_cls_test)
        y_test_list.append(y_cls_test)
        X_train_temp_list.append(X_cls_train)
        y_train_temp_list.append(y_cls_train)

    X_test = np.concatenate(X_test_list, axis=0)
    y_test = np.concatenate(y_test_list, axis=0)

    # Everything not reserved for testing is the training pool
    X_train_temp = np.concatenate(X_train_temp_list, axis=0)
    y_train_temp = np.concatenate(y_train_temp_list, axis=0)

    # Step 2: Create imbalanced training set based on user specifications
    if class_ratios is None:
        # If no ratios specified, use all remaining training data
        X_train = X_train_temp
        y_train = y_train_temp
    else:
        normal_class = 0  # Assuming class 0 is normal
        normal_ratio = class_ratios.get(normal_class, 1)

        # The normal class is the reference: cap it at what is available
        available_normal = int(np.count_nonzero(y_train_temp == normal_class))
        if normal_samples is None:
            normal_samples = available_normal
        actual_normal_samples = min(normal_samples, available_normal)

        X_train_list = []
        y_train_list = []

        for cls in unique_classes:
            cls_indices = np.where(y_train_temp == cls)[0]
            available_samples = len(cls_indices)

            if cls == normal_class:
                target_samples = actual_normal_samples
            else:
                # Size relative to the normal class; unlisted classes are skipped
                cls_ratio = class_ratios.get(cls, 0)
                if cls_ratio > 0:
                    target_samples = int((actual_normal_samples * cls_ratio) / normal_ratio)
                    target_samples = min(target_samples, available_samples)
                else:
                    target_samples = 0

            if target_samples > 0:
                selected_indices = rng.choice(cls_indices, target_samples, replace=False)
                X_train_list.append(X_train_temp[selected_indices])
                y_train_list.append(y_train_temp[selected_indices])

        if not X_train_list:
            raise ValueError("class_ratios and normal_samples select no training samples.")

        X_train = np.concatenate(X_train_list, axis=0)
        y_train = np.concatenate(y_train_list, axis=0)

    # Shuffle training data (the per-class concatenation is ordered by class)
    train_indices = rng.permutation(len(X_train))
    X_train = X_train[train_indices]
    y_train = y_train[train_indices]

    # Shuffle test data
    test_indices = rng.permutation(len(X_test))
    X_test = X_test[test_indices]
    y_test = y_test[test_indices]

    # Print final distribution
    print('='*50)
    print("Final split results:")
    print(f"Test set - Total samples: {len(y_test)}")
    test_unique, test_counts = np.unique(y_test, return_counts=True)
    for cls, count in zip(test_unique, test_counts):
        print(f"  Class {cls}: {count} samples")

    print(f"\nTrain set - Total samples: {len(y_train)}")
    train_unique, train_counts = np.unique(y_train, return_counts=True)
    for cls, count in zip(train_unique, train_counts):
        print(f"  Class {cls}: {count} samples")

    return X_train, X_test, y_train, y_test

def analyze_class_distribution(y_data, title="Class Distribution"):
    """Print the per-class sample counts and percentages of ``y_data``.

    Args:
        y_data: array-like of class labels.
        title: heading printed above the per-class lines.

    Returns:
        dict mapping each distinct label to its number of occurrences.
    """
    labels, frequencies = np.unique(y_data, return_counts=True)
    total = len(y_data)

    print('=' * 50)
    print(f"{title}:")

    distribution = {}
    for label, frequency in zip(labels, frequencies):
        share = (frequency / total) * 100
        print(f"Class {label}: {frequency} samples ({share:.2f}%)")
        distribution[label] = frequency

    return distribution

# Load every recording for the configured load levels / fault types and
# reshape the samples into single-channel square grids.
X_data, Y_data = import_data(load_level=LOAD_LEVELS, fault_type=FAULT_TYPE, sensor=SENSOR, sample_length=SAMPLE_LENGTH)

# Analyze original distribution
analyze_class_distribution(Y_data, "Original Data Distribution")

# Balanced test set plus an imbalanced training set; test_size and
# random_state are left at the function defaults (0.3 and 42).
X_train, X_test, y_train, y_test = create_imbalanced_split(
    X_data, Y_data, 
    normal_samples=NORMAL_SAMPLES_USED,  # cap the normal class at NORMAL_SAMPLES_USED training samples
    class_ratios=CLASS_RATIOS
)


Original data shape: (7822, 1, 32, 32)
Original labels shape: (7822,)
Original Data Distribution:
Class 0: 1420 samples (18.15%)
Class 1: 711 samples (9.09%)
Class 2: 711 samples (9.09%)
Class 3: 711 samples (9.09%)
Class 4: 711 samples (9.09%)
Class 5: 708 samples (9.05%)
Class 6: 711 samples (9.09%)
Class 7: 714 samples (9.13%)
Class 8: 711 samples (9.09%)
Class 9: 714 samples (9.13%)
Final split results:
Test set - Total samples: 2120
  Class 0: 212 samples
  Class 1: 212 samples
  Class 2: 212 samples
  Class 3: 212 samples
  Class 4: 212 samples
  Class 5: 212 samples
  Class 6: 212 samples
  Class 7: 212 samples
  Class 8: 212 samples
  Class 9: 212 samples

Train set - Total samples: 1250
  Class 0: 800 samples
  Class 1: 50 samples
  Class 2: 50 samples
  Class 3: 50 samples
  Class 4: 50 samples
  Class 5: 50 samples
  Class 6: 50 samples
  Class 7: 50 samples
  Class 8: 50 samples
  Class 9: 50 samples


In [2]:
from imblearn.over_sampling import SMOTE
# Report the shape and class balance of the training set before oversampling.
print('='*50)
print(f"Shape of X_train before SMOTE: {X_train.shape}")
print(f"Shape of y_train before SMOTE: {y_train.shape}")
analyze_class_distribution(y_train, "Training Data Distribution Before SMOTE")

Shape of X_train before SMOTE: (1250, 1, 32, 32)
Shape of y_train before SMOTE: (1250,)
Training Data Distribution Before SMOTE:
Class 0: 800 samples (64.00%)
Class 1: 50 samples (4.00%)
Class 2: 50 samples (4.00%)
Class 3: 50 samples (4.00%)
Class 4: 50 samples (4.00%)
Class 5: 50 samples (4.00%)
Class 6: 50 samples (4.00%)
Class 7: 50 samples (4.00%)
Class 8: 50 samples (4.00%)
Class 9: 50 samples (4.00%)


{0: 800, 1: 50, 2: 50, 3: 50, 4: 50, 5: 50, 6: 50, 7: 50, 8: 50, 9: 50}

In [5]:
# Flatten X_train for SMOTE, which works on 2-D (n_samples, n_features) input.
# Original shape: (n_samples, channels, height, width) e.g. (N, 1, 32, 32)
n_samples_train, channels, height, width = X_train.shape
X_train_reshaped = X_train.reshape(n_samples_train, channels * height * width) # (N, 1024)

# Class counts before oversampling, printed for reference.
train_counts_before_smote = Counter(y_train)
print(f"Counts before SMOTE: {train_counts_before_smote}")

# Sampling strategy options:
#  - 'not majority' (same as the default 'auto' for over-samplers): oversample
#    every class except the majority up to the majority count. Used here, so
#    classes 1-9 are raised to the size of class 0.
#  - 'all': resample every class, including the majority.
#  - a dict {class_label: target_count}: explicit per-class targets, e.g. to
#    raise the fault classes to only 80% of the normal class. Targets must not
#    be smaller than the current counts.
# NOTE(review): k_neighbors=5 presumably needs more than 5 samples in every
# minority class -- confirm if CLASS_RATIOS or NORMAL_SAMPLES_USED are reduced.
smote = SMOTE(sampling_strategy='not majority', random_state=42, k_neighbors=5)

print(f"Applying SMOTE with strategy: {smote.sampling_strategy}")
X_train_smote, y_train_smote = smote.fit_resample(X_train_reshaped, y_train)

print('='*50)
print(f"Shape of X_train after SMOTE: {X_train_smote.shape}")
print(f"Shape of y_train after SMOTE: {y_train_smote.shape}")
analyze_class_distribution(y_train_smote, "Training Data Distribution After SMOTE")

# Reshape X_train_smote back to original image-like format
X_train_final = X_train_smote.reshape(-1, channels, height, width)

# Wrap the oversampled data in a dataset.
# NOTE(review): BearingDataset is defined in a cell that appears later in this
# file, so this line only works if that cell has been executed first.
# NOTE(review): only train_dataset is rebuilt here; a train_loader created
# earlier would still iterate over the non-oversampled data -- confirm that a
# new DataLoader is built from this dataset before training.
train_dataset = BearingDataset(X_train_final, y_train_smote)

Counts before SMOTE: Counter({0: 800, 6: 50, 9: 50, 3: 50, 2: 50, 4: 50, 5: 50, 1: 50, 8: 50, 7: 50})
Applying SMOTE with strategy: not majority
Shape of X_train after SMOTE: (8000, 1024)
Shape of y_train after SMOTE: (8000,)
Training Data Distribution After SMOTE:
Class 0: 800 samples (10.00%)
Class 1: 800 samples (10.00%)
Class 2: 800 samples (10.00%)
Class 3: 800 samples (10.00%)
Class 4: 800 samples (10.00%)
Class 5: 800 samples (10.00%)
Class 6: 800 samples (10.00%)
Class 7: 800 samples (10.00%)
Class 8: 800 samples (10.00%)
Class 9: 800 samples (10.00%)


In [3]:

class BearingDataset(Dataset): 
    """In-memory dataset pairing float32 sample tensors with int64 labels.

    Args:
        X_data: array-like of samples; the first axis indexes samples.
        Y_data: array-like of integer class labels, one per sample.
        is_train: flag stored on the instance; this class does not use it.

    Raises:
        ValueError: if X_data and Y_data do not hold the same number of entries.
    """

    def __init__(self, X_data, Y_data, is_train = True):
        # ascontiguousarray accepts lists and non-contiguous views (which
        # torch.from_numpy rejects) and is a no-op for contiguous arrays.
        features = np.ascontiguousarray(X_data)
        targets = np.ascontiguousarray(Y_data)

        # Fail here rather than with an IndexError in the middle of an epoch.
        if len(features) != len(targets):
            raise ValueError(
                f"X_data and Y_data must have the same length, "
                f"got {len(features)} and {len(targets)}"
            )

        self.data = torch.from_numpy(features).float()
        self.labels = torch.from_numpy(targets).long()
        self.is_train = is_train

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the (sample, label) pair at position ``idx``."""
        return self.data[idx], self.labels[idx]

# Build datasets and loaders: training batches are reshuffled every epoch,
# validation batches keep a fixed order.
# NOTE(review): these use X_train / y_train as returned by
# create_imbalanced_split, not the SMOTE-oversampled arrays -- confirm which
# training set is intended.
train_dataset = BearingDataset(X_train, y_train)
val_dataset = BearingDataset(X_test, y_test, False)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle= True, num_workers= 0) 
val_loader = DataLoader(val_dataset, batch_size= BATCH_SIZE, shuffle=False, num_workers= 0)