In [18]:
# 必要最低限
import numpy as np
import mnist
import matplotlib.pyplot as plt
from pylab import cm

X = mnist.download_and_parse_mnist_file("train-images-idx3-ubyte.gz")
Y = mnist.download_and_parse_mnist_file("train-labels-idx1-ubyte.gz") 

#入力層のクラス
class InputLayer:
    def __init__(self, x):
        self.x = x
    def output(self):
        return self.x
    
# 畳み込み層のクラス
class ConvolutionLayer:
    def __init__(self, x, filter_mateix, bias, batch, R, s):
        self.x = x #batch * ch * dx * dy行列
        self.filter_matrix = filter_mateix #フィルター行列だが、R*Rではない。フィルター行列の集合
        self.bias = bias #バイアス
        self.batch = batch #バッチサイズ
        self.R = R #フィルターサイズ
        self.s = s #ストライド
    def padding(self):
        batch, ch, dx, dy = self.x.shape
        r = int(self.R/2)
        padding = np.zeros((batch, ch, dx + 2*r, dy + 2*r)) 
        padding[:, :, r:(dx+r), r:(dy+r)] = self.x
        return padding
    def rerange(self):
        batch, ch, dx, dy = self.x.shape
        C = self.padding()
        C = np.lib.stride_tricks.as_strided(C, shape=(batch, ch, int(dx/self.s), int(dy/self.s), self.R, self.R),\
                                            strides=(C.strides[0], C.strides[1],self.s*C.strides[2], self.s*C.strides[3], C.strides[2], C.strides[3])
                                           )
        C = C.transpose(1, 4, 5, 0, 2, 3).reshape(ch * self.R * self.R, -1)
        return C
    def output(self):
        W = self.filter_matrix
        B = self.bias
        return np.dot(W, self.rerange()) + B[:,np.newaxis]
    
# プーリング層のクラス
class PoolingLayer:
    def __init__(self, x, batch, R, s):
        self.x = x #畳み込み層からの入力
        self.batch = batch #バッチサイズ
        self.R = R #プーリング層のサイズ
        self.s = s #ストライド
        self.max_indices = None  # 最大値のインデックスを保持する変数
    def output(self):
        dx, dy = self.x.shape
        newdy = int(np.sqrt(int(dy/self.batch)))
        #これにより batch * K * dx * dy配列に変化
        tensor_x = np.array(np.array_split(self.x, dy // int(dy/self.batch), axis=1)).reshape(self.batch, dx, newdy, newdy)
        #これにpoolingを適用する
        d = self.R
        n, c, h, w = tensor_x.shape
        output_h = (h - d) // self.s + 1 #出力の高さ
        output_w = (w - d) // self.s + 1 #出力の幅
        pooled_data = tensor_x[:, :, :output_h * self.s, :output_w * self.s].reshape(n, c, output_h, d, output_w, d)
        pooled_data = pooled_data.max(axis=(3, 5))
        return pooled_data
    def max_index(self):
        dx, dy = self.x.shape
        newdy = int(np.sqrt(int(dy/self.batch)))
        #これにより batch * K * dx * dy配列に変化
        tensor_x = np.array(np.array_split(self.x, dy // int(dy/self.batch), axis=1)).reshape(self.batch, dx, newdy, newdy)
        #これにpoolingを適用する
        d = self.R
        n, c, h, w = tensor_x.shape
        output_h = (h - d) // self.s + 1 #出力の高さ
        output_w = (w - d) // self.s + 1 #出力の幅
        pooled_data = tensor_x[:, :, :output_h * self.s, :output_w * self.s].reshape(n, c, output_h, d, output_w, d)
        # マックスプーリングのマスクを作成、多分これで、インデックスを保持することができているはず、、、
        mask = (pooled_data == np.max(pooled_data, axis=(3, 5), keepdims=True))
        # mask = np.argwhere(pooled_data == np.max(pooled_data, axis=(3, 5)))
        mask = mask.reshape(n, c, output_h*d, output_w*d).astype(int)
        return mask

# 全結合層のクラス
class ConnectionLayer:
    def __init__(self, x, w, b):
        self.x = x
        self.w= w
        self.b = b
    def output(self):
        self.x = np.array(self.x)
        self.w = np.array(self.w)
        self.b = np.array(self.b)
        return np.dot(self.w.T, self.x) + self.b[:, np.newaxis]
    def dot_wx(self):
        self.x = np.array(self.x)
        self.w = np.array(self.w)
        return np.dot(self.w.T, self.x)
    def dot_b(self):
        self.b = np.array(self.b)
        return self.b[:, np.newaxis]
    def test_output(self):
        self.x = np.array(self.x)
        self.w = np.array(self.w)
        self.b = np.array(self.b)
        return np.dot(self.w.T, self.x) + self.b

#中間層のクラス(sigmoid, relu)
class MiddleLayer:
    def __init__(self, x, function):
        self.x = x
        self.function = function
    def output(self):
        return self.function(self.x)
    
# 中間層のクラス（dropout）
class Dropout:
    def __init__(self, x, rho):
        self.x = x
        self.rho = rho
    def output(self):
        rows, cols = self.x.shape
        indices = np.arange(rows)
        np.random.shuffle(indices)
        result_matrix = np.copy(self.x)
        for col in range(cols):
            selected_indices = indices[:self.rho]
            result_matrix[selected_indices, col] = 0
        return result_matrix
    
# 出力層のクラス
class OutputLayer:
    def __init__(self, x, softmax):
        self.x = x
        self.softmax = softmax
    def output(self):
        return self.softmax(self.x)

# クロスエントロピーを計算するクラス
class CrossEntropy:
    def __init__(self, x, onehot):
        self.x = x
        self.onehot = onehot
    def safelog(self):
        try:
            result = -np.log(self.x)
        except RuntimeWarning as e:
            result = np.where(self.x <= 0, 100, -np.log(self.x))
        return result
    def result(self):
        loglist = self.safelog()
        crossentropy = np.sum(self.onehot * loglist, axis=0)
        batch = loglist.shape[1]
        return np.sum(crossentropy) / batch

# softmaxとクロスエントロピー誤差の逆伝播クラス
class ErrorBackCrossEntropy:
    def __init__(self, x, onehot):
        self.x = x
        self.onehot = onehot
    def del_en_x(self):
        batch = self.x.shape[1]
        return (self.x - self.onehot) / batch
    
# 中間層の逆伝播のクラス
# シグモイド関数の逆伝播のクラス
class ErrorBackSigmoid:
    def __init__(self, y, del_en_y):
        self.y = y
        self.del_en_y = del_en_y
    def del_en_x(self):
        new_y = (1 - self.y) * self.y
        result = self.del_en_y * new_y
        return result
    
# relu関数の逆伝播のクラス
class ErrorBackReLU:
    def __init__(self, y, del_en_y):
        self.y = y
        self.del_en_y = del_en_y
    def del_en_x(self):
        new_y = np.where(self.y > 0, 1, 0)
        result = self.del_en_y * new_y
        return result
    
# dropoutの逆伝播クラス
class ErrorBackDropout:
    def __init__(self, y, del_en_y):
        self.y = y
        self.del_en_y = del_en_y
    def del_en_x(self):
        new_y = np.where(self.y == 0, 0, 1)
        result = self.del_en_y * new_y
        return result
    
#正規化層の誤差逆伝播クラス
class ErrorBackNormalize:
    def __init__(self, x, x_hat, del_en_y, gamma, beta, epsilon, mean, var, batch):
        self.x = x
        self.x_hat = x_hat
        self.del_en_y = del_en_y
        self.gamma = gamma
        self.beta = beta
        self.epsilon = epsilon
        self.mean = mean
        self.var = var
        self.batch = batch
    def del_en_x_hat(self):
        return self.del_en_y * self.gamma[:,np.newaxis]
    def del_en_var(self):
        mult = np.sum(self.del_en_x_hat()*(self.x - self.mean[:, np.newaxis]), axis=1)
        return (-1) / 2 * mult *  np.power((self.var + self.epsilon), -1.5 ) 
    def del_en_mean(self):
        e1 = np.sum(self.del_en_x_hat(), axis=1)  * (-1) / np.sqrt(self.var + self.epsilon)
        e2 = (-2) * self.del_en_var() * (np.sum(self.x, axis=1)/self.batch - self.mean)
        return e1 + e2
    def del_en_x(self):
        e1 = self.del_en_x_hat() / np.sqrt(self.var + self.epsilon)[:,np.newaxis]
        e2 = 2 / self.batch * (self.x - self.mean[:,np.newaxis]) * self.del_en_var()[:,np.newaxis]
        e3 = self.del_en_mean() / self.batch
        return e1 + e2 + e3[:,np.newaxis]
    def del_en_gamma(self):
        return np.sum(self.del_en_y * self.x, axis=1)
    def del_en_beta(self):
        return np.sum(self.del_en_y, axis=1)
    
# 全結合層の逆伝播クラス
class ErrorBackConnect:
    def __init__(self, x, del_en_y, w):
        self.x = x
        self.del_en_y = del_en_y
        self.w = w
    def del_en_x(self):
        return np.dot(self.w, self.del_en_y)
    def del_en_w(self):
        return np.dot(self.del_en_y, self.x.T)
    def del_en_b(self):
        return np.sum(self.del_en_y, axis=1)
    
# プーリング層の誤差逆伝播クラス
class ErrorBackPooling:
    def __init__(self, x, y, del_en_y, K, batch, max_index, R, R_conv):
        self.x = x #プーリング層の入力
        self.y = y #プーリング層の出力
        self.del_en_y = del_en_y
        self.K = K #フィルタ数
        self.batch = batch
        self.max_index = max_index #プーリング層で抽出された要素は1, そうでない要素は0となっている配列
        self.R = R #プーリングのサイズ
        self.R_conv = R_conv
    def reshape_del(self, m):
        self.m = m
        z, batch = self.m.shape
        dx = int(np.sqrt(int(z/self.K)))
        #これにより、batch*K*dx*dx配列に変化
        tensor_del = self.m.T.reshape(batch, self.K, dx, dx)
        return tensor_del
    def reshape_x(self, n):
        self.n = n
        dx, dy = self.n.shape
        newdy = int(np.sqrt(int(dy/self.batch)))
        #これにより batch * K * dx * dy配列に変化
        tensor_x = np.array(np.array_split(self.n, dy // int(dy/self.batch), axis=1)).reshape(self.batch, dx, newdy, newdy)
        #これにpoolingを適用する
        return tensor_x
    #こちらは、pooling層2の誤差逆伝播に限定することにする
    def del_en_x(self):
        del_en_y_reshaped = self.reshape_del(self.del_en_y)
        b, k, dx, dy =  self.reshape_x(self.x).shape
        expanded_del = del_en_y_reshaped.repeat(self.R, axis=2).repeat(self.R, axis=3)
        expanded_del = expanded_del.reshape(b, k, dx, dy)
        C = expanded_del * self.max_index
        return C.transpose(1, 0, 2, 3).reshape(k, -1)
    #こちらはpooling層1の誤差逆伝播に限定
    def del_en_x_sub(self):
        # b, k, dx, dy = self.y.shape
        b, k, h, w = self.reshape_x(self.x).shape
        # expanded_del = self.del_en_y[np.arange(self.del_en_y.shape[0]) % 9 == 4]
        # expanded_del = self.reshape_del(expanded_del)
        # expanded_del = expanded_del.repeat(self.R, axis=2).repeat(self.R, axis=3)
        # expanded_del = expanded_del.reshape(b, k, w, h)
        # dx, dy = self.x.shape
        expanded_del = col2im(self.del_en_y, self.y.shape, 3, 3, 1, 1)
        expanded_del = expanded_del.repeat(self.R, axis=2).repeat(self.R, axis=3)
        expanded_del = expanded_del.reshape(b, k, w, h)
        
        C = expanded_del * self.max_index
        return C.transpose(1, 0, 2, 3).reshape(k, -1)
    
# 畳み込み層の誤差逆伝播クラス
class ErrorBackConvolution:
    def __init__(self, x, del_en_y, w):
        self.x = x
        self.del_en_y = del_en_y
        self.w = w
    def del_en_x(self):
        return np.dot(self.w.T, self.del_en_y)
    def del_en_w(self):
        return np.dot(self.del_en_y, self.x.T)
    def del_en_b(self):
        return np.sum(self.del_en_y, axis=1)
    def del_en_x_sub(self):
        del_y = np.dot(self.w.T, self.del_en_y)
        expanded_del = del_y[np.arange(del_y.shape[0]) % 9 == 4]
        return expanded_del
    
# シグモイド関数
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# ソフトマックス関数
def softmax(x):
    x = x - np.max(x, axis = 0)
    return np.exp(np.array(x)) / np.sum(np.exp(np.array(x)), axis=0)

# リストの一致率を示す関数
def calculate_similarity(list1, list2):
    if len(list1) != len(list2):
        raise ValueError("リストの長さが異なります")

    # 一致した要素の数を数える
    match_count = sum(1 for a, b in zip(list1, list2) if a == b)

    # 一致率を計算
    similarity = (match_count / len(list1)) * 100  # パーセントで表示

    return similarity

# ReLU関数
def relu(x):
    return np.where(x > 0, x, 0)


In [None]:
#Mnistコンテスト本番ファイル（訓練用）
#畳み込み→活性化→プーリング→バッチ正規化→活性化→全結合→ソフトマックス
#畳み込み層のフィルタサイズは3*3、ストライドは1
#活性化関数には全てReLUを用いる
#プーリングはフィルタサイズ2*2、ストライドは2

np.random.seed(673)
#チャンネル数
ch = 1
#畳み込み層のフィルタサイズ
R = 3
#プーリング層のフィルタのサイズ
R_pool = 2
#フィルタの数
K = 30
#畳み込み層のストライド
s = 1
#プーリング層のストライド
s_pool = 2
#バッチ数
batch = 100
#2回目のプーリングが終了した時点でのデータサイズ
d = K * 14 * 14
#クラス数
C = 10
#フィルター行列
filter_matrix = np.array(np.random.normal(0, 1/np.sqrt(d), (K, R*R*ch)))
#バイアス項
bias_vector = np.array(np.random.normal(0, 1/np.sqrt(d), K))
#全結合層の行列
W_matrix = np.array(np.random.normal(0, 1/np.sqrt(d), (d, C)))
#全結合層のベクトル
b_vector = np.array(np.random.normal(0, 1/np.sqrt(d), C))
#正規化パラメータの初期値
gamma_init = np.ones(d)
beta_init = np.zeros(d)
mean_init = np.zeros(d)
var_init = np.zeros(d)

def mnistcontest_train(batch, W_init, b_init, gamma_init, beta_init, filter_init, bias_init, mean_init, var_init):
    #変数を初期化
    W = W_init
    b = b_init
    gamma = gamma_init
    beta = beta_init
    mean_cross_entropy = 0
    mean_mean = np.zeros(d)
    mean_var = np.zeros(d)
    filter_matrix = filter_init
    bias = bias_init
    #バッチ正規化前エポックにおけるバッチの平均と分散
    mean = mean_init
    var = var_init
    #学習率
    eta = 0.01
    #イプシロンの値
    epsilon = 1.0e-15
    #0~50000までを重複なく取得
    sampled_data =  np.random.choice(np.arange(0, 50000), size=(int(50000/batch), batch), replace=False)
    i = len(sampled_data)-1
    X_train = X[:, np.newaxis, :, :]
    while(i > 0):
        batch_index = sampled_data[i] 
        #正規化
        batch_matrix = np.array(X_train[batch_index])/255 
        #正解データ
        correct_labels = Y[batch_index] 
        #one-hot
        one_hot = np.array([np.eye(10)[y] for y in correct_labels]).T
        #入力
        input_layer = InputLayer(batch_matrix)
        #畳み込み
        convolution_layer = ConvolutionLayer(input_layer.output(), filter_matrix, bias, batch, R, s)
        #活性化1
        activate_layer1 = MiddleLayer(convolution_layer.output(), relu)
        #プーリング
        pooling_layer = PoolingLayer(activate_layer1.output(), batch, R_pool, s_pool)
        #プーリングした多次元配列を行列の形に整形
        preaffine = pooling_layer.output().reshape(batch, -1).T
        #バッチ正規化
        row_means = np.mean(preaffine, axis=1)
        mean_mean += row_means
        row_variances = np.var(preaffine, axis=1)
        mean_var += row_variances
        stdiv = np.sqrt(row_variances + epsilon)
        x_hat = (preaffine - row_means[:, np.newaxis]) / stdiv[:, np.newaxis]
        normalized_matrix = gamma[:,np.newaxis] * x_hat + beta[:,np.newaxis]
        #活性化2
        activate_layer2 = MiddleLayer(normalized_matrix, relu)
        #全結合
        connection_layer = ConnectionLayer(activate_layer2.output(), W, b)
        #ソフトマックス
        output_layer = OutputLayer(connection_layer.output(), softmax) 
        #クロスエントロピー誤差伝播
        errorback_crossentropy = ErrorBackCrossEntropy(output_layer.output(), one_hot)
        #全結合層の逆伝播
        errorback_connect = ErrorBackConnect(activate_layer2.output(), errorback_crossentropy.del_en_x(), W)
        #活性化2の誤差逆伝播
        errorback_relu2 = ErrorBackReLU(normalized_matrix, errorback_connect.del_en_x())
        #バッチ正規化層の誤差逆伝播
        errorback_normalize = ErrorBackNormalize(preaffine, x_hat, errorback_relu2.del_en_x(), 
                                                 gamma, beta, epsilon, row_means, row_variances, batch)
        #プーリングの誤差逆伝播
        errorback_pooling = ErrorBackPooling(activate_layer1.output(), pooling_layer.output(), errorback_normalize.del_en_x(), 
                                             K, batch, pooling_layer.max_index(), R_pool, R) 
        #活性化層1の誤差逆伝播
        errorback_relu1 = ErrorBackReLU(convolution_layer.output(), errorback_pooling.del_en_x())
        #畳み込み層の誤差逆伝播
        errorback_convolution = ErrorBackConvolution(convolution_layer.rerange(), errorback_relu1.del_en_x(), filter_matrix)
        #変数を更新
        W = W - eta * errorback_connect.del_en_w().T
        b = b - eta * errorback_connect.del_en_b()
        gamma = gamma - eta * errorback_normalize.del_en_gamma()
        beta = beta - eta * errorback_normalize.del_en_beta()
        filter_matrix = filter_matrix - eta * errorback_convolution.del_en_w()
        bias = bias - eta * errorback_convolution.del_en_b()
        i -= 1
        crossentropy = CrossEntropy(output_layer.output(), one_hot)
        mean_cross_entropy += crossentropy.result()
        # print(crossentropy.result())
        if(i==0):
            test_mean = mean_mean / len(sampled_data)
            test_var = mean_var / len(sampled_data)
            # test_mean = (test_mean + mean)/2
            # test_var = (test_var + var)/2
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/W", W)
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/b", b)
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/gamma", gamma)
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/beta", beta)
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/test_mean", test_mean)
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/test_var", test_var)
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/filter_matrix", filter_matrix)
            np.save("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/bias", bias)
            print("平均のクロスエントロピー"+str(mean_cross_entropy/len(sampled_data)))
            print("正解率:" + str(calculate_similarity(np.argmax(output_layer.output(), axis=0).tolist(), correct_labels)))
            print("テスト開始")
            test_matrix = X_train[50000:]
            testbatch, k, dx, dy = test_matrix.shape
            #正規化
            test_matrix = np.array(test_matrix) / 255
            answer_labels = Y[50000:]
            #入力層
            test_input_layer = InputLayer(test_matrix)
            #畳み込み
            test_convolution_layer = ConvolutionLayer(test_input_layer.output(), filter_matrix, bias, testbatch, R, s)
            #活性化1
            test_activate_layer1 = MiddleLayer(test_convolution_layer.output(), relu)
            #プーリング
            test_pooling_layer = PoolingLayer(test_activate_layer1.output(), testbatch, R_pool, s_pool)
            #行列に整形、正規化
            test_preaffine = test_pooling_layer.output().reshape(testbatch, -1).T
            e1 = (gamma / np.sqrt(test_var + epsilon))[:,np.newaxis] * test_preaffine
            e2 = beta - gamma * test_mean / np.sqrt(test_var + epsilon)
            normalized_output = e1 + e2[:,np.newaxis]
            #活性化2
            test_activate_layer2 = MiddleLayer(normalized_output, relu)
            #全結合
            test_connection_layer = ConnectionLayer(test_activate_layer2.output(), W, b)
            #ソフトマックス
            output_layer_test = OutputLayer(test_connection_layer.output(), softmax)
            print("テスト正解率:" + str(calculate_similarity(np.argmax(output_layer_test.output(), axis=0).tolist(), answer_labels)))
            

            
mnistcontest_train(100, W_matrix, b_vector, gamma_init, beta_init, filter_matrix, bias_vector, mean_init, var_init)
# j = 10
# while(j > 0):
#     W = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/W.npy")
#     b = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/b.npy")
#     gamma = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/gamma.npy")
#     beta = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/beta.npy")
#     filter_init = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/filter_matrix.npy")
#     bias = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/bias.npy")
#     mean = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/test_mean.npy")
#     var = np.load("/Users/mst923/OneDrive - Kyoto University/3年後期/計算機科学実験4/画像処理/mnist_test/test_var.npy")
#     mnistcontest_train(100, W, b, gamma, beta, filter_init, bias, mean, var)
#     j -= 1
#     print("epoch残り"+str(j))

平均のクロスエントロピー1.0865384597553776
正解率:13.0
テスト開始


In [None]:
#Mnistコンテスト本番ファイル（テスト用）
