In [1]:
import numpy
import os
import matplotlib.pyplot as plt
import urllib.request
%matplotlib inline

In [2]:
# neural network class definition
class neuralNetwork:
    """Three-layer (input / hidden / output) fully connected network with sigmoid units.

    The network is trained one sample at a time by gradient descent on the
    squared error E = sum((z_o - target)^2) / 2.

    Notation used throughout: for each layer, x is the layer input,
    y = w . x is the weighted sum and z = sigmoid(y) is the layer output.

    Attributes:
        nodes_i, nodes_h, nodes_o: node counts of the input, hidden and output layers.
        w_ih: input -> hidden weight matrix, shape (nodes_h, nodes_i).
        w_ho: hidden -> output weight matrix, shape (nodes_o, nodes_h).
        lr: learning rate applied to every weight update.
        x_h, y_h, z_h, x_o, y_o, z_o: values cached by the most recent
            forward pass; back_propagation() and get_loss() read them.
    """

    @staticmethod
    def sigmoid(x):
        """Sigmoid activation: sigmoid(x) = 1 / (1 + exp(-x))."""
        return 1 / (1 + numpy.exp(-x))

    @staticmethod
    def sigmoid_d(y):
        """Sigmoid derivative expressed through the sigmoid OUTPUT y = sigmoid(x):
        sigmoid'(x) = y * (1 - y)."""
        return y * (1 - y)

    def __init__(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        """Store the layer sizes and learning rate and draw random initial weights."""
        # Number of nodes in each layer
        self.nodes_i = input_nodes   # input layer
        self.nodes_h = hidden_nodes  # hidden layer
        self.nodes_o = output_nodes  # output layer

        # Initial weights: normal distribution centred on 0 whose standard
        # deviation is 1 / sqrt(number of incoming links).
        self.w_ih = numpy.random.normal(0.0, pow(self.nodes_i, -0.5), (self.nodes_h, self.nodes_i))
        self.w_ho = numpy.random.normal(0.0, pow(self.nodes_h, -0.5), (self.nodes_o, self.nodes_h))

        # Learning rate
        self.lr = learning_rate

    def set_weight(self, w_ih, w_ho):
        """Replace both weight matrices with copies of the given ones.

        Args:
            w_ih: array-like of shape (nodes_h, nodes_i).
            w_ho: array-like of shape (nodes_o, nodes_h).

        If either shape does not match the model, a message is printed and the
        current weights are left untouched.
        """
        w_ih = numpy.asarray(w_ih)
        w_ho = numpy.asarray(w_ho)
        if (
            w_ih.shape == (self.nodes_h, self.nodes_i) and
            w_ho.shape == (self.nodes_o, self.nodes_h)
        ):
            # Store float COPIES: back_propagation() updates the weights in
            # place, which would otherwise modify the caller's arrays (and fail
            # outright for integer-typed arrays).
            self.w_ih = numpy.array(w_ih, dtype=float)
            self.w_ho = numpy.array(w_ho, dtype=float)
        else:
            # Message: "the weight matrix shapes do not match the model's"
            print("重み行列の形がモデルのものと一致しません")

    def forward_propagation(self, inputs_list):
        """Run a forward pass and cache every intermediate value on the instance.

        Args:
            inputs_list: sequence of nodes_i input values.
        """
        # Turn the inputs into a column vector (nodes_i x 1)
        inputs = numpy.array(inputs_list, ndmin=2).T

        # Hidden layer
        self.x_h = inputs
        self.y_h = numpy.dot(self.w_ih, self.x_h)
        self.z_h = self.sigmoid(self.y_h)

        # Output layer
        self.x_o = self.z_h
        self.y_o = numpy.dot(self.w_ho, self.x_o)
        self.z_o = self.sigmoid(self.y_o)

    def back_propagation(self, targets_list):
        """Update both weight matrices from the values cached by the last forward pass.

        Args:
            targets_list: sequence of nodes_o target values.
        """
        # Turn the targets into a column vector (nodes_o x 1)
        targets = numpy.array(targets_list, ndmin=2).T

        # Gradient for w_ho (chain rule):
        #   dE/dw_ho = dE/dz_o * dz_o/dy_o * dy_o/dw_ho
        #   dE/dz_o   = d(((z_o - target)^2) / 2)/dz_o = (z_o - target)
        #   dz_o/dy_o = sigmoid'(y_o) = sigmoid_d(z_o)
        #   dy_o/dw_ho = x_o
        # delta_ho = dE/dy_o = (z_o - target) * sigmoid_d(z_o)
        delta_ho = (self.z_o - targets) * self.sigmoid_d(self.z_o)  # nodes_o x 1
        # grad_ho = dE/dw_ho = delta_ho . x_o^T
        grad_ho = numpy.dot(delta_ho, self.x_o.T)  # nodes_o x nodes_h

        # Gradient for w_ih (chain rule):
        #   dE/dw_ih = dE/dz_h * dz_h/dy_h * dy_h/dw_ih
        #   dE/dz_h   = dE/dx_o = w_ho^T . delta_ho
        #   dz_h/dy_h = sigmoid'(y_h) = sigmoid_d(z_h)
        #   dy_h/dw_ih = x_h
        # delta_ih = dE/dy_h = (w_ho^T . delta_ho) * sigmoid_d(z_h)
        delta_ih = numpy.dot(self.w_ho.T, delta_ho) * self.sigmoid_d(self.z_h)  # nodes_h x 1
        # grad_ih = dE/dw_ih = delta_ih . x_h^T
        grad_ih = numpy.dot(delta_ih, self.x_h.T)  # nodes_h x nodes_i

        # Both gradients are computed before either matrix changes, so
        # delta_ih above is based on the pre-update w_ho.
        self.w_ho -= self.lr * grad_ho
        self.w_ih -= self.lr * grad_ih

    def get_loss(self, targets_list):
        """Return the squared-error loss sum((target - z_o)^2) / 2 for the
        outputs cached by the most recent forward pass."""
        # Turn the targets into a column vector (nodes_o x 1)
        targets = numpy.array(targets_list, ndmin=2).T
        return ((targets - self.z_o) ** 2).sum() / 2

    def query(self, inputs_list):
        """Run a forward pass and return the output activations (nodes_o x 1)."""
        self.forward_propagation(inputs_list)
        return self.z_o

    def train(self, inputs_list, targets_list):
        """Train on one sample: forward pass, then one back-propagation update."""
        self.forward_propagation(inputs_list)
        self.back_propagation(targets_list)



In [3]:
def test(nn, test_data_list):
    scorecard = []

    for record in test_data_list:
        all_values = record.split(',')
        correct_label = int(all_values[0])
        inputs = (numpy.asfarray(all_values[1:]) / 255.0 * 0.99) + 0.01
        outputs = nn.query(inputs)
        label = numpy.argmax(outputs)
        if (label == correct_label):
            scorecard.append(1)
        else:
            scorecard.append(0)
            pass
        pass
    scorecard_array = numpy.asarray(scorecard)
    accuracy = scorecard_array.sum() / scorecard_array.size
    return accuracy

In [4]:
def train(nn, train_data_list, epochs, test_data_list=None):
    """Train ``nn`` for ``epochs`` passes over the data and plot the learning curves.

    Args:
        nn: network exposing ``nodes_o``, ``train(inputs, targets)`` and
            ``get_loss(targets)``.
        train_data_list: sequence of CSV strings ``"label,v1,v2,..."`` with raw
            inputs on a 0-255 scale.
        epochs: number of full passes over ``train_data_list``.
        test_data_list: optional records in the same format. When given and
            non-empty, accuracy on them is measured with ``test()`` after every
            epoch and plotted as a second figure.

    Side effects:
        Updates the weights of ``nn`` and creates a "Loss" figure, plus an
        "Accuracy" figure when accuracies were recorded.
    """
    if test_data_list is None:
        test_data_list = []

    loss_list = []
    acc_list  = []
    e_list    = []

    for e in range(epochs):
        loss = 0
        for record in train_data_list:
            all_values = record.split(',')
            # Rescale 0..255 to 0.01..1.00 (numpy.asfarray no longer exists in NumPy 2.0)
            inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
            # Target vector: 0.99 for the labelled class, 0.01 elsewhere.
            # Its length comes from the network itself rather than from a
            # notebook global that may not be defined yet.
            targets = numpy.zeros(nn.nodes_o) + 0.01
            targets[int(all_values[0])] = 0.99
            nn.train(inputs, targets)
            # get_loss() reads the outputs cached by the forward pass that
            # nn.train() just performed for this sample.
            loss += nn.get_loss(targets)
        # Mean per-sample loss of this epoch
        loss_list.append(loss / len(train_data_list))
        if len(test_data_list) != 0:
            acc_list.append(test(nn, test_data_list))
        e_list.append(e + 1)

    fig1 = plt.figure()
    plt.plot(e_list, loss_list)
    fig1.suptitle("Loss")

    # Without test data there is nothing to plot; plotting the empty acc_list
    # against e_list would raise on mismatched lengths.
    if acc_list:
        fig2 = plt.figure()
        plt.plot(e_list, acc_list)
        fig2.suptitle("Accuracy")

In [5]:
# Layer sizes: number of input, hidden and output nodes
input_nodes, hidden_nodes, output_nodes = 784, 200, 10

# Step size used for the weight updates
learning_rate = 0.1

# Build the network from the settings above
nn = neuralNetwork(
    input_nodes=input_nodes,
    hidden_nodes=hidden_nodes,
    output_nodes=output_nodes,
    learning_rate=learning_rate,
)

In [6]:
# Load the MNIST training data CSV file into a list of lines,
# downloading it first if it is not already present.
os.makedirs("dataset", exist_ok=True)
if not os.path.exists("dataset/mnist_train.csv"):
    # Download under a temporary name and rename afterwards, so an interrupted
    # transfer is never mistaken for a complete dataset on the next run.
    urllib.request.urlretrieve(
        "https://pjreddie.com/media/files/mnist_train.csv",
        "dataset/mnist_train.csv.part"
    )
    os.replace("dataset/mnist_train.csv.part", "dataset/mnist_train.csv")

# The with-block closes the file even if reading fails.
with open("dataset/mnist_train.csv", 'r') as train_data_file:
    train_data_list = train_data_file.readlines()

In [7]:
# Load the MNIST test data CSV file into a list of lines,
# downloading it first if it is not already present.
os.makedirs("dataset", exist_ok=True)
if not os.path.exists("dataset/mnist_test.csv"):
    # Download under a temporary name and rename afterwards, so an interrupted
    # transfer is never mistaken for a complete dataset on the next run.
    urllib.request.urlretrieve(
        "https://pjreddie.com/media/files/mnist_test.csv",
        "dataset/mnist_test.csv.part"
    )
    os.replace("dataset/mnist_test.csv.part", "dataset/mnist_test.csv")

# The with-block closes the file even if reading fails.
with open("dataset/mnist_test.csv", 'r') as test_data_file:
    test_data_list = test_data_file.readlines()

In [8]:
# Run this cell to start from previously trained weights.
# The CSV files are the ones written by the weight-export cell of this notebook.
# Alternative using NumPy's binary format:
#w_ih = numpy.load("weight_data/w_ih.npy")
#w_ho = numpy.load("weight_data/w_ho.npy")

# The weight files only exist after the export cell has been run at least
# once, so check for them instead of failing on a fresh checkout.
if os.path.exists("weight/w_ih.csv") and os.path.exists("weight/w_ho.csv"):
    w_ih = numpy.loadtxt("weight/w_ih.csv", delimiter=',')
    w_ho = numpy.loadtxt("weight/w_ho.csv", delimiter=',')

    nn.set_weight(w_ih, w_ho)
else:
    print("No saved weights found under weight/ - keeping the current weights.")

In [8]:
# Run this cell to train the network.
# Training only, without the per-epoch evaluation:
#train(nn, train_data_list, 10)
# Training while checking accuracy on the test data after every epoch:
train(nn, train_data_list, epochs=5, test_data_list=test_data_list)

KeyboardInterrupt: 

In [None]:
# Run this cell to measure the accuracy of the network on the test data.
test(nn, test_data_list=test_data_list)

In [None]:
# Run this cell to export the current weights of the network as CSV files.
# Alternative using NumPy's binary format:
#numpy.save("weight_data/w_ih.npy", nn.w_ih)
#numpy.save("weight_data/w_ho.npy", nn.w_ho)
weight_dir = "weight"
if not os.path.exists(weight_dir):
    os.makedirs(weight_dir)

# One CSV file per weight matrix
for csv_path, matrix in (
    ("weight/w_ih.csv", nn.w_ih),
    ("weight/w_ho.csv", nn.w_ho),
):
    numpy.savetxt(csv_path, matrix, delimiter=',')

In [None]:
# Run this cell to inspect the raw outputs of the network:
# feeds every record of the test dataset through the network and writes the
# resulting output vectors to a CSV file, one row per record.
if not os.path.exists("output"):
    os.makedirs("output")

output_list = []
for record in test_data_list:
    all_values = record.split(',')
    # Rescale 0..255 to 0.01..1.00 (numpy.asfarray no longer exists in NumPy 2.0)
    inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
    outputs = nn.query(inputs)
    # query() returns a column vector; flatten it so that each record becomes
    # one row of the 2-D array handed to savetxt.
    output_list.append(numpy.ravel(outputs))

# Save the collected outputs (not the weight matrix).
numpy.savetxt("output/test_dataset_output.csv", numpy.array(output_list), delimiter=',')