In [1]:
import os
from pathlib import Path

# Resolve the project root and make it the working directory.
# ROOT is assigned in BOTH branches: later cells use it, so it must exist even
# when the kernel is already started from the project directory.
PROJECT_NAME = "DogEmotions-ImageClassification-TensorFlow"

if Path.cwd().name == PROJECT_NAME:
    ROOT = Path.cwd()
else:
    # Assumes the notebook lives one level below the project root — TODO confirm.
    ROOT = Path.cwd().parent
    os.chdir(ROOT)
print(ROOT)

/home/huy/Workspace/PythonProject/DogEmotions-ImageClassification-TensorFlow


In [2]:
import pandas as pd
import numpy as np
import tensorflow as tf
import cv2
import src.logging as log
import shutil

from typing import Any, List, Tuple, Literal
# from tqdm.autonotebook import tqdm, trange
from tensorflow.python.data.ops.dataset_ops import DatasetV2
# from collections import deque
# from keras.utils import to_categorical

2024-04-22 22:49:09.224140: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-22 22:49:09.264003: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-22 22:49:09.264044: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-22 22:49:09.265346: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-04-22 22:49:09.272062: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-22 22:49:09.272958: I tensorflow/core/platform/cpu_feature_guard.cc:1

In [3]:
# Quick check of path handling: build the path of one sample image and show
# the directory that contains it (the directory name doubles as the label).
demo = ROOT.joinpath("Dataset/train/sad/sad0.jpg")
os.fspath(demo.parent)

'/home/huy/Workspace/PythonProject/DogEmotions-ImageClassification-TensorFlow/Dataset/train/sad'

In [4]:
# Load data


def get_label(path: Path) -> str:
    """Return the label of an image: the name of the directory that contains it."""
    containing_dir = path.parent
    return containing_dir.name


def one_hot_encode(label: str) -> "tf.Tensor":
    """Convert an emotion label into a one-hot vector.

    Args:
        label: One of "angry", "fear", "happy", "neutral", "sad", "surprise".

    Returns:
        The result of `tf.one_hot` for the label's class index, with one slot
        per known class.

    Raises:
        KeyError: If `label` is not one of the known class names.
    """
    map_label = {"angry": 0, "fear": 1, "happy": 2, "neutral": 3, "sad": 4, "surprise": 5}
    # Depth is derived from the mapping so the vector length cannot drift from
    # the class list if a class is added or removed.
    return tf.one_hot(map_label[label], depth=len(map_label))


def get_image(path: Path, img_size=(32, 32)) -> np.ndarray[float]:
    """Load an image as a normalized, single-channel float32 array.

    The image is read with OpenCV, converted from BGR to grayscale, resized to
    `img_size` with bilinear interpolation, scaled by 1/255 and given a trailing
    channel axis.

    Args:
        path: Location of the image file.
        img_size: Target size passed to `cv2.resize` as `dsize`
            (OpenCV interprets `dsize` as (width, height)).

    Returns:
        A float32 array with a trailing axis of length 1.

    Raises:
        ValueError: If the file cannot be read or decoded as an image.
    """
    img = cv2.imread(str(path))
    # cv2.imread signals failure by returning None instead of raising; fail here
    # with the offending path rather than with an opaque error in cvtColor.
    if img is None:
        raise ValueError(f"Could not read image: {path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = cv2.resize(img, dsize=img_size, interpolation=cv2.INTER_LINEAR)
    img = img.astype(np.float32) / 255.0
    img = np.expand_dims(img, axis=-1)
    return img


def get_dataset(
    path: Path,
    img_size: Tuple[int, int] = (32, 32),
    train_ratio: float = 0.8,
    test_ratio: float = 0.1,
    val_ratio: float = 0.1,
    seed=None,
) -> Tuple[DatasetV2, DatasetV2, DatasetV2]:
    """Load every `.jpg` under `path/<label>/` and split it into train/val/test.

    The samples are shuffled exactly once, before the dataset is built, so the
    three splits are disjoint and stay the same no matter how often each split
    is iterated. (Calling `Dataset.shuffle` before `take`/`skip` reshuffles on
    every iteration by default, which lets samples leak between splits.)
    The returned splits are therefore NOT reshuffled per epoch; apply
    `.shuffle(...)` to the train split if that is wanted.

    Args:
        path: Directory whose sub-directories are named after the class labels.
        img_size: Size forwarded to `get_image` for resizing.
        train_ratio: Fraction of samples placed in the train split.
        test_ratio: Fraction of samples placed in the test split.
        val_ratio: Fraction of samples placed in the validation split.
        seed: Optional seed for the one-time shuffle (None = non-deterministic).

    Returns:
        `(train_ds, val_ds, test_ds)` — note the order: validation comes second.
        Each split size is truncated with `int()`, so a few trailing samples
        may be left out of all splits.

    Raises:
        ValueError: If no `.jpg` file is found under `path`.
        Exception: Any error raised while decoding images or encoding labels is
            logged and then re-raised.
    """
    log.info(f"Loading dataset from: {path}")
    root = Path(path)

    # Only sub-directories are class folders; stray files (e.g. .DS_Store) are
    # skipped. Sorting makes the pre-shuffle order independent of the filesystem.
    list_images = []
    for label_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        list_images.extend(
            sorted(f for f in label_dir.iterdir() if f.name.endswith(".jpg"))
        )
    if not list_images:
        raise ValueError(f"No .jpg images found under: {path}")

    # One-time shuffle of the file list: this fixes the split membership.
    order = np.random.default_rng(seed).permutation(len(list_images))
    list_images = [list_images[i] for i in order]

    try:
        images = [get_image(p, img_size) for p in list_images]
        labels = [one_hot_encode(get_label(p)) for p in list_images]
    except Exception as e:
        # Log for the notebook reader, then re-raise: continuing would only
        # fail later with an unrelated NameError on `images` / `labels`.
        log.exception(e)
        raise

    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    N = len(list_images)

    train_size = int(N * train_ratio)
    test_size = int(N * test_ratio)
    val_size = int(N * val_ratio)
    train_ds = dataset.take(train_size)
    test_ds = dataset.skip(train_size).take(test_size)
    val_ds = dataset.skip(train_size + test_size).take(val_size)

    return train_ds, val_ds, test_ds

In [5]:
# Build the three splits from the emotion image folder and report their sizes.
train_ds, val_ds, test_ds = get_dataset(ROOT / "Dataset/emotion")

for split_name, split_ds in (("train", train_ds), ("val", val_ds), ("test", test_ds)):
    print(f"Number of {split_name} samples: {len(split_ds)}")

[32m2024-04-22 22:49:10.600[0m | [1mINFO    [0m | [36msrc.logging[0m:[36minfo[0m:[36m16[0m - [1mLoading dataset from: /home/huy/Workspace/PythonProject/DogEmotions-ImageClassification-TensorFlow/Dataset/emotion[0m
2024-04-22 22:49:15.407443: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 144752640 exceeds 10% of free system memory.


Number of train samples: 28272
Number of val samples: 3534
Number of test samples: 3534


In [6]:
# Start from a clean output directory, then write each split GZIP-compressed
# to data_compression/<split>.
if os.path.exists("data_compression"):
    shutil.rmtree("data_compression")

for split_name, split_ds in (("train", train_ds), ("val", val_ds), ("test", test_ds)):
    split_ds.save(f"data_compression/{split_name}", compression="GZIP")

2024-04-22 22:49:44.511814: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 144752640 exceeds 10% of free system memory.
2024-04-22 22:49:50.683861: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 144752640 exceeds 10% of free system memory.
2024-04-22 22:49:51.556495: W external/local_tsl/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 144752640 exceeds 10% of free system memory.


In [7]:
def load_dataset(ds_path: Path) -> DatasetV2:
    """Load the GZIP-compressed `tf.data` dataset stored at `ds_path`.

    `ds_path` is converted with `str()` before being handed to
    `tf.data.Dataset.load`, so both `Path` objects and plain strings work.
    """
    return tf.data.Dataset.load(path=str(ds_path), compression="GZIP")

In [8]:
# Reload the three splits from disk (replacing the in-memory ones) and log
# how many samples each contains.
train_ds, val_ds, test_ds = (
    load_dataset(f"data_compression/{split_name}")
    for split_name in ("train", "val", "test")
)

for split_name, split_ds in (("train", train_ds), ("val", val_ds), ("test", test_ds)):
    log.info(f"Number of {split_name} samples: {len(split_ds)}")

[32m2024-04-22 22:49:52.461[0m | [1mINFO    [0m | [36msrc.logging[0m:[36minfo[0m:[36m16[0m - [1mNumber of train samples: 28272[0m
[32m2024-04-22 22:49:52.462[0m | [1mINFO    [0m | [36msrc.logging[0m:[36minfo[0m:[36m16[0m - [1mNumber of val samples: 3534[0m
[32m2024-04-22 22:49:52.463[0m | [1mINFO    [0m | [36msrc.logging[0m:[36minfo[0m:[36m16[0m - [1mNumber of test samples: 3534[0m


In [9]:
AUTOTUNE = tf.data.AUTOTUNE
BATCH_SIZE = 2**5


def _batch_cache_prefetch(ds):
    """Batch `ds`, cache the resulting batches, and prefetch with an autotuned buffer."""
    batched = ds.batch(batch_size=BATCH_SIZE)
    return batched.cache().prefetch(buffer_size=AUTOTUNE)


# NOTE: each name is rebound to its batched version, so re-running this cell
# without re-running the loading cell would batch the data a second time.
train_ds = _batch_cache_prefetch(train_ds)
val_ds = _batch_cache_prefetch(val_ds)
test_ds = _batch_cache_prefetch(test_ds)

In [10]:
from tensorflow import keras
from keras import Sequential, Model
from keras.layers import (
    Dense,
    Dropout,
    Activation,
    Flatten,
    Input,
    Conv2D,
    MaxPooling2D,
    BatchNormalization,
)
from keras.optimizers import SGD, Adam
from keras.callbacks import ModelCheckpoint, EarlyStopping, LearningRateScheduler

from keras.metrics import CategoricalAccuracy, Recall, Precision, F1Score

In [11]:
def CNN_model(img_height=32, img_width=32, initial_filter=2**5) -> Sequential:
    """Build and compile a small CNN classifier for single-channel images.

    Architecture: three blocks of 3x3 `Conv2D` (ReLU) + 2x2 `MaxPooling2D` with
    `initial_filter`, `2 * initial_filter` and `4 * initial_filter` filters,
    followed by `Flatten`, `Dense(128, relu)` and a 6-way softmax output layer
    with L1 kernel and L2 activity regularization.

    Args:
        img_height: Height of the input images.
        img_width: Width of the input images.
        initial_filter: Number of filters in the first convolution.

    Returns:
        The model compiled with Adam, categorical cross-entropy, and
        accuracy / precision / recall / weighted-F1 metrics.
    """
    model = Sequential()
    model.add(Input(shape=(img_height, img_width, 1)))

    # Three conv/pool stages; the filter count doubles at each stage.
    for multiplier in (1, 2, 2**2):
        model.add(
            Conv2D(
                filters=initial_filter * multiplier,
                kernel_size=(3, 3),
                activation=tf.nn.relu,
            )
        )
        model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(128, activation=tf.nn.relu))
    model.add(
        Dense(
            6,
            activation=tf.nn.softmax,
            kernel_regularizer=tf.keras.regularizers.l1(0.004),
            activity_regularizer=tf.keras.regularizers.l2(0.004),
        )
    )

    model.compile(
        optimizer=Adam(),
        loss="categorical_crossentropy",
        # Metrics are named explicitly (as in MLP_model) so the keys in the
        # training history stay fixed regardless of how many metric objects
        # were created earlier in the session.
        metrics=[
            CategoricalAccuracy(name="categorical_accuracy"),
            Precision(name="precision"),
            Recall(name="recall"),
            F1Score(average="weighted", name="f1_score"),
        ],
    )

    return model


def MLP_model(img_height=32, img_width=32) -> Sequential:
    """Build and compile a multilayer perceptron for single-channel images.

    The image is flattened FIRST so the dense layers see every pixel. (A
    `Dense` layer applied to an unflattened `(H, W, 1)` tensor only acts on the
    last axis, i.e. on each pixel's single value independently.)

    Architecture: `Flatten` -> `Dense(128, relu)` -> `BatchNormalization` ->
    `Dense(32, relu)` -> `BatchNormalization` -> `Dropout(0.2)` -> 6-way softmax
    output with L1 kernel and L2 activity regularization.

    Args:
        img_height: Height of the input images.
        img_width: Width of the input images.

    Returns:
        The model compiled with Adam, categorical cross-entropy, and
        accuracy / precision / recall / weighted-F1 metrics.
    """
    model = Sequential(
        [
            Input(shape=(img_height, img_width, 1)),
            Flatten(),
            Dense(128, activation=tf.nn.relu),
            BatchNormalization(),
            Dense(32, activation=tf.nn.relu),
            BatchNormalization(),
            Dropout(0.2),
            Dense(
                6,
                activation="softmax",
                kernel_regularizer=tf.keras.regularizers.l1(0.004),
                activity_regularizer=tf.keras.regularizers.l2(0.004),
            ),
        ]
    )

    model.compile(
        optimizer=Adam(),
        loss="categorical_crossentropy",
        metrics=[
            keras.metrics.CategoricalAccuracy(name="categorical_accuracy"),
            keras.metrics.Precision(name="precision"),
            keras.metrics.Recall(name="recall"),
            F1Score(average="weighted"),
        ],
    )
    return model

In [12]:
# Instantiate both models (CNN first, then MLP) and print their layer summaries.
cnn, mlp = CNN_model(), MLP_model()

for built_model in (cnn, mlp):
    built_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 30, 30, 32)        320       
                                                                 
 max_pooling2d (MaxPooling2  (None, 15, 15, 32)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 13, 13, 64)        18496     
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 6, 6, 64)          0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 4, 4, 128)         73856     
                                                                 
 max_pooling2d_2 (MaxPoolin  (None, 2, 2, 128)         0

In [13]:
# Give each model a readable name by setting its private `_name` attribute;
# `cnn.name` is used further down to build the checkpoint directory.
for named_model, model_name in ((cnn, "cnn_model"), (mlp, "mlp_model")):
    named_model._name = model_name

In [14]:
# All checkpoints defined here are written under <ROOT>/weights/<cnn.name>/.
cnn_weights_dir = ROOT / "weights" / cnn.name

# Keeps only the weights of the epoch with the highest validation F1 score.
f1_save_best_callbacks = ModelCheckpoint(
    filepath=cnn_weights_dir / "best.weights.h5",
    monitor="val_f1_score",
    mode="max",
    save_best_only=True,
    save_weights_only=True,
)

# Same target metric, but without `save_best_only`.
f1_save_last_callbacks = ModelCheckpoint(
    filepath=cnn_weights_dir / "last.weights.h5",
    monitor="val_f1_score",
    mode="max",
    save_weights_only=True,
    verbose=0,
)

# Stop when validation F1 has not improved for 3 epochs and restore the best weights.
f1_early_stopping_callback = EarlyStopping(
    monitor="val_f1_score",
    mode="max",
    patience=3,
    restore_best_weights=True,
    verbose=1,
)


def lr_scheduler(epoch):
    """Exponentially decayed learning rate: 1e-3 * exp(-0.1 * epoch)."""
    decay_rate = 0.1
    base_lr = 1e-3
    return base_lr * np.exp(-decay_rate * epoch)

In [15]:
def _build_callbacks(model):
    """Create a fresh, model-specific callback list for one `fit` call.

    Callbacks must not be shared between the two models: the checkpoint paths
    would both point at the CNN's directory (so the MLP run would overwrite the
    CNN weights), and a reused `ModelCheckpoint` would compare the second
    model's scores against the best score carried over from the first run.
    """
    weights_dir = ROOT / "weights" / model.name
    return [
        ModelCheckpoint(
            weights_dir / "last.weights.h5",
            monitor="val_f1_score",
            save_weights_only=True,
            mode="max",
            verbose=0,
        ),
        EarlyStopping(
            monitor="val_f1_score",
            patience=3,
            mode="max",
            restore_best_weights=True,
            verbose=1,
        ),
        ModelCheckpoint(
            weights_dir / "best.weights.h5",
            monitor="val_f1_score",
            save_best_only=True,
            save_weights_only=True,
            mode="max",
        ),
        LearningRateScheduler(lr_scheduler),
    ]


# Train both models with identical settings; the histories are kept so the
# runs can be compared afterwards.
cnn_history = cnn.fit(
    train_ds,
    epochs=10,
    validation_data=val_ds,
    callbacks=_build_callbacks(cnn),
)
mlp_history = mlp.fit(
    train_ds,
    epochs=10,
    validation_data=val_ds,
    callbacks=_build_callbacks(mlp),
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x717ff06c5950>