In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from time import time as stamp
from abc import ABC, abstractmethod
"""ver.2.2"""
"""TODO (still to be added): 1. Xavier, He initialization
                             3. batch normalization
                             4. early stopping
                             5. nesterov momentum optimizer
"""

# Seed NumPy's global random generator so that everything in this file drawing
# from np.random (weight initialisation, batch shuffling, dropout masks) is
# reproducible from run to run.
np.random.seed(42)

def OneHot(label_data, num_class):
    """Turn integer class labels into one-hot encoded vectors.

    Row ``k`` of the ``num_class`` x ``num_class`` identity matrix is exactly
    the one-hot vector of class ``k``, so indexing that matrix with the labels
    produces the encoding.

    Args:
        label_data: Integer label or array of integer labels, each in
            ``[0, num_class)``.
        num_class: Total number of classes (length of each one-hot vector).

    Returns:
        Float array with one trailing axis of length ``num_class`` appended
        to the shape of ``label_data``.
    """
    lookup_table = np.identity(num_class)
    encoded = lookup_table[label_data]
    return encoded

def Loss_Cross_Entropy(pred_y, true_y, batch_size):
    """Return the cross-entropy loss averaged over the batch.

    Args:
        pred_y: Predicted class probabilities (e.g. a softmax output).
        true_y: Target distribution with the same shape as ``pred_y``
            (typically one-hot rows).
        batch_size: Number of samples the summed loss is divided by.

    Returns:
        Scalar ``-sum(true_y * log(pred_y + 1e-8)) / batch_size``.
    """
    pred_y = np.asarray(pred_y)
    true_y = np.asarray(true_y)
    # The small epsilon keeps log() finite when a predicted probability is 0.
    return -np.sum(true_y * np.log(pred_y + 1e-8)) / batch_size

def Loss_RMSE(pred_y, true_y):
    """Return the root-mean-square error between predictions and targets.

    The previous body evaluated ``sqrt(diff**2) / 2`` element-wise, which is
    just ``|diff| / 2`` per element and never averages anything; this computes
    the actual RMSE, ``sqrt(mean((pred_y - true_y) ** 2))``.

    Args:
        pred_y: Predicted values (array-like).
        true_y: Target values, broadcastable against ``pred_y``.

    Returns:
        Scalar RMSE taken over all elements.
    """
    diff = np.asarray(pred_y) - np.asarray(true_y)
    return np.sqrt(np.mean(diff ** 2))

class Model:
    """Sequential container that trains a stack of layers with mini-batches.

    Layers are applied in insertion order on the forward pass and in reverse
    order on the backward pass. Each layer is expected to expose a ``type``
    tuple whose second element names the layer kind, plus ``forward``,
    ``backprop`` and ``update`` methods.
    """

    def __init__(self, name=None):
        """Create an empty model with an optional display name."""
        self.name = name
        self.LayerList = []

    def add(self, layer):
        """Append ``layer`` to the end of the layer stack."""
        self.LayerList.append(layer)

    def _forward(self, x_data, is_train):
        """Propagate ``x_data`` through every layer and return the output.

        ``is_train`` is forwarded only to dropout layers, the one layer kind
        whose ``forward`` takes that extra argument.
        """
        for layer in self.LayerList:
            if layer.type[1] == 'dropout':
                x_data = layer.forward(x_data, is_train)
            else:
                x_data = layer.forward(x_data)
        return x_data

    def _backprop(self, y_data):
        """Propagate gradients from the last layer back to the first.

        ``y_data`` is handed to the last layer's ``backprop``; each layer's
        return value becomes the input of the layer before it.
        """
        for layer in reversed(self.LayerList):
            y_data = layer.backprop(y_data)
        return y_data

    def _update(self, alpha):
        """Ask every layer to update its parameters with learning rate ``alpha``."""
        for layer in self.LayerList:
            layer.update(alpha)

    def fit(self, X_train, y_train, X_test, y_test, batch_size, epochs, alpha=0.01):
        """Train the model and print train/test accuracy after every epoch.

        Args:
            X_train: Training inputs, one sample per row.
            y_train: Training targets, one row per sample (2-D; compared via
                ``argmax`` along axis 1 when scoring).
            X_test: Test inputs, used only for accuracy reporting.
            y_test: Test targets, used only for accuracy reporting.
            batch_size: Number of samples per mini-batch. Samples that do not
                fill a complete batch are skipped in each epoch.
            epochs: Number of passes over the shuffled training set.
            alpha: Learning rate passed to every layer's ``update``.
        """
        num_train = X_train.shape[0]
        num_batch = num_train // batch_size

        # Baseline accuracy before any training step.
        print(f"[0/{epochs} epochs] ",end='')
        self.score(X_train, y_train, X_test, y_test)

        t0 = stamp()
        for epoch in range(epochs):
            # Draw a fresh random ordering of the training samples every epoch.
            rand_idx = np.arange(num_train)
            np.random.shuffle(rand_idx)
            for mini_batch in range(num_batch):
                mb_index = rand_idx[mini_batch*batch_size:(mini_batch + 1)*batch_size]
                x = X_train[mb_index, :]
                y = y_train[mb_index, :]

                self._forward(x, True)
                self._backprop(y)
                self._update(alpha)

            print(f"[{epoch+1}/{epochs} epochs] ",end='')
            self.score(X_train, y_train, X_test, y_test)

        total_time = stamp() - t0
        minute, second = divmod(total_time, 60)
        second = round(second, 2)

        print(f"Training complete! {minute}minutes, {second}seconds")

    def score(self, X_train, y_train, X_test, y_test):
        """Print classification accuracy on the training and test sets.

        A prediction counts as correct when the ``argmax`` of the model output
        equals the ``argmax`` of the target row.
        """
        num_train = X_train.shape[0]
        num_test = X_test.shape[0]

        predict_y1 = self._forward(X_train, False)
        count_train = np.sum(np.argmax(predict_y1, axis=1) == np.argmax(y_train, axis=1))

        predict_y2 = self._forward(X_test, False)
        count_test = np.sum(np.argmax(predict_y2, axis=1) == np.argmax(y_test, axis=1))

        print(f"Train Accuracy : {round(count_train/num_train*100,3)}%,",
              f"Test Accuracy : {round(count_test/num_test*100,3)}%")

    def predict(self, x):
        """Return the model output for ``x`` in inference mode."""
        return self._forward(x, False)

    def total_parameters(self):
        """Return the total number of weight and bias entries in the model.

        Layers without a ``W`` / ``b`` attribute (such as dropout) contribute
        nothing. The count is kept in a local variable so the method is not
        replaced by an integer attribute and can be called repeatedly.
        """
        total = 0
        for layer in self.LayerList:
            for attr in ('W', 'b'):
                param = getattr(layer, attr, None)
                if param is not None:
                    total += param.size
        return total
    
class Activation(ABC):
    """Abstract interface shared by all activation functions.

    Concrete activations set ``self.type`` to an ``('activation', <name>)``
    tuple and implement ``forward`` / ``backprop``. The abstract signatures
    mirror the ones the concrete classes in this file actually use.
    """
    @abstractmethod
    def __init__(self):
        self.type = None

    @abstractmethod
    def forward(self, x):
        """Apply the activation to ``x`` and return the result."""

    @abstractmethod
    def backprop(self, dy):
        """Return the gradient w.r.t. the activation input given ``dy``."""
    
class Sigmoid(Activation):
    """Sigmoid activation function, ``1 / (1 + exp(-x))``."""
    def __init__(self):
        self.type = ('activation', 'sigmoid')

    def forward(self, x):
        """Return ``sigmoid(x)`` and cache the input and output for backprop."""
        self.x = x
        # sigmoid(x) == 0.5 * (1 + tanh(x / 2)). The tanh form saturates
        # cleanly instead of overflowing in exp(-x) for very negative x.
        self.y = 0.5 * (1.0 + np.tanh(0.5 * x))
        return self.y

    def backprop(self, dy):
        """Return ``dy * sigmoid'(x)`` using the cached forward output.

        The derivative is written as ``y * (1 - y)``. The earlier
        ``exp(-x) / (exp(-x) + 1)**2`` form evaluated to ``inf / inf = nan``
        once ``exp(-x)`` overflowed.
        """
        return dy * self.y * (1.0 - self.y)
    
class Linear(Activation):
    """Identity activation: the output equals the input."""
    def __init__(self):
        self.type = ('activation', 'linear')

    def forward(self, x):
        """Cache ``x`` as both input and output and return it unchanged."""
        self.x = self.y = x
        return self.y

    def backprop(self, dy):
        """Pass the incoming gradient through untouched (the slope is 1)."""
        upstream = dy
        return upstream
    
class ReLU(Activation):
    """Rectified Linear Unit activation function."""
    def __init__(self):
        self.type = ('activation', 'relu')

    def forward(self, x):
        """Return ``x`` where it is positive and 0 elsewhere; cache the input."""
        self.x = x
        is_positive = self.x > 0
        self.y = np.where(is_positive, x, 0)
        return self.y

    def backprop(self, dy):
        """Let the gradient through only where the cached input was positive."""
        gate = np.where(self.x > 0, 1, 0)
        return dy * gate

class Softmax(Activation):
    """Softmax activation function over the last axis."""
    def __init__(self):
        self.type = ('activation', 'softmax')

    def forward(self, x):
        """Return softmax probabilities for a 1-D vector or 2-D batch.

        Args:
            x: 1-D array of scores, or 2-D array with one sample per row.

        Returns:
            Array of the same shape whose last axis sums to 1.

        Raises:
            ValueError: If ``x`` is neither 1-D nor 2-D.
        """
        if x.ndim not in (1, 2):
            raise ValueError(
                f"Softmax expects a 1-D or 2-D input, got ndim={x.ndim}")

        # Subtracting the per-row maximum leaves the result unchanged but keeps
        # exp() from overflowing.
        shifted = x - np.max(x, axis=-1, keepdims=True)
        exp_scores = np.exp(shifted)
        self.pred_y = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
        return self.pred_y

    def backprop(self, true_y):
        """Return ``pred_y - true_y``.

        Note that the argument is the target, not an upstream gradient. This
        expression is the gradient of softmax combined with a cross-entropy
        loss with respect to the softmax input.
        """
        return self.pred_y - true_y

class Leaky_ReLU(Activation):
    """Leaky ReLU activation with a fixed negative-side slope of 0.01."""
    def __init__(self):
        self.type = ('activation', 'leaky_relu')

    def forward(self, x):
        """Return ``x`` for positive inputs and ``0.01 * x`` otherwise."""
        self.x = x
        is_positive = self.x > 0
        self.y = np.where(is_positive, x, 0.01 * x)
        return self.y

    def backprop(self, dy):
        """Scale ``dy`` by 1 where the cached input was positive, else by 0.01."""
        slope = np.where(self.x > 0, 1, 0.01)
        return dy * slope

class ELU(Activation):
    """Exponential Linear Unit activation function (``alpha`` fixed at 1.0)."""
    def __init__(self):
        self.type = ('activation', 'elu')
        self.alpha = 1.0

    def forward(self, x):
        """Return ``x`` for positive inputs and ``alpha * (exp(x) - 1)`` otherwise."""
        self.x = x
        # np.where evaluates both branches for every element, so the input is
        # clamped to <= 0 before exponentiating. Otherwise large positive
        # inputs overflow in exp() even though that branch is discarded.
        # expm1 also keeps precision for inputs close to zero.
        negative_branch = self.alpha * np.expm1(np.minimum(self.x, 0))
        self.y = np.where(self.x > 0, x, negative_branch)
        return self.y

    def backprop(self, dy):
        """Scale ``dy`` by 1 for positive inputs and by ``alpha * exp(x)`` otherwise."""
        # Same clamping as in forward to avoid overflow on the unused branch.
        negative_slope = self.alpha * np.exp(np.minimum(self.x, 0))
        return dy * np.where(self.x > 0, 1, negative_slope)

class tanh(Activation):
    """Hyperbolic tangent activation function."""
    def __init__(self):
        self.type = ('activation', 'tanh')

    def forward(self, x):
        """Return ``tanh(x)`` and cache the input and output for backprop."""
        self.x = x
        # np.tanh saturates to +/-1 for large |x|. The explicit
        # (e^x - e^-x) / (e^x + e^-x) quotient overflowed to inf / inf = nan.
        self.y = np.tanh(x)
        return self.y

    def backprop(self, dy):
        """Return ``dy * (1 - y**2)``, written as ``(1 - y) * (1 + y)``."""
        return dy * (1 - self.y) * (1 + self.y)
    
class PReLU(Activation):
    """Parametric ReLU activation function.

    The negative-side slope ``alpha`` is supplied at construction time; this
    class itself contains no code that updates it during training.
    """
    def __init__(self, alpha=0.05):
        self.type = ('activation', 'prelu')
        self.alpha = alpha

    def forward(self, x):
        """Return ``x`` for positive inputs and ``alpha * x`` otherwise."""
        self.x = x
        is_positive = self.x > 0
        self.y = np.where(is_positive, x, self.alpha * x)
        return self.y

    def backprop(self, dy):
        """Scale ``dy`` by 1 where the cached input was positive, else by ``alpha``."""
        slope = np.where(self.x > 0, 1, self.alpha)
        return dy * slope
    
class Optimizer(ABC):
    """Abstract interface shared by all optimizers.

    Concrete optimizers set ``self.type`` to an ``('optimizer', <name>)``
    tuple and implement ``update``. The abstract signature mirrors the one the
    concrete classes in this file actually use.
    """
    @abstractmethod
    def __init__(self):
        self.type = None

    @abstractmethod
    def update(self, params, grads, alpha=0.01):
        """Update ``params`` in place from ``grads`` with learning rate ``alpha``.

        ``params`` and ``grads`` are dicts sharing the same keys.
        """
    
class SGD(Optimizer):
    """Plain stochastic gradient descent: ``param -= alpha * grad``."""
    def __init__(self):
        self.type = ('optimizer', 'SGD')

    def update(self, params, grads, alpha=0.01):
        """Take one gradient step on every entry of ``params``, in place.

        Args:
            params: Dict of parameter arrays, modified in place.
            grads: Dict of gradients with the same keys as ``params``.
            alpha: Learning rate (also stored on the instance).
        """
        self.alpha = alpha
        for name in params:
            params[name] -= self.alpha * grads[name]
            
class Momentum(Optimizer):
    """Gradient descent with a per-parameter velocity term."""
    def __init__(self, momentum=0.9):
        self.type = ('optimizer', 'momentum')
        self.momentum = momentum
        self.v = None

    def update(self, params, grads, alpha=0.01):
        """Update the velocities and move ``params`` along them, in place.

        Args:
            params: Dict of parameter arrays, modified in place.
            grads: Dict of gradients with the same keys as ``params``.
            alpha: Learning rate (also stored on the instance).
        """
        self.alpha = alpha
        if self.v is None:
            # First call: start every velocity at zero, shaped like its parameter.
            self.v = {name: np.zeros_like(value) for name, value in params.items()}

        for name in params:
            velocity = self.momentum * self.v[name] - self.alpha * grads[name]
            self.v[name] = velocity
            params[name] += velocity
            
class AdaGrad(Optimizer):
    """Adaptive Gradient optimizer: steps shrink as squared gradients accumulate."""
    def __init__(self):
        self.type = ('optimizer', 'adagrad')
        self.h = None

    def update(self, params, grads, alpha=0.01):
        """Accumulate squared gradients and apply the scaled step, in place.

        Args:
            params: Dict of parameter arrays, modified in place.
            grads: Dict of gradients with the same keys as ``params``.
            alpha: Learning rate (also stored on the instance).
        """
        self.alpha = alpha
        if self.h is None:
            # First call: one zeroed accumulator per parameter.
            self.h = {name: np.zeros_like(value) for name, value in params.items()}

        for name in params:
            grad = grads[name]
            self.h[name] += grad * grad
            # 1e-8 guards against division by zero while the accumulator is empty.
            params[name] -= self.alpha * grad / (np.sqrt(self.h[name]) + 1e-8)
            
class RMSprop(Optimizer):
    """Root Mean Square Propagation optimizer.

    Keeps an exponentially decaying average of squared gradients per parameter
    and divides each step by its square root.
    """
    def __init__(self, rho=0.99):
        self.type = ('optimizer', 'RMSprop')
        self.rho = rho
        self.h = None

    def update(self, params, grads, alpha=0.01):
        """Refresh the running averages and apply the scaled step, in place.

        Args:
            params: Dict of parameter arrays, modified in place.
            grads: Dict of gradients with the same keys as ``params``.
            alpha: Learning rate (also stored on the instance).
        """
        self.alpha = alpha
        if self.h is None:
            self.h = {}
            for key, val in params.items():
                self.h[key] = np.zeros_like(val)

        for key in params:
            # h <- rho * h + (1 - rho) * g^2. The decay has to multiply the old
            # average. Adding rho instead made h grow by a constant every step,
            # independent of the gradients.
            self.h[key] *= self.rho
            self.h[key] += (1 - self.rho) * grads[key] * grads[key]
            params[key] -= self.alpha * grads[key] / (np.sqrt(self.h[key]) + 1e-8)
            
class Adam(Optimizer):
    """Adaptive Moment Estimation optimizer.

    Tracks running averages of the gradient (``m``) and of the squared
    gradient (``v``) for every parameter, and folds the bias correction for
    both into a per-step learning rate.
    """
    def __init__(self, beta1=0.9, beta2=0.999):
        self.type = ('optimizer', 'adam')
        self.beta1, self.beta2 = beta1, beta2
        self.iter = 0
        self.m = None
        self.v = None

    def update(self,params, grads, alpha=0.001):
        """Advance the moment estimates one step and update ``params`` in place.

        Args:
            params: Dict of parameter arrays, modified in place.
            grads: Dict of gradients with the same keys as ``params``.
            alpha: Base learning rate (also stored on the instance).
        """
        self.alpha = alpha
        if self.m is None:
            # First call: zero-initialise both moment estimates.
            self.m = {name: np.zeros_like(value) for name, value in params.items()}
            self.v = {name: np.zeros_like(value) for name, value in params.items()}

        self.iter += 1
        # Bias-corrected step size for the current iteration.
        scaled = self.alpha * np.sqrt(1.0 - self.beta2**self.iter)
        alpha_t = scaled / (1.0 - self.beta1**self.iter)

        for name in params:
            grad = grads[name]
            self.m[name] += (1 - self.beta1) * (grad - self.m[name])
            self.v[name] += (1 - self.beta2) * (grad**2 - self.v[name])

            params[name] -= alpha_t * self.m[name] / (np.sqrt(self.v[name]) + 1e-8)

class Layer(ABC):
    """Abstract interface shared by all layers.

    Concrete layers set ``self.type`` to a ``('layer', <name>)`` tuple and
    implement ``forward`` / ``backprop``. The abstract signatures mirror the
    ones the concrete layers use; a subclass may accept extra ``forward``
    arguments.
    """
    @abstractmethod
    def __init__(self):
        self.type = None

    @abstractmethod
    def forward(self, x):
        """Compute and return the layer output for input ``x``."""

    @abstractmethod
    def backprop(self, dy):
        """Return the gradient w.r.t. the layer input given upstream ``dy``."""

    def update(self, alpha):
        """Update trainable parameters; a no-op unless a subclass overrides it."""
        pass
    
class Dense(Layer):
    """Fully connected layer with a built-in activation and its own optimizer.

    Computes ``activation(x @ W + b)``. ``W`` is drawn from a standard normal
    scaled by ``sqrt(1 / neurons)`` and ``b`` from a standard normal.
    """
    def __init__(self, pre_neurons, neurons, name=None, activation='relu',
                 optimizer='Adam'):
        """Create the layer.

        Args:
            pre_neurons: Number of input features.
            neurons: Number of output units.
            name: Optional label used by ``__str__``.
            activation: Key into ``activation_dict``: 'sigmoid', 'relu',
                'softmax', 'linear', 'elu', 'leaky_relu', 'prelu' or 'tanh'.
                The misspelt 'leakey_relu' is still accepted.
            optimizer: Key into ``optimizer_dict``: 'SGD', 'Momentum',
                'Adagrad', 'RMSprop' or 'Adam'.

        Raises:
            KeyError: If ``activation`` or ``optimizer`` is not a known key.
        """
        self.W = np.random.randn(pre_neurons, neurons) * np.sqrt(1.0 / neurons)
        self.b = np.random.randn(neurons)
        # The dict holds references to the very arrays used in forward(), so
        # optimizers must modify them in place for updates to take effect.
        self.params = {'Weight' : self.W, 'bias' : self.b}
        self.grads = {}

        self.name = name
        self.type = ('layer','dense')

        leaky_relu = Leaky_ReLU()
        self.activation_dict = {'sigmoid' : Sigmoid(),
                                'relu' : ReLU(),
                                'softmax' : Softmax(),
                                'linear' : Linear(),
                                'elu' : ELU(),
                                'leaky_relu' : leaky_relu,
                                # Original misspelt key, kept so existing
                                # callers do not break.
                                'leakey_relu' : leaky_relu,
                                'prelu' : PReLU(),
                                'tanh' : tanh()}

        self.optimizer_dict = {'SGD' : SGD(),
                               'Momentum' : Momentum(),
                               'Adagrad' : AdaGrad(),
                               'RMSprop' : RMSprop(),
                               'Adam' : Adam()}

        self.activation = self.activation_dict[activation]
        self.optimizer = self.optimizer_dict[optimizer]

    def forward(self, x):
        """Return ``activation(x @ W + b)``, caching ``x`` and ``z`` for backprop."""
        self.x = x
        self.z = self.x @ self.W + self.b
        self.y = self.activation.forward(self.z)

        return self.y

    def backprop(self, dy):
        """Compute parameter gradients and return the gradient w.r.t. the input.

        ``dy`` is first passed through the activation's ``backprop``. The
        weight and bias gradients are summed over the batch axis and stored in
        ``self.grads`` for the next ``update`` call.
        """
        delta = self.activation.backprop(dy)

        self.dW = self.x.T @ delta
        self.db = np.sum(delta, axis=0)
        self.dx = delta @ self.W.T

        self.grads = {'Weight' : self.dW, 'bias' : self.db}

        return self.dx

    def update(self, alpha):
        """Apply the stored gradients through this layer's optimizer."""
        self.optimizer.update(self.params, self.grads, alpha)

    def __str__(self):
        """Print weight and bias"""
        return (f"{self.name}'s Layer W\n{self.W}\n\n{self.name}'s Layer b\n{self.b}")
    
class Dropout(Layer):
    """Dropout layer that randomly zeroes activations while training."""
    def __init__(self, dropout_ratio, name=None):
        self.dropout_ratio = dropout_ratio
        self.name = name
        self.type = ('layer','dropout')

    def forward(self, x, is_train):
        """Apply dropout to ``x``.

        In training mode a fresh 0/1 mask is sampled and multiplied in; an
        element is kept when its uniform random draw exceeds
        ``dropout_ratio``. In inference mode the input is instead scaled by
        ``1 - dropout_ratio``.
        """
        if not is_train:
            self.y = (1-self.dropout_ratio)*x
            return self.y

        noise = np.random.rand(*x.shape)
        self.dropout = np.where(noise > self.dropout_ratio, 1, 0)
        self.y = x * self.dropout
        return self.y

    def backprop(self, dy):
        """Mask ``dy`` with the mask sampled by the last training forward pass."""
        self.dx = dy * self.dropout
        return self.dx

    def __str__(self):
        """Describe the layer by its name and dropout ratio."""
        return (f"{self.name}'s dropout ratio is {self.dropout_ratio}")

In [None]:
# Demo: train a small fully connected classifier on a subset of MNIST.
mnist = tf.keras.datasets.mnist
# Re-seed NumPy's global RNG; the weight initialisation, batch shuffling and
# dropout masks below all draw from it.
np.random.seed(26)

(X_train, y_train), (X_test, y_test) = mnist.load_data()
# Keep the first 4000 training / 1000 test images, flatten each 28x28 image
# into a 784-element row and divide by 255.
X_train = X_train[:4000].reshape(-1,28*28).astype(np.float32)/255
X_test = X_test[:1000].reshape(-1,28*28).astype(np.float32)/255

num_class = 10
# Take the matching label subsets and one-hot encode them.
y_train = y_train[:4000]
y_test = y_test[:1000]
y_train = OneHot(y_train, num_class)
y_test = OneHot(y_test, num_class)

#plt.imshow(X_train[0].reshape(28,28),cmap='gray')

# Layer widths: input -> hidden 1 -> hidden 2 -> output.
n_in = 784  
n_mid1 = 200
n_mid2 = 100
n_out = 10

# Network: Dense(elu) -> Dropout -> Dense(elu) -> Dropout -> Dense(softmax),
# with every Dense layer using its own Adam optimizer.
model = Model()
a = Dense(n_in, n_mid1, optimizer='Adam', activation = 'elu', name = 'D1')
model.add(a)
model.add(Dropout(0.5, name = 'P1'))
model.add(Dense(n_mid1, n_mid2, optimizer='Adam', activation = 'elu',name = 'D2'))
model.add(Dropout(0.5, name = 'P2'))
model.add(Dense(n_mid2, n_out, optimizer='Adam', activation = 'softmax',name = 'out'))
# Train for 10 epochs with mini-batches of 20; accuracy is printed every epoch.
model.fit(X_train, y_train, X_test, y_test, batch_size=20, epochs=10, alpha=0.01)
