# Train Notebook is based on Keras 3.0 with TF


- tensorflow-2.15.0
- keras_cv-0.8.2
- keras-3.0.4


In [None]:
# Install the pinned wheels from local files under /kaggle/input;
# `--no-deps` tells pip not to install each package's dependencies.
!pip install -q /kaggle/input/kerasv3-lib-ds/keras_cv-0.8.2-py3-none-any.whl --no-deps

!pip install -q /kaggle/input/kerasv3-lib-ds/tensorflow-2.15.0.post1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl --no-deps

!pip install -q /kaggle/input/kerasv3-lib-ds/keras-3.0.4-py3-none-any.whl --no-deps

In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"  # you can also use jax, tensorflow or torch


import keras_cv
import keras
#from tensorflow import keras
from keras import ops
import tensorflow as tf

import cv2
import pandas as pd
import numpy as np
from glob import glob

import joblib
import shutil
import json

import matplotlib.pyplot as plt 


import sys
import platform


import matplotlib.pyplot as plt
import scipy
import joblib
from scipy.signal import spectrogram



In [None]:
# Report the host operating system, CPU architecture and Python version.
for caption, value in (
    ("ОС: ", platform.system()),
    ("Архитектура: ", platform.machine()),
    ("Версия Python: ", sys.version_info),
):
    print(caption, value)

In [None]:
# Print the versions of the core libraries used by the notebook.
print(tf.__version__)
try:
    print(keras.__version__)
except AttributeError:
    # Only a missing `__version__` attribute is expected here; a bare `except`
    # would also swallow KeyboardInterrupt and unrelated errors.
    # NOTE(review): presumably the fallback for a `keras` module that is the
    # tf.keras alias (see the commented-out import) - confirm.
    print('Tf.Keras')
print(keras_cv.__version__)

In [None]:
# Detect and initialise the TPU.
# `tpu` stays None when the resolver raises ValueError, and the rest of the
# notebook uses its truthiness to choose between the TPU and GPU/CPU paths.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
    print('TPU active')
except ValueError:
    tpu = None
    print('TPU non active')

# Create the distribution strategy
if tpu:
    # Connect to the TPU cluster and initialise it before building the strategy
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
else:
    gpus = tf.config.list_physical_devices('GPU')
    print(gpus)
    if gpus:
        try:
            # Enable memory growth on every visible GPU
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            # Best effort: the error is only printed and setup continues
            print(e)
            
    # MirroredStrategy only when at least two GPUs are visible
    if len(gpus) >= 2:
        strategy = tf.distribute.MirroredStrategy()
    else:
        strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU

print("Number of replicas:", strategy.num_replicas_in_sync)
print(tf.config.list_physical_devices())

# Config

In [None]:
class Config:
    """Notebook-wide settings: hardware flags, hyper-parameters, labels and paths."""

    # Accelerator label; caching is switched on only when a TPU was detected
    DEV = "TPU" if tpu else "GPU"
    CACHE = bool(tpu)
    CACHE_DIR = ""

    VERSION = 1
    DF_VERSION = 1
    MIX = 1  # use mixed precision
    SEED = 42
    IMAGE_SIZE = (128, 128)
    BATCH_SIZE = 256 * strategy.num_replicas_in_sync  # global batch across all replicas
    EPOCHS = 20
    EARLY_STOPPING = 4  # epochs without improvement after which training is stopped
    LR_MODE = 'cos'  # learning-rate schedule mode; alternatives: 'exp', 'step'

    # Font classes; the list position of a name is its integer label
    CLASS_NAMES = [
        'BrassMono-BoldItalic',
        'GhastlyPanicCyr',
        'ambidexter_regular',
        'GaneshaType-Regular',
        'AlumniSansCollegiateOne-Italic',
        'AlumniSansCollegiateOne-Regular',
        'BrassMono-Italic',
        'better-vcr-5.2',
        'ArefRuqaaInk-Bold',
        'Aguante-Regular',
        'BrassMono-Bold',
        'ArefRuqaaInk-Regular',
        'Realest-Extended',
        'BrassMono-Regular',
        'TanaUncialSP',
    ]
    NUM_CLASSES = len(CLASS_NAMES)
    LABEL2NAME = dict(enumerate(CLASS_NAMES))
    NAME2LABEL = {name: idx for idx, name in enumerate(CLASS_NAMES)}
    AUTOTUNE = tf.data.AUTOTUNE

    # Name of the pretrained KerasCV preset. Alternatives noted by the author:
    #   'efficientnetv2_b2_imagenet', 'efficientnetv2_s_imagenet',
    #   'efficientnetv2_b0_imagenet_classifier',
    #   "yolo_v8_l_backbone_coco", "yolo_v8_m_backbone_coco",
    #   "densenet121_imagenet",
    #   "csp_darknet_l", "csp_darknet_l_imagenet",
    #   "vitdet_large_sa1b"
    PRESET = 'efficientnetv2_b2_imagenet'

    fold = 2  # which fold to set as validation data
    VERBOSE = 1  # verbosity
    LR_START = 5e-5
    LR_MAX = 1e-4
    LR_MIN = 1e-6
    DATASET_DIR = '/kaggle/input/kvant-fonts-ds'

# Instantiate the config and pass its seed to keras.utils.set_random_seed
CFG = Config()
keras.utils.set_random_seed(seed=CFG.SEED)


In [None]:
# Optionally switch on TensorFlow's `auto_mixed_precision` optimizer option.
if not CFG.MIX:
    print('Using full precision')
else:
    tf.config.optimizer.set_experimental_options({"auto_mixed_precision": True})
    print('Mixed precision enabled')

# Read dataset

In [None]:
# Dataset root; the tree is walked as <root>/<font_name>/<image_type>/<file>
root_dir = CFG.DATASET_DIR
data = []

# os.listdir returns entries in arbitrary order, so every listing is sorted to
# make the row order (and the seeded shuffles / fold split that depend on it)
# reproducible between runs.
for font_name in sorted(os.listdir(root_dir)):
    font_dir = os.path.join(root_dir, font_name)
    if not os.path.isdir(font_dir):
        continue  # skip stray files next to the font folders

    # Every sub-folder of a font folder is one image type
    for image_type in sorted(os.listdir(font_dir)):
        image_dir = os.path.join(font_dir, image_type)
        if not os.path.isdir(image_dir):
            continue  # skip stray files next to the image-type folders

        # Every file inside the image-type folder becomes one row
        for file in sorted(os.listdir(image_dir)):
            file_path = os.path.join(image_dir, file)
            data.append([font_name, image_type, file_path])

# Build the DataFrame: one row per image file
df = pd.DataFrame(data, columns=['font_name', 'image_type', 'path'])

In [None]:
df.head()

In [None]:
df['font_name'].unique()

In [None]:
CFG.NAME2LABEL, CFG.LABEL2NAME, CFG.NUM_CLASSES

In [None]:
# Map each font folder name to its integer class id; folder names that are
# missing from CFG.NAME2LABEL become NaN (pandas `Series.map` behaviour).
df['class_label'] = df.font_name.map(CFG.NAME2LABEL)


In [None]:
class InversionLayer(tf.keras.layers.Layer):
    """Augmentation layer that inverts images (``1.0 - image``) when training.

    The layer works on the dict format used by the augmentation pipeline in
    this file: the value stored under ``"images"`` is replaced by its inverse
    and every other entry is passed through unchanged. Outside of training the
    input is returned as is.

    Assumes pixel values are scaled to [0, 1] (the decoder in this file divides
    by 255) - TODO confirm for any other caller.
    """

    def call(self, data, training=None):
        if not training:
            return data
        # Return a shallow copy so the dict handed in by the caller is not
        # modified in place.
        inverted = dict(data)
        inverted["images"] = 1.0 - data["images"]
        return inverted
    

def build_augmenter(dim=CFG.IMAGE_SIZE):
    """Create the augmentation function used by the input pipeline.

    The returned ``augment(img, label)`` packs its arguments into a dict with
    the keys ``"images"`` and ``"labels"``, runs it through the layers listed
    below and returns the ``(images, labels)`` pair. Each layer is applied only
    when a fresh ``tf.random.uniform([])`` draw is below 0.2.

    NOTE(review): ``dim`` is not referenced anywhere in this function.
    """
    augmenters = [
        # RandAugment only works with 3-channel input
        # NOTE(review): `input_shape` below declares 1 channel, which does not
        # match the note above or the 3-channel decoder output - confirm.
        keras_cv.layers.RandAugment(value_range=(0, 1), augmentations_per_image=1, magnitude=0.1,input_shape=(None, 128, 128, 1)),
        keras_cv.layers.RandomFlip(mode="horizontal_and_vertical"),
        #keras_cv.layers.MixUp(alpha=2.0),
        keras_cv.layers.RandomCutout(height_factor=(0.2, 0.5), width_factor=(0.3, 0.6)), 
        keras_cv.layers.RandomShear(
                                        x_factor=0.2,
                                        y_factor=0.4,
                                        interpolation="bilinear",
                                        fill_mode="reflect",
                                        fill_value=0.3,
                                        bounding_box_format=None,
                                        seed=CFG.SEED
                                    ),
        InversionLayer() # add the inversion layer
    ]
    
    def augment(img, label):
        # The layers above exchange data as a dict with these two keys
        data = {"images":img, "labels":label}
        for augmenter in augmenters:
            # Independent draw per layer: the layer runs only if it is < 0.2
            if tf.random.uniform([]) < 0.2:
                data = augmenter(data, training=True)
        return data["images"], data["labels"]
    
    return augment


def build_decoder(with_labels=True,
                  target_size=CFG.IMAGE_SIZE,
                  dtype=32,
                  ):
    """Create the function that turns a file path (and label) into tensors.

    Args:
        with_labels: when true the returned function takes ``(path, label)``
            and returns ``(image, one_hot_label)``; otherwise it takes a path
            and returns the image only.
        target_size: ``(height, width)`` the decoded image is resized to.
        dtype: kept for backward compatibility; not used by the decoder.

    Returns:
        ``decode_with_labels`` or ``decode_img``, depending on ``with_labels``.
    """

    def decode_img(path):
        file_bytes = tf.io.read_file(path)
        image = tf.io.decode_png(file_bytes, channels=1, dtype=tf.uint8)
        # Use the caller-supplied size; CFG.IMAGE_SIZE used to be hard-coded
        # here, which silently ignored the `target_size` argument.
        image = tf.image.resize(image, target_size, method="bilinear")
        image = tf.cast(image, tf.float32) / 255.0

        # Mono channel to 3 channels to use "ImageNet" weights
        image = tf.tile(image, [1, 1, 3])

        return image

    def decode_label(label):
        # Integer class id -> float32 one-hot vector of length NUM_CLASSES
        label = tf.one_hot(label, CFG.NUM_CLASSES)
        label = tf.cast(label, tf.float32)
        label = tf.reshape(label, [CFG.NUM_CLASSES])
        return label

    def decode_with_labels(path, label=None):
        img = decode_img(path)
        label = decode_label(label)
        return (img, label)

    return decode_with_labels if with_labels else decode_img


def build_dataset(
                  paths,
                  labels=None,
                  batch_size=32,
                  cache=True,
                  decode_fn=None,
                  augment_fn=None,
                  augment=False,
                  repeat=True,
                  shuffle=1024,
                  cache_dir="",
                  drop_remainder=False):
    """Build the ``tf.data`` input pipeline for a list of image paths.

    Pipeline order: decode -> cache -> shuffle -> repeat -> batch -> augment
    -> prefetch.

    Args:
        paths: image file paths.
        labels: integer class ids aligned with ``paths``; ``None`` builds an
            unlabeled dataset.
        batch_size: number of samples per batch.
        cache: cache the decoded samples (in memory when ``cache_dir`` is "").
        decode_fn: sample decoder; defaults to ``build_decoder(...)``.
        augment_fn: batch-level augmentation; defaults to ``build_augmenter()``
            and is only built when ``augment`` is true.
        augment: apply ``augment_fn`` to every batch.
        repeat: repeat the dataset indefinitely.
        shuffle: shuffle buffer size; a falsy value disables shuffling and
            ``True`` selects the default buffer of 1024.
        cache_dir: directory for the on-disk cache ("" keeps it in memory).
        drop_remainder: drop the last incomplete batch.

    Returns:
        The configured ``tf.data.Dataset``.
    """
    if cache and cache_dir != "":
        os.makedirs(cache_dir, exist_ok=True)

    if decode_fn is None:
        decode_fn = build_decoder(labels is not None)

    # Build the augmentation layers only when they will actually be used.
    if augment and augment_fn is None:
        augment_fn = build_augmenter()

    # `shuffle=True` used to be passed straight to `Dataset.shuffle`, i.e. a
    # buffer of size 1, which does not shuffle at all.
    if shuffle is True:
        shuffle = 1024

    AUTO = tf.data.AUTOTUNE
    slices = paths if labels is None else (paths, labels)

    ds = tf.data.Dataset.from_tensor_slices(slices)
    ds = ds.map(decode_fn, num_parallel_calls=AUTO)
    if cache:
        ds = ds.cache(cache_dir)
    if shuffle:
        # Shuffle before repeat so that every epoch sees each sample once;
        # the reverse order mixes samples across epoch boundaries.
        ds = ds.shuffle(shuffle, seed=CFG.SEED)
        opt = tf.data.Options()
        opt.deterministic = False  # allow out-of-order elements for throughput
        ds = ds.with_options(opt)
    if repeat:
        ds = ds.repeat()
    ds = ds.batch(batch_size, drop_remainder=drop_remainder)
    if augment:
        ds = ds.map(augment_fn, num_parallel_calls=AUTO)
    ds = ds.prefetch(AUTO)
    return ds

# 🔪 | Data Split

In the following code snippet, the data is divided into `5` folds with `StratifiedKFold`. Each split is stratified based on the `class_label`, ensuring a uniform distribution of class labels in each fold. Note that no `groups` argument is used here, so the split does not guard against related samples (for example, the same text rendered as different `image_type` variants) ending up in both the training and validation sets.

In [None]:
from sklearn.model_selection import StratifiedKFold

# Start with every row unassigned (-1) on a clean 0..n-1 index, then give each
# row the number of the stratified fold in which it is validation data.
df["fold"] = -1
df.reset_index(drop=True, inplace=True)

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=CFG.SEED)
fold_splits = skf.split(df, y=df["class_label"])
for fold, (train_idx, valid_idx) in enumerate(fold_splits):
    df.loc[valid_idx, "fold"] = fold

# Per-fold sample counts for every font
df.groupby(["fold", "font_name"])[["path"]].count()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# One class-count plot per fold, stacked vertically
fig, axs = plt.subplots(5, 1, figsize=(10, 20))

for fold, ax in enumerate(axs):
    # Rows that belong to the current fold
    subset = df[df["fold"] == fold]
    sns.countplot(data=subset, x="class_label", ax=ax)
    ax.set_title(f"Fold {fold}")

# Show the figure
plt.tight_layout()
plt.show()

In [None]:
df

In [None]:
# Shuffle the rows once (explicitly seeded so re-running the cell gives the
# same order), then split by the configured validation fold.
sample_df = df.sample(frac=1, random_state=CFG.SEED).reset_index(drop=True)
train_df = sample_df[sample_df.fold != CFG.fold]
valid_df = sample_df[sample_df.fold == CFG.fold]
print(f"# Num Train: {len(train_df)} | Num Valid: {len(valid_df)}")

# Train
train_paths = train_df.path.values
train_labels = train_df.class_label.values

train_ds = build_dataset(paths=train_paths,
                         labels=train_labels,
                         batch_size=CFG.BATCH_SIZE,
                         repeat=True,
                         # `shuffle` is the shuffle-buffer size; passing True
                         # would mean a buffer of 1, i.e. no shuffling.
                         shuffle=1024,
                         augment=False,
                         cache=False,
                         #cache_dir=CFG.CACHE_DIR
                        )

# Valid
valid_paths = valid_df.path.values
valid_labels = valid_df.class_label.values

valid_ds = build_dataset(paths=valid_paths,
                         labels=valid_labels,
                         batch_size=CFG.BATCH_SIZE,
                         repeat=False,
                         shuffle=False,
                         augment=False,
                         cache=False,
                         #cache_dir=CFG.CACHE_DIR
                        )

In [None]:

# Pull one batch and report its shape and value range
imgs, tars = next(iter(train_ds))
print(imgs.shape)
SIZE = imgs.shape[1:]
min_value = np.min(imgs)
max_value = np.max(imgs)
print("Минимальное значение в матрице: ", min_value)
print("Maximal значение в матрице: ", max_value)

# Preview only a handful of samples: drawing the whole batch (CFG.BATCH_SIZE,
# i.e. 256 or more images) builds an enormous figure and is very slow.
num_imgs = min(16, int(imgs.shape[0]))
n_cols = 4
n_rows = -(-num_imgs // n_cols)  # ceiling division, so a partial last row still fits

plt.figure(figsize=(4 * n_cols, 5 * n_rows))
for i in range(num_imgs):
    plt.subplot(n_rows, n_cols, i + 1)
    img = imgs[i].numpy()[..., 0]  # first channel; the decoder tiles one gray channel
    tar = CFG.LABEL2NAME[np.argmax(tars[i].numpy())]
    plt.imshow(img, cmap='gray')
    plt.title(f"Target: {tar}")
    plt.axis('off')

plt.tight_layout()
plt.show()

## 🔍 | Loss & Metric

The evaluation metric in this competition is KL Divergence, defined as,

$$
D_{\text{KL}}(P \parallel Q) = \sum_{i} P(i) \log\left(\frac{P(i)}{Q(i)}\right)
$$

Where:

- $P$ is the true distribution.
- $Q$ is the predicted distribution.

Interestingly, as KL Divergence is differentiable, we can directly use it as our loss function. Thus, we don't need to use a third-party metric like Accuracy to evaluate our model. Therefore, `val_loss` can stand alone as an indicator for our evaluation. In Keras, we already have an implementation of the KL Divergence loss, so we only need to import it.

In [None]:
# KL-divergence loss object.
# NOTE(review): the model-building cells shown in this notebook compile with
# loss='categorical_crossentropy'; LOSS is not referenced in the visible code.
LOSS = tf.keras.losses.KLDivergence()

## 🤖 | Modeling

This notebook uses the EfficientNetV2 B2 from KerasCV's collection of pretrained models. To explore other models, simply modify the preset in the CFG (config). Check the KerasCV website for a list of available pretrained models.

In [None]:
def build_model():
    """Build and compile a plain convolutional classifier.

    The network is five Conv2D(3x3, relu) + BatchNormalization stages (the
    first two followed by 2x2 max-pooling), global average pooling, two
    Dense(4096, relu) + Dropout(0.5) stages and a softmax output with
    ``CFG.NUM_CLASSES`` units. It is created and compiled inside
    ``strategy.scope()``.

    Returns:
        The compiled ``keras.Model``.
    """
    # (number of filters, whether a 2x2 max-pooling follows the stage)
    conv_stages = ((64, True), (128, True), (256, False), (256, False), (256, False))

    with strategy.scope():
        # The decoder in this file tiles the gray image to 3 channels, so the
        # model input must be 3-channel as well (it used to be declared with 1).
        inputs = keras.Input(shape=(*CFG.IMAGE_SIZE, 3))

        x = inputs
        for filters, use_pool in conv_stages:
            x = keras.layers.Conv2D(filters, (3, 3), activation='relu')(x)
            x = keras.layers.BatchNormalization()(x)
            if use_pool:
                x = keras.layers.MaxPooling2D(pool_size=(2, 2))(x)

        # Fully connected head
        x = keras.layers.GlobalAveragePooling2D()(x)
        for _ in range(2):
            x = keras.layers.Dense(4096, activation='relu')(x)
            x = keras.layers.Dropout(0.5)(x)
        outputs = keras.layers.Dense(CFG.NUM_CLASSES, activation='softmax', name='outputs')(x)

        # Create and compile the model
        model = keras.Model(inputs=inputs, outputs=outputs)
        model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    return model

model = build_model()
# Model summary: `summary()` prints the table itself and returns None, so it
# is not wrapped in print().
model.summary()

In [None]:
def build_model():
    """Build and compile the KerasCV image classifier selected by ``CFG.PRESET``.

    The classifier is created from the preset with ``CFG.NUM_CLASSES`` outputs
    and compiled inside ``strategy.scope()`` with Adam (learning rate 1e-4),
    categorical cross-entropy loss and the accuracy metric.

    Returns:
        The compiled model.
    """
    with strategy.scope():
        # Options the author left disabled: pooling="avg", activation="softmax",
        # and a YOLOV8 backbone preset ("yolo_v8_xl_backbone_coco").
        model = keras_cv.models.ImageClassifier.from_preset(
            CFG.PRESET,
            num_classes=CFG.NUM_CLASSES,
        )

        # Compile the model
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=1e-4),
            loss='categorical_crossentropy',
            metrics=['accuracy'],
        )
    return model


model = build_model()
# Model summary: `summary()` prints the table itself and returns None, so it
# is not wrapped in print().
model.summary()

## Early stopping round

In [None]:
# Early-stopping callback watching `val_loss`, with the patience taken from
# the config and best-weight restoration enabled.
_early_stopping_options = {
    "monitor": 'val_loss',
    "patience": CFG.EARLY_STOPPING,
    "restore_best_weights": True,
}
early_stopping = keras.callbacks.EarlyStopping(**_early_stopping_options)

## ⚓ | LR Schedule
A well-structured learning rate schedule is essential for efficient model training, ensuring optimal convergence and avoiding issues such as overshooting or stagnation.

- lr_ramp_ep: Это количество эпох, в течение которых скорость обучения увеличивается от lr_start до lr_max. Это помогает модели медленно “разогреваться” перед тем, как перейти к более высокой скорости обучения.
- lr_sus_ep: Это количество эпох, в течение которых скорость обучения остается на уровне lr_max. Это позволяет модели “устояться” на определенной скорости обучения перед тем, как начать затухание.
- lr_decay: Это коэффициент затухания, который определяет, насколько быстро скорость обучения уменьшается после “поддержания”. Значение меньше 1 означает, что скорость обучения будет уменьшаться с каждой эпохой.
- lr_start: Это начальная скорость обучения. Это значение, с которого начинается “разогрев” скорости обучения.
- lr_max: Это максимальная скорость обучения, которую модель может достичь в процессе “разогрева”. После достижения этого значения, скорость обучения либо остается на этом уровне в течение определенного количества эпох (lr_sus_ep), либо начинает затухать.
- lr_min: Это минимальная скорость обучения, которую модель может достичь в процессе затухания. Это значение, к которому стремится скорость обучения после “разогрева” и “поддержания”.

In [None]:
import math


def get_lr_callback(batch_size=CFG.BATCH_SIZE,
                    mode='cos',
                    epochs=CFG.EPOCHS,
                    plot=False,
                    lr_start=CFG.LR_START,
                    lr_max=CFG.LR_MAX,
                    lr_min=CFG.LR_MIN):
    """Create a per-epoch learning-rate scheduler callback.

    The schedule ramps linearly from ``lr_start`` to ``lr_max`` over
    ``epochs // 5`` epochs, holds ``lr_max`` for one epoch and then decays
    according to ``mode``.

    Args:
        batch_size: kept for backward compatibility; not used by the schedule.
        mode: decay shape after the hold phase: ``'exp'``, ``'step'`` or ``'cos'``.
        epochs: total number of training epochs.
        plot: when true, plot the schedule for ``epochs`` epochs.
        lr_start: learning rate at epoch 0.
        lr_max: peak learning rate reached after the ramp.
        lr_min: floor used by the ``'exp'`` and ``'cos'`` decays.

    Returns:
        A ``keras.callbacks.LearningRateScheduler``.

    Raises:
        ValueError: if ``mode`` is not one of the supported values.
    """
    # Fail at construction time; an unknown mode used to surface only as an
    # UnboundLocalError once training had reached the decay phase.
    if mode not in ('exp', 'step', 'cos'):
        raise ValueError(f"Unknown LR mode {mode!r}; expected 'exp', 'step' or 'cos'")

    lr_ramp_ep, lr_sus_ep, lr_decay = epochs // 5, 1, 0.1

    def lrfn(epoch):  # Learning rate update function
        if epoch < lr_ramp_ep:
            return (lr_max - lr_start) / lr_ramp_ep * epoch + lr_start
        if epoch < lr_ramp_ep + lr_sus_ep:
            return lr_max

        decay_epoch_index = epoch - lr_ramp_ep - lr_sus_ep
        if mode == 'exp':
            return (lr_max - lr_min) * lr_decay**decay_epoch_index + lr_min
        if mode == 'step':
            return lr_max * lr_decay**(decay_epoch_index // 2)

        # mode == 'cos': the extra 3 epochs stretch the half-cosine, so the
        # phase stays below pi and the final epoch stops short of lr_min.
        decay_total_epochs = epochs - lr_ramp_ep - lr_sus_ep + 3
        phase = math.pi * decay_epoch_index / decay_total_epochs
        return (lr_max - lr_min) * 0.5 * (1 + math.cos(phase)) + lr_min

    if plot:  # Plot lr curve if plot is True
        epoch_axis = np.arange(epochs)
        plt.figure(figsize=(10, 5))
        plt.plot(epoch_axis, [lrfn(epoch) for epoch in epoch_axis], marker='o')
        plt.xlabel('epoch')
        plt.ylabel('lr')
        plt.title('LR Scheduler')
        plt.show()

    return keras.callbacks.LearningRateScheduler(lrfn, verbose=False)  # Create lr callback

In [None]:
# Build the LR-scheduler callback for the configured mode and plot the schedule
lr_cb = get_lr_callback(CFG.BATCH_SIZE, mode=CFG.LR_MODE, plot=True)

## 💾 | Model Checkpointing & CSV logger

In [None]:
# Checkpoint callback writing the full model (not only the weights) to
# best_model.keras, with `save_best_only` keyed on the minimum `val_loss`.
# The path is a plain string: the former f-string had no placeholders.
ckpt_cb = keras.callbacks.ModelCheckpoint("/kaggle/working/best_model.keras",
                                         monitor='val_loss',
                                         save_best_only=True,
                                         save_weights_only=False,
                                         mode='min')

In [None]:
# CSV logger callback writing to training_log.csv; append=False, so an
# existing log is not appended to.
csv_logger = keras.callbacks.CSVLogger('training_log.csv', separator=',', append=False)

In [None]:
# The training dataset repeats forever, so the epoch length is set explicitly.
# Keep it at one step or more: the integer division gives 0 when the training
# set is smaller than a single batch.
steps_per_epoch = max(1, len(train_df) // CFG.BATCH_SIZE)

with strategy.scope():
    history = model.fit(
        train_ds,
        epochs=CFG.EPOCHS,
        callbacks=[
            lr_cb,
            ckpt_cb,
            csv_logger,
            early_stopping
        ],
        steps_per_epoch=steps_per_epoch,
        validation_data=valid_ds,
        verbose=CFG.VERBOSE
    )

# |Results of train

In [None]:
# Training vs. validation loss per epoch
for history_key, curve_label in (("loss", "loss"), ("val_loss", "val loss")):
    plt.plot(history.history[history_key], label=curve_label)
plt.legend()
plt.show()

In [None]:
# Pick the epoch with the lowest validation loss and report its metrics
val_losses = history.history['val_loss']
val_accuracies = history.history['val_accuracy']

best_epoch = np.argmin(val_losses)
best_loss = val_losses[best_epoch]
best_acc = val_accuracies[best_epoch]

print(f'>>>> BEST Loss  : {best_loss:.3f}\n>>>> BEST Acc   : {best_acc:.3f}\n>>>> BEST Epoch : {best_epoch}\n')