In [45]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import torch.nn as nn
import torch.optim as optim

## Definimos la clase Dataset 

In [46]:
class CustomImageDataset(Dataset):
    """Image dataset read from a folder that holds one sub-directory per class.

    Expected layout::

        directory/
            class_a/  img1.jpg, img2.png, ...
            class_b/  ...

    Class indices are assigned by the alphabetical order of the
    sub-directory names.

    Args:
        directory: Root folder containing the class sub-directories.
        transform: Optional callable applied to each loaded PIL image.
        extensions: File suffixes accepted as images; matched
            case-insensitively.

    Attributes populated by the constructor:
        class_names: Sorted list of class sub-directory names.
        class_to_idx: Mapping from class name to integer label.
        image_paths: Path of every accepted image file.
        labels: Integer label for each entry of ``image_paths``.
    """

    def __init__(self, directory, transform=None, extensions=('.jpg', '.png')):
        self.directory = directory
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Only sub-directories are classes. Stray files in the root
        # (e.g. ``.DS_Store`` or a README) must not consume a class index,
        # otherwise every label after them is shifted.
        self.class_names = sorted(
            entry
            for entry in os.listdir(directory)
            if os.path.isdir(os.path.join(directory, entry))
        )
        self.class_to_idx = {name: idx for idx, name in enumerate(self.class_names)}

        suffixes = tuple(ext.lower() for ext in extensions)
        for class_name in self.class_names:
            class_path = os.path.join(directory, class_name)
            # sorted() keeps the sample order independent of the filesystem.
            for file_name in sorted(os.listdir(class_path)):
                if file_name.lower().endswith(suffixes):
                    self.image_paths.append(os.path.join(class_path, file_name))
                    self.labels.append(self.class_to_idx[class_name])

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        The image is always converted to 3-channel RGB so that grayscale,
        palette or RGBA files do not break channel-wise transforms.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # The context manager closes the underlying file; convert() returns
        # an independent, fully loaded image.
        with Image.open(img_path) as handle:
            img = handle.convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        return img, label

In [47]:
# Settings shared by both preprocessing pipelines.
_TARGET_SIZE = (224, 224)
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]

# Deterministic pipeline: resize, convert to a tensor, normalize per channel.
original_transform = transforms.Compose([
    transforms.Resize(_TARGET_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),
])

# Augmentation pipeline: same resize/normalize, with random perturbations
# applied to the image before it is converted to a tensor.
augmented_transform = transforms.Compose([
    transforms.Resize(_TARGET_SIZE),
    transforms.RandomHorizontalFlip(),  # random left-right flip
    transforms.RandomRotation(30),  # random rotation within [-30, 30] degrees
    transforms.ColorJitter(  # random brightness / contrast / saturation / hue shifts
        brightness=0.2,
        contrast=0.2,
        saturation=0.2,
        hue=0.2,
    ),
    transforms.ToTensor(),
    transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),
])

In [48]:
# Training split, once with the plain pipeline and once with augmentation.
train_original_dataset = CustomImageDataset('archive/train', transform=original_transform)
train_augmented_dataset = CustomImageDataset('archive/train', transform=augmented_transform)

# Validation and test splits only ever use the plain pipeline.
valid_original_dataset = CustomImageDataset('archive/valid', transform=original_transform)
test_original_dataset = CustomImageDataset('archive/test', transform=original_transform)

In [49]:
# Training loaders shuffle their samples; batches hold 32 images.
train_original_loader = DataLoader(train_original_dataset, batch_size=32, shuffle=True)
train_augmented_loader = DataLoader(train_augmented_dataset, batch_size=32, shuffle=True)

# Evaluation loaders keep the dataset order (no shuffling).
valid_original_loader = DataLoader(valid_original_dataset, batch_size=32, shuffle=False)
test_original_loader = DataLoader(test_original_dataset, batch_size=32, shuffle=False)

In [50]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers wrapped by an additive skip connection.

    The first convolution applies ``stride`` (spatial down-sampling) and maps
    ``in_channels`` to ``out_channels``; the second keeps the shape. The input
    is added back to the result before the final ReLU.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution and of the shortcut projection.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # The skip path must produce the same shape as the main path. A 1x1
        # projection is needed whenever the channel count OR the spatial size
        # (stride != 1) changes; checking only the channels would make the
        # addition in forward() fail for equal channels with stride > 1.
        needs_projection = stride != 1 or in_channels != out_channels
        self.shortcut = (
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
            if needs_projection
            else nn.Identity()
        )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        residual = self.shortcut(x)
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = out + residual
        return F.relu(out)

class MyResNet(nn.Module):
    """Small residual CNN classifier.

    Architecture: conv + batch norm + ReLU stem, then two ``ResidualBlock``
    stages (each followed by 2x2 max pooling), then a three-layer fully
    connected head with dropout.

    All layers, including the first fully connected layer, are created in
    ``__init__``. This matters because an optimizer built from
    ``model.parameters()`` only sees the parameters that exist at that moment;
    a layer created lazily inside ``forward`` would never be trained and would
    be missing from ``state_dict()`` until the first call.

    Args:
        input_shape: ``(channels, height, width)`` of the images the model
            will receive. It fixes the input size of the first fully
            connected layer, so inputs with another spatial size are rejected
            by that layer.
        num_classes: Number of output logits.
    """

    def __init__(self, input_shape=(3, 224, 224), num_classes=100):
        super().__init__()

        # Stem
        self.conv1 = nn.Conv2d(input_shape[0], 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Reduced set of residual stages
        self.res_block1 = ResidualBlock(32, 64, stride=2)
        self.res_block2 = ResidualBlock(64, 128, stride=2)

        # Fully connected head; fc1's input size is derived from input_shape
        self.fc1 = nn.Linear(self._flattened_size(input_shape), 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc_out = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(0.5)

    def _extract_features(self, x):
        """Run the convolutional part and flatten to ``(batch, features)``."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.res_block1(x)
        x = self.maxpool(x)

        x = self.res_block2(x)
        x = self.maxpool(x)

        return torch.flatten(x, 1)

    def _flattened_size(self, input_shape):
        """Return the feature count produced for one input of ``input_shape``."""
        was_training = self.training
        # Eval mode keeps the probe from updating batch-norm running statistics.
        self.eval()
        with torch.no_grad():
            probe = torch.zeros(1, *input_shape)
            size = self._extract_features(probe).shape[1]
        self.train(was_training)
        return size

    def forward(self, x):
        """Map a batch of images to ``(batch, num_classes)`` logits."""
        x = self._extract_features(x)

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc_out(x)
        return x

# Build the reduced model for 3-channel 224x224 inputs and 10 output classes
model = MyResNet(
    num_classes=10,
    input_shape=(3, 224, 224),
)





In [51]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

# Move the model to the selected device and show its layer summary.
model.to(device)
print(model)


cpu
MyResNet(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (res_block1): ResidualBlock(
    (conv1): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (shortcut): Conv2d(32, 64, kernel_size=(1, 1), stride=(2, 2))
  )
  (res_block2): ResidualBlock(
    (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn2

In [52]:
# Hyperparameters
learning_rate = 1e-3  # step size handed to Adam (0.001)
# NOTE(review): the DataLoaders built earlier in this notebook hard-code
# batch_size=32 and do not read this value - confirm which one is intended.
batch_size = 64

# Cross-entropy between the model's logits and the integer class labels
loss_fn = nn.CrossEntropyLoss()

# Adam over every parameter the model currently exposes
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [53]:
def train_step(dataloader, model, loss_fn, optimizer, device=None):
    """Train ``model`` for one full pass over ``dataloader``.

    Prints the loss and the number of samples processed so far after every
    batch.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches exposing a
            ``dataset`` attribute with a length.
        model: Module to optimize; switched to training mode.
        loss_fn: Callable ``loss_fn(logits, targets)`` returning a scalar tensor.
        optimizer: Optimizer that owns the model's parameters.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU for parameter-free models).

    Returns:
        float: Mean of the per-batch losses, or ``0.0`` if no batch was yielded.
    """
    if device is None:
        # Follow the model rather than depending on a notebook-level global.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    size = len(dataloader.dataset)
    # Training mode is required: it enables dropout and makes batch-norm
    # layers use (and update) batch statistics.
    model.train()

    seen = 0
    total_loss = 0.0
    num_batches = 0
    for X, y in dataloader:
        # Move tensors to the configured device
        X, y = X.to(device), y.to(device)

        # Clear gradients first so that anything left behind by an
        # interrupted previous step cannot leak into this update.
        optimizer.zero_grad()

        # Compute prediction and loss
        logits = model(X)
        loss = loss_fn(logits, y)

        # Backpropagation
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        # Running count: stays correct when the last batch is smaller.
        seen += len(X)
        total_loss += batch_loss
        num_batches += 1
        print(f"loss: {batch_loss:>7f}  [{seen:>5d}/{size:>5d}]")

    return total_loss / num_batches if num_batches else 0.0

In [54]:
def validation_step(dataloader, model, loss_fn, device=None):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and average loss.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches exposing a
            ``dataset`` attribute with a length.
        model: Module to evaluate; switched to evaluation mode and left there.
        loss_fn: Callable ``loss_fn(logits, targets)`` returning a scalar tensor.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU for parameter-free models).

    Returns:
        tuple[float, float]: ``(accuracy, avg_loss)`` where ``accuracy`` is the
        fraction of correctly classified samples and ``avg_loss`` is the mean
        of the per-batch losses.

    Raises:
        ValueError: If the dataloader contains no samples.
    """
    if device is None:
        # Follow the model rather than depending on a notebook-level global.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # Evaluation mode is required: it disables dropout and makes batch-norm
    # layers use their running statistics.
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    if size == 0 or num_batches == 0:
        raise ValueError("validation_step received an empty dataloader")

    test_loss, correct = 0.0, 0.0

    # no_grad() skips gradient bookkeeping, saving memory and time.
    with torch.no_grad():
        for X, y in dataloader:
            # Move tensors to device
            X, y = X.to(device), y.to(device)
            logits = model(X)
            test_loss += loss_fn(logits, y).item()
            correct += (logits.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Validation Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return correct, test_loss

In [55]:
# Number of full passes over the training data
epochs = 1

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    # One optimization pass on the un-augmented training loader ...
    train_step(
        dataloader=train_original_loader,
        model=model,
        loss_fn=loss_fn,
        optimizer=optimizer,
    )
    # ... followed by an evaluation on the validation loader.
    validation_step(
        dataloader=valid_original_loader,
        model=model,
        loss_fn=loss_fn,
    )
print("Done!")

Epoch 1
-------------------------------
loss: 2.361611  [   32/ 1535]
loss: 2.364508  [   64/ 1535]
loss: 2.292994  [   96/ 1535]
loss: 2.268211  [  128/ 1535]


KeyboardInterrupt: 