In [7]:
# ATAC model specification

#MMVAE module
import torch
import torch.distributions as dist
import torch.nn as nn
import torch.nn.functional as F
from numpy import prod, sqrt
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import save_image, make_grid

from utils import Constants
from vis import plot_embeddings, plot_kls_df
from .vae import VAE

#SCALE module
#import torch
#import torch.nn as nn
#import torch.nn.functional as F
#from torch.nn import init
#from torch.optim.lr_scheduler import MultiStepLR, ExponentialLR, ReduceLROnPlateau

#import time
#import math
#import numpy as np
#from tqdm import trange
#from itertools import repeat
#from sklearn.mixture import GaussianMixture

#from .layer import Encoder, Decoder, build_mlp, DeterministicWarmup
#from .loss import elbo, elbo_SCALE

from datasets import SingleCellDataset
from sklearn.preprocessing import MaxAbsScaler

ModuleNotFoundError: No module named 'utils'

In [None]:
# Constants
# Shape of one ATAC profile: one entry per filtered peak
# (43703 peaks, P0 brain cortex SNARE-seq).
dataSize = torch.Size([43703, 1])

# Flattened input width; used as the in-feature count of the encoder's first layer.
data_dim = int(prod(dataSize))

# Width shared by every hidden layer.
hidden_dim = 400

In [None]:
def extra_hidden_layer():
    """Build one extra hidden block: a hidden_dim -> hidden_dim linear map plus in-place ReLU.

    Returns:
        nn.Sequential: the two-module block, freshly initialised on every call.
    """
    linear = nn.Linear(hidden_dim, hidden_dim)
    activation = nn.ReLU(True)  # True -> inplace
    return nn.Sequential(linear, activation)

In [None]:
# Classes
class Enc(nn.Module):
    """Encoder producing the two latent-distribution parameters for ATAC-seq input.

    Args:
        latent_dim: size of the latent space (output width of both heads).
        num_hidden_layers: total number of hidden blocks in the trunk; the
            first maps data_dim -> hidden_dim, any further ones are
            hidden_dim -> hidden_dim blocks from ``extra_hidden_layer``.
    """

    def __init__(self, latent_dim, num_hidden_layers=1):
        super().__init__()
        # Trunk: input projection first, then (num_hidden_layers - 1) extra blocks.
        trunk = [nn.Sequential(nn.Linear(data_dim, hidden_dim), nn.ReLU(True))]
        for _ in range(num_hidden_layers - 1):
            trunk.append(extra_hidden_layer())
        self.enc = nn.Sequential(*trunk)
        # Two linear heads that share the trunk output.
        self.fc21 = nn.Linear(hidden_dim, latent_dim)
        self.fc22 = nn.Linear(hidden_dim, latent_dim)

    def forward(self, x):
        """Encode ``x`` and return the pair of latent parameters.

        ``x`` is passed to the trunk as-is (no flattening is done here).

        Returns:
            tuple: ``(fc21 output, softmax(fc22 output) * latent size + Constants.eta)``,
            where the softmax runs over the last dimension.
        """
        hidden = self.enc(x)
        raw = self.fc22(hidden)
        # Softmax-normalise over the latent axis, rescale by its length and
        # offset by Constants.eta.
        second = F.softmax(raw, dim=-1) * raw.size(-1) + Constants.eta
        first = self.fc21(hidden)
        return first, second


In [None]:
# Build a throwaway encoder (latent_dim=20, one hidden layer) to inspect its trunk.
# The instance gets its own name: rebinding `Enc` would replace the class with
# an instance and break every later `Enc(...)` construction in this notebook.
enc_example = Enc(20, 1)
enc_example.enc

In [None]:
class ATAC(VAE):
    """VAE sub-class for ATAC data using Laplace prior, likelihood and posterior.

    The encoder and decoder are built from ``Enc`` and ``Dec`` with
    ``params.latent_dim`` and ``params.num_hidden_layers``. The second prior
    parameter is trainable only when ``params.learn_prior`` is true.
    """

    def __init__(self, params):
        super().__init__(
            dist.Laplace,  # prior
            dist.Laplace,  # likelihood
            dist.Laplace,  # posterior
            Enc(params.latent_dim, params.num_hidden_layers),
            Dec(params.latent_dim, params.num_hidden_layers),
            params
        )
        grad = {'requires_grad': params.learn_prior}
        self._pz_params = nn.ParameterList([
            nn.Parameter(torch.zeros(1, params.latent_dim), requires_grad=False),  # mu
            nn.Parameter(torch.zeros(1, params.latent_dim), **grad)  # logvar
        ])
        self.modelName = 'atac'
        self.dataSize = dataSize
        self.llik_scaling = 1.

    @property
    def pz_params(self):
        """Prior parameters: the fixed first parameter and the second one
        softmax-normalised over dim 1 and rescaled by the latent size."""
        raw_second = self._pz_params[1]
        return self._pz_params[0], F.softmax(raw_second, dim=1) * raw_second.size(-1)

    @staticmethod
    def getDataLoaders(batch_size, shuffle=True, device="cuda"):
        """Create the train and test loaders over the single-cell dataset.

        Both loaders wrap the same ``SingleCellDataset`` instance read from
        ``'../data/'``; no held-out split is made here.

        Args:
            batch_size: batch size for both loaders.
            shuffle: whether the training loader shuffles (default True).
                The test loader is never shuffled.
            device: when equal to ``"cuda"``, loaders use one worker process
                and pinned memory.

        Returns:
            tuple: ``(train, test)`` DataLoaders. The training loader drops the
            last incomplete batch; the test loader keeps it.
        """
        kwargs = {'num_workers': 1, 'pin_memory': True} if device == "cuda" else {}

        # Dataset settings ported from the SingleCellDataset usage.
        path = '../data/'
        low = 0.01
        high = 0.9
        min_peaks = 100
        transpose = False
        normalizer = MaxAbsScaler()

        dataset = SingleCellDataset(path, low=low, high=high, min_peaks=min_peaks,
                                    transpose=transpose, transforms=[normalizer.fit_transform])

        # Honour the caller's `shuffle` for training; evaluation order stays fixed.
        train = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=True, **kwargs)
        test = DataLoader(dataset, batch_size=batch_size, shuffle=False, drop_last=False, **kwargs)

        return train, test

    def generate(self, runPath, epoch):
        """Draw samples from the parent model and save them as a tiled PNG.

        Writes ``{runPath}/gen_samples_{epoch:03d}.png``.
        """
        N, K = 64, 9
        # Zero-argument super(): the previous super(MNIST, self) raised NameError
        # because no MNIST class exists in this module.
        samples = super().generate(N, K).cpu()
        # wrangle things so they come out tiled: N x K x <per-sample dims>
        # NOTE(review): the grid/image export was inherited from an image model;
        # confirm it is meaningful for ATAC vectors of shape `dataSize`.
        samples = samples.view(K, N, *samples.size()[1:]).transpose(0, 1)
        s = [make_grid(t, nrow=int(sqrt(K)), padding=0) for t in samples]
        save_image(torch.stack(s),
                   '{}/gen_samples_{:03d}.png'.format(runPath, epoch),
                   nrow=int(sqrt(N)))

    def reconstruct(self, data, runPath, epoch):
        """Reconstruct the first 8 items of ``data`` and save inputs and
        reconstructions together as ``{runPath}/recon_{epoch:03d}.png``."""
        recon = super().reconstruct(data[:8])
        comp = torch.cat([data[:8], recon]).data.cpu()
        save_image(comp, '{}/recon_{:03d}.png'.format(runPath, epoch))

    def analyse(self, data, runPath, epoch):
        """Run the parent analysis (K=10) and plot the embeddings and KL table.

        Writes ``{runPath}/emb_umap_{epoch:03d}.png`` and
        ``{runPath}/kl_distance_{epoch:03d}.png``.
        """
        zemb, zsl, kls_df = super().analyse(data, K=10)
        labels = ['Prior', self.modelName.lower()]
        plot_embeddings(zemb, zsl, labels, '{}/emb_umap_{:03d}.png'.format(runPath, epoch))
        plot_kls_df(kls_df, '{}/kl_distance_{:03d}.png'.format(runPath, epoch))