# Setup

In [34]:
# Mount Google Drive inside the Colab filesystem at /content/drive so the
# cells below can change into the project folder stored there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [35]:
cd /content/drive/MyDrive/generative_deep_learning

/content/drive/MyDrive/generative_deep_learning


In [36]:
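# One-time setup: uncomment and run this line to clone the repository into the current Drive folder.
# !git clone -b tensorflow_2 https://github.com/davidADSP/GDL_code.git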

In [37]:
cd GDL_code

/content/drive/MyDrive/generative_deep_learning/GDL_code


# Import

In [38]:
import os
from utils.loaders import load_mnist

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm

In [39]:
# Training configuration -- every value a reader might want to tune lives here.
SEED = 42  # fixed seed so weight initialisation and batch shuffling start from the same RNG state on a fresh run
torch.manual_seed(SEED)

DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'  # first GPU when available, otherwise CPU
LEARNING_RATE = 0.0005  # step size passed to the Adam optimizer
BATCH_SIZE = 32         # samples per DataLoader batch
EPOCHS = 200            # number of full passes over the training set
print(DEVICE)

cuda:0


In [40]:
# load_mnist comes from the repository's utils.loaders module. The conversion
# helpers in the Dataset section assume it yields channels-last
# (N x H x W x C) image arrays -- TODO confirm against utils/loaders.py.
(x_train, y_train), (x_test, y_test) = load_mnist()

# Dataset

In [41]:
def keras_to_torch_tensor(array):
    """Convert a channels-last image batch into a float32 tensor on DEVICE.

    The axes are permuted from (N, H, W, C) to (N, C, H, W) before the array
    is wrapped in a tensor, so `array` must be 4-dimensional.
    """
    channels_first = array.transpose(0, 3, 1, 2)
    return torch.tensor(channels_first, dtype=torch.float32).to(DEVICE)

def y_to_torch_tensor(array):
    """Turn a label array into a float32 tensor on DEVICE with a trailing unit axis."""
    labels = torch.tensor(array, dtype=torch.float32)
    return labels.unsqueeze(-1).to(DEVICE)

# Convert the loaded images and labels to float32 tensors on DEVICE.
# NOTE: each name is rebound to its converted value, so this cell is not
# idempotent -- re-run the load_mnist cell before running it a second time.
x_train = keras_to_torch_tensor(x_train)
y_train = y_to_torch_tensor(y_train)

x_test = keras_to_torch_tensor(x_test)
y_test = y_to_torch_tensor(y_test)

# Sanity check: the training images should now be laid out channels-first.
print(x_train.shape)

torch.Size([60000, 1, 28, 28])


In [42]:
# Pair each image tensor with its label tensor so a DataLoader can batch them together.
train = torch.utils.data.TensorDataset(x_train, y_train)
test = torch.utils.data.TensorDataset(x_test, y_test)

# Model

In [43]:
class Encoder(nn.Module):
    """Convolutional encoder: four conv + LeakyReLU stages followed by a linear layer.

    The final linear layer expects 3136 (= 64 * 7 * 7) flattened features,
    which is what the conv stack produces for 1 x 28 x 28 inputs, and maps
    them to a 2-dimensional output.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.l_relu1 = nn.LeakyReLU()
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)  # 28 -> 14
        self.l_relu2 = nn.LeakyReLU()
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1)  # 14 -> 7
        self.l_relu3 = nn.LeakyReLU()
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
        self.l_relu4 = nn.LeakyReLU()
        self.ln = nn.Linear(3136, 2)

    def forward(self, x):
        """Encode a batch of images of shape (N, 1, H, W) into (N, 2) vectors."""
        stages = (
            (self.conv1, self.l_relu1),
            (self.conv2, self.l_relu2),
            (self.conv3, self.l_relu3),
            (self.conv4, self.l_relu4),
        )
        for conv, activation in stages:
            x = activation(conv(x))
        features = torch.flatten(x, start_dim=1)  # keep the batch axis, flatten the rest
        return self.ln(features)

In [44]:
class Decoder(nn.Module):
    """Transposed-convolution decoder mapping 2-d vectors to 1 x 28 x 28 images.

    A linear layer expands the input to 3136 values, which are reshaped to
    64 x 7 x 7. Two stride-2 transposed convolutions upsample 7 -> 14 -> 28,
    and a final sigmoid squashes every output pixel into [0, 1].
    """

    def __init__(self):
        super(Decoder, self).__init__()
        self.ln = nn.Linear(2, 3136)
        self.conv1 = nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.l_relu1 = nn.LeakyReLU()
        self.conv2 = nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=2, stride=2)  # 7 -> 14
        self.l_relu2 = nn.LeakyReLU()
        self.conv3 = nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=2, stride=2)  # 14 -> 28
        self.l_relu3 = nn.LeakyReLU()
        self.conv4 = nn.ConvTranspose2d(in_channels=64, out_channels=1, kernel_size=3, stride=1, padding=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Decode a batch of shape (N, 2) into images of shape (N, 1, 28, 28)."""
        x = self.ln(x)
        x = x.reshape(-1, 64, 7, 7)
        x = self.conv1(x)
        x = self.l_relu1(x)
        x = self.conv2(x)
        x = self.l_relu2(x)
        x = self.conv3(x)
        x = self.l_relu3(x)
        x = self.conv4(x)
        # The sigmoid result must be assigned: a bare `x - self.sigmoid(x)`
        # expression is evaluated and thrown away, returning unbounded values.
        x = self.sigmoid(x)
        return x

In [45]:
class AutoEncoder(nn.Module):
    """Chains an Encoder and a Decoder: forward() feeds the encoder output to the decoder."""

    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        """Return decoder(encoder(x))."""
        latent = self.encoder(x)
        return self.decoder(latent)

# Loss

In [46]:
def RMSELoss(yhat, y):
    """Root-mean-square error between `yhat` and `y`, averaged over every element.

    Returns a 0-dimensional tensor.
    """
    squared_error = (yhat - y).pow(2)
    return squared_error.mean().sqrt()

# Train

In [47]:
def train_fn(train):
    """Train a fresh AutoEncoder on `train` and return it.

    The model is optimised with Adam (lr=LEARNING_RATE) to minimise the RMSE
    between its output and its own input batch, for EPOCHS epochs with
    shuffled batches of BATCH_SIZE. The mean batch loss is printed per epoch.

    Args:
        train: dataset whose samples unpack into two items; only the first
            (the input batch) is used.

    Returns:
        The trained AutoEncoder, on DEVICE.
    """
    # Move the model to DEVICE once, before the optimizer is built, instead of
    # repeating the (loop-invariant) transfer at the start of every epoch.
    model = AutoEncoder().to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    loss_fn = RMSELoss
    train_loader = torch.utils.data.DataLoader(train, batch_size=BATCH_SIZE, shuffle=True)
    num_batches = len(train_loader)

    for epoch in range(EPOCHS):
        model.train()
        avg_loss = 0.

        # The second item of each sample is ignored: the reconstruction
        # target is the input batch itself.
        for x_batch, _ in tqdm(train_loader, disable=True):
            y_pred = model(x_batch)
            loss = loss_fn(y_pred, x_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Accumulate the running mean of the per-batch losses.
            avg_loss += loss.item() / num_batches
        print(f'Epoch {epoch + 1}/{EPOCHS} \t loss={avg_loss}')
    return model

In [None]:
# Runs the full training loop (EPOCHS epochs) -- this is the expensive cell.
model = train_fn(train)

In [None]:
# Persist only the learned weights (state_dict). torch.save does not create
# missing directories, so make sure the output folder exists before writing.
MODEL_PATH = '../output/model_03_01.pth'
os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
torch.save(model.state_dict(), MODEL_PATH)