In [None]:
import numpy as np
import pandas as pd
from PIL import Image
import argparse
import torch
import torch.utils.data
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
from torch.nn import functional as F
from torchvision import datasets, transforms
from torchvision.utils import save_image
import torchvision.models as models
import matplotlib
import matplotlib.image as image
import matplotlib.pyplot as plt

# Data

In [None]:
# Locations of the competition CSV files and of the image folder.
# NOTE(review): 'suffled-images' is spelled this way in the path; presumably
# it matches the dataset's directory name -- confirm before "fixing" it.
file_path = '/kaggle/input/uw-cs480-fall20/'
image_path = '/kaggle/input/uw-cs480-fall20/suffled-images/shuffled-images/'


def load_data():
    """Read the training and test tables stored under ``file_path``.

    Returns:
        tuple: ``(train_df, test_df)`` -- the DataFrames parsed from
        ``train.csv`` and ``test.csv`` respectively.
    """
    loaded = [pd.read_csv(file_path + csv_name)
              for csv_name in ('train.csv', 'test.csv')]
    return loaded[0], loaded[1]

In [None]:
train_df, test_df = load_data()
data_size = len(train_df)
print(data_size)

# remove free gifts
# .copy() detaches the filtered rows from the frame they were selected from,
# so the later transformations never operate on a view/slice of another frame.
train_df = train_df[train_df.category != 'Free Gifts'].copy()
data_size = len(train_df)
print(data_size)

# For every categorical column: the array of distinct training values (in
# order of first appearance) and a dict mapping each value to its position in
# that array.  `categories[i]` therefore inverts `category_d`.
categories = train_df.category.unique()
category_d = {label: index for index, label in enumerate(categories)}

genders = train_df.gender.unique()
gender_d = {label: index for index, label in enumerate(genders)}

baseColours = train_df.baseColour.unique()
baseColour_d = {label: index for index, label in enumerate(baseColours)}

seasons = train_df.season.unique()
season_d = {label: index for index, label in enumerate(seasons)}

usages = train_df.usage.unique()
usage_d = {label: index for index, label in enumerate(usages)}

# Column name -> {original value: integer code}, shared by both tables so the
# test data is encoded with exactly the codes learned from the training data.
label_encoders = {
    'category': category_d,
    'gender': gender_d,
    'baseColour': baseColour_d,
    'season': season_d,
    'usage': usage_d,
}

# training data
# Rebinding the name (instead of inplace=True) keeps the cell free of in-place
# mutation of a filtered frame.
train_df = train_df.replace(label_encoders)

# testing data
# Values that never occurred in the training table have no code and are left
# as they are.
test_df = test_df.replace(label_encoders)

# Final two steps shared by both pipelines: PIL image -> tensor, then
# per-channel normalisation.
# NOTE(review): the mean/std values look like the usual ImageNet channel
# statistics -- confirm if that matters for the chosen model.
_tensor_steps = [
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]

# Training pipeline: random 224-pixel crop and random horizontal flip.
preprocess_training = transforms.Compose(
    [transforms.RandomResizedCrop(224), transforms.RandomHorizontalFlip()] + _tensor_steps
)

# Evaluation pipeline: fixed resize to 256 followed by a 224-pixel centre crop.
preprocess_test = transforms.Compose(
    [transforms.Resize(256), transforms.CenterCrop(224)] + _tensor_steps
)


class Image_Dataset(Dataset):
    """Map-style dataset that yields ``(transformed_image, target)`` pairs.

    Args:
        id_target: Indexable collection of rows whose element 0 is the image
            id and element 1 is the target; ``len()`` of it is the dataset
            size.
        folder: Path prefix (including the trailing separator) that is
            concatenated with ``"<id>.jpg"`` to locate an image.
        transform: Callable applied to the loaded PIL image; its return value
            is the first element of each sample.
    """

    def __init__(self, id_target, folder=image_path, transform=preprocess_training):
        self.id_target = id_target
        self.folder = folder
        self.transform = transform

    def __len__(self):
        """Return the number of ``(id, target)`` rows held by this dataset."""
        return len(self.id_target)

    def __getitem__(self, idx):
        """Load, transform and return the sample stored at position ``idx``."""
        # Read from this instance's own rows.  Looking the row up in the
        # module-level `id_target` instead would make every dataset serve
        # samples from the full table regardless of the subset it was given.
        row = self.id_target[idx]

        img_name = self.folder + str(row[0]) + '.jpg'
        # Force three channels so that the 3-channel normalisation in the
        # transform also works for grayscale / palette / CMYK files; the
        # context manager releases the file handle once pixels are decoded.
        with Image.open(img_name) as source:
            image = source.convert('RGB')

        result = self.transform(image)

        return result, row[1]


# One row per training image: column 0 is the image id, column 1 the category.
id_target = train_df[['id', 'category']].values

# Five contiguous folds; np.array_split allows them to differ in length by one
# row when the number of rows is not a multiple of five.
splits = np.array_split(id_target, 5)


def _all_folds_except(folds, held_out):
    """Return the rows of every fold but ``folds[held_out]``, concatenated.

    The folds are joined directly from the list.  Going through
    ``np.delete(folds, ...)`` would first turn the list into a single array,
    which is only well defined when all folds have the same length.
    """
    return np.concatenate(
        [fold for position, fold in enumerate(folds) if position != held_out]
    )


# Split k trains on four folds and validates on the remaining one.
train_data1 = _all_folds_except(splits, 4)
validation_data1 = splits[4]

train_data2 = _all_folds_except(splits, 3)
validation_data2 = splits[3]

train_data3 = _all_folds_except(splits, 2)
validation_data3 = splits[2]

train_data4 = _all_folds_except(splits, 1)
validation_data4 = splits[1]

train_data5 = _all_folds_except(splits, 0)
validation_data5 = splits[0]

In [None]:
# script parameters
batch_size = 64
log_interval = 100  # image_train prints progress every `log_interval` batches

# run on GPU if possible
cuda = torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")

# create data loaders
# Worker processes and pinned memory are only requested when a GPU is used.
kwargs = {'num_workers': 2, 'pin_memory': True} if cuda else {}

# Training datasets keep the default (randomly augmenting) transform.
# Validation datasets use the deterministic `preprocess_test` pipeline so that
# validation loss/accuracy are comparable between epochs and match how images
# are preprocessed at prediction time.
train1 = Image_Dataset(train_data1)
validation1 = Image_Dataset(validation_data1, transform=preprocess_test)

train2 = Image_Dataset(train_data2)
validation2 = Image_Dataset(validation_data2, transform=preprocess_test)

train3 = Image_Dataset(train_data3)
validation3 = Image_Dataset(validation_data3, transform=preprocess_test)

train4 = Image_Dataset(train_data4)
validation4 = Image_Dataset(validation_data4, transform=preprocess_test)

train5 = Image_Dataset(train_data5)
validation5 = Image_Dataset(validation_data5, transform=preprocess_test)

# Only the training loaders shuffle; sample order has no effect on the summed
# validation metrics, so validation is iterated in a fixed order.
train_loader1 = DataLoader(train1, batch_size=batch_size, shuffle=True, **kwargs)
validation_loader1 = DataLoader(validation1, batch_size=batch_size, shuffle=False, **kwargs)

train_loader2 = DataLoader(train2, batch_size=batch_size, shuffle=True, **kwargs)
validation_loader2 = DataLoader(validation2, batch_size=batch_size, shuffle=False, **kwargs)

train_loader3 = DataLoader(train3, batch_size=batch_size, shuffle=True, **kwargs)
validation_loader3 = DataLoader(validation3, batch_size=batch_size, shuffle=False, **kwargs)

train_loader4 = DataLoader(train4, batch_size=batch_size, shuffle=True, **kwargs)
validation_loader4 = DataLoader(validation4, batch_size=batch_size, shuffle=False, **kwargs)

train_loader5 = DataLoader(train5, batch_size=batch_size, shuffle=True, **kwargs)
validation_loader5 = DataLoader(validation5, batch_size=batch_size, shuffle=False, **kwargs)

# Models

In [None]:
num_classes = 26

# The output layer size has to equal the number of distinct training categories.
assert len(categories) == num_classes

# Five separately constructed ResNet-50 networks, each moved to `device`.
(model_image1,
 model_image2,
 model_image3,
 model_image4,
 model_image5) = [
    models.resnet50(num_classes=num_classes).to(device) for _ in range(5)
]

# Loss Functions

In [None]:
# One cross-entropy criterion object per model.  reduction='sum' makes each
# call return the total over the batch rather than the batch mean.
(criterion_image1,
 criterion_image2,
 criterion_image3,
 criterion_image4,
 criterion_image5) = [nn.CrossEntropyLoss(reduction='sum') for _ in range(5)]

# Optimizers

In [None]:
# One Adam optimizer per model, each with learning rate 1e-3 and bound to the
# parameters of its own network only.
(optimizer_image1,
 optimizer_image2,
 optimizer_image3,
 optimizer_image4,
 optimizer_image5) = [
    optim.Adam(network.parameters(), lr=1e-3)
    for network in (model_image1, model_image2, model_image3,
                    model_image4, model_image5)
]

# Training

In [None]:
def image_train(epoch, model, optimizer, criterion_image, train_loader):
    """Run one optimisation pass of ``model`` over ``train_loader``.

    Batches are moved to the module-level ``device``; a progress line is
    printed every ``log_interval`` batches (also a module-level setting).

    Args:
        epoch: Epoch number, used only in the printed messages.
        model: Network to optimise; switched to training mode.
        optimizer: Optimizer that updates ``model`` after every batch.
        criterion_image: Callable ``(outputs, targets) -> loss tensor``.
        train_loader: Iterable of ``(images, targets)`` batches exposing
            ``len()`` and a ``dataset`` attribute.

    Returns:
        The accumulated per-batch loss values divided by
        ``len(train_loader.dataset)``.
    """
    model.train()
    running_loss = 0
    for step, batch in enumerate(train_loader):
        inputs, labels = batch
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard update: clear old gradients, forward, backward, step.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion_image(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss

        if step % log_interval == 0:
            percent_done = 100. * step / len(train_loader)
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, step * len(inputs), len(train_loader.dataset),
                percent_done,
                batch_loss / len(inputs)))

    average_loss = running_loss / len(train_loader.dataset)
    print('====> Epoch: {} Average loss: {:.4f}'.format(
          epoch, average_loss))
    return average_loss

# Testing

In [None]:
def image_test(epoch, model, criterion_image, validation_loader):
    """Evaluate ``model`` on ``validation_loader`` without gradient tracking.

    Prints the per-sample loss and the accuracy, both computed over
    ``len(validation_loader.dataset)`` samples.

    Args:
        epoch: Accepted for symmetry with ``image_train``; not used.
        model: Network to evaluate; switched to evaluation mode.
        criterion_image: Callable ``(outputs, targets) -> loss tensor``.
        validation_loader: Iterable of ``(images, targets)`` batches with a
            ``dataset`` attribute; batches are moved to the module-level
            ``device``.

    Returns:
        The accumulated loss divided by ``len(validation_loader.dataset)``.
    """
    model.eval()
    loss_sum = 0
    hits = 0
    with torch.no_grad():
        for inputs, labels in validation_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            loss_sum += criterion_image(outputs, labels).item()

            # Predicted class = index of the largest output per sample.
            hits += (outputs.argmax(dim=1) == labels).sum().item()

    sample_count = len(validation_loader.dataset)
    average_test_loss = loss_sum / sample_count
    test_accuracy = hits / sample_count
    print('====> Validation loss: {:.4f}'.format(average_test_loss))
    print('====> Validation accuracy: {:.2f}'.format(test_accuracy))
    return average_test_loss

# Main

## Parameter

In [None]:
# Number of epochs each of the per-model training loops below runs for.
epochs = 100

## Model 1

In [None]:
# train and validate model 1

average_train_losses = []
average_test_losses = []
best_test_loss = float('inf')

for epoch in range(1, epochs + 1):
    average_train_loss = image_train(epoch, model_image1, optimizer_image1, criterion_image1, train_loader1)
    average_train_losses.append(average_train_loss)

    # Evaluate on the held-out fold.  Passing the training loader here would
    # make both the checkpoint selection and the second plot reflect training
    # data instead of unseen data.
    average_test_loss = image_test(epoch, model_image1, criterion_image1, validation_loader1)
    average_test_losses.append(average_test_loss)

    # save model with best validation loss (the first epoch always saves, so a
    # checkpoint file exists even if the loss is never a finite number)
    if epoch == 1 or average_test_loss < best_test_loss:
        best_test_loss = average_test_loss
        torch.save(model_image1, 'image_classification_model1.pt')


# Plot Training Losses
plt.plot(average_train_losses)
plt.title('Train Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Train'], loc='upper right')
plt.show()

# Plot Testing Losses
plt.plot(average_test_losses)
plt.title('Test Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Test'], loc='upper right')
plt.show()

## Model 2

In [None]:
# train and validate model 2

average_train_losses = []
average_test_losses = []
best_test_loss = float('inf')

for epoch in range(1, epochs + 1):
    average_train_loss = image_train(epoch, model_image2, optimizer_image2, criterion_image2, train_loader2)
    average_train_losses.append(average_train_loss)

    # Evaluate on the held-out fold.  Passing the training loader here would
    # make both the checkpoint selection and the second plot reflect training
    # data instead of unseen data.
    average_test_loss = image_test(epoch, model_image2, criterion_image2, validation_loader2)
    average_test_losses.append(average_test_loss)

    # save model with best validation loss (the first epoch always saves, so a
    # checkpoint file exists even if the loss is never a finite number)
    if epoch == 1 or average_test_loss < best_test_loss:
        best_test_loss = average_test_loss
        torch.save(model_image2, 'image_classification_model2.pt')


# Plot Training Losses
plt.plot(average_train_losses)
plt.title('Train Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Train'], loc='upper right')
plt.show()

# Plot Testing Losses
plt.plot(average_test_losses)
plt.title('Test Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Test'], loc='upper right')
plt.show()

## Model 3

In [None]:
# train and validate model 3

average_train_losses = []
average_test_losses = []
best_test_loss = float('inf')

for epoch in range(1, epochs + 1):
    average_train_loss = image_train(epoch, model_image3, optimizer_image3, criterion_image3, train_loader3)
    average_train_losses.append(average_train_loss)

    # Evaluate on the held-out fold.  Passing the training loader here would
    # make both the checkpoint selection and the second plot reflect training
    # data instead of unseen data.
    average_test_loss = image_test(epoch, model_image3, criterion_image3, validation_loader3)
    average_test_losses.append(average_test_loss)

    # save model with best validation loss (the first epoch always saves, so a
    # checkpoint file exists even if the loss is never a finite number)
    if epoch == 1 or average_test_loss < best_test_loss:
        best_test_loss = average_test_loss
        torch.save(model_image3, 'image_classification_model3.pt')


# Plot Training Losses
plt.plot(average_train_losses)
plt.title('Train Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Train'], loc='upper right')
plt.show()

# Plot Testing Losses
plt.plot(average_test_losses)
plt.title('Test Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Test'], loc='upper right')
plt.show()

## Model 4

In [None]:
# train and validate model 4

average_train_losses = []
average_test_losses = []
best_test_loss = float('inf')

for epoch in range(1, epochs + 1):
    average_train_loss = image_train(epoch, model_image4, optimizer_image4, criterion_image4, train_loader4)
    average_train_losses.append(average_train_loss)

    # Evaluate on the held-out fold.  Passing the training loader here would
    # make both the checkpoint selection and the second plot reflect training
    # data instead of unseen data.
    average_test_loss = image_test(epoch, model_image4, criterion_image4, validation_loader4)
    average_test_losses.append(average_test_loss)

    # save model with best validation loss (the first epoch always saves, so a
    # checkpoint file exists even if the loss is never a finite number)
    if epoch == 1 or average_test_loss < best_test_loss:
        best_test_loss = average_test_loss
        torch.save(model_image4, 'image_classification_model4.pt')


# Plot Training Losses
plt.plot(average_train_losses)
plt.title('Train Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Train'], loc='upper right')
plt.show()

# Plot Testing Losses
plt.plot(average_test_losses)
plt.title('Test Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Test'], loc='upper right')
plt.show()

## Model 5

In [None]:
# train and validate model 5

average_train_losses = []
average_test_losses = []
best_test_loss = float('inf')

for epoch in range(1, epochs + 1):
    average_train_loss = image_train(epoch, model_image5, optimizer_image5, criterion_image5, train_loader5)
    average_train_losses.append(average_train_loss)

    # Evaluate on the held-out fold.  Passing the training loader here would
    # make both the checkpoint selection and the second plot reflect training
    # data instead of unseen data.
    average_test_loss = image_test(epoch, model_image5, criterion_image5, validation_loader5)
    average_test_losses.append(average_test_loss)

    # save model with best validation loss (the first epoch always saves, so a
    # checkpoint file exists even if the loss is never a finite number)
    if epoch == 1 or average_test_loss < best_test_loss:
        best_test_loss = average_test_loss
        torch.save(model_image5, 'image_classification_model5.pt')


# Plot Training Losses
plt.plot(average_train_losses)
plt.title('Train Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Train'], loc='upper right')
plt.show()

# Plot Testing Losses
plt.plot(average_test_losses)
plt.title('Test Losses')
plt.ylabel('Cross Entropy')
plt.xlabel('Epoch #')
plt.legend(['Test'], loc='upper right')
plt.show()

# Predict Results for Test Data

In [None]:
def _load_for_inference(path):
    """Load a whole pickled model from ``path`` onto ``device`` in eval mode.

    ``map_location`` lets a checkpoint written on a GPU machine be opened on a
    CPU-only one.
    """
    # SECURITY: torch.load unpickles arbitrary objects -- only open checkpoint
    # files that were written by this notebook.
    # NOTE(review): PyTorch releases where torch.load defaults to
    # weights_only=True need weights_only=False to load whole-module
    # checkpoints like these -- confirm against the installed version.
    loaded = torch.load(path, map_location=device)
    loaded = loaded.to(device)
    loaded.eval()
    return loaded


model1 = _load_for_inference('image_classification_model1.pt')
model2 = _load_for_inference('image_classification_model2.pt')
model3 = _load_for_inference('image_classification_model3.pt')
model4 = _load_for_inference('image_classification_model4.pt')
model5 = _load_for_inference('image_classification_model5.pt')

ensemble = [model1, model2, model3, model4, model5]

result1 = []  # soft voting: class with the largest summed softmax probability
result2 = []  # hard voting: class predicted by the most models

# Inference only: disabling autograd avoids building a graph for each image.
with torch.no_grad():
    for image_id in test_df.id.values:
        img_name = image_path + str(image_id) + '.jpg'
        # Force three channels so the 3-channel normalisation in
        # preprocess_test also works for non-RGB files.
        with Image.open(img_name) as source:
            tensor_image = preprocess_test(source.convert('RGB')).unsqueeze(0).to(device)

        outputs = [member(tensor_image) for member in ensemble]

        # Soft voting over the five models.
        summed_probabilities = sum(F.softmax(output, dim=1) for output in outputs)
        prediction1 = summed_probabilities.argmax(dim=1).item()

        # Hard voting: np.unique returns the labels sorted, and argmax picks
        # the first maximum, so ties go to the smallest label index.
        labels = [output.argmax(dim=1).item() for output in outputs]
        unique_labels, counts = np.unique(labels, return_counts=True)
        best_index = np.argmax(counts)
        prediction2 = unique_labels[best_index]

        # `categories[i]` maps an integer code back to the category name.
        result1.append([image_id, categories[prediction1]])
        result2.append([image_id, categories[prediction2]])


headers = ['id', 'category']

predict1 = pd.DataFrame(result1, columns=headers)
predict2 = pd.DataFrame(result2, columns=headers)

print(predict1)
print(predict2)

predict1.to_csv('submission1.csv', index=False)
predict2.to_csv('submission2.csv', index=False)