### Importación de paquetes

In [None]:
!conda install --yes -c pytorch pytorch=1.7.1 torchvision cudatoolkit=11.0
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

/bin/bash: line 1: conda: command not found
Collecting ftfy
  Downloading ftfy-6.2.0-py3-none-any.whl (54 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.4/54.4 kB[0m [31m968.2 kB/s[0m eta [36m0:00:00[0m
Installing collected packages: ftfy
Successfully installed ftfy-6.2.0
Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-xdiipqc8
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-xdiipqc8
  Resolved https://github.com/openai/CLIP.git to commit a1d071733d7111c9c014f024669f959182114e33
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch->clip==1.0)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch->clip==1.0)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.wh

In [None]:
import os

import torch
import torch.nn as nn

import clip
import torch
from torchvision.datasets import ImageFolder
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, Subset, SubsetRandomSampler, DataLoader

import matplotlib.pyplot as plt

In [None]:
# Debug aid: turn on autograd anomaly detection for the whole session.
# NOTE(review): PyTorch documents this as a debugging mode that adds overhead to
# every backward pass — consider disabling it for full training runs.
torch.autograd.set_detect_anomaly(True)

<torch.autograd.anomaly_mode.set_detect_anomaly at 0x7ac614c213f0>

### Definición de clases y funciones

In [None]:
class CustomDataset(Dataset):
    """Image-folder dataset that also yields the CLIP-tokenized class name of each image.

    The class name of an image is the name of the sub-folder it lives in
    (as reported by ``ImageFolder.class_to_idx``).

    Attributes:
        image_folder: the underlying ``ImageFolder``.
        label_mapping: dict mapping class index -> class (folder) name.
        text_data: list with the class name of every sample, in sample order.
        encoded_text: tensor with one row of CLIP tokens per sample.
    """

    def __init__(self, image_folder_path, transform=None):
        """
        Args:
            image_folder_path: root directory with one sub-folder per class.
            transform: optional transform applied to every loaded image.
        """
        # ImageFolder already treats ``transform=None`` as "no transform",
        # so the value can be forwarded without branching.
        self.image_folder = ImageFolder(root=image_folder_path, transform=transform)

        # class_to_idx maps name -> index; invert it to look names up by label.
        self.label_mapping = {idx: name for name, idx in self.image_folder.class_to_idx.items()}
        self.text_data = [self.label_mapping[target] for target in self.image_folder.targets]

        # Tokenize each class name once and pick the row of every sample by its
        # label, instead of running the tokenizer once per image.
        class_names = [self.label_mapping[idx] for idx in range(len(self.label_mapping))]
        class_tokens = clip.tokenize(class_names)
        sample_labels = torch.as_tensor(self.image_folder.targets, dtype=torch.long)
        self.encoded_text = class_tokens[sample_labels]

    def __len__(self):
        """Number of samples in the underlying image folder."""
        return len(self.image_folder)

    def __getitem__(self, index):
        """Return ``(image, text_tokens, label)`` for the sample at ``index``."""
        # Image (after the optional transform) and integer label from ImageFolder.
        image, label = self.image_folder[index]

        # Pre-computed tokens of this sample's class name.
        text = self.encoded_text[index]

        return image, text, label

In [None]:
# Loss function
def loss_logits(logits, labels):
    """Cross-entropy loss between model scores and integer class labels.

    logits: model outputs (predictions) for each class.
    labels: ground-truth labels (integers), one per example.

    Returns the loss as a scalar tensor (default mean reduction).
    """
    return nn.functional.cross_entropy(logits, labels)

# Usage example for the loss function: three examples, three classes,
# where example i belongs to class i.
logits = torch.tensor([
    [0.8, 0.1, 0.1],
    [0.2, 0.7, 0.1],
    [0.3, 0.2, 0.5],
])
labels = torch.arange(3)

loss = loss_logits(logits, labels)
print("Pérdida:", loss.item())

Pérdida: 0.7991690635681152


### Entrenamiento del modelo

In [None]:
# Mount Google Drive at /content/drive; the dataset folder and the checkpoint
# path used further down both live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Root folder of the images (one sub-folder per person).
folder_path = '/content/drive/MyDrive/TFM/Proyecto/Final_Database_mini_prueba/image'

# Training hyper-parameters.
num_epochs = 30
BATCH_SIZE = 32

# Whether the training loader uses the augmentation pipeline; `da` is the
# suffix that ends up in the checkpoint file name.
data_augmentation = True
if data_augmentation:
    da = "DA"
else:
    da = ""

In [None]:
from torchvision.transforms import Resize, Compose, ColorJitter, RandomHorizontalFlip, \
                                   RandomResizedCrop, RandomRotation, Normalize, ToTensor


# Training-time augmentation pipeline: PIL image in, normalized tensor out.
augmentation = Compose(
    [
        # Mirror the image horizontally 30% of the time.
        RandomHorizontalFlip(0.3),
        # Rotate by a random angle between 0 and 45 degrees, filling the corners with 0.
        RandomRotation(degrees=(0, 45), fill=0),
        # Random crop (20%-100% of the area, aspect ratio 0.8-1.2) resized to 224x224.
        RandomResizedCrop((224, 224), scale=(0.2, 1.0), ratio=(0.8, 1.2)),
        ToTensor(),
        # NOTE(review): these look like the per-channel statistics of CLIP's own
        # preprocessing — confirm against the `preprocess` returned by clip.load.
        Normalize(
            (0.48145466, 0.4578275, 0.40821073),
            (0.26862954, 0.26130258, 0.27577711),
        ),
    ]
)

In [None]:
def train_test_dataloaders(folder_path, data_augmentation=False, test_split=0.1,
                           batch_size=None, random_state=None):
    """Split one image folder into a training and a test DataLoader.

    Args:
        folder_path: root directory with one sub-folder per class.
        data_augmentation: if True, the training loader reads the images through
            the module-level ``augmentation`` pipeline; the test loader always
            uses the module-level CLIP ``preprocess``.
        test_split: fraction (or count) of samples held out for testing, passed
            to ``train_test_split`` as ``test_size``.
        batch_size: batch size of both loaders; defaults to the module-level
            ``BATCH_SIZE`` when None.
        random_state: seed forwarded to ``train_test_split``; None (default)
            keeps the original non-deterministic split.

    Returns:
        ``(train_loader, test_loader)``.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE

    # Dataset with the deterministic CLIP preprocessing; always used for testing.
    eval_dataset = CustomDataset(folder_path, transform=preprocess)

    train_idx, test_idx = train_test_split(
        list(range(len(eval_dataset))),
        test_size=test_split,
        random_state=random_state,
    )

    test_loader = DataLoader(
        eval_dataset, batch_size=batch_size, sampler=SubsetRandomSampler(test_idx)
    )

    # With augmentation the training loader reads the same folder through a
    # second dataset. The split indices are reused as-is, which assumes both
    # datasets enumerate the folder in the same order.
    if data_augmentation:
        train_dataset = CustomDataset(folder_path, transform=augmentation)
    else:
        train_dataset = eval_dataset

    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, sampler=SubsetRandomSampler(train_idx)
    )

    return train_loader, test_loader

In [None]:
# Download the pre-trained CLIP model together with its image preprocessing.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
selected_model = "RN50"  # Other models: "ViT-B/32"
model, preprocess = clip.load(selected_model, device=device)

# Build the loaders, holding out 10% of the samples for testing.
train_loader, test_loader = train_test_dataloaders(
    folder_path,
    data_augmentation=data_augmentation,
    test_split=0.1,
)

100%|████████████████████████████████████████| 244M/244M [00:02<00:00, 120MiB/s]


In [None]:
# Learning rate and the checkpoint path derived from the run configuration
# (model name, learning rate, batch size, epochs, augmentation suffix).
lr = 1e-6
_lr_tag = f'{lr:.0e}'
model_parameters_file = f"/content/drive/MyDrive/TFM/Proyecto/Scripts/{selected_model}_2pers_lr{_lr_tag}_bs{BATCH_SIZE}_{num_epochs}ep{da}.pt"
model_parameters_file

'/content/drive/MyDrive/TFM/Proyecto/Scripts/RN50_2pers_lr1e-06_bs32_30epDA.pt'

In [None]:
# Initialise the optimizer.
# NOTE(review): the outputs further down this notebook show float16 logits on
# CUDA, so the weights being optimised here are presumably half precision; with
# lr=1e-6 the SGD updates may be lost to rounding. Consider `model.float()`
# before training — confirm.
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

# Per-epoch loss, summed over batches: {epoch: loss}.
train_loss = {}
test_loss = {}

# Build the list of class descriptions used as text prompts.
# The names are sorted so that position i in `eval_descriptions` is the class
# whose label is i: ImageFolder assigns labels over the sorted folder names,
# whereas os.listdir returns entries in arbitrary order.
people_list = sorted(
    name for name in os.listdir(folder_path)
    if os.path.isdir(os.path.join(folder_path, name))
)
# Alternative prompt template: f"a photo of a {c}"
eval_descriptions = torch.cat([clip.tokenize(f"{c}") for c in people_list]).to(device)

for epoch in range(num_epochs):

    # ---- Training ----
    model.train()
    epoch_loss = 0.0
    # The per-sample tokens yielded by the dataset are not used: every batch is
    # scored against the full list of class descriptions, so the integer labels
    # index the columns of `logits_per_image`.
    for images, _, labels in train_loader:
        optimizer.zero_grad()
        images = images.to(device)
        labels = labels.to(device)

        logits_per_image, logits_per_text = model(images, eval_descriptions)
        # NOTE(review): in the reference CLIP implementation `logits_per_text` is
        # the transpose of `logits_per_image`, which would make the second term a
        # repeat of the first (i.e. the loss is twice the image-side
        # cross-entropy) — confirm before changing.
        loss = loss_logits(logits_per_image, labels) + loss_logits(logits_per_text.T, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    train_loss[epoch] = epoch_loss

    # ---- Evaluation on the test split ----
    model.eval()
    epoch_loss = 0.0
    total_correct = 0
    total_samples = 0
    with torch.no_grad():
        # The text features do not depend on the batch: encode them once per epoch.
        text_features = model.encode_text(eval_descriptions).float()
        text_features /= text_features.norm(dim=-1, keepdim=True)

        for images, _, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            logits_per_image, logits_per_text = model(images, eval_descriptions)
            loss = loss_logits(logits_per_image, labels) + loss_logits(logits_per_text.T, labels)
            epoch_loss += loss.item()

            image_features = model.encode_image(images).float()
            image_features /= image_features.norm(dim=-1, keepdim=True)

            # Cosine similarity between each image and each description, as probabilities.
            probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

            predicted = torch.max(probs, 1).indices
            total_samples += labels.size(0)
            total_correct += (predicted == labels).sum().item()

    test_loss[epoch] = epoch_loss

    # Guard against an empty test split instead of dividing by zero.
    accuracy = total_correct / total_samples if total_samples else float("nan")

    print(f'Epoch [{epoch+1}/{num_epochs}]:')
    print(f'- Loss (training):   {train_loss[epoch]}')
    print(f'- Loss (evaluation): {test_loss[epoch]}')
    print(f'- Accuracy:          {accuracy}')
    print()

# Save the trained weights.
torch.save(model.state_dict(), model_parameters_file)

ValueError: Expected input batch_size (2) to match target batch_size (32).

In [None]:
# Draw the training curve first, then the test curve, with epochs in ascending order.
for loss_history in (train_loss, test_loss):
    ordered_points = sorted(loss_history.items())
    plt.plot(*zip(*ordered_points))
plt.show()

### Uso del modelo ya entrenado

In [None]:
import clip
import torch
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from PIL import Image
import numpy as np

# Download the pre-trained CLIP model and its preprocessing pipeline.
# This cell relies on `selected_model` and `model_parameters_file` defined in
# the training section above.
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load(selected_model, device)

# Load the fine-tuned weights. `map_location` lets a checkpoint saved from a
# CUDA run be loaded on a CPU-only runtime as well.
model.load_state_dict(torch.load(model_parameters_file, map_location=device))
model.eval()

read_image = Image.open('/content/drive/MyDrive/TFM/Proyecto/Final_Database_mini_prueba/image/Juan/frame00000.jpg')
image = preprocess(read_image).unsqueeze(0).to(device)
text = clip.tokenize(["Genesis", "Juan"]).to(device)

with torch.no_grad():
    # Raw embeddings; not used for the prediction printed below.
    image_features = model.encode_image(image).float()
    text_features = model.encode_text(text).float()

    # Similarity scores of the image against each candidate name.
    logits_per_image, logits_per_text = model(image, text)
    probs = logits_per_image.softmax(dim=-1).cpu().numpy()

print("Label probs:", probs)
print(logits_per_text)

Label probs: [[0.18 0.82]]
tensor([[18.0000],
        [19.5156]], device='cuda:0', dtype=torch.float16)


In [None]:
# Same image, with the first candidate name truncated ("Gene" instead of "Genesis").
text = clip.tokenize(["Gene", "Juan"]).to(device)

with torch.no_grad():
    # Raw embeddings; not used for the prediction printed below.
    image_features, text_features = model.encode_image(image), model.encode_text(text)

    logits_per_image, logits_per_text = model(image, text)
    probs = logits_per_image.softmax(dim=-1).cpu().numpy()

print("Label probs:", probs)
print(logits_per_text)

Label probs: [[0.2069 0.793 ]]
tensor([[18.1719],
        [19.5156]], device='cuda:0', dtype=torch.float16)


In [None]:
# Same image, with both candidate names misspelled by one dropped letter.
text = clip.tokenize(["Genesi", "Jun"]).to(device)

with torch.no_grad():
    # Raw embeddings; not used for the prediction printed below.
    image_features, text_features = model.encode_image(image), model.encode_text(text)

    logits_per_image, logits_per_text = model(image, text)
    probs = logits_per_image.softmax(dim=-1).cpu().numpy()

print("Label probs:", probs)
print(logits_per_text)

Label probs: [[0.4377 0.562 ]]
tensor([[18.4531],
        [18.7031]], device='cuda:0', dtype=torch.float16)


In [None]:
# Same image, with accented spellings of both candidate names.
text = clip.tokenize(["Génesis", "Juán"]).to(device)

with torch.no_grad():
    # Raw embeddings; not used for the prediction printed below.
    image_features, text_features = model.encode_image(image), model.encode_text(text)

    logits_per_image, logits_per_text = model(image, text)
    probs = logits_per_image.softmax(dim=-1).cpu().numpy()

print("Label probs:", probs)
print(logits_per_text)

Label probs: [[0.07477 0.9253 ]]
tensor([[16.6719],
        [19.1875]], device='cuda:0', dtype=torch.float16)
