In [50]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [51]:
A = np.random.randint(0,10,(3,4,3,4))
# solution by passing a tuple of axes (introduced in numpy 1.7.0)
sum = A.sum(axis=(-2,-1))
print(sum)
# solution by flattening the last two dimensions into one
# (useful for functions that don't accept tuples for axis argument)
sum = A.reshape(A.shape[:-2] + (-1,)).sum(axis=-1)
print(sum)

[[60 51 51 63]
 [51 58 53 46]
 [51 51 42 67]]
[[60 51 51 63]
 [51 58 53 46]
 [51 51 42 67]]


In [52]:
A.shape

(3, 4, 3, 4)

In [53]:
A.sum(axis=(0, 1))

array([[56, 46, 45, 67],
       [53, 53, 53, 41],
       [54, 73, 54, 49]])

In [54]:
A[0, 0, :, :]

array([[6, 3, 0, 3],
       [6, 9, 4, 7],
       [4, 5, 8, 5]])

In [55]:
A

array([[[[6, 3, 0, 3],
         [6, 9, 4, 7],
         [4, 5, 8, 5]],

        [[2, 0, 3, 7],
         [8, 7, 3, 2],
         [8, 3, 3, 5]],

        [[1, 9, 4, 4],
         [5, 6, 1, 2],
         [1, 7, 7, 4]],

        [[9, 1, 3, 9],
         [2, 1, 9, 2],
         [9, 9, 4, 5]]],


       [[[2, 4, 1, 2],
         [6, 5, 6, 1],
         [9, 9, 0, 6]],

        [[8, 4, 8, 1],
         [8, 1, 6, 8],
         [8, 1, 5, 0]],

        [[6, 9, 0, 7],
         [0, 5, 6, 2],
         [0, 7, 6, 5]],

        [[8, 1, 1, 9],
         [0, 4, 2, 4],
         [0, 4, 8, 5]]],


       [[[3, 2, 8, 7],
         [2, 2, 7, 4],
         [5, 6, 4, 1]],

        [[4, 1, 5, 8],
         [8, 2, 0, 5],
         [4, 5, 2, 7]],

        [[0, 5, 7, 1],
         [2, 4, 3, 2],
         [4, 8, 0, 6]],

        [[7, 7, 5, 9],
         [6, 7, 6, 2],
         [2, 9, 7, 0]]]])

In [77]:
a = np.arange(1, 51).reshape(2, 5, 5)
a[:, 2:3, 2:3] = 0
a

array([[[ 1,  2,  3,  4,  5],
        [ 6,  7,  8,  9, 10],
        [11, 12,  0, 14, 15],
        [16, 17, 18, 19, 20],
        [21, 22, 23, 24, 25]],

       [[26, 27, 28, 29, 30],
        [31, 32, 33, 34, 35],
        [36, 37,  0, 39, 40],
        [41, 42, 43, 44, 45],
        [46, 47, 48, 49, 50]]])

In [91]:
b = np.arange(1, 11).reshape(-1, 2)
b.sum(axis=0)

array([25, 30])

In [97]:
a.sum(axis=2)

array([[ 15,  40,  52,  90, 115],
       [140, 165, 152, 215, 240]])

In [96]:
a.sum(axis=(0, 2))

array([155, 205, 204, 305, 355])

In [2]:
def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
    """

    Parameters
    ----------
    input_data : (データ数, チャンネル, 高さ, 幅)の4次元配列からなる入力データ
    filter_h : フィルターの高さ
    filter_w : フィルターの幅
    stride : ストライド
    pad : パディング

    Returns
    -------
    col : 2次元配列
    """
    N, C, H, W = input_data.shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1

    img = np.pad(input_data, [(0,0), (0,0), (pad, pad), (pad, pad)], 'constant')
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]

    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)
    return col


def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
    """

    Parameters
    ----------
    col :
    input_shape : 入力データの形状（例：(10, 1, 28, 28)）
    filter_h :
    filter_w
    stride
    pad

    Returns
    -------

    """
    N, C, H, W = input_shape
    out_h = (H + 2*pad - filter_h)//stride + 1
    out_w = (W + 2*pad - filter_w)//stride + 1
    col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1))
    for y in range(filter_h):
        y_max = y + stride*out_h
        for x in range(filter_w):
            x_max = x + stride*out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]

    return img[:, :, pad:H + pad, pad:W + pad]

In [32]:
class Conv1d:
    def __init__(self, W, b, stride=1, pad=0):
        self.W = W
        self.b = b.astype('float64')
        self.stride = stride
        self.pad = pad
        
        # 中間データ（backward時に使用）
        self.x = None   
        self.col = None
        self.col_W = None
        
        # 重み・バイアスパラメータの勾配
        self.dW = None
        self.db = None

    def forward(self, x):
        if self.W.ndim < 4:
            FN, C, FH = self.W.shape
            self.W = self.W.reshape(FN, C, 1, FH)
        FN, C, FH, FW = self.W.shape
        if x.ndim < 4:
            C, H = x.shape
            x = x.reshape(1, C, 1, H)
        N, C, H, W = x.shape
        self.N, self.C, self.H, self.w = N, C, H, W
        
        out_h = 1 + int((H + 2*self.pad - FH) / self.stride)
        out_w = 1 + int((W + 2*self.pad - FW) / self.stride)

        col = im2col(x, FH, FW, self.stride, self.pad)
        col_W = self.W.reshape(FN, -1).T
        
        out = np.dot(col, col_W) + self.b
        out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)

        self.x = x
        self.col = col
        self.col_W = col_W

        return out
    
    def backward(self, dout):
        FN, C, FH, FW = self.W.shape
        dout = dout.reshape(self.N, self.C, self.H, -1)
        dout = dout.transpose(0,2,3,1).reshape(-1, FN)

        self.db = np.sum(dout, axis=0)
        self.dW = np.dot(self.col.T, dout)
        self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

        dcol = np.dot(dout, self.col_W.T)
        dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)

        self = self.update(self)
        
        return dx
    
    def update(self, layer):
        layer.W -= 1 * layer.dW
        layer.b -= 1 * layer.db
        
        return layer

In [4]:
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]) # shape(2, 4)で、（入力チャンネル数、特徴量数）である。
w = np.ones((3, 2, 3)) # 例の簡略化のため全て1とする。(出力チャンネル数、入力チャンネル数、フィルタサイズ)である。
b = np.array([1, 2, 3]) # （出力チャンネル数）

In [5]:
tes = Convolution(w, b)
tes.forward(x)

array([[[[16., 22.]],

        [[17., 23.]],

        [[18., 24.]]]])

In [6]:
tes.backward(tes.forward(x))

array([[[[ 61., 120., 120.,  59.]],

        [[ 61., 120., 120.,  59.]]]])

In [7]:
class FC:
    """
    ノード数n_nodes1からn_nodes2への全結合層
    Parameters
    ----------
    n_nodes1 : int
      前の層のノード数
    n_nodes2 : int
      後の層のノード数
    initializer : 初期化方法のインスタンス
    optimizer : 最適化手法のインスタンス
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        # 初期化
        # initializerのメソッドを使い、self.Wとself.Bを初期化する
        self.W = initializer.W(self.n_nodes1, self.n_nodes2)
        self.B = initializer.B(self.n_nodes2)
        # 最適化手法
        self.optimizer = optimizer
        # AdaGradの初期値
        self.HW = 0
        self.HB = 0
        
    def forward(self, X):
        """
        フォワード
        Parameters
        ----------
        X : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        Returns
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes2)
            出力
        """ 
        if X.ndim >= 4:
            fn, c, fh, hw = X.shape
            X = X.reshape(c, hw)
        self.Z = X
        print(X.shape)
        print(self.W.shape)
        self.A = X @ self.W + self.B
        return self.A
    
    def backward(self, dA):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        Returns
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        self.dB = np.sum(dA, axis=0)
        self.dW = self.Z.T @ dA
        self.dZ = dA @ self.W.T
        self = self.optimizer.update(self)
        return self.dZ

In [8]:
class HeInitializer:
    """
    Heによる初期化
    """
    def __init__(self, sigma):
        _ = sigma
        
    def W(self, n_nodes1, n_nodes2):
        self.sigma = np.sqrt(2 / n_nodes1)
        W = self.sigma * np.random.randn(n_nodes1, n_nodes2)
        return W
    
    def B(self, n_nodes2):
        B = self.sigma * np.random.randn(1, n_nodes2)
        return B

In [9]:
class AdaGrad:
    '''
    AdaGradによる最適化
    '''
    def __init__(self, lr):
        self.lr = lr # 学習率
    
    def update(self, layer):
        """
        ある層の重みやバイアスの更新
        Parameters
        ----------
        layer : 更新前の層のインスタンス
        """
        layer.HW += layer.dW * layer.dW
        layer.HB += layer.dB * layer.dB
        delta = 1e-7 # 0で除算してしまうのを防ぐための値
        layer.W -= self.lr * layer.dW / (np.sqrt(layer.HW) + delta) / len(layer.Z)
        layer.B -= self.lr * layer.dB / (np.sqrt(layer.HB) + delta) / len(layer.Z)
        return layer

In [10]:
class Relu:
    '''
    ReLU関数
    '''
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes1)
            出力
        """ 
        self.A = A
        Z = np.maximum(0, A)
        return Z
    
    def backward(self, dZ):
        """
        バックワード
        Parameters
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        ----------
        dZ : 次の形のndarray, shape (batch_size, n_nodes1)
            前に流す勾配
        """
        A = self.A.reshape(dZ.shape)
        dA = dZ * np.where(A > 0, 1, 0)
        return dA

In [11]:
class SoftmaxWithLoss:
    '''
    ソフトマックス関数と交差エントロピー誤差
    '''
    def forward(self, A):
        """
        フォワード
        Parameters
        ----------
        A : 次の形のndarray, shape (batch_size, n_nodes1)
            入力
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes1)
            出力
        """ 
        Z = np.exp(A) / np.sum(np.exp(A), axis=1).reshape(-1, 1)
        return Z
        
    def backward(self, Z, y):
        """
        バックワード
        Parameters
        ----------
        Z : 次の形のndarray, shape (batch_size, n_nodes1)
        y : ラベル  ndarray, shape (batch_size,)
            入力
        ----------
        dA : 次の形のndarray, shape (batch_size, n_nodes2)
            後ろから流れてきた勾配
        loss : 交差エントロピー誤差
        """
        dA = Z - y
        loss = - np.sum(y * np.log(Z)) / len(y)
        return dA, loss

In [22]:
class Flatten:
    
    def forward(self, x):
        N, C, H, W = x.shape
        out = x.reshape(N, -1)
        
        self.x = x
        
        return out
    
    def backward(self, dout):
        dx = dout.reshape(self.x.shape)
        
        return dx

In [23]:
class GetMiniBatch:
    """
    ミニバッチを取得するイテレータ

    Parameters
    ----------
    X : 次の形のndarray, shape (n_samples, n_features)
      訓練データ
    y : 次の形のndarray, shape (n_samples, 1)
      正解値
    batch_size : int
      バッチサイズ
    seed : int
      NumPyの乱数のシード
    """
    def __init__(self, X, y, batch_size = 20, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int)
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]        
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [34]:
class Scratch1dCNNClassifier():
    """
    3層からなるニューラルネットワーク分類器

    Parameters
    ----------

    Attributes
    ----------
    """
    def __init__(self, FN=20, C=1, F=4, epoch=1, optimizer=AdaGrad, initializer=HeInitializer, activater=Relu, verbose=False,):
        self.verbose = verbose
        self.batch_size = 20 # バッチサイズ
        self.n_features = 784 # 特徴量の数
        self.n_nodes1 = 400 # 1層目のノード数
        self.n_nodes2 = 200 # 2層目のノード数
        self.n_output = 10 # 出力のクラス数（3層目のノード数）
        self.sigma = 0.02 # ガウス分布の標準偏差
        self.lr = 0.5 # 学習率
        self.epoch = epoch # エポック数
        self.optimizer = optimizer # 最適化手法
        self.initializer = initializer # 初期化方法
        self.activater = activater # 活性化関数
        self.FN = FN
        self.C = C
        self.F = F
    
    def fit(self, X, y, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データの特徴量
        y : 次の形のndarray, shape (n_samples, )
            訓練データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証データの正解値
        """
        self.loss_train = [] # 学習データのloss記録用
        self.loss_val = [] # 検証データのloss記録用
        
        # 最適化手法を選択
        optimizer = self.optimizer(self.lr)
        
        # 初期化と活性化関数を定義
        w = self.sigma * np.random.randn(self.FN, self.C, self.F)
        b = self.sigma * np.random.randn(self.FN,)
        self.cv = Convolution(w, b)
        self.activation_cv = self.activater()
        self.FC = FC(781, self.n_output, self.initializer(self.sigma), optimizer)
        self.activation_fc = SoftmaxWithLoss()
        
        # 学習
        for i in range(self.epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size, seed=i)
            for mini_X, mini_y in get_mini_batch:

                # フォワード
                print(mini_X.shape)
                A1 = self.cv.forward(mini_X)
                print(A1.shape)
                Z1 = self.activation_cv.forward(A1)
                print(Z1.shape)
                A2 = self.FC.forward(Z1)
                print(A2.shape)
                Z2 = self.activation_fc.forward(A2)
                print(Z2.shape)

                # バックワード
                dA2, loss = self.activation_fc.backward(Z2, mini_y) # 交差エントロピー誤差とソフトマックスを合わせている
                dZ2 = self.FC.backward(dA2)
                dA1 = self.activation_cv.backward(dZ2)
                dZ1 = self.cv.backward(dA1)
            
#             # エポックごとに交差エントロピー誤差を記録
#             if self.verbose:
#                 A1 = self.FC1.forward(X)
#                 Z1 = self.activation1.forward(A1)
#                 A2 = self.FC2.forward(Z1)
#                 Z2 = self.activation2.forward(A2)
#                 A3 = self.FC3.forward(Z2)
#                 Z3 = self.activation3.forward(A3)            
#                 self.loss_train.append(self.activation3.backward(Z3, y)[1])
                
#                 if X_val is not None:
#                     A1 = self.FC1.forward(X_val)
#                     Z1 = self.activation1.forward(A1)
#                     A2 = self.FC2.forward(Z1)
#                     Z2 = self.activation2.forward(A2)
#                     A3 = self.FC3.forward(Z2)
#                     Z3 = self.activation3.forward(A3)            
#                     self.loss_val.append(self.activation3.backward(Z3, y_val)[1])
    
    def predict(self, X):
        """
        ニューラルネットワーク分類器を使い推定する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            サンプル

        Returns
        -------
            次の形のndarray, shape (n_samples, 1)
            推定結果
        """
        A1 = self.cv.forward(X)
        Z1 = self.activation_cv.forward(A1)
        A2 = self.FC.forward(Z1)
        Z2 = self.activation_fc.forward(A2)
        return np.argmax(Z2, axis=1)

In [14]:
# データセット読み込み
from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# 標準化
X_train = X_train.reshape(-1, 784)
X_test = X_test.reshape(-1, 784)

# 前処理
X_train = X_train.astype(np.float)
X_test = X_test.astype(np.float)
X_train /= 255
X_test /= 255

# 分割
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)

# ワンホットエンコーディング
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_train_one_hot = enc.fit_transform(y_train[:, np.newaxis])
y_test_one_hot = enc.transform(y_val[:, np.newaxis])

Using TensorFlow backend.


In [29]:
class Scratch1dCNNClassifier():
    """
    3層からなるニューラルネットワーク分類器

    Parameters
    ----------

    Attributes
    ----------
    """
    def __init__(self, epoch=1, optimizer=AdaGrad, initializer=HeInitializer, activater=Relu, verbose=False,):
        self.verbose = verbose
        self.batch_size = 20 # バッチサイズ
        self.n_output = 10 # 出力のクラス数（3層目のノード数）
        self.sigma = 0.02 # ガウス分布の標準偏差
        self.lr = 0.1 # 学習率
        self.epoch = epoch # エポック数
        self.optimizer = optimizer # 最適化手法
        self.initializer = initializer # 初期化方法
        self.activater = activater # 活性化関数
        self.FN = 3
        self.C = 1
        self.FH = 3
        self.FW = 3
        self.F = 3
        self.pool_h = 2
        self.pool_w = 2
        self.pad = 0
        self.stride = 1
    
    def fit(self, X, y, X_val=None, y_val=None):
        """
        ニューラルネットワーク分類器を学習する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            訓練データの特徴量
        y : 次の形のndarray, shape (n_samples, )
            訓練データの正解値
        X_val : 次の形のndarray, shape (n_samples, n_features)
            検証データの特徴量
        y_val : 次の形のndarray, shape (n_samples, )
            検証データの正解値
        """
        self.loss_train = [] # 学習データのloss記録用
        self.loss_val = [] # 検証データのloss記録用
        
        # 全結合層へ流れるnode数を取得
#         out_h, out_w = self.out_size(28, 28, self.pad, self.FH, self.FW, self.stride)
#         out_h, out_w = self.out_size(out_h, out_w, self.pad, self.pool_h, self.pool_w, self.pool_h)
#         fc_nodes = self.FN * out_h * out_w
        
        # 最適化手法を選択
        optimizer = self.optimizer(self.lr)
        
        # 重みとバイアスの初期値
        w = self.sigma * np.random.randn(self.FN, self.C, self.FH, self.FW)
        b = self.sigma * np.random.randn(self.FN,)
        
        # インスタンス化
        self.cv = Conv1d(w, b)
        self.activation_cv = self.activater()
#         self.pl = MaxPool2D(self.pool_h, self.pool_w)
        self.fl = Flatten()
        self.FC = FC(781, self.n_output, self.initializer(self.sigma), optimizer)
        self.activation_fc = SoftmaxWithLoss()
        
        # 学習
        for i in range(self.epoch):
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.batch_size, seed=i)
            for mini_X, mini_y in get_mini_batch:

                # フォワード
                A1 = self.cv.forward(mini_X)
                Z1 = self.activation_cv.forward(A1)
#                 P1 = self.pl.forward(Z1)
                F1 = self.fl.forward(Z1)
                A2 = self.FC.forward(F1)
                Z2 = self.activation_fc.forward(A2)

                # バックワード
                dA2, loss = self.activation_fc.backward(Z2, mini_y) # 交差エントロピー誤差とソフトマックスを合わせている
                dZ2 = self.FC.backward(dA2)
                dF1 = self.fl.backward(dZ2)
#                 dP1 = self.pl.backward(dF1)
                dA1 = self.activation_cv.backward(dF1)
                dZ1 = self.cv.backward(dA1)
            
            # エポックごとに交差エントロピー誤差を記録
#             if self.verbose:
#                 A1 = self.cv.forward(X)
#                 Z1 = self.activation_cv.forward(A1)
#                 P1 = self.pl.forward(Z1)
#                 F1 = self.fl.forward(P1)
#                 A2 = self.FC.forward(F1)
#                 Z2 = self.activation_fc.forward(A2)  
#                 self.loss_train.append(self.activation_fc.backward(Z2, y)[1])
                
#                 if X_val is not None:
#                     A1 = self.cv.forward(X_val)
#                     Z1 = self.activation_cv.forward(A1)
#                     P1 = self.pl.forward(Z1)
#                     F1 = self.fl.forward(P1)
#                     A2 = self.FC.forward(F1)
#                     Z2 = self.activation_fc.forward(A2)         
#                     self.loss_val.append(self.activation_fc.backward(Z2, y_val)[1])
                    
    def out_size(self, H, W, P, FH, FW, S):
        out_h = (H + 2 * P - FH) // S + 1
        out_w = (W + 2 * P - FW) // S + 1
        return out_h, out_w
    
    def predict(self, X):
        """
        ニューラルネットワーク分類器を使い推定する。

        Parameters
        ----------
        X : 次の形のndarray, shape (n_samples, n_features)
            サンプル

        Returns
        -------
            次の形のndarray, shape (n_samples, 1)
            推定結果
        """
        A1 = self.cv.forward(X)
        Z1 = self.activation_cv.forward(A1)
        P1 = self.pl.forward(Z1)
        F1 = self.fl.forward(P1)
        A2 = self.FC.forward(F1)
        Z2 = self.activation_fc.forward(A2)
        return np.argmax(Z2, axis=1)

In [35]:
tes = Scratch1dCNNClassifier()
tes.fit(X_train, y_train_one_hot)

(20, 784)


ValueError: shapes (781,80) and (4,20) not aligned: 80 (dim 1) != 4 (dim 0)

In [None]:
tes.predict(X_val)

In [None]:
X_val.shape