In [1]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline

def sigmoid(x):
    return 1 / (1+np.exp(-x))

In [7]:
from datetime import datetime      # datetime.now() 를 이용하여 학습 경과 시간 측정

class NeuralNetwork:
    
    def __init__(self, input_nodes, hidden_1_nodes, hidden_2_nodes, output_nodes, learning_rate):
        
        self.input_nodes = input_nodes
        self.hidden_1_nodes = hidden_1_nodes
        self.hidden_2_nodes = hidden_2_nodes
        self.output_nodes = output_nodes
        
        # 은닉층 가중치  W2 = (784 X 100) Xavier/He 방법으로 self.W2 가중치 초기화
        self.W2 = np.random.randn(self.input_nodes, self.hidden_1_nodes) / np.sqrt(self.input_nodes/2)
        self.b2 = np.random.rand(self.hidden_1_nodes)      
        
        # 출력층 가중치는 W3 = (100X10)  Xavier/He 방법으로 self.W3 가중치 초기화
        self.W3 = np.random.randn(self.hidden_1_nodes, self.hidden_2_nodes) / np.sqrt(self.hidden_1_nodes/2)
        self.b3 = np.random.rand(self.hidden_2_nodes)      
        
        self.W4 = np.random.randn(self.hidden_2_nodes, self.output_nodes) / np.sqrt(self.hidden_2_nodes/2)
        self.b4 = np.random.rand(self.output_nodes)
                        
        self.Z4 = np.zeros([1, output_nodes])
        self.A4 = np.zeros([1, output_nodes])
            
        # 출력층 선형회귀 값 Z3, 출력값 A3 정의 (모두 행렬로 표시)
        self.Z3 = np.zeros([1, hidden_2_nodes])
        self.A3 = np.zeros([1, hidden_2_nodes])
        
        # 은닉층 선형회귀 값 Z2, 출력값 A2 정의 (모두 행렬로 표시)
        self.Z2 = np.zeros([1, hidden_1_nodes])
        self.A2 = np.zeros([1, hidden_1_nodes])
        
        # 입력층 선형회귀 값 Z1, 출력값 A1 정의 (모두 행렬로 표시)
        self.Z1 = np.zeros([1, input_nodes])    
        self.A1 = np.zeros([1, input_nodes])       
        
        # 학습률 learning rate 초기화
        self.learning_rate = learning_rate
        
    def feed_forward(self):  
        
        delta = 1e-7    # log 무한대 발산 방지
        
        # 입력층 선형회귀 값 Z1, 출력값 A1 계산
        self.Z1 = self.input_data
        self.A1 = self.input_data
        
        # 은닉층 선형회귀 값 Z2, 출력값 A2 계산    
        self.Z2 = np.dot(self.A1, self.W2) + self.b2
        self.A2 = sigmoid(self.Z2)
        
        # 출력층 선형회귀 값 Z3, 출력값 A3 계산
        self.Z3 = np.dot(self.A2, self.W3) + self.b3
        self.A3 = sigmoid(self.Z3)
        
        self.Z4 = np.dot(self.A3, self.W4) + self.b4
        y = self.A4 = sigmoid(self.Z4)
        
        # cross entropy 
        return  -np.sum( self.target_data*np.log(y + delta) + (1-self.target_data)*np.log((1 - y)+delta ) )    
    
    def loss_val(self):

        delta = 1e-7    # log 무한대 발산 방지
        
        # 입력층 선형회귀 값 Z1, 출력값 A1 계산
        self.Z1 = self.input_data
        self.A1 = self.input_data
        
        # 은닉층 선형회귀 값 Z2, 출력값 A2 계산    
        self.Z2 = np.dot(self.A1, self.W2) + self.b2
        self.A2 = sigmoid(self.Z2)
        
        # 출력층 선형회귀 값 Z3, 출력값 A3 계산
        self.Z3 = np.dot(self.A2, self.W3) + self.b3
        self.A3 = sigmoid(self.Z3)
        
        self.Z4 = np.dot(self.A3, self.W4) + self.b4
        y = self.A4 = sigmoid(self.Z4)
        
        # cross entropy 
        return  -np.sum( self.target_data*np.log(y + delta) + (1-self.target_data)*np.log((1 - y)+delta ) )    
   
    
    # 정확도 측정함수 
    def accuracy(self, test_input_data, test_target_data):
        
        matched_list = []
        not_matched_list = []
        
        
        for index in range(len(test_input_data)):
                        
            label = int(test_target_data[index])
                        
            # one-hot encoding을 위한 데이터 정규화 (data normalize)
            data = (test_input_data[index] / 255.0 * 0.99) + 0.01
                  
            # predict 를 위해서 vector 을 matrix 로 변환하여 인수로 넘겨줌
            predicted_num = self.predict(np.array(data, ndmin=2)) 
        
            if label == predicted_num:
                matched_list.append(index)
                
            else:
                
                not_matched_list.append(index)
        
        accuracy_val = (len(matched_list)/(len(test_input_data)))
        
        return accuracy_val, not_matched_list
    
    
    def train(self, input_data, target_data):   # input_data : 784 개, target_data : 10개
        
        self.target_data = target_data    
        self.input_data = input_data
        
        # 먼저 feed forward 를 통해서 최종 출력값과 이를 바탕으로 현재의 에러 값 계산
        loss_val = self.feed_forward()
    
        loss_4 = (self.A4-self.target_data) * self.A4 * (1-self.A4)     
        
        self.W4 = self.W4 - self.learning_rate * np.dot(self.A3.T, loss_4)        
        self.b4 = self.b4 - self.learning_rate * loss_4

        # 출력층 loss 인 loss_3 구함
        loss_3 = np.dot(loss_4, self.W4.T) * self.A3 * (1-self.A3)
        
        # 출력층 가중치 W3, 출력층 바이어스 b3 업데이트
        self.W3 = self.W3 - self.learning_rate * (np.dot(self.A2.T, loss_3))   
        
        self.b3 = self.b3 - self.learning_rate * loss_3  
        
        # 은닉층 loss 인 loss_2 구함        
        loss_2 = np.dot(loss_3, self.W3.T) * self.A2 * (1-self.A2)
        
        # 은닉층 가중치 W2, 은닉층 바이어스 b2 업데이트
        self.W2 = self.W2 - self.learning_rate * (np.dot(self.A1.T, loss_2))   
        
        self.b2 = self.b2 - self.learning_rate * loss_2
        
        
    def predict(self, input_data):        # input_data 는 행렬로 입력됨 즉, (1, 784) shape 을 가짐        
        
        Z2 = np.dot(input_data, self.W2) + self.b2
        A2 = sigmoid(Z2)
        
        Z3 = np.dot(A2, self.W3) + self.b3
        A3 = sigmoid(Z3)
        
        Z4 = np.dot(A3, self.W4) + self.b4
        y = A4 = sigmoid(Z4)
        
        predicted_num = np.argmax(y)
    
        return predicted_num
    
    def a(self, target_data):
        print("self.W4.shape = ", self.W4.shape)
        print("self.A3,shape = ", self.A3.shape)
        print("self.A3.T.shape = ", self.A3.T.shape)
        print("loss_4.shape = ", ((self.A4-target_data) * self.A4 * (1-self.A4)).shape)

In [3]:
# 0~9 숫자 이미지가 784개의 숫자 (28X28) 로 구성되어 있는 training data 읽어옴

try:
    training_data = np.loadtxt('./mnist_train.csv', delimiter=',', dtype=np.float32)

    print("training_data.shape = ", training_data.shape)
    print("training_data[0,0] = ", training_data[0,0], ", len(training_data[0]) = ", len(training_data[0]))
    
except Exception as err:
    
    print('Exception occur !!')

training_data.shape =  (60000, 785)
training_data[0,0] =  5.0 , len(training_data[0]) =  785


In [8]:
# hyper-parameter
i_nodes = training_data.shape[1] - 1    # input nodes 개수
h1_nodes = 30     # hidden 1 nodes
h2_nodes = 20
o_nodes = 10       # output nodes
lr = 0.1           # learning rate
epochs = 1         # epochs

# 손실함수 값을 저장할 list 생성
loss_val_list = []

nn = NeuralNetwork(i_nodes, h1_nodes, h2_nodes, o_nodes, lr)

start_time = datetime.now()

for i in range(epochs):
    
    for step in range(len(training_data)):  # train
    
        # input_data, target_data normalize        
        target_data = np.zeros(o_nodes) + 0.01    
        target_data[int(training_data[step, 0])] = 0.99
    
        input_data = ((training_data[step, 1:] / 255.0) * 0.99) + 0.01
    
        nn.train( np.array(input_data, ndmin=2), np.array(target_data, ndmin=2) )  # train 할 경우에 행렬로 입력
    
        if step % 1000 == 0:
            print("epochs = ", i, ", step = ", step,  ", current loss_val = ", nn.loss_val())
        
        # 손실함수 값 저장
        loss_val_list.append(nn.loss_val())        
        
end_time = datetime.now() 
print("\nelapsed time = ", end_time - start_time) 

epochs =  0 , step =  0 , current loss_val =  9.799139001959393
epochs =  0 , step =  1000 , current loss_val =  3.179463067991308
epochs =  0 , step =  2000 , current loss_val =  3.4776956545196698
epochs =  0 , step =  3000 , current loss_val =  3.9398199253403914
epochs =  0 , step =  4000 , current loss_val =  2.1267593419370154
epochs =  0 , step =  5000 , current loss_val =  1.6443237450531472
epochs =  0 , step =  6000 , current loss_val =  1.307701912962202
epochs =  0 , step =  7000 , current loss_val =  2.795701718285013
epochs =  0 , step =  8000 , current loss_val =  0.9526092258973469
epochs =  0 , step =  9000 , current loss_val =  1.3943025700515792
epochs =  0 , step =  10000 , current loss_val =  0.98194315334467
epochs =  0 , step =  11000 , current loss_val =  1.0403011163726377
epochs =  0 , step =  12000 , current loss_val =  1.0348025589462089
epochs =  0 , step =  13000 , current loss_val =  1.1997847517563458
epochs =  0 , step =  14000 , current loss_val =  0.6

In [None]:
# 0~9 숫자 이미지가 784개의 숫자 (28X28) 로 구성되어 있는 test data 읽어옴

try:
    
    test_data = np.loadtxt('./mnist_test.csv', delimiter=',', dtype=np.float32)

    test_input_data = test_data[ : , 1: ]
    test_target_data = test_data[ : , 0 ]

    print("test_data.shape = ", test_data.shape)
    print("test_data[0,0] = ", test_data[0,0], ", len(test_data[0]) = ", len(test_data[0]))

    # measure accuracy
    (accuracy_ret, false_list) = nn.accuracy(test_input_data, test_target_data)   

    print('Accuracy = ', np.round(100*accuracy_ret, 3), ' %')
    
except Exception as err:
    
    print('Exception occur !!')

In [None]:
# 손실함수 추세 확인
Y_DATA_LIST = []

for index in range(0, len(loss_val_list), 500):
    Y_DATA_LIST.append(loss_val_list[index])
    
plt.title('MNIST Loss Value Trend')
plt.xlabel('data index ( X 500)')
plt.ylabel('loss value')
plt.grid()
plt.plot(Y_DATA_LIST, color='b')
plt.show()

In [10]:
n = NeuralNetwork(784, 30, 20, 10, 2)

for step in range(len(training_data)):
    target_data = np.zeros(10) + 0.01
    target_data[int(training_data[step, 0])] = 0.99

n.a(np.array(target_data, ndmin=2))

self.W4.shape =  (20, 10)
self.A3,shape =  (1, 20)
self.A3.T.shape =  (20, 1)
loss_4.shape =  (1, 10)
