In [None]:
from collections.abc import Callable, Sequence, Iterator, Iterable
from dataclasses import dataclass
from typing import NamedTuple, Self, cast
from pathlib import Path
from struct import unpack
from itertools import product, chain, islice
from math import log10, exp, sqrt
from random import random, shuffle

## Matrix

In [None]:
class MatrixDim(NamedTuple):
    m: int
    n: int

    @property
    def size(self) -> int:
        return self.m * self.n

    @property
    def T(self) -> 'MatrixDim':
        return MatrixDim(m=self.n, n=self.m)

    @property
    def is_vector(self) -> bool:
        return self.m == 1 or self.n == 1

    def __str__(self) -> str:
        return f'{self.m} by {self.n}'

In [None]:
@dataclass(slots=True)
class Matrix[Numeric: (bool, int, float)]:
    __values: tuple[Numeric, ...]
    __dim: MatrixDim
    __is_transposed: bool
    __render_decimals: int

    def __init__(
        self,
        __values: tuple[Numeric, ...],
        /,
        dim: MatrixDim,
        *,
        transposed: bool = False,
        render_decimals: int = 0,
    ) -> None:
        self.__values = __values
        self.__dim = dim
        self.__is_transposed = transposed
        self.__render_decimals = render_decimals
        if len(__values) != dim.size:
            raise ValueError(
                f'Cannot interpret {len(__values)} values as {self.dim} matrix'
            )
    
    @property
    def dim(self) -> MatrixDim:
        return self.__dim if not self.__is_transposed else self.__dim.T

    @property
    def T(self) -> 'Matrix[Numeric]':
        return Matrix(
            self.__values,
            dim=self.__dim,
            transposed=(not self.__is_transposed),
            render_decimals=self.__render_decimals,
        )

    @property
    def as_vector(self) -> Iterator[Numeric]:
        if not self.dim.is_vector:
            raise ValueError(f'{self.dim} matrix is not a vector')
        return iter(self.__values)

    def apply(self, __func: Callable[[Numeric], Numeric]) -> 'Matrix[Numeric]':
        return Matrix(
            tuple(map(__func, self.__values)),
            dim=self.dim,
            transposed=False,
            render_decimals=self.__render_decimals,
        )

    def __repr__(self) -> str:
        fmt: str = f'{(
            int(log10(max(map(abs, self.__values)) or 1))
        )}>.{self.__render_decimals}f'
        return f'[{'\n'.join(
            f'{' ' * (i != 0)}[{' '.join(
                f'{' ' * int(self[i, j] >= 0)}{self[i, j]:{fmt}}'
                for j in range(self.dim.n)
            )}]'
            for i in range(self.dim.m)
        )}]'

    def __len__(self) -> int:
        match self.dim:
            case (1, l) | (l, 1):
                return l
        raise ValueError(f'{self.dim} matrix has no length, use matrix.dim instead')

    def __getitem__(self, index: tuple[int, int], /) -> Numeric:
        return self.__values[(
            index[self.__is_transposed] * (
                self.dim.m if self.__is_transposed else self.dim.n
            ) + index[1 - self.__is_transposed]
        )]

    def __add__(self, __other: 'Matrix[Numeric]', /) -> 'Matrix[Numeric]':
        if self.dim != __other.dim:
            raise ValueError(f'Cannot add {self.dim} and {__other.dim} matrices')
        return Matrix(
            tuple(
                self[i, j] + __other[i, j] for i, j
                in product(range(self.dim.m), range(self.dim.n))
            ),
            dim=self.dim,
            transposed=False,
            render_decimals=max(self.__render_decimals, __other.__render_decimals),
        )

    def __iadd__(self, __other: 'Matrix[Numeric]') -> Self:
        if self.dim != __other.dim:
            raise ValueError(f'Cannot add {self.dim} and {__other.dim} matrices')
        self.__values = cast(
            tuple[Numeric, ...],
            tuple(
                self[i, j] + __other[i, j] for i, j
                in product(range(self.dim.m), range(self.dim.n))
            ),
        )
        return self

    def __sub__(self, __other: 'Matrix[Numeric]', /) -> 'Matrix[Numeric]':
        if self.dim != __other.dim:
            raise ValueError(f'Cannot subtract {self.dim} and {__other.dim} matrices')
        return Matrix(
            tuple(
                self[i, j] - __other[i, j] for i, j
                in product(range(self.dim.m), range(self.dim.n))
            ),
            dim=self.dim,
            transposed=False,
            render_decimals=max(self.__render_decimals, __other.__render_decimals),
        )

    def __isub__(self, __other: 'Matrix[Numeric]') -> Self:
        if self.dim != __other.dim:
            raise ValueError(f'Cannot subtract {self.dim} and {__other.dim} matrices')
        self.__values = cast(
            tuple[Numeric, ...],
            tuple(
                self[i, j] - __other[i, j] for i, j
                in product(range(self.dim.m), range(self.dim.n))
            ),
        )
        return self

    def __mul__(self, __other: 'Matrix[Numeric]', /) -> 'Matrix[Numeric]':
        if self.dim != __other.dim:
            raise ValueError(
                f'Hadamard requires equal dims, got {self.dim} and {__other.dim}'
            )
        return Matrix(
            tuple(
                self[i, j] * __other[i, j] for i, j
                in product(range(self.dim.m), range(self.dim.n))
            ),
            dim=self.dim,
            render_decimals=self.__render_decimals,
        )

    def __imul__(self, __other: 'Matrix[Numeric]', /) -> Self:
        if self.dim != __other.dim:
            raise ValueError(
                f'Hadamard requires equal dims, got {self.dim} and {__other.dim}'
            )
        self.__values = cast(
            tuple[Numeric, ...],
            tuple(
                self[i, j] * __other[i, j] for i, j
                in product(range(self.dim.m), range(self.dim.n))
            ),
        )
        return self

    def __matmul__(self, __other: 'Matrix[Numeric]', /) -> 'Matrix[Numeric]':
        if self.dim.n != __other.dim.m:
            raise ValueError(f'Cannot multiply {self.dim} and {__other.dim} matrices')
        return Matrix(
            tuple(
                sum(self[i, k] * __other[k, j] for k in range(self.dim.n))
                for i, j in product(range(self.dim.m), range(__other.dim.n))
            ),
            dim=MatrixDim(m=self.dim.m, n=__other.dim.n),
            transposed=False,
            render_decimals=max(self.__render_decimals, __other.__render_decimals),
        )

    def __pow__(self, __power: int, /) -> 'Matrix[Numeric]':
        return self.apply(lambda x: x ** __power)

    def __eq__(self, __other: object, /) -> bool:
        return isinstance(__other, Matrix) and self.dim == __other.dim and all(
            self[i, j] == __other[i, j] for i, j
            in product(range(self.dim.m), range(self.dim.n))
        )

    def __ne__(self, __other: object, /) -> bool:
        return not self == __other

In [None]:
def matrix[Numeric: (bool, int, float)](
    __values: Sequence[Sequence[Numeric]], /, *, render_decimals: int = 0
) -> Matrix[Numeric]:
    dim: MatrixDim = MatrixDim(
        m=len(__values), n=len(__values[0]) if len(__values) > 0 else 0
    )
    if not all(len(__values[i]) == dim.n for i in range(len(__values))):
        raise ValueError('Matrix rows have different lengths')
    return Matrix(tuple(chain(*__values)), dim, render_decimals=render_decimals)

In [None]:
def vector[Numeric: (bool, int, float)](
    __values: Sequence[Numeric], /, *, render_decimals: int = 0
) -> Matrix[Numeric]:
    return matrix([__values], render_decimals=render_decimals).T

In [None]:
def zeros(dim: MatrixDim, *, render_decimals: int = 0) -> Matrix[float]:
    return Matrix((0.0,) * dim.size, dim=dim, render_decimals=render_decimals)

In [None]:
def serialize_matrix[Numeric: (bool, int, float)](matrix: Matrix[Numeric]) -> str:
    return f'{matrix.dim.m}*{matrix.dim.n}:{(
        ','.join(
            str(matrix[i, j]) for i, j in product(
                range(matrix.dim.m), range(matrix.dim.n)
            )
        )
    )}'

In [None]:
def deserialize_matrix[Numeric: (bool, int, float)](
    string: str, dtype: type[Numeric] = float, render_decimals: int = 0
) -> Matrix[Numeric]:
    dim_string, values_string = string.split(sep=':', maxsplit=1)
    dim: MatrixDim = MatrixDim(*map(int, dim_string.split(sep='*', maxsplit=1)))
    return Matrix(
        tuple(map(dtype, values_string.split(sep=','))),
        dim=dim,
        render_decimals=render_decimals,
    )

## Functions

In [None]:
@dataclass(slots=True)
class Function[**Ts, T]:
    __func: Callable[Ts, T]
    __ddx_func: Callable[Ts, T]

    def __init__(
        self, __func: Callable[Ts, T], __ddx_func: Callable[Ts, T]
    ) -> None:
        self.__func = __func
        self.__ddx_func = __ddx_func

    def __call__(self, x: T) -> T:
        return self.__func(x)

    @property
    def ddx(self) -> Callable[Ts, T]:
        return self.__ddx_func

In [None]:
type ActivationFunction = Function[[float], float]
type LossFunction = Function[[Matrix[float], Matrix[float]], Matrix[float]]

In [None]:
def sigmoid(x: float) -> float:
    return 1 / (1 + exp(-x))

In [None]:
class Activation:
    NONE: ActivationFunction = Function(lambda x: x, lambda _: 1)
    RELU: ActivationFunction = Function(lambda x: x * (x > 0), lambda x: x > 0)
    SIGMOID: ActivationFunction = Function(
        sigmoid, lambda x: sigmoid(x) * (1 - sigmoid(x))
    )

In [None]:
def softmax(v: Matrix[float]) -> Matrix[float]:
    exponents: list[float] = [exp(x - max(v.as_vector)) for x in v.as_vector]
    return vector([e / sum(exponents) for e in exponents])

In [None]:
class Loss:
    MSE: LossFunction = Function(
        lambda x, y: (y - x) ** 2, lambda x, y: (x - y).apply(lambda v: 2 * v)
    )
    SOFTMAX_CROSSENTROPY: LossFunction = Function(
        lambda z, y: (softmax(z) * y.apply(lambda t: -1.0 if t > 0 else 0.0)),
        lambda z, y: softmax(z) - y,
    )

## Layer

In [None]:
@dataclass(slots=True)
class LinearLayer:
    __w: Matrix[float]
    __b: Matrix[float]
    __activation: ActivationFunction
    __last_input: Matrix[float] | None
    __last_z: Matrix[float] | None
    __grad_w: Matrix[float]
    __grad_b: Matrix[float]

    def __init__(
        self,
        dim: MatrixDim,
        activation: ActivationFunction,
        render_decimals: int,
    ) -> None:
        limit: float = sqrt(6.0 / (dim.m + dim.n))
        self.__w = Matrix(
            [(2 * random() - 1) * limit for _ in range(dim.size)],
            dim=dim,
            render_decimals=2,
        )
        self.__b = vector(
            [0.0 for _ in range(dim.m)],
            render_decimals=render_decimals,
        )
        self.__activation = activation
        self.__last_input = None
        self.__last_z = None
        self.__zero_gradients()

    def __len__(self) -> int:
        return len(self.__b)

    @property
    def weights(self) -> Matrix[float]:
        return self.__w

    @weights.setter
    def weights(self, w: Matrix[float], /) -> None:
        assert self.__w.dim == w.dim
        self.__w = w

    @property
    def biases(self) -> Matrix[float]:
        return self.__b

    @biases.setter
    def biases(self, b: Matrix[float], /) -> None:
        assert self.__b.dim == b.dim
        self.__b = b

    def __zero_gradients(self) -> None:
        self.__grad_w = zeros(self.__w.dim, render_decimals=0)
        self.__grad_b = zeros(self.__b.dim, render_decimals=0)

    def accumulate_grads(self, upstream: Matrix[float]) -> Matrix[float]:
        if self.__last_input is None or self.__last_z is None:
            raise RuntimeError('feed_forward must be called before accumulate_grads')
        delta: Matrix[float] = upstream * self.__last_z.apply(self.__activation.ddx)
        self.__grad_w += (delta @ self.__last_input.T)
        self.__grad_b += delta
        return self.__w.T @ delta

    def feed_forward(self, input: Matrix[float]) -> Matrix[float]:
        self.__last_input = input
        self.__last_z = self.__w @ input + self.__b
        return self.__last_z.apply(self.__activation)

    def apply_grads(self, *, learning_rate: float, batch_size: int) -> None:
        scale: float = learning_rate / float(batch_size)
        self.__w -= self.__grad_w.apply(lambda g: scale * g)
        self.__b -= self.__grad_b.apply(lambda g: scale * g)
        self.__zero_gradients()

In [None]:
@dataclass(frozen=True, slots=True)
class LayerDescriptor:
    n: int
    activation: ActivationFunction

In [None]:
def layer(
    n: int, activation: ActivationFunction = Activation.NONE
) -> LayerDescriptor:
    return LayerDescriptor(n=n, activation=activation)

## Batching

In [None]:
def batches[T](
    iterable: Iterable[T], *, size: int, shuffled: bool = True
) -> Iterable[list[T]]:
    iterator: Iterator[T] = iter(iterable)
    while batch := list(islice(iterator, size)):
        if shuffled:
            shuffle(batch)
        yield batch

## Model

In [None]:
@dataclass(slots=True)
class Model[Answer]:
    __input_size: int
    __layers: list[LinearLayer]
    __loss: LossFunction
    __transform: Callable[[Matrix[float]], Answer]

    def __init__(
        self,
        input_size: int,
        layers: list[LayerDescriptor],
        loss: LossFunction,
        transform: Callable[[Matrix[float]], Answer],
        *,
        render_decimals: int = 2,
    ) -> None:
        self.__input_size = input_size
        self.__layers = [
            LinearLayer(
                dim=MatrixDim(d.n, d_prev.n),
                activation=d.activation,
                render_decimals=render_decimals,
            )
            for d_prev, d in zip(
                chain([LayerDescriptor(input_size, Activation.NONE)], layers), layers
            )
        ]
        self.__loss = loss
        self.__transform = transform

    def __repr__(self) -> str:
        return self.summary

    @property
    def summary(self) -> str:
        return f'Model: {self.__input_size} -> {(
            ' -> '.join(f'L({len(layer)})' for layer in self.__layers)
        )} -> {self.__transform.__name__}'

    def __feed_forward(self, input: Matrix[float]) -> Matrix[float]:
        result: Matrix[float] = input
        for layer in self.__layers:
            result = layer.feed_forward(result)
        return result

    def __update_weights(
        self,
        batch: list[tuple[Matrix[float], Matrix[float]]],
        *,
        learning_rate: float = 0.01,
    ) -> None:
        if len(batch) == 0:
            return
        for x, y in batch:
            prediction: Matrix[float] = self.__feed_forward(x)
            upstream: Matrix[float] = self.__loss.ddx(prediction, y)
            for layer in reversed(self.__layers):
                upstream = layer.accumulate_grads(upstream)
        for layer in self.__layers:
            layer.apply_grads(learning_rate=learning_rate, batch_size=len(batch))

    def fit(
        self,
        xs: list[Matrix[float]],
        ys: list[Matrix[float]],
        *,
        batch_size: int = 16,
        epochs: int = 10,
        learning_rate: float = 0.01,
    ) -> None:
        batch_count: int = (len(xs) + 1) // batch_size
        print('+-------+' + '-' * (batch_count + 2))
        print('| Epoch |' + f' {'Batches':^{(batch_count + 1)}} ')
        print('+-------+' + '-' * (batch_count + 2), end=str())
        for epoch in range(epochs):
            print(f'\n| {(f'{epoch + 1}/{epochs}'):^5} |', end=' ')
            for batch in batches(zip(xs, ys, strict=True), size=batch_size):
                print(end='#')
                self.__update_weights(batch, learning_rate=learning_rate)

    def evaluate(self, xs: Iterable[Matrix[float]], answers: Iterable[int]) -> float:
        return sum(
            self.predict(x) == answer
            for x, answer in zip(xs, answers, strict=True)
        ) / len(xs)

    def predict(self, input: Matrix[float]) -> Answer:
        return self.__transform(self.__feed_forward(input))

    def save(self, folder: str, name: str | None = None) -> None:
        default_name: str = self.summary.replace(
            ' -> ', '_'
        ).replace('(', str()).replace(')', str())
        with Path(f'{folder}/{name or default_name}').open('w') as model_file:
            for layer in self.__layers:
                model_file.write(f'{serialize_matrix(layer.weights)}\n')
                model_file.write(f'{serialize_matrix(layer.biases)}\n')

    def load(self, folder: str, name: str) -> None:
        with Path(f'{folder}/{name}').open('r') as model_file:
            lines: list[str] = list(
                filter(lambda line: line != str(), model_file.readlines())
            )
            for i, (w, b) in enumerate(zip(lines[::2], lines[1::2])):
                self.__layers[i].weights = deserialize_matrix(w, dtype=float)
                self.__layers[i].biases = deserialize_matrix(b, dtype=float)

## Dataset

In [None]:
def read_idx_images(path: str) -> list[Matrix[float]]:
    with Path(path).open('rb') as images_file:
        magic, num, rows, cols = cast(
            tuple[int, int, int, int],
            unpack('>IIII', images_file.read(16)),
        )
        if magic != 2051:
            raise ValueError(f'Not an IDX image file (magic {magic})')
        size: int = rows * cols
        data: bytes = images_file.read()
        if len(data) != num * size:
            raise ValueError('Unexpected file length for image data')

        images: list[Matrix[float]] = []
        mv: memoryview[int] = memoryview(data)
        for i in range(num):
            start: int = i * size
            vals: tuple[float, ...] = tuple(
                v / 255.0 for v in mv[start:(start + size)].tolist()
            )
            images.append(Matrix(vals, dim=MatrixDim(size, 1), render_decimals=2))
        return images

In [None]:
def read_idx_labels(path: str | Path) -> list[int]:
    with Path(path).open('rb') as labels_file:
        magic, num = cast(tuple[int, int], unpack('>II', labels_file.read(8)))
        if magic != 2049:
            raise ValueError(f'Not an IDX label file (magic {magic})')
        data = labels_file.read()
        if len(data) != num:
            raise ValueError('Unexpected file length for label data')
        mv: memoryview[int] = memoryview(data)
        return mv.tolist()

In [None]:
def load_dataset(
    path: str, prepare_image: Callable[[float], float] | None = None
) -> tuple[list[Matrix[float]], list[int], list[Matrix[float]], list[int]]:
    root: Path = Path(path)
    train_images: list[Matrix[float]] = [
        image.apply(prepare_image) if prepare_image is not None else image
        for image in read_idx_images(
            root / 'train-images-idx3-ubyte' / 'train-images-idx3-ubyte'
        )
    ]
    train_labels: list[int] = read_idx_labels(
        root / 'train-labels-idx1-ubyte' / 'train-labels-idx1-ubyte'
    )
    test_images: list[Matrix[float]] = [
        image.apply(prepare_image) if prepare_image is not None else image
        for image in read_idx_images(
            root / 't10k-images-idx3-ubyte' / 't10k-images-idx3-ubyte'
        )
    ]
    test_labels: list[int] = read_idx_labels(
        root / 't10k-labels-idx1-ubyte' / 't10k-labels-idx1-ubyte'
    )
    return (train_images, train_labels, test_images, test_labels)

In [None]:
def one_hot(label: int, classes: int) -> Matrix[float]:
    vec: list[float] = [0.0] * classes
    vec[label] = 1
    return vector(vec)

## Learn

In [None]:
def argmax(matrix: Matrix[float]) -> int:
    return max(enumerate(matrix.as_vector), key=lambda element: element[1])[0]

In [None]:
model: Model[float] = Model(
    input_size=784,
    layers=[
        layer(n=16, activation=Activation.SIGMOID),
        layer(n=16, activation=Activation.SIGMOID),
        layer(n=10),
    ],
    loss=Loss.SOFTMAX_CROSSENTROPY,
    transform=argmax,
)

In [None]:
model.summary

In [None]:
x_train, y_train, x_test, y_test = load_dataset(path='mnist')

In [None]:
model.fit(
    xs=x_train[:1000],
    ys=[one_hot(y, classes=10) for y in y_train][:1000],
    batch_size=32,
    epochs=3,
    learning_rate=0.3,
)

In [None]:
model.evaluate(x_test[:1000], y_test[:1000])

In [None]:
model.save(folder='models', name='test.model')

In [None]:
m: Model[float] = Model(
    input_size=784,
    layers=[
        layer(n=16, activation=Activation.SIGMOID),
        layer(n=16, activation=Activation.SIGMOID),
        layer(n=10),
    ],
    loss=Loss.SOFTMAX_CROSSENTROPY,
    transform=argmax,
)

In [None]:
m.load(folder='models', name='test.model')

In [None]:
m.evaluate(x_test[:1000], y_test[:1000])

## Drawing

In [None]:
import tkinter as tk
from PIL import Image, ImageDraw
import numpy as np
from typing import Final

In [None]:
CANVAS_SIZE: Final[int] = 280
GRID_SIZE: Final[int] = 28
SCALE: Final[int] = CANVAS_SIZE // GRID_SIZE

In [None]:
class DrawMNIST:
    def __init__(self, model):
        self.model = model

        self.root = tk.Tk()
        self.root.title('MNIST Live Prediction')

        self.canvas = tk.Canvas(self.root, width=CANVAS_SIZE, height=CANVAS_SIZE, bg='black')
        self.canvas.pack(side=tk.LEFT)

        self.label_var = tk.StringVar()
        self.label = tk.Label(self.root, textvariable=self.label_var, font=('Arial', 24))
        self.label.pack(side=tk.RIGHT, padx=20)

        self.image = Image.new('L', (CANVAS_SIZE, CANVAS_SIZE), 0)
        self.draw = ImageDraw.Draw(self.image)

        self.canvas.bind('<B1-Motion>', self.paint)
        self.canvas.bind('<B3-Motion>', self.erase)

        self.update_prediction()
        self.root.mainloop()

    def paint(self, event):
        x1, y1 = (event.x - 8), (event.y - 8)
        x2, y2 = (event.x + 8), (event.y + 8)
        self.canvas.create_oval(x1, y1, x2, y2, fill='white', outline='white')
        self.draw.ellipse([x1, y1, x2, y2], fill=255)

    def erase(self, event):
        x1, y1 = (event.x - 8), (event.y - 8)
        x2, y2 = (event.x + 8), (event.y + 8)
        self.canvas.create_oval(x1, y1, x2, y2, fill='black', outline='black')
        self.draw.ellipse([x1, y1, x2, y2], fill=0)

    def get_matrix(self) -> Matrix[float]:
        img_small = self.image.resize((GRID_SIZE, GRID_SIZE), Image.Resampling.LANCZOS)
        arr = np.array(img_small).astype(np.float32) / 255.0
        flat = arr.flatten().tolist()
        return Matrix(tuple(flat), dim=MatrixDim(GRID_SIZE * GRID_SIZE, 1))

    def update_prediction(self):
        x = self.get_matrix()
        pred = self.model.predict(x)
        self.label_var.set(f'Prediction: {pred}')
        self.root.after(500, self.update_prediction)  # update every 0.5s

In [None]:
DrawMNIST(model)