In [1]:
import pandas as pd
import os
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
from torch import nn
from tqdm import tqdm

In [2]:
class LegoDataset(Dataset):
    """Custom dataset class for Lego Minifigure dataset."""
    
    def __init__(self, img_dir, test=False, transform=None, target_transform=None):
        """initialize a LegoDataset instance
        
        Keyword arguments:
        img_dir -- the path to the root image directory of test and train data
        test -- True to load test data (default: False)
        transform -- transform to apply to X
        target_transform -- transform to apply to y"""
        
        self.transform = transform
        self.target_transform = target_transform
        self.test = test
        self.img_dir = os.path.join(img_dir, "test/" if test else "train/")
        self.full_df = None
        
        # read path names and class names from csv files
        meta_df = pd.read_csv(os.path.join(self.img_dir, "metadata.csv"))
        if self.test:
            test_df = pd.read_csv(os.path.join(self.img_dir, "test.csv"))
            self.full_df = test_df.merge(meta_df, on="class_id")
        else:
            train_df = pd.read_csv(os.path.join(self.img_dir, "index.csv"))
            self.full_df = train_df.merge(meta_df, on="class_id")
    
    def __len__(self):
        return len(self.full_df)
    
    def __getitem__(self, idx):
        row = self.full_df.iloc[idx]
        image = read_image(os.path.join(self.img_dir, row["path"])).float()
        label = row["class_id"] - 1
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [3]:
# load datasets for getting mean and std for standardization
def get_mean_std(size=[224, 224]):
    transform = torchvision.transforms.Compose([
        torchvision.transforms.Resize(size)
    ])
    train_dataset = LegoDataset("data/", test=False, transform=transform)

    # get mean and std of dataset
    loader = DataLoader(train_dataset, batch_size=len(train_dataset))
    data = next(iter(loader))
    return data[0].mean(), data[0].std()

In [4]:
# prep data for real
input_size = [64, 64]
mean, std = get_mean_std(input_size)
transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize(input_size),
    torchvision.transforms.Normalize(mean, std)
])
train_dataset = LegoDataset("data/", test=False, transform=transform)
test_dataset = LegoDataset("data/", test=True, transform=transform)


In [5]:
# simple conv net
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        
        # conv_stack
        self.conv_stack = nn.Sequential(
            nn.Conv2d(3, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
            nn.Conv2d(16, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
            nn.Conv2d(32, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d((2, 2))
        )
        
        self.flatten = nn.Flatten()
        
        # 2 dense layers
        self.dense_stack = nn.Sequential(
            nn.Linear(1152, 100),
            nn.ReLU(),
            nn.Linear(100, 36)
        )
        
    
    def forward(self, x):
        x = self.conv_stack(x)
        x = self.flatten(x)
        x = self.dense_stack(x)
        return x
    
    def train(self, optimizer, loss_fn, train_loader, test_loader, n_epochs):
        for e in range(n_epochs):
            total = 0
            correct = 0
            for X, y in train_loader:
                # get preds
                preds = self.forward(X)
                loss = loss_fn(preds, y)
                
                # backprop
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                # collect stats
                max_preds = preds.argmax(1)
                total += y.size()[0]
                correct += sum(max_preds == y)
            test_total = 0
            test_correct = 0
            
            # get test acc
            with torch.no_grad():
                for X, y in test_loader:
                    preds = model(X)
                    max_preds = preds.argmax(1)
                    test_total += y.size()[0]
                    test_correct += sum(max_preds == y)
            
            train_acc = correct / total
            test_acc = test_correct / test_total
            # print out stats
            print(f"epoch: {e}\t train_acc: {train_acc:.3f}\t test_acc: {test_acc:.3f}")
                
                

In [6]:
# run everything!
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16)
test_loader = DataLoader(test_dataset, shuffle=True, batch_size=16)
model = ConvNet()

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
print(model)

ConvNet(
  (conv_stack): Sequential(
    (0): Conv2d(3, 16, kernel_size=(5, 5), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (dense_stack): Sequential(
    (0): Linear(in_features=1152, out_features=100, bias=True)
    (1): ReLU()
    (2): Linear(in_features=100, out_features=36, bias=True)
  )
)


In [7]:
model.train(optimizer, loss_fn, train_loader, test_loader, 50)

IndexError: Target 36 is out of bounds.

In [None]:
# lets try ResNet!
resnet = torchvision.models.resnet18(pretrained=True)
print(resnet)

In [None]:
# new transform
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# data aug for training
data_aug = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomApply([
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(),
        transforms.RandomRotation(45)
    ]),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# data loaders
train_dataset = LegoDataset("data/", test=False, transform=data_aug)
test_dataset = LegoDataset("data/", test=True, transform=preprocess)

train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16)
test_loader = DataLoader(test_dataset, shuffle=True, batch_size=16)


In [None]:
# disable grad for all parameters
for param in resnet.parameters():
    param.requires_grad = False
    
# replace fc layer
resnet.fc = nn.Linear(resnet.fc.in_features, out_features=36, bias=True)

# set up params to update
params_to_update = resnet.fc.parameters()

n_epochs = 100

loss_fn = nn.CrossEntropyLoss()
optim = torch.optim.SGD(params_to_update, lr=0.01, momentum=0.9)
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, n_epochs)


train_accs = []
test_accs = []

for e in range(n_epochs):
    train_total = 0
    train_correct = 0
    test_total = 0
    test_correct = 0
    
    resnet.train()
    for X, y in train_loader:
        # forward pass
        preds = resnet.forward(X)
        loss = loss_fn(preds, y)
        
        # backward pass
        optim.zero_grad()
        loss.backward()
        optim.step()
        
        # collect stats
        preds_max = preds.argmax(1)
        train_total += y.size()[0]
        train_correct += sum(preds_max == y)
    
    # update scheduler
    lr_scheduler.step()
    
    train_acc = train_correct / train_total
    train_accs.append(train_acc)
    
    resnet.eval()
    with torch.no_grad():
        for X, y in test_loader:
            # forward pass
            preds = resnet.forward(X)

            # collect stats
            preds_max = preds.argmax(1)
            test_total += y.size()[0]
            test_correct += sum(preds_max == y)
    
    test_acc = test_correct / test_total
    test_accs.append(test_acc)
    print(f"epoch: {e}\t train_acc: {train_acc:.3f}\t test_acc: {test_acc:.3f}")


In [None]:
from matplotlib import pyplot as plt

xs = [i for i in range(n_epochs)]

plt.plot(xs, train_accs, color='orange', label="train_acc")
plt.plot(xs, test_accs, color='blue', label='test_acc')
plt.legend(loc='lower right')
plt.title('test and train acc vs epochs')
plt.show()