In [1]:
# Readme
# !pip --no-cache-dir install -r requirements.txt # to install the dependencies
# !source data.sh # to preprocess the data (download, unzip, and move to new folder)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
import sys

import torch
from torch import nn, optim
import torch.nn.functional as F
import torchvision
from torchvision.models import resnet50
import torchvision.transforms as transforms
from torchvision import datasets
from tqdm.auto import tqdm
from sklearn.metrics import classification_report

# Report interpreter and library versions so results can be tied to an environment.
for _label, _version in (
    ('Python Version:', sys.version),
    ("PyTorch version:", torch.__version__),
    ("Torchvision version:", torchvision.__version__),
):
    print(_label, _version)

# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Preprocessing shared by every split: resize, convert to a tensor, normalise.
# Resize is given an explicit (height, width). A bare int only rescales the
# shorter edge, so non-square images would yield tensors that are not 224x224;
# CNNDefectClassifier.fc1 expects exactly 6272 = 32 * 14 * 14 features, and
# differently sized tensors cannot be stacked into one batch.
# The mean/std values are presumably the ImageNet statistics that go with the
# pretrained ResNet weights -- confirm before changing them.
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One ImageFolder per split; class labels come from the sub-directory names.
train_dataset = datasets.ImageFolder(root='data/images/train', transform=image_transform)
valid_dataset = datasets.ImageFolder(root='data/images/valid', transform=image_transform)
test_dataset = datasets.ImageFolder(root='data/images/test', transform=image_transform)

# ResNetDefectClassifier

In [None]:
class ResNetDefectClassifier(nn.Module):
    """ResNet-50 backbone followed by a small two-layer classification head.

    The backbone is created with the 'IMAGENET1K_V2' weights; its
    1000-dimensional output is passed through
    Linear(1000, 128) -> ReLU -> Linear(128, 4).

    Args:
        trainable: value assigned to ``requires_grad`` on every backbone
            parameter. The head layers are not affected by this flag.
    """

    def __init__(self, trainable=False):
        super().__init__()
        self.resnet = resnet50(weights='IMAGENET1K_V2')
        self.fc1 = nn.Linear(1000, 128)
        self.act = nn.ReLU()
        self.fc2 = nn.Linear(128, 4)
        # Freeze or unfreeze the whole backbone in a single call.
        self.resnet.requires_grad_(trainable)

    def forward(self, x):
        """Run the backbone and the head on ``x`` and return the 4 output logits per sample."""
        backbone_out = self.resnet(x)
        hidden = self.act(self.fc1(backbone_out))
        return self.fc2(hidden)

# CNNDefectClassifier

In [None]:
class CNNDefectClassifier(nn.Module):
    """Plain convolutional classifier with four output logits.

    Four 3x3 convolutions (each followed by ReLU and 2x2 max pooling) are
    followed by a 1x1 convolution that reduces the channel count to 32, and
    then by four fully connected layers. ``fc1`` takes 6272 = 32 * 14 * 14
    features, so the network expects 3-channel inputs of 224x224 pixels
    (four halvings of 224 give 14).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: channels 3 -> 16 -> 32 -> 64 -> 128, spatial size halved each stage.
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.conv4 = nn.Conv2d(64, 128, 3, padding=1)
        self.pool4 = nn.MaxPool2d(2, 2)
        # 1x1 convolution: shrink 128 channels to 32 before flattening.
        self.conv5 = nn.Conv2d(128, 32, 1, padding=0)

        # Classifier head: 6272 -> 2048 -> 256 -> 64 -> 4.
        self.fc1 = nn.Linear(6272, 2048)
        self.fc2 = nn.Linear(2048, 256)
        self.fc3 = nn.Linear(256, 64)
        self.fc4 = nn.Linear(64, 4)

    def forward(self, x):
        """Return the raw (un-normalised) class scores for a batch ``x``."""
        conv_stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
            (self.conv4, self.pool4),
        )
        for conv, pool in conv_stages:
            x = pool(F.relu(conv(x)))
        x = F.relu(self.conv5(x))

        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)

        for dense in (self.fc1, self.fc2, self.fc3):
            x = F.relu(dense(x))
        return self.fc4(x)


# Train

In [None]:
class EarlyStopping:
    """Tracks a validation loss and signals when it has stopped improving.

    Call the instance once per epoch with the current validation loss. A loss
    counts as an improvement only if it is lower than the best loss seen so far
    by more than ``min_delta``. After ``patience`` consecutive calls without an
    improvement, ``stop`` becomes True and stays True.

    Args:
        patience: number of consecutive non-improving calls tolerated.
        min_delta: minimum decrease of the loss that counts as an improvement.
    """

    def __init__(self, patience=3, min_delta=0):
        self.patience, self.min_delta = patience, min_delta
        self.counter = 0        # consecutive calls without improvement
        self.best_loss = None   # lowest loss seen so far; None until the first call
        self.stop = False

    def __call__(self, val_loss):
        """Record ``val_loss`` and return whether training should stop."""
        # The very first loss only establishes the baseline.
        if self.best_loss is None:
            self.best_loss = val_loss
            return self.stop

        improvement = self.best_loss - val_loss
        if improvement > self.min_delta:
            self.best_loss, self.counter = val_loss, 0
            return self.stop

        self.counter += 1
        if self.counter >= self.patience:
            self.stop = True
        return self.stop
    
def train(model, train_dataset, val_dataset, batch_size=32, num_epochs=30, learning_rate = 0.001, patience=3, device=device, save=True, save_name='model'):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    Training ends after ``num_epochs`` epochs, or earlier once the validation
    loss has failed to improve for ``patience`` consecutive epochs
    (see ``EarlyStopping``).

    Args:
        model: ``nn.Module`` to optimise; it is moved to ``device`` in place.
        train_dataset: dataset yielding ``(inputs, labels)`` pairs for training.
        val_dataset: dataset yielding ``(inputs, labels)`` pairs for validation.
        batch_size: batch size used by both data loaders.
        num_epochs: maximum number of epochs.
        learning_rate: learning rate passed to Adam.
        patience: non-improving validation epochs tolerated before stopping.
        device: device on which the model and every batch are placed.
        save: when truthy, write ``model.state_dict()`` to
            ``model/{save_name}-defect-classifier.pth`` after training.
        save_name: prefix of the checkpoint file name.

    Returns:
        ``(model, (train_losses, train_accuracies, val_losses, val_accuracies))``.
        Losses are per-epoch Python floats (sample-weighted means); accuracies
        are per-epoch 0-dim double tensors, so callers read them with ``.item()``.
    """
    train_dataloader = torch.utils.data.DataLoader(train_dataset,
                                                   batch_size=batch_size,
                                                   shuffle=True)
    # Validation metrics are sums over the whole split, so sample order does
    # not matter and shuffling is unnecessary.
    val_dataloader = torch.utils.data.DataLoader(val_dataset,
                                                 batch_size=batch_size,
                                                 shuffle=False)

    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    # Use the caller-supplied patience rather than a fixed value.
    early_stopping = EarlyStopping(patience=patience)

    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []

    for epoch in tqdm(range(num_epochs), desc="Epochs"):
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        running_corrects = 0

        progress_bar = tqdm(train_dataloader, desc="Training", unit="batch")
        for inputs, labels in progress_bar:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            # Force gradients on even if the caller is inside a no_grad context.
            with torch.set_grad_enabled(True):
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                loss.backward()
                optimizer.step()

            # criterion returns the batch mean; re-weight by batch size so the
            # epoch loss is a per-sample mean even when the last batch is short.
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)
        epoch_loss = running_loss / len(train_dataloader.dataset)
        epoch_acc = running_corrects.double() / len(train_dataloader.dataset)

        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_acc)
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}")

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        val_corrects = 0

        progress_bar = tqdm(val_dataloader, desc="Validating", unit="batch")
        with torch.no_grad():
            for inputs, labels in progress_bar:
                inputs = inputs.to(device)
                labels = labels.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                val_corrects += torch.sum(preds == labels)

        val_loss = val_loss / len(val_dataloader.dataset)
        val_acc = val_corrects.double() / len(val_dataloader.dataset)

        val_losses.append(val_loss)
        val_accuracies.append(val_acc)
        print(f"Validation Epoch {epoch + 1}/{num_epochs}, Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")
        if early_stopping(val_loss):
            print("Early stopping triggered. Stopping training.")
            break

    # The checkpoint holds the weights from the last epoch that ran, which is
    # not necessarily the epoch with the lowest validation loss.
    if save:
        os.makedirs('model', exist_ok=True)
        torch.save(model.state_dict(), f'model/{save_name}-defect-classifier.pth')
    return model, (train_losses, train_accuracies, val_losses, val_accuracies)

def predict(model, test_dataset, device=device, batch_size=1):
    """Evaluate ``model`` on ``test_dataset`` and print a classification report.

    Args:
        model: trained ``nn.Module``; it is moved to ``device`` and put in eval mode.
        test_dataset: dataset yielding ``(inputs, labels)`` pairs.
        device: device on which the model and every batch are placed.
        batch_size: evaluation batch size. Defaults to 1; larger values only
            speed evaluation up, since predictions are collected per sample.

    Returns:
        ``(test_acc, test_loss)`` where ``test_acc`` is a 0-dim double tensor
        (read it with ``.item()``) and ``test_loss`` is a Python float holding
        the per-sample mean cross-entropy.
    """
    # No shuffling: evaluation does not depend on sample order.
    test_dataloader = torch.utils.data.DataLoader(test_dataset,
                                                  batch_size=batch_size,
                                                  shuffle=False)
    criterion = torch.nn.CrossEntropyLoss().to(device)
    # Inputs are moved to `device` below, so the model must live there too.
    model.to(device)
    model.eval()
    test_loss = 0.0
    test_corrects = 0
    y_pred = []
    y_true = []
    progress_bar = tqdm(test_dataloader, desc="Evaluating", unit="batch")
    with torch.no_grad():
        for inputs, labels in progress_bar:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            # Collect every sample of the batch, not just the first one.
            y_pred.extend(preds.cpu().tolist())
            y_true.extend(labels.cpu().tolist())
            loss = criterion(outputs, labels)

            # Re-weight the batch-mean loss by batch size to get a per-sample mean.
            test_loss += loss.item() * inputs.size(0)
            test_corrects += torch.sum(preds == labels)

    print(classification_report(y_true, y_pred))
    test_loss = test_loss / len(test_dataloader.dataset)
    test_acc = test_corrects.double() / len(test_dataloader.dataset)
    print(f"Evaluating Loss: {test_loss:.4f}, Acc: {test_acc:.4f}")
    return (test_acc, test_loss)


In [None]:
def plot_loss_accuracy_epoch(train_losses, train_accuracies, val_losses, val_accuracies):
    """Show per-epoch loss and accuracy curves in two side-by-side panels.

    Args:
        train_losses: training loss per epoch (plotted as given).
        train_accuracies: training accuracy per epoch; each element must
            provide ``.item()``.
        val_losses: validation loss per epoch (plotted as given).
        val_accuracies: validation accuracy per epoch; each element must
            provide ``.item()``.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 5))

    # Left panel: losses.
    loss_ax.plot(train_losses, label="Training Loss")
    loss_ax.plot(val_losses, label="Validation Loss")

    # Right panel: accuracies, unwrapped to plain numbers first.
    train_acc_values = [acc.item() for acc in train_accuracies]
    val_acc_values = [acc.item() for acc in val_accuracies]
    acc_ax.plot(train_acc_values, label="Training Accuracy")
    acc_ax.plot(val_acc_values, label="Validation Accuracy")

    panels = (
        (loss_ax, "Loss", "Loss vs. Epoch"),
        (acc_ax, "Accuracy", "Accuracy vs. Epoch"),
    )
    for axis, y_label, title in panels:
        axis.set_xlabel("Epoch")
        axis.set_ylabel(y_label)
        axis.legend()
        axis.set_title(title)

    plt.tight_layout()
    plt.show()

# ResNetDefectClassifier Tuning

## Batch Size [4, 16, 32, 64] & Learning Rate [0.001, 0.0005, 0.0001]

In [None]:
# Batch-size sweep for the ResNet classifier (backbone trainable) at a fixed
# learning rate of 0.001. Each result dict is keyed by batch size.
# NOTE(review): train() is called with its default save_name, so every run in
# this sweep writes to the same checkpoint path.
batch_sizes = [4, 16, 32, 64]

batch_resnet_model, batch_resnet_loss, batch_resnet_test = {}, {}, {}

for batch_size in batch_sizes:
    resnet, loss = train(
        ResNetDefectClassifier(trainable=True),
        train_dataset,
        valid_dataset,
        batch_size=batch_size,
        learning_rate=0.001,
        patience=5,
    )
    test_result = predict(resnet, test_dataset)

    batch_resnet_model[batch_size] = resnet
    batch_resnet_loss[batch_size] = loss      # per-epoch training/validation curves
    batch_resnet_test[batch_size] = test_result  # (test accuracy, test loss)

In [None]:
# Grouped bar chart: test accuracy and test loss for each ResNet batch size.
accuracies = {bs: test_acc.item() for bs, (test_acc, _) in batch_resnet_test.items()}
losses = {bs: test_loss for bs, (_, test_loss) in batch_resnet_test.items()}

pos = np.arange(len(accuracies))
width = 0.35

fig, ax = plt.subplots(figsize=(10, 5))

# Accuracy bar sits at the group position; the loss bar is shifted one bar width right.
bar1 = ax.bar(pos, accuracies.values(), width, alpha=0.7, color='b', label='Accuracy')
bar2 = ax.bar(pos + width, losses.values(), width, alpha=0.7, color='r', label='Loss')

# Centre each tick between the two bars of its group and label it with the batch size.
ax.set_xticks(pos + 0.5 * width)
ax.set_xticklabels(list(accuracies))

ax.set_xlim(min(pos) - width, max(pos) + width * 2)
tallest = max(max(accuracies.values()), max(losses.values()))
ax.set_ylim(0, tallest + 0.1)

ax.legend(loc='upper left')
ax.grid()
ax.set_title('Accuracy and Loss at Different Batch Size')
ax.set_xlabel('Batch Size')
ax.set_ylabel('Score')

def autolabel(bars):
    """Annotate every bar with its height, formatted to four decimals."""
    for rect in bars:
        value = rect.get_height()
        center_x = rect.get_x() + rect.get_width() / 2.
        ax.text(center_x, 1.05 * value, '%.4f' % float(value),
                ha='center', va='bottom')

autolabel(bar1)
autolabel(bar2)

plt.show()


In [None]:
# Learning-rate sweep for the ResNet classifier (backbone trainable) at batch
# size 64. 0.001 is part of the sweep because a later cell plots the learning
# curves stored under lr_resnet_loss[0.001]; without it that lookup raises
# KeyError on a fresh run.
lr = [0.001, 0.0005, 0.0001]

# Result dicts, each keyed by learning rate.
lr_resnet_model = {}
lr_resnet_loss = {}
lr_resnet_test = {}

for rate in lr:
    resnet = ResNetDefectClassifier(trainable=True)

    resnet, loss = train(resnet, train_dataset, valid_dataset, batch_size=64, learning_rate=rate, patience=5)
    test_result = predict(resnet, test_dataset)

    lr_resnet_model[rate] = resnet
    lr_resnet_loss[rate] = loss        # per-epoch training/validation curves
    lr_resnet_test[rate] = test_result  # (test accuracy, test loss)

In [None]:
# Learning curves for the ResNet run stored under learning rate 0.001
# (lr_resnet_loss must contain that key).
plot_loss_accuracy_epoch(*lr_resnet_loss[0.001])

In [None]:
# Learning curves for the ResNet run stored under learning rate 0.0001.
plot_loss_accuracy_epoch(*lr_resnet_loss[0.0001])

# CNNDefectClassifier Tuning

## Batch Size [4, 16, 32, 64, 128] & Learning Rate [0.001, 0.0005, 0.0001]

In [None]:
# Batch-size sweep for the CNN classifier at a fixed learning rate of 0.001.
# Each result dict is keyed by batch size.
# NOTE(review): train() is called with its default save_name, so every run in
# this sweep writes to the same checkpoint path.
batch_sizes = [4, 16, 32, 64, 128]

batch_cnn_model, batch_cnn_loss, batch_cnn_test = {}, {}, {}

for batch_size in batch_sizes:
    cnn, loss = train(
        CNNDefectClassifier(),
        train_dataset,
        valid_dataset,
        batch_size=batch_size,
        learning_rate=0.001,
        patience=5,
    )
    test_result = predict(cnn, test_dataset)

    batch_cnn_model[batch_size] = cnn
    batch_cnn_loss[batch_size] = loss      # per-epoch training/validation curves
    batch_cnn_test[batch_size] = test_result  # (test accuracy, test loss)

In [None]:
# Grouped bar chart: test accuracy and test loss for each CNN batch size.
accuracies = {bs: test_acc.item() for bs, (test_acc, _) in batch_cnn_test.items()}
losses = {bs: test_loss for bs, (_, test_loss) in batch_cnn_test.items()}

pos = np.arange(len(accuracies))
width = 0.35

fig, ax = plt.subplots(figsize=(10, 5))

# Accuracy bar sits at the group position; the loss bar is shifted one bar width right.
bar1 = ax.bar(pos, accuracies.values(), width, alpha=0.7, color='b', label='Accuracy')
bar2 = ax.bar(pos + width, losses.values(), width, alpha=0.7, color='r', label='Loss')

# Centre each tick between the two bars of its group and label it with the batch size.
ax.set_xticks(pos + 0.5 * width)
ax.set_xticklabels(list(accuracies))

ax.set_xlim(min(pos) - width, max(pos) + width * 2)
tallest = max(max(accuracies.values()), max(losses.values()))
ax.set_ylim(0, tallest + 0.1)

ax.legend(loc='upper left')
ax.grid()
ax.set_title('Accuracy and Loss at Different Batch Size')
ax.set_xlabel('Batch Size')
ax.set_ylabel('Score')

def autolabel(bars):
    """Annotate every bar with its height, formatted to four decimals."""
    for rect in bars:
        value = rect.get_height()
        center_x = rect.get_x() + rect.get_width() / 2.
        ax.text(center_x, 1.05 * value, '%.4f' % float(value),
                ha='center', va='bottom')

autolabel(bar1)
autolabel(bar2)

plt.show()


In [None]:
# Learning-rate sweep for the CNN classifier at batch size 128.
# Each result dict is keyed by learning rate.
# NOTE(review): train() is called with its default save_name, so every run in
# this sweep writes to the same checkpoint path.
lr = [0.001, 0.0005, 0.0001]

lr_cnn_model, lr_cnn_loss, lr_cnn_test = {}, {}, {}

for rate in lr:
    cnn, loss = train(
        CNNDefectClassifier(),
        train_dataset,
        valid_dataset,
        batch_size=128,
        learning_rate=rate,
        patience=5,
    )
    test_result = predict(cnn, test_dataset)

    lr_cnn_model[rate] = cnn
    lr_cnn_loss[rate] = loss        # per-epoch training/validation curves
    lr_cnn_test[rate] = test_result  # (test accuracy, test loss)

In [None]:
# Grouped bar chart: test accuracy and test loss for each CNN learning rate.
# The comprehension variable is named `rate` so it does not read like the
# module-level `lr` list.
accuracies = {rate: acc.item() for rate, (acc, loss) in lr_cnn_test.items()}
losses = {rate: loss for rate, (acc, loss) in lr_cnn_test.items()}

pos = np.arange(len(accuracies))
width = 0.35

# Bind the new figure to `fig`, as the other bar-chart cells do, so `fig`
# never refers to a figure from an earlier cell.
fig, ax = plt.subplots(figsize=(10, 5))

# Accuracy bar sits at the group position; the loss bar is shifted one bar width right.
bar1 = ax.bar(pos, accuracies.values(), width, alpha=0.7, color='b', label='Accuracy')
bar2 = ax.bar([p + width for p in pos], losses.values(), width, alpha=0.7, color='r', label='Loss')

# Centre each tick between the two bars of its group and label it with the learning rate.
ax.set_xticks([p + 0.5 * width for p in pos])
ax.set_xticklabels(list(accuracies.keys()))

ax.set_xlim(min(pos) - width, max(pos) + width * 2)
ax.set_ylim(0, max(max(accuracies.values()), max(losses.values())) + 0.1)

ax.legend(loc='upper left')
ax.grid()
ax.set_title('Accuracy and Loss at Different Learning Rate')
ax.set_xlabel('Learning Rate')
ax.set_ylabel('Score')

def autolabel(bars):
    """Annotate every bar with its height, formatted to four decimals."""
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2., 1.05 * height,
                '%.4f' % float(height),
                ha='center', va='bottom')

autolabel(bar1)
autolabel(bar2)

plt.show()

In [None]:
# Learning curves for the CNN run stored under learning rate 0.001.
plot_loss_accuracy_epoch(*lr_cnn_loss[0.001])

In [None]:
# Learning curves for the CNN run stored under learning rate 0.0001.
plot_loss_accuracy_epoch(*lr_cnn_loss[0.0001])