In [None]:
import torch
from torch.utils.data import Sampler
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.models import resnet50
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os, gc
import numpy as np
import matplotlib.pyplot as plt

# Compute device shared by every later cell (they read this module-level name).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if torch.cuda.is_available():
  # Release cached blocks left over from a previous run of the notebook.
  torch.cuda.empty_cache()
  print("You have %d GPUs" % torch.cuda.device_count())

# Directory that receives the best checkpoint written by the training loop.
# Created with the standard library so the cell does not depend on a shell.
os.makedirs('/kaggle/working/par_models', exist_ok=True)

In [None]:
# Spatial size (in pixels) that every image is resized to by the transforms
# defined later in this cell.
IMG_WIDTH = 224
IMG_HEIGHT = 224

def extract_annotation(label_path):
  """Parse a comma-separated annotation file into a sorted list of tuples.

  Each non-blank line must contain at least six comma-separated fields:
  ``name, upper_colour, lower_colour, gender, bag, hat``.

  The two colour fields are shifted down by one (1-based in the file,
  0-based in the result); the value ``-1`` is kept unchanged. The remaining
  three fields are converted to ``int`` as they are.

  Args:
    label_path: path of the annotation text file.

  Returns:
    A list of ``(name, upper, lower, gender, bag, hat)`` tuples sorted by
    ``name``.

  Raises:
    ValueError: if a numeric field cannot be parsed as an integer.
    IndexError: if a non-blank line has fewer than six fields.
  """
  annotations_list = []

  with open(label_path, 'r') as file:
    for raw_line in file:
      line = raw_line.strip()
      if not line:
        # Blank lines (e.g. trailing newlines) carry no annotation.
        continue

      elements = line.split(',')

      # Colours: 1-based -> 0-based, leaving the -1 value untouched.
      colours = [
          value - 1 if value != -1 else value
          for value in (int(e) for e in elements[1:3])
      ]

      annotations_list.append((
          elements[0],
          colours[0],
          colours[1],
          int(elements[3]),
          int(elements[4]),
          int(elements[5]),
      ))

  annotations_list.sort(key=lambda x: x[0])
  return annotations_list

training_folder = '/kaggle/input/par-dataset/training_set/training_set'
validation_folder = '/kaggle/input/par-dataset/validation_set/validation_set'

train_label = '/kaggle/input/par-dataset/training_set(1).txt'
validation_label = '/kaggle/input/par-dataset/validation_set.txt'

# Annotation tuples, sorted by file name.
annotations_training_list = extract_annotation(train_label)
annotations_validation_list = extract_annotation(validation_label)

# Channel statistics commonly used with ImageNet-pretrained torchvision models.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# torchvision's Resize expects (height, width), so pass the constants in that order.
data_transforms = {
    'train':
    transforms.Compose([
        transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
        transforms.RandomAffine(degrees=30, shear=10),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize
    ]),
    'validation':
    transforms.Compose([
        transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
        transforms.ToTensor(),
        normalize
    ]),
}


training_image_paths = [os.path.join(training_folder, filename) for filename in os.listdir(training_folder)]
validation_image_paths = [os.path.join(validation_folder, filename) for filename in os.listdir(validation_folder)]

# Images and annotations are later paired by position, so both lists are sorted.
# NOTE(review): this assumes each folder holds exactly the files listed in its
# annotation file - the printed lengths below are the only check; confirm.
training_image_paths.sort()
validation_image_paths.sort()

print(len(training_image_paths))
print(len(validation_image_paths))
print(len(annotations_training_list))


In [None]:
from collections import defaultdict

class CustomImageDataset(Dataset):
    """Image dataset pairing ``image_paths[i]`` with ``labels[i]`` by position.

    Args:
        image_paths: sequence of image file paths.
        labels: sequence of tuples ``(name, att1, att2, att3, att4, att5)``;
            the value ``-1`` marks a missing attribute.
        transform: optional callable applied to the loaded RGB PIL image.
    """

    # Positions of the five attributes inside a label tuple (position 0 is the name).
    _ATTRIBUTE_POSITIONS = (1, 2, 3, 4, 5)

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels    # list of tuples (name, att1, att2, att3, att4, att5)
        self.transform = transform

    def __len__(self):
        """Number of samples, defined by the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The label is an integer tensor holding the attribute values (the file
        name is dropped). It is left on the CPU, like the image: moving
        tensors to an accelerator inside ``__getitem__`` yields mixed-device
        samples and is not safe with DataLoader worker processes. The training
        loop transfers whole batches to the device instead.
        """
        image_path = self.image_paths[idx]
        label = torch.tensor(list(self.labels[idx][1:]))
        image = Image.open(image_path).convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        return image, label

    @staticmethod
    def make_weights(dataset):
        """Compute inverse-frequency class weights for the five attributes.

        For every attribute the occurrences of each value in ``dataset.labels``
        are counted (``-1`` entries are skipped) and the weight of a value is
        ``1 / count``. Weights are ordered by ascending value, so a value that
        never occurs gets no entry.

        Args:
            dataset: object exposing ``labels`` in the format described on the
                class.

        Returns:
            Tuple ``(upper, lower, gender, bag, hat)`` of 1-D tensors placed on
            the module-level ``device``.
        """
        global device
        counters = [defaultdict(int) for _ in CustomImageDataset._ATTRIBUTE_POSITIONS]

        for label in dataset.labels:
            for counter, position in zip(counters, CustomImageDataset._ATTRIBUTE_POSITIONS):
                value = label[position]
                if value != -1:
                    counter[value] += 1

        return tuple(
            torch.tensor([1 / counter[key] for key in sorted(counter)]).to(device)
            for counter in counters
        )


class CustomRandomSampler(Sampler):
    """Batch sampler that repairs batches whose attributes are not all labelled.

    Batches of ``batch_size`` indices are built from the dataset order
    (shuffled unless ``replacement`` is true). Each batch is then scanned slot
    by slot: a sample is rejected when it has a missing label (``-1``) for an
    attribute that no earlier slot of the same batch has a valid label for.
    A rejected sample is deferred to a later batch and its slot is filled with
    a randomly drawn sample that is valid for every attribute still uncovered.

    Exactly ``len(dataset) // batch_size`` batches are produced per epoch;
    samples still deferred at that point are dropped for the epoch.

    Labels are read from ``dataset.labels`` (tuples ``(name, att1, ...)``)
    when that attribute exists, so no image has to be loaded; otherwise they
    are collected once through ``dataset[i]``. The label table is cached, so
    labels are assumed not to change during the sampler's lifetime.
    NOTE(review): a dataset exposing ``labels`` in a different layout would be
    misread - confirm if the sampler is reused with another dataset class.

    Args:
        dataset: the dataset to draw indices from.
        replacement: when false (default) indices are shuffled each epoch;
            when true they are visited in their natural order.
        batch_size: number of indices per batch (required, positive).

    Raises:
        ValueError: if ``batch_size`` is missing or not positive.
    """

    MISSING = -1  # label value marking an unavailable attribute

    def __init__(self, dataset, replacement=False, batch_size=None):
        if batch_size is None or batch_size <= 0:
            raise ValueError(
                "batch_size must be a positive integer, got {!r}".format(batch_size))
        self.data_source = dataset
        self.replacement = replacement
        self.batch_size = batch_size
        self._label_table = None   # (num_samples, num_attributes) int array, built lazily
        self._pools = {}           # attribute tuple -> indices valid for all of them

    def __len__(self):
        """Number of batches produced per epoch."""
        return len(self.data_source) // self.batch_size

    def _labels(self):
        """Return the cached table of attribute labels, building it on first use."""
        if self._label_table is None:
            size = len(self.data_source)
            raw = getattr(self.data_source, "labels", None)
            if raw is not None:
                # Fast path: skip the leading file name of every annotation tuple.
                rows = [[int(v) for v in raw[i][1:]] for i in range(size)]
            else:
                # Generic path: ask the dataset itself (loads every sample once).
                rows = [[int(v) for v in self.data_source[i][1]] for i in range(size)]
            self._label_table = np.asarray(rows, dtype=np.int64)
        return self._label_table

    def _pool(self, attributes):
        """Indices of the samples that have a valid label for all ``attributes``."""
        key = tuple(attributes)
        if key not in self._pools:
            table = self._labels()
            valid = (table[:, list(key)] != self.MISSING).all(axis=1)
            self._pools[key] = np.flatnonzero(valid)
        return self._pools[key]

    def _repair(self, batch, deferred):
        """Replace, in place, the samples of ``batch`` that break the coverage rule.

        Rejected samples are appended to ``deferred`` exactly once.
        """
        table = self._labels()
        num_attributes = table.shape[1]
        seen = set()

        for slot, sample_index in enumerate(batch):
            labels = table[sample_index]
            uncovered = [k for k in range(num_attributes) if k not in seen]
            missing = [k for k in uncovered if labels[k] == self.MISSING]

            if missing:
                deferred.append(sample_index)
                pool = self._pool(uncovered)
                if pool.size == 0:
                    # No sample covers everything still needed: settle for one
                    # that at least has the labels the rejected sample lacked.
                    pool = self._pool(missing)
                if pool.size == 0:
                    raise RuntimeError(
                        "no sample has valid labels for attributes {}".format(missing))
                replacement_index = int(np.random.choice(pool))
                batch[slot] = replacement_index
                labels = table[replacement_index]

            # Coverage is recorded from the sample that actually stays in the slot.
            seen.update(k for k in range(num_attributes) if labels[k] != self.MISSING)

    def __iter__(self):
        size = len(self.data_source)
        number_iteration = size // self.batch_size
        if number_iteration == 0:
            return

        indices = list(range(size))
        if not self.replacement:
            indices = np.random.permutation(indices).tolist()

        batch = []
        deferred = []
        iteration = 1

        for idx in indices:
            if deferred:
                # Deferred samples get priority when a new batch is being filled.
                take = min(self.batch_size - len(batch), len(deferred))
                batch.extend(deferred[:take])
                del deferred[:take]
                if len(batch) == self.batch_size:
                    # No room left for the current index: queue it as well.
                    deferred.append(idx)

            if len(batch) < self.batch_size:
                batch.append(idx)

            if len(batch) == self.batch_size:
                self._repair(batch, deferred)
                yield batch

                if iteration == number_iteration:
                    break

                iteration += 1
                batch = []


# Datasets: image paths and annotations are matched by position.
training_dataset = CustomImageDataset(
    training_image_paths,
    annotations_training_list,
    transform=data_transforms['train'],
)
validation_dataset = CustomImageDataset(
    validation_image_paths,
    annotations_validation_list,
    transform=data_transforms['validation'],
)

batch_size = 32
# Number of full training batches per epoch (a trailing partial batch is not counted).
num_batch = len(training_dataset) // batch_size

# Training batches come from the custom batch sampler; validation is read in order.
sampler = CustomRandomSampler(
    dataset=training_dataset,
    batch_size=batch_size,
    replacement=False,
)
train_dataloader = DataLoader(training_dataset, batch_sampler=sampler)
val_dataloader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)

print('num batch ', num_batch)

In [None]:

class ResNet50Backbone(nn.Module):
    """Pretrained ResNet-50 with its last child module removed.

    ``self.model`` is an ``nn.Sequential`` of every child of the torchvision
    ResNet-50 except the final one.
    """

    def __init__(self):
        super().__init__()

        full_network = resnet50(pretrained=True)
        self.model = torch.nn.Sequential(*list(full_network.children())[:-1])

    def forward(self, x):
        """Run ``x`` through the truncated network."""
        return self.model(x)

    def freeze_all(self):
        """Disable gradients for every backbone parameter."""
        for param in self.model.parameters():
            param.requires_grad = False

    def unfreeze_last_layers(self, num_layers):
        """Re-enable gradients for the last ``num_layers`` parameter tensors.

        Note that the count refers to parameter tensors (a weight and its bias
        count as two), not to whole layers. A count of zero or less leaves the
        backbone untouched; slicing with ``[-0:]`` would otherwise select
        every parameter.
        """
        if num_layers <= 0:
            return
        for param in list(self.model.parameters())[-num_layers:]:
            param.requires_grad = True


In [None]:
class AttentionModule(nn.Module):
    """Channel and spatial attention applied multiplicatively to a feature map.

    Both gates are computed from the same input ``x``:

    * channel gate - sigmoid of the sum of a shared 1x1-conv bottleneck applied
      to the globally average-pooled and max-pooled features;
    * spatial gate - a 3x3 convolution followed by a sigmoid over the
      per-position mean and max taken across channels.

    Adapted from https://github.com/luuuyi/CBAM.PyTorch
    """

    def __init__(self, in_channels):
        super().__init__()
        reduced_channels = in_channels // 16

        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        # Bottleneck shared by the average-pooled and max-pooled branches.
        self.channel_attention = nn.Sequential(
            nn.Conv2d(in_channels, reduced_channels, kernel_size=1, padding=0),
            nn.ReLU(inplace=True),
            nn.Conv2d(reduced_channels, in_channels, kernel_size=1, padding=0),
        )

        self.sigmoid = nn.Sigmoid()

        self.spatial_attention = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size=3, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` scaled by its channel gate and its spatial gate."""
        # Spatial gate: summarise the channel axis, then convolve.
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True).values
        spatial_gate = self.spatial_attention(
            torch.cat((channel_mean, channel_max), dim=1))

        # Channel gate: summarise the spatial axes through both poolings.
        pooled_logits = (self.channel_attention(self.avg_pool(x))
                         + self.channel_attention(self.max_pool(x)))
        channel_gate = self.sigmoid(pooled_logits)

        return x * channel_gate * spatial_gate

In [None]:
class BinaryClassifier(nn.Module):
  """Two-block head that outputs one probability per sample.

  ``block1`` is Linear -> ReLU -> BatchNorm1d -> Dropout and ``block2`` is
  Linear -> Sigmoid, so the output has shape ``(batch, 1)`` with values in
  ``[0, 1]``.

  Args:
    in_features: size of the input feature vector (default 2048).
    hidden_features: width of the hidden layer (default 512).
    dropout: dropout probability after the hidden layer (default 0.3).
  """

  def __init__(self, in_features=2048, hidden_features=512, dropout=0.3):
    super().__init__()

    self.block1 = nn.Sequential(
        nn.Linear(in_features, hidden_features),
        nn.ReLU(),
        nn.BatchNorm1d(hidden_features),
        nn.Dropout(dropout),
    )
    self.block2 = nn.Sequential(nn.Linear(hidden_features, 1), nn.Sigmoid())

  def forward(self, x):
    """Map ``(batch, in_features)`` features to ``(batch, 1)`` probabilities."""
    return self.block2(self.block1(x))
  

class MultiClassifier(nn.Module):
  """Two-block head that outputs unnormalised class scores (logits).

  ``block1`` is Linear -> ReLU -> BatchNorm1d -> Dropout and ``block2`` is a
  single Linear layer; no softmax is applied.

  Args:
    in_features: size of the input feature vector (default 2048).
    hidden_features: width of the hidden layer (default 512).
    num_classes: number of output classes (default 11).
    dropout: dropout probability after the hidden layer (default 0.3).
  """

  def __init__(self, in_features=2048, hidden_features=512, num_classes=11, dropout=0.3):
    super().__init__()

    self.block1 = nn.Sequential(
        nn.Linear(in_features, hidden_features),
        nn.ReLU(),
        nn.BatchNorm1d(hidden_features),
        nn.Dropout(dropout),
    )
    self.block2 = nn.Sequential(nn.Linear(hidden_features, num_classes))

  def forward(self, x):
    """Map ``(batch, in_features)`` features to ``(batch, num_classes)`` logits."""
    return self.block2(self.block1(x))




In [None]:
class AttributeRecognitionModel(nn.Module):
    """Shared backbone followed by one attention module and one head per attribute.

    The heads are fixed: two ``MultiClassifier`` instances followed by three
    ``BinaryClassifier`` instances. ``num_attributes`` only controls how many
    attention modules are created; attention modules and heads are paired in
    order and pairing stops at the shorter of the two lists.
    """

    def __init__(self, num_attributes):
        super().__init__()

        self.backbone = ResNet50Backbone()
        self.attention_modules = nn.ModuleList(
            [AttentionModule(in_channels=2048) for _ in range(num_attributes)])

        # Binary heads are instantiated first, but the multi-class heads come
        # first in the module list (and therefore in the outputs).
        binary_heads = [BinaryClassifier() for _ in range(3)]
        multi_heads = [MultiClassifier() for _ in range(2)]
        self.classifiers = nn.ModuleList(multi_heads + binary_heads)

    def forward(self, x):
        """Return the list of per-attribute predictions for the batch ``x``."""
        features = self.backbone(x)
        predictions = []

        for attention, classifier in zip(self.attention_modules, self.classifiers):
            attended = attention(features)
            # Collapse everything but the batch axis before the linear head.
            flat = attended.view(attended.size(0), -1)
            predictions.append(classifier(flat))

        return predictions

    def freeze_backbone_parameters(self):
        """Disable gradients for the whole backbone."""
        self.backbone.freeze_all()

    def unfreeze_parameters(self):
        """Enable gradients for every attention module and every head."""
        for group in (self.attention_modules, self.classifiers):
            for param in group.parameters():
                param.requires_grad = True

    def unfreeze_last_layer_backbone(self, num_layers):
        """Delegate to the backbone's ``unfreeze_last_layers(num_layers)``."""
        self.backbone.unfreeze_last_layers(num_layers)



In [None]:
from torch.nn import functional as F

class BinaryAsymmetricLoss(nn.Module):
  """Binary cross-entropy with a separate weight for each of the two classes.

  Samples whose target is 0 are weighted by ``gamma_neg``, all others by
  ``gamma_pos``; the weighted losses are averaged. Despite the name and the
  reference below, no focusing exponent is applied: the gammas are plain
  multiplicative weights.

  Reference kept from the original code: https://arxiv.org/abs/2009.14119

  Args:
    gamma_neg: weight of the samples with target 0 (number or 0-dim tensor).
    gamma_pos: weight of the remaining samples (number or 0-dim tensor).
    eps: stored on the instance; not used by ``forward``.
    ignore_index: target value to exclude from the loss, or ``None`` to keep
      every sample.
  """

  def __init__(self, gamma_neg: any, gamma_pos: any, eps: float = 1e-8, ignore_index = None) -> None:
    super().__init__()
    # Always floating point, so integer weights do not produce an integer
    # weight tensor later on.
    self.gamma_neg = torch.as_tensor(gamma_neg, dtype=torch.float32)
    self.gamma_pos = torch.as_tensor(gamma_pos, dtype=torch.float32)
    self.eps = eps
    self.ignore_index = ignore_index

  def forward(self, prob: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
    """Compute the weighted binary cross-entropy.

    Args:
      prob: probabilities of shape ``(batch, 1)``.
      targets: target values of shape ``(batch,)``.

    Returns:
      A scalar tensor. When every target is ignored the result is a zero
      that is still connected to ``prob``, instead of the NaN a mean over
      zero elements would give.
    """
    # Work on whatever device the predictions live on.
    targets = targets.to(prob.device).float()

    # "is not None" so that 0 is honoured as an ignore value too.
    if self.ignore_index is not None:
      keep = targets != self.ignore_index
      targets = targets[keep]
      prob = prob[keep]

    prob = prob.squeeze(1)

    if targets.numel() == 0:
      return prob.sum() * 0.0

    weight = torch.where(
        targets == 0,
        self.gamma_neg.to(prob.device),
        self.gamma_pos.to(prob.device),
    )
    return F.binary_cross_entropy(prob, targets, weight=weight)
    


In [None]:
import torch
import torch.nn.functional as F

class Accuracy():
    """Count correct predictions while skipping targets equal to ``ignore_index``.

    Calling an instance returns two scalar tensors: the number of correct
    predictions among the non-ignored targets, and the number of non-ignored
    targets. Dividing the sums of these two over several batches gives the
    accuracy.

    Args:
        threshold: probability above which a binary prediction counts as 1.
        is_binary: when true, predictions are thresholded probabilities;
            otherwise the index of the highest softmax score is used.
        ignore_index: target value excluded from both counts.
    """

    def __init__(self, threshold=0.5, is_binary=False, ignore_index=-1):
        self.threshold = threshold
        self.is_binary = is_binary
        self.ignore_index = ignore_index

    def __call__(self, predictions, targets):
        if self.is_binary:
            # Drop singleton axes, then threshold the probabilities.
            decided = predictions.squeeze() > self.threshold
        else:
            scores = F.softmax(predictions, dim=1)
            decided = torch.max(scores, dim=1)[1]

        valid = targets != self.ignore_index
        hits = (decided == targets.detach())[valid]

        return hits.float().sum(), valid.float().sum()


In [None]:
# NEW TRAINING (start from the pretrained backbone)

num_attributes = 5
num_layers = 3

# Freeze the backbone, then re-enable its last parameter tensors and all the
# attention modules and heads.
model = AttributeRecognitionModel(num_attributes=num_attributes)
model.freeze_backbone_parameters()
model.unfreeze_last_layer_backbone(num_layers)
model.unfreeze_parameters()

# Inverse-frequency class weights computed on the training annotations.
(upper_weights,
 lower_weights,
 gender_weights,
 bag_weights,
 hat_weights) = CustomImageDataset.make_weights(training_dataset)

# One criterion per attribute, in label order: upper, lower, gender, bag, hat.
loss_list = [
    nn.CrossEntropyLoss(weight=upper_weights, ignore_index=-1),
    nn.CrossEntropyLoss(weight=lower_weights, ignore_index=-1),
    BinaryAsymmetricLoss(ignore_index=-1, gamma_neg=gender_weights[0], gamma_pos=gender_weights[1]),
    BinaryAsymmetricLoss(ignore_index=-1, gamma_neg=bag_weights[0], gamma_pos=bag_weights[1]),
    BinaryAsymmetricLoss(ignore_index=-1, gamma_neg=hat_weights[0], gamma_pos=hat_weights[1]),
]

# Matching accuracy metrics: two multi-class, three binary.
accuracy_list = [
    Accuracy(is_binary=False),
    Accuracy(is_binary=False),
    Accuracy(is_binary=True),
    Accuracy(is_binary=True),
    Accuracy(is_binary=True),
]
optimizer = optim.Adam(model.parameters(), lr=1e-2)



In [None]:
# TRAINING FROM A CHECKPOINT
# Running this cell replaces the model, losses, metrics and optimizer created
# by the fresh-training cell.

num_attributes = 5
num_layers = 3

path = os.path.join('/kaggle/input/modello/best_model.pth', 'model_checkpoint.pth')

model = AttributeRecognitionModel(num_attributes=num_attributes)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU can also be loaded on a CPU-only runtime.
model.load_state_dict(torch.load(path, map_location=device))

# Freeze the backbone, then re-enable its last parameter tensors and all the
# attention modules and heads.
model.freeze_backbone_parameters()
model.unfreeze_last_layer_backbone(num_layers)
model.unfreeze_parameters()

# Inverse-frequency class weights computed on the training annotations.
upper_weights, lower_weights, gender_weights, bag_weights, hat_weights = CustomImageDataset.make_weights(training_dataset)

# One criterion per attribute, in label order: upper, lower, gender, bag, hat.
loss_list = [
    nn.CrossEntropyLoss(weight=upper_weights, ignore_index=-1),
    nn.CrossEntropyLoss(weight=lower_weights, ignore_index=-1),
    BinaryAsymmetricLoss(ignore_index=-1, gamma_neg=gender_weights[0], gamma_pos=gender_weights[1]),
    BinaryAsymmetricLoss(ignore_index=-1, gamma_neg=bag_weights[0], gamma_pos=bag_weights[1]),
    BinaryAsymmetricLoss(ignore_index=-1, gamma_neg=hat_weights[0], gamma_pos=hat_weights[1]),
]

accuracy_list = [Accuracy(is_binary=False), Accuracy(is_binary=False), Accuracy(is_binary=True), Accuracy(is_binary=True), Accuracy(is_binary=True)]

optimizer = optim.Adam(model.parameters(), lr=1e-3)



In [None]:
from tqdm import tqdm
# Display names of the attributes, in label order.
_TASK_NAMES = ("upper", "lower", "gender", "bag", "hat")


def _score_heads(outputs, labels, criterion_list, accuracy_list, counts):
  """Accumulate per-attribute accuracy counts and return the summed batch loss.

  ``counts[k]`` is a ``[correct, valid]`` pair updated in place. An attribute
  with no valid target in the batch contributes no loss, because a mean over
  zero elements would be NaN. Returns ``None`` when no attribute had a valid
  target.
  """
  losses = []
  for attr_index, (criterion, metric) in enumerate(zip(criterion_list, accuracy_list)):
    target = labels[:, attr_index]
    prediction = outputs[attr_index]

    correct, valid = metric(prediction, target)
    counts[attr_index][0] += correct.item()
    counts[attr_index][1] += valid.item()

    if valid.item() == 0:
      continue
    losses.append(criterion(prediction, target))

  return torch.stack(losses).sum() if losses else None


def _report(title, loss, counts):
  """Print the loss and the accuracies of one phase; return the mean accuracy.

  The accuracy of a task is ``correct / valid`` summed over all batches; a
  task without any valid target is reported as 0.0.
  """
  task_acc = [correct / valid if valid else 0.0 for correct, valid in counts]
  accuracy = sum(task_acc) / len(task_acc)

  print("{} loss and accuracy : {:.7f}\t{:.4f}".format(title, loss, accuracy))
  for index, value in enumerate(task_acc):
    name = _TASK_NAMES[index] if index < len(_TASK_NAMES) else "attribute {}".format(index)
    print("Task {} accuracy : ".format(name), value)

  return accuracy


def one_epoch(model, criterion_list, optimizer, train_loader, val_loader, epoch_num, accuracy_list, num_batch):
  """Train ``model`` for one epoch, then evaluate it on the validation data.

  Args:
    model: network returning one prediction tensor per attribute.
    criterion_list: one loss callable per attribute, in label-column order.
    optimizer: optimizer stepping the trainable parameters of ``model``.
    train_loader: iterable of ``(images, labels)`` training batches.
    val_loader: iterable of ``(images, labels)`` validation batches.
    epoch_num: epoch index, only used in the progress-bar description.
    accuracy_list: one metric callable per attribute returning
      ``(correct, valid)`` scalar tensors.
    num_batch: number of training batches, used to average the training loss.

  Returns:
    ``(val_loss, val_accuracy)``: the mean validation batch loss and the mean
    of the per-attribute validation accuracies.
  """
  num_tasks = len(criterion_list)

  model.to(device)
  model.train()

  counts = [[0.0, 0.0] for _ in range(num_tasks)]
  train_loss = torch.tensor(0.0, dtype = torch.float32, device = device)

  for images, labels in tqdm(train_loader, desc="epoch {} - train".format(epoch_num)):
    images = images.to(device)
    labels = labels.to(device).long()
    optimizer.zero_grad()

    aggregated_loss = _score_heads(model(images), labels, criterion_list, accuracy_list, counts)
    if aggregated_loss is None:
      # Nothing to learn from a batch without a single valid target.
      continue

    aggregated_loss.backward()
    optimizer.step()
    # Detach so the running total does not keep every batch's graph alive.
    train_loss += aggregated_loss.detach()

  train_loss = (train_loss / num_batch).item()
  _report("Training", train_loss, counts)

  model.eval()
  val_losses = []
  counts = [[0.0, 0.0] for _ in range(num_tasks)]

  with torch.no_grad():
    for images, labels in tqdm(val_loader, desc="epoch {} - validation".format(epoch_num)):
      images = images.to(device)
      labels = labels.to(device).long()

      batch_loss = _score_heads(model(images), labels, criterion_list, accuracy_list, counts)
      if batch_loss is not None:
        val_losses.append(batch_loss)

  val_loss = torch.stack(val_losses).mean().item() if val_losses else float("nan")
  val_accuracy = _report("Validation", val_loss, counts)

  return val_loss, val_accuracy


# Number of consecutive epochs without a new best validation loss that are
# tolerated before training stops.
EARLY_STOPPIMG_PATIENCE = 5
early_stopping_counter = EARLY_STOPPIMG_PATIENCE

epochs = 10
min_val_loss = 1e10

# Per-epoch history (entries of epochs that never ran stay at zero).
val_losses = torch.zeros(epochs)
val_accuracies = torch.zeros(epochs)

best_model_path = os.path.join('/kaggle/working','par_models','best_model.pth')
# Make sure the target directory exists before the first checkpoint is written.
os.makedirs(os.path.dirname(best_model_path), exist_ok=True)

for e in range(epochs):
  print("EPOCH {}".format(e))

  val_loss, val_accuracy = one_epoch(model, loss_list, optimizer, train_dataloader, val_dataloader, e, accuracy_list, num_batch)

  val_losses[e] = val_loss
  val_accuracies[e] = val_accuracy

  if val_loss < min_val_loss:
    # New best: checkpoint the weights and restore the full patience.
    min_val_loss = val_loss
    early_stopping_counter = EARLY_STOPPIMG_PATIENCE
    torch.save(model.state_dict(), best_model_path)
    print("- saved best model: val_loss =", val_loss, "val_accuracy =", val_accuracy)
  else:
    # Patience is measured against the best loss so far, so a loss that only
    # improves on the previous epoch does not postpone the stop.
    early_stopping_counter -= 1

  if early_stopping_counter == 0:
      break
