In [1]:
import torch
from torch import nn, cuda, optim, tensor
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np

In [2]:
test_dataset = datasets.MNIST(root='../app/mnist_data/', train=False, transform=transforms.ToTensor(), download=False)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=128, shuffle=False)

In [3]:
class Encoder(nn.Module):

    def __init__(self, input_size, latent_size, num_neurons):
        super(Encoder, self).__init__()

        self.layers = nn.ModuleList()

        old_out = input_size
        for layer_size in num_neurons:
            new_layer = nn.Sequential(
                nn.Linear(old_out, layer_size),
                nn.ReLU(True),
            )
            new_layer = nn.DataParallel(new_layer)
            self.layers.append(new_layer)

            old_out = layer_size

        self.latent_layer = nn.Sequential(
            nn.Linear(old_out, latent_size),
            nn.ReLU(True),
        )
        self.latent_layer = nn.DataParallel(self.latent_layer)

    def forward(self, x):

        out = x
        for l in self.layers:
            out = l(out)

        return self.latent_layer(out)

In [4]:
class Decoder(nn.Module):

    def __init__(self, input_size, latent_size, num_neurons):
        super(Decoder, self).__init__()

        self.layers = nn.ModuleList()

        old_out = latent_size
        for layer_size in num_neurons:
            new_layer = nn.Sequential(
                nn.Linear(old_out, layer_size),
                nn.ReLU(True),
            )
            new_layer = nn.DataParallel(new_layer)
            self.layers.append(new_layer)

            old_out = layer_size

        self.latent_layer = nn.Sequential(
            nn.Linear(old_out, input_size),
            nn.ReLU(True),
        )
        self.latent_layer = nn.DataParallel(self.latent_layer)

    def forward(self, x):

        out = x
        for l in self.layers:
            out = l(out)

        return self.latent_layer(out)

In [5]:
class EncoderDecoder(nn.Module):

    def __init__(self, encoder, decoder):
        super(EncoderDecoder, self).__init__()

        self.encoder = encoder
        self.decoder = decoder

    def forward(self, state):
        latent = self.encoder(state)
        output = self.decoder(latent)
        return output

In [6]:
class AutoEncoder:

    def __init__(self, n_components=2, num_neurons=[200, 100], num_epochs=10):

        self.n_components = n_components
        self.num_neurons = num_neurons
        self.num_epochs = num_epochs

    def fit_transform(self, data):

        num_epochs = self.num_epochs
        batch_size = 100
        learning_rate = 1e-3
        criterion = nn.MSELoss()

        input_size = data.shape[1]

        self.encoder = Encoder(input_size, self.n_components, self.num_neurons)
        self.decoder = Decoder(input_size, self.n_components, self.num_neurons)

        if cuda.is_available():
            self.encoder = self.encoder.cuda()
            self.decoder = self.decoder.cuda()

        self.autoencoder = EncoderDecoder(self.encoder, self.decoder)

        if cuda.is_available():
            self.autoencoder = self.autoencoder.cuda()

        self.model = self.autoencoder

        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            for i in range(int(data.shape[0] / batch_size)):
                batch = data[i * batch_size:(i + 1) * batch_size]
                batch = tensor(batch, dtype=torch.float)
                if cuda.is_available():
                    batch = batch.cuda()
                # ===================forward=====================
                output = self.model(batch)
                loss = criterion(output, batch)
                # ===================backward====================
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        data = tensor(data, dtype=torch.float)
        if cuda.is_available():
            data = data.cuda()
        output = self.encoder(data)
        if cuda.is_available():
            output = output.cpu()
        return output.detach().numpy()
    
    def img_fn(self):
        # obtain one batch of test images
        dataiter = iter(test_loader)
        images, labels = dataiter.next()

        with torch.no_grad():
            images_flatten = images.view(images.size(0), -1)
            images_flatten = images_flatten.cuda()
            # get sample outputs
            output = self.model(images_flatten)
            # prep images for display
            return output, images.numpy()

In [9]:
if __name__ == "__main__":
    import timeit
    import torchvision
    import torch
    from torchvision import datasets, transforms

    import numpy as np

    dataset = torchvision.datasets.MNIST(
        './data/', download=True, transform=transforms.ToTensor())
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=10000000)
    points = [1, 2, 3, 4, 5]

    print('Start projection training')
    ae = AutoEncoder()
    for data in dataloader:
        data = data[0].numpy()
        data = data[points]
        data = data.reshape((data.shape[0], -1))
        data = data.astype(np.float32)

        start_time = timeit.default_timer()
        dim_reducted = ae.fit_transform(data)
        print('Training done', timeit.default_timer() - start_time)
        #print(dim_reducted)

Start projection training
Training done 3.9676811520002957


In [10]:
dim_reducted

array([[0.07226029, 0.1304933 ],
       [0.08331347, 0.10019022],
       [0.06701024, 0.07883672],
       [0.09453031, 0.16491795],
       [0.10166308, 0.11615867]], dtype=float32)

In [None]:
decoded, images = ae.img_fn()
# output is resized into a batch of images
output = decoded.view(128, 1, 28, 28)
# use detach when it's an output that requires_grad
output = output.detach().cpu().numpy()

# plot the first ten input images and then reconstructed images
fig, axes = plt.subplots(nrows=2, ncols=10, sharex=True, sharey=True, figsize=(25,4))

# input images on top row, reconstructions on bottom
for images, row in zip([images, output], axes):
    for img, ax in zip(images, row):
        ax.imshow(np.squeeze(img), cmap='gray')
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)