In [2]:
import numpy as np
import math
import random
import pandas as pd
import plotly.express as px
import multiprocessing


def sigmoid(z: np.array):
    return 1 / (1 + np.exp(-np.clip(z, -100, 100)))


def tanh(z: np.array):
    z = np.clip(z, -100, 100)
    expZ = np.exp(z)
    expZ_minus = np.exp(-z)
    return (expZ - expZ_minus) / (expZ + expZ_minus)


def reLU(z: np.array):
    return np.maximum(0.0, z)


def leakyReLU(z: np.array):
    return np.maximum(0.01 * z, z)

In [62]:
class Model_Old:
    class Layer:
        def __init__(self, layerHeight, prevLayer, nextLayer, activationFunc="sigmoid"):
            self.layerHeight = layerHeight
            self.w = np.random.randn(layerHeight, prevLayer.layerHeight) * 0.01
            self.b = np.zeros((layerHeight, 1))
            self.dw = np.zeros((layerHeight, prevLayer.layerHeight))
            self.db = np.zeros((layerHeight, 1))
            self.a = None
            if activationFunc == "sigmoid":
                self.activation = sigmoid
                self.da_dz = lambda a: np.multiply(a, (1 - a))
            elif activationFunc == "tanh":
                self.activation = np.tanh
                self.da_dz = lambda a: (1 - np.square(a))
            elif activationFunc == "relu":
                self.activation = reLU
                self.da_dz = lambda a: np.where(a > 0, 1,
                                                0)  #just taking advantage of the fact a = z for positive and a = 0 for negative
            elif activationFunc == "leaky_relu":
                self.activation = leakyReLU
                self.da_dz = lambda a: np.where(a > 0, 1, 0.01)
            self.prevLayer = prevLayer if prevLayer else None
            self.nextLayer = nextLayer if nextLayer else None

        def predict(self, a_Prev: np.array, keepActiveProb: float):
            self.a = (self.activation((self.w @ a_Prev) + self.b) *
                      (np.random.random((self.layerHeight, a_Prev.shape[1])) < keepActiveProb))
            self.a /= keepActiveProb
            # self.a = self.activation((self.w @ a_Prev) + self.b)
            return self.nextLayer.predict(self.a, keepActiveProb)

        def propagate(self, dz_Next: np.array, trainingStep):
            da = self.nextLayer.w.T @ dz_Next
            dz = self.da_dz(self.a) * da
            self.dw = (0.9 * self.dw) + 0.1 * ((dz @ self.prevLayer.a.T) / dz.shape[1])
            self.db = (0.9 * self.db) + 0.1 * np.mean(dz, axis=1, keepdims=True)
            self.prevLayer.propagate(dz, trainingStep)
            self.w -= trainingStep * self.dw
            self.b -= trainingStep * self.db

    class Head(Layer):
        def predict(self, a_Prev: np.array, keepActiveProb: float):
            self.a = sigmoid((self.w @ a_Prev) + self.b)
            return self.a

        def propagate(self, realY, trainingStep):
            dz = self.a - realY
            self.dw = (0.9 * self.dw) + 0.1 * ((dz @ self.prevLayer.a.T) / dz.shape[1])
            self.db = (0.9 * self.db) + 0.1 * np.mean(dz, axis=1, keepdims=True)
            self.prevLayer.propagate(dz, trainingStep)
            self.w -= trainingStep * self.dw
            self.b -= trainingStep * self.db

    class Input(Layer):
        def __init__(self, layerHeight, nextLayer):
            self.layerHeight = layerHeight
            self.a = None
            self.nextLayer = nextLayer if nextLayer else None

        def predict(self, a_Prev: np.array, keepActiveProb: float):
            self.a = a_Prev
            return self.nextLayer.predict(self.a, keepActiveProb)

        def propagate(self, realY, trainingStep):
            return

    """activationFunc = "sigmoid" | "tanh" | "relu" | "leaky_relu" """

    def __init__(self, xSize, layers, activationFunc="sigmoid"):
        self.head = None

        def createLayer(prevLayer, depth):
            if depth >= len(layers):
                layer = self.Head(1, prevLayer, None)
                self.head = layer
                return layer
            else:
                layer = self.Layer(layers[depth], prevLayer, None, activationFunc=activationFunc)
                layer.nextLayer = createLayer(layer, depth + 1)
                return layer

        self.input = self.Input(xSize, None)
        self.input.nextLayer = createLayer(self.input, 0)
        self.trainingData = pd.DataFrame([], columns=['Name', 'Iteration', 'Value'])

    def predict(self, a: np.array, keepActiveProb=1):
        return self.input.predict(a, keepActiveProb)

    def train(self, x: np.array, y: np.array, trainingStep, trainingIterations, debug=False,
              debugStep=100, x_dev=None, y_dev=None, keepActiveProb=1):
        if debug:
            lastW = list()
            lastB = list()
            currentLayer = self.input.nextLayer
            while currentLayer:
                lastW.append(currentLayer.w.copy())
                lastB.append(currentLayer.b.copy())
                currentLayer = currentLayer.nextLayer
        for tr_i in range(trainingIterations):
            self.predict(x, keepActiveProb)
            self.head.propagate(y, trainingStep)
            if x_dev is not None and tr_i % debugStep == 0:
                self.predict(x_dev)
                self.trainingData.loc[len(self.trainingData)] = ('ErrorDev', tr_i // debugStep, -np.mean(
                    (y_dev * np.log(np.clip(self.head.a, 0.001, 1))) + (
                            (1 - y_dev) * np.log(np.clip(1 - self.head.a, 0.001, 1)))))
                self.trainingData.loc[len(self.trainingData)] = (
                    'PrecisionDev', tr_i // debugStep, np.mean((y_dev == (self.head.a > 0.5)).astype(int)))
            if tr_i % debugStep == 0:
                self.predict(x)
                self.trainingData.loc[len(self.trainingData)] = ('Error', tr_i // debugStep, -np.mean(
                    (y * np.log(np.clip(self.head.a, 0.001, 1))) + (
                            (1 - y) * np.log(np.clip(1 - self.head.a, 0.001, 1)))))
                self.trainingData.loc[len(self.trainingData)] = (
                    'Precision', tr_i // debugStep, np.mean((y == (self.head.a > 0.5)).astype(int)))


In [40]:
class Model:
    class Layer:
        def __init__(self, layerHeight, prevLayer, nextLayer, activationFunc="sigmoid"):
            self.layerHeight = layerHeight
            self.w = np.random.randn(layerHeight, prevLayer.layerHeight) * np.sqrt(1 / prevLayer.layerHeight)
            self.b = np.zeros((layerHeight, 1))
            self.v_dw = np.zeros((layerHeight, prevLayer.layerHeight))
            self.v_db = np.zeros((layerHeight, 1))
            self.s_dw = np.zeros((layerHeight, prevLayer.layerHeight))
            self.s_db = np.zeros((layerHeight, 1))
            self.t = 1
            self.a = None
            self.d = None
            if activationFunc == "sigmoid":
                self.activation = sigmoid
                self.da_dz = lambda a: np.multiply(a, (1 - a))
            elif activationFunc == "tanh":
                self.activation = np.tanh
                self.da_dz = lambda a: (1 - np.square(a))
            elif activationFunc == "relu":
                self.activation = reLU
                self.da_dz = lambda a: np.where(a > 0, 1,
                                                0)  #just taking advantage of the fact a = z for positive and a = 0 for negative
            elif activationFunc == "leaky_relu":
                self.activation = leakyReLU
                self.da_dz = lambda a: np.where(a > 0, 1, 0.01)
            self.prevLayer = prevLayer if prevLayer else None
            self.nextLayer = nextLayer if nextLayer else None

        def predict(self, a_Prev: np.array, keepActiveProb: float):
            self.d = np.random.random((self.layerHeight, a_Prev.shape[1])) < keepActiveProb
            self.a = self.activation((self.w @ a_Prev) + self.b) * self.d
            self.a /= keepActiveProb
            # self.a = self.activation((self.w @ a_Prev) + self.b)
            return self.nextLayer.predict(self.a, keepActiveProb)

        def propagate(self, dz_Next: np.array, trainingStep: float, l2_lambda=0):
            da = (self.nextLayer.w.T @ dz_Next) * self.d
            dz = self.da_dz(self.a) * da
            dw = ((dz @ self.prevLayer.a.T) / dz.shape[1]) + ((l2_lambda / dz.shape[1]) * self.w)
            db = np.mean(dz, axis=1, keepdims=True)

            beta1 = 0.9
            beta2 = 0.99
            self.v_dw = (beta1 * self.v_dw) + ((1 - beta1) * dw)
            self.v_db = (beta1 * self.v_db) + ((1 - beta1) * db)
            self.s_dw = (beta2 * self.s_dw) + ((1 - beta2) * np.square(dw))
            self.s_db = (beta2 * self.s_db) + ((1 - beta2) * np.square(db))
            v_dw_cor = self.v_dw / (1 - (beta1 ** self.t))
            v_db_cor = self.v_db / (1 - (beta1 ** self.t))
            s_dw_cor = self.s_dw / (1 - (beta2 ** self.t))
            s_db_cor = self.s_db / (1 - (beta2 ** self.t))

            self.prevLayer.propagate(dz, trainingStep, l2_lambda)
            self.w -= trainingStep * (v_dw_cor / (np.sqrt(s_dw_cor) + (10 ** -8)))
            self.b -= trainingStep * (v_db_cor / (np.sqrt(s_db_cor) + (10 ** -8)))
            self.t += 1

    class Head(Layer):
        def predict(self, a_Prev: np.array, keepActiveProb: float):
            self.a = sigmoid((self.w @ a_Prev) + self.b)
            return self.a

        def propagate(self, realY, trainingStep, l2_lambda=0):
            dz = self.a - realY
            dw = ((dz @ self.prevLayer.a.T) / dz.shape[1]) + ((l2_lambda / dz.shape[1]) * self.w)
            db = np.mean(dz, axis=1, keepdims=True)

            beta1 = 0.9
            beta2 = 0.999
            self.v_dw = (beta1 * self.v_dw) + ((1 - beta1) * dw)
            self.v_db = (beta1 * self.v_db) + ((1 - beta1) * db)
            self.s_dw = (beta2 * self.s_dw) + ((1 - beta2) * np.square(dw))
            self.s_db = (beta2 * self.s_db) + ((1 - beta2) * np.square(db))
            v_dw_cor = self.v_dw / (1 - (beta1 ** self.t))
            v_db_cor = self.v_db / (1 - (beta1 ** self.t))
            s_dw_cor = self.s_dw / (1 - (beta2 ** self.t))
            s_db_cor = self.s_db / (1 - (beta2 ** self.t))

            self.prevLayer.propagate(dz, trainingStep)
            self.w -= trainingStep * (v_dw_cor / (np.sqrt(s_dw_cor) + (10 ** -8)))
            self.b -= trainingStep * (v_db_cor / (np.sqrt(s_db_cor) + (10 ** -8)))
            self.t += 1

    class Input(Layer):
        def __init__(self, layerHeight, nextLayer):
            self.layerHeight = layerHeight
            self.a = None
            self.w = np.identity(self.layerHeight)
            self.b = np.zeros((layerHeight, 1))
            self.nextLayer = nextLayer if nextLayer else None

        def predict(self, a_Prev: np.array, keepActiveProb: float):
            self.a = (self.w @ a_Prev) + self.b
            return self.nextLayer.predict(self.a, keepActiveProb)

        def propagate(self, realY, trainingStep, l2_lambda=0):
            return

    def __init__(self, xSize, layers, activationFunc="sigmoid"):
        """activationFunc = "sigmoid" | "tanh" | "relu" | "leaky_relu" """
        self.head = None

        def createLayer(prevLayer, depth):
            if depth >= len(layers):
                layer = self.Head(1, prevLayer, None)
                self.head = layer
                return layer
            else:
                layer = self.Layer(layers[depth], prevLayer, None, activationFunc=activationFunc)
                layer.nextLayer = createLayer(layer, depth + 1)
                return layer

        self.input = self.Input(xSize, None)
        self.input.nextLayer = createLayer(self.input, 0)
        self.trainingData = pd.DataFrame([], columns=['Name', 'Iteration', 'Value'])

    def predict(self, a: np.array, keepActiveProb=1.0):
        return self.input.predict(a, keepActiveProb)

    def train(self, X: np.array, Y: np.array, trainingStep, trainingIterations,
              debugStep=100, x_dev=None, y_dev=None, epochSize=1024, keepActiveProb=1.0, l2_reg_lambda=0):
        """
        epochSize ~ 128 | 256 | 512 \n
        keepActiveProb ~ ratio of active neurons {>0.8} \n
        l2_reg_lambda ~ weight decay coefficient {<2.0} \n
        """
        trainSetMean = np.mean(X, axis=1, keepdims=True)
        trainSetR2 = np.mean(np.square(X - trainSetMean), axis=1, keepdims=True)
        self.input.w = self.input.w / np.sqrt(trainSetR2)
        self.input.b = -trainSetMean / np.sqrt(trainSetR2)
        epochs = X.shape[1] // epochSize
        display('epochs Amount: ' + str(epochs))
        for tr_i in range(trainingIterations):
            perm_indices = np.random.permutation(X.shape[1])
            x = X[:, perm_indices]
            y = Y[:, perm_indices]
            for e in range(epochs):
                self.predict(x[:, epochSize * e: epochSize * (e + 1)], keepActiveProb)
                self.head.propagate(y[:, epochSize * e: epochSize * (e + 1)], trainingStep, l2_reg_lambda)
            if x_dev is not None and tr_i % debugStep == 0:
                self.predict(x_dev)
                self.trainingData.loc[len(self.trainingData)] = ('ErrorDev', tr_i // debugStep, -np.mean(
                    (y_dev * np.log(np.clip(self.head.a, 0.001, 1))) + (
                            (1 - y_dev) * np.log(np.clip(1 - self.head.a, 0.001, 1)))))
                self.trainingData.loc[len(self.trainingData)] = (
                    'PrecisionDev', tr_i // debugStep, np.mean((y_dev == (self.head.a > 0.5)).astype(int)))
            if tr_i % debugStep == 0:
                self.predict(x)
                self.trainingData.loc[len(self.trainingData)] = ('Error', tr_i // debugStep, -np.mean(
                    (y * np.log(np.clip(self.head.a, 0.001, 1))) + (
                            (1 - y) * np.log(np.clip(1 - self.head.a, 0.001, 1)))))
                self.trainingData.loc[len(self.trainingData)] = (
                    'Precision', tr_i // debugStep, np.mean((y == (self.head.a > 0.5)).astype(int)))


In [80]:
#Prepare Training Set
trainCasesAmount = 100000
digitAmount = 10
x = np.zeros((digitAmount, trainCasesAmount), dtype=float)
y = np.zeros((1, trainCasesAmount), dtype=float)
for j in range(trainCasesAmount):
    if random.random() < 0.5:
        startI = min(int(-10 * (random.random() - 1)), 8)
        x[:startI, j] = [0]
        for i in range(startI, digitAmount):
            x[i, j] = np.random.randint(0 if i > startI else 1, 10, 1)
        ch = random.randint(startI, 9)
        x[ch, j] = x[-(ch - startI) - 1, j] + 1 if x[-(ch - startI) - 1, j] < 9 else x[-(ch - startI) - 1, j] - 1
        y[0, j] = 0
    else:
        startI = min(int(-10 * (random.random() - 1)), 9)
        for i in range(startI, startI + ((digitAmount - startI) // 2) + 1):
            x[i, j] = np.random.randint(0 if i > startI else 1, 10, 1)
            x[-(i - startI) - 1, j] = x[i, j]
        y[0, j] = 1


Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)


Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)



In [81]:
#Prepare Dev Set
devCasesAmount = 1000
digitAmount = 10
x_dev = np.zeros((digitAmount, trainCasesAmount), dtype=float)
y_dev = np.zeros((1, trainCasesAmount), dtype=float)
for j in range(trainCasesAmount):
    if random.random() < 0.5:
        startI = min(int(-10 * (random.random() - 1)), 8)
        x_dev[:startI, j] = [0]
        for i in range(startI, digitAmount):
            x_dev[i, j] = np.random.randint(0 if i > startI else 1, 10, 1)
        ch = random.randint(startI, 9)
        x_dev[ch, j] = x_dev[-(ch - startI) - 1, j] + 1 if x_dev[-(ch - startI) - 1, j] < 9 else x_dev[-(
                    ch - startI) - 1, j] - 1
        y_dev[0, j] = 0
    else:
        startI = min(int(-10 * (random.random() - 1)), 9)
        for i in range(startI, startI + ((digitAmount - startI) // 2) + 1):
            x_dev[i, j] = np.random.randint(0 if i > startI else 1, 10, 1)
            x_dev[-(i - startI) - 1, j] = x_dev[i, j]
        y_dev[0, j] = 1


Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)


Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)



In [84]:
np.random.seed(69)
model0 = Model(10, [20, 20], activationFunc="tanh")
model0.train(x, y, 0.01, 101, x_dev=x_dev, y_dev=y_dev, epochSize=512, debugStep=10)

'epochs Amount: 195'

In [70]:
np.random.seed(69)
model0_minus = Model_Old(10, [20, 20], activationFunc="tanh")
model0_minus.train(x, y, 1, 2001, x_dev=x_dev, y_dev=y_dev, debugStep=10)

In [74]:
np.random.seed(69)
model1 = Model(10, [5, 5, 5, 5, 5, 5, 5, 5, 5, 5], activationFunc="tanh")
model1.train(x, y, 0.5, 100000, 1, devSet=True, x_dev=x_dev, y_dev=y_dev)

In [66]:
np.random.seed(69)
model2 = Model(10, [10, 10], activationFunc="tanh")
model2.train(x, y, 0.1, 200000, 0.95, devSet=True, x_dev=x_dev, y_dev=y_dev)

In [85]:
px.line(model0.trainingData, x='Iteration', y='Value', color='Name')

In [72]:
px.line(model0_minus.trainingData, x='Iteration', y='Value', color='Name')

In [86]:
inp = input()
while inp != 'stop':
    if len(inp) < 10:
        inp = "0" * (10 - len(inp)) + inp
    print(inp + ' is ' + str(model0.predict(np.array([[int(i)] for i in inp])).item()))
    inp = input()

0000012321 is 0.9990021579224168
0001230321 is 0.9997475021556602
0000123321 is 0.9719768613909314
0000987789 is 0.99987919977332
0009877789 is 0.9999456915073768
0009871789 is 0.9998252622405971
0000123345 is 4.192842101716593e-11
0000014326 is 0.00010593673523073125


In [87]:
model0.predict(x)

array([[1.01927915e-02, 2.88419880e-04, 2.98866777e-04, ...,
        9.99264029e-01, 6.86984466e-04, 9.99999825e-01]])