# Import Libraries and Mount Google Drive

In [None]:
import time
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms
from PIL import Image
from torchvision.models import efficientnet_b3, EfficientNet_B3_Weights

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# get_device_properties() only accepts CUDA devices and raises on a CPU-only
# runtime, so query it only when a GPU was actually selected
if device.type == 'cuda':
  print(torch.cuda.get_device_properties(device))
# IPython shell escape: run nvidia-smi (only valid inside a notebook)
!nvidia-smi

# Colab-only: mount Google Drive so files stored there can be accessed
from google.colab import drive
drive.mount('/content/drive/') # mount your drive
# IPython magic: change the working directory; the relative "./FashionDataset/..."
# paths used later in the notebook are resolved against it
%cd '/content/drive/My Drive/Notability/AI6126 Advanced Computer Vision/' 
# The directory above must contain the unzipped 'FashionDataset' folder


# Preprocessing of Data

In [None]:
# Train Dataset
# Image paths listed in the split file are joined onto the dataset folder
train_names_file = "./FashionDataset/split/train.txt"
train_names = np.loadtxt(train_names_file, dtype=str)
train_names = ["./FashionDataset/"+name for name in train_names]

# One row of integer class ids per image, one column per attribute label
train_attr_file = "./FashionDataset/split/train_attr.txt"
train_attr = np.loadtxt(train_attr_file, dtype=int)
train_attr = torch.from_numpy(train_attr)
# Class ids start at 0, so classes per label = largest id + 1
num_classes = torch.max(train_attr, dim=0).values + 1 # e.g. [7, 3, 3, 4, 6, 3]

# Histogram of class ids: rows = class id, columns = label.
# The table is sized from the data instead of a hard-coded (7, 6), and each
# column is counted in one vectorised np.bincount call rather than an
# element-by-element Python loop over the tensor.
num_train_labels = train_attr.shape[1]
max_train_classes = int(num_classes.max())
class_label_count_train = np.zeros((max_train_classes, num_train_labels))
for label in range(num_train_labels):
  class_label_count_train[:, label] = np.bincount(
      train_attr[:, label].numpy(), minlength=max_train_classes)

print(f'Distribution in train attributes: \n {class_label_count_train} \n')

# Val Dataset
# Image paths listed in the split file are joined onto the dataset folder
val_names_file = "./FashionDataset/split/val.txt"
val_names = np.loadtxt(val_names_file, dtype=str)
val_names = ["./FashionDataset/"+name for name in val_names]

# One row of integer class ids per image, one column per attribute label
val_attr_file = "./FashionDataset/split/val_attr.txt"
val_attr = np.loadtxt(val_attr_file, dtype=int)
val_attr = torch.from_numpy(val_attr)

# Histogram of class ids: rows = class id, columns = label.
# Row count follows the train-derived num_classes (so both tables line up) and
# grows if the validation split contains a class id unseen in training,
# instead of relying on a hard-coded (7, 6) shape.
num_val_labels = val_attr.shape[1]
num_val_rows = max(int(num_classes.max()), int(val_attr.max()) + 1)
class_label_count_val = np.zeros((num_val_rows, num_val_labels))
for label in range(num_val_labels):
  class_label_count_val[:, label] = np.bincount(
      val_attr[:, label].numpy(), minlength=num_val_rows)

print(f'Distribution in val attributes: \n {class_label_count_val} \n')


# Test Dataset
# Only image paths are loaded for this split; no attribute file is read here
test_names_file = "./FashionDataset/split/test.txt"
test_names = [
    "./FashionDataset/"+relative_path
    for relative_path in np.loadtxt(test_names_file, dtype=str)
]

print(f'Number of class per label: {num_classes}')


In [None]:
# Per-channel mean / std passed to Normalize (shared by all three pipelines)
_NORMALIZE_MEAN = [0.485, 0.456, 0.406]
_NORMALIZE_STD = [0.229, 0.224, 0.225]


def _deterministic_steps():
  """Return fresh resize -> center-crop -> to-tensor -> normalize steps."""
  return [
      transforms.Resize((320,320), interpolation=transforms.InterpolationMode.BICUBIC),
      transforms.CenterCrop((300,300)),
      transforms.ToTensor(),
      transforms.Normalize(_NORMALIZE_MEAN, _NORMALIZE_STD),
  ]


# Training adds a random horizontal flip in front of the deterministic steps
train_transform = transforms.Compose(
    [transforms.RandomHorizontalFlip(p=0.5)] + _deterministic_steps()
)

# Validation and test use the deterministic steps only
val_transform = transforms.Compose(_deterministic_steps())

test_transform = transforms.Compose(_deterministic_steps())

class train_dataset(Dataset):
  """Labelled image dataset: yields (image, labels) pairs.

  Args:
    image_dir: sequence of image file paths (one per sample).
    attr: indexable container of labels; attr[i] belongs to image_dir[i].
    transform: optional callable applied to the loaded PIL image.
  """
  def __init__(self, image_dir, attr, transform=None):
    self.attr = attr
    self.image_dir = image_dir
    self.transform = transform

  def __len__(self):
    """Number of samples, i.e. number of image paths."""
    return len(self.image_dir)
  
  def __getitem__(self, idx):
    """Load sample idx and return (transformed image, labels)."""
    image_path = self.image_dir[idx]
    # convert('RGB') guarantees three channels even for grayscale / RGBA /
    # palette files, which a 3-channel Normalize would otherwise reject.
    # The `with` block closes the file handle once the pixels are decoded.
    with Image.open(image_path) as raw_image:
      image = raw_image.convert('RGB')
    labels = self.attr[idx]
    if self.transform is not None:
      image = self.transform(image)
    return image, labels

class test_dataset(Dataset):
  """Unlabelled image dataset: yields images only.

  Args:
    image_dir: sequence of image file paths (one per sample).
    transform: optional callable applied to the loaded PIL image.
  """
  def __init__(self, image_dir, transform=None):
    self.image_dir = image_dir
    self.transform = transform

  def __len__(self):
    """Number of samples, i.e. number of image paths."""
    return len(self.image_dir)
  
  def __getitem__(self, idx):
    """Load sample idx and return the (transformed) image."""
    image_path = self.image_dir[idx]
    # convert('RGB') guarantees three channels even for grayscale / RGBA /
    # palette files, which a 3-channel Normalize would otherwise reject.
    # The `with` block closes the file handle once the pixels are decoded.
    with Image.open(image_path) as raw_image:
      image = raw_image.convert('RGB')
    if self.transform is not None:
      image = self.transform(image)
    return image

# Wrap each split in its Dataset class; the test split carries no labels
train_data = train_dataset(image_dir=train_names, attr=train_attr, transform=train_transform)
val_data = train_dataset(image_dir=val_names, attr=val_attr, transform=val_transform)
test_data = test_dataset(image_dir=test_names, transform=test_transform)

In [None]:
# Settings shared by all three loaders; only the training loader shuffles
_loader_options = {'batch_size': 32, 'num_workers': 4}
train_dataloader = DataLoader(train_data, shuffle=True, **_loader_options)
val_dataloader = DataLoader(val_data, shuffle=False, **_loader_options)
test_dataloader = DataLoader(test_data, shuffle=False, **_loader_options)

# Define Model and Common Functions

In [None]:
class ModifiedEffNet(nn.Module):
  """EfficientNet-B3 backbone with one linear classification head per label.

  Args:
    num_classes: sequence with the number of classes of each label
      (entries may be ints or 0-dim tensors), e.g. [7, 3, 3, 4, 6, 3].
    dropout: dropout probability applied in front of every head.

  forward() returns raw logits of shape
  [batch size, max(num_classes), len(num_classes)]. Heads with fewer classes
  than the largest one are padded with -inf along the class dimension.

  The heads intentionally have no Softmax layer: nn.CrossEntropyLoss applies
  log-softmax itself, so feeding it probabilities would squash the loss and
  its gradients. The -inf padding gives the non-existent classes exactly zero
  probability under that softmax and keeps them from ever winning an argmax.
  Parameter names (cat1 ... catN, Linear at index 1) are unchanged, so
  previously saved state_dicts still load.
  """
  def __init__(self, num_classes, dropout):
    super().__init__()
    self.dropout = dropout
    self.num_classes = num_classes
    # Plain Python ints so layer sizes never depend on tensor semantics
    self._head_sizes = [int(n) for n in num_classes]

    self.effnet = efficientnet_b3(weights=EfficientNet_B3_Weights.IMAGENET1K_V1)

    # Backbone without its last child (the ImageNet classifier)
    self.model_wo_fl = nn.Sequential(*(list(self.effnet.children())[:-1]))
    # Width of the pooled feature vector (1536 for EfficientNet-B3)
    feature_dim = self.effnet.classifier[-1].in_features

    # One head per label, registered as cat1, cat2, ... (and catK_n for its size)
    for idx, n_out in enumerate(self._head_sizes, start=1):
      setattr(self, f'cat{idx}_n', n_out)
      setattr(self, f'cat{idx}', nn.Sequential(
          nn.Dropout(p=self.dropout),
          nn.Linear(in_features = feature_dim, out_features = n_out),
      )) # [batch size, n_out]

  def forward(self, x):
    """Return padded per-label logits, shape [batch, max classes, labels]."""
    max_num_subclass = max(self._head_sizes)
    x = self.model_wo_fl(x)
    x = torch.flatten(x,1)

    padded_logits = []
    for idx in range(1, len(self._head_sizes) + 1):
      logits = getattr(self, f'cat{idx}')(x)
      missing = max_num_subclass - logits.shape[1]
      if missing > 0:
        # new_full inherits dtype and device from the activations, so the
        # module no longer depends on a global `device`
        filler = logits.new_full((logits.shape[0], missing), float('-inf'))
        logits = torch.cat((logits, filler), 1) # [batch size, max classes]
      padded_logits.append(logits)

    return torch.stack(padded_logits, dim=2) # [batch size, max classes, labels]

In [None]:
def accuracy(predictions, labels):
  """Fraction of label slots whose arg-max class (over dim 1) equals the label.

  Args:
    predictions: score tensor with the class dimension at index 1.
    labels: integer class ids, shaped like `predictions` without dim 1.

  Returns:
    0-dim float tensor: correct predictions / total predictions.
  """
  predicted_classes = predictions.argmax(dim=1)
  total = predicted_classes.numel()
  # Count the misses, then derive the hits from the total
  mismatches = (labels != predicted_classes).sum()
  return (total - mismatches).float() / total

In [None]:
def train(model, dataloader, optimizer, criterion):
  """Run one training epoch over `dataloader`.

  Args:
    model: network to optimise; put into train mode here.
    dataloader: iterable of (imgs, labels) batches; must support len().
    optimizer: optimizer stepped once per batch.
    criterion: loss called as criterion(predictions, labels).

  Returns:
    (train_loss, train_acc): per-batch loss and accuracy averaged over the
    number of batches in `dataloader`.
  """
  epoch_loss = 0.0
  epoch_acc = 0.0

  model.train()
  for imgs, labels in dataloader:
    imgs = imgs.to(device)
    labels = labels.to(device) # [batch size, 6]

    optimizer.zero_grad()

    predictions = model(imgs) # [batch size, 7, 6]
    loss = criterion(predictions, labels)
    loss.backward()

    optimizer.step()

    acc = accuracy(predictions, labels)

    epoch_loss += loss.item()
    epoch_acc += acc.item()

  # Average over the loader that was actually iterated, not over the global
  # train_dataloader, so the function is correct for any loader passed in
  num_batches = len(dataloader)
  train_loss = epoch_loss / num_batches
  train_acc = epoch_acc / num_batches

  return train_loss, train_acc

In [None]:
def evaluate(model, dataloader, criterion):
  """Evaluate `model` on `dataloader` without computing gradients.

  Args:
    model: network to evaluate; put into eval mode here.
    dataloader: iterable of (imgs, labels) batches; must support len().
    criterion: loss called as criterion(predictions, labels).

  Returns:
    (val_loss, val_acc): per-batch loss and accuracy averaged over the
    number of batches in `dataloader`.
  """
  epoch_loss = 0.0
  epoch_acc = 0.0

  model.eval()
  with torch.no_grad():
    for imgs, labels in dataloader:
      imgs = imgs.to(device)
      labels = labels.to(device) # [batch size, 6]

      predictions = model(imgs)  # [batch size, 7, 6]

      loss = criterion(predictions, labels)
      acc = accuracy(predictions, labels)

      epoch_loss += loss.item()
      epoch_acc += acc.item()
  
  # Average over the loader that was actually iterated, not over the global
  # val_dataloader, so the function is correct for any loader passed in
  num_batches = len(dataloader)
  val_loss = epoch_loss / num_batches
  val_acc = epoch_acc / num_batches

  return val_loss, val_acc

# Train Model and Evaluate

In [None]:
# Parameters
LR = 0.1
NUM_EPOCH = 50
WEIGHT_DECAY = 0.00001
MOMENTUM = 0.9
DROPOUT = 0.2

# define Criterion
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

# Define model
model = ModifiedEffNet(num_classes, DROPOUT)
model = model.to(device)

# Get number of parameters
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {num_params:,} trainable parameters \n')

# Define optimizer and scheduler
optimizer = torch.optim.SGD(model.parameters(), lr=LR, momentum=MOMENTUM, weight_decay=WEIGHT_DECAY)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, NUM_EPOCH)

# Define statistic variables
stat_train_loss = []
stat_val_loss = []
stat_train_acc = []
stat_val_acc = []
best_val_loss = float('inf')

# Checkpoint file for the best model. Bound before the loop so the name exists
# for the later torch.load(model_name) even if no epoch ever improves on
# best_val_loss (e.g. when the validation loss is NaN).
model_name = 'model.pt'

# Start model training
print(f'Training Model:')

for epoch in range(NUM_EPOCH):
  start_time = time.time()

  # Train model with train_dataset
  train_loss, train_acc = train(model, train_dataloader, optimizer, criterion)

  # Evaluate model against val_dataset
  val_loss, val_acc = evaluate(model, val_dataloader, criterion)

  # Save model with best (smallest) val loss
  if val_loss < best_val_loss:
    best_val_loss = val_loss
    torch.save(model.state_dict(), model_name)

  end_time = time.time()
  epoch_secs = end_time - start_time

  # Keep per-epoch statistics for the training curves
  stat_train_loss.append(train_loss)
  stat_val_loss.append(val_loss)
  stat_train_acc.append(train_acc)
  stat_val_acc.append(val_acc)

  print(f'Epoch {epoch + 1}/{NUM_EPOCH}: Training duration: {epoch_secs:.2f}s')
  print(f'Train Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}% | Val. Loss: {val_loss:.3f} |  Val. Acc: {val_acc*100:.2f}%')

  # Advance the cosine learning-rate schedule once per epoch
  scheduler.step()

print(f'Training Completed')

# Plot training Curves
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12,16))

ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Loss vs Epoch')
ax1.plot(stat_train_loss, 'ro-', label='Train')
ax1.plot(stat_val_loss, 'bo-', label='Val')
ax1.legend()

# The statistics hold accuracies as fractions in [0, 1]; scale them to percent
# so the plotted values agree with the 'Accuracy (%)' axis label
stat_train_acc_pct = [acc * 100 for acc in stat_train_acc]
stat_val_acc_pct = [acc * 100 for acc in stat_val_acc]

ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.set_title('Accuracy vs Epoch')
ax2.plot(stat_train_acc_pct, 'ro-', label='Train')
ax2.plot(stat_val_acc_pct, 'bo-', label='Val')
ax2.legend()

## Test Trained Model

In [None]:
# Test Model
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime
model.load_state_dict(torch.load(model_name, map_location=device))
model.eval()

# Collect per-batch results and concatenate once: avoids re-copying the growing
# tensor every iteration and keeps the class ids as integers throughout
batch_predictions = []
with torch.no_grad():
  for imgs in test_dataloader:
    imgs = imgs.to(device)

    pred = model(imgs) # [batch size, 7, 6]
    pred = torch.argmax(pred,dim=1) # [batch size, 6]

    batch_predictions.append(pred.cpu())

if batch_predictions:
  predictions = torch.cat(batch_predictions, 0) # [num test images, 6]
else:
  # Empty loader: keep an empty integer result instead of failing in torch.cat
  predictions = torch.empty(0, dtype=torch.long)

# Save Prediction    
predictions = predictions.numpy()
predictions = predictions.astype(int)
file_name = 'prediction.txt' 
np.savetxt(file_name, predictions, fmt='%.1d')
print('Done \n')

# Test Saved Models

In [None]:
## Run the next 2 lines if the model was NOT defined and trained before
# model = ModifiedEffNet(num_classes, 0.2)
# model = model.to(device)

model_name_temp = 'model_LR0.1_EPOCH50_B32_WD1e-5.pt'
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime
model.load_state_dict(torch.load(model_name_temp, map_location=device))
model.eval()

# Collect per-batch results and concatenate once: avoids re-copying the growing
# tensor every iteration and keeps the class ids as integers throughout
batch_predictions = []

with torch.no_grad():
  for imgs in test_dataloader:
    imgs = imgs.to(device)

    pred = model(imgs) # [batch size, 7, 6]
    pred = torch.argmax(pred,dim=1) # [batch size, 6]

    batch_predictions.append(pred.cpu())

if batch_predictions:
  predictions = torch.cat(batch_predictions, 0) # [num test images, 6]
else:
  # Empty loader: keep an empty integer result instead of failing in torch.cat
  predictions = torch.empty(0, dtype=torch.long)

predictions = predictions.numpy()
predictions = predictions.astype(int)

## Run next 2 lines only if want to save the predictions as a .txt file
# file_name = 'prediction.txt' 
# np.savetxt(file_name, predictions, fmt='%.1d')

print('Done \n')

# Read the predictions that were written to disk by an earlier run
previous_pred_file = "prediction_LR0.1_EPOCH50_B32_WD1e-5.txt"
previous_saved_pred = np.loadtxt(previous_pred_file, dtype=int)

# Element-wise difference against the fresh predictions; every non-zero entry
# is a disagreement, so the printed count should be 0
prediction_delta = np.subtract(predictions, previous_saved_pred)
num_different_pred = np.count_nonzero(prediction_delta)
print(num_different_pred)