In [None]:
# --- Imports ---
import csv
import numpy as np
import tensorflow as tf
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy
from tensorflow.keras.callbacks import LearningRateScheduler, ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import os
import math

# --- Constants and Configuration ---
RANDOM_SEED = 42
DATASET_PATH = 'model/keypoint_classifier/keypoint.csv'
MODEL_DIR = 'model/keypoint_classifier_modern' # Thư mục mới cho model hiện đại
MODEL_SAVE_PATH = os.path.join(MODEL_DIR, 'keypoint_classifier_modern.keras')
TFLITE_SAVE_PATH = os.path.join(MODEL_DIR, 'keypoint_classifier_modern.tflite')

# Đảm bảo thư mục lưu model tồn tại
os.makedirs(MODEL_DIR, exist_ok=True)

NUM_CLASSES = 26 # Số lớp đầu ra
INPUT_SHAPE = (21 * 2,) # Kích thước input (21 keypoints * 2 coords)

# Hyperparameters
INITIAL_LEARNING_RATE = 1e-3
WEIGHT_DECAY = 1e-4 # Weight decay cho AdamW
LABEL_SMOOTHING_FACTOR = 0.1 # Hệ số làm mềm nhãn
EPOCHS = 500 # Số epochs tối đa (có thể dừng sớm)
BATCH_SIZE = 128

print("Imports và cấu hình đã sẵn sàng.")
print(f"TensorFlow Version: {tf.__version__}")
print(f"Lưu model vào: {MODEL_DIR}")

In [None]:
# --- Load Data ---
print(f"Đang tải dataset từ: {DATASET_PATH}")
if not os.path.exists(DATASET_PATH):
    print(f"Lỗi: Không tìm thấy file dataset tại {DATASET_PATH}")
    # Bạn có thể dừng notebook ở đây nếu file không tồn tại
    raise FileNotFoundError(f"Dataset file not found at {DATASET_PATH}")

# Tải features (cột 1 đến 42)
X_dataset = np.loadtxt(DATASET_PATH, delimiter=',', dtype='float32', usecols=list(range(1, INPUT_SHAPE[0] + 1)))
# Tải labels dạng số nguyên (cột 0)
y_dataset_int = np.loadtxt(DATASET_PATH, delimiter=',', dtype='int32', usecols=(0))

print(f"Dataset đã tải: X shape={X_dataset.shape}, y_int shape={y_dataset_int.shape}")
print(f"Số lớp thực tế trong dữ liệu (labels): {len(np.unique(y_dataset_int))}")
# Cập nhật NUM_CLASSES nếu cần, dựa trên dữ liệu thực tế
# NUM_CLASSES = len(np.unique(y_dataset_int))
# print(f"Cập nhật NUM_CLASSES = {NUM_CLASSES}")

In [None]:
# --- Split Data ---
print("Đang chia dữ liệu thành tập train/test...")
X_train, X_test, y_train_int, y_test_int = train_test_split(
    X_dataset,
    y_dataset_int,
    train_size=0.75,
    random_state=RANDOM_SEED,
    stratify=y_dataset_int # Giữ tỉ lệ các lớp
)

# --- One-Hot Encode Labels ---
print("Đang thực hiện one-hot encoding cho labels...")
y_train = to_categorical(y_train_int, num_classes=NUM_CLASSES)
y_test = to_categorical(y_test_int, num_classes=NUM_CLASSES)

print(f"Kích thước tập huấn luyện: X={X_train.shape}, y={y_train.shape}")
print(f"Kích thước tập kiểm tra: X={X_test.shape}, y={y_test.shape}")
print("\nNhãn số nguyên mẫu (y_test_int[0]):", y_test_int[0])
print("Nhãn one-hot tương ứng (y_test[0]):", y_test[0])

In [None]:
# --- Build Modern MLP Model ---
print("Đang xây dựng mô hình MLP hiện đại...")
model = tf.keras.models.Sequential([
    tf.keras.layers.Input(shape=INPUT_SHAPE, name='input_layer'),
    tf.keras.layers.Dense(128, name='dense_1', kernel_initializer='he_normal'),
    tf.keras.layers.BatchNormalization(name='bn_1'),
    tf.keras.layers.Activation('gelu', name='gelu_1'),
    tf.keras.layers.Dropout(0.2, name='dropout_1'),

    tf.keras.layers.Dense(64, name='dense_2', kernel_initializer='he_normal'),
    tf.keras.layers.BatchNormalization(name='bn_2'),
    tf.keras.layers.Activation('gelu', name='gelu_2'),
    tf.keras.layers.Dropout(0.3, name='dropout_2'),

    tf.keras.layers.Dense(32, name='dense_3', kernel_initializer='he_normal'),
    tf.keras.layers.BatchNormalization(name='bn_3'),
    tf.keras.layers.Activation('gelu', name='gelu_3'),
    tf.keras.layers.Dropout(0.4, name='dropout_3'),

    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax', name='output_layer')
], name='ModernMLP')

print("Kiến trúc mô hình:")
model.summary()

In [None]:
# --- Define Cosine Decay Learning Rate Schedule ---
print("Đang thiết lập Cosine Decay learning rate schedule...")
def cosine_decay(epoch, lr):
    # Công thức Cosine Decay
    initial_lr = INITIAL_LEARNING_RATE
    total_epochs = EPOCHS
    alpha = 0.0 # Learning rate cuối cùng
    epoch_decay = min(epoch, total_epochs)
    cosine_decay_val = 0.5 * (1 + math.cos(math.pi * epoch_decay / total_epochs))
    decayed_lr = (initial_lr - alpha) * cosine_decay_val + alpha
    # In ra LR mỗi 10 epochs để theo dõi (tùy chọn)
    # if epoch % 10 == 0:
    #     print(f"\nEpoch {epoch+1}: Learning Rate = {decayed_lr:.6f}")
    return decayed_lr

# Tạo callback từ hàm schedule
lr_scheduler_cosine = LearningRateScheduler(cosine_decay, verbose=0)

print("Cosine Decay schedule đã sẵn sàng.")
# Test thử giá trị LR ở vài epoch
# print(f"LR at Epoch 0: {cosine_decay(0, 0):.6f}")
# print(f"LR at Epoch {EPOCHS//2}: {cosine_decay(EPOCHS//2, 0):.6f}")
# print(f"LR at Epoch {EPOCHS}: {cosine_decay(EPOCHS, 0):.6f}")

In [None]:
# --- Compile Model with Modern Settings ---
print("Đang biên dịch mô hình...")
model.compile(
    optimizer=AdamW(learning_rate=INITIAL_LEARNING_RATE, weight_decay=WEIGHT_DECAY),
    loss=CategoricalCrossentropy(label_smoothing=LABEL_SMOOTHING_FACTOR),
    metrics=[CategoricalAccuracy(name='accuracy')]
)

print("Mô hình đã được biên dịch.")

In [None]:
# --- Define Callbacks ---
print("Đang thiết lập các callbacks...")

# Lưu model tốt nhất
cp_callback = ModelCheckpoint(
    filepath=MODEL_SAVE_PATH,
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    mode='max',
    save_weights_only=False
)

# Dừng sớm
es_callback = EarlyStopping(
    monitor='val_loss',
    patience=50, # Tăng patience
    verbose=1,
    mode='min',
    restore_best_weights=True # Khôi phục trọng số tốt nhất
)

# Gom các callbacks lại
callbacks_list = [cp_callback, es_callback, lr_scheduler_cosine]

print("Callbacks đã sẵn sàng:", [cb.__class__.__name__ for cb in callbacks_list])

In [None]:
# --- Train Model ---
print("Bắt đầu huấn luyện mô hình...")
print(f"Số Epochs tối đa: {EPOCHS}, Batch Size: {BATCH_SIZE}")

history = model.fit(
    X_train,
    y_train, # Nhãn one-hot
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_test, y_test), # Nhãn one-hot
    callbacks=callbacks_list,
    verbose=1 # In tiến trình huấn luyện
)

print("\nQuá trình huấn luyện đã hoàn tất.")
# Lưu ý: Model hiện tại trong biến 'model' đã có trọng số tốt nhất
# do 'restore_best_weights=True' trong EarlyStopping.

In [None]:
# --- Evaluate Model ---
print("Đang đánh giá mô hình trên tập test...")

# Sử dụng model hiện tại (đã khôi phục trọng số tốt nhất)
best_model = model

# Đánh giá trên dữ liệu test (nhãn one-hot)
val_loss, val_acc = best_model.evaluate(X_test, y_test, batch_size=BATCH_SIZE, verbose=0)

print(f"\nKết quả đánh giá trên tập Test:")
print(f"  Test Loss: {val_loss:.4f}")
print(f"  Test Accuracy: {val_acc:.4f}")

# (Optional) Vẽ đồ thị learning curves
# plt.figure(figsize=(12, 4))
# plt.subplot(1, 2, 1)
# plt.plot(history.history['loss'], label='Training Loss')
# plt.plot(history.history['val_loss'], label='Validation Loss')
# plt.title('Loss Over Epochs')
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.legend()

# plt.subplot(1, 2, 2)
# plt.plot(history.history['accuracy'], label='Training Accuracy')
# plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
# plt.title('Accuracy Over Epochs')
# plt.xlabel('Epochs')
# plt.ylabel('Accuracy')
# plt.legend()
# plt.tight_layout()
# plt.show()

In [None]:
# --- Confusion Matrix and Classification Report ---
print("Đang tạo dự đoán để vẽ Confusion Matrix...")

# Hàm vẽ confusion matrix (định nghĩa lại ở đây cho độc lập)
def print_confusion_matrix(y_true_int, y_pred_int, report=True, class_labels=None):
    if class_labels is None:
      unique_labels = sorted(list(set(y_true_int)))
      class_labels = [f'Class {i}' for i in unique_labels]
      numeric_labels = unique_labels
    else:
      numeric_labels = list(range(len(class_labels)))

    cmx_data = confusion_matrix(y_true_int, y_pred_int, labels=numeric_labels)
    df_cmx = pd.DataFrame(cmx_data, index=class_labels, columns=class_labels)

    plt.figure(figsize=(10, 8))
    sns.heatmap(df_cmx, annot=True, fmt='g', cmap='Blues', square=False)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title("Confusion Matrix")
    plt.yticks(rotation=0)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

    if report:
        print('\nClassification Report:')
        # Sử dụng nhãn số nguyên cho classification_report
        print(classification_report(y_true_int, y_pred_int, labels=numeric_labels, target_names=class_labels, zero_division=0))


# Dự đoán probabilities trên tập test
Y_pred_probabilities = best_model.predict(X_test)
# Lấy index của lớp có xác suất cao nhất -> nhãn dự đoán dạng số nguyên
y_pred_classes_int = np.argmax(Y_pred_probabilities, axis=1)

# Lấy danh sách tên lớp (ví dụ: 'A', 'B', 'C',...) nếu có, nếu không thì dùng số
# class_names = [chr(ord('A') + i) for i in range(NUM_CLASSES)] # Ví dụ nếu là 26 chữ cái
class_names = None # Để hàm tự động tạo 'Class 0', 'Class 1',...

# Sử dụng nhãn test dạng số nguyên (y_test_int) và nhãn dự đoán dạng số nguyên
print_confusion_matrix(y_test_int, y_pred_classes_int, report=True, class_labels=class_names)

In [None]:
# --- Inference Test (Optional) ---
print("\nThực hiện kiểm tra suy luận trên một mẫu...")
sample_index = 5 # Chọn một mẫu bất kỳ
test_sample = np.array([X_test[sample_index]])
true_label_int = y_test_int[sample_index] # Nhãn thật dạng số nguyên

# Dự đoán bằng model Keras
predict_result_keras = best_model.predict(test_sample)
predicted_label_int_keras = np.argmax(np.squeeze(predict_result_keras)) # Nhãn dự đoán dạng số nguyên

print(f"Mẫu kiểm tra index: {sample_index}")
print(f"Input Shape: {test_sample.shape}")
print(f"Output Thô (Probabilities) từ Keras: {np.squeeze(predict_result_keras)}")
print(f"Nhãn Dự đoán (Keras): {predicted_label_int_keras}")
print(f"Nhãn Thật: {true_label_int}")
if predicted_label_int_keras == true_label_int:
    print("==> Dự đoán Chính xác!")
else:
    print("==> Dự đoán Sai.")

In [None]:
# --- Save Final Model for TFLite Conversion ---
# Lưu lại model tốt nhất (best_model) không cần optimizer
print(f"Đang lưu model cuối cùng (không optimizer) vào: {MODEL_SAVE_PATH}")
best_model.save(MODEL_SAVE_PATH, include_optimizer=False)
print("Model đã được lưu.")

In [None]:
# --- Convert to TensorFlow Lite (with Quantization) ---
print("Đang chuyển đổi model sang TFLite với quantization...")

# Load lại model từ file để đảm bảo sử dụng phiên bản đã lưu không có optimizer
# Mặc dù best_model.save đã làm vậy, bước này đảm bảo tính độc lập
try:
    saved_model_for_converter = tf.keras.models.load_model(MODEL_SAVE_PATH)

    # Tạo bộ chuyển đổi từ model Keras
    converter = tf.lite.TFLiteConverter.from_keras_model(saved_model_for_converter)

    # Bật tối ưu hóa mặc định (thường bao gồm dynamic range quantization)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # Thực hiện chuyển đổi
    tflite_quantized_model = converter.convert()

    # Lưu model TFLite
    print(f"Đang lưu model TFLite đã lượng tử hóa vào: {TFLITE_SAVE_PATH}")
    with open(TFLITE_SAVE_PATH, 'wb') as f:
        f.write(tflite_quantized_model)

    tflite_size = os.path.getsize(TFLITE_SAVE_PATH) / 1024
    print(f"Model TFLite đã lưu thành công. Kích thước: {tflite_size:.2f} KB")

except Exception as e:
    print(f"\nLỗi trong quá trình chuyển đổi TFLite: {e}")
    print("Vui lòng kiểm tra phiên bản TensorFlow và các dependencies.")

In [None]:
# --- TFLite Inference Test ---
print("\nKiểm tra suy luận bằng model TFLite...")

try:
    # Kiểm tra xem file TFLite có tồn tại không
    if not os.path.exists(TFLITE_SAVE_PATH):
        print(f"Lỗi: Không tìm thấy file TFLite tại {TFLITE_SAVE_PATH}")
    else:
        # Khởi tạo TFLite interpreter
        interpreter = tf.lite.Interpreter(model_path=TFLITE_SAVE_PATH)
        interpreter.allocate_tensors() # Cấp phát bộ nhớ

        # Lấy thông tin input/output
        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()

        # Chuẩn bị dữ liệu input (sử dụng cùng mẫu test_sample)
        # Đảm bảo đúng kiểu dữ liệu yêu cầu bởi model TFLite
        input_data = test_sample.astype(input_details[0]['dtype'])

        # Đặt dữ liệu vào tensor input
        interpreter.set_tensor(input_details[0]['index'], input_data)

        # Thực hiện suy luận
        interpreter.invoke()

        # Lấy kết quả từ tensor output
        tflite_results = interpreter.get_tensor(output_details[0]['index'])
        # Lấy nhãn dự đoán dạng số nguyên
        tflite_predicted_label_int = np.argmax(np.squeeze(tflite_results))

        print(f"\nMẫu kiểm tra index: {sample_index}")
        print(f"Output Thô (TFLite): {np.squeeze(tflite_results)}")
        print(f"Nhãn Dự đoán (TFLite): {tflite_predicted_label_int}")
        print(f"Nhãn Dự đoán (Keras) : {predicted_label_int_keras}") # Từ cell 11
        print(f"Nhãn Thật             : {true_label_int}") # Từ cell 11

        # So sánh kết quả
        if tflite_predicted_label_int == predicted_label_int_keras:
            print("\n==> Dự đoán TFLite khớp với dự đoán Keras.")
        else:
            print("\n==> Cảnh báo: Dự đoán TFLite khác với dự đoán Keras (có thể do quantization).")

except NameError:
     print("\nLỗi: Biến 'test_sample' hoặc 'predicted_label_int_keras' chưa được định nghĩa.")
     print("Hãy đảm bảo bạn đã chạy cell 'Kiểm tra Suy luận (Inference Test)' trước.")
except Exception as e:
    print(f"\nLỗi trong quá trình kiểm tra TFLite: {e}")


print("\nHoàn thành tất cả các bước!")