<a href="https://colab.research.google.com/github/takatakamanbou/AdvML/blob/2024/AdvML2024_reportD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AdvML reportD

<img width=72 src="https://www-tlab.math.ryukoku.ac.jp/~takataka/course/AdvML/AdvML-logo.png"> [この授業のウェブページ](https://www-tlab.math.ryukoku.ac.jp/wiki/?AdvML)




----
## 準備
----


今回の notebook で行う実験の中には，それなりに実行時間が長くなるものもある．GPU を利用した方が短い時間で済ませられるので，次のようにランタイムのタイプを変更しよう．

1. Colab のメニューから「ランタイム」>「ランタイムのタイプを変更」>「CPU」に代えて「T4 GPU」を選択して「保存」
1. ランタイムが初期化されるので，必要なら全てのコードセルを最初から実行し直す


In [None]:
# 準備あれこれ
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn
seaborn.set()

# scikit-learn のいろいろ
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# NumPy の 疑似乱数生成器（rng = random number generator）
from numpy.random import default_rng
rng = default_rng() # 疑似乱数生成器を初期化

# PyTorch 関係のほげ
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
import torchsummary

次のコードセルを実行して，`cuda` と表示されれば， PyTorch で GPU が使える状態になっている．`cpu` と表示される場合は，GPU が使えるようになっておらず，PyTorch のプログラムは CPU で実行される．


In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

次のセルを実行すると，いつもの MNIST 手書き数字画像データセットを読み込む．
本来のデータセットは，学習データ6万枚，テストデータ1万枚から成るが，ここでは学習データの一部2万枚を取り出し，そのうち1.6万枚を学習用に，残り4千枚をテスト用とする．

In [None]:
# MNIST データセットの入手
Xraw, yraw = fetch_openml('mnist_784', version=1, parser='auto', return_X_y=True, as_frame=False)
Xall = Xraw[:20000] / 255.0     # 画素値が [0, 255] の整数値なので [0, 1] の浮動小数点数値に変換
yall = yraw[:20000].astype(int) # クラスラベル．0 から 9 の整数値

# 学習データとテストデータの分割
XL, XT, yL, yT = train_test_split(Xall, yall, test_size=4000, random_state=4649, stratify=yall)
print(XL.shape, yL.shape)
print(XT.shape, yT.shape)
NL, D = XL.shape
NT = len(XT)

#K = 10

# 平均を引いたデータを用意
Xm = np.mean(XL, axis=0)
XL2 = XL - Xm
XT2 = XT - Xm

---
## PCA, AE, VAEによる手書き数字画像の再構成と生成
---

---
### PCAの実験

#### 学習

次のセルを実行すると，PCAの学習，すなわち，学習データの分散共分散行列の固有値と固有ベクトルを求める計算を行う．

In [None]:
# 平均を引いたデータ行列をSVD
_, sva, Vt = np.linalg.svd(XL2, full_matrices=False)
eva = sva**2/len(XL2)  # 分散共分散行列の固有値
U = Vt                 # 同固有ベクトルをならべた行列

# 固有値の大きい方 10 個の値
print(eva[:10])

次のセルを実行すると，固有値や累積寄与率をグラフに描く．

In [None]:
### 固有値と累積寄与率のグラフ
#
cumeva = np.cumsum(eva)
cumeva /= cumeva[-1]

# グラフ
fig = plt.figure(figsize=(10, 6))
# 固有値
ax1 = fig.add_subplot(221)
ax1.plot(np.arange(len(eva))+1, eva, label='eigenvalues')
ax1.axhline(0, color='gray')
ax1.legend()
# 固有値（次元数の範囲を限定）
ax2 = fig.add_subplot(222)
ax2.plot(np.arange(len(eva))+1, eva, '.', label='eigenvalues')
ax2.axhline(0, color='gray')
ax2.set_xlim(-5, 100)
ax2.legend()
# 累積寄与率
ax3 = fig.add_subplot(223)
ax3.plot(np.arange(len(eva))+1, cumeva, label='cumulative contribution rate')
ax3.axhline(1, color='gray')
ax3.axhline(0.9, color='gray', linestyle='--')
ax3.axhline(0.8, color='gray', linestyle='--')
ax3.legend()
# 累積寄与率（次元数の範囲を限定）
ax4 = fig.add_subplot(224)
ax4.plot(np.arange(len(eva))+1, cumeva, '.', label='cumulative contribution rate')
ax4.axhline(1, color='gray')
ax4.axhline(0.9, color='gray', linestyle='--')
ax4.axhline(0.8, color='gray', linestyle='--')
ax4.set_xlim(-5, 100)
ax4.legend()
fig.tight_layout()
plt.show()

#### 再構成

学習データとテストデータを次元削減してから再構成する．削減後の次元数を変えながら再構成誤差を求める．
`L` と付いているのが学習データの値，`T` と付いているのがテストデータの値．

In [None]:
### 次元数を変えながら学習データ/テストデータを再構成して二乗誤差を計算

# データを変換
ZL = XL2 @ U.T
ZT = XT2 @ U.T

# 次元数 H の設定
HList = np.hstack((
    [0], np.arange(1, 30), np.arange(30, 50, 5),
    np.arange(50, 100, 10), np.arange(100, 500, 50),  [D]
))

# 平均二乗誤差の配列
mseL = np.zeros(len(HList))
mseT = np.zeros(len(HList))

print('H    mseL    mseT')
for i, H in enumerate(HList):
    # H次元で再構成
    XXL = ZL[:, :H] @ U[:H, :]
    XXT = ZT[:, :H] @ U[:H, :]
    # 元画像と再構成との間の平均二乗誤差
    mseL[i] = np.mean((XL2 - XXL)**2)
    mseT[i] = np.mean((XT2 - XXT)**2)
    print(f'{H} {mseL[i]:.6f}   {mseT[i]:.6f}')

# 二乗誤差をグラフに描く
fig = plt.figure(figsize=(6, 4))
ax1 = fig.add_subplot(111)
ax1.plot(HList, mseL, '.', label='reconstruction error (L)')
ax1.plot(HList, mseT, '.', label='reconstruction error (T)')
ax1.axhline(0, color='gray')
ax1.set_xlim(-5, 105)
ax1.set_ylim(0, 0.05)
ax1.legend()
plt.show()

削減後の次元数を `H = 30` として，学習データとテストデータの再構成誤差を求めるとともに，テストデータの一部を再構成した画像を表示する．

In [None]:
# 次元数
H = 30

# データを H 次元に次元削減
ZL = XL2 @ U[:H, :].T
ZT = XT2 @ U[:H, :].T

# 再構成
XXL = ZL @ U[:H, :] + Xm
XXT = ZT @ U[:H, :] + Xm

# 再構成誤差
mseL = np.mean((XL - XXL)**2)
mseT = np.mean((XT - XXT)**2)
print(f'H = {H}')
print(f'reconstruction error (L) = {mseL:.6f}')
print(f'reconstruction error (T) = {mseT:.6f}')

# 再構成したテストデータの最初の15枚を可視化
ncol = 15
fig, ax = plt.subplots(2, ncol, figsize=(8.0, 8.0/ncol*2.5))

# 元画像
for j in range(ncol):
    img = XT[j, ::].reshape((28, 28))
    ax[0, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[0, j].axis('off')

# 再構成した画像
for j in range(ncol):
    img = XXT[j, ::].reshape((28, 28))
    ax[1, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[1, j].axis('off')

fig.tight_layout()
plt.show()

#### 生成

`H` 次元への変換によって得られる変数が正規分布に従うと仮定して，学習データを用いてそのパラメータ（平均と分散共分散行列）を推定する．そのパラメータを用いて正規分布の乱数を生成し，それを再構成した画像を描く．

In [None]:
# 生成
mu = np.zeros(H)
cov = np.diag(eva[:H])
Z = rng.multivariate_normal(np.zeros(H), cov, 50)
XX = Z @ U[:H, :] + Xm

# 生成した画像50枚を可視化
nrow, ncol = 5, 10
fig, ax = plt.subplots(nrow, ncol, figsize=(8.0, 8.0/ncol*nrow))
for i in range(nrow):
    for j in range(ncol):
        img = XX[i*ncol + j, ::].reshape((28, 28))
        ax[i, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
        ax[i, j].axis('off')

fig.tight_layout()
plt.show()

---
### 全結合型AEの実験

オートエンコーダを用いてPCAと同様の実験を行う．


#### 準備

クラスと関数をいくつか．このセクションのあとの別の実験でも利用する．

In [None]:
# データを扱うためのクラス
#
class MMDataset(Dataset):

    def __init__(self, dataX, nrow=28, ncol=28, flatten=True):
        assert dataX.ndim == 2 and dataX.shape[1] == nrow*ncol
        if flatten:
            self.X = dataX
        else:
            self.X = dataX.reshape((-1, 1, nrow, ncol))
        print(self.X.shape)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        X = torch.tensor(self.X[idx], dtype=torch.float32)
        return X

In [None]:
# 学習の関数
#
def trainAE(model, optimizer, dl):
    loss_sum = 0.0
    n = 0
    for i, X in enumerate(dl):
        X = X.to(device)
        Xt = model(X)         # 一つのバッチ X を入力して出力 Xt を計算
        loss = model.loss(Xt, X) # 入力 X を正解として loss を計算
        optimizer.zero_grad()   # 勾配をリセット
        loss.backward()         # 誤差逆伝播でパラメータ更新量を計算
        optimizer.step()         # パラメータを更新
        n += len(X)
        loss_sum += loss.item()  # 損失関数の値

    return loss_sum/n

# 損失関数の値を求める関数
#
@torch.no_grad()
def evaluateAE(model, dl):
    loss_sum = 0.0
    n = 0
    for i, X in enumerate(dl):
        X = X.to(device)
        Xt = model(X)           # 一つのバッチ X を入力して出力 Xt を計算
        loss = model.loss(Xt, X)  # 入力 X を正解として loss を計算
        n += len(X)
        loss_sum += loss.item() # 損失関数の値

    return loss_sum/n

#### ネットワーク構造の定義

このセクションでは，次の構成の階層型ニューラルネットでオートエンコーダを構成する．
全ての層が全結合層でできているので，ここではこれを全結合型AEと呼ぶことにする．

- エンコーダ: 入力 784 次元 → 隠れ層1（ニューロン数 `dimHidden`，非線形） → 隠れ層2（隠れ層1と同様） → 出力層（ニューロン数 `H`，線形）
- デコーダ: 入力 `H` 次元 → 隠れ層1（ニューロン数 `dimHidden`，非線形） → 隠れ層2（隠れ層1と同様） → 出力層（ニューロン数 784，線形）

In [None]:
# 全結合型オートエンコーダ
#
class AutoEncoder(nn.Module):

    def __init__(self, dimX, dimHidden, dimZ):
        super(AutoEncoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(dimX, dimHidden), nn.LeakyReLU(),
            nn.Linear(dimHidden, dimHidden), nn.LeakyReLU(),
            nn.Linear(dimHidden, dimZ),
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(dimZ, dimHidden), nn.LeakyReLU(),
            nn.Linear(dimHidden, dimHidden), nn.LeakyReLU(),
            nn.Linear(dimHidden, dimX),
        )

    def forward(self, X):
        Z = self.encoder(X)
        Xt = self.decoder(Z)
        return Xt

    def loss(self, Xt, X):
        return F.mse_loss(Xt, X, reduction='sum')

#### 学習

次のセルは，`dimHidden = 200, H = 30` の全結合型AEの学習を行う．

In [None]:
# データ読み込みの仕組みを作る
dsL = MMDataset(XL2, flatten=True)
dsT = MMDataset(XT2, flatten=True)
dlL = DataLoader(dsL, batch_size=100, shuffle=True)
dlT = DataLoader(dsT, batch_size=100, shuffle=False)

# ネットワークモデルの定義
H = 30
dimHidden = 200
nae = AutoEncoder(D, dimHidden, H).to(device)

# パラメータ最適化器の設定
optimizer = torch.optim.Adam(nae.parameters(), lr=1e-3)

# 学習の繰り返し回数
nepoch = 100

# ネットワークの構造を表示
torchsummary.summary(nae, (1, D))

# 学習
L = []
print(f'学習データ数: {len(dsL)}  テストデータ数: {len(dsT)}')
print()
print('# epoch  lossL  lossT')
for t in range(1, nepoch+1):
    lossL = trainAE(nae, optimizer, dlL) / D
    lossT = evaluateAE(nae, dlT) / D
    L.append([t, lossL, lossT])
    if (t < 10) or (t % 10 == 0):
        print(f'{t}   {lossL:.5f}   {lossT:.5f}')

# 学習曲線の表示
data = np.array(L)
fig, ax = plt.subplots(1, 1)
ax.plot(data[:, 0], data[:, 1], '.-', label='loss for training data')
ax.plot(data[:, 0], data[:, 2], '.-', label='loss for test data')
ax.axhline(0.0, color='gray')
ax.legend()
ax.set_title(f'loss')
plt.show()

#### 再構成

PCA と同様に再構成誤差を求め，一部のデータを再構成した結果を表示する．

In [None]:
# 学習後の再構成誤差
mseL = evaluateAE(nae, dlL) / D
mseT = evaluateAE(nae, dlT) / D

print(f'H = {H}')
print(f'reconstruction error (L) = {mseL:.6f}')
print(f'reconstruction error (T) = {mseT:.6f}')

# テストデータ1バッチ分の再構成
for i, X in enumerate(dlT):
    X = X.to(device)
    Xt = nae(X)
    break
XX    = X.to('cpu').detach().numpy() + Xm
XXrec = Xt.to('cpu').detach().numpy() + Xm

# 再構成したテストデータの最初の10枚を可視化
ncol = 15
fig, ax = plt.subplots(2, ncol, figsize=(8.0, 8.0/ncol*2.5))

# 元画像
for j in range(ncol):
    img = XX[j, ::].reshape((28, 28))
    ax[0, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[0, j].axis('off')

# 再構成した画像
for j in range(ncol):
    img = XXrec[j, ::].reshape((28, 28))
    ax[1, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[1, j].axis('off')

fig.tight_layout()
plt.show()

#### 生成

エンコーダの出力（`H` 次元）が正規分布に従うと仮定して，学習データを用いてそのパラメータ（平均と分散共分散行列）を推定する．そのパラメータを用いて正規分布の乱数を生成し，それをデコーダに入力して再構成した画像を描く．

In [None]:
##### 生成

# 学習データの特徴量を求める
Z = np.empty((len(dsL), H))
n = 0
for i, X in enumerate(dlL):
    X = X.to(device)
    Z[n:n+len(X)] = nae.encoder(X).cpu().detach().numpy()
    n += len(X)

# その平均と分散共分散行列
Zmu = np.mean(Z, axis=0)
Zcov = np.cov(Z.T)
print(Zmu.shape, Zcov.shape)

# 平均 Zmu 分散共分散行列 Zcov の正規分布から Z をサンプリングし，画像を生成
Zgen = rng.multivariate_normal(Zmu, Zcov, 50)
Zgen = torch.tensor(Zgen.astype(np.float32)).to(device)
XXrec = nae.decoder(Zgen)
XXrec = XXrec.cpu().detach().numpy() + Xm
print(XXrec.shape)

# 生成した画像50枚を可視化
nrow, ncol = 5, 10
fig, ax = plt.subplots(nrow, ncol, figsize=(8.0, 8.0/ncol*nrow))
for i in range(nrow):
    for j in range(ncol):
        img = XXrec[i*ncol + j, ::].reshape((28, 28))
        ax[i, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
        ax[i, j].axis('off')

fig.tight_layout()
plt.show()

---
### 畳み込みAEの実験

#### ネットワーク構造の定義

このセクションでは，畳み込みとその逆変換に相当する演算を用いるニューラルネットでオートエンコーダを構成する．
ここではこのようなオートエンコーダを畳み込みAEと呼ぶ．
具体的なネットワーク構造の説明は省略するが，大まかには次のようなものである．興味があれば以下のコメントを参考に調べてね．

- エンコーダ: 2つの畳み込み層のあとに2つの全結合層を持ち，(チャンネル数, 高さ, 幅) = (1, 28, 28) という shape の画像を `H` 個の実数値に符号化する．
- デコーダ: `H` 個の実数値を入力と同じ shape の画像に再構成する．


In [None]:
### 畳み込みオートエンコーダ

# Encoder
class ConvAE_Encoder(nn.Module):

    def __init__(self, dimZ):
        super(ConvAE_Encoder, self).__init__()
        self.encoder = nn.Sequential(
            # conv1 (1, 28, 28) => (16, 14, 14)
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(),
            # conv2 (16, 14, 14) => (32, 7, 7)
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(),
        )
        self.fc = nn.Sequential(
            # flatten (32, 7, 7) => 1568
            nn.Flatten(),
            # fc1 1568 => 200
            nn.Linear(32*7*7, 200),
            nn.LeakyReLU(),
            # fc2 200 => dimZ
            nn.Linear(200, dimZ)
        )

    def forward(self, X):
        Y = self.encoder(X)
        Z = self.fc(Y)
        return Z


# Decoder
class ConvAE_Decoder(nn.Module):

    def __init__(self, dimZ):
        super(ConvAE_Decoder, self).__init__()
        self.fc = nn.Sequential(
            # fc1 dimZ => 200
            nn.Linear(dimZ, 200),
            nn.LeakyReLU(),
            # fc2 200 => 1568
            nn.Linear(200, 32*7*7),
            nn.LeakyReLU(),
            # unflatten 1568 => (32, 7, 7)
            nn.Unflatten(-1, (32, 7, 7)),
        )
        self.decoder = nn.Sequential(
            # convT1 (32, 7, 7) => (16, 14, 14)
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.LeakyReLU(),
            # convT2 (16, 14, 14) => (1, 28, 28)
            nn.ConvTranspose2d(16, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
        )

    def forward(self, Z):
        Y = self.fc(Z)
        Xt = self.decoder(Y)
        return Xt


# ConvAE
class ConvAE(nn.Module):

    def __init__(self, dimZ):
        super(ConvAE, self).__init__()
        self.encoder = ConvAE_Encoder(dimZ)
        self.decoder = ConvAE_Decoder(dimZ)

    def forward(self, X):
        Z = self.encoder(X)
        Xt = self.decoder(Z)
        return Xt

    def loss(self, X, Xt):
        return F.mse_loss(Xt, X, reduction='sum')

#### 学習

次のセルは，`H = 30` の畳み込みAEの学習を行う．

In [None]:
# データ読み込みの仕組みを作る
ds2L = MMDataset(XL2, flatten=False)
ds2T = MMDataset(XT2, flatten=False)
dl2L = DataLoader(ds2L, batch_size=100, shuffle=True)
dl2T = DataLoader(ds2T, batch_size=100, shuffle=False)

# ネットワークモデルの定義
H = 30
cae = ConvAE(H).to(device)

# パラメータ最適化器の設定
optimizer = torch.optim.Adam(cae.parameters(), lr=1e-3)

# 学習の繰り返し回数
nepoch = 100

# ネットワークの構造を表示
torchsummary.summary(cae, (1, 28, 28))

# 学習
L = []
print(f'学習データ数: {len(ds2L)}  テストデータ数: {len(ds2T)}')
print()
print('# epoch  lossL  lossT')
for t in range(1, nepoch+1):
    lossL = trainAE(cae, optimizer, dl2L) / D
    lossT = evaluateAE(cae, dl2T) / D
    L.append([t, lossL, lossT])
    if (t < 10) or (t % 10 == 0):
        print(f'{t}   {lossL:.5f}   {lossT:.5f}')

# 学習曲線の表示
data = np.array(L)
fig, ax = plt.subplots(1, 1)
ax.plot(data[:, 0], data[:, 1], '.-', label='loss for training data')
ax.plot(data[:, 0], data[:, 2], '.-', label='loss for test data')
ax.axhline(0.0, color='gray')
ax.legend()
ax.set_title(f'loss')
plt.show()

#### 再構成

In [None]:
# 学習後の再構成誤差
mseL = evaluateAE(cae, dl2L) / D
mseT = evaluateAE(cae, dl2T) / D

print(f'H = {H}')
print(f'reconstruction error (L) = {mseL:.6f}')
print(f'reconstruction error (T) = {mseT:.6f}')

# テストデータ1バッチ分の再構成
for i, X in enumerate(dl2T):
    X = X.to(device)
    Xt = cae(X)
    break
XX    = X.view(-1, D).to('cpu').detach().numpy() + Xm
XXrec = Xt.view(-1, D).to('cpu').detach().numpy() + Xm

# 再構成したテストデータの最初の10枚を可視化
ncol = 15
fig, ax = plt.subplots(2, ncol, figsize=(8.0, 8.0/ncol*2.5))

# 元画像
for j in range(ncol):
    img = XX[j, ::].reshape((28, 28))
    ax[0, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[0, j].axis('off')

# 再構成した画像
for j in range(ncol):
    img = XXrec[j, ::].reshape((28, 28))
    ax[1, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[1, j].axis('off')

fig.tight_layout()
plt.show()

#### 生成

全結合型AEと同様の方法で画像を生成する．

In [None]:
##### 生成

# 学習データの特徴量を求める
Z = np.empty((len(ds2L), H))
n = 0
for i, X in enumerate(dl2L):
    X = X.to(device)
    Z[n:n+len(X)] = cae.encoder(X).cpu().detach().numpy()
    n += len(X)

# その平均と分散共分散行列
Zmu = np.mean(Z, axis=0)
Zcov = np.cov(Z.T)
print(Zmu.shape, Zcov.shape)

# 平均 Zmu 分散共分散行列 Zcov の正規分布から Z をサンプリングし，画像を生成
Zgen = rng.multivariate_normal(Zmu, Zcov, 50)
Zgen = torch.tensor(Zgen.astype(np.float32)).to(device)
XXrec = cae.decoder(Zgen).view(-1, D)
XXrec = XXrec.cpu().detach().numpy() + Xm
print(XXrec.shape)

# 生成した画像50枚を可視化
nrow, ncol = 5, 10
fig, ax = plt.subplots(nrow, ncol, figsize=(8.0, 8.0/ncol*nrow))
for i in range(nrow):
    for j in range(ncol):
        img = XXrec[i*ncol + j, ::].reshape((28, 28))
        ax[i, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
        ax[i, j].axis('off')

fig.tight_layout()
plt.show()

---
### 畳み込みVAEの実験

#### ネットワーク構造と学習等のための関数の定義

このセクションでは，畳み込みAEを VAE に改造したニューラルネットでオートエンコーダを構成する．
ここではこのようなオートエンコーダを畳み込みVAEと呼ぶ．
ネットワークの構造は，ほぼ畳み込みAEと同じである．


In [None]:
### 畳み込み VAE

# Encoder
class ConvVAE_Encoder(nn.Module):

    def __init__(self, dimZ):
        super(ConvVAE_Encoder, self).__init__()
        self.encoder = nn.Sequential(
            # conv1 (1, 28, 28) => (16, 14, 14)
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(),
            # conv2 (16, 14, 14) => (32, 7, 7)
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(),
        )
        self.fc1 = nn.Sequential(
            # flatten (32, 7, 7) => 1568
            nn.Flatten(),
            # fc1 1568 => 200
            nn.Linear(32*7*7, 200),
            nn.LeakyReLU(0.1),
        )
        # fc2 200 => dimZ (mu, logvar)
        self.fc2_mu    = nn.Linear(200, dimZ)
        self.fc2_logvar = nn.Linear(200, dimZ)

    def forward(self, X):
        Y = self.encoder(X)
        Y = self.fc1(Y)
        mu    = self.fc2_mu(Y)
        logvar = self.fc2_logvar(Y)
        sigma = torch.exp(0.5*logvar)
        return mu, sigma


# Decoder
class ConvVAE_Decoder(nn.Module):

    def __init__(self, dimZ):
        super(ConvVAE_Decoder, self).__init__()
        self.fc = nn.Sequential(
            # fc1 dimZ => 200
            nn.Linear(dimZ, 200),
            nn.LeakyReLU(),
            # fc2 200 => 1568
            nn.Linear(200, 32*7*7),
            nn.LeakyReLU(),
            # unflatten 1568 => (32, 7, 7)
            nn.Unflatten(-1, (32, 7, 7)),
        )
        self.decoder = nn.Sequential(
            # convT1 (32, 7, 7) => (16, 14, 14)
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.LeakyReLU(),
            # convT2 (16, 14, 14) => (1, 28, 28)
            nn.ConvTranspose2d(16, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
        )

    def forward(self, Z):
        Y = self.fc(Z)
        Xt = self.decoder(Y)
        return Xt


### reparameterization trick
#
def reparameterization(mu, sigma):
    eps = torch.randn_like(sigma)
    Z = mu + sigma * eps
    return Z


# ConvVAE
class ConvVAE(nn.Module):

    def __init__(self, dimZ):
        super(ConvVAE, self).__init__()
        self.encoder = ConvVAE_Encoder(dimZ)
        self.decoder = ConvVAE_Decoder(dimZ)

    def forward(self, X):
        mu, sigma = self.encoder(X)
        Z = reparameterization(mu, sigma)
        Xt = self.decoder(Z)
        return Xt, mu, sigma

    def reconstruct(self, X):
        mu, sigma = self.encoder(X)
        Xt = self.decoder(mu)
        return Xt

    def loss(self, Xt, X, mu, sigma):
        SQE = F.mse_loss(Xt, X, reduction='sum')
        sigma2 = sigma**2
        KLD = - torch.sum(1 + torch.log(sigma2) - mu**2 - sigma2)
        return SQE, KLD

In [None]:
# 学習の関数
#
def trainVAE(model, optimizer, dl):
    loss_sum = sqe_sum = kld_sum = 0.0
    n = 0
    for i, X in enumerate(dl):
        X = X.to(device)
        Xt, mu, sigma = model(X) # 一つのバッチ X を入力して出力を計算
        sqe, kld = model.loss(Xt, X, mu, sigma) # 損失関数の値を計算
        loss = sqe + kld
        optimizer.zero_grad()   # 勾配をリセット
        loss.backward()         # 誤差逆伝播でパラメータ更新量を計算
        optimizer.step()         # パラメータを更新
        n += len(X)
        loss_sum += loss.item()  # 損失関数の値
        sqe_sum += sqe.item()
        kld_sum += kld.item()

    return np.array([loss_sum, sqe_sum, kld_sum])/n


# 損失関数の値を求める関数
#
@torch.no_grad()
def evaluateVAE(model, dl):
    loss_sum = sqe_sum = kld_sum = 0.0
    n = 0
    for i, X in enumerate(dl):
        X = X.to(device)
        Xt, mu, sigma = model(X)  # 一つのバッチ X を入力して出力を計算
        sqe, kld = model.loss(Xt, X, mu, sigma) # 損失関数の値を計算
        loss = sqe + kld
        n += len(X)
        loss_sum += loss.item() # 損失関数の値
        sqe_sum += sqe.item()
        kld_sum += kld.item()

    return np.array([loss_sum, sqe_sum, kld_sum])/n


#### 学習

次のセルは，`H = 30` の畳み込みVAEの学習を行う．

In [None]:
# データ読み込みの仕組みを作る
ds2L = MMDataset(XL2, flatten=False)
ds2T = MMDataset(XT2, flatten=False)
dl2L = DataLoader(ds2L, batch_size=100, shuffle=True)
dl2T = DataLoader(ds2T, batch_size=100, shuffle=False)

# ネットワークモデルの定義
H = 30
cvae = ConvVAE(H).to(device)

# パラメータ最適化器の設定
optimizer = torch.optim.Adam(cvae.parameters(), lr=1e-3)

# 学習の繰り返し回数
nepoch = 100

# ネットワークの構造を表示
torchsummary.summary(cvae, (1, 28, 28))

# 学習
L = []
print(f'学習データ数: {len(dsL)}  テストデータ数: {len(dsT)}')
print()
print('# epoch  lossL  lossT  sqeL  sqeT  kldL  kldT')
for t in range(1, nepoch+1):
    rv = trainVAE(cvae, optimizer, dl2L) / D
    lossL, sqeL, kldL = rv
    rv = evaluateVAE(cvae, dl2T) / D
    lossT, sqeT, kldT = rv
    L.append([t, lossL, lossT, sqeL, sqeT, kldL, kldT])
    if (t < 10) or (t % 10 == 0):
        print(f'{t}   {lossL:.5f}   {lossT:.5f}   {sqeL:.5f}   {sqeT:.5f}   {kldL:.5f}   {kldT:.5f}')

# 学習曲線の表示
data = np.array(L)
fig, ax = plt.subplots(1, 1)
ax.plot(data[:, 0], data[:, 1], '.-', label='loss for training data')
ax.plot(data[:, 0], data[:, 2], '.-', label='loss for test data')
ax.axhline(0.0, color='gray')
ax.legend()
ax.set_title(f'loss')
plt.show()


#### 再構成

In [None]:
# 学習後の再構成誤差
_, mseL, _ = evaluateVAE(cvae, dl2L) / D
_, mseT, _ = evaluateVAE(cvae, dl2T) / D

print(f'H = {H}')
print(f'reconstruction error (L) = {mseL:.6f}')
print(f'reconstruction error (T) = {mseT:.6f}')

# テストデータ1バッチ分の再構成
for i, X in enumerate(dl2T):
    X = X.to(device)
    Xt = cvae.reconstruct(X)
    break
XX    = X.view(-1, D).to('cpu').detach().numpy() + Xm
XXrec = Xt.view(-1, D).to('cpu').detach().numpy() + Xm

# 再構成したテストデータの最初の10枚を可視化
ncol = 15
fig, ax = plt.subplots(2, ncol, figsize=(8.0, 8.0/ncol*2.5))

# 元画像
for j in range(ncol):
    img = XX[j, ::].reshape((28, 28))
    ax[0, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[0, j].axis('off')

# 再構成した画像
for j in range(ncol):
    img = XXrec[j, ::].reshape((28, 28))
    ax[1, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
    ax[1, j].axis('off')

fig.tight_layout()
plt.show()

#### 生成


VAEの潜在変数は標準正規分布 $N(\mathbf{z}; \mathbf{0}, I)$ に従うと仮定できるので，標準正規分布に従う乱数をサンプリングし，デコーダに入力して画像を生成する．


In [None]:
# 生成
Zgen = np.random.normal(size=50*H).reshape((50, H))
Zgen = torch.tensor(Zgen.astype(np.float32)).to(device)
XXrec = cvae.decoder(Zgen)
XXrec = XXrec.view(-1, D).cpu().detach().numpy() + Xm

# 生成した画像50枚を可視化
nrow, ncol = 5, 10
fig, ax = plt.subplots(nrow, ncol, figsize=(8.0, 8.0/ncol*nrow))
for i in range(nrow):
    for j in range(ncol):
        img = XXrec[i*ncol + j, ::].reshape((28, 28))
        ax[i, j].imshow(img, cmap=plt.cm.gray, vmin=0, vmax=1)
        ax[i, j].axis('off')

fig.tight_layout()
plt.show()

---
## 問題
---

#### 問題1

上記の実験の結果を見て，PCA, 全結合型AE, 畳み込みAE, 畳み込みVAE の再構成誤差および再構成画像の質について比較・考察しなさい．
本来，ニューラルネットの学習にはパラメータの初期値その他の選択によるランダム性があるが，ここではそれぞれの手法について1回ずつの結果の比較でよい．
また，再構成画像の質については，主観的な評価でよい．

#### 問題2

PCA, 全結合型AE, 畳み込みAE, 畳み込みVAE の生成画像の質について比較・考察しなさい．
主観的な評価でよい．

#### 問題3

畳み込みニューラルネットの応用事例について調べて論述しなさい．

#### 問題4

VAEのようなニューラルネットを用いる生成モデルの応用事例について調べて論述しなさい．