# Simple Neural Networks

## 概要
- Jupyter notebook上で一望できるくらい小さなニューラルネット(すぐ肥大化しそうな予感)
- synthetic gradients試してみようと思って書きはじめた。勉強にもなるし
- ReLU, Linear, Conv2D, SoftmaxCrossEntropyをひとまず実装(バグあるかも)

## 注意
- Conv2Dは縁の処理をサボっているのでちゃんとしたフレームワークとは挙動が違うかも
- あんまり速くない
- Python2

## TODO
- BatchNormalizationの実装
- weightのinitializeをもうちょっと丁寧に
- gpu対応(私のPCにはNvidia GPUがないのでOpenCLを使う？)
- RNNの記述法を考える
- optimizerにmomentumを導入

In [1]:
import numpy as np
import math

## Layer

In [128]:
class ReLU(object):

    def __init__(self):
        self.layer_type = 'activation'
        
    def forward(self, x):
        self.x = x
        return np.maximum(x, 0, dtype=x.dtype)
    
    def backward(self, gy):
        return gy * (self.x > 0)

class Linear(object):
    
    def __init__(self, inputs, outputs):
        self.layer_type = 'linear'
        self.W = np.random.uniform(-1/math.sqrt(inputs), 1/math.sqrt(inputs), (outputs, inputs)).astype('f')
        self.b = np.zeros((outputs), dtype=np.float32)
        
    def forward(self, x):
        self.x = x
        x = x.reshape(x.shape[0], -1)
        y = x.dot(self.W.T) + self.b
        return y
    
    def backward(self, gy):
        x = self.x.reshape(self.x.shape[0], -1)
        gx = gy.dot(self.W).reshape(self.x.shape)
        self.gW = gy.T.dot(x)
        self.gb = gy.sum(0)
        return gx.reshape(self.x.shape)
    
class Convolution2D(object):
    
    def __init__(self, in_ch, out_ch, k, stride=1, pad=1):
        self.layer_type = 'convolution'
        self.ksize = k
        self.stride = stride
        self.pad = pad
        self.W = np.random.uniform(-1/math.sqrt(k*k*in_ch), 1/math.sqrt(k*k*in_ch), (out_ch, in_ch, k, k)).astype('f')
        self.b = np.zeros((out_ch), dtype=np.float32)
        
    def forward(self, x):
        self.x = x
        b, ch, h, w = x.shape
        p = self.pad
        k = self.ksize
        s = self.stride
        
        #padding input image
        _x = np.zeros((b, ch, (h + p*2), (w + p*2)), dtype=np.float32)
        _x[:, :, p:p+h, p:p+w] = x
        
        #im2col
        self.col = np.zeros((b, ch, k, k, ((h + p*2 - k)//s + 1), ((w + p*2 - k)//s + 1)), dtype=np.float32)
        for i in range(0, h + p*2 - k + 1, s):
            for j in range(0, w + p*2 - k + 1, s):
                self.col[:, :, :, :, i/s, j/s] += _x[:, :, i:i+k, j:j+k]
        
        #convolution
        y = np.tensordot(self.col, self.W, ((1, 2, 3), (1, 2, 3))).astype(x.dtype, copy=False)
        y += self.b
        return np.rollaxis(y, 3, 1)
    
    def backward(self, gy):
        self.gW = np.tensordot(gy, self.col, ((0, 2, 3), (0, 4, 5))).astype(self.W.dtype, copy=False)
        self.gb = gy.sum(axis=(0, 2, 3))
        gcol = np.tensordot(self.W, gy, (0, 1)).astype(self.x.dtype, copy=False)
        gcol = np.rollaxis(gcol, 3)
        
        #col2im
        b, ch, h, w = self.x.shape
        p = self.pad
        k = self.ksize
        s = self.stride
        gx = np.zeros((b, ch, (h + p*2), (w + p*2)), dtype=np.float32)
        for i in range(0, h + p*2 - k + 1, s):
            for j in range(0, w + p*2 - k + 1, s):
                 gx[:, :, i:i+k, j:j+k] += gcol[:, :, :, :, i/s, j/s]
        return gx[:, :, p:p+h, p:p+w]
    
class MaxPooling2D(object):
    
    def __init__(self, k, stride=2, pad=0):
        self.layer_type = 'pooling'
        self.ksize = k
        self.stride = stride
        self.pad = pad
        
    def forward(self, x):
        self.x = x
        b, ch, h, w = x.shape
        p = self.pad
        k = self.ksize
        s = self.stride
        #padding input image
        pH = h + p*2
        pW = w + p*2
        #cover all
        pH += (s - (pH-k)%s) if (pH-k)%s != 0 else 0
        pW += (s - (pW-k)%s) if (pW-k)%s != 0 else 0
        self.pH = pH
        self.pW = pW
        _x = np.zeros((b, ch, pH, pW), dtype=np.float32)
        _x[:, :, p:p+h, p:p+w] = x
        #im2col
        self.col = np.zeros((b, ch, k*k, ((pH - k)//s + 1), ((pW - k)//s + 1)), dtype=np.float32)
        for i in range(0, pH - k + 1, s):
            for j in range(0, pW - k + 1, s):
                self.col[:, :, :, i/s, j/s] += _x[:, :, i:i+k, j:j+k].reshape(b, ch, k*k)
        self.ind = np.argmax(self.col, axis=2) #hold max index
        return np.max(self.col, axis=2)
    
    def backward(self, gy):
        b, ch, h, w = self.x.shape
        p = self.pad
        k = self.ksize
        s = self.stride
        #padding input image
        pH = self.pH
        pW = self.pW
        #assign with pooling_index
        gcol = np.zeros_like(self.col)
        gcol = assign_with_ndarray_index(gcol, self.ind, 2, gy)
        #col2im
        gx = np.zeros((b, ch, pH, pW), dtype=np.float32)
        for i in range(0, pH - k + 1, s):
            for j in range(0, pW - k + 1, s):
                gx[:, :, i:i+k, j:j+k] += gcol[:, :, :, i/s, j/s].reshape(b, ch, k, k)
        return gx[:, :, p:p+h, p:p+w]

def assign_with_ndarray_index(ary, index, axis, inputs):
    #ndarray_indexを使ってarrayに値を代入する。max_pooling用
    shp = ary.shape
    num_axis = shp[axis]
    shp1 = reduce(lambda x,y:x*y, shp[:axis+1])/num_axis
    shp2 = reduce(lambda x,y:x*y, shp[axis:])/num_axis
    f_ary = ary.flatten()
    f_ind = index.flatten()
    f_inputs = inputs.flatten()
    a_size = len(f_ary)
    f_ary[f_ind * shp2 + np.rollaxis(np.arange(a_size).reshape(shp), axis)[0].flatten()] = f_inputs
    return f_ary.reshape(shp)

def softmax(x):
    x -= x.max(axis=1, keepdims=True)
    exp_x = np.exp(x)
    return exp_x/np.sum(exp_x, axis=1).reshape(-1, 1)

def softmax_cross_entropy(x, t):
    log_y = np.log(softmax(x))
    log_p = log_y[range(len(t)), t.ravel()] #Labelに対応する値が1になる→log(y)=0．不正解Labelに対して期待される確率は0であるからそれらは無視できる．
    loss = - log_p.sum() / len(t)
    
    gx = np.exp(log_y) #というかこれはsoftmax(x)か。
    gx[range(len(t)), t.ravel()] -= 1
    gx *= loss
    
    return loss, gx

def accuracy(x, t):
    t_or_f = (np.argmax(x, axis=1)==t).astype('f')
    return np.sum(t_or_f)/len(t_or_f)

## function

In [95]:
def forward(nnet, x):
    for layer in nnet:
#        print x.shape
        x = layer.forward(x)
    return x

def backward(nnet, gy):
    for layer in nnet[::-1]:
        gy = layer.backward(gy)
        
def update(nnet):
    lr = 0.001
    for layer in nnet:
        if layer.layer_type not in ['activation', 'pooling']:
            layer.W -= layer.gW * lr
            layer.b -= layer.gb * lr

## Training Test
- mnistで試してみる

In [80]:
%matplotlib inline
import pylab as plt

In [81]:
# chainerのutilityをお借りしてmnistを読み込む
import chainer
train, test = chainer.datasets.get_mnist()

X = np.zeros((60000, 1, 28, 28), dtype=np.float32)
Y = np.zeros(60000, dtype=np.int32)
X_test = np.zeros((10000, 1, 28, 28), dtype=np.float32)
Y_test = np.zeros(10000, dtype=np.int32)
for i in range(60000):
    X[i] += train[i][0].reshape(1, 28, 28)
    Y[i] = train[i][1]
for i in range(10000):
    X_test[i] += test[i][0].reshape(1, 28, 28)
    Y_test[i] = test[i][1]
    
#[-1, 1]に正規化
X -= 0.5; X_test -= 0.5
X *= 2; X_test *= 2

In [134]:
#モデルの準備

nnet = [Convolution2D(1, 32, 3, stride=1, pad=1),
        MaxPooling2D(2, 2, 0),
        ReLU(),
        Convolution2D(32, 64, 3, stride=1, pad=1),
        MaxPooling2D(2, 2, 0),
        ReLU(),
        Convolution2D(64, 128, 3, stride=1, pad=1),
        MaxPooling2D(2, 2, 0),
        ReLU(),
        Convolution2D(128, 256, 3, stride=1, pad=1),
        ReLU(),
        Linear(256*4*4, 512),
        ReLU(),
        Linear(512, 10)]

In [None]:
epoch = 100
N = len(X)
N_test = len(X_test)
batchsize = 100
step = 100
n_imgs_trained = 0

for e in range(epoch):
    print 'epoch:', e
    sum_loss = 0.
    sum_acc = 0.
    perm = np.random.permutation(N)
    
    #train
    for i in range(0, N, batchsize):
        y = forward(nnet, X[perm[i:i+batchsize]])
        loss, gy = softmax_cross_entropy(y, Y[perm[i:i+batchsize]])
        acc = accuracy(y, Y[perm[i:i+batchsize]])
        backward(nnet, gy)
        update(nnet)
        
        n_imgs_trained += batchsize
        sum_loss += loss
        sum_acc += acc
        
        if i%(batchsize*step)==0 and i!=0:
            print 'type: train, sample: {}, loss: {}, accuracy: {}'.format(n_imgs_trained, sum_loss/step, sum_acc/step)
            sum_loss = 0.
            sum_acc = 0.
    #val
    sum_loss = 0.
    sum_acc = 0.
    for i in range(0, N_test, batchsize):
        y = forward(nnet, X_test[i:i+batchsize])
        loss, gy = softmax_cross_entropy(y, Y_test[i:i+batchsize])
        acc = accuracy(y, Y_test[i:i+batchsize])
        sum_loss += loss
        sum_acc += acc
    print 'type: val, sample: {}, loss: {}, accuracy: {}'.format(n_imgs_trained, sum_loss/(N_test/batchsize), sum_acc/(N_test/batchsize))

epoch: 0


In [129]:
mp = MaxPooling2D(2, 2, 0)

In [130]:
test = np.random.randint(0, 3, (1, 1, 5, 5)).astype('f')

In [131]:
test

array([[[[ 2.,  2.,  0.,  2.,  0.],
         [ 0.,  1.,  0.,  1.,  2.],
         [ 2.,  1.,  1.,  2.,  0.],
         [ 2.,  1.,  2.,  2.,  1.],
         [ 2.,  2.,  1.,  1.,  1.]]]], dtype=float32)

In [132]:
mp.backward(mp.forward(test))

array([[[[ 2.,  0.,  0.,  2.,  0.],
         [ 0.,  0.,  0.,  0.,  2.],
         [ 2.,  0.,  0.,  2.,  0.],
         [ 0.,  0.,  0.,  0.,  1.],
         [ 2.,  0.,  1.,  0.,  1.]]]], dtype=float32)

In [133]:
mp.forward(test)

array([[[[ 2.,  2.,  2.],
         [ 2.,  2.,  1.],
         [ 2.,  1.,  1.]]]], dtype=float32)