In [None]:
import tensorflow as tf

import os
import numpy as np


import os
import numpy as np
import cv2
import numpy as np
import matplotlib.pyplot as plt
from skimage.morphology import skeletonize
from skimage.filters import threshold_otsu
from skimage.measure import regionprops, label
from scipy.ndimage import distance_transform_edt, center_of_mass
from PIL import Image
from collections import defaultdict
from skimage import morphology


In [None]:
# Identifier used as the prefix for the dataset directory name and for the
# output file names (confusion matrix, weights, plots, saved model) built below.
user = '7MK'

In [None]:

# Directory holding this user's images, one sub-directory per class.
base_dir = f"{user}_inverted"
# File name under which the confusion-matrix figure is saved.
confusion_matrix_name = f"{user}_confusion_matrix.png"


In [None]:
import os
import shutil
import random

# Split directories are derived from the source directory name.
train_dir = base_dir + '_train'
test_dir = base_dir + '_test'
validation_dir = base_dir + '_validation'

for split_dir in (train_dir, test_dir, validation_dir):
    os.makedirs(split_dir, exist_ok=True)

# Number of s2/s3 files per category that are held out for VALIDATION
# (the variable name is historical and kept for compatibility).
num_test_files = 4

# A dedicated, seeded RNG together with sorted directory listings makes the
# split reproducible: re-running this cell copies the same files to the same
# places instead of adding a new random validation sample next to the old one
# (which would leave validation images in the training directory as well).
# NOTE: split directories produced by earlier, unseeded runs should be deleted
# once by hand so that no stale copies remain.
split_rng = random.Random(42)

for category in sorted(os.listdir(base_dir)):
    category_path = os.path.join(base_dir, category)
    if not os.path.isdir(category_path):
        continue

    train_cat_dir = os.path.join(train_dir, category)
    test_cat_dir = os.path.join(test_dir, category)
    validation_cat_dir = os.path.join(validation_dir, category)

    for cat_dir in (train_cat_dir, test_cat_dir, validation_cat_dir):
        os.makedirs(cat_dir, exist_ok=True)

    # Files whose name contains '_s1_' form the test set; '_s2_' / '_s3_'
    # files are divided between training and validation. Anything else is
    # ignored.
    s1_files = []
    other_files = []
    for fname in sorted(os.listdir(category_path)):
        if not os.path.isfile(os.path.join(category_path, fname)):
            continue
        if '_s1_' in fname:
            s1_files.append(fname)
        elif '_s2_' in fname or '_s3_' in fname:
            other_files.append(fname)

    for fname in s1_files:
        shutil.copy2(os.path.join(category_path, fname), os.path.join(test_cat_dir, fname))

    # random.sample raises ValueError when asked for more items than exist,
    # so never request more validation files than the category provides.
    num_validation = min(num_test_files, len(other_files))
    if num_validation < num_test_files:
        print(f"  WARNING: '{category}' has only {len(other_files)} s2/s3 files; "
              f"using {num_validation} for validation and none are left for training")

    validation_files = split_rng.sample(other_files, num_validation)
    held_out = set(validation_files)
    train_files = [f for f in other_files if f not in held_out]

    for fname in validation_files:
        shutil.copy2(os.path.join(category_path, fname), os.path.join(validation_cat_dir, fname))

    for fname in train_files:
        shutil.copy2(os.path.join(category_path, fname), os.path.join(train_cat_dir, fname))

    print(f"  Copied TEST {len(s1_files)} files to '{test_cat_dir}'")
    print(f"  Copied TRAIN {len(train_files)} files to '{train_cat_dir}'")
    print(f"  Copied VAL {len(validation_files)} files to '{validation_cat_dir}'")


print("Done splitting files into train, validation, and test sets.")


In [None]:
def preprocess_and_skeletonize_alternative(image):
    """Turn an RGB image into a skeletonized, MobileNetV2-preprocessed array.

    The image is converted to grayscale, pixels brighter than 127 are treated
    as foreground, the foreground is thinned with ``morphology.skeletonize``,
    and the resulting 0/255 mask is expanded back to three channels before
    being passed through ``mobilenet_v2.preprocess_input``.

    Args:
        image: array of shape (224, 224, 3).

    Returns:
        The array produced by ``mobilenet_v2.preprocess_input`` for the
        three-channel float32 skeleton image.

    Raises:
        ValueError: if ``image`` does not have shape (224, 224, 3).
    """
    required_shape = (224, 224, 3)
    if image.shape != required_shape:
        raise ValueError("Image must be of shape (224, 224, 3)")

    # Grayscale -> boolean foreground mask (fixed threshold of 127).
    foreground = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) > 127

    # Thin the strokes, then rescale the boolean mask to 0/255 bytes.
    strokes = morphology.skeletonize(foreground)
    strokes_u8 = (strokes * 255).astype(np.uint8)

    # The network expects three channels, so replicate the mask across RGB.
    strokes_rgb = cv2.cvtColor(strokes_u8, cv2.COLOR_GRAY2RGB).astype(np.float32)

    return tf.keras.applications.mobilenet_v2.preprocess_input(strokes_rgb)

In [None]:
# Side length in pixels of the square network input, and images per batch.
IMAGE_SIZE, BATCH_SIZE = 224, 8

# Every image produced by this generator is passed through the
# skeletonizing preprocessing function.
datagen = tf.keras.preprocessing.image.ImageDataGenerator(preprocessing_function=preprocess_and_skeletonize_alternative)

In [None]:
# Options shared by the three directory iterators.
_common_flow_kwargs = {
    "target_size": (IMAGE_SIZE, IMAGE_SIZE),
    "batch_size": BATCH_SIZE,
}

# Only the training iterator shuffles; validation and test keep a fixed order.
train_generator = datagen.flow_from_directory(train_dir, shuffle=True, **_common_flow_kwargs)

val_generator = datagen.flow_from_directory(validation_dir, shuffle=False, **_common_flow_kwargs)

test_generator = datagen.flow_from_directory(test_dir, shuffle=False, **_common_flow_kwargs)

In [None]:
# Quick sanity check of what the training iterator discovered on disk.
print("Found classes:", train_generator.class_indices)
print("Samples in training set:", train_generator.samples)
# len(iterator) is the number of batches actually yielded per epoch, which
# includes the final partial batch; `samples // batch_size` under-reports by
# one whenever the sample count is not a multiple of the batch size.
print("Batches per epoch:", len(train_generator))
print("Directory used:", train_generator.directory)

In [None]:
# (height, width, channels) of the network input.
IMG_SHAPE = (IMAGE_SIZE, IMAGE_SIZE, 3)

# MobileNetV2 backbone without its classification top; weights=None creates
# it with freshly initialised weights.
base_model = tf.keras.applications.MobileNetV2(
    weights=None,
    include_top=False,
    input_shape=IMG_SHAPE,
)


In [None]:
# Freeze the backbone: its weights are excluded from training updates.
base_model.trainable = False

## Add a classification head

In [None]:
# Layers stacked on top of the backbone: a small convolution, dropout,
# spatial averaging, and a 62-way softmax output.
classification_head = [
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(62, activation='softmax'),
]

model = tf.keras.Sequential([base_model, *classification_head])




In [None]:
# Initialise the whole model (backbone and head) from the weights stored in
# 'baseline.weights.h5'; the file must match this architecture.
model.load_weights('baseline.weights.h5')

## Compile the model

In [None]:
from sklearn.metrics import f1_score
import numpy as np
import tensorflow as tf

class F1ScoreCallback(tf.keras.callbacks.Callback):
    """Report the macro-averaged F1 score on a validation generator each epoch.

    After every epoch the callback predicts each batch of ``val_generator``,
    compares the arg-max of the predictions with the arg-max of the one-hot
    labels, prints the macro F1 score and stores it in ``logs`` under the key
    ``'val_f1_score'``.

    Args:
        val_generator: indexable batch generator; ``val_generator[i]`` must
            return an ``(inputs, one_hot_labels)`` pair and ``len()`` must
            give the number of batches.
    """

    def __init__(self, val_generator):
        super().__init__()
        self.val_generator = val_generator

    def on_epoch_end(self, epoch, logs=None):
        """Compute, print and log the macro F1 score for the finished epoch."""
        y_true = []
        y_pred = []

        for batch_index in range(len(self.val_generator)):
            x_val, y_val = self.val_generator[batch_index]

            # verbose=0 keeps Keras from printing one progress bar per
            # validation batch after every epoch.
            preds = self.model.predict(x_val, verbose=0)
            y_true.extend(np.argmax(y_val, axis=1))
            y_pred.extend(np.argmax(preds, axis=1))

        if not y_true:
            # An empty generator gives nothing to score; leave logs untouched.
            return

        # zero_division=0 yields the same value as the default for classes
        # that are never predicted, without emitting a warning every epoch.
        # float() stores a plain Python number in the logs.
        f1 = float(f1_score(y_true, y_pred, average='macro', zero_division=0))
        print(f" — val_f1_score: {f1:.4f}")

        if logs is not None:
            logs['val_f1_score'] = f1


In [None]:

# Categorical cross-entropy matches the one-hot labels and softmax output;
# accuracy is the only metric tracked by Keras itself.
model.compile(
    loss='categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-4),
)

In [None]:
# Print the layer-by-layer summary of the assembled model.
model.summary()

In [None]:
# List what will actually be updated during training.
variables_names = [variable.name for variable in model.trainable_variables]
print('Number of trainable variables = {}'.format(len(variables_names)))
for variable_name in variables_names:
    print("Variable: ", variable_name)

## Train the model


In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once validation accuracy has not improved for 3 consecutive epochs and
# roll the model back to the best weights seen.
early_stopping = EarlyStopping(
    monitor='val_accuracy',
    restore_best_weights=True,
    patience=3,
    verbose=1,
)

In [None]:
# Upper bound on training epochs; early stopping may end training sooner.
epochs = 25
f1_callback = F1ScoreCallback(val_generator)

history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=epochs,
    callbacks=[f1_callback, early_stopping],
)

In [None]:
# evaluate() returns the loss followed by the metrics given to compile();
# the model is compiled with accuracy only, so there is no F1 value here.
results = model.evaluate(test_generator, verbose=1)
print("Test Loss, Test Accuracy:", results)
# this is for v4


In [None]:
# Persist the fine-tuned weights under a user-specific file name.
weights_name = user + "_per_user_weights_v1.weights.h5"
model.save_weights(weights_name)

In [None]:
import matplotlib.pyplot as plt

fig, ax1 = plt.subplots(figsize=(10, 6))

# Use a dedicated name for the x-axis values. Rebinding `epochs` here would
# replace the integer epoch count used by model.fit() with a range object,
# so re-running the training cell after this one would fail.
epoch_numbers = range(1, len(history.history['accuracy']) + 1)

# Left axis: accuracy curves.
ax1.plot(epoch_numbers, history.history['accuracy'], 'b-', label='Train Accuracy')
ax1.plot(epoch_numbers, history.history['val_accuracy'], 'g-', label='Validation Accuracy')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Accuracy', color='b')
ax1.tick_params(axis='y', labelcolor='b')

# Right axis (shared x): loss curves.
ax2 = ax1.twinx()
ax2.plot(epoch_numbers, history.history['loss'], 'r--', label='Train Loss')
ax2.plot(epoch_numbers, history.history['val_loss'], 'm--', label='Validation Loss')
ax2.set_ylabel('Loss', color='r')
ax2.tick_params(axis='y', labelcolor='r')

# One combined legend for the lines of both axes.
lines_1, labels_1 = ax1.get_legend_handles_labels()
lines_2, labels_2 = ax2.get_legend_handles_labels()
ax1.legend(lines_1 + lines_2, labels_1 + labels_2, loc='center right')

# Saved before the title is added, so this file holds the untitled figure.
# NOTE(review): the name is base_dir directly followed by "train_val_loss",
# with no separator or extension - confirm that this is intended.
plt.savefig(base_dir+"train_val_loss", dpi=300, bbox_inches='tight')

plt.title('Model performance during training')
plt.savefig(f'training_plot_user_{user}.png', dpi=300, bbox_inches='tight')
plt.show()
plt.close(fig)


## Save the model to a file

In [None]:
# Export the trained model in SavedModel format to a user-specific directory.
saved_model_dir = f"{user}saved_model_per_user"
tf.saved_model.save(model, saved_model_dir)


## Confusion matrix


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

In [None]:

def analyze_confusion_pairs(cm, class_names, top_n=10):
    """Print the most frequently confused (true, predicted) class pairs.

    Every off-diagonal cell of ``cm`` with a value greater than zero is
    treated as a confusion of the row class with the column class. Pairs are
    ranked by that value in descending order (ties keep row-major order) and
    the first ``top_n`` are printed.

    Args:
        cm: square matrix indexable as ``cm[i][j]`` (true class ``i``,
            predicted class ``j``); counts or rates both work.
        class_names: class labels, one per matrix row/column.
        top_n: maximum number of pairs to print.
    """
    print(f"\nMost Confused Character Pairs (Top {top_n}):")

    size = len(class_names)
    ranked_pairs = sorted(
        (
            (class_names[row], class_names[col], cm[row][col])
            for row in range(size)
            for col in range(size)
            if row != col and cm[row][col] > 0
        ),
        key=lambda pair: pair[2],
        reverse=True,
    )

    for rank, (true_class, pred_class, _) in enumerate(ranked_pairs[:top_n], start=1):
        print(f"{rank:2d}. '{true_class}' confused as '{pred_class}'")

In [None]:
def plot_confusion_matrix_and_report(model, data_generator):
    """Plot, save and return the row-normalised confusion matrix.

    Each batch of ``data_generator`` is predicted with ``model``; the arg-max
    of the predictions is compared with the arg-max of the one-hot labels.
    The figure is written to the module-level ``confusion_matrix_name`` path
    and then shown.

    Args:
        model: object with a Keras-style ``predict(x, verbose=0)`` method.
        data_generator: indexable batch generator exposing ``class_indices``
            (class name -> integer index); ``data_generator[i]`` must return
            an ``(inputs, one_hot_labels)`` pair.

    Returns:
        Float array of shape (n_classes, n_classes) in which row ``i`` holds
        the fraction of class-``i`` samples predicted as each class. Rows of
        classes with no samples are all zeros.
    """
    y_true = []
    y_pred = []

    print("Conf  matrix...")
    for batch_index in range(len(data_generator)):
        x_batch, y_batch = data_generator[batch_index]
        preds = model.predict(x_batch, verbose=0)
        y_true.extend(np.argmax(y_batch, axis=1))
        y_pred.extend(np.argmax(preds, axis=1))

    # Order the names by their integer index so that position k in the list
    # is guaranteed to be the class encoded as k.
    class_indices = data_generator.class_indices
    class_names = sorted(class_indices, key=class_indices.get)

    # Pin the label set. Without `labels`, confusion_matrix only covers the
    # labels that occur in y_true or y_pred, so a class missing from both
    # would shrink the matrix and shift every following tick label.
    cm = confusion_matrix(y_true, y_pred, labels=np.arange(len(class_names)))

    # Row-normalise; classes with no samples get a zero row instead of the
    # NaNs (and warning) that a plain division by zero would produce.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_normalized = np.divide(
        cm.astype('float'),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    )

    plt.figure(figsize=(15, 15))
    plt.imshow(cm_normalized, interpolation='nearest', cmap='Blues')
    tick_marks = np.arange(len(class_names))
    # Shorten the tick labels by dropping the 'lowercase_' prefix.
    class_names = [name.replace('lowercase_', '') for name in class_names]
    plt.xticks(tick_marks, class_names, rotation=0, fontsize=9)
    plt.yticks(tick_marks, class_names, fontsize=9)

    # Cells darker than half the maximum get white text for contrast.
    thresh = cm_normalized.max() / 2.
    for i in range(cm_normalized.shape[0]):
        for j in range(cm_normalized.shape[1]):
            val = cm_normalized[i, j]
            if val > 0:
                text_val = '1' if np.isclose(val, 1.0) else f'{val:.2f}'
                plt.text(j, i, text_val,
                         horizontalalignment="center",
                         verticalalignment="center",
                         color="white" if val > thresh else "black",
                         fontsize=6,
                         fontweight='bold')

    plt.xlabel('Predicted Label', fontsize=12, fontweight='bold')
    plt.ylabel('True Label', fontsize=12, fontweight='bold')
    plt.savefig(confusion_matrix_name, dpi=300, bbox_inches='tight')
    plt.show()
    return cm_normalized
    



In [None]:
# Evaluate on the test generator again, then draw and save the row-normalised
# confusion matrix for the same data; `cm` holds the normalised matrix.
results = model.evaluate(test_generator)
cm = plot_confusion_matrix_and_report(model, test_generator)

In [None]:
# Rank the 20 most confused class pairs of the normalised test matrix,
# labelled with the generator's class names.
analyze_confusion_pairs(cm, list(test_generator.class_indices), top_n=20)