In [4]:
import clip
import numpy as np
import torch
from PIL import Image
from torch.utils.data import random_split
from tqdm import tqdm

from refcocog import RefCOCOg, RefCOCOgSample

# Pick the best available accelerator, falling back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")  # CUDA GPU
    print("[INFO] GPU found, using GPU.")
elif torch.backends.mps.is_available():
    # `torch.has_mps` is deprecated and only tells whether PyTorch was *built*
    # with MPS support; `is_available()` also checks that this machine can
    # actually run on the MPS backend.
    device = torch.device("mps")  # Apple Silicon
    print("[INFO] MPS found, using MPS.")
else:
    device = torch.device("cpu")
    print("[INFO] No GPU found, using CPU instead.")

# `prep` is the transform applied to every PIL image before it is batched
# in the training / test steps below.
clip_model, prep = clip.load("RN50", device=device)

dataset = RefCOCOg(ds_path='dataset/refcocog')


[INFO] MPS found, using MPS.


In [5]:
def training_step(model, dataloader, optimizer, cost_function, device='cuda'):
    """Run one training epoch and return the mean per-batch loss.

    For every batch the images are opened from ``sample.path``, transformed
    with the module-level ``prep`` transform, and paired with the first
    sentence of each sample. The i-th image is treated as the correct match
    for the i-th caption, so the target for both logit matrices is
    ``0..batch_size-1``.

    Args:
        model: Module called as ``model(images, tokenized_captions)`` and
            expected to return a pair ``(image_logits, text_logits)``.
        dataloader: Iterable of batches; each batch is an iterable of
            mappings that can be expanded into ``RefCOCOgSample(**sample)``.
        optimizer: Optimizer holding the parameters of ``model``.
        cost_function: Callable ``(logits, labels) -> scalar loss tensor``.
        device: Device on which the batch tensors and labels are placed.

    Returns:
        float: Average of the batch losses over the epoch, or ``nan`` when
        the dataloader yields no batches.
    """
    model.train()

    total_loss = 0.0
    n_batches = 0

    for batch in tqdm(dataloader, desc="Performing training step"):

        images, captions = [], []

        for sample in batch:
            sample = RefCOCOgSample(**sample)

            # The context manager closes the file handle once the image has
            # been turned into a tensor.
            with Image.open(sample.path) as image:
                images.append(prep(image))

            captions.append(sample.sentences[0])

        captions = clip.tokenize(captions).to(device)
        images = torch.stack(images).to(device)

        # Gradients must be cleared and applied once per batch; doing this
        # after the loop would train on the last batch only.
        optimizer.zero_grad()

        image_logits, text_logits = model(images, captions)

        # int64 targets created directly on the same device as the logits
        labels = torch.arange(images.shape[0], device=device)

        loss_i = cost_function(image_logits, labels)
        loss_t = cost_function(text_logits, labels)

        loss = (loss_i + loss_t) / 2.0

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        n_batches += 1

    if n_batches == 0:
        return float("nan")

    return total_loss / n_batches


In [6]:
def test_step(model, dataloader, cost_function, device='cuda'):
    """Evaluate the model on a dataloader and return the mean per-batch loss.

    Batches are built exactly as in the training step (image opened from
    ``sample.path`` and transformed with the module-level ``prep``, caption
    taken from ``sample.sentences[0]``), but no gradients are computed and
    no parameters are updated.

    Args:
        model: Module called as ``model(images, tokenized_captions)`` and
            expected to return a pair ``(image_logits, text_logits)``.
        dataloader: Iterable of batches; each batch is an iterable of
            mappings that can be expanded into ``RefCOCOgSample(**sample)``.
        cost_function: Callable ``(logits, labels) -> scalar loss tensor``.
        device: Device on which the batch tensors and labels are placed.

    Returns:
        float: Average of the batch losses over the dataloader, or ``nan``
        when the dataloader yields no batches.
    """
    model.eval()

    total_loss = 0.0
    n_batches = 0

    with torch.no_grad():

        for batch in tqdm(dataloader, desc="Performing test step"):

            images, captions = [], []

            for sample in batch:
                sample = RefCOCOgSample(**sample)

                # Close the file handle as soon as the tensor is built.
                with Image.open(sample.path) as image:
                    images.append(prep(image))

                captions.append(sample.sentences[0])

            captions = clip.tokenize(captions).to(device)
            images = torch.stack(images).to(device)

            image_logits, text_logits = model(images, captions)

            # int64 targets created directly on the same device as the logits
            labels = torch.arange(images.shape[0], device=device)

            loss_i = cost_function(image_logits, labels)
            loss_t = cost_function(text_logits, labels)

            # Accumulate every batch so the result reflects the whole
            # dataloader rather than only its last batch.
            total_loss += ((loss_i + loss_t) / 2.0).item()
            n_batches += 1

    if n_batches == 0:
        return float("nan")

    return total_loss / n_batches


In [None]:
train_split = 0.8

# Randomly keep 20% of the dataset (i.e. discard 80%).
# NOTE: this rebinds `dataset`, so re-running this cell without re-running
# the cell that loads the dataset shrinks it again.
subset_size = int(0.2 * len(dataset))
dataset, _ = random_split(dataset, [subset_size, len(dataset) - subset_size])

train_size = int(train_split * len(dataset))
train_ds, test_ds = random_split(dataset, [train_size, len(dataset) - train_size])

print(f"Dataset Size: {len(dataset)}\n---")
print(f"Train size {len(train_ds)}")
print(f"Test Size: {len(test_ds)}")

# The identity collate_fn keeps each batch as a plain list of samples; the
# step functions build the tensors themselves.
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True, collate_fn=lambda x: x)

test_dl = torch.utils.data.DataLoader(test_ds, batch_size=16, shuffle=True, collate_fn=lambda x: x)

optimizer = torch.optim.Adam(clip_model.parameters(), lr=0.0001, weight_decay=0.000001)

cost_function = torch.nn.CrossEntropyLoss()

epochs = 10


def _print_losses(train_loss, test_loss):
    """Print the training and test loss followed by a separator line."""
    print('\tTraining loss {:.5f}'.format(train_loss))
    print('\tTest loss {:.5f}'.format(test_loss))
    print('-----------------------------------------------------')


# `training_step` and `test_step` return a single loss value and compute no
# accuracy, so only the loss is collected and reported here (unpacking two
# values from them raises a TypeError).

# perform a preliminary evaluation step
print('Before training:')
train_loss = test_step(clip_model, train_dl, cost_function, device=device)
test_loss = test_step(clip_model, test_dl, cost_function, device=device)
_print_losses(train_loss, test_loss)

# range over the number of epochs
for e in range(epochs):
    train_loss = training_step(clip_model, train_dl, optimizer, cost_function, device=device)
    test_loss = test_step(clip_model, test_dl, cost_function, device=device)
    print('Epoch: {:d}'.format(e + 1))
    _print_losses(train_loss, test_loss)

# perform final test step and print the final metrics
print('After training:')
train_loss = test_step(clip_model, train_dl, cost_function, device=device)
test_loss = test_step(clip_model, test_dl, cost_function, device=device)
_print_losses(train_loss, test_loss)


Dataset Size: 9964
---
Train size 7971
Test Size: 1993
Before training:


Performing test step:   0%|          | 0/499 [00:00<?, ?it/s]