<a href="https://colab.research.google.com/github/viktorngkhnh/BearingData/blob/main/BearingData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
import os
import scipy.io
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Mount Google Drive so the dataset stored there is visible to this runtime.
drive.mount('/content/drive')

# Root folder of the dataset; the cells below pass it to plot_raw_sample and
# CWRU_Raw_Dataset, which look for one sub-folder per bearing condition.
DATA_PATH = '/content/drive/My Drive/Bearing'

if os.path.exists(DATA_PATH):
    print(f"{DATA_PATH}")
    print("Fld:", os.listdir(DATA_PATH))
else:
    # Report the problem here instead of staying silent: otherwise a wrong
    # path only surfaces later as missing plots or an os.listdir error.
    print(f"Data folder not found: {DATA_PATH}")

In [None]:
# --- CELL 2: VISUALIZE RAW DATA (inspect the raw data) ---

def plot_raw_sample(root_dir):
    """Plot the start of one raw signal for each bearing condition.

    For every name in the target list, the first (alphabetically) sub-folder
    of ``root_dir`` whose name contains that string is used. Its
    alphabetically first ``.mat`` file is loaded and the first 12000 samples
    of the variable whose name contains ``'DE_time'`` are drawn in a figure
    of their own.

    Args:
        root_dir: Directory expected to hold one sub-folder per condition.

    Returns:
        None. Figures are displayed with ``plt.show()`` and a summary line
        is printed.
    """
    target_folders = ["Normal", "Ball_007", "Ball_014", "Ball_021", "Ball_028"]
    found_count = 0

    if os.path.isdir(root_dir):
        # Keep directories only: a stray file whose name matches a target
        # would otherwise make os.listdir() below raise.
        all_folders = [
            name for name in sorted(os.listdir(root_dir))
            if os.path.isdir(os.path.join(root_dir, name))
        ]

        for target in target_folders:
            folder_name = next((f for f in all_folders if target in f), None)
            if folder_name is None:
                print(f"‚ö†Ô∏è Kh√¥ng th·∫•y folder n√†o ch·ª©a t√™n '{target}'")
                continue

            folder_path = os.path.join(root_dir, folder_name)
            # Sorted so the plotted file does not depend on listing order.
            files = sorted(f for f in os.listdir(folder_path) if f.endswith('.mat'))
            if not files:
                continue

            file_path = os.path.join(folder_path, files[0])
            try:
                mat = scipy.io.loadmat(file_path)
                key = next((k for k in mat if 'DE_time' in k), None)
                if key is None:
                    raise ValueError("no variable containing 'DE_time'")
                signal = mat[key].flatten()
            except Exception as e:
                # Best-effort preview: report the unreadable file, keep going.
                print(f"L·ªói ƒë·ªçc file {files[0]}: {e}")
                continue

            # One figure per signal; no shared figure is created up front,
            # so plt.show() no longer renders an empty canvas first.
            plt.figure(figsize=(15, 5))
            plt.ylim(-2, 2)
            plt.plot(signal[:12000])
            plt.title(f"M·∫´u: {target} (File: {files[0]})")
            plt.ylabel("Bi√™n ƒë·ªô (Amplitude)")
            plt.grid(True)
            found_count += 1

    # Compare against the real number of targets rather than a literal.
    expected = len(target_folders)
    if found_count == expected:
        print(f"‚úÖ ƒê√£ v·∫Ω ƒë·ªß {expected} lo·∫°i t√≠n hi·ªáu. H√£y quan s√°t h√¨nh b√™n d∆∞·ªõi!")
    else:
        print(f"‚ö†Ô∏è Ch·ªâ t√¨m th·∫•y {found_count}/{expected} lo·∫°i d·ªØ li·ªáu.")
    plt.show()


# Preview one raw signal per condition from the dataset root.
plot_raw_sample(DATA_PATH)

In [None]:

# Length, in samples, of each window cut from a signal.
WINDOW_SIZE = 2048
# Distance between the starts of consecutive windows (half a window here,
# so neighbouring windows overlap by 50%).
STRIDE = 1024
# Number of windows per mini-batch used by the DataLoaders.
BATCH_SIZE = 32


class CWRU_Raw_Dataset(Dataset):
    """Sliding-window dataset built from ``.mat`` files grouped by folder.

    Every sub-folder of ``root_dir`` whose name contains one of the keys of
    ``label_map`` is scanned for ``.mat`` files. From each file the variable
    whose name contains ``signal_key`` is flattened and cut into windows of
    ``window_size`` samples taken every ``stride`` samples. Folders matching
    no key are ignored.

    Args:
        root_dir: Directory containing one sub-folder per class.
        window_size: Samples per window; defaults to ``WINDOW_SIZE``.
        stride: Hop between window starts; defaults to ``STRIDE``.
        signal_key: Substring identifying the variable to read from each
            ``.mat`` file.

    Attributes set after construction:
        data: float32 tensor of shape (num_windows, 1, window_size).
        labels: int64 tensor of shape (num_windows,).

    Raises:
        ValueError: If ``window_size`` or ``stride`` is not positive.
        OSError: If ``root_dir`` cannot be listed.
    """

    def __init__(self, root_dir, window_size=None, stride=None, signal_key='DE_time'):
        # Resolve defaults at call time so later changes to the module
        # constants are honoured.
        self.window_size = WINDOW_SIZE if window_size is None else window_size
        self.stride = STRIDE if stride is None else stride
        self.signal_key = signal_key
        if self.window_size <= 0 or self.stride <= 0:
            raise ValueError("window_size and stride must be positive")

        self.data = []
        self.labels = []
        self.label_map = {"Normal": 0, "Ball_007": 1, "Ball_014": 2, "Ball_021": 3}

        # Sorted listings make the sample order independent of the
        # filesystem's listing order.
        for folder in sorted(os.listdir(root_dir)):
            folder_path = os.path.join(root_dir, folder)
            if not os.path.isdir(folder_path):
                continue

            # First label_map key contained in the folder name wins.
            label = next(
                (val for key, val in self.label_map.items() if key in folder),
                None,
            )
            if label is None:
                continue

            for file in sorted(os.listdir(folder_path)):
                if file.endswith('.mat'):
                    self._load_mat(os.path.join(folder_path, file), label)

        # reshape keeps the (N, window_size) layout even when N == 0, so an
        # empty dataset still has a well-formed (0, 1, window_size) tensor.
        windows = np.asarray(self.data, dtype=np.float32).reshape(-1, self.window_size)
        self.data = torch.from_numpy(windows).unsqueeze(1)
        self.labels = torch.tensor(self.labels, dtype=torch.long)
        print(f"üì¶ Dataset Info: {self.data.shape} (M·∫´u, K√™nh, ƒê·ªô d√†i)")

    def _load_mat(self, path, label):
        """Append every full window of the signal in ``path`` with ``label``.

        Unreadable files and files without a matching variable are skipped
        with a printed notice instead of being silently ignored.
        """
        try:
            mat = scipy.io.loadmat(path)
        except Exception as exc:
            # Loading is best-effort per file; one bad file must not abort
            # the whole scan, but it should be visible.
            print(f"Skipping {path}: {exc}")
            return

        key = next((k for k in mat if self.signal_key in k), None)
        if key is None:
            print(f"Skipping {path}: no variable containing '{self.signal_key}'")
            return

        sig = mat[key].flatten()
        # "+ 1" includes the last window that ends exactly at the end of the
        # signal; without it that window was dropped.
        for start in range(0, len(sig) - self.window_size + 1, self.stride):
            self.data.append(sig[start:start + self.window_size])
            self.labels.append(label)

    def __len__(self):
        """Return the number of windows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(window, label)`` pair at ``idx``."""
        return self.data[idx], self.labels[idx]

# Build the windowed dataset and split it 80/20 into train and test subsets.
full_dataset = CWRU_Raw_Dataset(DATA_PATH)
if len(full_dataset) == 0:
    # Stop with a clear message rather than failing later inside DataLoader.
    raise RuntimeError(f"No training windows were loaded from {DATA_PATH}")

train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size

# A seeded generator makes the split identical from run to run, so reported
# accuracies are comparable.
split_rng = torch.Generator().manual_seed(42)
train_set, test_set = torch.utils.data.random_split(
    full_dataset, [train_size, test_size], generator=split_rng
)
# NOTE(review): STRIDE < WINDOW_SIZE, so adjacent windows share samples. A
# random split can place overlapping windows of one recording in both
# subsets, which may inflate test accuracy - consider splitting per file.

train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)
print("‚úÖ D·ªØ li·ªáu ƒë√£ s·∫µn s√†ng!")

In [None]:

class RawCNN_1D(nn.Module):
    """Three-stage 1-D CNN classifier for single-channel signal windows.

    Each stage is Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2). The pooled
    features are flattened and passed through a 128-unit hidden layer with
    dropout, then a linear layer producing ``num_classes`` logits.

    Args:
        num_classes: Number of output logits.
        input_length: Number of samples per input window. The size of the
            first fully connected layer is derived from it; the default of
            2048 yields 1664 input features.

    Raises:
        ValueError: If ``input_length`` is too short to survive the three
            conv/pool stages.
    """

    def __init__(self, num_classes=4, input_length=2048):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 16, 64, stride=2, padding=1)
        self.bn1 = nn.BatchNorm1d(16)
        self.conv2 = nn.Conv1d(16, 32, 32, stride=2, padding=1)
        self.bn2 = nn.BatchNorm1d(32)
        self.conv3 = nn.Conv1d(32, 64, 16, stride=2, padding=1)
        self.bn3 = nn.BatchNorm1d(64)
        self.pool = nn.MaxPool1d(2)
        self.flatten = nn.Flatten()
        # Derived from the layer definitions instead of being hard-coded, so
        # other window lengths work without editing the class.
        flat_features = self.conv3.out_channels * self._feature_length(input_length)
        self.fc1 = nn.Linear(flat_features, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, num_classes)

    def _feature_length(self, length):
        """Return the temporal length left after the three conv/pool stages."""
        pool_size = self.pool.kernel_size
        for conv in (self.conv1, self.conv2, self.conv3):
            kernel = conv.kernel_size[0]
            stride = conv.stride[0]
            padding = conv.padding[0]
            dilation = conv.dilation[0]
            # Standard Conv1d output-length formula, then the 2x max-pool.
            length = (length + 2 * padding - dilation * (kernel - 1) - 1) // stride + 1
            length = length // pool_size
            if length < 1:
                raise ValueError("input_length is too short for this network")
        return length

    def forward(self, x):
        """Map a (batch, 1, input_length) tensor to (batch, num_classes) logits."""
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

# Training hyper-parameters, named once so the loop and the progress line
# cannot drift apart.
NUM_EPOCHS = 15
LEARNING_RATE = 0.001

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RawCNN_1D(num_classes=4).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

print("üöÄ B·∫Øt ƒë·∫ßu Train...")
for epoch in range(NUM_EPOCHS):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Running statistics; accuracy is measured on the same batches the
        # model is being trained on (with dropout active).
        running_loss += loss.item()
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Loss is averaged per batch, accuracy per sample.
    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Loss: {running_loss/len(train_loader):.4f} | Acc: {100*correct/total:.2f}%")

In [None]:
# Evaluate the trained model on the held-out test split.
model.eval()
all_preds = []
all_labels = []

# Tick labels in label-index order (0..3), matching the dataset's label_map.
class_names = ["Normal", "Ball_007", "Ball_014", "Ball_021"]

print("üìä ƒêang ch·∫•m thi tr√™n t·∫≠p Test...")
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        predicted = model(inputs).argmax(dim=1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Compute the actual accuracy over the whole test split.
num_correct = np.count_nonzero(np.array(all_preds) == np.array(all_labels))
acc = 100 * num_correct / len(all_labels)
print(f"üéØ ƒê·ªò CH√çNH X√ÅC TH·ª∞C T·∫æ (TEST): {acc:.2f}%")

# Plot the confusion matrix. Passing labels= pins the matrix to one row and
# column per class even if a class is absent from the split; otherwise the
# matrix shrinks and the tick labels no longer line up.
cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(class_names))))
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names,
            yticklabels=class_names)
plt.xlabel('D·ª± ƒëo√°n')
plt.ylabel('Th·ª±c t·∫ø')
plt.title('Confusion Matrix (K·∫øt qu·∫£ cu·ªëi c√πng)')
plt.show()

In [None]:
# FAN-END (FE)
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import os

# Folder holding the Fan-End recordings used for the cross-domain check.
FE_PATH = '/content/drive/My Drive/Bearing/FE_data'

print(f"üïµÔ∏è ƒêang qu√©t d·ªØ li·ªáu Fan-End t·∫°i: {FE_PATH}")
# NOTE(review): CWRU_Raw_Dataset reads the variable whose name contains
# 'DE_time' from each file - confirm that is the intended channel for the
# FE_data recordings.
fe_dataset = CWRU_Raw_Dataset(FE_PATH)

if len(fe_dataset) > 0:
    # Same batch size as training; order is irrelevant for evaluation.
    fe_loader = DataLoader(fe_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # Tick labels in label-index order (0..3), matching the dataset's
    # label_map.
    class_names = ["Normal", "Ball_007", "Ball_014", "Ball_021"]

    model.eval()  # inference mode: dropout off, batch-norm uses running stats
    all_preds = []
    all_labels = []
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in fe_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Run through the model that was trained in the earlier cell.
            predicted = model(inputs).argmax(dim=1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    acc = 100 * correct / total
    print("="*40)
    print(f"üò± K·∫æT QU·∫¢ FAN-END ACCURACY: {acc:.2f}%")
    print("="*40)

    # labels= keeps the matrix at one row/column per class even when a class
    # is never seen or predicted, so the tick labels stay aligned.
    cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(class_names))))
    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Reds',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.xlabel('Model d·ª± ƒëo√°n (D·ª±a tr√™n ki·∫øn th·ª©c DE)')
    plt.ylabel('Th·ª±c t·∫ø (D·ªØ li·ªáu FE)')
    plt.title(f'K·∫øt qu·∫£ Cross-Domain (Acc: {acc:.2f}%)')
    plt.show()
else:
    # Make the "nothing evaluated" case visible instead of ending silently.
    print(f"No Fan-End windows were loaded from {FE_PATH}; evaluation skipped.")



In [None]:
# Mount Google Drive (repeated so this cell can be run on its own).
drive.mount('/content/drive')

# From here on DATA_PATH points at the Fan-End data folder.
DATA_PATH = '/content/drive/My Drive/Bearing/FE_data'

if os.path.exists(DATA_PATH):
    print(f"{DATA_PATH}")
    print("Fld:", os.listdir(DATA_PATH))
else:
    # Report a wrong path immediately instead of staying silent.
    print(f"Data folder not found: {DATA_PATH}")

In [None]:
def plot_raw_sample(root_dir):
    """Plot the start of one raw signal for each of the four conditions.

    For every name in the target list, the first (alphabetically) sub-folder
    of ``root_dir`` whose name contains that string is used. Its
    alphabetically first ``.mat`` file is loaded and the first 12000 samples
    of the variable whose name contains ``'DE_time'`` are drawn in a figure
    of their own.

    Args:
        root_dir: Directory expected to hold one sub-folder per condition.

    Returns:
        None. Figures are displayed with ``plt.show()`` and a summary line
        is printed.
    """
    target_folders = ["Normal", "Ball_007", "Ball_014", "Ball_021"]
    found_count = 0

    if os.path.isdir(root_dir):
        # Keep directories only: a stray file whose name matches a target
        # would otherwise make os.listdir() below raise.
        all_folders = [
            name for name in sorted(os.listdir(root_dir))
            if os.path.isdir(os.path.join(root_dir, name))
        ]

        for target in target_folders:
            folder_name = next((f for f in all_folders if target in f), None)
            if folder_name is None:
                print(f"‚ö†Ô∏è Kh√¥ng th·∫•y folder n√†o ch·ª©a t√™n '{target}'")
                continue

            folder_path = os.path.join(root_dir, folder_name)
            # Sorted so the plotted file does not depend on listing order.
            files = sorted(f for f in os.listdir(folder_path) if f.endswith('.mat'))
            if not files:
                continue

            file_path = os.path.join(folder_path, files[0])
            try:
                mat = scipy.io.loadmat(file_path)
                key = next((k for k in mat if 'DE_time' in k), None)
                if key is None:
                    raise ValueError("no variable containing 'DE_time'")
                signal = mat[key].flatten()
            except Exception as e:
                # Best-effort preview: report the unreadable file, keep going.
                print(f"L·ªói ƒë·ªçc file {files[0]}: {e}")
                continue

            # One figure per signal; no shared figure is created up front,
            # so plt.show() no longer renders an empty canvas first.
            plt.figure(figsize=(15, 5))
            plt.ylim(-2, 2)
            plt.plot(signal[:12000])
            plt.title(f"M·∫´u: {target} (File: {files[0]})")
            plt.ylabel("Bi√™n ƒë·ªô (Amplitude)")
            plt.grid(True)
            found_count += 1

    # Compare against the real number of targets rather than a literal.
    expected = len(target_folders)
    if found_count == expected:
        print(f"‚úÖ ƒê√£ v·∫Ω ƒë·ªß {expected} lo·∫°i t√≠n hi·ªáu. H√£y quan s√°t h√¨nh b√™n d∆∞·ªõi!")
    else:
        print(f"‚ö†Ô∏è Ch·ªâ t√¨m th·∫•y {found_count}/{expected} lo·∫°i d·ªØ li·ªáu.")
    plt.show()


# Preview one raw signal per condition from the folder DATA_PATH now points to.
plot_raw_sample(DATA_PATH)