<a href="https://colab.research.google.com/github/root2116/direct-feedback-alignment/blob/main/dfa.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import cupy as np
import torch
from torchvision import datasets, transforms

def softmax(x):
    """Compute the softmax of ``x`` along axis 0.

    For a 2-D input every column is normalised independently, so each
    column of the result sums to 1. A 1-D input is treated as a single
    score vector.

    Args:
        x: Array of scores with the class dimension on axis 0.

    Returns:
        Array with the same shape as ``x`` holding the normalised
        exponentials.
    """
    # Subtract each column's own maximum (to prevent overflow). Using the
    # global maximum instead would let a column that lies far below it
    # underflow to all zeros, producing 0 / 0 = NaN for that column.
    shifted = x - np.max(x, axis=0, keepdims=True)
    e_x = np.exp(shifted)
    # Divide by the sum of exponentials along axis 0 so the entries can be
    # interpreted as probabilities.
    return e_x / e_x.sum(axis=0)


def initialize_parameters(input_size=784, hidden_size=400, output_size=10, seed=1):
    """Create the parameters of the four-layer network and its feedback matrices.

    Forward weights ``W1``..``W4`` and biases ``b1``..``b4`` start at zero;
    only the feedback matrices ``B1``..``B3`` are random (standard normal
    scaled by 0.01).

    Args:
        input_size: Number of input features (columns of ``W1``).
        hidden_size: Width of each of the three hidden layers.
        output_size: Number of output units (rows of ``W4``).
        seed: Value passed to ``np.random.seed`` before drawing the
            feedback matrices. Note that this reseeds the global generator.

    Returns:
        Dict with keys ``W1, b1, ..., W4, b4, B1, B2, B3``. ``Wk`` has shape
        ``(fan_out, fan_in)``, ``bk`` has shape ``(fan_out, 1)`` and every
        ``Bk`` has shape ``(hidden_size, output_size)``.
    """
    np.random.seed(seed)

    # Layer widths from input to output; layer k maps sizes[k-1] -> sizes[k].
    sizes = [input_size, hidden_size, hidden_size, hidden_size, output_size]

    parameters = {}
    for layer in range(1, len(sizes)):
        fan_in, fan_out = sizes[layer - 1], sizes[layer]
        parameters["W{}".format(layer)] = np.zeros((fan_out, fan_in))
        parameters["b{}".format(layer)] = np.zeros((fan_out, 1))

    # One feedback matrix per hidden layer, projecting the output error
    # (output_size rows) straight to that layer (hidden_size rows).
    # Drawn in the order B1, B2, B3 so a given seed is reproducible.
    for layer in range(1, 4):
        parameters["B{}".format(layer)] = (
            np.random.randn(hidden_size, output_size) * 0.01
        )

    return parameters


def forward_propagation(X, parameters):
    """Run the forward pass: three tanh layers followed by a softmax output.

    Args:
        X: Input array with one sample per column.
        parameters: Dict providing ``W1``..``W4`` and ``b1``..``b4``.

    Returns:
        Tuple ``(Y_, cache)``. ``Y_`` is the softmax output. ``cache`` maps
        ``A1``/``H1``, ``A2``/``H2``, ``A3``/``H3`` to each hidden layer's
        pre-activation and tanh activation, ``AY`` to the output
        pre-activation and ``Y_`` to the softmax output.
    """
    cache = {}
    activation = X

    # Hidden layers all share the same affine -> tanh structure.
    for layer in (1, 2, 3):
        weights = parameters["W{}".format(layer)]
        bias = parameters["b{}".format(layer)]
        pre_activation = np.dot(weights, activation) + bias
        activation = np.tanh(pre_activation)
        cache["A{}".format(layer)] = pre_activation
        cache["H{}".format(layer)] = activation

    # Output layer: affine map followed by softmax.
    logits = np.dot(parameters["W4"], activation) + parameters["b4"]
    probabilities = softmax(logits)
    cache["AY"] = logits
    cache["Y_"] = probabilities

    return probabilities, cache


def direct_feedback_alignment(parameters, cache, X, Y):
    """Compute parameter updates using direct feedback alignment.

    The output error ``Y_ - Y`` is projected to every hidden layer through
    that layer's feedback matrix ``Bk`` rather than through the forward
    weights of the layers above it.

    Args:
        parameters: Dict providing the feedback matrices ``B1``, ``B2``, ``B3``.
        cache: Dict providing ``H1``, ``H2``, ``H3`` (tanh activations) and
            ``Y_`` (network output).
        X: Input batch with one sample per column.
        Y: Target array with the same shape as ``cache["Y_"]``.

    Returns:
        Dict with keys ``dW1, db1, ..., dW4, db4``, each averaged over the
        batch size ``X.shape[1]``.
    """
    # Averaging factor: one over the batch size.
    scale = 1 / X.shape[1]
    output_error = cache["Y_"] - Y

    def averaged_grads(delta, layer_input):
        # Weight and bias gradients for a layer given its delta and its input.
        d_weights = scale * np.dot(delta, layer_input.T)
        d_bias = scale * np.sum(delta, axis=1, keepdims=True)
        return d_weights, d_bias

    # Input fed into each hidden layer during the forward pass.
    layer_inputs = {1: X, 2: cache["H1"], 3: cache["H2"]}

    gradients = {}
    for layer in (1, 2, 3):
        hidden = cache["H{}".format(layer)]
        feedback = parameters["B{}".format(layer)]
        # Project the output error directly onto this layer, then apply
        # the tanh derivative 1 - H^2.
        delta = np.dot(feedback, output_error) * (1 - np.power(hidden, 2))
        d_weights, d_bias = averaged_grads(delta, layer_inputs[layer])
        gradients["dW{}".format(layer)] = d_weights
        gradients["db{}".format(layer)] = d_bias

    # The output layer uses the output error itself as its delta.
    gradients["dW4"], gradients["db4"] = averaged_grads(output_error, cache["H3"])

    return gradients



def update_parameters(parameters, grads, learning_rate=0.01):
    """Apply one gradient-descent step to the forward weights and biases.

    Each of ``W1``..``W4`` and ``b1``..``b4`` is decremented in place by
    ``learning_rate`` times the matching ``d<name>`` entry of ``grads``.
    Other entries of ``parameters`` are left untouched.

    Args:
        parameters: Dict of parameter arrays; modified in place.
        grads: Dict with keys ``dW1, db1, ..., dW4, db4``.
        learning_rate: Step size.

    Returns:
        The same ``parameters`` dict that was passed in.
    """
    trainable = ("W1", "b1", "W2", "b2", "W3", "b3", "W4", "b4")
    for name in trainable:
        parameters[name] -= learning_rate * grads["d" + name]
    return parameters


def compute_loss(Y_, Y, eps=1e-12):
    """Compute the batch-averaged cross-entropy between ``Y_`` and ``Y``.

    Args:
        Y_: Predicted probabilities, one sample per column.
        Y: Target array with the same shape as ``Y_``.
        eps: Lower bound applied to ``Y_`` before taking the logarithm.

    Returns:
        The summed cross-entropy divided by the batch size ``Y.shape[1]``,
        as a 0-d array.
    """
    N = Y.shape[1]
    # A probability that has underflowed to exactly 0 would give
    # log(0) = -inf, and 0 * -inf = NaN even for non-target classes, which
    # poisons the whole sum. Flooring at eps keeps every term finite.
    safe_probs = np.maximum(Y_, eps)
    loss = (-1 / N) * np.sum(np.multiply(Y, np.log(safe_probs)))
    loss = np.squeeze(loss)

    return loss


def _prepare_batch(images, labels, num_classes=10):
    """Convert one loader batch into the column-per-sample arrays used here.

    Args:
        images: Batch tensor exposing ``.view``; flattened to 28 * 28
            features per sample.
        labels: Integer class labels for the batch.
        num_classes: Size of the one-hot encoding.

    Returns:
        Tuple ``(X, Y, targets)``: ``X`` is the flattened batch transposed
        so each column is one sample, ``Y`` is the one-hot encoding of
        ``labels`` transposed the same way, and ``targets`` is ``labels``
        converted with ``np.asarray``.
    """
    X = np.asarray(images.view(-1, 28 * 28)).T
    Y = np.asarray(np.eye(num_classes)[labels]).T
    targets = np.asarray(labels)
    return X, Y, targets


def model(train_loader, test_loader, learning_rate=0.01, num_epochs=100):
    """Train the network with direct feedback alignment and report progress.

    For every epoch, each training batch goes through a forward pass, a
    DFA gradient computation and a parameter update. Running loss and
    accuracy are printed every 100 iterations and then reset. After the
    training batches, the test loader is evaluated and its loss and
    accuracy are printed.

    Args:
        train_loader: Iterable yielding ``(images, labels)`` training batches.
        test_loader: Iterable yielding ``(images, labels)`` evaluation batches.
        learning_rate: Step size passed to ``update_parameters``.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        The trained parameter dict.
    """
    parameters = initialize_parameters()
    for epoch in range(num_epochs):
        # ---- training ----
        running_loss = 0
        running_correct = 0
        running_samples = 0
        for i, (images, labels) in enumerate(train_loader):
            X, Y, targets = _prepare_batch(images, labels)

            Y_, cache = forward_propagation(X, parameters)
            # Loss and accuracy are measured on the pre-update prediction.
            loss = compute_loss(Y_, Y)
            grads = direct_feedback_alignment(parameters, cache, X, Y)
            parameters = update_parameters(parameters, grads, learning_rate)

            running_loss += loss
            running_correct += np.sum(np.argmax(Y_, axis=0) == targets)
            running_samples += targets.shape[0]

            if i % 100 == 99:
                print('Epoch {}, iteration {}, loss : {}, accuracy : {}'.format(epoch+1, i+1, running_loss / 100, running_correct/ running_samples))
                running_loss = 0
                running_correct = 0
                running_samples = 0

        # ---- evaluation ----
        test_loss = 0
        test_correct = 0
        test_samples = 0
        for images, labels in test_loader:
            X, Y, targets = _prepare_batch(images, labels)

            Y_, _ = forward_propagation(X, parameters)
            batch_size = targets.shape[0]

            # compute_loss returns a per-batch mean; weight it by the batch
            # size so a shorter final batch is not over-represented in the
            # reported average.
            test_loss += compute_loss(Y_, Y) * batch_size
            test_correct += np.sum(np.argmax(Y_, axis=0) == targets)
            test_samples += batch_size

        print('Test data - After epoch {}, loss : {}, accuracy : {}'.format(epoch+1, test_loss / test_samples, test_correct / test_samples))


    return parameters


def predict(X, parameters):
    """Return the predicted class index for each column of ``X``.

    Args:
        X: Input array with one sample per column.
        parameters: Parameter dict accepted by ``forward_propagation``.

    Returns:
        Array holding, for every sample, the index of the largest network
        output along axis 0.
    """
    # Only the output is needed; the forward-pass cache is discarded.
    probabilities, _ = forward_propagation(X, parameters)
    return np.argmax(probabilities, axis=0)


def compute_accuracy(predictions, labels):
    """Return the fraction of ``predictions`` that equal ``labels``.

    Args:
        predictions: Array of predicted class indices.
        labels: Array of true class indices, compared element-wise.

    Returns:
        Number of matching entries divided by ``len(labels)``.
    """
    matches = predictions == labels
    return np.sum(matches) / len(labels)

In [4]:
if __name__=="__main__":
    # Script entry point: fetch MNIST, wrap it in batched loaders and train
    # the DFA network for 10 epochs.
    batch_size = 64
    data_root = '~/.pytorch/MNIST_data/'

    # Convert images to tensors, then normalise with mean 0.5 and std 0.5.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    trainset = datasets.MNIST(data_root, download=True, train=True, transform=transform)
    testset = datasets.MNIST(data_root, download=True, train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(
        trainset, batch_size=batch_size, shuffle=True
    )
    test_loader = torch.utils.data.DataLoader(
        testset, batch_size=batch_size, shuffle=True
    )

    parameters = model(
        train_loader,
        test_loader,
        learning_rate=0.01,
        num_epochs=10,
    )

Epoch 1, iteration 100, loss : 2.302463397341995, accuracy : 0.11140625
Epoch 1, iteration 200, loss : 2.302335049448346, accuracy : 0.10828125
Epoch 1, iteration 300, loss : 2.3019446357066555, accuracy : 0.1178125
Epoch 1, iteration 400, loss : 2.3017477650043316, accuracy : 0.11234375
Epoch 1, iteration 500, loss : 2.3012529358083698, accuracy : 0.1078125
Epoch 1, iteration 600, loss : 2.290384509385251, accuracy : 0.151875
Epoch 1, iteration 700, loss : 2.2307191252596006, accuracy : 0.25625
Epoch 1, iteration 800, loss : 2.059526245338341, accuracy : 0.30546875
Epoch 1, iteration 900, loss : 1.7976610747519128, accuracy : 0.3684375
Test data - After epoch 1, loss : 1.565549474651598, accuracy : 0.4549
Epoch 2, iteration 100, loss : 1.478846270592119, accuracy : 0.48953125
Epoch 2, iteration 200, loss : 1.288680916806956, accuracy : 0.560625
Epoch 2, iteration 300, loss : 1.1688561595504903, accuracy : 0.5896875
Epoch 2, iteration 400, loss : 1.066881414933913, accuracy : 0.64875
E