In [1]:
### chan 2020/11/24
### Build a CNN for the MNIST classification task using only NumPy

In [2]:
import numpy as np
from layers import *
from loss import loss_MSE, loss_CrossEntropy
from activation import ReLU_forward, ReLU_backward

In [3]:
#from temp_modules import *
#from temp_optimizers import *

In [4]:
# 定义权重、神经元、梯度
weights = {}
weights_scale = 1e-2
filters = 1
fc_units=64
weights["K1"] = weights_scale * np.random.randn(1, filters, 3, 3).astype(np.float64)
weights["b1"] = np.zeros(filters).astype(np.float64)
weights["W2"] = weights_scale * np.random.randn(filters * 13 * 13, fc_units).astype(np.float64)
weights["b2"] = np.zeros(fc_units).astype(np.float64)
weights["W3"] = weights_scale * np.random.randn(fc_units, 10).astype(np.float64)
weights["b3"] = np.zeros(10).astype(np.float64)

# 初始化神经元和梯度
nuerons={}
gradients={}

In [5]:
# 定义前向传播
def forward(X):
    nuerons["conv1"]=conv_forward(X.astype(np.float64),weights["K1"],weights["b1"])
    #print(nuerons["conv1"].shape)
    nuerons["conv1_relu"]=ReLU_forward(nuerons["conv1"])
    #print(nuerons["conv1_relu"].shape)
    nuerons["maxp1"]=pooling_max_forward(nuerons["conv1_relu"].astype(np.float64),poolKernel=(2,2))
    #print(nuerons["maxp1"].shape)

    nuerons["flatten"]=flatten_forward(nuerons["maxp1"])
    #print(nuerons["flatten"].shape)
    
    nuerons["fc2"]=fc_forward(nuerons["flatten"],weights["W2"],weights["b2"])
    #print(nuerons["fc2"].shape)
    nuerons["fc2_relu"]=ReLU_forward(nuerons["fc2"])
    #print(nuerons["fc2_relu"].shape)
    nuerons["y"]=fc_forward(nuerons["fc2_relu"],weights["W3"],weights["b3"])
    #print(nuerons["y"].shape)

    return nuerons["y"]

# 定义反向传播
def backward(X,y_true):
    #print('backward')
    loss,dy=loss_CrossEntropy(nuerons["y"],y_true)
    #print(dy.shape)
    gradients["W3"],gradients["b3"],gradients["fc2_relu"]=fc_backward(dy,weights["W3"],nuerons["fc2_relu"])
    #print(gradients["fc2_relu"].shape)
    gradients["fc2"]=ReLU_backward(gradients["fc2_relu"],nuerons["fc2"])
    #print(gradients["fc2"].shape)
    gradients["W2"],gradients["b2"],gradients["flatten"]=fc_backward(gradients["fc2"],weights["W2"],nuerons["flatten"])
    #print(gradients["flatten"].shape)
    gradients["maxp1"]=flatten_backward(gradients["flatten"],nuerons["maxp1"])
    #print(gradients["maxp1"].shape)
    gradients["conv1_relu"]=pooling_max_backward(gradients["maxp1"].astype(np.float64),nuerons["conv1_relu"].astype(np.float64),poolKernel=(2,2))
    #print(gradients["conv1_relu"].shape)
    gradients["conv1"]=ReLU_backward(gradients["conv1_relu"],nuerons["conv1"])
    #print(gradients["conv1"].shape)
    #print(weights["K1"].shape)
    #print(X.shape)
    gradients["K1"],gradients["b1"],_=conv_backward(gradients["conv1"],weights["K1"],X)
    #print('check...')
    #print(gradients["K1"].shape)
    return loss

In [6]:
from temp_load_mnist import load_mnist_datasets
from temp_utils import to_categorical
# Load the three MNIST splits; each split is indexed below as
# (element 0 -> images, element 1 -> labels).
train_set, val_set, test_set = load_mnist_datasets('mnist.pkl.gz')

# Images: reshape element 0 of every split to (N, 1, 28, 28).
train_x = np.reshape(train_set[0], (-1, 1, 28, 28))
val_x = np.reshape(val_set[0], (-1, 1, 28, 28))
test_x = np.reshape(test_set[0], (-1, 1, 28, 28))

# Labels: element 1 of every split goes through to_categorical.
train_y = to_categorical(train_set[1])
val_y = to_categorical(val_set[1])
test_y = to_categorical(test_set[1])

In [7]:
# Random mini-batch sampling from the training set
train_num = len(train_x)


def next_batch(batch_size):
    """Draw a random mini-batch from ``train_x`` / ``train_y``.

    Indices come from ``np.random.choice(train_num, batch_size)``, which
    samples with replacement, so a batch may contain the same sample twice.

    :param batch_size: number of samples to draw.
    :return: tuple ``(train_x[idx], train_y[idx])`` for the drawn indices.
    """
    picked = np.random.choice(train_num, batch_size)
    batch_images = train_x[picked]
    batch_labels = train_y[picked]
    return batch_images, batch_labels


# Quick sanity check of the batch shapes.
x, y = next_batch(16)
print("x.shape:{},y.shape:{}".format(x.shape, y.shape))

x.shape:(16, 1, 28, 28),y.shape:(16, 10)


In [8]:
def _copy_weights_to_zeros(weights):
    """Build a dict of zero arrays that mirrors ``weights``.

    :param weights: dict mapping parameter names to arrays.
    :return: a new dict with the same keys (same order); each value is
        ``np.zeros_like`` of the corresponding entry, so shape and dtype are
        preserved. ``weights`` itself is not modified.
    """
    return {key: np.zeros_like(value) for key, value in weights.items()}

class SGD(object):
    """
    Mini-batch gradient descent with inverse-time learning-rate decay.

    NOTE: the momentum update is currently disabled. ``momentum`` and the
    velocity buffers ``self.v`` are stored, but ``iterate`` performs a plain
    ``w <- w - lr * grad`` step and never reads them.
    """

    def __init__(self, weights, lr=0.01, momentum=0.9, decay=1e-5):
        """
        :param weights: dict of parameters to optimise; the optimizer keeps a
            reference to this dict and rebinds its entries in ``iterate``
        :param lr: initial learning rate
        :param momentum: momentum factor (stored only, see class note)
        :param decay: learning-rate decay factor
        """
        # Keep a reference so iterate() updates the dict it was given instead
        # of depending on a module-level variable named ``weights``.
        self.weights = weights
        self.v = _copy_weights_to_zeros(weights)  # accumulated momentum (unused for now)
        self.iterations = 0  # number of completed iterations
        self.lr = self.init_lr = lr
        self.momentum = momentum
        self.decay = decay

    def iterate(self, gradients):
        """
        Perform one optimisation step.

        :param gradients: dict holding a gradient for every key of the
            optimised weights; extra keys are ignored
        :return: None
        :raises KeyError: if a weight key is missing from ``gradients``
        """
        # Decay the learning rate: lr = init_lr / (1 + iterations * decay)
        self.lr = self.init_lr / (1 + self.iterations * self.decay)

        # Plain gradient step. Each entry is rebound to a new array rather
        # than modified in place.
        for key in self.weights:
            self.weights[key] = self.weights[key] - self.lr * gradients[key]

        # Count the finished iteration
        self.iterations += 1

In [9]:
# Classification accuracy
def get_accuracy(X,y_true):
    """Return the fraction of samples whose predicted class is correct.

    Runs ``forward(X)`` (which also refreshes the cached activations) and
    compares the arg-max over the last axis of the prediction with the
    arg-max over the last axis of ``y_true``.

    :param X: input batch passed to ``forward``.
    :param y_true: target scores / one-hot rows, one per sample.
    :return: mean of the element-wise matches.
    """
    predicted_labels = np.argmax(forward(X), axis=-1)
    expected_labels = np.argmax(y_true, axis=-1)
    hits = predicted_labels == expected_labels
    return np.mean(hits)

In [10]:
# Training configuration
batch_size = 2
steps = 2000

# Optimizer that applies the gradient updates
sgd = SGD(weights, lr=0.01, decay=1e-6)

for s in range(steps):
    X, y = next_batch(batch_size)

    # Forward pass caches the activations, backward pass fills `gradients`
    # and returns the loss of this batch.
    forward(X)
    loss = backward(X, y)

    # Parameter update
    sgd.iterate(gradients)

    # Report only on every 100th step.
    if s % 100 != 0:
        continue

    print("\n step:{} ; loss:{}".format(s, loss))
    # Accuracy on the current mini-batch and on 200 randomly drawn
    # validation samples.
    idx = np.random.choice(len(val_x), 200)
    train_acc = get_accuracy(X, y)
    val_acc = get_accuracy(val_x[idx], val_y[idx])
    print(" train_acc:{};  val_acc:{}".format(train_acc, val_acc))

# Final evaluation on the complete test and validation sets.
test_acc = get_accuracy(test_x, test_y)
val_acc = get_accuracy(val_x, val_y)
print("\n final result test_acc:{};  val_acc:{}".format(test_acc, val_acc))
print('finish')


 step:0 ; loss:9.99931237273234
 train_acc:0.5;  val_acc:0.12

 step:100 ; loss:8.321827419791397
 train_acc:0.0;  val_acc:0.09

 step:200 ; loss:8.501653935431559
 train_acc:0.0;  val_acc:0.055

 step:300 ; loss:8.10843869756178
 train_acc:0.0;  val_acc:0.09

 step:400 ; loss:7.668313177789439
 train_acc:0.0;  val_acc:0.13

 step:500 ; loss:2.414854949532359
 train_acc:1.0;  val_acc:0.44

 step:600 ; loss:2.237092409344888
 train_acc:0.5;  val_acc:0.365

 step:700 ; loss:3.0440249681280602
 train_acc:0.5;  val_acc:0.645

 step:800 ; loss:1.7848914735825891
 train_acc:1.0;  val_acc:0.71

 step:900 ; loss:1.2125240359629217
 train_acc:1.0;  val_acc:0.65

 step:1000 ; loss:1.9117553711634487
 train_acc:0.5;  val_acc:0.73

 step:1100 ; loss:2.3730064389762093
 train_acc:1.0;  val_acc:0.715

 step:1200 ; loss:1.6752918593607373
 train_acc:0.0;  val_acc:0.835

 step:1300 ; loss:2.032027264632596
 train_acc:1.0;  val_acc:0.725

 step:1400 ; loss:1.4220047564024276
 train_acc:0.5;  val_acc:0

In [11]:
# 随机查看预测结果
import matplotlib.pyplot as plt

# Pick three random test samples and run them through the network.
idx = np.random.choice(test_x.shape[0], 3)
x, y = test_x[idx], test_y[idx]
y_predict = forward(x)

# Show each image followed by its true and predicted class.
for image, target_row, score_row in zip(x, y, y_predict):
    plt.figure(figsize=(3, 3))
    plt.imshow(np.reshape(image, (28, 28)))
    plt.show()
    print("y_true:{},y_predict:{}".format(np.argmax(target_row), np.argmax(score_row)))

<Figure size 300x300 with 1 Axes>

y_true:7,y_predict:7


<Figure size 300x300 with 1 Axes>

y_true:1,y_predict:1


<Figure size 300x300 with 1 Axes>

y_true:5,y_predict:3
