## Carga de bibliotecas

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import torch
from torch import nn
from torch import optim
from torch.utils.data import Dataset, DataLoader

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # Utilizará GPU si está disponible

# Construcción del dataset

In [None]:
class ConcentDataset(Dataset):
    
    #=====================================================
    def __init__(self, filename=None, transform=None, target_transform=None):
        
        
        data = pd.read_csv(filename)  # Reading data
        
        patterns = data.iloc[:,:-1].to_numpy()  # Extracting patterns and transforming to numpy
        labels = data.iloc[:,-1].to_numpy()  # Extracting labels and transforming to numpy
        labels[labels==-1] = 0  #  Replace "-1" labels with "0" (sigmoid function works in [0,1] range)
        
        self.patterns = torch.from_numpy(patterns).type(torch.FloatTensor)  # Transforming to torch
        self.labels = torch.from_numpy(labels).type(torch.FloatTensor)  # Transforming to torch
        
        self.transform = transform
        self.target_transform = target_transform
    
    #=====================================================
    def __len__(self):
        
        return len(self.labels)
    
    
    #=====================================================
    def __getitem__(self, idx):
        
        pattern = self.patterns[idx,:]  # Get pattern "idx"
        label = self.labels[idx]  # Get label for pattern "idx"
        
        #--------------------------------------
        if self.transform:
            pass
        if self.target_transform:
            pass
        #--------------------------------------
        
        return pattern, label

# Carga de los datasets

In [None]:
# CARGO DATASETS

train_data = ConcentDataset(filename='concent_train.csv')
val_data = ConcentDataset(filename='concent_valid.csv')
test_data = ConcentDataset(filename='concent_test.csv')

In [None]:
# GRAFICO PATRONES

fig,ax = plt.subplots(1, 3, figsize=(21,7))

for i in range(3):
    
    if i == 0:
        X = train_data.patterns
        Y = train_data.labels
        title = 'Train data'
    if i == 1:
        X = val_data.patterns
        Y = val_data.labels
        title = 'Validation data'
    if i == 2:
        X = test_data.patterns
        Y = test_data.labels
        title = 'Test data'
    
    colors = [f'C{int(y)}' for y in Y]
    
    ax[i].scatter(X[Y==0,0], X[Y==0,1], 30, c='b', label='Clase 0')
    ax[i].scatter(X[Y==1,0], X[Y==1,1], 30, c='y', label='Clase 1')
    
    ax[i].set_title(title)
    ax[i].set_xlabel(u'$X_{1}$', fontsize=14)
    ax[i].set_ylabel(u'$X_{2}$', fontsize=14)
    ax[i].grid(True)
    ax[i].legend(loc='best')

# Construcción del dataloader

In [None]:
train_dataloader = DataLoader(train_data, batch_size=16, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=16, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=16, shuffle=False)

# Construcción del modelo neuronal

In [None]:
class MLP_desglosado(nn.Module):
    '''
    MLP con 2 capas: Entrada, salida.
    Este modelo explicita el flujo de información a través de la red.
    
    '''
    
    #===========================
    def __init__(self, n_inputs=2, layer1=3, n_outputs=1):
        
        super(MLP_desglosado, self).__init__()
        
        #===========================
        #BLOQUES DE CONSTRUCCION
        #===========================
        self.layer1 = nn.Linear(n_inputs, layer1, bias=True)  # ENTRADA: Inputs [features] --> neuronas_layer1
        self.layer2 = nn.Linear(layer1, n_outputs, bias=True)  # SALIDA: neuronas_layer2   --> Outputs [clases]
        self.sigmoid = nn.Sigmoid()  # Función de transferencia
    
    
    #===========================
    def forward(self, x):
        
        # CAPA 1
        y = self.layer1(x)  # Salida lineal
        y = self.sigmoid(y)  # Función no lineal
        
        # CAPA 2
        y = self.layer2(y)  # Salida lineal
        y = self.sigmoid(y)  # Función no lineal
        
        return y  # salida de la red

In [None]:
class MLP_bloque(nn.Module):
    '''
    MLP con 2 capas: Entrada, salida.
    Este modelo es equivalente al anterior. La diferencia está en que genera
    un "bloque" de procesamiento que puede ejecutarse como un único elemento.
    
    '''
    
    #===========================
    def __init__(self, n_inputs=2, layer1=3, n_outputs=1):
        
        super(MLP_bloque, self).__init__()
        
        self.net = nn.Sequential(nn.Linear(n_inputs, layer1, bias=True), # ENTRADA: Inputs [features] --> neuronas_layer1
                                 nn.Sigmoid(),  # Función de transferencia
                                 nn.Linear(layer1, n_outputs, bias=True), # SALIDA: neuronas_layer2   --> Outputs [clases]
                                 nn.Sigmoid()  # Función de transferencia
                                )
    
    
    #===========================
    def forward(self, x):
        
        y = self.net(x)
        
        return y  # salida de la red

# Pasos generales del entrenamiento

In [None]:
def train_step(model, dataloader, loss_criterion, device):
    
    model.train()  # Calcula gradientes
    
    cummulated_loss = 0
    
    for idx,(X,y) in enumerate(dataloader):

        #-----------------------------------------------------
        # Convierto los datos en tensores diferenciables
        #-----------------------------------------------------
        X = X.to(device)
        y = y.to(device)

        optimizer.zero_grad()  # Se limpia el caché del optimizador
        
        #----------------
        # Forward pass
        #----------------
        y_pred = model(X)

        #----------------
        # Compute Loss
        #----------------
        loss = loss_criterion(y_pred.squeeze(), y)
        
        cummulated_loss += loss.item()
        
        #----------------
        # Backward pass
        #----------------
        loss.backward()
        optimizer.step()
        
    #------------------------------------
    
    N_batches = idx+1  # numero batches, comenzando en 1
    cummulated_loss /= N_batches
    
    return cummulated_loss, model

In [None]:
def predict_step(model, dataloader, loss_criterion, device):
    
    model.eval()  # No calcula gradientes
    
    cummulated_loss = 0
    
    Y = torch.tensor([])
    Yp = torch.tensor([])
    
    for idx,(X,y) in enumerate(dataloader):
        
        Y = torch.hstack( (Y,y.flatten()) )
        
        #-----------------------------------------------------
        # Convierto los datos en tensores diferenciables
        #-----------------------------------------------------
        X = X.to(device)
        y = y.to(device)
        
        #----------------
        # Forward pass
        #----------------
        y_pred = model(X)
        
        Yp = torch.hstack( (Yp, y_pred.cpu().squeeze()) )

        #----------------
        # Compute Loss
        #----------------
        loss = loss_criterion(y_pred.squeeze(), y)
        
        cummulated_loss += loss.item()
    
    
    N_batches = idx+1  # numero batches, comenzando en 1
    cummulated_loss /= N_batches
    
    #------------------
    # UMBRAL
    #------------------
    umbral = 0.5
    
    Yp[Yp<umbral] = 0
    Yp[Yp >= umbral] = 1
    
    Acc = torch.sum(Yp == Y)/ len(Y)
    
    #------------------
    
    return cummulated_loss, Acc

## ENTRENAMIENTO

In [None]:
# INICIALIZO MODELO
model = MLP_desglosado()
#model = MLP_bloque()

model.to(device)


# DEFINO ESTRUCTURA PARA GUARDAR MEDIDAS
measures = {
            'trn_loss': [],
            'trn_acc': [],
            'val_loss': [],
            'val_acc': []
           }


#-------------------------------------------------------
# Definimos el criterio de error (métrica a minimizar)
# y trasladamos al device [CPU|GPU]
#-------------------------------------------------------
loss_criterion = nn.MSELoss(reduction='mean').to(device)

#-------------------------------------
# Definimos el optimizador a utilizar
#-------------------------------------
optimizer = optim.SGD(model.parameters(), lr=5e-2, momentum=0.9)
#optimizer = optim.Adam(model.parameters(), lr=5e-2)

# DEFINIMOS PARAMETROS
best_loss = 1E6
best_model = model.state_dict()
best_epoch = 0
counter = 0

MAX_EPOCHS = 1000  # DEFINIMOS MAXIMO DE EPOCAS DE ENTRENAMIENTO
epoch = 0  # INICIALIZAMOS CONTADOR DE EPOCAS

In [None]:
while (epoch < MAX_EPOCHS) and (counter < 20):
    
    #------------
    # TRAIN
    #------------
    trn_loss, model = train_step(model, train_dataloader, loss_criterion, device)
    
    trn_loss, trn_acc = predict_step(model, train_dataloader, loss_criterion, device)  # TRAIN EVALUATION
    val_loss, val_acc = predict_step(model, val_dataloader, loss_criterion, device)  # VALIDATION EVALUATION
    
    measures['trn_loss'].append(trn_loss)
    measures['trn_acc'].append(trn_acc)
    
    measures['val_loss'].append(val_loss)
    measures['val_acc'].append(val_acc)
    

    #=================================
    if (val_loss < best_loss):
        
        # Actualizo best_loss
        best_loss = val_loss
        best_epoch = epoch
        
        # Guardo mejor modelo
        best_model = model.state_dict()
        counter = 0
    
    else:
        counter += 1
    #=================================

    if (epoch%10) == 0:
        print(f'Epoch {epoch}: trn_loss: {trn_loss:.5}\t val_loss: {val_loss:.5}\t trn_acc: {trn_acc}\t val_acc: {val_acc}\t counter: {counter}')
    
    # Avanzo 1 epoca
    epoch += 1


print(f'Best model:\n\nepoch: {best_epoch}\nloss: {best_loss:.5}')

# GUARDO MEJOR MODELO
torch.save(best_model, 'best_model_pytorch.pt')

In [None]:
fig, ax = plt.subplots(1,2,figsize=(22,6))

ax[0].plot(measures['trn_loss'], 'C2', label='trn')
ax[0].plot(measures['val_loss'], 'C3', label='val')
ax[0].set_title('Loss')
ax[0].set_xlabel('epochs')
ax[0].set_ylabel('MSE')
ax[0].grid(True)
ax[0].legend()

ax[1].plot(measures['trn_acc'], 'C2', label='trn')
ax[1].plot(measures['val_acc'], 'C3', label='val')
ax[1].set_title('Accuracy')
ax[1].set_xlabel('epochs')
ax[1].set_ylabel('Acc')
ax[1].grid(True)
ax[1].legend();

### PREDICCIONES EN TEST

In [None]:
model = MLP_desglosado()
#model = MLP_bloque()

model.to(device)
model.load_state_dict(best_model)

tst_loss, tst_acc = predict_step(model, test_dataloader, loss_criterion, device)  # TEST EVALUATION

print(f'\nAcc: {tst_acc:.4}\n')

## Dibujo patrones clasificados

In [None]:
#------------------
# UMBRAL
#------------------
umbral = 0.5

#-------------------------------------

TRAIN = {'X': None,
         'clase': []}

Y = torch.tensor([])
Yp = torch.tensor([])

for idx,(X,y) in enumerate(train_dataloader):
    
    if TRAIN['X'] is not None:
      TRAIN['X'] = np.vstack((TRAIN['X'], X.numpy()))  # Guardo patrones
    else:
      TRAIN['X'] = X.numpy()

    Y = torch.hstack( (Y,y.flatten()) )
    
    X = X.to(device)
    y = y.to(device)
    y_pred = model(X)
    Yp = torch.hstack( (Yp, y_pred.cpu().squeeze()) )

Yp[Yp<umbral] = 0
Yp[Yp >= umbral] = 1

Yp[Yp!=Y] = -1

TRAIN['clase'] = Yp.detach().cpu().numpy()

#-------------------------------------

VAL = {'X': None,
       'clase': []}

Y = torch.tensor([])
Yp = torch.tensor([])

for idx,(X,y) in enumerate(val_dataloader):
    
    if VAL['X'] is not None:
      VAL['X'] = np.vstack((VAL['X'], X.numpy()))  # Guardo patrones
    else:
      VAL['X'] = X.numpy()

    Y = torch.hstack( (Y,y.flatten()) )
    
    X = X.to(device)
    y = y.to(device)
    y_pred = model(X)
    Yp = torch.hstack( (Yp,y_pred.cpu().squeeze()) )

Yp[Yp<umbral] = 0
Yp[Yp >= umbral] = 1
Yp[Yp!=Y] = -1

VAL['clase'] = Yp.detach().cpu().numpy()

#-------------------------------------

TEST = {'X': None,
         'clase': []}

Y = torch.tensor([])
Yp = torch.tensor([])

for idx,(X,y) in enumerate(test_dataloader):
    
    if TEST['X'] is not None:
      TEST['X'] = np.vstack((TEST['X'], X.numpy()))  # Guardo patrones
    else:
      TEST['X'] = X.numpy()

    Y = torch.hstack( (Y,y.flatten()) )
    
    X = X.to(device)
    y = y.to(device)
    y_pred = model(X)
    Yp = torch.hstack( (Yp,y_pred.cpu().squeeze()) )

Yp[Yp<umbral] = 0
Yp[Yp >= umbral] = 1
Yp[Yp!=Y] = -1

TEST['clase'] = Yp.detach().cpu().numpy()

In [None]:
fig, ax = plt.subplots(1,3,figsize=(24,8))

# TRAIN
ax[0].scatter(TRAIN['X'][TRAIN['clase']==0,0],
              TRAIN['X'][TRAIN['clase']==0,1], 30, 'b', label='Clase 0')

ax[0].scatter(TRAIN['X'][TRAIN['clase']==1,0],
              TRAIN['X'][TRAIN['clase']==1,1], 30, 'y', label='Clase 1')

ax[0].scatter(TRAIN['X'][TRAIN['clase']==-1,0],
              TRAIN['X'][TRAIN['clase']==-1,1], 30, 'r', marker='s', label='Error')

ax[0].set_title('Train data', fontsize=16)
ax[0].set_xlabel('$X_{1}$', fontsize=14)
ax[0].set_ylabel('$X_{2}$', fontsize=14)
ax[0].set_xlim([-0.05, 1.05])
ax[0].set_ylim([-0.05, 1.05])
ax[0].grid(True)
value = (TRAIN['clase'] != -1).sum()/TRAIN['clase'].shape[0]
ax[0].text(0.8, 0, f'Acc: {value:.3}', bbox=dict(facecolor='grey', alpha=0.25), fontsize=12)
ax[0].legend(loc='best')


# VALIDATION
ax[1].scatter(VAL['X'][VAL['clase']==0,0],
              VAL['X'][VAL['clase']==0,1], 30, 'b', label='Clase 0')

ax[1].scatter(VAL['X'][VAL['clase']==1,0],
              VAL['X'][VAL['clase']==1,1], 30, 'y', label='Clase 1')

ax[1].scatter(VAL['X'][VAL['clase']==-1,0],
              VAL['X'][VAL['clase']==-1,1], 30, 'r', marker='s', label='Error')

ax[1].set_title('Validation data', fontsize=16)
ax[1].set_xlabel('$X_{1}$', fontsize=14)
ax[1].set_ylabel('$X_{2}$', fontsize=14)
ax[1].set_xlim([-0.05, 1.05])
ax[1].set_ylim([-0.05, 1.05])
ax[1].grid(True)
value = (VAL['clase'] != -1).sum()/VAL['clase'].shape[0]
ax[1].text(0.8, 0, f'Acc: {value:.3}', bbox=dict(facecolor='grey', alpha=0.25), fontsize=12)
ax[1].legend(loc='best')


#TEST
ax[2].scatter(TEST['X'][TEST['clase']==0,0],
              TEST['X'][TEST['clase']==0,1], 30, 'b', label='Clase 0')

ax[2].scatter(TEST['X'][TEST['clase']==1,0],
              TEST['X'][TEST['clase']==1,1], 30, 'y', label='Clase 1')

ax[2].scatter(TEST['X'][TEST['clase']==-1,0],
              TEST['X'][TEST['clase']==-1,1], 50, 'r', marker='s', label='Error')

ax[2].set_title('Test data', fontsize=16)
ax[2].set_xlabel('$X_{1}$', fontsize=14)
ax[2].set_ylabel('$X_{2}$', fontsize=14)
ax[2].set_xlim([-0.05, 1.05])
ax[2].set_ylim([-0.05, 1.05])
ax[2].grid(True)
value = (TEST['clase'] != -1).sum()/TEST['clase'].shape[0]
ax[2].text(0.8, 0, f'Acc: {value:.3}', bbox=dict(facecolor='grey', alpha=0.25), fontsize=12)
ax[2].legend(loc='best');