In [None]:
!pip install tensorflow

In [None]:
from tensorflow.keras.datasets import mnist
import tensorflow as tf
from tensorflow.keras import optimizers

In [None]:
from typing import Any


class MyDense:
    """Fully connected layer computing ``activation(input @ weight + bias)``.

    The weight matrix is initialised with small non-negative uniform random
    values (``minval=0``, ``maxval=0.1``) and the bias with zeros. Both are
    stored as ``tf.Variable`` objects so they can be updated during training.

    Args:
        units: stored on the instance as ``self.units``; not otherwise used
            by this class.
        input_size: number of neurons in the previous layer (rows of the
            weight matrix).
        output_size: number of neurons in this layer (columns of the weight
            matrix and length of the bias).
        activation: callable applied to the affine output.
    """

    def __init__(self, units, input_size, output_size, activation):
        kernel_shape = (input_size, output_size)
        self.units = units
        self.activation = activation
        self.weight = tf.Variable(
            tf.random.uniform(kernel_shape, minval=0, maxval=1e-1)
        )
        self.bias = tf.Variable(tf.zeros([output_size]))

    def __call__(self, input) -> Any:
        """Apply the affine transform followed by the activation."""
        pre_activation = tf.matmul(input, self.weight) + self.bias
        return self.activation(pre_activation)

    @property
    def weights(self):
        """Return a new list ``[weight, bias]`` holding this layer's variables."""
        return [self.weight, self.bias]


class MySequential:
    """Container that feeds its input through a list of layers in order.

    Each layer must be callable and expose a ``weights`` iterable.
    """

    def __init__(self, layers):
        self.layers = layers

    def __call__(self, inputs) -> Any:
        """Return the result of applying every layer, first to last."""
        activations = inputs
        for apply_layer in self.layers:
            activations = apply_layer(activations)
        return activations

    @property
    def weights(self):
        """Return one flat, newly built list with every layer's weights in layer order."""
        return [
            variable
            for layer in self.layers
            for variable in layer.weights
        ]

    
# Two-layer classifier: 28*28 flattened inputs -> 512 ReLU neurons -> 10 softmax outputs.
model = MySequential([
    MyDense(units=512, input_size=28*28, output_size=512, activation=tf.nn.relu),
    # units matches output_size here; it was previously 512 on a 10-neuron layer.
    MyDense(units=10, input_size=512, output_size=10, activation=tf.nn.softmax),
])

# Sanity check: each of the two dense layers contributes a weight matrix and a bias.
assert len(model.weights) == 4


class BatchGenerator():
    """Single-pass iterator over consecutive ``(input, output)`` batches.

    Each batch is the pair of slices ``input_data[start:end]`` and
    ``output_data[start:end]``; the final batch may be shorter than
    ``batch_size``. The object is its own iterator, so once exhausted it
    stays exhausted: create a new instance for another pass.

    Args:
        batch_size: number of samples per batch; must be greater than zero.
        input_data: sliceable sequence of inputs.
        output_data: sliceable sequence of targets, same length as
            ``input_data``.

    Raises:
        AssertionError: if the two sequences differ in length.
        ValueError: if ``batch_size`` is not positive.
    """

    def __init__(self, batch_size, input_data, output_data):
        assert len(input_data) == len(output_data)
        # A zero or negative step would never move `start` past the end of
        # the data, so iteration would never terminate.
        if batch_size <= 0:
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}"
            )
        self.input_data = input_data
        self.output_data = output_data
        self.batch_size = batch_size
        self.start = 0

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next ``(input_batch, output_batch)`` pair or stop."""
        if self.start >= len(self.input_data):
            raise StopIteration
        start = self.start
        end = start + self.batch_size
        self.start = end
        return self.input_data[start:end], self.output_data[start:end]


# Module-level SGD optimizer with a fixed learning rate of 1e-3.
optimizer = optimizers.SGD(learning_rate=1e-3)

def one_training_step(model, input_batch, output_batch):
    """Perform one gradient-descent update on a single batch.

    Args:
        model: callable that maps ``input_batch`` to predictions and exposes
            a ``weights`` list of variables to train.
        input_batch: batch of model inputs.
        output_batch: integer class labels for the batch, as expected by
            ``sparse_categorical_crossentropy``.

    Returns:
        The mean loss over the batch, computed before the weights are updated.
    """
    # Only the forward pass and loss need to be recorded on the tape.
    with tf.GradientTape() as tape:
        predicted_output = model(input_batch)
        per_sample_loss = tf.keras.losses.sparse_categorical_crossentropy(output_batch, predicted_output)
        avg_loss = tf.reduce_mean(per_sample_loss)

    # Differentiate and update outside the recording context; read the weight
    # list once so gradients and variables are paired from the same list.
    trainable_weights = model.weights
    dloss_dw = tape.gradient(avg_loss, trainable_weights)
    optimizer.apply_gradients(zip(dloss_dw, trainable_weights))
    return avg_loss
    
def fit(model, input_data, output_data, epochs, batch_size=128):
    """Train ``model`` for ``epochs`` full passes over the data in mini-batches.

    Args:
        model: model passed through to ``one_training_step``.
        input_data: sliceable sequence of training inputs.
        output_data: sliceable sequence of training targets, same length as
            ``input_data``.
        epochs: number of passes over the whole dataset.
        batch_size: number of samples per training step.

    Prints the loss of every 100th batch of each epoch (starting with the
    first batch).
    """
    for epoch in range(epochs):
        # A fresh generator per epoch: BatchGenerator is single-pass.
        batch_generator = BatchGenerator(batch_size, input_data=input_data, output_data=output_data)
        # enumerate supplies the step counter; the old manual counter was
        # never incremented, so the loss was printed on every batch.
        for step, (input_batch, output_batch) in enumerate(batch_generator):
            loss = one_training_step(model, input_batch, output_batch)
            if step % 100 == 0:
                print(f"Epoch {epoch} Loss {loss}")


# Load the MNIST train/test splits as arrays of images and integer labels.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

print(x_train.shape, y_train.shape)
# Flatten every 28x28 image into a 784-element vector and divide by 255 to
# scale the pixel values down. -1 lets reshape infer the number of samples
# instead of hard-coding 60000 / 10000.
x_train = x_train.reshape((-1, 28 * 28))
x_train = x_train.astype("float32") / 255
x_test = x_test.reshape((-1, 28 * 28))
x_test = x_test.astype("float32") / 255

fit(model, x_train, y_train, epochs=10, batch_size=128)
# Show the model output for the first training image.
print(model(x_train[0:1]))