Q. Using a deep learning algorithm of your choice, learn the representation of dark matter in the strong gravitational lensing images provided using PyTorch.
The goal is to use this learned representation for anomaly detection, hence you should pick the most appropriate algorithm and discuss your strategy.   
   
A. Considering the purpose is anomaly detection, I choose Autoencoder as the most appropriate method, as discussed in [1]. I think Conditional VAE (CVAE)[5] especially suits this purpose.   
Deep neural network architecture is known to function relatively better in tasks about images than MLP(multilayer perception), and in the early stage Convolutional AutoEncoder(hereafter CAE)[2] and Variational AutoEncoder(VAE)[8] was the most competent method.
Of cource, it is said that CNN architecture in general performs well, speaking of image processing.  
CAE incorpolates CNN's algorithm into itself, and according to [3],[4] (written in Japnanese), it clearly shows that CAE overwhelms other MLP algorithms.   
VAE incorpolates probability distribution into latent variable z, and it also performs as well as CAE does.
These days, lots of methods derived from these, and I noticed CVAE is the best, which add supervising features to VAE. it is compared with lots of other autoencoder methods on purpose of analysis on astronomical images[6], and prove that the latent variables obtained with CVAE preserve more information and are more discriminative towards real astronomical transients.(SCAE[7] did well as well, but slightly CVAE has an advantage of it)    
This paper was published in April 2018, and it is the most recent paper applying autoencoder to processing astronomical images.(According to my survey on Google Scholor)
In addition to that, CVAE is implementable on laptop, whereas other methods sometimes need a bulk of computer properties(such as [9].)

Using such autoencoders, anomaly detection will be easy.(in future work I can impliment heat map which clearly tells anomalies from norms [10] )

According to Hold-out method, I devided images into train(3001 images) and test(2000 images).  

Of cource, CNN could be used because this problem can be taken as a classification problem(so no_sub images provided)

About optimizer, I use Adam because this is one of the most popular optimizer.


in inplementation, I refered to this site:  
https://graviraja.github.io/conditionalvae/#

----

[1]Deep Learning the Morphology of Dark Matter Substructure, pp8
https://arxiv.org/pdf/1909.07346.pdf

[2]Deep Clustering with Convolutional Autoencoders
https://link.springer.com/chapter/10.1007/978-3-319-70096-0_39

[3]PytorchによるAutoEncoder Familyの実装(Implementation of various Autoencoders using PyTorch)
https://elix-tech.github.io/ja/2016/07/17/autoencoder.html

[4]様々なオートエンコーダによる異常検知(Anomaly Detection with lots of Autoencoders)
https://sinyblog.com/deaplearning/auto_encoder_001/

[5]Semi-Supervised Learning with Deep Generative Models (CVAE)
https://arxiv.org/abs/1406.5298

[6]Latent representations of transient candidates from an astronomical image difference pipeline using Variational Autoencoders
https://pdfs.semanticscholar.org/57ce/a9520008706b8cb190d94513e695068210cd.pdf

[7]Convolutional Sparse Autoencoders for Image Classification (SCAE)
https://ieeexplore.ieee.org/document/7962256

[8]Auto-Encoding Variational Bayes(VAE)
https://arxiv.org/pdf/1312.6114.pdf

[9]Improving Variational Autoencoder with Deep Feature Consistent and Generative Adversarial Training (GAN)
https://arxiv.org/pdf/1906.01984.pdf

[10]Anomaly Manufacturing Product Detection using Unregularized Anomaly Score on Deep Generative Models
https://confit.atlas.jp/guide/event-img/jsai2018/2A1-03/public/pdf?type=in
https://qiita.com/shinmura0/items/811d01384e20bfd1e035

### CVAE

In [7]:
from __future__ import print_function, division
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, utils

import matplotlib.pyplot as plt
import pandas as pd
from skimage import io, transform
import numpy as np

In [8]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [9]:
BATCH_SIZE = 128        # number of data points in each batch
N_EPOCHS = 10           # times to run the model on complete data
INPUT_DIM = 150 * 150     # size of each input
HIDDEN_DIM = 256        # hidden dimension
LATENT_DIM = 50         # latent vector dimension
lr = 1e-3               # learning rate

In [44]:
data_transform = transforms.Compose([transforms.RandomSizedCrop(INPUT_DIM), transforms.ToTensor()])

train_dataset = datasets.ImageFolder(root='./lenses/train', #3001 images each
                                           transform=data_transform)
test_dataset = datasets.ImageFolder(root='./lenses/test', #2000 images each
                                           transform=data_transform)

In [45]:
train_iterator = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_iterator = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
def idx2onehot(idx, n=N_CLASSES):

    assert idx.shape[1] == 1
    assert torch.max(idx).item() < n

    onehot = torch.zeros(idx.size(0), n)
    onehot.scatter_(1, idx.data, 1)

    return onehot

In [46]:
class Encoder(nn.Module):
    ''' This the encoder part of VAE
    '''
    def __init__(self, input_dim, hidden_dim, z_dim):
        '''
        Args:
            input_dim: A integer indicating the size of input (in case of MNIST 28 * 28).
            hidden_dim: A integer indicating the size of hidden dimension.
            z_dim: A integer indicating the latent dimension.
        '''
        super().__init__()

        self.linear = nn.Linear(input_dim, hidden_dim)
        self.mu = nn.Linear(hidden_dim, z_dim)
        self.var = nn.Linear(hidden_dim, z_dim)

    def forward(self, x):
        # x is of shape [batch_size, input_dim]

        hidden = F.relu(self.linear(x))
        # hidden is of shape [batch_size, hidden_dim]
        z_mu = self.mu(hidden)
        # z_mu is of shape [batch_size, latent_dim]
        z_var = self.var(hidden)
        # z_var is of shape [batch_size, latent_dim]

        return z_mu, z_var

In [47]:
# encoder
encoder = Encoder(INPUT_DIM, HIDDEN_DIM, LATENT_DIM)

# decoder
decoder = Decoder(LATENT_DIM, HIDDEN_DIM, INPUT_DIM)

# vae
model = VAE(encoder, decoder).to(device)

# optimizer
optimizer = optim.Adam(model.parameters(), lr=lr)

In [48]:
def train():
    # set the train mode
    model.train()

    # loss of the epoch
    train_loss = 0

    for i, (x, _) in enumerate(train_iterator):
        # reshape the data into [batch_size, 784]
        x = x.view(BATCH_SIZE*3, 150 * 150)
        x = x.to(device)

        # update the gradients to zero
        optimizer.zero_grad()

        # forward pass
        x_sample, z_mu, z_var = model(x)

        # reconstruction loss
        recon_loss = F.binary_cross_entropy(x_sample, x, size_average=False)

        # kl divergence loss
        kl_loss = 0.5 * torch.sum(torch.exp(z_var) + z_mu**2 - 1.0 - z_var)

        # total loss
        loss = recon_loss + kl_loss

        # backward pass
        loss.backward()
        train_loss += loss.item()

        # update the weights
        optimizer.step()

    return train_loss


In [49]:

def test():
    # set the evaluation mode
    model.eval()

    # test loss for the data
    test_loss = 0

    # we don't need to track the gradients, since we are not updating the parameters during evaluation / testing
    with torch.no_grad():
        for i, (x, _) in enumerate(test_iterator):
            # reshape the data
            x = x.view(BATCH_SIZE*3, 150 * 150)
            x = x.to(device)

            # forward pass
            x_sample, z_mu, z_var = model(x)

            # reconstruction loss
            recon_loss = F.binary_cross_entropy(x_sample, x, size_average=False)

            # kl divergence loss
            kl_loss = 0.5 * torch.sum(torch.exp(z_var) + z_mu**2 - 1.0 - z_var)

            # total loss
            loss = recon_loss + kl_loss
            test_loss += loss.item()

    return test_loss


In [None]:
best_test_loss = float('inf')

for e in range(N_EPOCHS):

    train_loss = train()
    test_loss = test()

    train_loss /= len(train_dataset)
    test_loss /= len(test_dataset)

    print(f'Epoch {e}, Train Loss: {train_loss:.2f}, Test Loss: {test_loss:.2f}')

    if best_test_loss > test_loss:
        best_test_loss = test_loss
        patience_counter = 1
    else:
        patience_counter += 1

    if patience_counter > 3:
        break

In [None]:
# sample and generate a image
z = torch.randn(1, LATENT_DIM).to(device)
reconstructed_img = model.dec(z)
img = reconstructed_img.view(150, 150).data

print(z.shape)
print(img.shape)
plt.figure()
plt.imshow(img, cmap='gray')
plt.show()