In [1]:
import cv2
import numpy as np
import pandas as pd
from keras.models import load_model
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Input
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils import resample
from sklearn.metrics import confusion_matrix
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from loguru import logger

class FineTuningModelUsingMelSpectrogram:
    """
    Mô hình nhận dạng cảm xúc trong giọng nói sử dụng CNN.
    """
    
    def __init__(self, model_path, train_image_path, test_image_path, val_image_path, learning_rate=0.0001):
        """
        Khởi tạo các thuộc tính cần thiết cho đối tượng FineTuningModelUsingMelSpectrogram.
        """
        self.model = self.load_model(model_path)

    def read_and_process_image(self, image_path, target_size=(128, 128)):
        """
        Đọc và xử lý hình ảnh từ đường dẫn cho trước.
        """
        try:
            img = cv2.imread(image_path)
            if img is None:
                logger.error(f"Không thể đọc hình ảnh từ đường dẫn: {image_path}")
                return None
            img = cv2.resize(img, target_size)
            img = img.astype('float32') / 255.0
            return img
        except Exception as e:
            logger.error(f"Lỗi khi đọc và xử lý hình ảnh từ đường dẫn: {image_path}, Lỗi: {str(e)}")
            return None

    def process_data(self, csv_file):
        """
        Xử lý dữ liệu từ file CSV chứa đường dẫn hình ảnh và nhãn.
        """
        encoder = OneHotEncoder()
        df = pd.read_csv(csv_file)
        X = []
        y = df['label']
        
        for index, row in df.iterrows():
            image_path = row['file_path']
            img = self.read_and_process_image(image_path)
            if img is not None:
                X.append(img)
        
        X_tensor = np.array(X)
        y = encoder.fit_transform(np.array(y).reshape(-1, 1)).toarray()
        return X_tensor, y
    
    def compile_model(self):
        """
        Thiết lập và biên dịch mô hình CNN.
        """
        optimizer = Adam(learning_rate=self.learning_rate)
        self.model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
        
    def load_model(self, model_path):
        """
        Tải mô hình đã huấn luyện từ file.
        """
        self.model = load_model(model_path)
        self.compile_model()
    
    def fine_tune(self, X_train, y_train, X_val, y_val, model_path, n_mels, epochs=50, batch_size=64):
        """
        Fine-tune mô hình với dữ liệu mới.
        """
        self.load_model(model_path)
        
        cnn_model_checkpoint = ModelCheckpoint(f'fine_tuned_cnn_model_weights_using_mel_spectrogram_{n_mels}.h5', monitor='val_accuracy', save_best_only=True)
        early_stop = EarlyStopping(monitor='val_loss', mode='min', patience=8, restore_best_weights=True)
        lr_reduction = ReduceLROnPlateau(monitor='val_accuracy', patience=3, verbose=1, factor=0.5, min_lr=0.00001)
        
        history = self.model.fit(X_train, y_train, epochs=epochs, validation_data=(X_val, y_val), batch_size=batch_size, callbacks=[cnn_model_checkpoint, early_stop, lr_reduction])
        self.model.load_weights(f'fine_tuned_cnn_model_weights_using_mel_spectrogram_{n_mels}.h5')
        return history
    
    def balanced_resampling(self, X, y):
        """
        Xử lý lệch dữ liệu bằng cách kết hợp xóa ngẫu nhiên các mẫu từ các lớp dư thừa và thêm ngẫu nhiên các mẫu cho các lớp thiếu hụt.
        """
        # Đếm số lượng mẫu cho mỗi lớp
        counter = Counter(np.argmax(y, axis=1))
        logger.info(f"Original class distribution: {counter}")

        # Tìm lớp có ít mẫu nhất và lớp có nhiều mẫu nhất
        min_samples = min(counter.values())
        max_samples = max(counter.values())

        # Khởi tạo danh sách lưu trữ các chỉ số của các mẫu được giữ lại
        indices_to_keep = []
        indices_to_add = []

        for label in counter.keys():
            # Lấy tất cả các chỉ số của các mẫu thuộc lớp này
            class_indices = np.where(np.argmax(y, axis=1) == label)[0]
            
            # Xóa ngẫu nhiên nếu số lượng mẫu lớn hơn min_samples
            if len(class_indices) > min_samples:
                np.random.shuffle(class_indices)
                indices_to_keep.extend(class_indices[:min_samples])
            # Thêm ngẫu nhiên nếu số lượng mẫu nhỏ hơn max_samples
            elif len(class_indices) < max_samples:
                indices_to_keep.extend(class_indices)
                indices_to_add.extend(resample(class_indices, replace=True, n_samples=max_samples - len(class_indices), random_state=42))

        # Lấy các mẫu được giữ lại từ X và y
        indices_to_keep.extend(indices_to_add)
        X_balanced = X[indices_to_keep]
        y_balanced = y[indices_to_keep]

        # Đếm lại số lượng mẫu cho mỗi lớp sau khi cân bằng
        counter_balanced = Counter(np.argmax(y_balanced, axis=1))
        logger.info(f"Balanced class distribution: {counter_balanced}")

        return X_balanced, y_balanced

    def plot_confusion_matrix(self, X_test, y_test, labels):
        """
        Vẽ ma trận nhầm lẫn cho bộ dữ liệu kiểm tra.
        """
        # Dự đoán nhãn cho bộ dữ liệu kiểm tra
        y_pred = self.model.predict(X_test)
        y_pred_classes = np.argmax(y_pred, axis=1)
        y_true_classes = np.argmax(y_test, axis=1)

        # Tính toán ma trận nhầm lẫn
        cm = confusion_matrix(y_true_classes, y_pred_classes)

        # Vẽ ma trận nhầm lẫn sử dụng seaborn
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
        plt.xlabel('Predicted Label')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix')
        plt.show()

    def plot_training_history(self, history):
        """
        Vẽ biểu đồ lịch sử huấn luyện (accuracy và loss).
        """
        # Plot training & validation accuracy values
        plt.figure(figsize=(12, 4))
        plt.subplot(1, 2, 1)
        plt.plot(history.history['accuracy'])
        plt.plot(history.history['val_accuracy'])
        plt.title('Model accuracy')
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')

        # Plot training & validation loss values
        plt.subplot(1, 2, 2)
        plt.plot(history.history['loss'])
        plt.plot(history.history['val_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')

        plt.show()
