<a href="https://colab.research.google.com/github/takatakamanbou/Vision/blob/main/Vision2023_ex14.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Vision2023-ex14





---
## 準備
---

#### GPUを利用するようにランタイムのタイプを変更する



この notebook では，通常の Colab Notebook の動かし方では時間がかかる所があります（特に最後の実験）．次のようにしてランタイムのタイプを変更し，より高速な計算ができるようにしましょう．

1. メニューの「ランタイム」 > 「ランタイムのタイプを変更」 を選択．
1. 「ノートブックの設定」というポップアップウィンドウが開くので，「ハードウェアアクセラレータ」を「None」から「GPU」に変更し，「保存」する
1. いつもどおりコードセルを実行する．すでに実行していた場合，「以前のランタイムを削除する」というポップアップウィンドウが現れるので，「OK」を押しし，一番最初のコードセルから実行し直ます．

Colab Notebook は Linux を OS とする PC （クラウド上の仮想マシン）で実行されます．
通常は，その実行は CPU 上で行われますが，上記のように設定を変更することで，一部の計算を GPU にまかせることができるようになります（注）．

<span style="font-size: 75%">
※注: 無料でできますが，計算時間等が制限されています．
</span>

GPU (Graphical Processing Unit) というのは，PCのグラフィックスボード／ビデオカードやゲーム機等に搭載される，画像処理に特化した演算装置です．CPUのような汎用性がない代わりに，特定の処理をCPUよりずっと高速に実行できます．
この notebook では PyTorch という深層学習フレームワークを使用しますが， PyTorch ではニューラルネットの出力や学習のための計算を GPU 上で行って高速化することが簡単にできるようになっています．

#### いろいろ import

In [None]:
# 準備あれこれ
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn
seaborn.set()

In [None]:
# PyTorch 関係のほげ
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
import torchsummary

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

GPU が使えるようになっていれば，↑のセルを実行すると `cuda:0` と出力されるはずです．


In [None]:
# 手書き数字データの入手
! wget -nc https://www-tlab.math.ryukoku.ac.jp/~takataka/course/ML/minimnist.npz
minimnist = np.load('minimnist.npz')
K = 10 # クラス数
D = minimnist['datL'].shape[1] # データの次元数 28 x 28 = 784

In [None]:
# データを画像として表示するための関数
#
def display(data, nx, ny, nrow=28, ncol=28, gap=4):

    assert data.shape[1] == nrow*ncol

    # 並べた画像の幅と高さ
    width  = nx * (ncol + gap) + gap
    height = ny * (nrow + gap) + gap

    # 画像の作成
    img = np.zeros((height, width), dtype = int) + 128
    for iy in range(ny):
        lty = iy*(nrow + gap) + gap
        for ix in range(nx):
            if iy*nx+ix < data.shape[0]:
                ltx = ix*(ncol + gap) + gap
                img[lty:lty+nrow, ltx:ltx+ncol] = data[iy*nx+ix].reshape((nrow, ncol))

    # 画像の出力
    plt.axis('off')
    plt.imshow(img, cmap = 'gray')
    plt.show()

In [None]:
nx, ny = 10, 10
display(minimnist['datL'], nx, ny)

---
## 主成分分析による次元削減と再構成
---



In [None]:
# 画素値を 255 で割って平均を引いたものをデータとする
XL = minimnist['datL'].astype(float) / 255
XT = minimnist['datT'].astype(float) / 255
Xmean = np.mean(XL, axis=0)
XL -= Xmean
XT -= Xmean
print(XL.shape, XT.shape)

# 平均を画像として表示
plt.figure(figsize=(2, 2))
plt.axis('off')
plt.imshow(Xmean.reshape(28, 28)*255, cmap = 'gray')
plt.show()

In [None]:
# XL の分散共分散行列の固有値と固有ベクトルを求める
_, sv, Vt = np.linalg.svd(XL, full_matrices=False)
eva = sv**2/len(XL)
U = Vt
print(eva.shape, U.shape)

# 累積寄与率を求める
cumcontrib = np.cumsum(eva)
cumcontrib /= cumcontrib[-1]

In [None]:
# 固有値と固有ベクトルを図示
plt.figure(figsize=(6, 3))
fig, ax = plt.subplots(1, 2, facecolor='white', figsize=(12, 4))
Dmax = 100
ax[0].plot(np.arange(1, Dmax+1), eva[:Dmax], '.-', label='eigenvalues')
ax[0].legend()
Dmax = 200
ax[1].plot(np.arange(1, Dmax+1), cumcontrib[:Dmax], '-', label='cumrative contribution rate')
ax[1].legend()
ax[1].axhline(0.8, color='gray')
ax[1].axhline(0.9, color='gray')
ax[1].axhline(0.95, color='gray')
ax[1].set_ylim(0, 1)
plt.show()

In [None]:
# 再構成して誤差を求める
hdimList = [10, 20, 50, 100, 200, 500]
sqeL = np.empty(len(hdimList))
sqeT = np.empty_like(sqeL)

YL = XL @ U.T
YT = XT @ U.T
print(f'hdim    sqeL    sqeT')
for i, hdim in enumerate(hdimList):
    ZL = YL[:, :hdim] @ U[:hdim, :]
    ZT = YT[:, :hdim] @ U[:hdim, :]
    sqeL[i] = np.mean((XL - ZL)**2)
    sqeT[i] = np.mean((XT - ZT)**2)
    print(f'{hdim}   {sqeL[i]:.6f}    {sqeT[i]:.6f}')


In [None]:
# 次元数を決めて再構成画像をつくる
hdim = 50
YT = XT @ U[:hdim, :].T
ZT = YT @ U[:hdim, :]
img = (ZT + Xmean) * 255
img = np.clip(img, 0, 255)

nx, ny = 10, 4
display(minimnist['datT'], nx, ny) # 元画像
display(img, nx, ny) # 再構成

---
## 全結合型 Auto-Encoder による次元削減と再構成
---

---
### 準備

In [None]:
# データを扱うためのクラス
#
class MMDataset(Dataset):

    def __init__(self, data, LT):
        self.XL = data['datL'].astype(float)/255
        self.XT = data['datT'].astype(float)/255
        self.Xmean = np.mean(self.XL, axis=0)
        self.XL -= self.Xmean
        self.XT -= self.Xmean
        if LT == 'L':
            self.X = self.XL
            self.Y = data['labL']
        else:
            self.X = self.XT
            self.Y = data['labT']

    def __len__(self):
        return len(self.Y)

    def __getitem__(self, idx):
        x = torch.tensor(self.X[idx], dtype=torch.float32)
        y = torch.tensor(self.Y[idx], dtype=torch.int64)
        return x, y

In [None]:
# 2層のオートエンコーダ（線形）
#
class AutoEncoder2(nn.Module):

    def __init__(self, D, H):
        super(AutoEncoder2, self).__init__()
        L = []
        # エンコーダ部
        L.append(nn.Linear(D, H, bias=False))
        # デコーダ部
        L.append(nn.Linear(H, D, bias=False))
        self.layers = nn.ModuleList(L)

    def forward(self, X):
        for layer in self.layers:
            X = layer(X)
        return X

In [None]:
# 4層のオートエンコーダ（非線形）
#
class AutoEncoder4(nn.Module):

    def __init__(self, D, M, H):
        super(AutoEncoder4, self).__init__()
        L = []
        # エンコーダ部
        L.append(nn.Linear(D, M))
        L.append(nn.ReLU())
        L.append(nn.Linear(M, H, bias=False))
        # デコーダ部
        L.append(nn.Linear(H, M))
        L.append(nn.ReLU())
        L.append(nn.Linear(M, D))
        self.layers = nn.ModuleList(L)

    def forward(self, X):
        for layer in self.layers:
            X = layer(X)
        return X

In [None]:
# 学習の関数
#
def train(model, lossFunc, optimizer, dl):
    loss_sum = 0.0
    n = 0
    for i, (X, lab) in enumerate(dl):
        X, lab = X.to(device), lab.to(device)
        Z = model(X)           # 一つのバッチ X を入力して出力 Z を計算
        loss = lossFunc(Z, X)  # 入力と出力の間の誤差を計算
        optimizer.zero_grad()  # 勾配をリセット
        loss.backward()        # 誤差逆伝播でパラメータ更新量を計算
        optimizer.step()       # パラメータを更新
        n += len(X)
        loss_sum += loss.item()  # 損失関数の値

    return loss_sum/n

In [None]:
# 評価の関数
#
@torch.no_grad()
def evaluate(model, lossFunc, dl):
    loss_sum = 0.0
    n = 0
    for i, (X, lab) in enumerate(dl):
        X, lab = X.to(device), lab.to(device)
        Z = model(X)           # 一つのバッチ X を入力して出力 Y を計算
        loss = lossFunc(Z, X)  # 入力と出力の間の誤差を計算
        n += len(X)
        loss_sum += loss.item() # 損失関数の値

    return loss_sum/n

---
### 実験その1 2層 Auto-Encoder

In [None]:
# データ読み込みの仕組み
dsL = MMDataset(minimnist, 'L')
dsT = MMDataset(minimnist, 'T')
dlL = DataLoader(dsL, batch_size=100, shuffle=False)
dlT = DataLoader(dsT, batch_size=100, shuffle=False)

# ネットワークモデルの定義
H = 10
net = AutoEncoder2(D, H).to(device)

# 損失関数（二乗誤差）
loss_func = nn.MSELoss(reduction='sum')

# パラメータ最適化器
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

# 学習の繰り返し回数
nepoch = 100

# 学習
L = []
print('########## 実験1 ##########');
print(f'学習データ数: {len(dsL)}  テストデータ数: {len(dsT)}')
print()
print(net)
print()
print('# epoch  msqeL  msqeT')
for t in range(1, nepoch+1):
    msqeL = train(net, loss_func, optimizer, dlL) / D
    msqeT = evaluate(net, loss_func, dlT) / D
    L.append([t, msqeL, msqeT])
    if (t < 10) or (t % 10 == 0):
        print(f'{t}   {msqeL:.6f}   {msqeT:.6f}')


---
### 実験その2 4層 Auto-Encoder

In [None]:
# データ読み込みの仕組み
dsL = MMDataset(minimnist, 'L')
dsT = MMDataset(minimnist, 'T')
dlL = DataLoader(dsL, batch_size=100, shuffle=False)
dlT = DataLoader(dsT, batch_size=100, shuffle=False)

# ネットワークモデルの定義
M = 1000
H = 10
net = AutoEncoder4(D, M, H).to(device)

# 損失関数（二乗誤差）
loss_func = nn.MSELoss(reduction='sum')

# パラメータ最適化器
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

# 学習の繰り返し回数
nepoch = 100

# 学習
L = []
print('########## 実験2 ##########');
print(f'学習データ数: {len(dsL)}  テストデータ数: {len(dsT)}')
print()
print(net)
print()
print('# epoch  msqeL  msqeT')
for t in range(1, nepoch+1):
    msqeL = train(net, loss_func, optimizer, dlL) / D
    msqeT = evaluate(net, loss_func, dlT) / D
    L.append([t, msqeL, msqeT])
    if (t < 10) or (t % 10 == 0):
        print(f'{t}   {msqeL:.6f}   {msqeT:.6f}')


In [None]:
# 学習データの再構成を求める
for X, lab in dlL:
    X, lab = X.to(device), lab.to(device)
    Z = net(X)
    break
img = (Z.cpu().detach().numpy() + Xmean) * 255
img = np.clip(img, 0, 255)

nx, ny = 10, 4
display(minimnist['datL'], nx, ny) # 元画像
display(img, nx, ny) # 再構成

In [None]:
# テストデータの再構成を求める
for X, lab in dlT:
    X, lab = X.to(device), lab.to(device)
    Z = net(X)
    break
img = (Z.cpu().detach().numpy() + Xmean) * 255
img = np.clip(img, 0, 255)

nx, ny = 10, 4
display(minimnist['datT'], nx, ny) # 元画像
display(img, nx, ny) # 再構成