### Experiments on Effective Data Augmentation Techniques - Lung Cancer Type Classification Problem



Dataset: Chest CT-Scan images (Kaggle) — https://www.kaggle.com/datasets/mohamedhanyyy/chest-ctscan-images/data


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torchvision.transforms import v2

import numpy as np

from model_CNN import cnn



In [2]:
# Root folders of the training and validation splits; both are handed to
# datasets.ImageFolder further down.
train_path, val_path = './data/train/', './data/valid/'

In [3]:
# Shared preprocessing settings: target resolution and per-channel
# normalisation statistics (these match the widely used ImageNet values).
_IMG_SIZE = (256, 256)
_NORM_MEAN = (0.485, 0.456, 0.406)
_NORM_STD = (0.229, 0.224, 0.225)


def _build_preprocess():
    """Return a fresh resize -> tensor -> normalise pipeline."""
    steps = [
        transforms.Resize(_IMG_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(_NORM_MEAN, _NORM_STD),
    ]
    return transforms.Compose(steps)


# Training and validation currently share the same augmentation-free pipeline;
# each split gets its own Compose instance.
transform_train = _build_preprocess()
transform_val = _build_preprocess()

In [4]:
# Labelled image datasets for the two splits, each paired with its own
# preprocessing pipeline.
train_set = datasets.ImageFolder(root=train_path, transform=transform_train)
val_set = datasets.ImageFolder(root=val_path, transform=transform_val)

In [5]:
# Mini-batch loaders. Only the training split is shuffled: the evaluation
# metrics are sums over the whole split, so shuffling validation data adds
# nothing, and a fixed order keeps evaluation deterministic and consistent
# with the test loader.
train_loader = torch.utils.data.DataLoader(dataset=train_set, batch_size=8, num_workers=4, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=val_set, batch_size=8, num_workers=4, shuffle=False)
# Phase name -> loader, looked up by the training loop.
loaders = {
    'train': train_loader,
    'valid': val_loader,
}

In [6]:
# Number of samples per split, read from the datasets themselves so the
# values cannot drift from the data on disk (previously hard-coded as 465 / 59).
dataset_sizes = {
    'train': len(train_set),
    'valid': len(val_set),
}

In [7]:
# Select the compute device: the first CUDA GPU when one is available,
# otherwise the CPU. The bare name on the last line displays the choice.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

device(type='cuda', index=0)

### Training

In [8]:
# NOTE: disabled experiment -- the mixup helpers below are commented out and
# kept for reference only; nothing in the active cells calls them.
# NOTE(review): `Variable` is only referenced by the commented-out mixup
# training loop.
# from torch.autograd import Variable

# def mixup_data(x, y, alpha=1.0, use_cuda=True):
#     '''Returns mixed inputs, pairs of targets, and lambda'''
#     if alpha > 0:
#         lam = np.random.beta(alpha, alpha)
#     else:
#         lam = 1

#     batch_size = x.size()[0]
#     if use_cuda:
#         index = torch.randperm(batch_size).cuda()
#     else:
#         index = torch.randperm(batch_size)

#     mixed_x = lam * x + (1 - lam) * x[index, :]
#     y_a, y_b = y, y[index]
#     return mixed_x, y_a, y_b, lam


# def mixup_criterion(criterion, pred, y_a, y_b, lam):
#     return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

In [9]:
# # mixup ver
# NOTE: disabled experiment -- mixup variant of train_model, kept for reference.
# NOTE(review): as written, mixup is applied in both the 'train' and 'valid'
# phases, and the loss is computed as criterion(outputs, labels) instead of
# mixup_criterion(criterion, outputs, targets_a, targets_b, lam), so the mixed
# targets are never used -- fix before re-enabling.
# def train_model(model, criterion, optimizer, num_epochs=10):
#     best_loss = float('inf')
#     for epoch in range(num_epochs):
#         print(f'Epoch {epoch}/{num_epochs - 1}')
#         print('-' * 10)

#         for phase in ['train', 'valid']:
#             if phase == 'train':
#                 model.train()
#             else:
#                 model.eval()

#             running_loss = 0.0
#             running_corrects = 0

#             for inputs, labels in loaders[phase]:
#                 inputs, labels = inputs.to(device), labels.to(device)

#                 # apply mixup
#                 inputs, targets_a, targets_b, lam = mixup_data(inputs, labels,
#                                                                1.0, True)
#                 inputs, targets_a, targets_b = map(Variable, (inputs,
#                                                               targets_a, targets_b))
#                 # /apply mixup (end)
#                 optimizer.zero_grad()

#                 with torch.set_grad_enabled(phase == 'train'):
#                     outputs = model(inputs)
#                     _, preds = torch.max(outputs, 1)
#                     loss = criterion(outputs, labels)

#                     if phase == 'train':
#                         loss.backward()
#                         optimizer.step()

#                 running_loss += loss.item() * inputs.size(0)
#                 running_corrects += torch.sum(preds == labels.data)


#             epoch_loss = running_loss / dataset_sizes[phase]
#             epoch_acc = running_corrects.double() / dataset_sizes[phase]

#             if phase == 'valid' and best_loss > epoch_loss:
#                 best_loss = epoch_loss
#                 torch.save(model.state_dict(), './weight/model.pth')


#             print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

#     return model

In [10]:
def train_model(model, criterion, optimizer, num_epochs=10, save_path='./weight/model.pth'):
    """Train ``model`` and checkpoint the weights with the lowest validation loss.

    Every epoch runs a 'train' phase followed by a 'valid' phase over the
    module-level ``loaders`` dict, moving each batch to the module-level
    ``device``. Per-phase loss and accuracy are printed.

    Args:
        model: network to optimise; its parameters are updated in place.
        criterion: loss callable taking ``(outputs, labels)``. The running
            loss is weighted by batch size, which assumes the criterion
            returns a per-batch mean -- TODO confirm if another reduction
            is ever used.
        optimizer: optimizer stepping ``model``'s parameters.
        num_epochs: number of passes over both phases.
        save_path: file that receives the ``state_dict`` whenever the
            validation loss improves. Missing parent directories are created.

    Returns:
        The same ``model`` object, holding the weights of the *last* epoch.
        The best-validation weights are only available from ``save_path``.
    """
    import os  # local import: the notebook's import cell does not provide it

    best_loss = float('inf')
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0
            seen = 0  # samples actually processed in this phase

            for inputs, labels in loaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()

                # Gradients are tracked only while training.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == labels).item()
                seen += batch_size

            # Normalise by the number of samples seen instead of a hard-coded
            # split size, so the metrics stay correct if the data changes.
            if seen:
                epoch_loss = running_loss / seen
                epoch_acc = running_corrects / seen
            else:
                # Empty loader: report NaN; the comparison below is then
                # False, so no checkpoint is written.
                epoch_loss = epoch_acc = float('nan')

            if phase == 'valid' and best_loss > epoch_loss:
                best_loss = epoch_loss
                ckpt_dir = os.path.dirname(save_path)
                if ckpt_dir:
                    os.makedirs(ckpt_dir, exist_ok=True)
                torch.save(model.state_dict(), save_path)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

    return model

In [11]:
# Instantiate the project CNN, place it on the selected device and print
# its layer summary.
model = cnn()
model = model.to(device)
print(model)

cnn(
  (conv1): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=44944, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=3, bias=True)
)


In [12]:
# Objective and optimiser: cross-entropy loss, SGD with momentum.
criterion = nn.CrossEntropyLoss()
sgd_settings = {'lr': 1e-3, 'momentum': 0.9}
optimizer = optim.SGD(model.parameters(), **sgd_settings)

In [13]:
# Run the baseline (no augmentation) training for ten epochs.
trained_model = train_model(model=model, criterion=criterion, optimizer=optimizer, num_epochs=10)

Epoch 0/9
----------
train Loss: 1.0191 Acc: 0.4989
valid Loss: 1.1221 Acc: 0.3559
Epoch 1/9
----------
train Loss: 0.8624 Acc: 0.5570
valid Loss: 0.9459 Acc: 0.5593
Epoch 2/9
----------
train Loss: 0.7096 Acc: 0.6968
valid Loss: 1.0628 Acc: 0.6271
Epoch 3/9
----------
train Loss: 0.5366 Acc: 0.7914
valid Loss: 1.2361 Acc: 0.5763
Epoch 4/9
----------
train Loss: 0.5038 Acc: 0.7892
valid Loss: 1.0060 Acc: 0.5424
Epoch 5/9
----------
train Loss: 0.4307 Acc: 0.8344
valid Loss: 0.7888 Acc: 0.6441
Epoch 6/9
----------
train Loss: 0.3103 Acc: 0.9075
valid Loss: 0.5382 Acc: 0.7458
Epoch 7/9
----------
train Loss: 0.2230 Acc: 0.9204
valid Loss: 0.6673 Acc: 0.6949
Epoch 8/9
----------
train Loss: 0.1583 Acc: 0.9570
valid Loss: 0.8134 Acc: 0.7627
Epoch 9/9
----------
train Loss: 0.2432 Acc: 0.9054
valid Loss: 0.7519 Acc: 0.6780


In [14]:
def test_model(model, dataloader):
    model.eval()
    corrects = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            total += labels.size(0)
            corrects += torch.sum(preds == labels.data)

    accuracy = corrects.double() / total
    print(f'Test Accuracy: {accuracy:.4f}')

In [15]:
# Evaluate the trained network on the held-out test split, reusing the
# validation preprocessing and a fixed (unshuffled) order.
test_set = datasets.ImageFolder(root='./data/test/', transform=transform_val)
test_dataloader = torch.utils.data.DataLoader(dataset=test_set, batch_size=8, num_workers=4, shuffle=False)
test_model(trained_model, test_dataloader)

Test Accuracy: 0.5096
