<a href="https://colab.research.google.com/github/ykitaguchi77/ErrorDiffusion/blob/main/MNIST_ErrorDiffusion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Error Diffusion MNIST**

https://qiita.com/pocokhc/items/f7ab56051bb936740b8f

In [13]:

import math
import random

random.seed(10)  # 乱数のシードを設定


def sign(x):  # 符号関数の定義
    if x == 0:
        return 0
    return 1 if x > 0 else -1


def relu(x):  # ReLU関数の定義
    return x if x > 0 else 0


def relu_derivative(x):  # ReLU関数の導関数の定義
    return 1 if x > 0 else 0


def sigmoid(x, u0=0.4):  # シグモイド関数の定義
    return 1 / (1 + math.exp(-2 * x / u0))


def sigmoid_derivative(x):  # シグモイド関数の導関数の定義
    return sigmoid(x) * (1 - sigmoid(x))


def linear(x):  # 線形関数の定義
    return x


def linear_derivative(x):  # 線形関数の導関数の定義
    return 1


class Neuron:  # ニューロンクラスの定義
    def __init__(
        self,
        in_neurons: list["Neuron"],  # 入力ニューロンのリスト
        ntype: str,  # ニューロンのタイプ（"p": positive, "n": negative）
        alpha: float = 0.8,  # 学習率
        activation=sigmoid,  # 活性化関数
        activation_derivative=sigmoid_derivative,  # 活性化関数の導関数
    ) -> None:
        self.ntype = ntype
        self.alpha = alpha
        self.activation = activation
        self.activation_derivative = activation_derivative

        # --- 重みの初期化
        self.weights = []
        for n in in_neurons:
            if ntype == "p":
                if n.ntype == "p":
                    ope = 1
                else:
                    ope = -1
            else:
                if n.ntype == "p":
                    ope = -1
                else:
                    ope = 1
            self.weights.append(random.random() * ope)

        # --- 演算子の設定
        self.operator = 1 if ntype == "p" else -1
        self.weights_operator = [n.operator for n in in_neurons]

        # --- 更新インデックスの設定
        self.upper_idx_list = []
        self.lower_idx_list = []
        for i, n in enumerate(in_neurons):
            if n.ntype == "p":
                self.upper_idx_list.append(i)
            else:
                self.lower_idx_list.append(i)

    def forward(self, x):  # 順伝播の計算
        assert len(self.weights) == len(x)
        y = [x[i] * self.weights[i] for i in range(len(self.weights))]
        y = sum(y)
        self.prev_in = x
        self.prev_out = y
        y = self.activation(y)
        return y

    def update_weight(self, delta_out, direct: str):  # 重みの更新
        grad = self.activation_derivative(abs(self.prev_out))

        if direct == "upper":
            indices = self.upper_idx_list
        else:
            indices = self.lower_idx_list

        for idx in indices:
            _old_w = self.weights[idx]
            delta = self.alpha * self.prev_in[idx]
            delta *= grad
            delta *= delta_out * self.operator * self.weights_operator[idx]
            self.weights[idx] += delta

            # --- デバッグ用の出力
            s = f"{idx:2d}"
            s += f", ot_in {self.prev_in[idx]:5.2f}"
            s += f", f'({self.prev_out:5.2f})={grad:5.2f}"
            s += f", del_ot {delta_out:5.2f}"
            s += f", d {delta:6.3f}"
            s += f", {self.operator:2d} {self.weights_operator[idx]:2d}"
            s += f", w {_old_w:5.2f} -> {self.weights[idx]:5.2f}"
            # print(s)

    def __str__(self):  # ニューロンの情報を文字列で返す
        s = f"{self.ntype} {self.operator:2d}"
        arr = []
        for i in range(len(self.weights)):
            o = "+" if i in self.upper_idx_list else "-"
            arr.append(f"{self.weights[i]:6.3f}({self.weights_operator[i]:2d},{o})")
        s += " [" + ", ".join(arr) + "]"
        return s


class ThreeLayerModel:  # 3層ニューラルネットワークモデルの定義
    def __init__(
        self,
        input_num: int,  # 入力層のニューロン数
        hidden_num: int,  # 中間層のニューロン数
        alpha: float = 0.8,  # 学習率
        beta: float = 0.8,  # バイアス
    ) -> None:
        self.beta = beta

        # 中間層のバイアスニューロン
        hd_p = Neuron([], "p")
        hd_n = Neuron([], "n")

        # 入力層のニューロン
        inputs: list[Neuron] = []
        for i in range(input_num):
            inputs.append(Neuron([], "p"))
            inputs.append(Neuron([], "n"))

        # 中間層のニューロン
        self.hidden_neurons: list[Neuron] = []
        for i in range(hidden_num):
            self.hidden_neurons.append(
                Neuron(
                    [hd_p, hd_n] + inputs,
                    ntype=("p" if i % 2 == 1 else "n"),
                    alpha=alpha,
                    activation=sigmoid,
                    activation_derivative=sigmoid_derivative,
                )
            )

        # 出力層のニューロン
        self.out_neuron = Neuron(
            [hd_p, hd_n] + self.hidden_neurons,
            "p",
            alpha=alpha,
            activation=sigmoid,
            activation_derivative=sigmoid_derivative,
        )

    def forward(self, inputs):  # 順伝播の計算
        # 入力層
        x = []
        for n in inputs:
            x.append(n)  # p
            x.append(n)  # n

        # 中間層
        x = [h.forward([self.beta, self.beta] + x) for h in self.hidden_neurons]

        # 出力層
        x = self.out_neuron.forward([self.beta, self.beta] + x)

        return x

    def train(self, inputs, target):  # 学習
        x = self.forward(inputs)

        # --- 重みの更新（ED法）
        diff = target - x
        if diff > 0:
            direct = "upper"
        else:
            direct = "lower"
            diff = -diff
        self.out_neuron.update_weight(diff, direct)
        for h in self.hidden_neurons:
            h.update_weight(diff, direct)

        return diff


def main_one():  # 恒等関数の学習
    model = ThreeLayerModel(1, hidden_num=1)

    # --- 学習ループ
    dataset = [
        [0, 1],
        [1, 1],
    ]
    for i in range(100):
        x, target = dataset[random.randint(0, len(dataset)) - 1]
        metric = model.train([x], target)

        # --- 予測
        y = model.forward([x])
        print(f"{i} in{x:5.2f} -> {y:5.2f}, target {target:5.2f}, metric {metric:5.2f}")

    print("--- result ---")
    for x, target in dataset:
        y = model.forward([x])
        print(f"{x:5.2f} -> {y:5.2f}, target {target:5.2f}")

    # --- 最終的な重み
    print("--- weights ---")
    print(model.out_neuron)
    for n in model.hidden_neurons:
        print(n)


def main_not():  # NOT関数の学習
    model = ThreeLayerModel(1, hidden_num=2)

    # --- 学習ループ
    dataset = [
        [0, 1],
        [1, 0],
    ]
    for i in range(100):
        x, target = dataset[random.randint(0, len(dataset)) - 1]
        metric = model.train([x], target)

        # --- 予測
        y = model.forward([x])
        print(f"{i} in{x:5.2f} -> {y:5.2f}, target {target:5.2f}, metric {metric:5.2f}")

    print("--- result ---")
    for x, target in dataset:
        y = model.forward([x])
        print(f"{x:5.2f} -> {y:5.2f}, target {target:5.2f}")

    # --- 最終的な重み
    print("--- weights ---")
    print(model.out_neuron)
    for n in model.hidden_neurons:
        print(n)


def main_xor():  # XOR関数の学習
    model = ThreeLayerModel(2, hidden_num=16)

    # --- 学習ループ
    dataset = [
        [0, 0, 1.0],
        [1, 0, 0.0],
        [0, 1, 0.0],
        [1, 1, 1.0],
    ]
    for i in range(100):
        x1, x2, target = dataset[random.randint(0, len(dataset)) - 1]
        metric = model.train([x1, x2], target)

        # --- 予測
        y = model.forward([x1, x2])
        print(f"{i} in[{x1:5.2f},{x2:5.2f}] -> {y:5.2f}, target {target:5.2f}, metric {metric:5.2f}")

    print("--- result ---")
    for x1, x2, target in dataset:
        y = model.forward([x1, x2])
        print(f"[{x1:5.2f},{x2:5.2f}] -> {y:5.2f}, target {target:5.2f}")

    # --- 最終的な重み
    print("--- weights ---")
    print(model.out_neuron)
    for n in model.hidden_neurons:
        print(n)


if __name__ == "__main__":
    main_one()  # 恒等関数の学習を実行
    main_not()  # NOT関数の学習を実行
    main_xor()  # XOR関数の学習を実行

0 in 1.00 ->  0.53, target  1.00, metric  0.58
1 in 1.00 ->  0.61, target  1.00, metric  0.47
2 in 1.00 ->  0.67, target  1.00, metric  0.39
3 in 0.00 ->  0.59, target  1.00, metric  0.54
4 in 0.00 ->  0.68, target  1.00, metric  0.41
5 in 1.00 ->  0.81, target  1.00, metric  0.20
6 in 1.00 ->  0.82, target  1.00, metric  0.19
7 in 1.00 ->  0.83, target  1.00, metric  0.18
8 in 0.00 ->  0.77, target  1.00, metric  0.27
9 in 1.00 ->  0.86, target  1.00, metric  0.15
10 in 0.00 ->  0.80, target  1.00, metric  0.22
11 in 1.00 ->  0.87, target  1.00, metric  0.13
12 in 1.00 ->  0.88, target  1.00, metric  0.13
13 in 0.00 ->  0.83, target  1.00, metric  0.19
14 in 0.00 ->  0.84, target  1.00, metric  0.17
15 in 0.00 ->  0.85, target  1.00, metric  0.16
16 in 0.00 ->  0.86, target  1.00, metric  0.15
17 in 1.00 ->  0.90, target  1.00, metric  0.10
18 in 0.00 ->  0.87, target  1.00, metric  0.14
19 in 0.00 ->  0.87, target  1.00, metric  0.13
20 in 1.00 ->  0.91, target  1.00, metric  0.09
21

In [17]:
import math
import random
import numpy as np
from tensorflow.keras.datasets import mnist
import tensorflow as tf

# GPUを使用するための設定
physical_devices = tf.config.list_physical_devices('GPU')
if len(physical_devices) > 0:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)

random.seed(10)  # 乱数のシードを設定


def sign(x):  # 符号関数の定義
    if x == 0:
        return 0
    return 1 if x > 0 else -1


def relu(x):  # ReLU関数の定義
    return x if x > 0 else 0


def relu_derivative(x):  # ReLU関数の導関数の定義
    return 1 if x > 0 else 0


def sigmoid(x, u0=0.4):  # シグモイド関数の定義
    return 1 / (1 + math.exp(-2 * x / u0))


def sigmoid_derivative(x):  # シグモイド関数の導関数の定義
    return sigmoid(x) * (1 - sigmoid(x))


def linear(x):  # 線形関数の定義
    return x


def linear_derivative(x):  # 線形関数の導関数の定義
    return 1


class Neuron:  # ニューロンクラスの定義
    def __init__(
        self,
        in_neurons: list["Neuron"],  # 入力ニューロンのリスト
        ntype: str,  # ニューロンのタイプ（"p": positive, "n": negative）
        alpha: float = 0.8,  # 学習率
        activation=sigmoid,  # 活性化関数
        activation_derivative=sigmoid_derivative,  # 活性化関数の導関数
    ) -> None:
        self.ntype = ntype
        self.alpha = alpha
        self.activation = activation
        self.activation_derivative = activation_derivative

        # --- 重みの初期化
        self.weights = []
        self.weights_operator = []
        self.upper_idx_list = []
        self.lower_idx_list = []
        for i, n in enumerate(in_neurons):
            if n.ntype == "p":
                if ntype == "p":
                    ope = 1
                else:
                    ope = -1
            else:
                if ntype == "p":
                    ope = -1
                else:
                    ope = 1
            self.weights.append(random.random() * ope)
            self.weights_operator.append(n.operator)
            if n.ntype == "p":
                self.upper_idx_list.append(i)
            else:
                self.lower_idx_list.append(i)

        # --- 演算子の設定
        self.operator = 1 if ntype == "p" else -1
        self.weights_operator = [n.operator for n in in_neurons]

        # --- 更新インデックスの設定
        self.upper_idx_list = []
        self.lower_idx_list = []
        for i, n in enumerate(in_neurons):
            if n.ntype == "p":
                self.upper_idx_list.append(i)
            else:
                self.lower_idx_list.append(i)

    def forward(self, x):  # 順伝播の計算
        assert len(self.weights) == len(x), f"Weight length ({len(self.weights)}) does not match input length ({len(x)})"
        y = [x[i] * self.weights[i] for i in range(len(self.weights))]
        y = sum(y)
        self.prev_in = x
        self.prev_out = y
        y = self.activation(y)
        return y

    def update_weight(self, delta_out, direct: str):  # 重みの更新
        grad = self.activation_derivative(abs(self.prev_out))

        if direct == "upper":
            indices = self.upper_idx_list
        else:
            indices = self.lower_idx_list

        for idx in indices:
            _old_w = self.weights[idx]
            delta = self.alpha * self.prev_in[idx]
            delta *= grad
            delta *= delta_out * self.operator * self.weights_operator[idx]
            self.weights[idx] += delta

            # --- デバッグ用の出力
            s = f"{idx:2d}"
            s += f", ot_in {self.prev_in[idx]:5.2f}"
            s += f", f'({self.prev_out:5.2f})={grad:5.2f}"
            s += f", del_ot {delta_out:5.2f}"
            s += f", d {delta:6.3f}"
            s += f", {self.operator:2d} {self.weights_operator[idx]:2d}"
            s += f", w {_old_w:5.2f} -> {self.weights[idx]:5.2f}"
            # print(s)

    def __str__(self):  # ニューロンの情報を文字列で返す
        s = f"{self.ntype} {self.operator:2d}"
        arr = []
        for i in range(len(self.weights)):
            o = "+" if i in self.upper_idx_list else "-"
            arr.append(f"{self.weights[i]:6.3f}({self.weights_operator[i]:2d},{o})")
        s += " [" + ", ".join(arr) + "]"
        return s


class ThreeLayerModel:  # 3層ニューラルネットワークモデルの定義
    def __init__(
        self,
        input_num: int,  # 入力層のニューロン数
        hidden_num: int,  # 中間層のニューロン数
        output_num: int,  # 出力層のニューロン数
        alpha: float = 0.8,  # 学習率
    ) -> None:
        # 入力層のニューロン
        inputs: list[Neuron] = []
        for i in range(input_num):
            inputs.append(Neuron([], "p"))

        # 中間層のニューロン
        prev_neurons = inputs
        self.hidden_neurons: list[Neuron] = []
        for i in range(hidden_num):
            self.hidden_neurons.append(
                Neuron(
                    prev_neurons,
                    ntype=("p" if i % 2 == 1 else "n"),
                    alpha=alpha,
                    activation=sigmoid,
                    activation_derivative=sigmoid_derivative,
                )
            )
            prev_neurons = [self.hidden_neurons[-1]]

        # 出力層のニューロン
        self.out_neurons: list[Neuron] = []
        for i in range(output_num):
            self.out_neurons.append(
                Neuron(
                    prev_neurons,
                    "p",
                    alpha=alpha,
                    activation=sigmoid,
                    activation_derivative=sigmoid_derivative,
                )
            )

    @tf.function
    def forward(self, inputs):  # 順伝播の計算
        # 入力層
        x = inputs

        # 中間層
        for h in self.hidden_neurons:
            x = h.forward(x)

        # 出力層
        outputs = []
        for n in self.out_neurons:
            output = n.forward(x)
            outputs.append(output)
        outputs = tf.concat(outputs, axis=0)

        return outputs

    @tf.function
    def train(self, inputs, targets):  # 学習
        with tf.GradientTape() as tape:
            outputs = self.forward(inputs)
            loss = tf.reduce_mean(tf.square(targets - outputs))

        # --- 重みの更新（ED法）
        for i in range(len(self.out_neurons)):
            diff = targets[i] - outputs[i]
            if diff > 0:
                direct = "upper"
            else:
                direct = "lower"
                diff = -diff
            self.out_neurons[i].update_weight(diff, direct)
            for h in self.hidden_neurons:
                h.update_weight(diff, direct)

        return loss

def main_mnist():  # MNISTの手描き文字の学習
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # 画像を0-1の範囲に正規化
    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255

    # 画像を1次元に変換
    x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
    x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

    # ラベルをone-hotエンコーディング
    y_train = np.eye(10)[y_train]
    y_test = np.eye(10)[y_test]

    model = ThreeLayerModel(input_num=784, hidden_num=128, output_num=10)

    # --- 学習ループ
    num_epochs = 10
    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        train_loss = 0
        for i in range(len(x_train)):
            inputs = x_train[i]
            targets = y_train[i]
            loss = model.train(inputs, targets)
            train_loss += loss
        train_loss /= len(x_train)
        print(f"Train Loss: {train_loss:.4f}")

        # --- テストデータでの評価
        correct = 0
        for i in range(len(x_test)):
            inputs = x_test[i]
            targets = y_test[i]
            outputs = model.forward(inputs)
            predicted = np.argmax(outputs)
            if predicted == np.argmax(targets):
                correct += 1
        accuracy = correct / len(x_test)
        print(f"Test Accuracy: {accuracy:.4f}")


if __name__ == "__main__":
    main_mnist()  # MNISTの手描き文字の学習を実行

Epoch 1/10


TypeError: in user code:

    File "<ipython-input-17-ee3664874aba>", line 201, in train  *
        outputs = self.forward(inputs)
    File "<ipython-input-17-ee3664874aba>", line 187, in forward  *
        x = h.forward(x)
    File "<ipython-input-17-ee3664874aba>", line 101, in forward  *
        y = self.activation(y)
    File "<ipython-input-17-ee3664874aba>", line 30, in sigmoid  *
        return 1 / (1 + math.exp(-2 * x / u0))

    TypeError: must be real number, not SymbolicTensor


In [None]:
import tensorflow as tf

model = tf.keras.models.Sequential(
    [
        tf.keras.layers.Input(shape=(28 * 28)),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(16, activation="sigmoid"),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ]
)


if __name__ == "__main__":
    main_one()
    main_not()
    main_xor()