
# Contents

0. Setup
1. Acquisition of Data
2. Model Building


# Section 0: Setup

## Prerequisites are essential by definition

In [21]:
import pathlib
import mlcroissant as mlc
import contextlib
import zipfile, rarfile, tarfile
import re
import pickle
import kagglehub
import io
from PIL import Image
from typing import Any, Callable, Generator
from IPython.display import display, clear_output
import secrets
import ipywidgets as widgets
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [22]:
%%html

<style>
.cell-output-ipywidget-background{
    background-color: transparent !important;
}

.jp-OutputArea-output{
    background-color: transparent;
    color: white;
}
</style>

In [23]:
dataset_url: str = "https://www.kaggle.com/datasets/xainano/handwrittenmathsymbols"

jsonld_path: pathlib.Path = pathlib.Path("./croissants/handwrittenmathsymbols-metadata.json")
dataset_data_archive_path: pathlib.Path = pathlib.Path("./data.rar")
image_dict_pickle_path: pathlib.Path = pathlib.Path("./pickles/image_dict.pkl")
model_path: pathlib.Path = pathlib.Path("./keras_models/model.keras")

load_image_dict: bool = True
load_model: bool = True

image_display_resize: tuple[int, int] = (200, 200)
image_display_resampling: int = Image.Resampling.BICUBIC
dataset_image_resize: tuple[int, int] = (45, 45)
dataset_image_resampling: str = tf.image.ResizeMethod.BICUBIC

dataset_generator_shards: int = 4
dataset_shuffle_buffer_size: int | None = 10000
train_split: float = 0.8
dataset_batch_size: int = 32

model_init_hidden_layers: list[layers.Layer] = [
    layers.Conv2D(64, 3, padding = "same", activation = "relu"),
    layers.Conv2D(64, 3, padding = "same", activation = "relu"),
    layers.SpatialDropout2D(0.1),
    layers.MaxPooling2D(),
    layers.Conv2D(128, 3, padding = "same", activation = "relu"),
    layers.Conv2D(128, 3, padding = "same", activation = "relu"),
    layers.SpatialDropout2D(0.1),
    layers.MaxPooling2D(),
    layers.Conv2D(256, 3, padding = "same", activation = "relu"),
    layers.Conv2D(256, 3, padding = "same", activation = "relu"),
    layers.Conv2D(256, 3, padding = "same", activation = "relu"),
    layers.SpatialDropout2D(0.2),
    layers.MaxPooling2D(),
    layers.Conv2D(512, 3, padding = "same", activation = "relu"),
    layers.Conv2D(512, 3, padding = "same", activation = "relu"),
    layers.Conv2D(512, 3, padding = "same", activation = "relu"),
    layers.SpatialDropout2D(0.3),
    layers.MaxPooling2D(),
    layers.Flatten(),
    layers.Dense(4096, activation = "relu"),
    layers.Dropout(0.4),
    layers.Dense(4096, activation = "relu")
]

early_stopping_patience: int = 3
train_epochs: int = 16


# Section 1: Acquisition of Data

## The very foundation of analysis

In [24]:
metadata: mlc.Metadata = mlc.Dataset(jsonld = jsonld_path).metadata

print(f"\n\n\n\x1b[2J\x1b[93;1mDataset at {dataset_url}\x1b[0m\n\n{metadata.name}\nPublished: {metadata.date_published}")



  -  [Metadata(Handwritten math symbols dataset)] Property "http://mlcommons.org/croissant/citeAs" is recommended, but does not exist.





[2J[93;1mDataset at https://www.kaggle.com/datasets/xainano/handwrittenmathsymbols[0m

Handwritten math symbols dataset
Published: 2017-01-15 16:49:28.723000


In [25]:
image_dict: dict[str, list[bytes]] = {}

@contextlib.contextmanager
def open_archive(path: pathlib.Path) -> Generator[zipfile.ZipFile | rarfile.RarFile | tarfile.TarFile, None, None]:
    if not path.is_file():
        raise FileNotFoundError(f"File not found")

    file: zipfile.ZipFile | rarfile.RarFile | tarfile.TarFile | None = None

    match path.suffix:
        case ".zip":
            file =  zipfile.ZipFile(path)
        case ".rar":
            file =  rarfile.RarFile(path)
        case suffix if re.search(suffix, r"\.tar(\.[^ \n]+)?"):
            file = tarfile.open(path)
        case _:
            raise ValueError("File type not supported")
    try:
        yield file
    finally:
        if file is not None:
            file.close()

def archive_type_switch(archive_file: zipfile.ZipFile | rarfile.RarFile | tarfile.TarFile, input: tuple[Any, ...], zip_or_rar_callback: Callable[..., Any], tar_callback: Callable[[Any], Any]) -> Any:
    match type(archive_file):
        case zipfile.ZipFile | rarfile.RarFile:
            return zip_or_rar_callback(*input)
        case tarfile.TarFile:
            return tar_callback(*input)

def dict_get_data_archive(image_dict: dict[str, list[bytes]], path: pathlib.Path) -> None:
    with open_archive(path) as archive_file:
        for entry in archive_type_switch(archive_file, (archive_file, ), lambda x: x.infolist(), lambda x: x.get_members()):
            if archive_type_switch(archive_file, (entry, ), lambda x: x.is_dir(), lambda x: x.isdir()):
                continue

            with archive_file.open(entry, "r") as image_file:
                entry_label: str = entry.filename.split('/')[-2]

                image_dict.setdefault(entry_label, []).append(archive_type_switch(archive_file, (archive_file, ), lambda x: x.read(), lambda x: x.extractfile()))
                print(f"Added {entry.filename} with label {entry_label}")

try:
    assert load_image_dict

    with open(image_dict_pickle_path, "rb") as pickle_file:
        print("Loading from pickle")
        image_dict = pickle.load(pickle_file)
except Exception:
    with open(image_dict_pickle_path, "wb") as pickle_file:
        print("Downloading dataset")
        dataset_path: pathlib.Path = pathlib.Path(kagglehub.dataset_download(dataset_url[dataset_url.rfind("datasets") + len("datasets") + 1:]))
        print("Fetching from dataset")
        dict_get_data_archive(image_dict, dataset_path.joinpath(dataset_data_archive_path))
        pickle.dump(image_dict, pickle_file)

del open_archive, archive_type_switch, dict_get_data_archive
print("Done!")

Loading from pickle
Done!


In [26]:
def show_image_n_label(output: widgets.Output, image_dict: dict[str, list[bytes]], image_display_size: tuple[int, int], resampling: int) -> None:
    with output:
        random_label: str = secrets.choice(list(image_dict.keys()))
        image_bytes: bytes = secrets.choice(image_dict[random_label])

        clear_output(wait = True)
        display(Image.open(io.BytesIO(image_bytes)).resize(image_display_size, resampling))
        print(f"Label: {random_label}")

image_display_size: tuple[int, int] = (200, 200)
output: widgets.Output = widgets.Output()
next_button: widgets.Button = widgets.Button(description = "Next Image")

next_button.on_click(lambda button: show_image_n_label(output, image_dict, image_display_size, image_display_resampling))
next_button.click()
display(output)
display(next_button)

Output()

Button(description='Next Image', style=ButtonStyle())

In [27]:
def get_shard_keys(num_shards: int, shard_index: int, image_dict: dict[str, list[bytes]]) -> list[str]:
    shard_keys: list[str] = []
    num_keys: int = len(image_dict.keys())

    for i in range(shard_index, num_keys, num_shards):
        shard_keys.append(list(image_dict.keys())[i])

    return shard_keys

def sharded_generator_factory(num_shards: int, shard_index: int, image_dict: dict[str, list[bytes]], label_lookup: layers.StringLookup) -> Callable[[], Generator[tuple[bytes, tf.Tensor], None, None]]:
    def shard_generator() -> Generator[tuple[bytes, tf.Tensor], None, None]:
        for key in get_shard_keys(num_shards, shard_index, image_dict):
            for image_bytes in image_dict[key]:
                yield image_bytes, tf.squeeze(label_lookup(key))

    return shard_generator

def preprocess_image_bytes(image_bytes: bytes) -> tf.Tensor:
    image: tf.Tensor = tf.io.decode_image(image_bytes, channels = 3)

    image.set_shape([None, None, 3])
    image = tf.image.resize_with_pad(image, dataset_image_resize[0], dataset_image_resize[1], dataset_image_resampling)
    image = tf.image.convert_image_dtype(image, tf.float32)
    return image

image_dict_key_list: list[str] = list(image_dict.keys())
image_size: tuple[int, int] = Image.open(io.BytesIO(list(image_dict.values())[0][0])).size

output_signature: tuple[tf.TensorSpec] = (
    tf.TensorSpec(shape = (), dtype = tf.string),
    tf.TensorSpec(shape = (len(image_dict_key_list), ), dtype = tf.int32)
)

label_lookup: layers.StringLookup = layers.StringLookup(vocabulary = image_dict_key_list, num_oov_indices = 0, output_mode = "one_hot")

sharded_datsets: list[tf.data.Dataset] = [
    tf.data.Dataset.from_generator(
        sharded_generator_factory(dataset_generator_shards, i, image_dict, label_lookup),
        output_signature = output_signature
    )
    for i in range(dataset_generator_shards)
]

dataset: tf.data.Dataset = tf.data.Dataset.from_tensor_slices(sharded_datsets).interleave(
    lambda x: x,
    cycle_length = dataset_generator_shards,
    num_parallel_calls = tf.data.AUTOTUNE,
    deterministic = False
)

dataset = dataset.map(lambda x, y: (preprocess_image_bytes(x), y), num_parallel_calls = tf.data.AUTOTUNE, deterministic = False)

In [28]:
def get_dataset_length(image_dict: dict[str, list[bytes]]) -> int:
    total_length: int = 0

    for value in image_dict.values():
        total_length += len(value)

    return total_length

dataset_count: int = get_dataset_length(image_dict)

dataset = dataset.shuffle(dataset_count if dataset_shuffle_buffer_size is None else dataset_shuffle_buffer_size)

train_elements: int = int(dataset_count * train_split)
train_dataset: tf.data.Dataset = dataset.take(train_elements)
val_dataset: tf.data.Dataset = dataset.skip(train_elements)

print(f"Counted {dataset_count} total elements\nTraining set: {train_elements}\nValidation set: {dataset_count - train_elements}")

train_dataset = train_dataset.batch(dataset_batch_size).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(dataset_batch_size).prefetch(tf.data.AUTOTUNE)

Counted 375974 total elements
Training set: 300779
Validation set: 75195



# Section 2: Model Building

## What is Image Classification without an Image Classifier?

In [None]:
def init_image_classifier(input_shape: tuple[int, ...], hidden_layers: list[layers.Layer], output_shape: int) -> keras.Sequential:
    model: keras.Sequential = keras.Sequential()

    model.add(layers.Input(input_shape))

    for layer in hidden_layers:
        model.add(layer)

    model.add(layers.Dense(output_shape, activation = "softmax"))
    return model

try:
    assert load_model

    model: keras.Sequential = keras.models.load_model(model_path)
    
    print(f"Loaded model from {str(model_path)}")
except:
    model: keras.Sequential = init_image_classifier((image_size[0], image_size[1], 3), model_init_hidden_layers, len(image_dict_key_list))

    model.compile(optimizer = "adam", loss = "categorical_crossentropy", metrics = ["accuracy"])
    print(f"Compiled new model")
finally:
    early_stopping_callback: keras.callbacks.EarlyStopping = keras.callbacks.EarlyStopping(
        monitor = "val_loss",
        patience = early_stopping_patience,
        restore_best_weights = True,
        mode = "min"
    )

    checkpoint_callback: keras.callbacks.ModelCheckpoint = keras.callbacks.ModelCheckpoint(
        filepath = model_path,
        monitor = "val_loss",
        save_best_only = True,
        mode = "min"
    )

    model.fit(train_dataset, validation_data = val_dataset,epochs = train_epochs, callbacks = [early_stopping_callback, checkpoint_callback], verbose = 1)

Compiled new model
Epoch 1/16


2025-06-23 17:11:47.617269: I tensorflow/core/kernels/data/shuffle_dataset_op.cc:452] ShuffleDatasetV3:61: Filling up shuffle buffer (this may take a while): 2996 of 10000
2025-06-23 17:12:07.612313: I tensorflow/core/kernels/data/shuffle_dataset_op.cc:452] ShuffleDatasetV3:61: Filling up shuffle buffer (this may take a while): 9259 of 10000
2025-06-23 17:12:10.000888: I tensorflow/core/kernels/data/shuffle_dataset_op.cc:482] Shuffle buffer filled.


     78/Unknown [1m57s[0m 162ms/step - accuracy: 0.0000e+00 - loss: nan

KeyboardInterrupt: 