In [None]:
import numpy as np

In [None]:
class Relu:
    """Rectified linear unit layer: keeps positive inputs and zeroes the rest.

    Attributes:
        params, grads: Empty lists; the layer has no trainable parameters.
        mask: Boolean array saved by ``forward`` that is True where the
            input was ``<= 0``.
    """

    def __init__(self):
        self.params, self.grads = [], []
        self.mask = None

    def forward(self, x):
        """Return a copy of ``x`` with every non-positive entry set to 0."""
        self.mask = (x <= 0)
        out = x.copy()
        out[self.mask] = 0
        return out

    def backward(self, dout):
        """Return the upstream gradient with blocked positions zeroed.

        Works on a copy so the caller's ``dout`` array is left untouched.
        """
        dx = dout.copy()
        dx[self.mask] = 0
        return dx

In [None]:
class Affine:
    """Fully connected layer computing ``x . w + b``.

    Inputs with more than two axes are flattened to ``(batch, -1)`` in
    ``forward`` and the input gradient is reshaped back in ``backward``.

    Attributes:
        params: ``[w, b]`` as passed to the constructor.
        grads: Gradient buffers matching ``params``; overwritten in place
            by ``backward``.
    """

    def __init__(self, w, b):
        self.params = [w, b]
        self.grads = [np.zeros_like(w), np.zeros_like(b)]
        self.x = None
        self.x_shape = None

    def forward(self, x):
        """Flatten ``x`` per sample and apply the affine transform."""
        self.x_shape = x.shape
        batch = x.shape[0]
        self.x = x.reshape(batch, -1)
        weight, bias = self.params
        return np.dot(self.x, weight) + bias

    def backward(self, dout):
        """Fill the gradient buffers and return the gradient w.r.t. the input."""
        weight = self.params[0]
        grad_w, grad_b = self.grads
        # Write into the existing buffers so outside references stay valid.
        grad_w[...] = np.dot(self.x.T, dout)
        grad_b[...] = np.sum(dout, axis=0)
        flat_dx = np.dot(dout, weight.T)
        return flat_dx.reshape(*self.x_shape)

In [None]:
class SoftmaxWithLoss:
    """Softmax activation followed by a mean cross-entropy loss.

    Targets may be one-hot rows (same number of elements as the scores)
    or class indices. A single 1-D sample is treated as a batch of one.

    Attributes:
        params, grads: Empty lists; the layer has no trainable parameters.
        y: Softmax output of the last ``forward`` call, shape (batch, classes).
        t: Target class indices of the last ``forward`` call.
        delta: Small constant added inside ``log`` to avoid ``log(0)``.
        x_shape: Shape of the scores given to the last ``forward`` call.
    """

    def __init__(self):
        self.params, self.grads = [], []
        self.y, self.t = None, None
        self.delta = 1e-7
        self.x_shape = None

    def softmax(self, x, T=1):
        """Return the softmax of ``x`` along its last axis.

        Args:
            x: Array of scores. It is not modified.
            T: Temperature; the max-shifted scores are divided by it
                before exponentiation.
        """
        x = np.asarray(x)
        # Subtracting the row maximum keeps exp() from overflowing. A new
        # array is built so the caller's scores are not changed.
        shifted = x - x.max(axis=-1, keepdims=True)
        exps = np.exp(shifted / T)
        return exps / exps.sum(axis=-1, keepdims=True)

    @staticmethod
    def _label_indices(y, t):
        """Return targets ``t`` as a 1-D array of class indices for ``y``."""
        t = np.asarray(t)
        if t.size == y.size:
            # One-hot targets: convert each row to the index of its maximum.
            return t.reshape(y.shape).argmax(axis=1)
        return t.reshape(-1)

    def cross_entropy_error(self, y, t):
        """Return the mean cross-entropy of probabilities ``y`` against ``t``."""
        y = np.asarray(y)
        if y.ndim == 1:
            y = y.reshape(1, y.size)
        idx = self._label_indices(y, t)
        batch = y.shape[0]
        picked = y[np.arange(batch), idx]
        return -np.sum(np.log(picked + self.delta)) / batch

    def forward(self, x, t):
        """Compute the loss for scores ``x`` and targets ``t``.

        Stores the softmax output and the target indices for ``backward``.
        """
        x = np.asarray(x)
        self.x_shape = x.shape
        if x.ndim == 1:
            x = x.reshape(1, x.size)
        self.y = self.softmax(x)
        self.t = self._label_indices(self.y, t)
        return self.cross_entropy_error(self.y, self.t)

    def backward(self, dout=1):
        """Return the gradient of the loss w.r.t. the scores, scaled by ``dout``."""
        batch = self.t.shape[0]
        dx = self.y.copy()
        # d(loss)/d(score) = softmax - one_hot(target), averaged over the batch.
        dx[np.arange(batch), self.t] -= 1
        dx = dx * dout / batch
        return dx.reshape(self.x_shape)

In [None]:
class SGD:
    """Plain stochastic gradient descent: ``param -= lr * grad``."""

    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, params, grads):
        """Apply one descent step to every entry of ``params`` in place."""
        for idx, _ in enumerate(params):
            step = self.lr * grads[idx]
            params[idx] -= step

In [None]:
class Momentum:
    """SGD with momentum: ``v = momentum * v - lr * grad; param += v``."""

    def __init__(self, lr=0.01, momentum=0.9):
        self.lr = lr
        self.momentum = momentum
        self.v = None

    def update(self, params, grads):
        """Advance the velocities and apply them to ``params`` in place."""
        if self.v is None:
            # Velocities are created lazily, one zero array per parameter.
            self.v = []
            for param in params:
                self.v.append(np.zeros_like(param))
        for idx, _ in enumerate(params):
            velocity = self.momentum * self.v[idx] - self.lr * grads[idx]
            self.v[idx] = velocity
            params[idx] += velocity

In [None]:
class AdaGrad:
    """AdaGrad optimizer: per-element step scaled by accumulated squared gradients."""

    def __init__(self, lr=0.01):
        self.lr = lr
        self.v = None
        self.delta = 1e-7

    def update(self, params, grads):
        """Accumulate squared gradients and update ``params`` in place."""
        if self.v is None:
            # One zero accumulator per parameter, created on first use.
            self.v = []
            for param in params:
                self.v.append(np.zeros_like(param))
        for idx, _ in enumerate(params):
            grad = grads[idx]
            self.v[idx] += grad ** 2
            # delta keeps the denominator away from zero.
            denom = np.sqrt(self.v[idx]) + self.delta
            params[idx] -= self.lr * grad / denom

In [None]:
class RMSprop:
    """RMSprop optimizer: step scaled by a decaying average of squared gradients."""

    def __init__(self, lr=0.01, decay = 0.99):
        self.lr = lr
        self.decay = decay
        self.v = None
        self.delta = 1e-7

    def update(self, params, grads):
        """Refresh the running averages and update ``params`` in place."""
        if self.v is None:
            # One zero running average per parameter, created on first use.
            self.v = []
            for param in params:
                self.v.append(np.zeros_like(param))
        keep = self.decay
        for idx, _ in enumerate(params):
            grad = grads[idx]
            self.v[idx] *= keep
            self.v[idx] += (1 - keep) * grad ** 2
            # delta keeps the denominator away from zero.
            denom = np.sqrt(self.v[idx]) + self.delta
            params[idx] -= self.lr * grad / denom

In [None]:
class Dropout:
    """Dropout layer.

    In training mode each element is kept when a uniform random draw
    exceeds ``ratio`` and zeroed otherwise. In inference mode nothing is
    dropped and the input is scaled by ``1 - ratio`` instead.

    Attributes:
        ratio: Threshold compared against the uniform random draws.
        mask: Boolean keep-mask from the last ``forward`` call.
        params, grads: Empty lists; the layer has no trainable parameters.
    """

    def __init__(self, ratio=0.5):
        self.ratio = ratio
        self.mask = None
        self.params, self.grads = [], []

    def forward(self, x, train=True):
        """Apply dropout to ``x`` (training) or rescale it (inference)."""
        if train:
            self.mask = np.random.rand(*x.shape) > self.ratio
            return x * self.mask
        # Inference keeps every element. np.ones takes the shape tuple
        # directly; np.ones_like expects an array, not unpacked dimensions.
        self.mask = np.ones(x.shape, dtype=bool)
        return x * (1.0 - self.ratio)

    def backward(self, dout):
        """Pass the gradient through the positions kept by the last forward."""
        # NOTE(review): after an inference-mode forward the mask is all True,
        # so the (1 - ratio) forward scaling is not reflected here.
        return dout * self.mask

In [None]:
class BatchNormalization:
    """Batch normalization with learnable scale (gamma) and shift (beta).

    Inputs with more than two axes are flattened to ``(batch, -1)`` for
    the computation and reshaped back to the original shape on output.

    Args:
        gamma: Per-feature scale array.
        beta: Per-feature shift array.
        momentum: Weight of the previous value in the running mean/variance.
        mean: Optional initial running mean (zeros when omitted).
        var: Optional initial running variance (zeros when omitted).

    Attributes:
        params: ``[gamma, beta]``, exposed so an optimizer can update them.
        grads: Gradient buffers matching ``params``; overwritten in place
            by ``backward``.
        dgamma, dbeta: Gradients from the last ``backward`` call.
    """

    def __init__(self, gamma, beta, momentum=0.9, mean=None, var=None):
        # gamma and beta are registered as parameters; with empty lists a
        # container that collects layer.params would never train them.
        self.params = [gamma, beta]
        self.grads = [np.zeros_like(gamma, dtype=float), np.zeros_like(beta, dtype=float)]
        self.gamma, self.beta, self.momentum = gamma, beta, momentum
        self.mean, self.var = mean, var
        self.input_shape = None
        self.xc, self.xn, self.std = None, None, None
        self.dgamma, self.dbeta = None, None
        self.delta = 1e-6

    def __forward(self, x, train):
        """Normalize the 2-D batch ``x`` and apply gamma and beta."""
        features = x.shape[1]
        if self.mean is None:
            self.mean = np.zeros(features)
        if self.var is None:
            self.var = np.zeros(features)
        if train:
            mu = x.mean(axis=0)
            self.xc = x - mu
            var = np.mean(self.xc ** 2, axis=0)
            # delta keeps the square root away from zero.
            self.std = np.sqrt(var + self.delta)
            self.xn = xn = self.xc / self.std
            # Exponential moving averages used by the inference branch.
            self.mean = self.momentum * self.mean + (1 - self.momentum) * mu
            self.var = self.momentum * self.var + (1 - self.momentum) * var
        else:
            xn = (x - self.mean) / np.sqrt(self.var + self.delta)
        return self.gamma * xn + self.beta

    def forward(self, x, train=True):
        """Normalize ``x`` and return an array with the same shape as ``x``."""
        self.input_shape = x.shape
        if x.ndim != 2:
            x = x.reshape(x.shape[0], -1)
        out = self.__forward(x, train)
        # Restore the caller's shape, not the flattened working shape.
        return out.reshape(self.input_shape)

    def __backward(self, dout):
        """Backpropagate through the training-mode normalization (2-D)."""
        batch = dout.shape[0]
        self.dbeta = dout.sum(axis=0)
        self.dgamma = np.sum(self.xn * dout, axis=0)
        self.grads[0][...] = self.dgamma
        self.grads[1][...] = self.dbeta
        dxn = self.gamma * dout
        dxc = dxn / self.std
        dstd = -np.sum((dxn * self.xc) / (self.std ** 2), axis=0)
        dvar = 0.5 * dstd / self.std
        dxc += (2.0 / batch) * self.xc * dvar
        dmu = np.sum(dxc, axis=0)
        return dxc - dmu / batch

    def backward(self, dout):
        """Return the input gradient, shaped like the last ``forward`` input."""
        if dout.ndim != 2:
            dout = dout.reshape(dout.shape[0], -1)
        dx = self.__backward(dout)
        return dx.reshape(self.input_shape)

In [None]:
class Model:
    """Sequential container of layers plus one loss layer.

    Attributes:
        layers: Layers in the order they were appended.
        params, grads: Flat lists collecting every layer's ``params`` and
            ``grads`` entries, in append order.
        loss: Loss layer set through ``append_loss``.
    """

    def __init__(self):
        self.layers = []
        self.params = []
        self.grads = []
        self.loss = None

    def append(self, layer):
        """Add ``layer`` to the stack and register its params and grads."""
        self.layers.append(layer)
        self.params.extend(layer.params)
        self.grads.extend(layer.grads)

    def append_loss(self, layer):
        """Set the loss layer used by ``forward`` and ``backward``."""
        self.loss = layer

    def predict(self, x, train=False):
        """Run ``x`` through all layers and return the final output.

        Dropout and BatchNormalization layers additionally receive
        ``train``; every other layer is called with ``x`` only.
        """
        for layer in self.layers:
            if isinstance(layer, (Dropout, BatchNormalization)):
                x = layer.forward(x, train)
                continue
            x = layer.forward(x)
        return x

    def forward(self, x, t, train=True):
        """Return the loss of the prediction for ``x`` against targets ``t``."""
        scores = self.predict(x, train)
        return self.loss.forward(scores, t)

    def backward(self, dout=1):
        """Backpropagate from the loss through the layers in reverse order."""
        grad = self.loss.backward(dout)
        for layer in self.layers[::-1]:
            grad = layer.backward(grad)
        return grad

    def summary(self):
        """Print each layer's type and parameter shapes, then the loss type."""
        rule = '-' * 50
        print(rule)
        for layer in self.layers:
            print(type(layer))
            for param in layer.params:
                print(param.shape)
        print(type(self.loss))
        print(rule)

In [None]:
class Trainer:
    """Mini-batch training loop tying a model to an optimizer.

    Args:
        model: Object providing ``predict(x, train)``, ``loss.forward(y, t)``,
            ``backward()``, ``params`` and ``grads``.
        optimizer: Object providing ``update(params, grads)``.
    """

    def __init__(self, model, optimizer):
        self.model, self.optimizer = model, optimizer

    def fit(self, x, t, epoch_size, batch_size):
        """Train for ``epoch_size`` epochs, printing mean loss and accuracy.

        Each iteration draws ``batch_size`` sample indices at random with
        ``np.random.choice``.

        Args:
            x: Training inputs, indexable by an integer array on axis 0.
            t: Targets, either class indices (1-D) or one-hot rows.
            epoch_size: Number of epochs to run.
            batch_size: Number of samples drawn per iteration.
        """
        size = len(x)
        # Run at least one iteration per epoch; otherwise a dataset smaller
        # than batch_size makes the per-epoch averages divide by zero.
        iters = max(size // batch_size, 1)
        for _ in range(epoch_size):
            total_loss = 0
            accuracy = 0
            for _ in range(iters):
                mask = np.random.choice(size, batch_size)
                x_batch, t_batch = x[mask], t[mask]
                y = self.model.predict(x_batch, True)
                loss = self.model.loss.forward(y, t_batch)
                self.model.backward()
                self.optimizer.update(self.model.params, self.model.grads)
                total_loss += loss
                # Accuracy compares class indices, so one-hot targets are
                # converted first.
                labels = t_batch if t_batch.ndim == 1 else np.argmax(t_batch, axis=1)
                hits = np.sum(np.argmax(y, axis=1) == labels)
                accuracy += hits / float(x_batch.shape[0])
            print('loss %.2f | accuracy %.4f' %(total_loss / iters, accuracy / iters))

In [None]:
# Load the dataset.
# The parent directory is added to the import path before importing
# load_mnist from the deep_learning_from_scratch package.
import sys
sys.path.append('..')
from deep_learning_from_scratch.dataset.mnist import load_mnist
# presumably normalize=True scales pixel values and one_hot_label=True returns
# one-hot label rows — confirm against load_mnist.
(x_train, t_train), (x_test, t_test) = load_mnist(normalize=True, one_hot_label=True)

In [None]:
def scale(count, typ=0):
    """Return the weight-initialization standard deviation for ``count`` inputs.

    ``typ == 0`` gives the He value ``sqrt(2 / count)``; any other value
    gives the Xavier value ``sqrt(1 / count)``.
    """
    numerator = 2.0 if typ == 0 else 1.0
    return np.sqrt(numerator / count)

In [None]:
# Training.
# Network: Affine(784 -> 100) -> BatchNormalization -> ReLU -> Affine(100 -> 10),
# followed by a softmax cross-entropy loss. Weights are standard-normal draws
# multiplied by scale(fan_in) (He by default); biases start at zero.
m1 = Model()
m1.append(Affine(scale(784) * np.random.randn(784, 100), np.zeros(100)))
m1.append(BatchNormalization(np.ones(100), np.zeros(100)))
m1.append(Relu())
m1.append(Affine(scale(100) * np.random.randn(100, 10), np.zeros(10)))
m1.append_loss(SoftmaxWithLoss())
m1.summary()
# Plain SGD with learning rate 0.1; 10 epochs with batches of 100 samples.
o1 = SGD(lr=0.1)
t1 = Trainer(m1, o1)
t1.fit(x_train, t_train, 10, 100)

In [None]:
# Test.
# Score the whole test set in one inference-mode forward pass instead of one
# predict() call per sample, then compare predicted and true class indices
# (t_test is one-hot here, hence the argmax on both sides).
score = m1.predict(x_test)
ret = list(np.argmax(score, axis=1) == np.argmax(t_test, axis=1))
print('accuracy %.4f' %(ret.count(True)/ len(ret)))

## CNN

In [None]:
class CNNUtil:
    """im2col / col2im helper for a fixed window size, stride and padding.

    ``im2col`` records the input geometry (N, C, H, W) and output size
    (oh, ow); ``col2im`` relies on those recorded values, so it must be
    called after ``im2col``.
    """

    def __init__(self, fh, fw, stride=1, pad=0):
        self.fh = fh
        self.fw = fw
        self.stride = stride
        self.pad = pad
        self.N = self.C = self.H = self.W = None
        self.oh = self.ow = None

    def im2col(self, im):
        """Unfold a 4-D image batch into a 2-D matrix of window patches.

        Returns an array of shape ``(N * oh * ow, C * fh * fw)``: one row
        per window position, one column per (channel, row, col) offset.
        """
        self.N, self.C, self.H, self.W = im.shape
        step, margin = self.stride, self.pad
        self.oh = (self.H + 2 * margin - self.fh) // step + 1
        self.ow = (self.W + 2 * margin - self.fw) // step + 1
        # Zero-pad only the two spatial axes.
        padded = np.pad(im, [(0, 0), (0, 0), (margin, margin), (margin, margin)], 'constant')
        patches = np.zeros((self.N, self.C, self.fh, self.fw, self.oh, self.ow))
        for dy in range(self.fh):
            rows = slice(dy, dy + step * self.oh, step)
            for dx in range(self.fw):
                cols = slice(dx, dx + step * self.ow, step)
                # All window positions at once for this in-window offset.
                patches[:, :, dy, dx, :, :] = padded[:, :, rows, cols]
        rows_out = self.N * self.oh * self.ow
        return patches.transpose(0, 4, 5, 1, 2, 3).reshape(rows_out, -1)

    def col2im(self, col):
        """Fold a patch matrix back into an image batch of shape (N, C, H, W).

        Values from overlapping windows are summed.
        """
        step, margin = self.stride, self.pad
        patches = col.reshape(self.N, self.oh, self.ow, self.C, self.fh, self.fw)
        patches = patches.transpose(0, 3, 4, 5, 1, 2)
        # The buffer includes the padding plus stride - 1 extra rows/columns.
        full_h = self.H + 2 * margin + step - 1
        full_w = self.W + 2 * margin + step - 1
        canvas = np.zeros((self.N, self.C, full_h, full_w))
        for dy in range(self.fh):
            rows = slice(dy, dy + step * self.oh, step)
            for dx in range(self.fw):
                cols = slice(dx, dx + step * self.ow, step)
                canvas[:, :, rows, cols] += patches[:, :, dy, dx, :, :]
        # Crop the padding away again.
        return canvas[:, :, margin: self.H + margin, margin: self.W + margin]

In [None]:
class Convolution:
    """2-D convolution layer implemented as an im2col matrix product.

    Attributes:
        params: ``[w, b]`` where ``w`` has shape (FN, C, FH, FW).
        grads: Gradient buffers matching ``params``; overwritten in place
            by ``backward``.
        col: Unfolded input patches saved by ``forward``.
        col_w: Filters flattened to (FN, C * FH * FW), saved by ``forward``.
        u: CNNUtil configured with the filter size, stride and padding.
    """

    def __init__(self, w, b, stride=1, pad=0):
        self.params = [w, b]
        self.grads = [np.zeros_like(w), np.zeros_like(b)]
        self.col = None
        self.col_w = None
        _, _, kernel_h, kernel_w = w.shape
        self.u = CNNUtil(kernel_h, kernel_w, stride, pad)

    def forward(self, x):
        """Convolve the batch ``x`` (N, C, H, W); returns (N, FN, oh, ow)."""
        weight, bias = self.params
        filters, _, _, _ = weight.shape
        batch, _, _, _ = x.shape
        self.col = self.u.im2col(x)
        self.col_w = weight.reshape(filters, -1)
        flat = np.dot(self.col, self.col_w.T) + bias
        # Rows are ordered (N, oh, ow); move the filter axis to position 1.
        stacked = flat.reshape(batch, self.u.oh, self.u.ow, -1)
        return stacked.transpose(0, 3, 1, 2)

    def backward(self, dout):
        """Fill the gradient buffers and return the gradient w.r.t. the input."""
        weight = self.params[0]
        filters, channels, kernel_h, kernel_w = weight.shape
        # Match the (N * oh * ow, FN) row layout used in forward.
        flat = dout.transpose(0, 2, 3, 1).reshape(-1, filters)
        grad_w, grad_b = self.grads
        grad_cols = np.dot(self.col.T, flat)
        grad_w[...] = grad_cols.transpose(1, 0).reshape(filters, channels, kernel_h, kernel_w)
        grad_b[...] = np.sum(flat, axis=0)
        return self.u.col2im(np.dot(flat, self.col_w))

In [None]:
class Pooling:
    """Max pooling layer built on CNNUtil's im2col / col2im.

    Attributes:
        params, grads: Empty lists; the layer has no trainable parameters.
        ph, pw, stride, pad: Window height/width, stride and padding.
        argmax: Index of the maximum inside each window, saved by ``forward``.
        u: CNNUtil configured with the window size, stride and padding.
    """

    def __init__(self, ph=2, pw=2, stride=2, pad=0):
        self.params, self.grads = [], []
        self.ph, self.pw, self.stride, self.pad = ph, pw, stride, pad
        self.argmax = None
        self.u = CNNUtil(ph, pw, stride, pad)

    def forward(self, x):
        """Return the per-window maximum of ``x`` (N, C, H, W) as (N, C, oh, ow)."""
        N, C, _, _ = x.shape
        # One row per (sample, window position, channel); columns are the
        # ph * pw values inside that window.
        col = self.u.im2col(x).reshape(-1, self.ph * self.pw)
        self.argmax = np.argmax(col, axis=1)
        # Use the output size computed by im2col so padding is accounted for;
        # recomputing it without pad breaks the reshape when pad != 0.
        oh, ow = self.u.oh, self.u.ow
        return np.max(col, axis=1).reshape(N, oh, ow, C).transpose(0, 3, 1, 2)

    def backward(self, dout):
        """Route each upstream gradient to the position that held the maximum."""
        dout = dout.transpose(0, 2, 3, 1)
        size = self.ph * self.pw
        dmax = np.zeros((dout.size, size))
        dmax[np.arange(self.argmax.size), self.argmax.flatten()] = dout.flatten()
        # Regroup rows from (N * oh * ow * C, size) to (N * oh * ow, C * size),
        # the layout col2im expects.
        dcol = dmax.reshape(dout.shape[0] * dout.shape[1] * dout.shape[2], -1)
        return self.u.col2im(dcol)

In [None]:
# Load the dataset.
# The parent directory is added to the import path before importing
# load_mnist from the deep_learning_from_scratch package.
import sys
sys.path.append('..')
from deep_learning_from_scratch.dataset.mnist import load_mnist
# presumably flatten=False keeps each image as a (channel, height, width)
# array — confirm against load_mnist.
(cx_train, ct_train), (cx_test, ct_test) = load_mnist(flatten=False)

In [None]:
# Training.
# Network: two (3x3 Convolution -> ReLU) pairs followed by 2x2 Pooling, twice
# over, then Affine(8*4*4 -> 50) -> ReLU -> Affine(50 -> 10) and a softmax
# cross-entropy loss. Every convolution has 8 filters, stride 1 and no padding.
# Weights are standard-normal draws multiplied by scale(fan_in); biases are zero.
# NOTE(review): the 8*4*4 Affine input size assumes 1x28x28 input images
# (28 -> 26 -> 24 -> 12 -> 10 -> 8 -> 4) — confirm against the dataset.
m2 = Model()
m2.append(Convolution(scale(1*3*3) * np.random.randn(8, 1, 3, 3), np.zeros(8)))
m2.append(Relu())
m2.append(Convolution(scale(8*3*3) * np.random.randn(8, 8, 3, 3), np.zeros(8)))
m2.append(Relu())
m2.append(Pooling())
m2.append(Convolution(scale(8*3*3) * np.random.randn(8, 8, 3, 3), np.zeros(8)))
m2.append(Relu())
m2.append(Convolution(scale(8*3*3) * np.random.randn(8, 8, 3, 3), np.zeros(8)))
m2.append(Relu())
m2.append(Pooling())
m2.append(Affine(scale(8*4*4) * np.random.randn(8*4*4, 50), np.zeros(50)))
m2.append(Relu())
#m2.append(Dropout())
m2.append(Affine(scale(50) * np.random.randn(50, 10), np.zeros(10)))
#m2.append(Dropout())
m2.append_loss(SoftmaxWithLoss())
m2.summary()
# Plain SGD with learning rate 0.1; 10 epochs with batches of 100 samples.
o2 = SGD(lr=0.1)
t2 = Trainer(m2, o2)
t2.fit(cx_train, ct_train, 10, 100)

In [None]:
# Test.
# Evaluate one sample at a time; cx_test[[i]] keeps the leading batch axis so
# the model still receives a 4-D array.
ret = []
for i in range(len(cx_test)):
    score = m2.predict(cx_test[[i]])
    # ct_test[i] is compared directly with the predicted class index
    # (presumably integer labels, since one_hot_label was not requested).
    ret.extend(np.argmax(score, axis=1) == ct_test[i])
print('accuracy %.4f' %(ret.count(True)/ len(ret)))

In [None]:
def check_shape(x, layers):
    """Print the shape of ``x`` and of the output of each layer in turn.

    Each layer's ``forward`` is called on the previous layer's output.
    """
    current = x
    print(current.shape)
    for stage in layers:
        current = stage.forward(current)
        print(current.shape)