In [None]:
import os

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

import tensorflow as tf
gpus = tf.config.list_physical_devices('GPU')
print(gpus)

In [None]:
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from pathlib import Path
import pickle
import warnings
warnings.filterwarnings('ignore')

from config import ModelConfig as cf

print(f"TensorFlow 버전: {tf.__version__}")
print(f"GPU 사용 가능: {tf.config.list_physical_devices('GPU')}")

In [None]:
print(cf.DATA_ROOT)

In [None]:
class NormalizeSequenceLength:
    """
    시퀀스 길이를 정규화하는 클래스
    - 패딩: 짧은 시퀀스를 0으로 채움
    - 트리밍: 긴 시퀀스를 균등 샘플링
    """
    
    def pad_or_trim(self, seq: np.ndarray, target_len: int) -> np.ndarray:
        """
        시퀀스를 target_len 길이로 정규화
        
        Args:
            seq: 입력 시퀀스 (2D array)
            target_len: 목표 길이
            
        Returns:
            정규화된 시퀀스
        """
        try:
            # 입력 검증
            if seq is None or seq.size == 0:
                raise ValueError("입력 시퀀스가 비어있습니다.")
            
            if len(seq.shape) != 2:
                raise ValueError(f"시퀀스는 2차원 배열이어야 합니다. 현재 shape: {seq.shape}")
            
            t, F = seq.shape
            
            if target_len <= 0:
                raise ValueError(f"target_len은 양수여야 합니다. 현재 값: {target_len}")
            
            # 길이가 같으면 그대로 반환
            if t == target_len:
                return seq
            
            # 길이가 길면 샘플링
            if t > target_len:
                idx = np.linspace(0, t - 1, target_len).astype(int)
                return seq[idx]
            
            # 길이가 짧으면 패딩
            pad = np.zeros((target_len - t, F), dtype=seq.dtype)
            return np.vstack([seq, pad])
            
        except Exception as e:
            print(f"pad_or_trim 에러: {e}")
            raise

    def nearest_bucket_len(self, t: int, buckets=cf.BUCKETS) -> int:
        """
        현재 길이와 가장 가까운 버킷 길이를 반환
        
        Args:
            t: 현재 시퀀스 길이
            buckets: 사용 가능한 버킷 길이들
            
        Returns:
            가장 가까운 버킷 길이
        """
        try:
            if t <= 0:
                raise ValueError(f"시퀀스 길이는 양수여야 합니다. 현재 값: {t}")
            
            if not buckets or len(buckets) == 0:
                raise ValueError("버킷 리스트가 비어있습니다.")
            
            if any(b <= 0 for b in buckets):
                raise ValueError("모든 버킷 값은 양수여야 합니다.")
            
            return min(buckets, key=lambda b: abs(b - t))
            
        except Exception as e:
            print(f"nearest_bucket_len 에러: {e}")
            raise

# 테스트
normalizer = NormalizeSequenceLength()
test_seq = np.random.rand(15, 195)  # 15프레임 시퀀스
print(test_seq)
normalized = normalizer.pad_or_trim(test_seq, 60)
print(f"원본: {test_seq.shape} → 정규화: {normalized.shape}")

In [None]:
class SignLanguageModel:
    """
    수어 인식을 위한 CNN + LSTM 하이브리드 모델
    
    특징:
    - TimeDistributed CNN으로 프레임별 특징 추출
    - LSTM으로 시계열 패턴 학습
    - 버킷 기반 시퀀스 길이 정규화
    """
    
    def __init__(self):
        """모델 초기화"""
        self.num_classes = cf.NUM_CLASSES
        self.sequence_length = cf.SEQUENCE_LENGTH
        self.feature_dim = cf.FEATURE_DIM
        self.model = None
        self.label_encoder = LabelEncoder()
        self.normalizer = NormalizeSequenceLength()
        
        print(f"모델 설정:")
        print(f"  - 시퀀스 길이: {self.sequence_length}")
        print(f"  - 특성 차원: {self.feature_dim}")
        print(f"  - 클래스 수: {self.num_classes}")
        print(f"  - 버킷: {cf.BUCKETS}")

In [None]:
def normalize_sequence_length(self, sequence: np.ndarray) -> np.ndarray:
    """
    시퀀스 길이를 고정 길이로 정규화
    
    Args:
        sequence: 입력 시퀀스 (2D array)
        
    Returns:
        정규화된 시퀀스 (SEQUENCE_LENGTH 길이)
    """
    try:
        # 입력 검증
        if sequence is None or sequence.size == 0:
            raise ValueError("입력 시퀀스가 비어있습니다.")
        
        if len(sequence.shape) != 2:
            raise ValueError(f"시퀀스는 2차원 배열이어야 합니다. 현재 shape: {sequence.shape}")
        
        # NaN 값 처리
        sequence = np.nan_to_num(sequence, copy=False).astype(np.float32)
        
        # 고정 길이로 정규화
        seq = self.normalizer.pad_or_trim(sequence, cf.SEQUENCE_LENGTH)
        
        return seq
        
    except Exception as e:
        print(f"normalize_sequence_length 에러: {e}")
        raise

# SignLanguageModel 클래스에 메서드 추가
SignLanguageModel.normalize_sequence_length = normalize_sequence_length

In [None]:
# 데이터 증강

import numpy as np
import random

class SignAugmentor:
    def __init__(self, drop_prob=0.1, noise_std=0.01, scale_range=(0.9, 1.1), shift_range=(-2, 2)):
        self.drop_prob = drop_prob
        self.noise_std = noise_std
        self.scale_range = scale_range
        self.shift_range = shift_range

    def temporal_dropout(self, sequence):
        mask = np.random.rand(len(sequence)) > self.drop_prob
        return sequence[mask] if mask.sum() > 0 else sequence

    def temporal_shift(self, sequence):
        shift = random.randint(*self.shift_range)
        return np.roll(sequence, shift, axis=0)

    def scale(self, sequence):
        factor = random.uniform(*self.scale_range)
        return sequence * factor

    def add_noise(self, sequence):
        noise = np.random.normal(0, self.noise_std, sequence.shape)
        return sequence + noise

    def mirror(self, sequence):
        sequence[..., 0] *= -1
        return sequence

    def augment(self, sequence):
        if random.random() < 0.5:
            sequence = self.temporal_dropout(sequence)
        if random.random() < 0.5:
            sequence = self.temporal_shift(sequence)
        if random.random() < 0.5:
            sequence = self.scale(sequence)
        if random.random() < 0.5:
            sequence = self.add_noise(sequence)
        if random.random() < 0.3:
            sequence = self.mirror(sequence)
        return sequence


augmentor = SignAugmentor()

In [None]:
# @title npz 파일을 train/test로 나누기
def load_processed_data(self, data_path, validation_person_ids=None, num_samples=None, random_state=42):
    """
    전처리된 landmark 데이터 로드 및 person_id 기반 분할

    Args:
        data_path: 데이터 경로
        validation_person_ids: 검증 세트로 사용할 person_id 리스트 (None이면 랜덤 분할)
        num_samples: 로드할 샘플 수 (None이면 전체 로드)
        random_state: 랜덤 시드 (재현성을 위해, validation_person_ids가 None일 때만 사용)

    Returns:
        X_train, X_val, y_train, y_val: 분할된 학습/검증 데이터 (validation_person_ids가 None이면 X, y 전체 반환)
        label_encoder: 학습된 LabelEncoder
    """
    try:
        data_path = Path(data_path)

        # 경로 검증
        if not data_path.exists():
            raise FileNotFoundError(f"데이터 경로가 존재하지 않습니다: {data_path}")

        metadata_path = cf.DATA_ROOT / "dataset_metadata.csv"
        if not metadata_path.exists():
            raise FileNotFoundError(f"메타데이터 파일이 존재하지 않습니다: {metadata_path}")

        # 메타데이터 로드
        metadata = pd.read_csv(metadata_path)

        if metadata.empty:
            raise ValueError("메타데이터가 비어있습니다.")

        # 컬럼 검증
        required_columns = ['landmarks_file', 'word_gloss', 'word_id', 'person_id'] # person_id, word_id 추가
        missing_columns = [col for col in required_columns if col not in metadata.columns]
        if missing_columns:
            raise ValueError(f"메타데이터에 필요한 컬럼이 없습니다: {missing_columns}. person_id 컬럼이 필요합니다.")

        # 지정된 수의 샘플만 로드 (랜덤 선택)
        if num_samples is not None and num_samples > 0:
            if num_samples > len(metadata):
                print(f"경고: 요청된 샘플 수({num_samples})가 전체 데이터 수({len(metadata)})보다 많습니다. 전체 데이터를 로드합니다.")
            else:
                # 메타데이터를 랜덤으로 섞고 지정된 수만큼 선택
                metadata = metadata.sample(n=num_samples, random_state=random_state).reset_index(drop=True)
                print(f"{num_samples}개의 랜덤 샘플만 로드합니다.")


        # 데이터 로딩
        X, y, person_ids = [], [], [] # person_ids 리스트 추가
        failed_files = []

        print(f"총 {len(metadata)}개 파일 처리 중...")

        for idx, row in metadata.iterrows():
            try:
                landmarks_file = data_path / row['landmarks_file']
                if landmarks_file.exists():
                    landmarks_group = np.load(landmarks_file, allow_pickle=False)    # NPZ
                    # print(f"프레임 그룹 개수: {len(landmarks_group['x'])}")

                    # 프레임 그룹에서 프레임 가져오기
                    if hasattr(landmarks_group, "files"):
                        if "x" not in landmarks_group.files:
                            raise ValueError(f"'x'키가 없습니다: {landmarks_file}")
                        landmark = landmarks_group["x"].astype(np.float32)

                    # 정규화
                    landmark = self.normalize_sequence_length(landmark)

                    X.append(landmark)

                    # 라벨 키를 word_id 또는 word_id + word_gloss로 구성
                    label_key = row['word_id']
                    y.append(label_key)
                    person_ids.append(row['person_id']) # person_id 추가

                else:
                    failed_files.append(str(landmarks_file))
                    print(f"경고: 파일이 존재하지 않습니다: {landmarks_file}")

            except Exception as e:
                failed_files.append(str(landmarks_file))
                print(f"경고: 파일 로딩 실패 {landmarks_file}: {e}")
                continue

        if len(X) == 0:
            raise ValueError("로드된 데이터가 없습니다. 모든 파일 로딩에 실패했습니다.")

        # 데이터 검증
        # 리스트 => 넘파이 배열
        X = np.asarray(X, dtype=np.float32)
        person_ids = np.asarray(person_ids) # person_ids도 넘파이 배열로 변환

        # 형태/차원 검증
        if X.ndim != 3:
            raise ValueError(f"입력 X차원 오류: 기대 (N, L, F) 3차원, 실제 {X.shape}")

        sample_shape = X[0].shape    # (L, feature_fim)
        if sample_shape[1] != self.feature_dim:
            raise ValueError(f"특성 차원 불일치: 예상={self.feature_dim}, 실제={sample_shape[1]}")

        if len(failed_files) > 0:
            print(f"총 {len(failed_files)}개 파일 로딩에 실패했습니다.")

        print(f"성공적으로 로드된 데이터: {len(X)}개")
        print(f"데이터 형태: {np.array(X).shape}")

        # 라벨 인코딩 : 'label_key'로 수행
        self.label_encoder.fit(y) # 전체 라벨에 대해 fit
        y_encoded = self.label_encoder.transform(y)
        self.num_classes = len(self.label_encoder.classes_)    # 동적 결정
        print(f"NUM_CLASSES: {self.num_classes}")
        y_categorical = keras.utils.to_categorical(y_encoded, num_classes=self.num_classes)

        # person_id 기반 데이터 분할
        if validation_person_ids is not None:
            print(f"Person ID {validation_person_ids}를 사용하여 데이터 분할 중...")
            train_indices = np.isin(person_ids, validation_person_ids, invert=True)
            val_indices = np.isin(person_ids, validation_person_ids)

            X_train, y_train = X[train_indices], y_categorical[train_indices]
            X_val, y_val = X[val_indices], y_categorical[val_indices]

            print(f"학습 데이터 (Person ID 제외): {len(X_train)}개")
            print(f"검증 데이터 (Person ID {validation_person_ids}): {len(X_val)}개")

            if len(X_train) == 0 or len(X_val) == 0:
                 print("경고: person_id 기반 분할 결과 학습 데이터 또는 검증 데이터가 비어있습니다. 분할 설정을 확인하세요.")


            return X_train, X_val, y_train, y_val, self.label_encoder
        else:
            print("validation_person_ids가 지정되지 않아 전체 데이터를 반환합니다.")
            return X, y_categorical, self.label_encoder # 랜덤 분할 대신 전체 데이터와 인코더 반환

    except Exception as e:
        print(f"load_processed_data 에러: {e}")
        raise

# SignLanguageModel 클래스에 메서드 추가
SignLanguageModel.load_processed_data = load_processed_data

In [None]:
def train(self, data_path, validation_split=0.2, num_samples=None, validation_person_ids=None):
    """
    모델 학습

    Args:
        data_path: 학습 데이터 경로
        validation_split: 검증 데이터 비율 (validation_person_ids가 None일 때만 사용)
        num_samples: 로드할 샘플 수
        validation_person_ids: 검증 세트로 사용할 person_id 리스트 (이 값이 있으면 validation_split 무시)

    Returns:
        학습 히스토리
    """
    try:
        # 입력 검증
        if validation_person_ids is None and not (0 < validation_split < 1):
            raise ValueError(f"validation_split은 0과 1 사이의 값이어야 합니다. 현재 값: {validation_split}")

        print("=== 모델 학습 시작 ===")

        # 데이터 로딩 및 분할
        print("데이터 로딩 중...")
        if validation_person_ids is not None:
            # person_id 기반 분할
            X_train, X_val, y_train, y_val, self.label_encoder = self.load_processed_data(
                data_path, validation_person_ids=validation_person_ids, num_samples=num_samples
            )
        else:
            # 랜덤 분할 (기존 로직)
            X, y, self.label_encoder = self.load_processed_data(data_path, num_samples=num_samples)
            if len(X) < 10:
                raise ValueError(f"학습 데이터가 너무 적습니다. 최소 10개 이상 필요합니다. 현재: {len(X)}개")

            print("데이터 분할 중...")
            try:
                X_train, X_val, y_train, y_val = train_test_split(
                    X, y, test_size=validation_split, random_state=42, stratify=y.argmax(axis=1)
                )
            except ValueError as e:
                print(f"stratify 실패, stratify 없이 분할: {e}")
                X_train, X_val, y_train, y_val = train_test_split(
                    X, y, test_size=validation_split, random_state=42
                )

        if len(X_train) == 0 or len(X_val) == 0:
             print("경고: 데이터 분할 결과 학습 데이터 또는 검증 데이터가 비어있습니다. 데이터 또는 분할 설정을 확인하세요.")
             return None # 학습 데이터가 없으면 학습 진행하지 않음


        print(f"학습 데이터: {len(X_train)}개, 검증 데이터: {len(X_val)}개")

        # 모델 구성
        if self.model is None:
            print("모델 구성 중...")
            self.build_model()
        else:
            print("기존 모델 사용")

        # 모델 저장 경로 설정
        model_save_path = cf.MODEL_SAVE_PATH
        if not model_save_path.exists():
            model_save_path.mkdir(parents=True, exist_ok=True)
            print(f"모델 저장 경로 생성: {model_save_path}")

        # 콜백 설정
        callbacks = [
            keras.callbacks.EarlyStopping(
                monitor='val_accuracy',
                patience=cf.PATIENCE,
                restore_best_weights=True,
                verbose=1
            ),
            # 7회 동안 성능 개선이 없으면 학습률을 50% 씩 낮춰서 실행.
            keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=7,
                min_lr=1e-7,
                verbose=1
            ),
            keras.callbacks.ModelCheckpoint(
                model_save_path / "best_model.h5",
                monitor='val_accuracy',
                save_best_only=True,
                verbose=1
            )
        ]

        # Mixed Precision 설정 (GPU 메모리 최적화)
        policy = keras.mixed_precision.Policy('mixed_float16')
        keras.mixed_precision.set_global_policy(policy)
        print("Mixed Precision 활성화")

        augmentor = SignAugmentor()

        def augment_fn(x, y):
            def aug(z):
                z = z.numpy()
                z = augmentor.augment(z)
                return self.normalize_sequence_length(z).astype(np.float32)
                
            x = tf.py_function(aug, [x], Tout=tf.float32)
            # shape 강제 지정: (sequence_length, feature_dim)
            x.set_shape((self.sequence_length, self.feature_dim))
            return x, y

        train_dataset = (tf.data.Dataset.from_tensor_slices((X_train, y_train))
                         .map(augment_fn, num_parallel_calls=tf.data.AUTOTUNE)
                         .shuffle(1000)
                         .batch(cf.BATCH_SIZE)
                         .prefetch(tf.data.AUTOTUNE))

        val_dataset = (tf.data.Dataset.from_tensor_slices((X_val, y_val))
                       .batch(cf.BATCH_SIZE)
                       .prefetch(tf.data.AUTOTUNE))

        # 학습 실행
        print("모델 학습 시작...")
        history = self.model.fit(
            train_dataset,
            validation_data=val_dataset,
            epochs=cf.EPOCHS,
            batch_size=cf.BATCH_SIZE,
            callbacks=callbacks,
            verbose=1
        )

        # 라벨 인코더 저장
        print("라벨 인코더 저장 중...")
        with open(model_save_path / 'label_encoder.pkl', 'wb') as f:
            pickle.dump(self.label_encoder, f)

        print("=== 학습 완료 ===")
        return history

    except Exception as e:
        print(f"train 에러: {e}")
        raise

# SignLanguageModel 클래스에 메서드 추가
SignLanguageModel.train = train

In [None]:
def build_model(self):
    """
    CNN(1D + TimeDistributed) + LSTM + MultiHeadAttention 하이브리드 모델
    구조:
    1. TimeDistributed Conv1D (프레임별 특징 추출)
    2. BiLSTM (시계열 패턴 학습)
    3. MultiHeadAttention (중요 프레임 강조)
    4. Dense (분류)
    """
    try:
        if self.sequence_length <= 0 or self.feature_dim <= 0:
            raise ValueError(
                f"잘못된 입력 크기: sequence_length={self.sequence_length}, feature_dim={self.feature_dim}"
            )

        print("모델 구성 중...")

        # 입력: (batch, time, feature_dim, 1)
        inputs = keras.Input(shape=(self.sequence_length, self.feature_dim, 1))

        # --- Conv 블록 1 ---
        x = layers.TimeDistributed(
            layers.Conv1D(16, 3, activation="relu", padding="same")
        )(inputs)
        x = layers.TimeDistributed(layers.BatchNormalization())(x)
        x = layers.TimeDistributed(layers.MaxPooling1D(2))(x)
        x = layers.TimeDistributed(layers.Dropout(cf.DROPOUT_RATE))(x)

        # --- Conv 블록 2 ---
        x = layers.TimeDistributed(
            layers.Conv1D(32, 3, activation="relu", padding="same")
        )(x)
        x = layers.TimeDistributed(layers.BatchNormalization())(x)
        x = layers.TimeDistributed(layers.MaxPooling1D(2))(x)
        x = layers.TimeDistributed(layers.Dropout(cf.DROPOUT_RATE))(x)

        # --- Conv 블록 3 ---
        x = layers.TimeDistributed(
            layers.Conv1D(64, 3, activation="relu", padding="same")
        )(x)
        x = layers.TimeDistributed(layers.BatchNormalization())(x)
        x = layers.TimeDistributed(layers.MaxPooling1D(2))(x)
        x = layers.TimeDistributed(layers.Dropout(cf.DROPOUT_RATE))(x)

        x = layers.TimeDistributed(layers.Flatten())(x)

        # --- LSTM 블록 ---
        x = layers.Bidirectional(layers.LSTM(256, return_sequences=True, dropout=cf.DROPOUT_RATE))(x)
        x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, dropout=cf.DROPOUT_RATE))(x)

        print("LSTM 출력 shape:", x.shape)

        # --- Attention ---
        attention = layers.MultiHeadAttention(num_heads=4, key_dim=32)(x, x)
        x = layers.Add()([x, attention])
        x = layers.GlobalAveragePooling1D()(x)

        # --- 분류기 ---
        x = layers.Dropout(0.3)(x)
        x = layers.Dense(256, activation="relu")(x)
        x = layers.BatchNormalization()(x)
        x = layers.Dropout(0.5)(x)

        outputs = layers.Dense(
            self.num_classes, activation="softmax", dtype="float32"
        )(x)

        self.model = keras.Model(inputs, outputs)

        # --- 컴파일 ---
        loss = keras.losses.CategoricalCrossentropy(label_smoothing=0.1)
        self.model.compile(
            optimizer=keras.optimizers.Adam(
                learning_rate=cf.LEARNING_RATE, clipnorm=1.0
            ),
            loss=loss,
            metrics=["accuracy", keras.metrics.TopKCategoricalAccuracy(k=5, name="top5")],
        )

        print("모델 구성 완료!")
        return self.model

    except Exception as e:
        print(f"build_model 에러: {e}")
        raise


# SignLanguageModel 클래스에 메서드 추가
SignLanguageModel.build_model = build_model


In [None]:
# def build_model(self):
#     try:
#         if self.sequence_length <= 0 or self.feature_dim <= 0:
#             raise ValueError(
#                 f"잘못된 입력 크기: sequence_length={self.sequence_length}, feature_dim={self.feature_dim}"
#             )

#         print("경량화 모델 구성 중...")

#         # 입력: (batch, time, feature_dim)
#         inputs = keras.Input(shape=(self.sequence_length, self.feature_dim))

#         # --- Conv 블록 ---
#         x = layers.Conv1D(16, 3, activation="relu", padding="same")(inputs)
#         x = layers.BatchNormalization()(x)
#         x = layers.MaxPooling1D(2)(x)
#         x = layers.Dropout(cf.DROPOUT_RATE)(x)
    
#         x = layers.Conv1D(32, 3, activation="relu", padding="same")(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.MaxPooling1D(2)(x)
#         x = layers.Dropout(cf.DROPOUT_RATE)(x)

#         x = layers.Conv1D(64, 3, activation="relu", padding="same")(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.MaxPooling1D(2)(x)
#         x = layers.Dropout(cf.DROPOUT_RATE)(x)

#         x = layers.Conv1D(128, 3, activation="relu", padding="same")(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.MaxPooling1D(2)(x)
#         x = layers.Dropout(cf.DROPOUT_RATE)(x)

#         x = layers.TimeDistributed(layers.Flatten())(x)

#         # --- GRU 블록 ---
#         x = layers.Bidirectional(layers.GRU(256, return_sequences=True, dropout=cf.DROPOUT_RATE))(x)
#         x = layers.Bidirectional(layers.GRU(128, return_sequences=True, dropout=cf.DROPOUT_RATE))(x)
#         x = layers.Bidirectional(layers.GRU(64, return_sequences=True, dropout=cf.DROPOUT_RATE))(x)

#         # --- Attention ---
#         attention = layers.MultiHeadAttention(num_heads=4, key_dim=32)(x, x)
#         x = layers.Add()([x, attention])  
#         x = layers.GlobalAveragePooling1D()(x)

#         # --- 분류기 ---
#         x = layers.Dropout(0.3)(x)
#         x = layers.Dense(128, activation="relu")(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.Dropout(0.5)(x)

#         outputs = layers.Dense(
#             self.num_classes, activation="softmax", dtype="float32"
#         )(x)

#         self.model = keras.Model(inputs, outputs)

#         # --- 컴파일 ---
#         loss = keras.losses.CategoricalCrossentropy(label_smoothing=0.1)
#         self.model.compile(
#             optimizer=keras.optimizers.Adam(
#                 learning_rate=cf.LEARNING_RATE, clipnorm=1.0
#             ),
#             loss=loss,
#             metrics=["accuracy", keras.metrics.TopKCategoricalAccuracy(k=5, name="top5")],
#         )

#         print("경량화 모델 구성 완료!")
#         return self.model

#     except Exception as e:
#         print(f"build_model 에러: {e}")
#         raise

# SignLanguageModel.build_model = build_model


In [None]:
import matplotlib.pyplot as plt

def plot_history(history):
    """
    학습/검증 Accuracy & Loss 시각화
    """
    acc = history.history["accuracy"]
    val_acc = history.history["val_accuracy"]
    loss = history.history["loss"]
    val_loss = history.history["val_loss"]

    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(acc, label="Train Acc")
    plt.plot(val_acc, label="Val Acc")
    plt.title("Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(loss, label="Train Loss")
    plt.plot(val_loss, label="Val Loss")
    plt.title("Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()

    plt.show()

In [None]:
# 데이터 증강

# 모델 학습

In [None]:
# SignLanguageModel 클래스 인스턴스 생성
model = SignLanguageModel()

In [None]:
history = model.train(data_path=cf.DATA_ROOT, validation_person_ids=[1, 3])

- epoch 200
    

In [None]:
# 모델 학습이 끝났는데 추가 학습을 하고 싶을 경우 아래의 2 블록을 주석 풀고 실행

In [None]:
# from tensorflow import keras
# import pickle
# from pathlib import Path

# model_instance = SignLanguageModel(
# )

# model_path = Path("BiLSTM_MHAttention_model") / "best_model.h5"
# model_instance.model = keras.models.load_model(model_path)

# label_encoder_path = model_path.parent / "label_encoder.pkl"
# with open(label_encoder_path, "rb") as f:
#     model_instance.label_encoder = pickle.load(f)

# # 모델 불러온 뒤 반드시 컴파일 다시 실행
# model_instance.model.compile(
#     optimizer=keras.optimizers.Adam(learning_rate=cf.LEARNING_RATE, clipnorm=1.0),
#     loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
#     metrics=['accuracy', keras.metrics.TopKCategoricalAccuracy(k=5, name="top5")]
# )


In [None]:
# history = model_instance.train(
#     data_path=cf.DATA_ROOT", validation_person_ids=[1, 3]            # 데이터 샘플 제한 지정 가능
# )

In [None]:
plot_history(history)

## 평가

- 수정중

In [None]:
# @title 모델 로드 및 평가

def evaluate_best_model(model_instance, data_path, num_samples=None):
    """
    저장된 best 모델을 로드하고 테스트 데이터셋으로 평가

    Args:
        model_instance: SignLanguageModel 클래스 인스턴스
        data_path: 테스트 데이터 경로
        num_samples: 로드할 샘플 수 (None이면 전체 로드)

    Returns:
        테스트 데이터 평가 결과 (딕셔너리 형태)
    """
    try:
        print("=== 테스트 데이터 평가 시작 ===")

        # 테스트 데이터 로딩
        print(f"테스트 데이터 로딩 중: {data_path}")
        X_test, y_test = model_instance.load_processed_data(data_path, num_samples)

        if len(X_test) == 0:
            print("테스트 데이터가 로드되지 않았습니다.")
            return None

        print(f"테스트 데이터: {len(X_test)}개")

        # 저장된 best 모델 로드
        model_save_path = cf.MODEL_SAVE_PATH / "best_model_2_batch32_epoch293.h5"
        if not model_save_path.exists():
            raise FileNotFoundError(f"저장된 best 모델이 없습니다: {model_save_path}")

        print(f"모델 로딩 중: {model_save_path}")
        loaded_model = keras.models.load_model(model_save_path)

        # 모델 평가
        print("모델 평가 중...")
        loss, accuracy, top5_accuracy = loaded_model.evaluate(X_test, y_test, verbose=0)

        test_results = {
            'loss': loss,
            'accuracy': accuracy,
            'top5_accuracy': top5_accuracy
        }
        print(f"테스트 결과 - Loss: {loss:.4f}, Accuracy: {accuracy:.4f}, Top-5 Accuracy: {top5_accuracy:.4f}")
        print("=== 테스트 데이터 평가 완료 ===")

        return test_results

    except Exception as e:
        print(f"evaluate_best_model 에러: {e}")
        raise

In [None]:
# 실제 테스트 데이터 경로로 수정하세요.
test_data_directory = cf.DATA_ROOT # 예시: 학습 데이터와 동일한 경로 사용 (실제와 다를 수 있음)

# SignLanguageModel 클래스 인스턴스가 필요합니다. 이미 생성된 'model' 인스턴스를 사용합니다.
if 'model' in locals():
    test_results = evaluate_best_model(model, test_data_directory, num_samples=10)
else:
    print("SignLanguageModel 인스턴스가 생성되지 않았습니다. 'model = SignLanguageModel()' 코드를 실행하세요.")

if test_results:
    print("\n테스트 평가 최종 결과:")
    print(f"  Loss: {test_results['loss']:.4f}")
    print(f"  Accuracy: {test_results['accuracy']:.4f}")
    print(f"  Top-5 Accuracy: {test_results['top5_accuracy']:.4f}")

In [None]:
%pip list

In [None]:
%pip freeze > req.txt