 El objetivo de este cuaderno es aprender los conceptos básicos de la programación de redes usando Torch a través de una pequeña demo de un clasificador con el conjunto de datos MNIST.

 [Link al dataset](https://github.com/teavanist/MNIST-JPG)


In [None]:
import torch
import numpy as np
import os
import cv2

# Dataset: https://github.com/teavanist/MNIST-JPG

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('device:', device)

El código tendrá 5 partes bien diferenciadas:
- Carga del *Dataset* y *Dataloaders*
- Creación del modelo
- Creación del *Training Loop*
- Entrenamiento
- Test

In [62]:
# Dataset

batch_size = 16

# Vamos a crear una clase dataset propia

from torch.utils.data import Dataset
from torchvision import transforms

class MNISTDataset(Dataset): # Heredamos de la clase dataset de pytorch
    def __init__(self, root, transform=None,partition = 'Train'): # Constructor de la clase
        self.root = root
        self.transform = transform
        self.partition = partition
        assert self.partition in ['Train','Test'], 'Partition must be Train or Test'
        self.paths = self.get_filepaths(root,partition)
        
        
    def get_filepaths(self,root,partition): # Función que devuelve los paths de las imágenes
        if partition == 'Train':
            root = os.path.join(root,partition)
            folders = os.listdir(root) # Lista con los nombres de los archivos
            folders = [os.path.join(root,file) for file in folders] # Añadimos a los nombres de los ar archivos la ruta a la carpeta 
            images = []
            for folder in folders:
                files = os.listdir(folder)
                files = [os.path.join(folder,file) for file in files]
                images += files
                
            return images
        if partition == 'Test':
            root = os.path.join(root,partition)
            folders = os.listdir(root) # Lista con los nombres de los archivos
            folders = [os.path.join(root,file) for file in folders] # Añadimos a los nombres de los ar archivos la ruta a la carpeta 
            images = []
            for folder in folders:
                files = os.listdir(folder)
                files = [os.path.join(folder,file) for file in files]
                images += files
        
            return images
        
    def __len__(self): # Devuelve el número de elementos del dataset
        return len(self.paths)
    
    def __getitem__(self, idx): # Devuelve un elemento concreto del dataset
        img_path = self.paths[idx]
        # leemos la imagen. Solo tiene un canal
        img = cv2.imread(img_path,0)
        img = img/255.0
    
        # Para la label de cada imagen tendremos que extraerla del path, es el nombre de la carpeta que la contiene
        
        label = int(img_path.split('\\')[-2]) # Si estamos en linux cambiar '\\' por '/'
        
        if self.transform:
            img = self.transform(img)
            
        return img, label
    
        
        
        
        

In [None]:
path_root = r'C:\Users\jisal\Desktop\Practicas_VC\Demo_TOrch\Data'

# Una vez definida la clase, crear el dataset es tan sencillo como instanciar la clase
transforms = transforms.Compose(
    [transforms.ToTensor()]
    )

train_dataset = MNISTDataset(path_root,partition='Train',transform=transforms)
test_dataset = MNISTDataset(path_root,partition='Test',transform=transforms)
 
# Podemos dividir el dataset de entrenamiento en train y validation1
# Hallamos la longitud de los datasets de train y test
val_partition = 0.1
n_val = int(len(train_dataset)*val_partition)
n_train = int(len(train_dataset)*(1-val_partition))

train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [n_train,n_val])

# Ahora construimos los dataloaders, que son los que se encargan de cargar los datos en el modelo
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True) # drop_last elimina los batches que no tienen el tamaño especificado
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=True)


print('Longitud del dataset de entrenamiento:',len(train_dataset), 'Número de batches:',len(train_loader))
print('Longitud del dataset de validación:',len(val_dataset), 'Número de batches:',len(val_loader))
print('Longitud del dataset de test:',len(test_dataset), 'Número de batches:',len(test_loader))


In [None]:
import matplotlib.pyplot as plt

for x,y in train_loader:
    print(x.shape)
    print(y)
    # vamos a mostrar 3 imágenes para asegurar que hemos cargado correctamente los datos
    fig, axs = plt.subplots(1,3)

    for i in range(3):
        # Transform the tensor to numpy
        image = np.array(x[i])
        image = np.reshape(image, (image.shape[1], image.shape[2])) # Eliminamos el canal para mostrar bien la imagen
        axs[i].imshow(image, cmap='gray')
        axs[i].set_title(y[i].numpy())
        axs[i].axis('off')
        
    plt.show()
    break

In [78]:
# Creación del modelo

import torch.nn as nn
import torch.nn.functional as F

class CNN(nn.Module):
        # Constructor de la clase
    def __init__(self):
        
        super(CNN, self).__init__()
        self.covs = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.fcs = nn.Sequential(
        nn.Linear(in_features=64*5*5, out_features=128),
        nn.ReLU(),
        nn.Linear(in_features=128, out_features=10)
        )
        
        
    def forward(self, x):
        # reshape the input tensor to the correct shape
        x = x.float() # A torch le gustan los floats, no los doubles que eran antes
        x = x.view(x.size(0),1,28,28) # BxCxHxW
        x = self.covs(x)
        x = x.view(x.size(0),-1) # flatten
        x = self.fcs(x)
        return x
    
        
model = CNN().to(device) # ahora declaramos el modelo. Lo pasamos a la GPU sin está disponible.

In [None]:
# Vamos a comprobar que el modelo funciona correctamente
for patch in train_loader:
    x, y = patch
    print('Input shape:',x.shape)
    print('Output shape:',model(x).shape)
    break

In [None]:
#training loop
import torch.optim as optim
from tqdm import tqdm
def trainin_loop(model,train_loader,val_loader,epochs = 10,loss_fcn =  nn.CrossEntropyLoss(),optimizer=optim.SGD(model.parameters(),lr=1e-3)):

    for epoch in range(epochs):
        val_loss = 0
        training_loss = 0
        
        model.train() # modo entrenamiento        
        for x,y in tqdm(train_loader):
            
            x = x.to(device) # importante enviar los datos al dispositivo
            
            optimizer.zero_grad() # reiniciamos los gradientes
            y_pred = model(x) # realizamos la predicción con el modelo
            loss = loss_fcn(y_pred,y) # Calculamos la pérdida
            loss.backward() # Propagamos los gradientes por la red
            optimizer.step() # Actualizamos los pesos
            training_loss += loss.item()
            
        model.eval() # modo evaluación
        for x,y in tqdm(val_loader):
            x = x.to(device)
            
            
            y_pred = model(x)
            val_loss += loss_fcn(y_pred,y).item()

        print('Epoch:',epoch, 'Training loss:',training_loss/len(train_loader), 'Validation loss:',val_loss/len(val_loader))
        
    print('Training finished')

trainin_loop(model,train_loader,val_loader,epochs=5)

# save model 
torch.save(model.state_dict(), 'model.pth')




In [None]:
# Cargamos el modelo
model = CNN()
model.load_state_dict(torch.load('model.pth'))


In [None]:
def eval_model (model,test_loader):
    model.eval()
    correct = 0
    total = 0
    for x,y in tqdm(test_loader):
        x = x.to(device)
        y = y.to(device)
        y_pred = model(x)
        _, predicted = torch.max(y_pred.data, 1)
        total += y.size(0)
        correct += (predicted == y).sum().item()
    print('Accuracy:',100*correct/total)
    
eval_model(model,test_loader)
