# Imports

In [1]:
import requests
from transformers import CLIPProcessor, CLIPModel
from accelerate import Accelerator
import torch
from torchvision import datasets
from torchvision.transforms import v2
from torch.utils.data import Dataset, DataLoader
from torch import nn
import torchmetrics
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import pandas as pd
import os
from PIL import Image
from tqdm import tqdm
from datetime import datetime

# Dataset and dataloaders

In [2]:
device = 'cuda'

In [3]:
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

  return self.fget.__get__(instance, owner)()


In [4]:
class BirdsDataset(Dataset):
    """Birds dataset."""

    def __init__(self, csv_file, root_dir, processor, transforms = None):
        """
        Arguments:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the images.
        """
        self.birds_df = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.processor = processor
        self.transforms = transforms

    def __len__(self):
        return len(self.birds_df)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir,
                                self.birds_df.iloc[idx, 0])
        image = Image.open(img_name)
        labels = self.birds_df.iloc[idx, 1:]
        labels = np.array([labels], dtype=float)-1
        inp = processor(text=None, images=image, return_tensors="pt", padding=True)
        inp['pixel_values'] = torch.squeeze(inp['pixel_values'])
        if transforms:
            inp = self.transforms(inp)
        sample = (inp.to(device),labels)
        
        return sample

In [5]:
csv_file = 'C:\\Users\\pierr\\Documents\\INF649-Computer_Vision\\project\\data\\CUB_200_2011\\birds.csv'
root_dir = 'C:\\Users\\pierr\\Documents\\INF649-Computer_Vision\\project'
transforms = v2.Compose([v2.Resize((64,64))])

birds_dataset = BirdsDataset(csv_file=csv_file,
                             root_dir=root_dir,
                             processor = processor,
                             transforms = transforms)

In [6]:
trainset, valset, testset = torch.utils.data.random_split(birds_dataset, (0.8, 0.1, 0.1))

In [7]:
train_dataloader = torch.utils.data.DataLoader(trainset,
                                             batch_size=500, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(valset, batch_size=1, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(testset, batch_size=1, shuffle=True)

# Model and model training

In [8]:
class endClassifier(nn.Module):
    
    def __init__(self, out_dim = 200, input_dim = 512, hidden_dim = 256, dropout = 0.1):
        super().__init__()
        layers = []
        layers.append(nn.Linear(in_features=input_dim, out_features=hidden_dim))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(p=dropout))
        layers.append(nn.Linear(in_features=hidden_dim, out_features=hidden_dim))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(p=dropout))
        layers.append(nn.Linear(in_features=hidden_dim, out_features=out_dim))
        self.layers = nn.Sequential(*layers)
        self.layers.apply(self.init_weights)
        
    def forward(self, inp: torch.Tensor) -> torch.Tensor:
        """
        takes as input CLIP image features (size 512) and returns classification score
        """
        inp = self.layers(inp)
        return inp  
    
    def init_weights(self, m):
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            m.bias.data.fill_(0.01)

In [11]:
class ClassifierModel(nn.Module):
    def __init__(self, clip_model, end_model = None, num_classes = 200, lr= 1e-3):
        """Initializing the model.

        Args:
            clip_model: CLIP model used as backbone for image feature generation
            num_classes (int, optional): number of output classes. Defaults to 200.
            learning_rate (float, optional)
        """
        super().__init__()
        
        print("initializing classifier model...")
        self.num_classes = num_classes
        self.clip_model = clip_model
        
        #freezing CLIP, we only want to train the linear layers we'll add at the end 
        for param in self.clip_model.parameters():
            param.requires_grad = False
            
        #adding linear layers
        if end_model:
            model = end_model
        else:
            model = endClassifier(out_dim=self.num_classes).to(device)
        self.classifier = model
            
        # setting the loss
        self.loss = nn.CrossEntropyLoss()
        
        #for testing
        self.preds = []
        self.targs = []
        print("classifier model initialized !")
    
    def training_step(self, batch) -> torch.Tensor:
        """Training forward pass.

        Args:
            batch ([type]): input batch of images and its corresponding classes.

        Returns:
            loss [torch.Tensor]: training loss value.
        """
        _x, _y = batch
        _y = torch.squeeze(_y).long().to(device)
        _z = self.clip_model.get_image_features(**_x)
        _out = self.classifier(_z)
        _loss = self.loss(_out, _y)

        return _loss
     
    def validation_step(self, batch) -> torch.Tensor:
        """Validation forward step.

        Args:
            batch ([type]): input batch of images and its corresponding classes.
        Returns:
            loss [torch.FloatTensor]: validation loss value.
        """
        _x, _y = batch
        _y = torch.squeeze(_y).long().unsqueeze(0).to(device)
        _z = self.clip_model.get_image_features(**_x)
        _out = self.classifier(_z)
        _loss = self.loss(_out, _y)

        return _loss
    
    def test_step(self, batch) -> None:
        """Test step.

        Args:
            batch ([type]): input batch of images and its corresponding classes.
        """
        _x, _y = batch
        _y = torch.squeeze(_y).long().unsqueeze(0).to(device)
        _z = self.clip_model.get_image_features(**_x)
        _out = self.classifier(_z)
        _loss = self.loss(_out, _y)
        
        self.targs.extend(_y.cpu().numpy())
        self.preds.extend(torch.argmax(_out, dim=1).cpu().numpy())

In [12]:
class Trainer(nn.Module):
    def __init__(self, classifierModel, trainloader, valloader, testloader, lr):
    
        super().__init__()

        self.trainloader = trainloader
        self.valloader = valloader
        self.testloader = testloader
        self.ClassifierModel = classifierModel
        
        #optimizer
        self.learning_rate = lr
        self.optimizer = torch.optim.Adam(self.ClassifierModel.classifier.parameters(), lr=self.learning_rate)
        self.lr_scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=20, gamma=0.4)
        
    def train_one_epoch(self, epoch_index):
        running_loss = 0.
        last_loss = 0.

        # Here, we use enumerate(trainloader) instead of
        # iter(trainloader) so that we can track the batch
        # index and do some intra-epoch reporting
        for i, batch in enumerate(tqdm(self.trainloader)):

            # Zero your gradients for every batch!
            self.optimizer.zero_grad()

            # Compute the loss and its gradients           
            loss = self.ClassifierModel.training_step(batch)
            loss.backward()

            # Adjust learning weights
            self.optimizer.step()
            # Changing learning rate
            self.lr_scheduler.step()
            # Gather data and report
            running_loss += loss.item()
            if i % 10 == 9:
                last_loss = running_loss / 10 # loss per batch
                print('  batch {} loss: {}'.format(i + 1, last_loss))
                running_loss = 0.

        return last_loss
    
    def train_multiple_epochs(self, EPOCHS = 100):
        epoch_number = 0
        best_vloss = 1_000_000.
        for epoch in range(EPOCHS):
            print('EPOCH {}:'.format(epoch_number + 1))

            # Make sure gradient tracking is on, and do a pass over the data
            self.ClassifierModel.classifier.train(True)
            avg_loss = self.train_one_epoch(epoch_number)

            running_vloss = 0.0
            # Set the model to evaluation mode, disabling dropout and using population
            # statistics for batch normalization.
            self.ClassifierModel.classifier.eval()

            # Disable gradient computation and reduce memory consumption.
            print('Epoch {} validation step'.format(epoch_number + 1))
            with torch.no_grad():
                for i, vdata in enumerate(tqdm(self.valloader)):
                    vloss = self.ClassifierModel.validation_step(vdata)
                    running_vloss += vloss

            avg_vloss = running_vloss / (i + 1)
            print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

            # Log the running loss averaged per batch
            # for both training and validation

            # Track best performance, and save the model's state
            if avg_vloss < best_vloss:
                best_vloss = avg_vloss
                ClassifierModel_path = 'models\\BIRBModel_{}_{}'.format(timestamp, epoch_number)
                torch.save(self.ClassifierModel.classifier.state_dict(), ClassifierModel_path)

            epoch_number += 1

    def test(self):       
        correct_predictions = 0
        total_samples = 0
        pred_labels = []
        with torch.no_grad():
            for i, tdata in enumerate(tqdm(self.testloader)):
                self.ClassifierModel.test_step(tdata)
                total_samples+=1
                          
        correct_preds = np.array(birb_model.targs)==np.array(birb_model.preds)
        correct_preds = correct_preds.astype(int)
        correct_predictions = np.sum(correct_preds)
        accuracy = correct_predictions / total_samples
        print(f'Test Accuracy: {accuracy * 100:.2f}%')


In [None]:
birb_model = ClassifierModel(clip_model)

In [None]:
trainer = Trainer(birb_model, train_dataloader, val_dataloader, test_dataloader, 1e-2)

In [None]:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
trainer.train_multiple_epochs()

In [None]:
trainer.test()

### Loading a model for testing

In [17]:
end_model = endClassifier(out_dim=200).to(device)
end_model.load_state_dict(torch.load("models\\BIRBModel_20240318_215922_50"))


<All keys matched successfully>

In [18]:
birb_model = ClassifierModel(clip_model, end_model = end_model)

initializing classifier model...
classifier model initialized !


In [19]:
with torch.no_grad():
    for i, tdata in enumerate(tqdm(test_dataloader)):
        birb_model.test_step(tdata)


100%|██████████| 1178/1178 [00:45<00:00, 25.82it/s]


AttributeError: 'bool' object has no attribute 'sum'

In [29]:
a = np.array(birb_model.targs)==np.array(birb_model.preds)
a.astype(int)
np.sum(a)/1178

0.6774193548387096