In [None]:
# Library imports
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, classification_report
import optuna
from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner
import sqlite3
import json
from datetime import datetime
import warnings
# Silences every warning category for the rest of the session.
warnings.filterwarnings('ignore')

# Path setup: extend the module search path before importing the project
# modules below (presumably lstm_v2_trainer / lstm_v2_model live in ../src).
import sys
sys.path.append('../src')

from lstm_v2_trainer import LSTMv2Trainer
from lstm_v2_model import LSTMv2HybridModel

print("CMI評価指標対応 LSTM v2 ハイパーパラメータ最適化環境準備完了")


In [None]:
# Target gesture definitions (per the CMI competition specification).
TARGET_GESTURES = [
    'Above ear - pull hair',
    'Cheek - pinch skin',
    'Eyebrow - pull hair',
    'Eyelash - pull hair',
    'Forehead - pull hairline',
    'Forehead - scratch',
    'Neck - pinch skin',
    'Neck - scratch',
]

# Every remaining gesture label is a non-target gesture.
NON_TARGET_GESTURES = [
    'Write name on leg',
    'Wave hello',
    'Glasses on/off',
    'Text on phone',
    'Write name in air',
    'Feel around in tray and pull out an object',
    'Scratch knee/leg skin',
    'Pull air toward your face',
    'Drink from bottle/cup',
    'Pinch knee/leg skin',
]

# Quick sanity summary of the two label groups.
print(f"ターゲットジェスチャー: {len(TARGET_GESTURES)}個")
print(f"非ターゲットジェスチャー: {len(NON_TARGET_GESTURES)}個")
print(f"総ジェスチャー数: {len(TARGET_GESTURES + NON_TARGET_GESTURES)}個")


In [None]:
def calculate_cmi_score(y_true, y_pred, label_encoder):
    """
    Compute the CMI competition evaluation metric.

    The score is the mean of two F1 values:

    * Binary F1 -- target gesture vs. non-target gesture.
    * Macro F1 -- each target gesture is its own class and all non-target
      gestures are merged into a single ``'non_target'`` class.

    Both F1 values are always computed by ``f1_score``; an undefined F1
    (no positive samples and no positive predictions) counts as 0. Inputs
    that contain only one binary class are therefore scored normally, e.g.
    all-target labels that are all predicted as target give a binary F1 of 1.

    Parameters:
    -----------
    y_true : array-like
        Ground-truth labels (encoded).
    y_pred : array-like
        Predicted labels (encoded).
    label_encoder : LabelEncoder
        Encoder whose ``inverse_transform`` turns encoded labels back into
        gesture-name strings.

    Returns:
    --------
    tuple
        (CMI score, Binary F1, Macro F1) as floats. If anything raises during
        the computation the error is printed and (0.0, 0.0, 0.0) is returned.
    """
    try:
        # Decode the labels back to their original gesture names.
        y_true_str = label_encoder.inverse_transform(y_true)
        y_pred_str = label_encoder.inverse_transform(y_pred)

        # Build the lookup once; set membership avoids scanning the list for
        # every sample.
        target_set = frozenset(TARGET_GESTURES)
        true_hits = [gesture in target_set for gesture in y_true_str]
        pred_hits = [gesture in target_set for gesture in y_pred_str]

        # 1. Binary F1: target (1) vs. non-target (0).
        #    zero_division=0 makes an undefined F1 evaluate to 0 without a
        #    warning, so no single-class special case is needed.
        binary_f1 = f1_score(
            np.array(true_hits, dtype=int),
            np.array(pred_hits, dtype=int),
            average='binary',
            zero_division=0,
        )

        # 2. Macro F1 over the target gestures plus one merged non-target class.
        y_true_macro = np.array(
            [gesture if hit else 'non_target' for gesture, hit in zip(y_true_str, true_hits)]
        )
        y_pred_macro = np.array(
            [gesture if hit else 'non_target' for gesture, hit in zip(y_pred_str, pred_hits)]
        )
        macro_f1 = f1_score(y_true_macro, y_pred_macro, average='macro', zero_division=0)

        # 3. Final score: mean of the binary and macro F1.
        cmi_score = (binary_f1 + macro_f1) / 2.0

        return float(cmi_score), float(binary_f1), float(macro_f1)

    except Exception as e:
        # Best-effort fallback: report the problem and score the call as zero.
        print(f"CMI評価指標計算でエラー: {str(e)}")
        return 0.0, 0.0, 0.0

print("CMI評価指標計算関数の定義完了")


In [None]:
def objective(trial):
    """
    Optuna objective function (CMI-metric version).

    Samples one hyperparameter configuration, trains a model through
    ``LSTMv2Trainer`` and scores the returned predictions with
    ``calculate_cmi_score``.

    Every failure path (data could not be loaded, unexpected result
    structure, any exception) prints a message and returns 0.0 instead of
    raising. Because a float is returned, Optuna records such trials as
    COMPLETE with value 0.0 rather than as FAIL.

    Parameters:
    -----------
    trial : optuna.Trial
        Optuna trial object.

    Returns:
    --------
    float
        Objective value (CMI score), or 0.0 when the trial could not be
        evaluated.
    """
    failure_score = 0.0

    try:
        print(f"\n=== Trial {trial.number} ===")

        # Hyperparameter suggestions plus fixed training settings.
        # The suggest_* calls run in dict order, which is the same order as
        # before, so seeded sampling is unaffected.
        model_params = {
            'lstm_units_1': trial.suggest_int('lstm_units_1', 32, 128, step=16),
            'lstm_units_2': trial.suggest_int('lstm_units_2', 16, 64, step=8),
            'dense_units': trial.suggest_int('dense_units', 16, 64, step=8),
            'demographics_dense_units': trial.suggest_int('demographics_dense_units', 8, 32, step=4),
            'fusion_dense_units': trial.suggest_int('fusion_dense_units', 16, 48, step=8),
            'dropout_rate': trial.suggest_float('dropout_rate', 0.1, 0.5, step=0.1),
            'dense_dropout_rate': trial.suggest_float('dense_dropout_rate', 0.1, 0.4, step=0.1),
            'learning_rate': trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True),
            'batch_size': trial.suggest_categorical('batch_size', [16, 32, 64]),
            'epochs': 30,  # value used for the real optimization run
            'patience': 10,
            'reduce_lr_patience': 5,
            'use_tqdm': False,
            'use_tensorboard': False,
            'fusion_type': trial.suggest_categorical('fusion_type', ['concatenate', 'attention', 'gated']),
        }
        fusion_type = model_params['fusion_type']

        # Load the training data.
        trainer = LSTMv2Trainer(experiment_name="lstm_v2_hyperopt", window_config="w64_s16")
        data = trainer.load_preprocessed_data(use_optimized_demographics=True)
        if data is None:
            print("データの読み込みに失敗しました")
            return failure_score

        # Train the model.
        results = trainer.train_model(data, model_params, fusion_type)

        # Fetch the evaluation results. A non-dict result or a missing/None
        # 'results' entry is treated as "no evaluation results" instead of
        # tripping a TypeError in the membership tests below.
        eval_results = (results.get('results') or {}) if isinstance(results, dict) else {}
        if not isinstance(results, dict) or 'test_data' not in results or 'predictions' not in eval_results:
            print("評価結果の取得に失敗しました")
            return failure_score

        # test_data is expected to be a tuple
        # (X_sensor_test, X_demographics_test, y_test); only y_test is used.
        test_data = results['test_data']
        if not (isinstance(test_data, tuple) and len(test_data) == 3):
            print("test_dataの構造が期待と異なります")
            return failure_score

        y_true = test_data[2]
        y_pred = eval_results['predictions']
        label_encoder = data['label_encoder']

        # Compute the CMI score.
        cmi_score, binary_f1, macro_f1 = calculate_cmi_score(y_true, y_pred, label_encoder)

        print(f"CMI Score: {cmi_score:.4f}")
        print(f"Binary F1: {binary_f1:.4f}")
        print(f"Macro F1: {macro_f1:.4f}")

        # Additional metrics reported by the trainer, shown for reference only.
        test_accuracy = eval_results.get('test_accuracy', 0.0)
        test_loss = eval_results.get('test_loss', float('inf'))

        print(f"Test Accuracy: {test_accuracy:.4f}")
        print(f"Test Loss: {test_loss:.4f}")

        return cmi_score

    except Exception as e:
        # Keep the study running: report the error and score the trial as 0.
        print(f"Trial {trial.number} でエラーが発生: {str(e)}")
        return failure_score

print("目的関数の定義完了")


In [None]:
# Experiment settings
EXPERIMENT_NAME = "lstm_v2_cmi_optimization"
N_TRIALS = 50  # trial count used for the real optimization run
DB_PATH = f"../output/experiments/{EXPERIMENT_NAME}/{EXPERIMENT_NAME}.db"

# Create the directory that contains DB_PATH (no error if it already exists).
import os
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)

# Echo the configuration.
print("CMI評価指標対応 LSTM v2 ハイパーパラメータ最適化開始")
print(f"実験名: {EXPERIMENT_NAME}")
print(f"試行数: {N_TRIALS}")
print(f"データベース: {DB_PATH}")


In [None]:
# Create the Optuna study. It is stored in the SQLite file at DB_PATH, and
# load_if_exists=True reuses a study of the same name if one is already there.
# NOTE(review): the objective in this notebook never calls trial.report(), so
# the MedianPruner configured here appears to have nothing to act on -- confirm.
study = optuna.create_study(
    study_name=EXPERIMENT_NAME,
    storage=f'sqlite:///{DB_PATH}',
    load_if_exists=True,
    direction='maximize',
    sampler=TPESampler(seed=42),
    pruner=MedianPruner(n_startup_trials=5, n_warmup_steps=10),
)

# Echo the study configuration.
print("Optuna スタディ作成完了")
print("方向: maximize (CMI スコアを最大化)")
print("サンプラー: TPESampler")
print("プルーナー: MedianPruner")


In [None]:
# Run the optimization for the configured number of trials.
study.optimize(objective, n_trials=N_TRIALS)

# Show the outcome: a banner followed by the best trial, its score and its
# parameters.
print("\n" + "=" * 60, "最適化完了", "=" * 60, sep="\n")
print(f"最良試行: {study.best_trial.number}")
print(f"最良CMIスコア: {study.best_value:.4f}")
print("最良パラメータ:")
for param_name, param_value in study.best_params.items():
    print(f"  {param_name}: {param_value}")


In [None]:
# Save the results.
# On success a full summary is written to optimization_results.json; if
# anything in that path raises, a minimal emergency_results.json holding the
# error message is written instead.
try:
    # Take one snapshot of the trials so every count below refers to the same
    # data and the storage is read only once.
    all_trials = study.trials

    # Review of the optimization run.
    print("\n=== 最適化結果の確認 ===")
    print(f"実行された試行数: {len(all_trials)}")

    # Split the trials by final state.
    completed_trials = [t for t in all_trials if t.state == optuna.trial.TrialState.COMPLETE]
    failed_trials = [t for t in all_trials if t.state == optuna.trial.TrialState.FAIL]

    print(f"成功した試行: {len(completed_trials)}")
    print(f"失敗した試行: {len(failed_trials)}")

    if completed_trials:
        # Pick the best completed trial; a missing value is ranked as 0.0.
        best_trial = max(completed_trials, key=lambda t: t.value if t.value is not None else 0.0)
        best_score = best_trial.value if best_trial.value is not None else 0.0

        results_summary = {
            'experiment_name': EXPERIMENT_NAME,
            'n_trials': N_TRIALS,
            'completed_trials': len(completed_trials),
            'failed_trials': len(failed_trials),
            'best_trial': best_trial.number,
            'best_cmi_score': best_score,
            'best_params': best_trial.params,
            'timestamp': datetime.now().isoformat(),
            'all_trials': [
                {
                    'trial_number': t.number,
                    'state': str(t.state),
                    'value': t.value,
                    'params': t.params
                }
                for t in all_trials
            ]
        }

        # Serialize first, then write: if serialization fails, the results
        # file is never opened, so no truncated JSON is left behind.
        results_file = f"../output/experiments/{EXPERIMENT_NAME}/optimization_results.json"
        serialized = json.dumps(results_summary, indent=2, ensure_ascii=False)
        with open(results_file, 'w', encoding='utf-8') as f:
            f.write(serialized)

        print(f"\n結果が保存されました: {results_file}")
        print(f"最良試行: {best_trial.number}")
        print(f"最良CMIスコア: {best_score:.4f}")
        print("最良パラメータ:")
        for key, value in best_trial.params.items():
            print(f"  {key}: {value}")

    else:
        print("成功した試行がありません。")

except Exception as e:
    print(f"結果保存中にエラーが発生: {str(e)}")

    # Save a minimal record so the failed run still leaves a trace.
    emergency_results = {
        'experiment_name': EXPERIMENT_NAME,
        'n_trials': N_TRIALS,
        'error': str(e),
        'timestamp': datetime.now().isoformat()
    }

    # Explicit encoding, consistent with the main results file.
    with open(f"../output/experiments/{EXPERIMENT_NAME}/emergency_results.json", 'w', encoding='utf-8') as f:
        json.dump(emergency_results, f, indent=2)

    print("緊急結果ファイルを保存しました。")
