In [1]:
import numpy as np
import math
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import cv2
from sklearn.metrics import accuracy_score

In [2]:
RANDOM_STATE = 111
BATCH_SIZE = 500
EPOCHS = 1

In [3]:
dir_path = '../data/'

train, train_labels = [], []
with open(dir_path + 'train.txt', 'r') as f:
  for line in f:
    data = line.split(" ")
    train.append(dir_path + data[0])
    train_labels.append(data[1].replace('\n', ''))


val, val_labels = [], []
with open(dir_path + 'val.txt', 'r') as f:
  for line in f:
    data = line.split(" ")
    val.append(dir_path + data[0])
    val_labels.append(data[1].replace('\n', ''))


test, test_labels = [], []
with open(dir_path + 'test.txt', 'r') as f:
  for line in f:
    data = line.split(" ")
    test.append(dir_path + data[0])
    test_labels.append(data[1].replace('\n', ''))

print(f'訓練資料共{len(train)}筆')
print(f'驗證資料共{len(val)}筆')
print(f'測試資料共{len(test)}筆')

訓練資料共63325筆
驗證資料共450筆
測試資料共450筆


### 資料前處理

In [4]:
def data_preprocess(paths):
    features = []
    for path in tqdm(paths):
        # 讀取image，並轉為灰階影像
        img = cv2.imread(path, 0)
        img = cv2.resize(img, (32, 32))
        # 標準化(MaxAbs)，灰階影像最大為255
        img = img / 255
        features.append(img)
        
    return features

In [5]:
%%time
train_features = data_preprocess(train)
val_features = data_preprocess(val)
test_features = data_preprocess(test)

  0%|          | 0/63325 [00:00<?, ?it/s]

  0%|          | 0/450 [00:00<?, ?it/s]

  0%|          | 0/450 [00:00<?, ?it/s]

CPU times: user 34.3 s, sys: 3.13 s, total: 37.4 s
Wall time: 42 s


### OneHotEncoding

In [6]:
# 轉成np.array再轉成oneHotEncoding
def OneHotEncoding(labels):
    return np.eye(50)[np.array(list(map(int, labels)))]

### shuffle tool

In [7]:
def shuffle(x, y):
    index = np.arange(x.shape[0])
    np.random.shuffle(index)
    return x[index], y[index]

In [8]:
X_train, Y_train = shuffle(np.array(train_features), OneHotEncoding(train_labels))
X_val, Y_val = shuffle(np.array(val_features), OneHotEncoding(val_labels))
X_test, Y_test = shuffle(np.array(test_features), OneHotEncoding(test_labels))

### 評估指標

In [9]:
def top1_acc(labels, pro):
    correct = []
    for i, p in enumerate(pro):
        pred_y = np.argsort(p)[-1]
        if int(labels[i]) == pred_y:
            correct.append(1)
        else:
            correct.append(0)
    
    return round(sum(correct) / len(correct), 4)

def top5_acc(labels, pro):
    correct = []
    for i, p in enumerate(pro):
        top5_y = np.argsort(p)[-5:]
        if int(labels[i]) in list(top5_y):
            correct.append(1)
        else:
            correct.append(0)
    
    return round(sum(correct) / len(correct), 4)

### Activation Function
- sigmoid
- [softmax](https://zhuanlan.zhihu.com/p/105722023)

In [10]:
class sigmoid():
    def __init__(self):
        self.x = None
        
    def forward(self, x):
        self.x = x
        return 1.0 / (1.0 + np.exp(-x))

    def backward(self, grad):
        x = self.x
        return grad * self.forward(x) * (1-self.forward(x))

class softmax():
    def __init__(self):
        self.prob = None
        
    def forward(self, x):
        y = np.exp(x)
        z = y / np.sum(y, axis=1, keepdims=True)
        self.prob = z
        return z

    def backward(self, y_true):
        return self.prob - y_true

### nn_layer
- https://github.com/toxtli/lenet-5-mnist-from-scratch-numpy/blob/master/app.py
- ChatGPT

In [11]:
class conv():
    def __init__(self, input_size, output_size, kernel, stride=1, padding=0, bias=True):
        self.input_size = input_size      # input neurons
        self.output_size = output_size    # output neurons (#Kernels)
        self.kernel = kernel              # kernel size
        self.stride = stride              # kernel move size
        self.pad = padding            # add extra zero data outside image
        
        # weight initialization
        self.w = {'value': np.random.RandomState(RANDOM_STATE).rand(output_size, input_size, kernel, kernel), 'grad': 0}
        # self.w = {'val': np.random.normal(0.0, np.sqrt(2/input_size), (output_size,input_size,F,F)), 'grad': 0}
        self.b = {'value': np.random.RandomState(RANDOM_STATE).rand(output_size), 'grad': 0}
        self.X = None

    def forward(self, X):
        X = np.pad(X, ((0,0),(0,0), (self.pad, self.pad), (self.pad, self.pad)), 'constant')
        # N為訓練資料大小, H & W為圖片的向量長寬
        (N, input_size, H, W) = X.shape
        
        # convolution後的圖片向量長寬
        CH = int((H + 2*self.pad - self.kernel)/self.stride + 1)
        CW = int((W + 2*self.pad - self.kernel)/self.stride + 1)
        Y = np.zeros((N, self.output_size, CH, CW))
        
        
        for n in range(N):  # 每筆資料
            for c in range(self.output_size):  # 每個kernel
                for h in range(CH):
                    for w in range(CW):
                        # data * weight
                        Y[n, c, h, w] = np.sum(X[n, :, h:h+self.kernel, w:w+self.kernel] * self.w['value'][c, :, :, :]) + self.b['value'][c]

        self.X = X # backward會用到
        return Y

    def backward(self, grad):
        X = self.X
        (N, input_size, H, W) = X.shape
        CH = int((H + 2*self.pad - self.kernel)/self.stride + 1)
        CW = int((W + 2*self.pad - self.kernel)/self.stride + 1)

        dX = np.zeros(X.shape)
        dw = np.zeros(self.w['value'].shape)
        db = np.zeros(self.b['value'].shape)

        # dW
        for out in range(self.output_size):
            for inp in range(input_size):
                for h in range(self.kernel):
                    for w in range(self.kernel):
                        dw[out, inp, h, w] = np.sum(X[:, inp, h:h+CH, w:w+CW] * grad[:, out, :, :])

        # db
        for out in range(self.output_size):
            db[out] = np.sum(grad[:, out, :, :])
        
        # dX
        W_rot = np.rot90(np.rot90(self.w['value']))
        grad_pad = np.pad(grad, ((0,0),(0,0), (self.kernel, self.kernel), (self.kernel, self.kernel)), 'constant')
        for n in range(N):
            for inp in range(input_size):
                for h in range(H):
                    for w in range(W):
                        dX[n, inp, h, w] = np.sum(W_rot[:, inp, :, :] * grad_pad[n, :, h:h+self.kernel, w:w+self.kernel])

        return dX
    
class maxPool():
    def __init__(self, kernel, stride):
        self.kernel = kernel
        self.stride = stride
        self.mask = None

    def forward(self, X):
        # if H % 2 == 1:
        #     self.ori_shape = True
        #     X = np.pad(X, ((0,0),(0,0),(0,1),(0,1)), 'constant')
        #     N, C_in, H, W = X.shape
        
        (N, input_size, H, W) = X.shape
        # convolution後的圖片向量長寬
        S = self.stride
        F = self.kernel
        CH = int((H - F)/S + 1)
        CW = int((W - F)/S + 1)
        Y = np.zeros((N, input_size, CW, CH))
        M = np.zeros(X.shape) # mask
        
        for n in range(N):
            for inp in range(input_size):
                for h in range(CH):
                    for w in range(CW):
                        # 回傳kernel內最大值
                        Y[n, inp, h, w] = np.max(X[n, inp, h*S:h*S+F, w*S:w*S+F])
                        # 將kernel內最大值的index轉化為座標
                        i, j = np.unravel_index(X[n, inp, h*S:h*S+F, w*S:w*S+F].argmax(), (F,F))
                        M[n, inp, h*S+i, w*S+j] = 1
                        
        self.mask = M
        return Y

    def backward(self, grad):
        M = self.mask
        F = self.kernel
        (N, input_size, H, W) = M.shape
        grad = np.array(grad)
        
        # dX
        dX = np.zeros(M.shape)
        for n in range(N):
            for inp in range(input_size):
                dX[n, inp, :, :] = grad[n, inp, :, :].repeat(F, axis=0).repeat(F, axis=1)
        return dX*M

    
class FC():
    def __init__(self, input_size, output_size):
        self.w = {'value': np.random.RandomState(RANDOM_STATE).rand(input_size, output_size), 'grad': 0}
        self.b = {'value': np.random.RandomState(RANDOM_STATE).rand(output_size), 'grad': 0}
        self.X = None
        
    def forward(self, X):
        self.X = X
        return np.dot(X, self.w['value']) + self.b['value']

    def backward(self, grad):
        X = self.X
        dX = np.dot(grad, self.w['value'].T)
        self.w['grad'] = np.dot(X.T, grad)
        self.b['grad'] = np.sum(grad, axis=0)
        # self.update_params()
        return dX

    def update_params(self, lr=0.001):
        self.w['value'] -= lr * self.w['grad']
        self.b['value'] -= lr * self.b['grad']

### Loss Function

In [12]:
def CrossEntropy(y_pred, y_true):
    # np.seterr(divide = 'ignore')
    loss = -np.sum(np.multiply(y_true , np.log(y_pred)))
    # loss = -np.sum(np.multiply(y_true , np.log(y_pred, where=y_pred>0)))
    return loss

### Optimization
- [SGDMomentum](https://www.sciencedirect.com/topics/engineering/momentum-coefficient)
- velocity = momentum * velocity + (1 - momentum) * gradient
- weight = weight - (learning_rate * velocity + regularization * weight)

In [13]:
class SGDMomentum():
    def __init__(self, params, lr, momentum, reg):
        self.len = len(params)
        self.parameters = params
        self.velocities = []
        
        for param in self.parameters:
            self.velocities.append(np.zeros(param['value'].shape))
            
        self.lr = lr
        # Momentum coefficient: A parameter between 0 and 1 used to increase the rate at which the weight factors are adjusted
        self.momentum = momentum  
        # the L2 regularization coefficient
        self.reg = reg

    def step(self):
        for i in range(self.len):
            self.velocities[i] = self.momentum * self.velocities[i] + (1 - self.momentum) * self.parameters[i]['grad']  
            self.parameters[i]['value'] -= (self.lr * self.velocities[i] + self.reg * self.parameters[i]['value'])

### LeNet5
- https://github.com/toxtli/lenet-5-mnist-from-scratch-numpy/blob/master/app.py
- ChatGPT

In [14]:
class LeNet5():
    def __init__(self):
        self.conv1  = conv(1, 6, 5)
        self.sigmoid1 = sigmoid()
        self.pool1 = maxPool(2, 2)
        self.conv2 = conv(6, 16, 5)
        self.sigmoid2 = sigmoid()
        self.pool2 = maxPool(2,2)
        self.FC1 = FC(16*5*5, 120)
        self.sigmoid3 = sigmoid()
        self.FC2 = FC(120, 84)
        self.sigmoid4 = sigmoid()
        self.FC3 = FC(84, 50)
        self.softmax = softmax()
        
        self.p2_shape = None

    def forward(self, x):
        h1 = self.conv1.forward(x)
        a1 = self.sigmoid1.forward(h1)
        p1 = self.pool1.forward(a1)
        
        h2 = self.conv2.forward(p1)
        a2 = self.sigmoid2.forward(h2)
        p2 = self.pool2.forward(a2)
        self.p2_shape = p2.shape
        
        fl = p2.reshape(x.shape[0], -1) # Flatten
        h3 = self.FC1.forward(fl)
        a3 = self.sigmoid3.forward(h3)
        h4 = self.FC2.forward(a3)
        a4 = self.sigmoid4.forward(h4)
        h5 = self.FC3.forward(a4)
        a5 = self.softmax.forward(h5)
        return a5
    
    def backward(self, grad):
        grad = self.FC3.backward(grad)
        grad = self.sigmoid4.backward(grad)
        grad = self.FC2.backward(grad)
        grad = self.sigmoid3.backward(grad)
        grad = self.FC1.backward(grad)
        
        grad = grad.reshape(self.p2_shape) # reshape
        grad = self.pool2.backward(grad)
        grad = self.sigmoid2.backward(grad)
        grad = self.conv2.backward(grad)
        
        grad = self.pool1.backward(grad)
        grad = self.sigmoid1.backward(grad)
        grad = self.conv1.backward(grad)
        
    def get_params(self):
        return [self.conv1.w, self.conv1.b, self.conv2.w, self.conv2.b, self.FC1.w,
                self.FC1.b, self.FC2.w, self.FC2.b, self.FC3.w, self.FC3.b]

    def set_params(self, params):
        [self.conv1.w, self.conv1.b, self.conv2.w, self.conv2.b, self.FC1.w, 
         self.FC1.b, self.FC2.w, self.FC2.b, self.FC3.w, self.FC3.b] = params

### Improved LeNet5
- Activation function: x = Sigmoid(x) ==> x = x*sigmoid(x)<br>dy/dx = sigmoid(x) + x * sigmoid'(x);   sigmoid'(x) = sigmoid(x) * (1 - sigmoid(x))
- kernel size: 5x5 ==> 3x3
- Increase one convolution layer to LeNet5 (any position).

In [15]:
class ImprovedLeNet5():
    def __init__(self):
        self.conv1  = conv(1, 5, 3)
        self.sigmoid1 = sigmoid()
        self.pool1 = maxPool(2, 2)
        self.conv2 = conv(5, 10, 3) # new conv layer
        self.sigmoid2 = sigmoid()
        self.pool2 = maxPool(2,2)
        self.conv3 = conv(10, 16, 3)
        self.sigmoid3 = sigmoid()
        self.pool3 = maxPool(2,2)
        self.FC1 = FC(16*3*3, 120)
        self.sigmoid4 = sigmoid()
        self.FC2 = FC(120, 84)
        self.sigmoid5 = sigmoid()
        self.FC3 = FC(84, 50)
        self.softmax = softmax()
        
        self.p3_shape = None

    def forward(self, x):
        h1 = self.conv1.forward(x)
        a1 = self.sigmoid1.forward(h1)
        a11 = self.a1 * self.h1
        p1 = self.pool1.forward(a11)
        
        h2 = self.conv2.forward(p1)
        a2 = self.sigmoid2.forward(h2)
        a21 = self.a2 * self.h2
        p2 = self.pool2.forward(a21)
        
        h3 = self.conv3.forward(p2)
        a3 = self.sigmoid3.forward(h3)
        a31 = self.a3 * self.h3
        p3 = self.pool3.forward(a31)
        self.p3_shape = p3.shape
        
        fl = p3.reshape(x.shape[0], -1) # Flatten
        h4 = self.FC1.forward(fl)
        a4 = self.sigmoid4.forward(h4)
        a41 = self.a4 * self.h4
        
        h5 = self.FC2.forward(a41)
        a5 = self.sigmoid5.forward(h5)
        a51 = self.a5 * self.h5
        
        h6 = self.FC3.forward(a51)
        a6 = self.softmax.forward(h6)
        return a6
    
    def backward(self, grad):
        # dy/dx = sigmoid(x) + x * sigmoid'(x)
        grad = self.FC3.backward(grad) 
        grad1 = grad * self.a5   # sigmoid(x)
        grad2 = self.sigmoid5.backward(grad * self.h5) # x * sigmoid'(x)
        grad = grad1 + grad2
        
        grad = self.FC2.backward(grad)
        grad1 = grad * self.a4   
        grad2 = self.sigmoid4.backward(grad * self.h4) 
        grad = grad1 + grad2
        
        grad = self.FC1.backward(grad)
        grad = grad.reshape(self.p3_shape)
        
        grad = self.pool3.backward(grad)
        grad1 = grad * self.a3   
        grad2 = self.sigmoid3.backward(grad * self.h3) 
        grad = grad1 + grad2
        grad = self.conv3.backward(grad)
        
        grad = self.pool2.backward(grad)
        grad1 = grad * self.a2   
        grad2 = self.sigmoid2.backward(grad * self.h2) 
        grad = grad1 + grad2
        grad = self.conv2.backward(grad)
        
        grad = self.pool1.backward(grad)
        grad1 = grad * self.a1   
        grad2 = self.sigmoid1.backward(grad * self.h1) 
        grad = grad1 + grad2
        grad = self.conv1.backward(grad)
        
    def get_params(self):
        return [self.conv1.w, self.conv1.b, self.conv2.w, self.conv2.b, self.conv3.w, self.conv3.b,
                self.FC1.w, self.FC1.b, self.FC2.w, self.FC2.b, self.FC3.w, self.FC3.b]

    def set_params(self, params):
        [self.conv1.w, self.conv1.b, self.conv2.w, self.conv2.b, self.conv3.w, self.conv3.b,
         self.FC1.w, self.FC1.b, self.FC2.w, self.FC2.b, self.FC3.w, self.FC3.b] = params

### Train

In [16]:
class train_LeNet5():
    def __init__(self, batch, epoch, model):
        self.epoch = epoch
        self.batch = batch
        self.model = model
        self.optim = SGDMomentum(self.model.get_params(), lr=1e-4, momentum=0.8, reg=3e-4)
        
    def train(self, X_train, Y_train, X_val, Y_val):
        X_train = X_train.reshape(X_train.shape[0], 1, X_train.shape[1], X_train.shape[2])
        X_val = X_val.reshape(X_val.shape[0], 1, X_val.shape[1], X_val.shape[2])
        
        loss_list = []
        loss_batch_list = []
        for epoch in range(self.epoch):
            print(f'----------Epoch {epoch+1}----------')
            train_acc = 0
            train_top5_acc = 0
            train_loss = 0
            for i in range(0, len(X_train), self.batch):
                if i % (10*self.batch) == 0 and i != 0:
                    train_time = i / self.batch
                    train_batch_acc = train_acc / train_time
                    train_batch_top5_acc = train_top5_acc / train_time
                    train_batch_loss = train_loss / train_time
                    loss_list.append([train_batch_acc, train_batch_top5_acc, train_batch_loss])
                    print(f"Batch {int(train_time)}： train_acc: {train_batch_acc:.4f} ｜ train_top5_acc: {train_batch_top5_acc:.4f} | train_loss: {train_batch_loss:.4f}")
                    
                X_batch = X_train[i: i + self.batch]
                Y_batch = Y_train[i: i + self.batch]
                probs = self.model.forward(X_batch)
                # upstream gradient
                grad = probs - Y_batch
                self.model.backward(grad)
                self.optim.step()
                
                # Evaluate the accuracy and loss
                train_acc += accuracy_score(np.argmax(Y_batch, axis=1), np.argmax(probs, axis=1))
                train_top5_acc += top5_acc(np.argmax(Y_batch, axis=1), probs)
                train_loss += CrossEntropy(probs, Y_batch)
                
            # 計算平均準確率及訓練損失
            train_time = math.ceil(len(X_train)/self.batch)
            train_acc = train_acc / train_time
            train_top5_acc = train_top5_acc / train_time
            train_loss = train_loss / train_time
            
            # 計算驗證準確率及損失
            val_probs = self.model.forward(X_val)
            val_acc =  accuracy_score(np.argmax(Y_val, axis=1), np.argmax(val_probs, axis=1))
            val_top5_acc = top5_acc(np.argmax(Y_val, axis=1), val_probs)
            val_loss = CrossEntropy(val_probs, Y_val)
            
            loss_list.append([train_acc, train_top5_acc, train_loss, val_acc, val_top5_acc, val_loss])
            print(f"Epoch {epoch+1}： train_acc: {train_acc:.4f} | train_loss: {train_loss:.4f} | val_acc: {val_acc:.4f} | val_loss: {val_loss:.4f}")
            
        return loss_list, loss_batch_list
    
    def predict(self, X_test, Y_test):
        test_probs = self.model.forward(X_test)
        test_acc = accuracy_score(np.argmax(Y_test, axis=1), np.argmax(test_probs, axis=1))
        test_top5_acc = top5_acc(np.argmax(Y_test, axis=1), test_probs)
        test_loss = CrossEntropy(test_probs, Y_test)
        print(f"\nTest：\ntest_acc: {test_acc:.4f} | test_top5_acc: {test_top5_acc:.4f} | test_loss: {test_loss:.4f}")
        return [test_acc, test_top5_acc, test_loss]

### result

In [17]:
class plot():
    def __init__(self, result):
        self.result = result
        self.plot_acc()
        self.plot_top5_acc()
        self.plot_loss()
        
    def plot_acc(self):
        plt.plot([x[0] for x in self.result])
        plt.plot([x[3] for x in self.result])
        plt.title('Top1 Accuracy History')
        plt.ylabel('Top1 Accuracy')
        plt.xlabel('Epoch')
        plt.legend(['train', 'val'], loc='upper left')
        plt.savefig("top5_acc.png")
        plt.show()

    def plot_top5_acc(self):
        plt.plot([x[1] for x in self.result])
        plt.plot([x[4] for x in self.result])
        plt.title('Top5 Accuracy History')
        plt.ylabel('Top5 Accuracy')
        plt.xlabel('Epoch')
        plt.legend(['train', 'val'], loc='upper left')
        plt.savefig("top5_acc.png")
        plt.show()

    def plot_loss(self):
        plt.plot([x[2] for x in self.result])
        plt.plot([x[5] for x in self.result])
        plt.title('Loss History')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['train', 'val'], loc='upper left')
        plt.savefig("loss.png")
        plt.show()

In [None]:
#LeNet5
LeNet5 = train_LeNet5(BATCH_SIZE, EPOCHS, LeNet5())
LeNet5_train, LeNet5_batch = LeNet5.train(X_train, Y_train, X_val, Y_val)
LeNet5_test = LeNet5.predict(X_test, Y_test)
plot(LeNet5_train)

----------Epoch 1----------
Batch 10： train_acc: 0.0196 ｜ train_top5_acc: 0.1006 | train_loss: 5464.6227
Batch 20： train_acc: 0.0199 ｜ train_top5_acc: 0.0991 | train_loss: 4886.8011
Batch 30： train_acc: 0.0198 ｜ train_top5_acc: 0.0995 | train_loss: 4589.5477
Batch 40： train_acc: 0.0202 ｜ train_top5_acc: 0.0995 | train_loss: 4421.1214
Batch 50： train_acc: 0.0201 ｜ train_top5_acc: 0.0988 | train_loss: 4319.6659
Batch 60： train_acc: 0.0198 ｜ train_top5_acc: 0.0977 | train_loss: 4251.6670
Epoch 1： train_acc: 0.0197 | train_loss: 4189.0553 | val_acc: 0.0200 | val_loss: 1762.7380
----------Epoch 2----------
Batch 10： train_acc: 0.0196 ｜ train_top5_acc: 0.0966 | train_loss: 3912.1741
Batch 20： train_acc: 0.0191 ｜ train_top5_acc: 0.0963 | train_loss: 3912.4555
Batch 30： train_acc: 0.0195 ｜ train_top5_acc: 0.0965 | train_loss: 3912.4588
Batch 40： train_acc: 0.0202 ｜ train_top5_acc: 0.0987 | train_loss: 3912.1091


In [None]:
# Improved LeNet5
Improved_LeNet5 = train_LeNet5(BATCH_SIZE, EPOCHS, ImprovedLeNet5())
Improved_LeNet5_train, Improved_LeNet5_batch = LeNet5.train(X_train, Y_train, X_val, Y_val)
Improved_LeNet5_test = LeNet5.predict(X_test, Y_test)
plot(Improved_LeNet5_train)