In [6]:
import torch
import torch.nn as nn
from sklearn.metrics import classification_report
import torch.optim as optim
import gc
from tqdm import tqdm
import matplotlib.pyplot as plt
import multiprocessing
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms


In [7]:

class MyDataset(Dataset):
    """Image-folder dataset: every sub-directory of ``root_dir`` is one class.

    Class indices follow the alphabetical order of the sub-directory names.

    Args:
        root_dir: Directory that contains one sub-directory per class.
        transform: Optional callable applied to each RGB ``PIL.Image``
            before it is returned.

    Attributes set by the constructor:
        images: list of image file paths.
        labels: list of integer class indices, parallel to ``images``.
        class_names: sorted list of class (sub-directory) names.
        class_to_idx: mapping ``class name -> integer label``.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform

        self.images = []
        self.labels = []

        # Only sub-directories are classes. Stray files sitting next to the
        # class folders (Thumbs.db, .DS_Store, ...) used to be treated as a
        # class and crashed the os.listdir() call below.
        self.class_names = sorted(
            entry
            for entry in os.listdir(self.root_dir)
            if os.path.isdir(os.path.join(self.root_dir, entry))
        )
        self.class_to_idx = {name: idx for idx, name in enumerate(self.class_names)}

        for cls_name, cls_idx in self.class_to_idx.items():
            cls_path = os.path.join(self.root_dir, cls_name)
            # Sorted so the sample order does not depend on the OS/filesystem.
            for img_name in sorted(os.listdir(cls_path)):
                img_path = os.path.join(cls_path, img_name)
                if not os.path.isfile(img_path):
                    # Nested directories are not samples.
                    continue
                self.images.append(img_path)
                self.labels.append(cls_idx)

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded from disk, converted to RGB and, if a transform
        was given, passed through it.
        """
        img_path = self.images[idx]
        label = self.labels[idx]
        # The context manager releases the file handle as soon as the pixel
        # data has been copied by convert().
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

    def get_class_names(self):
        """Return the sorted list of class names (index == label)."""
        return self.class_names

In [8]:
class CNN(nn.Module):
    """Small 4-stage convolutional classifier.

    Each stage is ``Conv2d(k=3, padding=2) -> MaxPool2d(2) -> ReLU ->
    BatchNorm2d``. With stride 1 the convolution grows each spatial side by 2
    and the pooling halves it (floor), so a 64x64 input goes
    64 -> 66 -> 33 -> 35 -> 17 -> 19 -> 9 -> 11 -> 5.

    Args:
        num_classes: Number of output logits.
        in_channels: Number of channels of the input images. Defaults to 1
            (grayscale), which is what the original architecture hard-coded.

    Input:  tensor of shape ``(batch, in_channels, H, W)``.
    Output: tensor of shape ``(batch, num_classes)`` with raw logits.
    """

    def __init__ (self, num_classes, in_channels=1):
        super().__init__()
        self.conv_layers = nn.Sequential(
            # spatial size for a 64x64 input is noted on each line
            nn.Conv2d(in_channels, 8, 3, padding=2),  # 66
            nn.MaxPool2d(2, 2),                       # 33
            nn.ReLU(),
            nn.BatchNorm2d(8),

            nn.Conv2d(8, 32, 3, padding=2),           # 35
            nn.MaxPool2d(2, 2),                       # 17
            nn.ReLU(),
            nn.BatchNorm2d(32),

            nn.Conv2d(32, 64, 3, padding=2),          # 19
            nn.MaxPool2d(2, 2),                       # 9
            nn.ReLU(),
            nn.BatchNorm2d(64),

            nn.Conv2d(64, 128, 3, padding=2),         # 11
            nn.MaxPool2d(2, 2),                       # 5
            nn.ReLU(),
            nn.BatchNorm2d(128),
        )
        # Forces a 5x5 feature map so the first Linear layer always sees
        # 128*5*5 features. For 64x64 inputs the map is already 5x5 and this
        # is the identity; other input sizes no longer fail with a shape
        # mismatch. The layer has no parameters or buffers, so state_dict keys
        # (and previously saved checkpoints) are unaffected.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((5, 5))
        # NOTE(review): there is no activation between the Linear layers, so
        # in eval mode (Dropout disabled) this head is a composition of affine
        # maps. Left as-is on purpose: inserting activations would change the
        # outputs of already-trained checkpoints.
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128 * 5 * 5, 128),
            nn.Dropout(0.5),
            nn.Linear(128, 256),
            nn.Dropout(0.5),
            nn.Linear(256, 512),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Return class logits for a batch of images."""
        x = self.conv_layers(x)
        x = self.adaptive_pool(x)
        x = self.fc_layers(x)
        return x


In [9]:
def evaluate_model(model, test_loader, device, verbose =True):
    """Compute top-1 accuracy (in percent) of ``model`` over ``test_loader``.

    The model is switched to eval mode and is NOT switched back; callers that
    continue training must call ``model.train()`` themselves.

    Args:
        model: Module returning logits of shape ``(batch, num_classes)``.
        test_loader: Iterable yielding ``(images, labels)`` tensor pairs.
        device: Device the batches are moved to before the forward pass.
        verbose: Accepted for backward compatibility; currently unused.

    Returns:
        Accuracy as a float in ``[0, 100]``; ``0.0`` when the loader yields no
        samples (previously this raised ``ZeroDivisionError``).
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
            # No per-batch gc.collect()/empty_cache(): the batch tensors are
            # released by reference counting when the names are rebound, and
            # forcing a collection on every step only slows evaluation down.

    if total == 0:
        return 0.0
    return 100.0 * correct / total

In [10]:
def generate_classification_report(model, test_loader, class_names, device):
    """Print sklearn's per-class precision/recall/F1 report for ``model``.

    Args:
        model: Module returning logits of shape ``(batch, num_classes)``.
        test_loader: Iterable yielding ``(images, labels)`` tensor pairs.
        class_names: Class names; position ``i`` names label ``i``.
        device: Device the images are moved to before the forward pass.

    Returns:
        None. The report is written to stdout.
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for image, label in test_loader:
            outputs = model(image.to(device))
            _, predicted = outputs.max(1)
            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(label.cpu().tolist())
            # No per-batch gc.collect()/empty_cache(): it only adds overhead.

    # Passing `labels` explicitly keeps the report working when some class
    # never occurs in the evaluated data; without it sklearn infers the label
    # set from the data and the count may not match `target_names`.
    print(
        classification_report(
            all_labels,
            all_preds,
            labels=list(range(len(class_names))),
            target_names=class_names,
        )
    )

In [11]:
def plot_metrics(losses, train_accs, test_accs, save_path="training_metrics.png"):
    """Plot the per-epoch training loss and train/test accuracy side by side.

    Args:
        losses: One loss value per epoch.
        train_accs: One training accuracy (percent) per epoch.
        test_accs: One test accuracy (percent) per epoch.
        save_path: Where the figure is written before it is shown. Defaults
            to ``"training_metrics.png"`` (the previously hard-coded name);
            pass a different path to keep the plots of several runs, or
            ``None`` to skip saving.
    """
    epochs = range(1, len(losses) + 1)

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    loss_ax.plot(epochs, losses, 'r-o')
    loss_ax.set_title('Training Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')

    acc_ax.plot(epochs, train_accs, 'b-o', label='Train Acc')
    acc_ax.plot(epochs, test_accs, 'g-o', label='Test Acc')
    acc_ax.set_title('Accuracy')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy (%)')
    acc_ax.legend()

    fig.tight_layout()
    if save_path is not None:
        fig.savefig(save_path)  # save the chart if needed
    plt.show()

In [12]:
def train_model (model, train_loader, test_loader, device, epochs=10):
    """Train ``model`` with Adam + cross-entropy and plot the learning curves.

    After every epoch the model is evaluated with ``evaluate_model`` and a
    summary line is printed; after the last epoch ``plot_metrics`` is called
    with the collected history.

    Args:
        model: Module returning logits of shape ``(batch, num_classes)``.
            It is moved to ``device`` and trained in place.
        train_loader: Iterable yielding ``(images, labels)`` training batches.
        test_loader: Iterable passed to ``evaluate_model`` after each epoch.
        device: Device used for training and evaluation.
        epochs: Number of passes over ``train_loader``.

    Returns:
        None.

    Note:
        The reported loss is the SUM of the per-batch losses of the epoch
        (not the mean), so its magnitude scales with the number of batches.
    """
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_losses = []
    train_accuracies = []
    test_accuracies = []

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct, total = 0, 0

        for image, label in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            image, label = image.to(device), label.to(device)
            optimizer.zero_grad()
            outputs = model(image)
            loss = criterion(outputs, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += label.size(0)
            correct += predicted.eq(label).sum().item()
            # No gc.collect()/torch.cuda.empty_cache() here: running a full
            # garbage collection and flushing the CUDA caching allocator on
            # every step frees nothing the next step would not reuse and
            # makes each iteration considerably slower.

        # An empty loader yields 0 samples; report 0% instead of dividing by 0.
        train_acc = 100.0 * correct / total if total else 0.0
        train_losses.append(running_loss)
        train_accuracies.append(train_acc)

        test_acc = evaluate_model(model, test_loader, device, verbose=True)
        test_accuracies.append(test_acc)

        print(f"Epoch {epoch+1}: Loss={running_loss}, Train Acc={train_acc}%, Test Acc={test_acc}%")

        # One cleanup per epoch is cheap and keeps the footprint low between epochs.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    plot_metrics(train_losses, train_accuracies, test_accuracies)



In [13]:
# Use the GPU when PyTorch can see a CUDA device, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Per-sample preprocessing: convert the PIL image to a tensor, then reduce it
# to a single grayscale channel.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Grayscale(num_output_channels=1),
    ]
)

cpu


In [None]:


# Train, evaluate and save the classifier for the "Good" dataset.
train_dataset_good = MyDataset(root_dir=r"E:\KPDL-CK\data\processed\dataset\train\Good",transform=transform)
test_dataset_good = MyDataset(root_dir=r"E:\KPDL-CK\data\processed\dataset\test\good",transform=transform)

# num_workers=0: on Windows DataLoader workers are separate spawned processes
# that have to re-import the dataset class. A class defined in a notebook cell
# cannot be resolved there, which typically shows up as a progress bar that
# never leaves 0%. Loading in the main process avoids that. To use workers,
# move MyDataset into an importable .py module first.
train_dataloader_good = DataLoader(train_dataset_good, batch_size=32, shuffle=True, drop_last=True, num_workers=0)
test_dataloader_good = DataLoader(test_dataset_good, batch_size=32, num_workers=0)
class_names = train_dataset_good.get_class_names()
num_classes = len(class_names)

print(num_classes)

model = CNN(num_classes=num_classes)

train_model(model, train_dataloader_good, test_dataloader_good, device, epochs=15)
generate_classification_report(model, test_dataloader_good, class_names, device)
# Make sure the target folder exists so a finished training run is not lost
# to a failing torch.save().
os.makedirs(r"E:\KPDL-CK\models\CNN", exist_ok=True)
torch.save(model.state_dict(),r"E:\KPDL-CK\models\CNN\good.pth")
print("✅ Mô hình đã được lưu thành công.")

2


Epoch 1/15:   0%|          | 0/1875 [00:00<?, ?it/s]

In [None]:


# Train, evaluate and save the classifier for the "Metal_contamination" dataset.
train_dataset_metal = MyDataset(root_dir=r"E:\KPDL-CK\data\processed\dataset\train\Metal_contamination",transform=transform)
test_dataset_metal = MyDataset(root_dir=r"E:\KPDL-CK\data\processed\dataset\test\metal",transform=transform)

# num_workers=0: on Windows DataLoader workers are separate spawned processes
# that have to re-import the dataset class. A class defined in a notebook cell
# cannot be resolved there, which typically stalls the first epoch. To use
# workers, move MyDataset into an importable .py module first.
train_dataloader_metal = DataLoader(train_dataset_metal, batch_size=32, shuffle=True, drop_last=True, num_workers=0)
test_dataloader_metal = DataLoader(test_dataset_metal, batch_size=32, num_workers=0)

class_names = train_dataset_metal.get_class_names()

# Derived from the data instead of hard-coding 2, so the model head always
# matches the labels the dataset actually produces.
num_classes = len(class_names)

print(num_classes)

model_metal = CNN(num_classes=num_classes)

train_model(model_metal, train_dataloader_metal, test_dataloader_metal, device, epochs=10)
generate_classification_report(model_metal, test_dataloader_metal, class_names, device)
# Make sure the target folder exists so a finished training run is not lost
# to a failing torch.save().
os.makedirs(r"E:\KPDL-CK\models\CNN", exist_ok=True)
torch.save(model_metal.state_dict(),r"E:\KPDL-CK\models\CNN\metal.pth")
print("✅ Mô hình đã được lưu thành công.")

In [None]:


# Train, evaluate and save the classifier for the "cut" dataset.
train_dataset_cut = MyDataset(root_dir=r"E:\KPDL-CK\data\processed\dataset\train\cut",transform=transform)
test_dataset_cut = MyDataset(root_dir=r"E:\KPDL-CK\data\processed\dataset\test\cut",transform=transform)

# num_workers=0: on Windows DataLoader workers are separate spawned processes
# that have to re-import the dataset class. A class defined in a notebook cell
# cannot be resolved there, which typically stalls the first epoch. To use
# workers, move MyDataset into an importable .py module first.
train_dataloader_cut = DataLoader(train_dataset_cut, batch_size=32, shuffle=True, drop_last=True, num_workers=0)
test_dataloader_cut = DataLoader(test_dataset_cut, batch_size=32, num_workers=0)

class_names = train_dataset_cut.get_class_names()

# Derived from the data instead of hard-coding 2, so the model head always
# matches the labels the dataset actually produces.
num_classes = len(class_names)
print(num_classes)

model_cut = CNN(num_classes=num_classes)

train_model(model_cut, train_dataloader_cut, test_dataloader_cut, device, epochs=15)
generate_classification_report(model_cut, test_dataloader_cut, class_names, device)
# Make sure the target folder exists so a finished training run is not lost
# to a failing torch.save().
os.makedirs(r"E:\KPDL-CK\models\CNN", exist_ok=True)
torch.save(model_cut.state_dict(),r"E:\KPDL-CK\models\CNN\cut.pth")
print("✅ Mô hình đã được lưu thành công.")

In [None]:


# Train, evaluate and save the classifier for the "Thread" dataset.
# NOTE(review): the training path is relative to the kernel's working
# directory while the test path is absolute, and the checkpoint is written to
# the working directory instead of E:\KPDL-CK\models\CNN like the other
# models. Confirm this is intended.
train_dataset_thread = MyDataset(root_dir=r"data/processed/dataset/train/Thread",transform=transform)
test_dataset_thread = MyDataset(root_dir=r"E:\KPDL-CK\data\processed\dataset\test\thread",transform=transform)

# num_workers=0: on Windows DataLoader workers are separate spawned processes
# that have to re-import the dataset class. A class defined in a notebook cell
# cannot be resolved there, which typically stalls the first epoch. To use
# workers, move MyDataset into an importable .py module first.
train_dataloader_thread = DataLoader(train_dataset_thread, batch_size=32, shuffle=True, drop_last=True, num_workers=0)
test_dataloader_thread = DataLoader(test_dataset_thread, batch_size=32, num_workers=0)

class_names = train_dataset_thread.get_class_names()

# Derived from the data instead of hard-coding 2, so the model head always
# matches the labels the dataset actually produces.
num_classes = len(class_names)
print(num_classes)

model_thread = CNN(num_classes=num_classes)

train_model(model_thread, train_dataloader_thread, test_dataloader_thread, device, epochs=15)
generate_classification_report(model_thread, test_dataloader_thread, class_names, device)
torch.save(model_thread.state_dict(),"thread.pth")
print("✅ Mô hình đã được lưu thành công.")

In [None]:
import zipfile
import os
from IPython.display import FileLink

def zip_dir(directory = os.curdir, file_name = 'directory.zip'):
    """
    Zip all the files in a directory (recursively).

    The archive is created inside ``directory``; entries are stored with
    paths relative to ``directory``. The process working directory is not
    changed.

    Parameters
    ----------
    directory: str
        directory that needs to be zipped, default is the current working
        directory

    file_name: str
        the name of the zipped file (including .zip), default is
        'directory.zip'

    Returns
    -------
    IPython.display.FileLink
        a hyperlink which can be used to download the zip file
    """
    archive_path = os.path.join(directory, file_name)
    archive_abs = os.path.abspath(archive_path)

    # The context manager closes the archive, which is what writes the zip's
    # central directory; without it the linked file is incomplete.
    with zipfile.ZipFile(archive_path, mode='w') as zip_ref:
        for folder, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(folder, file)
                # Skip only the archive being written. The previous substring
                # test also dropped any file whose name merely contained
                # `file_name`.
                if os.path.abspath(file_path) == archive_abs:
                    continue
                zip_ref.write(file_path, arcname=os.path.relpath(file_path, directory))

    # Prefer a path relative to the working directory for the link.
    try:
        link_target = os.path.relpath(archive_path)
    except ValueError:
        # e.g. the archive is on a different drive than the working directory
        link_target = archive_abs
    return FileLink(link_target)
# Zip the current working directory with the default archive name; as the last
# expression of the cell, the returned FileLink is shown as the cell output.
zip_dir()
