#### Torch CNN - CIFAR10

In [1]:
import PIL
import torch
import torchvision
import torch.nn as nn
import torchvision.transforms as transforms

BATCH_SIZE = 128
DOWNLOAD = False
SUBSET = 0

train_transform = transforms.Compose(
    [transforms.RandomHorizontalFlip(p=0.5),
     transforms.RandomAffine(degrees=(-5, 5), translate=(0.1, 0.1), scale=(0.9, 1.1)),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

test_transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# Load dataset
train_dataset = torchvision.datasets.CIFAR10(root='data', 
                                            train=True,
                                            download=DOWNLOAD,
                                            transform=train_transform,
                                            )
test_dataset = torchvision.datasets.CIFAR10(root='data', 
                                            train=False,
                                            download=DOWNLOAD,
                                            transform=test_transform,
                                            )
if SUBSET != 0:
    subset_indices = list(range(SUBSET))
    train_set = torch.utils.data.Subset(train_dataset, subset_indices)
    test_set = torch.utils.data.Subset(test_dataset, subset_indices)
    print(f"Using a subset of {SUBSET} samples for training and testing.")
else:
    train_set, test_set = train_dataset, test_dataset
    print("Using the full dataset for training and testing.")

# Create data loaders
train_loader = torch.utils.data.DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# Class names
CLASSES = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

Using the full dataset for training and testing.
cuda


In [2]:
def save_performance(save_path,
                     train_losses, test_losses, train_errs, 
                     test_errs, train_accs, test_acc, run_times,
                     n_step):
    import os
    import json

    if not os.path.exists(save_path):
        os.makedirs(save_path)

    performance = {
        'train_losses': train_losses,
        'test_losses': test_losses,
        'train_errs': train_errs,
        'test_errs': test_errs,
        'train_accs': train_accs,
        'test_acc': test_acc,
        'run_time': run_times,
        'n_step': n_step,
    }
    with open(f'{save_path}/SGD_{n_step}.json', 'w') as f:
        json.dump(performance, f, indent=4)
    

In [3]:
def evaluate_model(model, criterion, test_loader=test_loader):
    model.eval() 
    _test_acc, _test_err, _test_loss, total_test = 0, 0, 0, 0
    with torch.no_grad(): 
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total_test += labels.size(0)
            _test_acc += (predicted == labels).sum().item()
            _test_err += (predicted != labels).sum().item()
            _test_loss += criterion(outputs, labels).item()

    test_loss = _test_loss / total_test
    test_err = 100 * _test_err / total_test
    test_acc = 100 * _test_acc / total_test

    return test_loss, test_err, test_acc

In [4]:
from models import SimpleCNN
from optim.sgd_sngl import OneStepSGD
from optim.sgd_mult import ManyStepSGD
import time 

def modeling(n_step=2, n_epochs=15, lr=0.133, momentum=0, threshold=90):
    model = SimpleCNN().to(device)
    criterion = nn.CrossEntropyLoss()

    if n_step == 1:
        optimizer = OneStepSGD(model.parameters(), lr=lr, momentum=momentum)
    else:
        optimizer = ManyStepSGD(model.parameters(), lr=lr, momentum=momentum, n_step=n_step)

    train_losses, test_losses, train_errs, test_errs, train_accs, test_accs, run_times = [], [], [], [], [], [], []

    for i, epoch in enumerate(range(n_epochs)):
        print(f"Epoch: {i+1}/{n_epochs}")
        model.train()
        _start = time.time()
        run_time = 0
        for i, (images, labels) in enumerate(train_loader):
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            
            # Forward
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            # Backward
            loss.backward()
            optimizer.step()

            running_loss = loss.item()
            # Training accuracy
            _, predicted = torch.max(outputs.data, 1)
            _train_err = (predicted != labels).sum().item()
            _train_acc = (predicted == labels).sum().item()
            
            _batch_train_loss = running_loss / labels.size(0)
            _batch_train_acc = 100 * _train_acc / labels.size(0)
            _batch_train_err = 100 * _train_err / labels.size(0)
            
            test_loss, test_err, test_acc = evaluate_model(model, criterion, test_loader)

            train_losses.append(_batch_train_loss)
            train_errs.append(_batch_train_err)
            train_accs.append(_batch_train_acc)
            test_losses.append(test_loss)
            test_errs.append(test_err)
            test_accs.append(test_acc)
            
        run_time = time.time() - _start
        run_times.append(run_time)
        if epoch % 1 == 0:
            print(f'E [{epoch+1}/{n_epochs}]. train_loss_acc: {train_losses[-1]:.4f}, {train_errs[-1]}%, '
                    f'test_acc: {test_accs[-1]:.2f}%')
        if _batch_train_acc >= threshold:
            print(f"Early stopping at epoch {epoch+1} with train acc {_batch_train_acc:.2f}%")
            break
        
    return train_losses, test_losses, train_errs, test_errs, train_accs, test_accs, run_times

In [5]:
from plot import metrics_plot
n_epochs = 50
n_step = 1
lr = 0.133
threshold = 90
train_losses, test_losses, train_errs, test_errs, train_accs, test_accs, run_times = modeling(n_step=n_step, 
                                                                                   n_epochs=n_epochs, 
                                                                                   lr=lr,
                                                                                   threshold=threshold,
                                                                                  )
save_path = 'scores/batch_scores'
save_performance(save_path,
                 train_losses, test_losses, train_errs, 
                 test_errs, train_accs, test_accs, run_times, n_step=n_step,
                 )
metrics_plot(n_epochs, train_losses, test_losses, train_accs, test_accs, train_errs, test_errs)

Epoch: 1/50
E [1/50]. train_loss_acc: 0.0196, 61.25%, test_acc: 42.78%
Epoch: 2/50
E [2/50]. train_loss_acc: 0.0209, 61.25%, test_acc: 47.20%
Epoch: 3/50
E [3/50]. train_loss_acc: 0.0167, 52.5%, test_acc: 59.83%
Epoch: 4/50
E [4/50]. train_loss_acc: 0.0101, 26.25%, test_acc: 66.08%
Epoch: 5/50
E [5/50]. train_loss_acc: 0.0110, 25.0%, test_acc: 68.02%
Epoch: 6/50


KeyboardInterrupt: 

In [None]:
from plot import metrics_plot
n_epochs = 50
n_step = 10
lr = 0.133
threshold = 90
train_losses, test_losses, train_errs, test_errs, train_accs, test_accs, run_times = modeling(n_step=n_step, 
                                                                                   n_epochs=n_epochs, 
                                                                                   lr=lr,
                                                                                   threshold=threshold,
                                                                                  )
save_path = 'scores/batch_scores'
save_performance(save_path,
                 train_losses, test_losses, train_errs, 
                 test_errs, train_accs, test_accs, run_times, n_step=n_step,
                 )
metrics_plot(n_epochs, train_losses, test_losses, train_accs, test_accs, train_errs, test_errs)