In [1]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import datasets
from torch.utils.data import DataLoader
import numpy
import matplotlib.pyplot as plt
import matplotlib

print(matplotlib.__version__)

3.5.1


In [49]:

#hyper params
num_epoch = 5
cuda_device = -1
batch_size = 128
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

#model
#Линейный энкодер
class Encoder(nn.Module):
    # 28*28 -> hidden -> out
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.dropout1 = nn.Dropout(0.15)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.dropout2 = nn.Dropout(0.15)
        self.linear3 = nn.Linear(hidden_dim, latent_dim)

        self.activation = nn.Sigmoid()

    def forward(self, x):
        x = self.dropout1(self.activation(self.linear1(x)))
        x = self.dropout2(self.activation(self.linear2(x)))
        x = self.activation(self.linear3(x))

        return x


class Decoder(nn.Module):
    # encoder_out -> hidden -> 28*28
    def __init__(self, latent_dim, hidden_dim, out_dim):
        super().__init__()
        self.linear1 = nn.Linear(latent_dim, hidden_dim)
        self.dropout1 = nn.Dropout(0.15)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.dropout2 = nn.Dropout(0.15)
        self.linear3 = nn.Linear(hidden_dim, out_dim)

        self.activation = nn.Sigmoid()

    def forward(self, x):
        x = self.dropout1(self.activation(self.linear1(x)))
        x = self.dropout2(self.activation(self.linear2(x)))
        x = self.activation(self.linear3(x))

        return x


class ClassicAutoEncoder(nn.Module):
    def __init__(self, input_dim, enc_hidden_dim, dec_hidden_dim, latent_dim):
        super().__init__()
        self.encoder = Encoder(input_dim, enc_hidden_dim, latent_dim)
        self.decoder = Decoder(latent_dim, dec_hidden_dim, input_dim)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)

        return x


def collate_fn(data):
    pics = []
    target = []
    for item in data:

        pics.append(numpy.array(item[0]))
        target.append(item[1])
    return {
        'data': torch.from_numpy(numpy.array(pics)).float() / 255,
        'target': torch.from_numpy(numpy.array(target)),
    }

# model
model = ClassicAutoEncoder(28*28, 200, 300, 32)
model.train()
model.to(device)
# result = model(test_tersor)

#optimizer
optim = torch.optim.Adam(model.parameters(), lr=0.001)
#lr scheduler

#dataset
dataset = datasets.MNIST('C:\\Users\\Vampire\\Repos\\NN_reload_stream2', download=True)

#loss
loss_func = nn.MSELoss()

In [50]:
#dataloder
for epoch in range(num_epoch):
    dataloader = DataLoader(
        dataset=dataset,
        collate_fn=collate_fn,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )
    for step, batch in enumerate(dataloader):
        data = batch['data'].to(device).view(batch['data'].size(0), -1)
        optim.zero_grad()
        predict = model(data)
        loss = loss_func(predict, data)
        loss.backward()
        optim.step()
        if (step % 100 == 0):
            print(loss)
    print(f'epoch: {epoch}')

tensor(0.2406, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0678, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0662, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0670, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0636, device='cuda:0', grad_fn=<MseLossBackward0>)
epoch: 0
tensor(0.0632, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0615, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0581, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0576, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0530, device='cuda:0', grad_fn=<MseLossBackward0>)
epoch: 1
tensor(0.0520, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0541, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0495, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0522, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0501, device='cuda:0', grad_fn=<MseLossBackward0>)
epoch: 2
tensor(0.0455, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0460

In [54]:
with torch.no_grad():
    model.eval()
    test = dataset.data[255].view(1,-1).long() / 255
    test = test.to(device)
    #print(test.device)
    predict = model(test)
    test = test[0].view(28, 28).detach().cpu().numpy()
    print((test*255).astype(int).sum())
   
 #   plt.imshow(test)
 #   plt.show()

33676


In [None]:

plt.imshow(predict[0].view(28, 28).cpu().detach().numpy())
plt.show()

In [23]:

#hyper params
num_epoch = 20
cuda_device = -1
batch_size = 128
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# conv autoencoder
class Encoder(nn.Module):
    # 28*28 -> hidden -> out
    def __init__(self, in_chan, hidden_ch, out_channels):
        super().__init__()
        #conv2d -> maxpool2d -> conv2d -> maxpool2d -> conv2d
        self.conv1 = nn.Conv2d(in_chan, hidden_ch, kernel_size=5, stride=1, padding=2) # 28 x28
        self.pool1 = nn.MaxPool2d(2, 2) # 14 x 14 пулинг размером 2x2 уменьшает размерность в два раза
        self.conv2 = nn.Conv2d(hidden_ch, hidden_ch, kernel_size=3, stride=1, padding=1)  # 14 x 14
        self.pool2 = nn.MaxPool2d(2, 2)  # 7 x 7
        self.conv3 = nn.Conv2d(hidden_ch, out_channels, kernel_size=3, stride=1, padding=1)

        self.activation = nn.Sigmoid()

    def forward(self, x): # -> 7x7
        x = self.activation(self.pool1(self.conv1(x)))
        x = self.activation(self.pool2(self.conv2(x)))
        x = self.activation(self.conv3(x))

        return x
class Decoder(nn.Module):
    #conv2d -> upsampling2d -> conv2d -> upsampling2d -> conv2d
    def __init__(self, in_chan, hidden_ch, out_chan):
        super().__init__()
        self.conv1 = nn.Conv2d(in_chan, hidden_ch, kernel_size=3, stride=1, padding=1)  # 7 x 7
        self.upsample1 = nn.UpsamplingBilinear2d(scale_factor=2)  # 14 x 14
        self.conv2 = nn.Conv2d(hidden_ch, hidden_ch, kernel_size=3, stride=1, padding=1)  # 14 x 14
        self.upsample2 = nn.UpsamplingBilinear2d(scale_factor=2)  # 28 x 28
        self.conv3 = nn.Conv2d(hidden_ch, out_chan, kernel_size=5, stride=1, padding=2)

        self.activation = nn.Sigmoid()

    def forward(self, x): # -> 28 x 28
        x = self.activation(self.upsample1(self.conv1(x)))
        x = self.activation(self.upsample2(self.conv2(x)))
        x = self.activation(self.conv3(x))

        return x


class ConvolutionalAutoEncoder(nn.Module): # Сверточный автоэнкодер
    def __init__(self, input_ch, enc_hidden_ch, dec_hidden_ch, latent_ch):
        super().__init__()
        self.encoder = Encoder(input_ch, enc_hidden_ch, latent_ch)
        self.decoder = Decoder(latent_ch, dec_hidden_ch, input_ch)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)

        return x


def collate_fn(data):
    pics = []
    target = []
    for item in data:

        pics.append(numpy.array(item[0]))
        target.append(item[1])
    return {
        'data': torch.from_numpy(numpy.array(pics)).float() / 255,
        'target': torch.from_numpy(numpy.array(target)),
    }


# model


In [26]:
model2 = ConvolutionalAutoEncoder(1, 20, 20, 1)
model2.train()
model2.to(device)


#optimizer
optim = torch.optim.Adam(model2.parameters(), lr=0.001)



dataset = datasets.MNIST('C:\\Users\\Vampire\\Repos\\NN_reload_stream2', download=False)

loss_func = nn.MSELoss()
#dataloder
#предварительный вывод: использование сигмоиды позволяет  обойтись меньшим числом кодируемых каналов
for epoch in range(num_epoch):
    dataloader = DataLoader(
        dataset=dataset,
        collate_fn=collate_fn,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )
    for step, batch in enumerate(dataloader):
        data = batch['data'].to(device).unsqueeze(1)
        optim.zero_grad()
        predict = model2(data)
        loss = loss_func(predict, data)
        loss.backward()
        optim.step()
        if (step % 100 == 0):
            print(loss)
    print(f'epoch: {epoch}')

tensor(0.3785, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0999, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0865, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0654, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0590, device='cuda:0', grad_fn=<MseLossBackward0>)
epoch: 0
tensor(0.0567, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0560, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0493, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0424, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0387, device='cuda:0', grad_fn=<MseLossBackward0>)
epoch: 1
tensor(0.0386, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0350, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0353, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0322, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0307, device='cuda:0', grad_fn=<MseLossBackward0>)
epoch: 2
tensor(0.0297, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0307

In [52]:
#dataset.data[768].unsqueeze(0).unsqueeze(0).float()/255
with torch.no_grad():
    model2.eval()
    test = dataset.data[255].unsqueeze(0).unsqueeze(0).float() / 255
    test = test.to(device)
    #print(test.device)
    predict = model2(test)
    test = test[0].view(28, 28).detach().cpu().numpy()
    print((test*255).astype(int).sum())

33676


In [43]:
num_epoch = 20
cuda_device = -1
batch_size = 256
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

#model


# conv autoencoder
#model
# conv autoencoder
class Encoder(nn.Module):
    # 28*28 -> hidden -> out
    def __init__(self, in_chan, hidden_ch, out_channels):
        super().__init__()
        #conv2d -> maxpool2d -> conv2d -> maxpool2d -> conv2d
        self.conv1 = nn.Conv2d(in_chan, hidden_ch, kernel_size=5, stride=1, padding=2) # 28 x28
        self.pool1 = nn.MaxPool2d(2, 2) # 14 x 14
        self.conv2 = nn.Conv2d(hidden_ch, hidden_ch, kernel_size=3, stride=1, padding=1)  # 14 x 14
        self.pool2 = nn.MaxPool2d(2, 2)  # 7 x 7
        self.conv_mu = nn.Conv2d(hidden_ch, out_channels, kernel_size=3, stride=1, padding=1)
        self.conv_sigma = nn.Conv2d(hidden_ch, out_channels, kernel_size=3, stride=1, padding=1)

        self.activation = nn.Sigmoid()

    def forward(self, x): # -> 7x7
        x = self.activation(self.pool1(self.conv1(x)))
        x = self.activation(self.pool2(self.conv2(x)))
        mu = self.activation(self.conv_mu(x))
        sigma = torch.exp(self.conv_sigma(x))

        return mu, sigma
    
class Decoder(nn.Module):
    #conv2d -> upsampling2d -> conv2d -> upsampling2d -> conv2d
    def __init__(self, in_chan, hidden_ch, out_chan):
        super().__init__()
        self.conv1 = nn.Conv2d(in_chan, hidden_ch, kernel_size=3, stride=1, padding=1)  # 7 x 7
        self.upsample1 = nn.UpsamplingBilinear2d(scale_factor=2)  # 14 x 14
        self.conv2 = nn.Conv2d(hidden_ch, hidden_ch, kernel_size=3, stride=1, padding=1)  # 14 x 14
        self.upsample2 = nn.UpsamplingBilinear2d(scale_factor=2)  # 28 x 28
        self.conv3 = nn.Conv2d(hidden_ch, out_chan, kernel_size=5, stride=1, padding=2)

        self.activation = nn.Sigmoid()

    def forward(self, x): # -> 28 x 28
        x = self.activation(self.upsample1(self.conv1(x)))
        x = self.activation(self.upsample2(self.conv2(x)))
        x = self.activation(self.conv3(x))

        return x


class VarAutoEncoder(nn.Module):
    def __init__(self, input_ch, enc_hidden_ch, dec_hidden_ch, latent_ch):
        super().__init__()
        self.encoder = Encoder(input_ch, enc_hidden_ch, latent_ch)
        self.decoder = Decoder(latent_ch, dec_hidden_ch, input_ch)

    def forward(self, x):
        mu, sigma = self.encoder(x)
        x = sampling(mu, sigma)
        x = self.decoder(x)

        return x, mu, sigma


# sampling
def sampling(mu, sigma):
    return mu + sigma * torch.normal(torch.zeros_like(sigma),
                                     torch.ones_like(sigma))


def kl_loss(mu, sigma):
    p = torch.distributions.Normal(mu, sigma)
    q = torch.distributions.Normal(torch.zeros_like(mu), torch.ones_like(sigma))

    return torch.distributions.kl_divergence(p, q).mean()

def collate_fn(data):
    pics = []
    target = []
    for item in data:

        pics.append(numpy.array(item[0]))
        target.append(item[1])
    return {
        'data': torch.from_numpy(numpy.array(pics)).float() / 255,
        'target': torch.from_numpy(numpy.array(target)),
    }



In [44]:
model3 = VarAutoEncoder(1, 10, 10, 1)
model3.train()
model3.to(device)
# result = model(test_tersor)

#optimizer
optim = torch.optim.Adam(model3.parameters(), lr=0.001)
#lr scheduler

#dataset
dataset = datasets.MNIST('C:\\Users\\Vampire\\Repos\\NN_reload_stream2', download=False)

#
#loss
criterion = nn.MSELoss()
#dataloder

for epoch in range(num_epoch):
    dataloader = DataLoader(
        dataset=dataset,
        collate_fn=collate_fn,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )
    for step, batch in enumerate(dataloader):
        data = batch['data'].to(device).unsqueeze(1)
        optim.zero_grad()
        predict, mu, sigma = model3(data)
        #loss
        kl = kl_loss(mu, sigma)
        crit_loss = criterion(data, predict)
        #фактически к ошибке применяется штраф зависящий от посчитанной дивергенции
        loss = 0.1 * kl + crit_loss
        loss.backward()
        optim.step()
        if (step % 100 == 0):
            print('kl_loss: {}, criterion_loss: {}'.format(kl.item(), crit_loss.item()))
    print(f'epoch: {epoch}')





kl_loss: 0.13293519616127014, criterion_loss: 0.1522369533777237
kl_loss: 0.0033520690631121397, criterion_loss: 0.0962066799402237
kl_loss: 0.0013229588512331247, criterion_loss: 0.09411144256591797
epoch: 0
kl_loss: 0.00106586585752666, criterion_loss: 0.09338672459125519
kl_loss: 0.0006364218425005674, criterion_loss: 0.08759849518537521
kl_loss: 0.0004244714218657464, criterion_loss: 0.08645078539848328
epoch: 1
kl_loss: 0.00037854776019230485, criterion_loss: 0.08318693935871124
kl_loss: 0.0002758192422334105, criterion_loss: 0.08384966105222702
kl_loss: 0.00021264978568069637, criterion_loss: 0.08185949921607971
epoch: 2
kl_loss: 0.0001960845256689936, criterion_loss: 0.08247560262680054
kl_loss: 0.00016001630865503103, criterion_loss: 0.08144066482782364
kl_loss: 0.0001333380932919681, criterion_loss: 0.07883244007825851
epoch: 3
kl_loss: 0.00012725178385153413, criterion_loss: 0.07908649742603302
kl_loss: 0.00011334777082083747, criterion_loss: 0.08112414181232452
kl_loss: 9.64

In [None]:
with torch.no_grad():
    test = dataset.data[768].float() #.to(device)
    print(test.size())
    #predict = model(test)
    
    #encoded_mu, encoded_sigma = model.encoder(test)
    #hidden = sampling(encoded_mu, encoded_sigma)
    #hidden += torch.ones_like(hidden) * 0.1
    #decoded = model.decoder(hidden)

In [17]:
dataloader.

128

In [53]:
with torch.no_grad():
    model3.eval()
    test = dataset.data[255].unsqueeze(0).unsqueeze(0).float() / 255
    test = test.to(device)
    #print(test.device)
    predict = model3(test)
    test = test[0].view(28, 28).detach().cpu().numpy()
    print((test*255).astype(int).sum())

33676
