In [None]:
import numpy as np
import pandas as pd
import os

# Data locations and windowing configuration

# Pre-processed training video data.
directory = "Processed data txt/"

# Pre-processed test video data.
test_directory = "Processed test/"

# Number of consecutive frames grouped into one record (one model input).
n_frames = 20

def train_loader(directory, batch_size=128, window=None, num_classes=22):
    """Endlessly yield fixed-size mini-batches of sliding windows.

    Intended for data sets too large to load in one go: the files are
    re-read on every pass instead of being kept in memory.

    Every sub-folder of ``directory`` is treated as one class. Folders are
    visited in sorted order so that the folder -> label mapping is
    reproducible (``os.listdir`` gives no ordering guarantee) and matches
    the mapping used by ``load_all``.

    Parameters
    ----------
    directory : str
        Root folder holding one sub-folder per class. Entries whose name
        ends in ".txt" and entries that are not directories are ignored.
    batch_size : int, default 128
        Number of windows per yielded batch.
    window : int, optional
        Number of consecutive rows per sample. Defaults to the
        module-level ``n_frames``.
    num_classes : int, default 22
        Labels are the folder index modulo this value (the original code
        reset its label counter when it reached 22).

    Yields
    ------
    (numpy.ndarray, numpy.ndarray)
        ``X`` stacked windows of the file contents without the first CSV
        column, and ``y`` the matching integer labels. A partially filled
        batch is carried over into the next pass, so every yielded batch
        has exactly ``batch_size`` samples.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1, or if a complete pass over
        ``directory`` produces no window at all (the original looped
        forever without yielding in that case).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1, got %r" % (batch_size,))
    frames = n_frames if window is None else window

    folders = sorted(
        entry for entry in os.listdir(directory)
        if not entry.endswith(".txt")
        and os.path.isdir(os.path.join(directory, entry))
    )

    X_batch, y_batch = [], []
    while True:
        produced = False  # did this pass over the data emit any window?
        for index, folder in enumerate(folders):
            label = index % num_classes
            sub_directory = os.path.join(directory, folder)
            for file in sorted(os.listdir(sub_directory)):
                df = pd.read_csv(os.path.join(sub_directory, file))
                dataset = df.iloc[:, 1:].values  # drop the first column
                # "+ 1" so the window ending on the last row is included;
                # without it a file with exactly `frames` rows gives nothing.
                for end in range(frames, len(dataset) + 1):
                    X_batch.append(dataset[end - frames:end, :])
                    y_batch.append(label)
                    produced = True
                    if len(y_batch) == batch_size:
                        yield np.array(X_batch), np.array(y_batch)
                        X_batch, y_batch = [], []
        if not produced:
            raise ValueError(
                "No windows of %d rows could be built from %r" % (frames, directory)
            )

def load_all(directory, window=None):
    """Load every sliding window under ``directory`` into memory at once.

    Meant for small or medium data sets, where holding everything in
    memory makes training faster than streaming with ``train_loader``.

    Every sub-folder of ``directory`` is one class; its integer label is
    its position in the *sorted* list of sub-folders. Sorting matters:
    ``os.listdir`` returns entries in arbitrary order, so without it two
    calls (e.g. one for the train directory and one for the test
    directory) could assign different labels to the same class name.

    Parameters
    ----------
    directory : str
        Root folder holding one sub-folder per class. Entries whose name
        ends in ".txt" and entries that are not directories are ignored.
    window : int, optional
        Number of consecutive rows per sample. Defaults to the
        module-level ``n_frames``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        ``X`` with all windows stacked (file contents without the first
        CSV column) and ``y`` with one integer label per window.
    """
    frames = n_frames if window is None else window

    folders = sorted(
        entry for entry in os.listdir(directory)
        if not entry.endswith(".txt")
        and os.path.isdir(os.path.join(directory, entry))
    )
    print(folders)  # position in this list == integer class label

    X_train, y_train = [], []
    for label, folder in enumerate(folders):
        sub_directory = os.path.join(directory, folder)
        for file in sorted(os.listdir(sub_directory)):
            df = pd.read_csv(os.path.join(sub_directory, file))
            dataset = df.iloc[:, 1:].values  # drop the first column
            # "+ 1" so the window ending on the last row is included;
            # without it a file with exactly `frames` rows gives nothing.
            for end in range(frames, len(dataset) + 1):
                X_train.append(dataset[end - frames:end, :])
                y_train.append(label)
    return np.array(X_train), np.array(y_train)

In [None]:
# Load both splits completely into memory
X_train, y_train = load_all(directory)
X_test, y_test = load_all(test_directory)
print(X_train.shape,X_test.shape)

# Sample counts of the training split and of the split used for validation.
num_train, num_val = len(y_train), len(y_test)

In [None]:
# 4-layer stacked LSTM with [44, 88, 44, 22] units; every LSTM is followed by
# BatchNormalization and Dropout.
from keras.models import Sequential
from keras.layers import LSTM, Dense,Dropout,BatchNormalization,Flatten
from keras.optimizers import Adam
from keras import regularizers

# Features per frame: must equal the number of CSV columns the loaders keep
# (every column except the first).
n_features = 132
# Size of the softmax output, i.e. number of integer labels the loaders emit.
n_classes = 22
lstm_units = [44, 88, 44, 22]

# The sequence length comes from n_frames (the window size used by the
# loaders) instead of a second hard-coded 20, so the two cannot drift apart.
input_shape = (n_frames, n_features)

model = Sequential()
for depth, units in enumerate(lstm_units):
    is_last = depth == len(lstm_units) - 1
    # Only the first layer declares the input shape.
    extra_kwargs = {"input_shape": input_shape} if depth == 0 else {}
    model.add(LSTM(units=units,
                   activation="tanh",
                   kernel_regularizer=regularizers.l2(1e-4),
                   # Intermediate layers pass the full sequence on to the next
                   # LSTM; the last one returns only its final output.
                   return_sequences=not is_last,
                   **extra_kwargs))
    model.add(BatchNormalization())
    model.add(Dropout(0.6))
# The last LSTM already outputs (batch, units); Flatten is kept so the layer
# stack stays the same as before.
model.add(Flatten())
model.add(Dense(n_classes,activation="softmax"))
model.build(input_shape=(None, *input_shape))
model.summary()

In [None]:
# Checkpointing and a learning-rate scheduler that lowers the rate step by step
from keras.callbacks import ModelCheckpoint, LearningRateScheduler

lr_max = 0.001    # learning rate used for epoch 0
reduct_step = 1   # number of epochs between two reductions


def lr_sch(epoch):
    """Return the learning rate for `epoch`: lr_max times 0.8 per completed step."""
    completed_steps = epoch // reduct_step
    return lr_max * 0.8 ** completed_steps


model.compile(
    optimizer=Adam(learning_rate=lr_max),
    loss="sparse_categorical_crossentropy",
    metrics=['accuracy'],
)

# Keep only the full model with the highest validation accuracy seen so far.
checkpoint_callback = ModelCheckpoint(
    filepath="best_checkpoint_LSTM_4_44-88-44-22.keras",
    monitor="val_accuracy",
    mode="max",
    save_best_only=True,
    save_weights_only=False,
)
lr_schedule = LearningRateScheduler(lr_sch, verbose=1)

In [None]:
# Train, then save the model as it is after the last epoch
# NOTE(review): validation_data is the test split, so the "best" checkpoint is
# selected on the same data the final evaluation uses — confirm this is intended.
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=128,
    epochs=10,
    validation_data=(X_test, y_test),
    callbacks=[checkpoint_callback, lr_schedule],
)
model.save("model_remake_LSTM_4_44-88-44-22.keras")

In [None]:
# Final evaluation on the test split (confusion_matrix is imported here for
# the plotting cell that follows)
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix

# One row of softmax scores per test window; the arg-max is the predicted label.
class_probs = model.predict(X_test)
y_pred = np.argmax(class_probs, axis=1)

acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average="weighted")
print("F1 score: " + str(f1))
print("Accuracy_score: " + str(acc))

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Heat map of the confusion matrix: rows are true labels, columns predictions.
cm = confusion_matrix(y_test, y_pred)
ax = sns.heatmap(cm, annot=False, fmt="d", cmap="viridis")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
plt.show()