<a href="https://colab.research.google.com/github/yuuki330/RBM/blob/main/rbm_py_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MNIST画像をRBMにて学習する基本実装


### RBMクラスの定義

In [19]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
from torchvision import datasets, transforms
import torchvision.transforms.functional as TF
from torch.utils.data import DataLoader
import numpy as np


class RBM(nn.Module):
    """Restricted Boltzmann Machine. Default: Bernoulli-Bernoulli RBM
    Args:
        xsize: The number of visible units.
        hsize: The number of hidden units. (Default: 100)
        data_loader: Data Loader used to calculate initial parameter values. (Default: None)
        k: The number of Gibbs sampling. (Default: 1)
        winit: Initial values of weights. (Default: 0.001)
    """

    def __init__(self, xsize, hsize=100, data_loader=None, k=1, winit=0.001):
        super(RBM, self).__init__()
        if data_loader is None:
            b0 = 0
        else:
            # just compute a mean vector of training data
            # b0 = data_loader.dataset.data.mean(0)

            ####### 変更 #######
            b0 = data_loader.dataset.data.to(torch.float32).mean(0)
            ###################

        self.b = nn.Parameter(b0 * torch.ones(1, xsize))
        self.c = nn.Parameter(-1 * torch.ones(1, hsize)) # to make h sparse
        self.W = nn.Parameter(winit * torch.randn(hsize, xsize))
        self.k = k

    def rbmup(self, x):
        h = torch.sigmoid(F.linear(x, self.W, self.c))
        return h.bernoulli()

    def rbmdown(self, h):
        x = torch.sigmoid(F.linear(h, self.W.t(), self.b))
        return x.bernoulli()

    def sample(self, x):
        h = self.rbmup(x)
        for _ in range(self.k):
            x_ = self.rbmdown(h)
            h = self.rbmup(x_)
        return x_

    def forward(self, x):
        """
        Returns: free energy of input
        """
        x_term = torch.matmul(x, self.b.t())
        w_x_h = F.linear(x, self.W, self.c)
        h_term = torch.sum(F.softplus(w_x_h), dim=1)
        return torch.mean(- h_term - x_term)


In [20]:
def mnist_transform(x):
    x = x.view(-1, x.size(1)*x.size(2))
    return x

##疑問点1(未解消)
Totensor()では正規化処理が行われるはずなのに、試行1に示すように行われていない。\
しかし、試行2のように実行するとうまく動作するうまく動作する。-> 原因は不明\
また、試行3に示すように、無理矢理255で割ると結果が反映されるため、画像で実行する際は一時的にそのように実装するのが良さそう。

In [21]:
# 試行1

import matplotlib.pyplot as plt 

torch.set_printoptions(edgeitems=10)

train_set = datasets.MNIST('./data', train=True, download=True)
print(f'train_set.data[0] : {train_set.data[0].dtype}')

# trans = transforms.ToTensor()
train_set.data = train_set.data.numpy()
# train_set.data = train_set.data.astype(np.uint8)
tmp_data = TF.to_tensor(train_set.data)
print(f'ToTensor後 train_set.data[0] : {tmp_data[0] > 0}')

# mnist_transform()により、[N, 28, 28] -> [N, 784]へ変更
# out_x = mnist_transform(train_set.data)
# print(f'out_x[0] : {out_x[0]}')

# 以下でも同様の処理が可能
# train_set = train_set.reshape(-1, 784)

train_set.data[0] : torch.uint8
ToTensor後 train_set.data[0] : tensor([[False, False, False, False, False, False, False, False, False, False,
          ..., False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False, False,
          ..., False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False, False,
          ..., False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False, False,
          ..., False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False, False,
          ..., False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, Fal

In [22]:
# 試行2

a = np.full((3,3), 250, dtype=np.uint8)  # need uint8
print(a)
x = transforms.ToTensor()
print(x(a))  # 0 ~ 1

[[250 250 250]
 [250 250 250]
 [250 250 250]]
tensor([[[0.9804, 0.9804, 0.9804],
         [0.9804, 0.9804, 0.9804],
         [0.9804, 0.9804, 0.9804]]])


In [23]:
# 試行3

transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(mnist_transform)])  # transfoems.Lambda()により値を0からから1へ正規化...されない
train_set = datasets.MNIST('./data', train=True, download=True, transform=transform)
print(f'train_set.data[0] : {train_set.data[0]}')

train_set.data = train_set.data / 255
print(f'train_set.data[0] : {train_set.data[0]}')

train_set.data[0] : tensor([[  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0],
        [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   3,  18,
          18,  18, 126, 136, 175,  26, 166, 255, 247, 127,   0,   0,   0,   0],
        [  0,   0,   0,   

### データセットの定義とモデルのインスタンス作成

In [24]:
xsize = 28
hsize = 10
batch_size = 8
seed = 1
torch.manual_seed(seed)

device = device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(mnist_transform)])
train_set = datasets.MNIST('./data', train=True, download=True, transform=transform)

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)

model = RBM(xsize=xsize, hsize=hsize, data_loader=train_loader).to(device)

In [25]:
model.c

Parameter containing:
tensor([[-1., -1., -1., -1., -1., -1., -1., -1., -1., -1.]], device='cuda:0',
       requires_grad=True)

## 疑問点2(解消)
rbmup()において、\
`RuntimeError: mat1 and mat2 must have the same dtype`\
というエラーが出る\
\
-> test_dataのdtypeが異なっていたため、torch.float32に変更したら解消した

In [28]:
test_data = train_loader.dataset.data.to(torch.float32)[0]
print(f'test_data.shape : {test_data.shape}')
print(f'test_data.type : {test_data.dtype}')

winit=0.001
c = nn.Parameter(-1 * torch.ones(1, hsize))
W = nn.Parameter(winit * torch.randn(hsize, xsize))

print(f'c.shape : {c.shape}')
print(f'W.shape : {W.shape}')
print(f'c.type : {c.dtype}')
print(f'W.type : {W.dtype}')

# F.linear(test_data, W, c)
# F.linear(test_data, W, c)

test_data = test_data.to(device)
test_data_ = model.rbmup(test_data)
print(test_data_)

test_data.shape : torch.Size([28, 28])
test_data.type : torch.float32
c.shape : torch.Size([1, 10])
W.shape : torch.Size([10, 28])
c.type : torch.float32
W.type : torch.float32
tensor([[0., 0., 0., 0., 1., 1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
        [0., 1., 0., 0., 1., 1., 1., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0., 1., 1., 0.],
        [0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
        [1., 0., 0., 1., 0., 0., 0., 1., 0., 1.],
        [0., 1., 1., 0., 0., 1., 0., 0., 0., 1.],
        [0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 1., 1., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0., 0., 1., 1.],
        [1., 0., 0., 1., 0., 1., 0., 0., 0., 0.],
        [1., 1., 0., 1., 0., 0., 0., 0., 0., 0.],
        [1., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 1., 1., 0., 0.],
        [1., 0., 0., 0.

In [None]:
def train(model, train_loader, n_epochs=20, lr=0.01, device=torch.device("cpu")):
    r"""Train a RBM model.
    Args:
        model: The model.
        train_loader (DataLoader): The data loader.
        n_epochs (int, optional): The number of epochs. Defaults to 20.
        lr (Float, optional): The learning rate. Defaults to 0.01.
    Returns:
        The trained model.
    """
    # optimizer
    train_op = optim.Adam(model.parameters(), lr)

    # train the RBM model
    model.train()

    for epoch in range(n_epochs):
        t = time.time()
        loss_ = []
        mse = 0
        it = 0
        for data, _ in train_loader:
            x = data.to(device)
            x_ = model.sample(x).detach()
            loss = model(x) - model(x_)
            loss_.append(loss.item())
            train_op.zero_grad()
            loss.backward()
            train_op.step()
            # x1 = x + torch.randn_like(x)
            # x1.requires_grad = True
            # x2 = model.sample_langevin(x1).detach()
            with torch.no_grad():
                mse += F.mse_loss(x, x_, reduction='sum')
                # mse += F.mse_loss(x, x2, reduction='sum')

        mse /= len(train_loader.dataset)
        elapsed = time.time() - t
        print('[Epoch %02d/%02d] (%.2f sec)\t MSE=%.2f, Loss=%.2f, |W|=%.2f' % (epoch+1, n_epochs, elapsed, mse, np.mean(loss_), model.W.norm()))

    return model

In [None]:
class GaussRBM(RBM):
    r"""Gaussian-Bernoulli RBM.
    .. math::
        \begin{align}
            E(x, h) = \frac{1}{2} (\frac{x}{\sigma})^\top (\frac{x}{\sigma}) - x^\top W h -b^\top x - c^\top h
        \end{align}
    """

    def __init__(self, xsize, hsize=100, data_loader=None, k=1, winit=0.001):
        super(GaussRBM, self).__init__(xsize, hsize, data_loader, k, winit)
        if data_loader is None:
            z0 = 1
        else:
            z0 = data_loader.dataset.data.var(0)
        self.z = nn.Parameter((z0 * torch.ones(1, xsize) + 1e-24).log()) # logvar
        #self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        #self.z = (z0 * torch.ones(1, n_vis) + 1e-24).log().to(self.device)
        self.use_langevin = False

    def rbmdown(self, h):
        mu = self.z.exp() * F.linear(h, self.W.t(), self.b)
        x = torch.randn_like(mu) * (self.z.exp().sqrt()) + mu
        return x

    def sample_langevin(self, x_, steps=30, eta=1.0e-5):
        eta = torch.tensor(eta)
        for n in range(steps):
            noise = torch.randn_like(x_)
            out = -self.forward(x_)
            out.backward()
            x_.data.add_(0.5 * eta * x_.grad.data + torch.sqrt(eta) * noise.data)
            x_.grad.detach_()
            x_.grad.zero_()
        return x_

    def sample(self, x):
        if self.use_langevin:
            x_ = torch.randn_like(x, requires_grad=True)
            return self.sample_langevin(x_)
        else:
            return super().sample(x)

    def forward(self, x):
        x_term = torch.matmul(self.b, x.t()) - ((x ** 2 / (self.z.exp())) / 2).sum(1)
        w_x_h = F.linear(x, self.W, self.c)
        h_term = torch.sum(F.softplus(w_x_h), dim=1)
        return torch.mean(- h_term - x_term)