In [None]:
# Remove any installed `umap` distribution, then install `umap-learn`.
# NOTE(review): presumably the unrelated `umap` package would otherwise shadow the
# `umap` module that the `import umap.umap_` below expects from `umap-learn` — confirm.
!pip uninstall umap -y
!pip install umap-learn

In [None]:
import os
import random
import numpy as np

import tensorflow as tf
import tensorflow.keras as keras
import matplotlib.pyplot as plt

import sklearn.model_selection as skms
import sklearn.preprocessing as skp
import sklearn.utils as sku
import sklearn.decomposition as skd
import sklearn.metrics as skm

from sklearn.manifold import TSNE

import cv2

# Module for k-fold
from sklearn.model_selection import KFold

import umap.umap_ as umap

### Important: prepare data
If you use Google Colab:
* Please set `USE_GOOGLE_COLAB` to `True`.
* Please set the `COLAB_WORKING_PATH` variable to the path of the folder containing the data zip file.
* Please set the `DATA_CROP_PATH` variable to the path of the folder containing the pre-processed Neuchâtel dataset.

In [None]:
# GOOGLE COLAB
USE_GOOGLE_COLAB = True # Are you using Google Colab ?
# Folder that holds the zipped dataset: a Drive folder on Colab, the current directory otherwise.
COLAB_WORKING_PATH = "/content/drive/My Drive/Colab/Botanist" if USE_GOOGLE_COLAB else "."

# Mount on Google Drive
if USE_GOOGLE_COLAB:
  from google.colab import drive
  drive.mount('/content/drive/', force_remount=True)

# PATHS
DATASET_ZIP_PATH = f"{COLAB_WORKING_PATH}/herbier.zip" # Path to zipped data
# Directory the archive is extracted into.
DATASET_PATH = "/content/data/"  if USE_GOOGLE_COLAB else "./"


# Locations inside the extracted archive: word images and their metadata file.
WORKDIR = f"{DATASET_PATH}herbier"
WORD_DATA_PATH = f"{WORKDIR}/data_public/words/"
METADATA_PATH = f"{WORKDIR}/data_public/ascii/words.txt"
# Folder with one sub-folder per additional class.
# On Colab the value is overwritten just below with a hard-coded Drive path; with the
# default COLAB_WORKING_PATH both assignments produce the same string.
DATA_CROP_PATH = f"{COLAB_WORKING_PATH}/data_crop/"
if USE_GOOGLE_COLAB:
  DATA_CROP_PATH = "/content/drive/My Drive/Colab/Botanist/data_crop/"

# NOTE(review): not referenced by any other visible cell (inference reads DATA_CROP_PATH).
INFERENCE_DATASET_PATH = f"{COLAB_WORKING_PATH}/data_crop/"

# Create our data folder, unzip the data
# Skipped when WORKDIR already exists, so re-running the cell does not unzip again.
if USE_GOOGLE_COLAB:
  if not os.path.exists(WORKDIR):
    !mkdir -p $DATASET_PATH
    !unzip "$DATASET_ZIP_PATH" -d $DATASET_PATH

# Autoencoder input geometry and number of writer classes drawn from the words dataset
IMAGE_HEIGHT = 128
IMAGE_WIDTH = 128
N_CLASSES = 2

# Draw N_CLASSES top-level folders at random, then one entry inside each of them
all_dirs = os.listdir(WORD_DATA_PATH)
selected_top_dirs = random.sample(all_dirs, N_CLASSES)
sub_dirs = {
    folder: os.listdir(os.path.join(WORD_DATA_PATH, folder))
    for folder in selected_top_dirs
}
# Dicts keep insertion order, so the draws happen in the order of selected_top_dirs
random_subdirs = {
    folder: random.choice(candidates)
    for folder, candidates in sub_dirs.items()
}

CLASSES = [*random_subdirs.values()]
print(f"Selected classes: {CLASSES}")

# Layer name constant (no visible cell uses it)
FLATTEN_LAYER_NAME = 'flattened'

# Enables the diagnostic prints and plots of the later cells
DEBUG = True

In [None]:
def load_words_data(data_path, metadata_path, selected_writers=None):
    """Load the word images that belong to the selected writers.

    Every non-comment line of the metadata file is expected to start with a
    word ID. The first two dash-separated parts of that ID form the writer ID,
    and the image is looked up at
    ``<data_path>/<first part>/<writer_id>/<word_id>.png``.

    Args:
        data_path: Root directory of the word images.
        metadata_path: Path of the metadata text file; lines starting with
            ``#`` are skipped.
        selected_writers: Non-empty collection of writer IDs to keep. A single
            string is treated as one writer ID.

    Returns:
        A list of dicts with the keys ``'image_path'``, ``'writer_id'`` and
        ``'image_array'`` (the decoded PNG), one per image that was found and
        decoded. Missing or undecodable images are reported with ``print`` and
        skipped.

    Raises:
        ValueError: If ``selected_writers`` is ``None`` or empty.
    """
    if isinstance(selected_writers, str):
        selected_writers = [selected_writers]
    if selected_writers is None or len(selected_writers) == 0:
        raise ValueError("selected_writers must be a non-empty list of writer IDs")

    # Set membership keeps the per-line check cheap for large metadata files.
    wanted_writers = set(selected_writers)
    data = []

    with open(metadata_path, 'r') as file:
        for line in file:
            if line.startswith("#"):
                continue

            word_id = line.strip().split(' ')[0]
            parts = word_id.split('-')
            writer_id = '-'.join(parts[:2])
            if writer_id not in wanted_writers:
                continue

            image_path = os.path.join(data_path, parts[0], writer_id, f"{word_id}.png")
            if not os.path.exists(image_path):
                print(f"Image not found for word ID: {word_id} at {image_path}")
                continue

            try:
                img = tf.io.read_file(image_path)
                img = tf.image.decode_png(img)
            except tf.errors.InvalidArgumentError as err:
                # The file exists at this point, so this is a decoding problem,
                # not a missing image.
                print(f"Could not decode image for word ID: {word_id} at {image_path}: {err}")
                continue

            data.append({
                'image_path': image_path,
                'writer_id': writer_id,
                'image_array': img
            })

    return data

def load_new_class_data(data_crop_path, n_classes=2):
    """Load every image of ``n_classes`` randomly chosen class folders.

    Args:
        data_crop_path: Directory that contains one sub-directory per class.
        n_classes: Number of class folders to draw at random (default 2).

    Returns:
        A list of dicts with the keys ``'image_path'``, ``'writer_id'`` (the
        class folder name) and ``'image_array'`` (the decoded PNG). Files that
        cannot be decoded are reported with ``print`` and skipped.

    Raises:
        ValueError: If fewer than ``n_classes`` class folders are available.
    """
    # Keep directories only: a stray file in the root (e.g. a README) must not
    # be drawn as a class, because listing it would fail.
    class_dirs = [
        name for name in os.listdir(data_crop_path)
        if os.path.isdir(os.path.join(data_crop_path, name))
    ]
    if len(class_dirs) < n_classes:
        raise ValueError(
            f"Need at least {n_classes} class folders in {data_crop_path}, found {len(class_dirs)}"
        )

    selected_dirs = random.sample(class_dirs, n_classes)

    new_class_data = []
    for class_dir in selected_dirs:
        class_dir_path = os.path.join(data_crop_path, class_dir)

        for image_file in os.listdir(class_dir_path):
            image_path = os.path.join(class_dir_path, image_file)
            if not os.path.isfile(image_path):
                # Nested folders are not images; skip them.
                continue

            try:
                img = tf.io.read_file(image_path)
                img = tf.image.decode_png(img)
            except tf.errors.InvalidArgumentError as err:
                # The file exists, so this is a decoding problem.
                print(f"Could not decode image for class {class_dir} at {image_path}: {err}")
                continue

            new_class_data.append({
                'image_path': image_path,
                'writer_id': class_dir,  # Folder name is used as the class name
                'image_array': img
            })

    return new_class_data

# Load the images of the selected writers, then append the extra classes
words_data = load_words_data(WORD_DATA_PATH, METADATA_PATH, selected_writers=CLASSES)

new_classes_data = load_new_class_data(DATA_CROP_PATH)
words_data.extend(new_classes_data)

# Total number of classes: the selected writers plus the 2 extra class folders.
# Derived from CLASSES instead of incrementing N_CLASSES in place, so re-running
# this cell does not keep adding 2.
N_CLASSES = len(CLASSES) + 2

images = [entry['image_array'] for entry in words_data]
labels = [entry['writer_id'] for entry in words_data]

# The images are not resized yet and may differ in shape, so they are stored in
# a 1-D object array: np.array(images) on differently shaped images raises on
# recent NumPy versions.
_raw_images = np.empty(len(images), dtype=object)
for _position, _raw_image in enumerate(images):
    _raw_images[_position] = np.asarray(_raw_image)

X_train, X_test, y_train, y_test = skms.train_test_split(_raw_images, np.array(labels), test_size=0.2, random_state=42)


def plot_images(images, labels, num=5, class_num=N_CLASSES):
    """Show up to ``num`` images side by side, each with its label underneath.

    Args:
        images: Indexable collection of images accepted by ``plt.imshow``.
        labels: Indexable collection of labels, aligned with ``images``.
        num: Maximum number of images to draw; also the column count of the
            subplot grid.
        class_num: Row count of the subplot grid (only the first row is filled).
    """
    # Never index past the data actually supplied: callers may pass fewer than
    # `num` images.
    count = min(num, len(images), len(labels))

    plt.figure(figsize=(10,10))
    for i in range(count):
        plt.subplot(class_num,num,i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(images[i], cmap=plt.cm.binary)
        plt.xlabel(labels[i])
    plt.show()

def plot_random_samples(classes, num=5):
    """Plot up to ``num`` random training images for each class in ``classes``.

    Reads the module-level ``X_train`` and ``y_train`` arrays.

    Args:
        classes: Iterable of class names as they appear in ``y_train``.
        num: Maximum number of images to show per class.
    """
    for class_name in classes:
        class_indices = np.where(y_train == class_name)[0]
        size = min(num, len(class_indices))
        if size == 0:
            # Nothing to draw for this class in the current training split.
            print(f"No training samples for class {class_name}")
            continue

        random_indices = np.random.choice(class_indices, size=size, replace=False)
        random_images = X_train[random_indices]
        # Pass the real sample count so plot_images neither runs past the
        # available images nor ignores a `num` larger than its own default.
        plot_images(random_images, [class_name] * size, num=size)



# Diagnostic summary of the loaded data (only when DEBUG is enabled)
if DEBUG:
    print(f"Loaded {len(words_data)} words.")
    for entry in words_data[:5]:
        print(f"  Writer ID: {entry['writer_id']}; image shape: {entry['image_array'].shape}")

    # A set comprehension counts the distinct writer IDs
    print("number of writers: ", len({item['writer_id'] for item in words_data}))

    plot_random_samples(CLASSES)

**Pre-processing**

In [None]:
def preprocess_data(data):
    """Resize, crop, normalise and stack the images in ``data``.

    Each image is scaled with its aspect ratio preserved so that its height
    matches ``IMAGE_HEIGHT``. Images that end up narrower than ``IMAGE_WIDTH``
    are dropped; wider ones are centre-cropped to ``IMAGE_WIDTH``. Pixel values
    are divided by 255 and a trailing channel axis is added.

    Args:
        data: Iterable of dicts with an ``'image_array'`` (array-like image,
            2-D or with a trailing channel axis) and a ``'writer_id'`` entry.

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays. ``images`` has shape
        ``(n_kept, IMAGE_HEIGHT, IMAGE_WIDTH, 1)`` when at least one image is
        kept, and is an empty 1-D array otherwise.
    """
    labels = []
    images = []

    for entry in data:
        img = np.array(entry['image_array'])

        # Reduce to one 2-D channel up front, so the array handed to np.pad
        # below is always (height, width, 1) whatever the source image format.
        if img.ndim == 3:
            channels = img.shape[2]
            if channels == 3:
                img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
            elif channels == 4:
                img = cv2.cvtColor(img, cv2.COLOR_RGBA2GRAY)
            else:
                # One channel (or gray + alpha): keep the first channel.
                img = np.ascontiguousarray(img[:, :, 0])

        old_size = img.shape[:2]
        if old_size[0] == 0 or old_size[1] == 0:
            # An empty image cannot be rescaled.
            continue

        # Resize the image while preserving aspect ratio
        ratio = float(IMAGE_HEIGHT) / old_size[0]
        new_size = tuple(int(x * ratio) for x in old_size)

        # Ignore images that are too narrow. Checking before resizing also
        # avoids asking OpenCV for a zero-width image.
        if new_size[1] < IMAGE_WIDTH:
            continue

        # cv2.resize expects the target size as (width, height)
        img = cv2.resize(img, (new_size[1], new_size[0]))

        # Crop images that are too wide (centre crop)
        if new_size[1] > IMAGE_WIDTH:
            start_x = (new_size[1] - IMAGE_WIDTH) // 2
            img = img[:, start_x:start_x + IMAGE_WIDTH]

        img = img.astype('float32') / 255.0

        # Final sample format is (height, width, channels)
        img = np.expand_dims(img, axis=-1)

        # int() truncation can leave the resized height just below
        # IMAGE_HEIGHT; zero-pad at the bottom/right up to the target size.
        delta_h = IMAGE_HEIGHT - img.shape[0]
        delta_w = IMAGE_WIDTH - img.shape[1]
        padding = ((0, delta_h), (0, delta_w), (0, 0))
        img = np.pad(img, padding, 'constant')

        images.append(img)
        labels.append(entry['writer_id'])

    return np.array(images), np.array(labels)


# Pre-process every loaded image, then redo the train/test split on the fixed-size
# arrays; this overwrites the X_train/X_test/y_train/y_test of the raw split.
images, labels = preprocess_data(words_data)
X_train, X_test, y_train, y_test = skms.train_test_split(images, labels, test_size=0.2, random_state=42)

# Shape report and sample plots, only when DEBUG is enabled
if DEBUG:
  print(f"X_train: {X_train.shape}; y_train: {y_train.shape}")
  print(f"X_test: {X_test.shape}; y_test: {y_test.shape}")
  plot_random_samples(CLASSES)

In [None]:
# data augmentation
# Augmentation settings: rotation, width/height shift, shear and zoom; horizontal
# flipping is disabled.
# NOTE(review): `data_generator` is not referenced by any other visible cell.

data_generator = keras.preprocessing.image.ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=False,
    fill_mode='nearest'
)

**Kfold**

In [None]:
# Define number of splits
n_splits = 5

# Create Kfold instance
# Shuffling uses a fixed random_state so the folds are the same on every run.
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

**Define the functions used in each k-fold iteration**

In [None]:
def evaluate_model(n_neighbors, n_neighbors_list, min_dist, min_dist_list, class_labels, ax, features_standardized,
                   n_components=32, random_state=None):
    """Embed the features with UMAP and scatter the first two components on ``ax``.

    Args:
        n_neighbors: UMAP ``n_neighbors`` value for this panel.
        n_neighbors_list: All ``n_neighbors`` values of the grid; used only to
            detect the last panel.
        min_dist: UMAP ``min_dist`` value for this panel.
        min_dist_list: All ``min_dist`` values of the grid; used only to detect
            the last panel.
        class_labels: One numeric label per sample, used to colour the points.
        ax: Matplotlib axes to draw on.
        features_standardized: 2-D feature matrix; NaNs are replaced via
            ``np.nan_to_num`` before fitting.
        n_components: Dimensionality of the UMAP embedding (default 32, the
            previously hard-coded value).
        random_state: Forwarded to UMAP; ``None`` (default) keeps the embedding
            non-deterministic, as before.

    Returns:
        None. The scatter plot is drawn on ``ax``.
    """
    reducer = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist, n_components=n_components,
                        metric='euclidean', random_state=random_state)
    embedding = reducer.fit_transform(np.nan_to_num(features_standardized))

    # NOTE(review): only the first two embedding components are drawn, whatever
    # n_components is.
    # plt.get_cmap replaces plt.cm.get_cmap, which recent Matplotlib releases
    # no longer provide.
    sc = ax.scatter(embedding[:, 0], embedding[:, 1],
                    c=class_labels, edgecolor='none', alpha=0.5,
                    cmap=plt.get_cmap('Accent', N_CLASSES))
    ax.set_xlabel('UMAP component 1')
    ax.set_ylabel('UMAP component 2')
    ax.set_title(f'n_neighbors={n_neighbors}, min_dist={min_dist}')

    # A single colour bar, attached to the last panel of the grid
    if n_neighbors == n_neighbors_list[-1] and min_dist == min_dist_list[-1]:
        plt.colorbar(sc, ax=ax)

def train_and_evaluate(encoder_model, X_train, y_train, X_test, y_test):
    """Train the autoencoder on one split and plot UMAP views of its encodings.

    The module-level ``autoencoder`` is fitted to reconstruct ``X_train``, with
    ``X_test`` as validation data and early stopping on ``val_loss``. The
    training images are then encoded with ``encoder_model``, standardised, and
    shown in a grid of UMAP scatter plots (one panel per
    ``n_neighbors``/``min_dist`` pair), coloured by class.

    Args:
        encoder_model: Model whose ``predict`` yields the encoded features.
        X_train: Training images.
        y_train: Training labels, one-hot encoded (2-D) or integer (1-D).
        X_test: Validation images.
        y_test: Validation labels (not used).

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    BATCH_SIZE = 8 # fine tuned
    EPOCHS = 100

    # Take the point colours from the labels passed in, not from a global that
    # the caller has to set beforehand.
    y_train = np.asarray(y_train)
    class_labels = np.argmax(y_train, axis=1) if y_train.ndim > 1 else y_train

    early_stopping = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    autoencoder.fit(X_train, X_train,  # input and output are the same for an autoencoder
                    epochs=EPOCHS,
                    batch_size=BATCH_SIZE,
                    shuffle=True,
                    validation_data=(X_test, X_test),
                    callbacks=[early_stopping])  # the callback only takes effect when passed to fit

    # Extract features
    encoded_features = encoder_model.predict(X_train)

    # Standardize the features (one flat feature vector per sample)
    scaler = skp.StandardScaler()
    encoded_features_standardized = scaler.fit_transform(encoded_features.reshape(len(encoded_features), -1))

    # Now, use the standardized features with UMAP
    n_neighbors_list = [10, 20, 30]
    min_dist_list = [0.0, 0.1, 0.2]

    fig, axes = plt.subplots(len(n_neighbors_list), len(min_dist_list), figsize=(15, 12))
    axes = axes.flatten()

    # One panel per (n_neighbors, min_dist) combination, row by row
    grid = [(neighbors, dist) for neighbors in n_neighbors_list for dist in min_dist_list]
    for idx, (n_neighbors, min_dist) in enumerate(grid):
        evaluate_model(n_neighbors, n_neighbors_list, min_dist, min_dist_list, class_labels, axes[idx], encoded_features_standardized)

    plt.tight_layout()
    plt.show()

**Define model**

In [None]:
# Encoder
# Three Conv2D + MaxPooling2D stages; each 2x2 pooling with 'same' padding halves
# the spatial size, so a 128x128 input is reduced to 16x16 with 16 feature maps.
input_img = keras.layers.Input(shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 1)) # adapt this if using `channels_first` image data format

x = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(input_img)
x = keras.layers.MaxPooling2D((2, 2), padding='same')(x)
x = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = keras.layers.MaxPooling2D((2, 2), padding='same')(x)
x = keras.layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
# The layer is named so the encoder model below can look it up by name.
encoded = keras.layers.MaxPooling2D((2, 2), padding='same', name='encoded_layer')(x)

# Decoder
# Mirrors the encoder: three Conv2D + UpSampling2D stages bring the spatial size
# back to the input size; the final sigmoid Conv2D outputs one channel in [0, 1].
x = keras.layers.Conv2D(16, (3, 3), activation='relu', padding='same')(encoded)
x = keras.layers.UpSampling2D((2, 2))(x)
x = keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = keras.layers.UpSampling2D((2, 2))(x)
x = keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = keras.layers.UpSampling2D((2, 2))(x) # Additional upsampling layer
decoded = keras.layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)


# Autoencoder model
autoencoder = keras.Model(input_img, decoded)

if DEBUG:
    autoencoder.summary()

# Reconstruction objective: mean squared error between input and output images.
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# Create a model to retrieve the encoded features
# It reuses the autoencoder's layers, so it reflects the autoencoder's current weights.
encoder_model = keras.Model(inputs=autoencoder.input, outputs=autoencoder.get_layer('encoded_layer').output)

**Run k-fold cross-validation**

In [None]:
# Execute kfold

# Encode the labels once: the encoding does not depend on the fold
label_encoder = skp.LabelEncoder()
integer_encoded_labels = label_encoder.fit_transform(labels)
one_hot_encoded_labels = keras.utils.to_categorical(integer_encoded_labels)

# Snapshot the weights so that every fold starts from the same state. Without
# this, a fold keeps training the model of the previous folds, which has
# already seen that fold's validation images.
initial_weights = autoencoder.get_weights()

for fold, (train_index, test_index) in enumerate(kf.split(images), 1):
    # Restore the starting weights and recompile to get a fresh optimizer state
    # (same optimizer and loss as the model definition cell).
    autoencoder.set_weights(initial_weights)
    autoencoder.compile(optimizer='adam', loss='mean_squared_error')

    X_train, X_test = images[train_index], images[test_index]
    y_train, y_test = one_hot_encoded_labels[train_index], one_hot_encoded_labels[test_index]

    print(f"\nFold {fold} - Training set: X_train shape = {X_train.shape}, y_train shape = {y_train.shape}")
    print(f"Fold {fold} - Testing set: X_test shape = {X_test.shape}, y_test shape = {y_test.shape}")

    # Integer labels of the training split, used to colour the UMAP plots
    integer_class_labels = np.argmax(y_train, axis=1)

    # Train and test the model
    train_and_evaluate(encoder_model, X_train, y_train, X_test, y_test)

**Inference**

In [None]:
from PIL import Image
from sklearn.preprocessing import LabelEncoder

dataset_path = DATA_CROP_PATH

data = []
# Keep directories only: a stray file in the dataset root must not be drawn as a class
class_dirs = [
    name for name in os.listdir(dataset_path)
    if os.path.isdir(os.path.join(dataset_path, name))
]

# Select 2 random folders
selected_dirs = random.sample(class_dirs, 2)

print("Selected classes :", selected_dirs)

# Fetch dataset image
for class_dir in selected_dirs:
    class_dir_path = os.path.join(dataset_path, class_dir)

    image_files = os.listdir(class_dir_path)
    for image_file in image_files:
        image_path = os.path.join(class_dir_path, image_file)

        # The context manager closes the file handle once the pixels are copied,
        # so handles do not pile up on large folders.
        with Image.open(image_path) as image:
            image_array = np.array(image)

        entry = {'image_array': image_array, 'writer_id': class_dir}
        data.append(entry)


# Prepare data
images, labels = preprocess_data(data)

le = LabelEncoder()
labels = le.fit_transform(labels)

X = np.array(images)
y = np.array(labels)

# Prediction: the autoencoder returns reconstructed images, not class scores
predictions = autoencoder.predict(X)

# NOTE(review): this argmax runs over an axis of the reconstructed images, so the
# result is not a class prediction. It is kept because cells outside this view
# may read it; no visible cell does.
predicted_class_labels = np.argmax(predictions, axis=1)
true_class_labels = y

##############################

# Extract features
encoded_features = encoder_model.predict(X)

# Standardize the features (one flat feature vector per sample)
scaler = skp.StandardScaler()
encoded_features_standardized = scaler.fit_transform(encoded_features.reshape(len(encoded_features), -1))

# Now, use the standardized features with UMAP
n_neighbors_list = [10, 20, 30]
min_dist_list = [0.0, 0.1, 0.2]

fig, axes = plt.subplots(len(n_neighbors_list), len(min_dist_list), figsize=(15, 12))

axes = axes.flatten()

# One panel per (n_neighbors, min_dist) combination, row by row
for idx, (n_neighbors, min_dist) in enumerate([(nn, md) for nn in n_neighbors_list for md in min_dist_list]):
    evaluate_model(n_neighbors, n_neighbors_list, min_dist, min_dist_list, true_class_labels, axes[idx], encoded_features_standardized)

plt.tight_layout()
plt.show()

**Model evaluation**

In [None]:
# Plot up to 10 random images and their corresponding labels
# Sampling without replacement fails when more items are requested than exist,
# so the count is capped at the number of available images.
n_shown = min(10, len(X))
indices = np.random.choice(range(len(X)), size=n_shown, replace=False)

fig, axes = plt.subplots(2, 5, figsize=(15, 6))

# Blank out the panels that will not receive an image
for unused_ax in axes.flat[n_shown:]:
    unused_ax.axis('off')

for i, idx in enumerate(indices):
    ax = axes[i//5, i%5]

    ax.imshow(X[idx], cmap=plt.cm.binary)

    true_label = le.inverse_transform([y[idx]])[0]
    # NOTE(review): `true_class_labels` is the ground truth, so the "Predicted"
    # label always equals the true one and the title is always green. No
    # classifier output exists in the visible cells to use instead.
    predicted_label = le.inverse_transform([true_class_labels[idx]])[0]
    title_color = 'green' if true_label == predicted_label else 'red'

    ax.set_title(f"True: {true_label}\nPredicted: {predicted_label}", color=title_color)
    ax.axis('off')

plt.tight_layout()
plt.show()