In [59]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
import numpy as np
import matplotlib.pyplot as plt
import pickle
import os
import csv
from tqdm import tqdm
from sklearn.model_selection import train_test_split

In [5]:
import numpy as np

def sample_data_1():
    count = 100000
    rand = np.random.RandomState(0)
    return [[1.0, 2.0]] + rand.randn(count, 2) * [[5.0, 1.0]]

def sample_data_2():
    count = 100000
    rand = np.random.RandomState(0)
    return [[1.0, 2.0]] + (rand.randn(count, 2) * [[5.0, 1.0]]).dot([[np.sqrt(2) / 2, np.sqrt(2) / 2], [-np.sqrt(2) / 2, np.sqrt(2) / 2]])

def sample_data_3():
    count = 100000
    rand = np.random.RandomState(0)
    a = [[-1.5, 2.5]] + rand.randn(count // 3, 2) * 0.2
    b = [[1.5, 2.5]] + rand.randn(count // 3, 2) * 0.2
    c = np.c_[2 * np.cos(np.linspace(0, np.pi, count // 3)),
    -np.sin(np.linspace(0, np.pi, count // 3))]

    c += rand.randn(*c.shape) * 0.2
    data_x = np.concatenate([a, b, c], axis=0)
    data_y = np.array([0] * len(a) + [1] * len(b) + [2] * len(c))
    perm = rand.permutation(len(data_x))
    return data_x[perm], data_y[perm]

In [44]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [45]:
device

device(type='cpu')

In [6]:
data_1 = sample_data_1()
data_2 = sample_data_2()
data_3 = sample_data_3()

In [69]:
class VAEEncoder(nn.Module):
    def __init__(self, dim, lin_dim):
        super().__init__()
        self.dim = dim
        self.lin_dim=lin_dim
        self.linear = nn.Linear(dim, lin_dim)
        self.relu = nn.ReLU()
        
    def encode(self, x, muf, sigmaf):
        z = self.forward(x)
        mu = muf(z)
        sigma = sigmaf(z)
        return mu, torch.exp(sigma * 0.5)
        
    def forward(self, x):
        z = self.linear(x)
        z = self.relu(z)
        return z

class VAEDecoder(nn.Module):
    def __init__(self, dim, lin_dim):
        super().__init__()
        self.dim = dim
        self.lin_dim=lin_dim
        self.linear = nn.Linear(dim, lin_dim)
        self.relu = nn.ReLU()
        
    def decode(self, x, muf, sigmaf):
        z = self.forward(x)
        mu = muf(z)
        sigma = sigmaf(z)
        return mu, torch.exp(sigma * 0.5)        
        
    def forward(self, x):
        z = self.linear(x)
        z = self.relu(z)
        return z
        
class VAE(nn.Module):
    def __init__(self, latd=2, ind=2, lin_dim=64):
        super().__init__()
        self.latd = latd
        self.ind = ind
        self.encoder = VAEEncoder(ind, lin_dim)
        self.decoder = VAEDecoder(latd, lin_dim)
        self.mu_x = nn.Linear(lin_dim, ind)
        self.mu_z = nn.Linear(lin_dim, latd)
        self.sigma_x = nn.Linear(lin_dim, ind)
        self.sigma_z = nn.Linear(lin_dim, latd)
        
    def sample(self, mu, sigma):
        if not self.training:
            return mu
        standard = torch.distributions.Normal(torch.tensor([0.0]), torch.tensor([1.0])).sample(mu.shape).to(device).view(mu.shape)
        return mu + standard * sigma
    
    def forward(self, x):
        mu_z, sigma_z = self.encoder.encode(x, self.mu_z, self.sigma_z)
        z = self.sample(mu_z, sigma_z)
        mu_x, sigma_x = self.decoder.decode(z, self.mu_x, self.sigma_x)
        z = self.sample(mu_x, sigma_x)
        return mu_z, sigma_z, mu_x, sigma_x
        

In [70]:
class VAELoss(nn.Module):
    def __init__(self, kl_loss, cross_entropy):
        super().__init__()
        self.kl_loss = kl_loss
        self.cross_entropy = cross_entropy
        
    def forward(self, x, mu_x, sigma_x, mu_z, sigma_z):
        kl = self.kl_loss(mu_z, sigma_z)
        cross_entropy = self.cross_entropy(x, mu_x, sigma_x)
        return kl + cross_entropy

In [133]:
VAE()(torch.FloatTensor(np.array([[1.0, 1.0]])))

(tensor([[ 0.0682, -0.2141]], grad_fn=<AddmmBackward>),
 tensor([[0.9856, 0.7465]], grad_fn=<ExpBackward>),
 tensor([[0.3996, 0.1842]], grad_fn=<AddmmBackward>),
 tensor([[0.8363, 1.2958]], grad_fn=<ExpBackward>))

In [137]:
model = VAE()
opt = torch.optim.Adam(params=model.parameters())
loss = VAELoss(lambda x, y: 0, lambda x, y, z: 0)

In [153]:
def train(model, opt, criterion, data, n_epoch, batch_size):
    data_train, data_test = train_test_split(data, test_size=0.2, random_state=42)
    train_loader = torch.utils.data.DataLoader(data_train, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(data_test, batch_size=batch_size, shuffle=False)
    
    train_loss = []
    test_loss = []
    for epoch in tqdm(range(n_epoch)):
        train_loss_epoch = []
        test_loss_epoch = []
        for batch in train_loader:
            opt.zero_grad()
            batch = batch.float()
            batch = batch.to(device)
            mu_z, sigma_z, mu_x, sigma_x = model(batch)
            loss = criterion(batch, mu_z, sigma_z, mu_x, sigma_x)
            train_loss_epoch.append(loss.item())
            loss.backward()
            opt.step()
        train_loss.append(np.mean(train_loss_epoch))

In [154]:
train(model, opt, loss, data_1, 10, 10)



  0%|          | 0/10 [00:00<?, ?it/s][A[A

AttributeError: 'int' object has no attribute 'item'