In [1]:
# !git clone https://github.com/pfilo8/Conditional-object-generation-using-pre-trained-models-and-plug-in-networks.git

In [None]:
# %cd Conditional-object-generation-using-pre-trained-models-and-plug-in-networks

In [None]:
# !pip install matplotlib nflows sklearn torch torchvision

# Conditional object generation using pre-trained models and plug-in networks
Prowadzący: **Patryk Wielopolski**, Politechnika Wrocławska

Abstrakt: Modele generatywne przyciągnęły uwagę wielu praktyków uczenia maszynowego w ostatnich latach, co zaowocowało modelami takimi jak StyleGAN do generowania ludzkiej twarzy lub PointFlow do generowania chmur punktów 3D. Jednak domyślnie nie możemy kontrolować jego procesu próbkowania, tj. nie możemy wygenerować próbki z określonym zestawem atrybutów. Obecne podejście polega na przekwalifikowaniu modelu z dodatkowymi danymi wejściowymi i inną architekturą, co wymaga czasu i zasobów obliczeniowych.

Podczas tego praktycznego warsztatu omówimy metodę, która pozwala nam generować obiekty o danym zestawie atrybutów bez ponownego uczenia modelu bazowego. W tym celu wykorzystamy modele normalizing flows – Conditional Masked Autoregressive Flow i Conditional Real NVP oraz sieci Plugin, w wyniku których powstaje Flow Plugin Network.


Cel warsztatów:
 * Praktyczne zapoznanie się z modelami Normalizing Flows oraz biblioteką nflows
 * Praktyczne zapoznanie się z metodą Flow Plugin Network

Agenda:
 * Wstęp do modeli generatywnych
 * Praktyczny wstęp do Normalizing Flows z wykorzystaniem pakietu **nflows**.
 * Metoda Flow Plugin Network (FPN)
 * Wykorzystanie metody FPN do warunkowego generowania obrazów

# Wstęp do modeli generatywnych

## Modele dyskryminatywne vs. modele generatywne

**Model dyskryminatywny** modeluje warunkowego prawdopodobieństwo ${P(Y\mid X=x)}$ zmiennej celu Y, biorąc pod uwagę obserwację x.

*Przykłady*: Regresja Logistyczna, Drzewa decyzyjne.

**Model generatywny** modelu rozkład łączny prawdopodobieństwa ${P(X, Y)}$ na danej zmiennej obserwowalnej X i zmiennej docelowej Y.

*Przykłady*:

![](figures/three-generative-models.png)

## Normalizing Flows

 Normalizing Flows reprezentują grupę modeli generatywnych, które można efektywnie trenować poprzez bezpośrednią estymację wiarogodności dzięki zastosowaniu wzoru na zmianę zmiennej. W praktyce wykorzystują one szereg (parametrycznych) funkcji odwracalnych: $\mathbf{y}=\mathbf{f}_n \circ \dots \circ \mathbf{f}_1(\mathbf{z})$. Zakładając, że dany rozkład bazowy $p(\mathbf{z})$ dla $\mathbf{z}$, log likelihood dla $\mathbf{y}$ jest podane przez $\log p(\mathbf{y}) = \ log p(\mathbf{z}) - \sum_{n=1}^N \log \left| \det \frac{\partial \mathbf{f}_n}{\partial \mathbf{z}_{n-1}} \right|$. W praktycznych zastosowaniach $p(\mathbf{y})$ reprezentuje rozkład obserwowalnych danych, a $p(\mathbf{z})$ jest zwykle zakładany jako rozkład normalny z niezależnymi komponentami.



![](figures/normalizing-flow.png)

# Praktyczny wstęp do Normalizing Flows z wykorzystaniem pakietu **nflows**

Omówienie modeli:
  * NICE
  * RealNVP
  * MAF

Omówienie warunkowych modeli:
  * Conditional NICE
  * Conditional RealNVP
  * Conditional MAF

### Konstrukcja biblioteki nflows

Krótkie omówienie biblioteki

  * Distributions - Bazowe rozkłady prawdopodobieństwa
  * Flows - Przykładowe implementacje modeli normalizing flows
  * Nn - Implementacje sieci neuronowych budujących bloki modelu
  * Transforms - Implementacje bloków transformujących w normalazing flows
  * Utils

### Toy example data

In [None]:
import matplotlib.pyplot as plt
import sklearn.datasets as datasets

In [None]:
x, y = datasets.make_moons(128, noise=.1)
plt.scatter(x[:, 0], x[:, 1]);

# NICE (Non-linear Independent Component Estimation)

Model NICE (Non-linear Independent Component Estimation) implementuje model normalizing flows poprzez układanie w stos sekwencji odwracalnych bijektywnych funkcji transformacji. W każdej bijekcji, znanej jako additive coupling layer, wymiary wejściowe są podzielone na dwie części:

 - Pierwsze wymiary pozostają takie same;
 - Druga część, do wymiarów, podlega transformacji addytywnej, tj. dodawany jest komponent przesunięcia.
$$
\begin{cases}
    \mathbf{u}_{1:d} &= \mathbf{x}_{1:d} \\
    \mathbf{u}_{d+1:D} &= \mathbf{x}_{d+1:D} + \mu(\mathbf{x}_{1:d})
    \end{cases}
    \Leftrightarrow
    \begin{cases}
    \mathbf{x}_{1:d} &= \mathbf{u}_{1:d} \\
    \mathbf{x}_{d+1:D} &= \mathbf{u}_{d+1:D} - \mu(\mathbf{u}_{1:d})
\end{cases}
$$

In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F

from nflows.distributions.normal import StandardNormal
from nflows.flows.base import Flow
from nflows.nn import nets as nets
from nflows.transforms.base import CompositeTransform
from nflows.transforms.coupling import AdditiveCouplingTransform
from nflows.transforms.normalization import BatchNorm

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'


class NICE(Flow):
    def __init__(
            self,
            features,
            hidden_features,
            num_layers=2,
            num_blocks_per_layer=2,
            activation=F.relu,
            dropout_probability=0.0,
            batch_norm_within_layers=False,
            batch_norm_between_layers=False,
    ):

        mask = torch.ones(features)
        mask[::2] = -1

        def create_resnet(in_features, out_features):
            return nets.ResidualNet(
                in_features,
                out_features,
                hidden_features=hidden_features,
                num_blocks=num_blocks_per_layer,
                activation=activation,
                dropout_probability=dropout_probability,
                use_batch_norm=batch_norm_within_layers,
            )

        layers = []
        for _ in range(num_layers):
            transform = AdditiveCouplingTransform(mask=mask, transform_net_create_fn=create_resnet)
            layers.append(transform)
            mask *= -1
            if batch_norm_between_layers:
                layers.append(BatchNorm(features=features))

        super().__init__(
            transform=CompositeTransform(layers),
            distribution=StandardNormal([features]),
        )


In [None]:
nice = NICE(features=2, hidden_features=4, num_layers=1, num_blocks_per_layer=1)

In [None]:
# Sprawdźmy komponenty modelu
nice

In [None]:
nice = NICE(features=2, hidden_features=4, num_layers=4, num_blocks_per_layer=2)
nice_opt = optim.Adam(nice.parameters())

In [None]:
def train(model, opt, num_iter=5000, iter_log=5000):
    for i in range(num_iter):
        x, y = datasets.make_moons(128, noise=.1)
        x = torch.tensor(x, dtype=torch.float32)
        opt.zero_grad()
        loss = -model.log_prob(inputs=x).mean()
        loss.backward()
        opt.step()

        if (i + 1) % iter_log == 0:
            xline = torch.linspace(-1.5, 2.5, 100)
            yline = torch.linspace(-.75, 1.25, 100)
            xgrid, ygrid = torch.meshgrid(xline, yline)
            xyinput = torch.cat([xgrid.reshape(-1, 1), ygrid.reshape(-1, 1)], dim=1)

            with torch.no_grad():
                zgrid = model.log_prob(xyinput).exp().reshape(100, 100)

            plt.contourf(xgrid.numpy(), ygrid.numpy(), zgrid.numpy())
            plt.title('iteration {}'.format(i + 1))
            plt.show()

In [None]:
train(nice, nice_opt, num_iter=10000, iter_log=1000)

# Ćwiczenie (5 min.)

Modyfikując hiperparametry modelu wytrenuj model, który lepiej odwzoruje prawdziwy rozkład danych.

In [None]:
# TODO: Your code goes here.

### Badanie podstawowych funkcjonalności modeli
#### Samplowanie

In [None]:
x, y = datasets.make_moons(128, noise=.1)
x = torch.tensor(x, dtype=torch.float32)

In [None]:
samples = nice.sample(1000).detach().numpy()

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(12, 4))

ax[0].scatter(x[:, 0], x[:, 1])
ax[0].set_title('Data')

ax[1].scatter(samples[:, 0], samples[:, 1])
ax[1].set_title('Samples from NICE')

plt.xlim(-1.5, 2.5)
plt.ylim(-1, 1.5)
plt.show()

# RealNVP

Model RealNVP (Real-valued Non-Volume Preserving) jest rozszerzeniem modelu NICE, które opiera się na warstwie znanej jako Affine Coupling Layer. Analogicznie do modelu NICE, wymiary wejściowe są podzielone na dwie części:

 - Pierwsze wymiary pozostają takie same;
 - Druga część, do wymiarów, podlega transformacji afinicznej („scale-and-shift”) i zarówno parametry skali, jak i przesunięcia są funkcjami pierwszych wymiarów.

$$
\begin{cases}
    \mathbf{u}_{1:d} &= \mathbf{x}_{1:d} \\
    \mathbf{u}_{d+1:D} &= \mathbf{x}_{d+1:D} \odot \exp{(\sigma{(\mathbf{x}_{1:d})})} + \mu(\mathbf{x}_{1:d})
    \end{cases}
    \Leftrightarrow
    \begin{cases}
    \mathbf{x}_{1:d} &= \mathbf{u}_{1:d} \\
    \mathbf{x}_{d+1:D} &= (\mathbf{u}_{d+1:D} - \mu(\mathbf{u}_{1:d})) \odot \exp{(-\sigma{(\mathbf{u}_{1:d})})}
\end{cases}
$$


# Ćwiczenie (5 min.)

Na podstawie powyższego kodu modelu NICE oraz biblioteki nflows zaimplementuj model RealNVP.

In [None]:
# HINT: from nflows.transforms import ...


class RealNVP(Flow):
    def __init__(
            self,
            features,
            hidden_features,
            num_layers=2,
            num_blocks_per_layer=2,
            activation=F.relu,
            dropout_probability=0.0,
            batch_norm_within_layers=False,
            batch_norm_between_layers=False,
    ):

        mask = torch.ones(features)
        mask[::2] = -1

        def create_resnet(in_features, out_features):
            return nets.ResidualNet(
                in_features,
                out_features,
                hidden_features=hidden_features,
                num_blocks=num_blocks_per_layer,
                activation=activation,
                dropout_probability=dropout_probability,
                use_batch_norm=batch_norm_within_layers,
            )

        layers = []
        for _ in range(num_layers):
            transform = AdditiveCouplingTransform(mask=mask, transform_net_create_fn=create_resnet)
            layers.append(transform)
            mask *= -1
            if batch_norm_between_layers:
                layers.append(BatchNorm(features=features))

        super().__init__(
            transform=CompositeTransform(layers),
            distribution=StandardNormal([features]),
        )

In [None]:
real_nvp = RealNVP(features=2, hidden_features=4, num_layers=2, num_blocks_per_layer=2)
real_nvp_opt = optim.Adam(real_nvp.parameters())

In [None]:
train(real_nvp, real_nvp_opt)

### Dalsze badanie podstawowych funkcjonalności modeli
#### Transformacje

In [None]:
prior_samples = real_nvp._distribution.sample(1000)

In [None]:
prior_samples_numpy = prior_samples.detach().numpy()

plt.scatter(prior_samples_numpy[:, 0], prior_samples_numpy[:, 1])
plt.title('Próbki z rozkładu bazowego')
plt.show()

In [None]:
nf_blocks = real_nvp._transform._transforms
nf_blocks

In [None]:
for transform in nf_blocks:
    print(transform)

In [None]:
consequitive_samples = [prior_samples]
current_sample = prior_samples

for transform in nf_blocks:
    current_sample, _ = transform(current_sample)
    consequitive_samples.append(current_sample)

In [None]:
for s in consequitive_samples:
    s = s.detach().numpy()
    plt.scatter(s[:, 0], s[:, 1])
    plt.show()

# Masked Autoregressive Flow (MAF)

Masked Autoregressive Flow to przykład modelu Normalizing Flows, który wykorzystuje połączenie z modelami autoregresyjnymi. W praktyce oznacza to, że używa reguły łańcucha do rozłożenia dowolnej gęstości prawdopodobieństwa $p(\mathbf{x})$ na iloczyn jednowymiarowych rozkładów warunkowych jako:

$$p(\mathbf{x}) = \Pi^{N}_{i=1} p(x_i|\mathbf{x}_{1:i-1}) \textit{.}$$

W przypadku modelu MAF $p(x_i|\mathbf{x}_{1:i-1})$ jest modelowane jako:
$$p(x_i|\mathbf{x}_{1:i-1}) = \mathcal{N}(x_i|\mu_i, (\exp{(\alpha_i)})^2 \textit{,}$$
gdzie $\mu_i = f_{\mu_i}(\mathbf{x_{1:i-1}})$ i $\alpha_i = f_{\alpha_i}(\mathbf{x_{1:i-1}})$


*Przykład*: $p(x_1, x_2) = p(x_1)p(x_2|x_1)$, gdzie $p(x_1) = \mathcal{N}(x_1|0, 4)$ i $p(x_2|x1) = \mathcal{N}(x_2|x_1^2, 1)$.

In [None]:
from nflows.transforms.autoregressive import MaskedAffineAutoregressiveTransform
from nflows.transforms.permutations import RandomPermutation, ReversePermutation


class MaskedAutoregressiveFlow(Flow):
    def __init__(
            self,
            features,
            hidden_features,
            num_layers=2,
            num_blocks_per_layer=2,
            use_residual_blocks=True,
            use_random_masks=False,
            use_random_permutations=False,
            activation=F.relu,
            dropout_probability=0.0,
            batch_norm_within_layers=False,
            batch_norm_between_layers=False,
    ):

        if use_random_permutations:
            permutation_constructor = RandomPermutation
        else:
            permutation_constructor = ReversePermutation

        layers = []
        for _ in range(num_layers):
            layers.append(permutation_constructor(features))
            layers.append(
                MaskedAffineAutoregressiveTransform(
                    features=features,
                    hidden_features=hidden_features,
                    num_blocks=num_blocks_per_layer,
                    use_residual_blocks=use_residual_blocks,
                    random_mask=use_random_masks,
                    activation=activation,
                    dropout_probability=dropout_probability,
                    use_batch_norm=batch_norm_within_layers,
                )
            )
            if batch_norm_between_layers:
                layers.append(BatchNorm(features))

        super().__init__(
            transform=CompositeTransform(layers),
            distribution=StandardNormal([features]),
        )


In [None]:
maf = MaskedAutoregressiveFlow(features=2, hidden_features=16, num_layers=4)
maf_opt = optim.Adam(maf.parameters())

In [None]:
train(maf, maf_opt)

# Ćwiczenie (5 min.)

Zweryfikuj jak wyglądają kolejne transformacje modelu MAF analogicznie do przykładu RealNVP.

In [None]:
# TODO: Your code goes here.

# Przerwa (10 min.)

# Conditional NICE

$$
\begin{cases}
    \mathbf{u}_{1:d} &= \mathbf{x}_{1:d} \\
    \mathbf{u}_{d+1:D} &= \mathbf{x}_{d+1:D} + \mu(\mathbf{x}_{1:d}, \mathbf{c})
    \end{cases}
    \Leftrightarrow
    \begin{cases}
    \mathbf{x}_{1:d} &= \mathbf{u}_{1:d} \\
    \mathbf{x}_{d+1:D} &= \mathbf{u}_{d+1:D} - \mu(\mathbf{u}_{1:d}, \mathbf{c})
\end{cases}
$$

In [None]:
class cNICE(Flow):
    def __init__(
            self,
            features,
            hidden_features,
            context_features,  # New component
            num_layers=2,
            num_blocks_per_layer=2,
            activation=F.relu,
            dropout_probability=0.0,
            batch_norm_within_layers=False,
            batch_norm_between_layers=False,
    ):

        mask = torch.ones(features)
        mask[::2] = -1

        def create_resnet(in_features, out_features):
            return nets.ResidualNet(
                in_features,
                out_features,
                hidden_features=hidden_features,
                context_features=context_features,  # New component
                num_blocks=num_blocks_per_layer,
                activation=activation,
                dropout_probability=dropout_probability,
                use_batch_norm=batch_norm_within_layers,
            )

        layers = []
        for _ in range(num_layers):
            transform = AdditiveCouplingTransform(mask=mask, transform_net_create_fn=create_resnet)
            layers.append(transform)
            mask *= -1
            if batch_norm_between_layers:
                layers.append(BatchNorm(features=features))

        super().__init__(
            transform=CompositeTransform(layers),
            distribution=StandardNormal([features]),
        )


In [None]:
c_nice = cNICE(features=2, hidden_features=8, num_layers=4, context_features=1)
c_nice_opt = optim.Adam(c_nice.parameters())

In [None]:
def train_conditional(model, opt, num_iter=5000, iter_log=5000):
    for i in range(num_iter):
        x, y = datasets.make_moons(128, noise=.1)
        x = torch.tensor(x, dtype=torch.float32)
        y = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)  # New
        opt.zero_grad()
        loss = -model.log_prob(inputs=x, context=y).mean()  # New
        loss.backward()
        opt.step()

        if (i + 1) % iter_log == 0:
            fig, ax = plt.subplots(1, 2, figsize=(12, 8))
            xline = torch.linspace(-1.5, 2.5, 100)
            yline = torch.linspace(-.75, 1.25, 100)
            xgrid, ygrid = torch.meshgrid(xline, yline)
            xyinput = torch.cat([xgrid.reshape(-1, 1), ygrid.reshape(-1, 1)], dim=1)

            with torch.no_grad():
                zgrid0 = model.log_prob(xyinput, torch.zeros(10000, 1)).exp().reshape(100, 100)
                zgrid1 = model.log_prob(xyinput, torch.ones(10000, 1)).exp().reshape(100, 100)

            ax[0].contourf(xgrid.numpy(), ygrid.numpy(), zgrid0.numpy())
            ax[1].contourf(xgrid.numpy(), ygrid.numpy(), zgrid1.numpy())
            fig.suptitle('iteration {}'.format(i + 1))
            plt.show()

In [None]:
train_conditional(c_nice, c_nice_opt)

# Ćwiczenie (5 min.)

Wygeneruj próbki z modelu Conditional NICE dla górnego oraz dolnego półksiężyca

In [None]:
# TODO: Your code goes here.

# Ćwiczenie (10 min.)

Stwórz model Conditional RealNVP oraz model Conditional MAF analogicznie do poprzedniego przykładu. Wytrenuj przykładowe modele.

# Conditional RealNVP

$$
\begin{cases}
    \mathbf{u}_{1:d} &= \mathbf{x}_{1:d} \\
    \mathbf{u}_{d+1:D} &= \mathbf{x}_{d+1:D} \odot \exp{(\sigma{(\mathbf{x}_{1:d}, \mathbf{c})})} + \mu(\mathbf{x}_{1:d}, \mathbf{c})
    \end{cases}
    \Leftrightarrow
    \begin{cases}
    \mathbf{x}_{1:d} &= \mathbf{u}_{1:d} \\
    \mathbf{x}_{d+1:D} &= (\mathbf{u}_{d+1:D} - \mu(\mathbf{u}_{1:d}, \mathbf{c})) \odot \exp{(-\sigma{(\mathbf{u}_{1:d}, \mathbf{c}))}}
\end{cases}
$$

In [None]:
# TODO: Your code goes here.

# Conditional Masked Autoregressive Flow (MAF)

$$p(\mathbf{x} | \mathbf{c}) = \Pi^{N}_{i=1} p(x_i|\mathbf{x}_{1:i-1}, \mathbf{c}) \textit{,}$$
gdzie
$$p(x_i|\mathbf{x}_{1:i-1}, \mathbf{c}) = \mathcal{N}(x_i|\mu_i, (\exp{(\alpha_i)})^2 \textit{,}$$
gdzie $\mu_i = f_{\mu_i}(\mathbf{x_{1:i-1}}, \mathbf{c})$ i $\alpha_i = f_{\alpha_i}(\mathbf{x_{1:i-1}}, \mathbf{c})$


In [None]:
# TODO: Your code goes here.

## Metoda Flow Plugin Network (FPN)

### Koncepcja Plugin Network

![Plugin Network](figures/plugin_koperski.png)

Pomysł:
 - Rozszerzenie istniejącej sieci neuronowej bez dodatkowego treningu, np. gdy pojawią się dane z dodatkową informację (eng. partial evidence)

Założenia koncepcji:
 - Nie przetrenowujemy oryginalnej sieci lecz jedynie trenujmy komponent plugin
 - Czas predykcji modelu jest jedynie nieznacznie większy

### Koncepcja Flow Plugin Network

 - Rozszerzenie Plugin Networku do modeli generatywnych
 - Wykorzystuje jako model bazowy - VAE
 - Wykorzystuje Conditional Normalizing Flows
 - Nie przetrenowujemy modelu bazowego


![](figures/schema-general.png)

## Trening modelu

### Kodowanie zbioru treningowego do reprezentacji ukrytej
![](figures/schema-training-encode.png)

### Trenowanie modelu Normalizing Flows
![](figures/schema-training-train.png)

## Generowanie próbek

![](figures/schema-sampling.png)

## Manipulowanie cechami obiektu

### Kodowanie obiektu
![](figures/schema-image-manipulation-encoding.png)

### Dekodowanie obiektu
![](figures/schema-image-manipulation-decode.png)

# Wykorzystanie metody FPN do warunkowego generowania obrazów

## Sample z modelu

In [None]:
# Based on: https://github.com/lyeoni/pytorch-mnist-VAE

import torch
import torch.nn as nn
import torch.nn.functional as F


class VAE(nn.Module):
    def __init__(self, x_dim, h_dim1, h_dim2, h_dim3, z_dim):
        super(VAE, self).__init__()
        self.x_dim = x_dim
        # encoder part
        self.e_fc1 = nn.Linear(x_dim, h_dim1)
        self.e_fc2 = nn.Linear(h_dim1, h_dim2)
        self.e_fc3 = nn.Linear(h_dim2, h_dim3)
        self.fc_mu = nn.Linear(h_dim3, z_dim)
        self.fc_logvar = nn.Linear(h_dim3, z_dim)
        # decoder part
        self.d_fc1 = nn.Linear(z_dim, h_dim3)
        self.d_fc2 = nn.Linear(h_dim3, h_dim2)
        self.d_fc3 = nn.Linear(h_dim2, h_dim1)
        self.d_fc4 = nn.Linear(h_dim1, x_dim)

    def encoder(self, x):
        h = F.relu(self.e_fc1(x))
        h = F.relu(self.e_fc2(h))
        h = F.relu(self.e_fc3(h))
        return self.fc_mu(h), self.fc_logvar(h)  # mu, log_var

    def sampling(self, mu, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu)  # return z sample

    def decoder(self, z):
        h = F.relu(self.d_fc1(z))
        h = F.relu(self.d_fc2(h))
        h = F.relu(self.d_fc3(h))
        return torch.sigmoid(self.d_fc4(h))

    def forward(self, x):
        mu, log_var = self.encoder(x.view(-1, self.x_dim))
        z = self.sampling(mu, log_var)
        return self.decoder(z), mu, log_var


In [None]:
model = VAE(x_dim=784, h_dim1=512, h_dim2=256, h_dim3=128, z_dim=40)
model.load_state_dict('models/VAE.pkt')
model.eval()

In [None]:
model

In [None]:
def sample_vae(vae, z_dim, n_samples=64):
    with torch.no_grad():
        z = torch.randn(n_samples, z_dim).to(DEVICE)
        return vae.decoder(z).to(DEVICE)

In [None]:
samples = sample_vae(model, z_dim=40, n_samples=64)

In [None]:
from torchvision.utils import save_image

save_image(samples.view(64, 1, 28, 28), f'results/samples_vae.png')

![](results/samples_vae.png)

## Kodowanie danych do latent space'u

In [None]:
import torchvision

batch_size = 500

train_dataset = torchvision.datasets.MNIST(root='data', train=True, transform=torchvision.transforms.ToTensor(), download=True)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=False)

test_dataset = torchvision.datasets.MNIST(root='data', train=False, transform=torchvision.transforms.ToTensor(), download=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
import pandas as pd

def get_latent_space(model, loader):
    zs, ys = [], []

    for x, y in loader:
        x = x.view(-1, 28*28)
        zs.append(model.sampling(*model.encoder(x)))
        ys.append(y)
    zs, ys = torch.cat(zs).detach().numpy(), torch.cat(ys).detach().numpy()

    latent_space = pd.DataFrame(zs)
    labels = pd.DataFrame({'y': ys})
    return latent_space, labels

In [None]:
train_latent_space, train_labels = get_latent_space(model, train_loader)
test_latent_space, test_labels = get_latent_space(model, test_loader)

In [None]:
train_latent_space

# Ćwiczenie (5 min.)

Zwizualizuj przestrzeń ukrytą z wykorzystaniem PCA, gdzie kolor będzie oznaczać klasę obiektu.

In [None]:
# TODO: Your code goes here.

## Trenowanie modelu

In [None]:
train_labels_ohe = pd.get_dummies(train_labels['y'].astype(str))

In [None]:
flow = cMaskedAutoregressiveFlow(features=40, hidden_features=40, context_features=10, num_layers=5, num_blocks_per_layer=3)
opt = optim.Adam(flow.parameters())

batch_size = 1000
num_epochs = 10

for i in range(num_epochs * (len(train_latent_space) // batch_size)):
    x = train_latent_space.sample(batch_size)
    y = train_labels_ohe.loc[x.index]

    x = torch.tensor(x.values, dtype=torch.float32)
    y = torch.tensor(y.values, dtype=torch.float32)

    opt.zero_grad()
    loss = -flow.log_prob(inputs=x, context=y).mean()
    print(loss.item())
    loss.backward()
    opt.step()

# Sprawdzamy skuteczność modelu - samplowanie wybranej klasy

In [None]:
num_samples = 64

samples_latent_space_2 = flow.sample(num_samples, context=torch.Tensor([[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]))
samples_latent_space_2.shape

In [None]:
samples_2 = model.decoder(samples_latent_space_2.squeeze(0))

In [None]:
samples_2.shape

In [None]:
save_image(samples_2.view(num_samples, 1, 28, 28), f'results/samples_flow_2.png')

![](results/samples_flow_2.png)

# Ćwiczenie (5 min.)

Wygeneruj próbki dla innych klas.

# Inne metody

## PluGeN

### Intuicja

![](figures/PluGeN_a.png)
![](figures/PluGeN_b.png)

### Metoda

![](figures/PluGeN_schema.png)

## StyleFlow

### Możliwości

![](figures/styleflow_teaser.png)

### Metoda

![](figures/styleflow_schema.png)