In [1]:
import os
import random
import shutil
import copy
import time
import torch
import torchvision
from torchvision import datasets, models, transforms
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.utils.data import DataLoader, Subset
import math
from sklearn.metrics import confusion_matrix

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Ruta de la carpeta principal
main_folder = "/home/xnmaster/dataset"

input_size = 50

In [4]:
# Just normalization
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

print("Initializing Datasets and Dataloaders...")

Initializing Datasets and Dataloaders...


In [5]:
# Batch size para el entrenamiento (cambia según la cantidad de memoria disponible)
batch_size = 8

# Crear datasets de entrenamiento y validación
image_datasets = {x: datasets.ImageFolder(os.path.join(main_folder, x), data_transforms[x]) for x in ['train', 'val', 'test']}

"""
# Generar los índices para el subconjunto
subset_indices_train = torch.randperm(len(image_datasets['train']))[:int(0.1*len(image_datasets['train']))]
subset_indices_val = torch.randperm(len(image_datasets['val']))[:int(0.1*len(image_datasets['val']))]

# Crear subconjuntos
train_data_subset = Subset(image_datasets['train'], subset_indices_train)
val_data_subset = Subset(image_datasets['val'], subset_indices_val)


# Crear dataloaders de entrenamiento y validación
dataloaders_dict = {
    'train': DataLoader(train_data_subset, batch_size=batch_size, shuffle=True, num_workers=4),
    'val': DataLoader(val_data_subset, batch_size=batch_size, shuffle=True, num_workers=4)
}
"""
dataloaders_dict = {x: DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True, num_workers=4) for x in ['train', 'val','test']}

In [6]:
"""
# Define el modelo del Transformer con Attention y Positional Encoding
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_inputs, hidden_dim, num_classes, input_size):
        super(TransformerModel, self).__init__()
        self.input_dim = input_dim
        self.num_inputs = num_inputs
        self.hidden_dim = hidden_dim
        self.num_classes = num_classes

        self.positional_encoding = self.generate_positional_encoding(input_size, input_dim)

        self.query_fc = nn.Linear(input_dim, hidden_dim)
        self.keys_fc = nn.Linear(input_dim, hidden_dim)
        self.values_fc = nn.Linear(hidden_dim, 1)

        self.encoder = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=4)
        self.classifier = nn.Linear(hidden_dim, num_classes)

    def generate_positional_encoding(self, input_size, hidden_dim):
        positional_encoding = torch.zeros(1, input_size, hidden_dim)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * -(math.log(10000.0) / hidden_dim))
        position = torch.arange(0, input_size).unsqueeze(1).float()
        positional_encoding[:, :, 0::2] = torch.sin(position * div_term[:hidden_dim // 2])
        if hidden_dim % 2 == 1:  # hidden_dim is odd
            positional_encoding[:, :, 1::2] = torch.cos(position * div_term[:hidden_dim // 2])
        else:  # hidden_dim is even
            positional_encoding[:, :, 1::2] = torch.cos(position * div_term[1:hidden_dim // 2 + 1])
        return positional_encoding.to(device)

    def scoring_additive(self, query, keys):
        query = query.repeat(1, self.num_inputs, 1)
        query = torch.tanh(self.query_fc(query))

        keys = torch.tanh(self.keys_fc(keys))

        score = torch.tanh(query + keys)
        score = self.values_fc(score)
        return score

    def forward(self, x):
        batch_size, num_inputs, input_dim, _ = x.size()
        x = x.view(batch_size, input_dim, -1) #x = x.view(batch_size, num_inputs * input_dim, -1)
        positional_encoding = self.positional_encoding[:, :input_dim, :].repeat(batch_size, 1, 1)

        keys = self.scoring_additive(x, positional_encoding)
        keys = keys.view(batch_size, -1)

        keys = self.encoder(keys.unsqueeze(-1)).squeeze()

        keys = keys.view(batch_size, num_inputs, -1)

        keys = keys.max(dim=1)[0]

        out = self.classifier(keys)
        return out
"""
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_inputs, hidden_dim, num_classes):
        super(TransformerModel, self).__init__()
        self.input_dim = input_dim
        self.num_inputs = num_inputs
        self.hidden_dim = hidden_dim
        self.num_classes = num_classes

        self.positional_encoding = self.generate_positional_encoding(self.num_inputs, self.hidden_dim)

        self.query_fc = nn.Linear(input_dim, hidden_dim)
        self.keys_fc = nn.Linear(input_dim, hidden_dim)
        self.values_fc = nn.Linear(hidden_dim, 1)

        self.encoder = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=4)
        self.classifier = nn.Linear(hidden_dim, num_classes)

    def generate_positional_encoding(self, input_size, hidden_dim):
        positional_encoding = torch.zeros(input_size, hidden_dim)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * -(math.log(10000.0) / hidden_dim))
        position = torch.arange(0, input_size).unsqueeze(1).float()
        positional_encoding[:, 0::2] = torch.sin(position * div_term[:hidden_dim // 2])
        if hidden_dim % 2 == 1:  # hidden_dim is odd
            positional_encoding[:, 1::2] = torch.cos(position * div_term[:hidden_dim // 2])
        else:  # hidden_dim is even
            positional_encoding[:, 1::2] = torch.cos(position * div_term[1:hidden_dim // 2 + 1])

        return positional_encoding.to(device)

    def scoring_additive(self, query, keys):
        query = query.repeat(1, self.num_inputs, 1)
        query = torch.tanh(self.query_fc(query))

        keys = torch.tanh(self.keys_fc(keys))

        score = torch.tanh(query + keys)
        score = self.values_fc(score)
        return score

    def forward(self, x):
        batch_size = x.size(0)
        x = x.view(batch_size, self.num_inputs, -1)

        positional_encoding = self.positional_encoding.unsqueeze(0).repeat(batch_size, 1, 1)

        x = x + positional_encoding

        scores = self.scoring_additive(x, x)
        attention_weights = torch.softmax(scores, dim=-1)

        encoded = self.encoder(x, attention_weights)

        output = encoded.mean(dim=1)
        output = self.classifier(output)
        return output

In [7]:
# Función de entrenamiento del modelo
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
    since = time.time()

    acc_history = {"train": [], "val": []}
    losses = {"train": [], "val": []}

    best_acc = 0.0
    best_model_wts = None

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data).item()

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict()

            acc_history[phase].append(epoch_acc)
            
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)
    return model, acc_history, losses

In [8]:
# Definir los parámetros del modelo y entrenar
#input_dim = input_size * input_size
#num_inputs = 1
hidden_dim = 256
num_classes = 2


In [11]:

model = TransformerModel(input_dim=3, num_inputs=1, hidden_dim=hidden_dim, num_classes=num_classes) #input_size=input_size
model = model.to(device)

RuntimeError: The expanded size of the tensor (128) must match the existing size (127) at non-singleton dimension 1.  Target sizes: [1, 128].  Tensor sizes: [127]

In [95]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 10

In [96]:
model, acc_history, losses = train_model(model, dataloaders_dict, criterion, optimizer, num_epochs)

Epoch 0/9
----------


RuntimeError: shape '[8, -1, 256]' is invalid for input of size 3600

In [None]:
subset_indices_test = torch.randperm(len(image_datasets['test']))[:int(0.1*len(image_datasets['test']))]

test_data_subset = torch.utils.data.Subset(image_datasets['test'], subset_indices_test)

test_dataloader = torch.utils.data.DataLoader(test_data_subset, batch_size=batch_size, shuffle=True, num_workers=4)

In [None]:
def evaluate_model(model, dataloader):
    model.eval()
    running_loss = 0.0
    running_corrects = 0

    all_labels = []
    all_preds = []

    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data).item()

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

    test_loss = running_loss / len(dataloader.dataset)
    test_acc = running_corrects / len(dataloader.dataset)

    conf_matrix = confusion_matrix(all_labels, all_preds)

    return test_loss, test_acc, conf_matrix

In [None]:
test_loss, test_accuracy, confusion_matrix = evaluate_model(model, test_dataloader)

print('Test Loss: {:.4f}, Test Accuracy: {:.4f}'.format(test_loss, test_accuracy))
print('Confusion Matrix: \n{}'.format(confusion_matrix))