# Importamos librerias

In [1]:
import torch
import torchvision
import os
import torch.nn as nn
import torch.optim as optim 
import torch.nn.functional as F
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy 

from torch.utils.data import Dataset
from torchvision.datasets import FashionMNIST
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.utils import save_image
from torchvision.transforms import ToTensor

# Punto A
Se debe implementar una red feed-forward auto-encoder con una capa oculta $n=64$ para aprender la función identidad con la base de datos Fashion-MNIST. 

In [None]:
# Descargamos los conjuntos de datos de entrenamiento y testeo desde la libreria Datasets de torch
train_data = datasets.FashionMNIST(
    root = "data",
    train= True, 
    download= True, 
    transform = ToTensor()
)

test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform= ToTensor(),
)

# Se toma la subclase creada en los prácticos para poder redimensionar las imagénes y los labels
class CustomImageDataset(Dataset):
    def __init__(self,dataset):
        self.dataset=dataset
    def __len__(self):
        return len(self.dataset)   
    def __getitem__(self,i):
        image,label=self.dataset[i]
        label=torch.flatten(image) # Se reescribe el label original con una version achatada de la imagen dado
        # que en este trabajo no estamos clasificando las imagenes, simplemente las estamos reconstruyendo.
        return image,label

In [None]:
train_dataset = CustomImageDataset(train_data)
test_dataset = CustomImageDataset(test_data)

In [None]:
## Hiperparámtros
batch_size = 1000
epochs = 200

In [None]:
# Luego de descargar los datos ahora es necesario cargarlos especificando el tamaño del batch y con el 
# parámetro shuffle estamos permitiendo que las muestras sean aleatorias. 
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle = True)

In [None]:
# Dimensiones de los tensores de las imágenes y las etiquetas en el conjunto de entrenamiento
train_imag, train_labels = next(iter(train_loader))
print(f"Feature batch shape: {train_imag.size()}")
print(f"Labels batch shape: {train_labels.size()}")

In [None]:
# Dimensiones de los tensores de las imágenes y las etiquetas en el conjunto de testeo
test_imag, test_labels = next(iter(test_loader))
print(f"Feature batch shape: {test_imag.size()}")
print(f"Labels batch shape: {test_labels.size()}")

In [None]:
# Esta función se utiliza para obtener el dispositivo disponible en mi computadora y para luego "guardar" el modelo ahi
def get_device():
    if torch.cuda.is_available():
        device = 'cuda:0'
    else:
        device = 'cpu'
    return device

In [None]:
# Se define la arquitectura de la red neuronal
class AE(nn.Module):
    def __init__(self):
        super(AE, self).__init__()
        self.flatten = nn.Flatten()
        self.drop = nn.Dropout(0.1)
        self.relu = nn.ReLU()
        #encoder
        self.enc1 = nn.Linear(in_features=28*28, out_features= n)

        #decoder
        self.dec1 = nn.Linear(in_features=n, out_features= 28*28)
        
    def forward(self, x):
        x = self.flatten(x)
        x = self.enc1(x)
        x = self.drop(x)
        x = self.relu(x)
        x = self.dec1(x)
        return x

In [None]:
n = 64 #Tamaño capa intermedia
device = get_device()
AE_model = AE().to(device)
print(AE_model)

In [None]:
lr = 1 #tasa de aprendizaje
loss_fn = nn.MSELoss() #función de pérdida
optimizer = optim.SGD(AE_model.parameters(), lr = lr) #algoritmo optimizador

In [None]:
#se define la función entrenamiento y de testeo 
train_loss = []
test_loss = []
def train(dataset):
  running_loss = 0
  AE_model.train() #modo entrenamiento
  for batch, (X,y) in enumerate(dataset): #se itera por batch
    optimizer.zero_grad()

    pred = AE_model(X)
    
    loss = loss_fn(pred,y)
    
    loss.backward()
    
    optimizer.step()
    
    running_loss += loss.item()
  #Resultados de la función de pérdida, cada resultado se guarda en el vector vacío train_loss    
  running_loss /= len(train_loader)
  print("Train Loss: {}".format(running_loss))
  train_loss.append(running_loss)

def test(dataset):
  running_test_loss = 0
  AE_model.eval() #modo evaluación (o testeo)
  with torch.no_grad():
    for X, y in dataset:
      pred = AE_model(X)

      loss = loss_fn(pred, y)

      running_test_loss += loss.item()
  #Resultados de la función de pérdida, cada resultado se guarda en el vector vacío test_loss  
  running_test_loss /= len(dataset)
  print("Test Loss: {}".format(running_test_loss))
  test_loss.append(running_test_loss)

In [None]:
# Se entrena el modelo por 200 épocas
for epoch in range(epochs):
  print("Época {} de {}".format(epoch+1, epochs))
  train(train_loader)
  test(test_loader)

In [None]:
outputs = {}
  
# Se extrae el último batch del dataset de testeo 
img, _ = list(test_loader)[-1]

out = AE_model(img) # Se obtiene el output del modelo para este batch, es decir que vamos a ver como 
# el modelo reconstruye las imágenes 
  
# guardamos en un diccionario la imagen original y la predicha por el modelo
outputs['img'] = img
outputs['out'] = out
  
#Graficamos las imágenes 
counter = 1
val = outputs['out'].detach().numpy()

figure = plt.figure(figsize = (20, 10))  
# predicción de las 10 primeras imagenes del batch 
for idx in range(10):
    plt.subplot(2, 10, counter)
    plt.title("Reconstructed \n image n=64")
    plt.imshow(val[idx].reshape(28, 28), cmap='gray')
    plt.axis('off')
    counter += 1
  
  
# 10 primeras imágenes originales del batch
for idx in range(10):
    val = outputs['img']
    plt.subplot(2, 10, counter)
    plt.imshow(val[idx].reshape(28, 28), cmap='gray')
    plt.title("Original Image")
    plt.axis('off')
    counter += 1
  
plt.tight_layout()
plt.savefig('Img64.pdf')
plt.show()

# B