In [1]:
import pandas as pd
import numpy as np
from hmmlearn import hmm
import joblib 
import os
from typing import Dict, List, Tuple, Any

# HMMの隠れ状態（感情ラベル）：4つ
EMOTIONAL_STATES = [
    '感覚運動的興奮', 
    '難解・頭脳型', 
    '和みと癒し', 
    '設定状況の魅力' 
]
N_STATES = len(EMOTIONAL_STATES)
N_FEATURES = 5 # 特徴量の次元数
MODEL_FILENAME = 'hmm_emotion_model_4states.pkl'

In [2]:
# --- 2.1 キーログからの特徴量抽出関数 ---
def extract_features_from_keylog(log_data: pd.DataFrame, time_window_sec: float = 30.0) -> Tuple[np.ndarray, np.ndarray]:
    """
    キーログデータ（CSV読み込み後）を時間窓で区切り、HMMの観測特徴量を抽出します。
    5つの特徴量: [持続時間平均, 遅延時間平均, APM, 持続時間分散, 停止時間割合]
    """
    
    # 列名の統一と型変換
    log_data.rename(columns={'経過時間(s) (セッション開始からの時間)': 'elapsed_time',
                             '持続時間(s)': 'duration',
                             '遅延時間(s)': 'delay'}, inplace=True)
    
    try:
        log_data['elapsed_time'] = log_data['elapsed_time'].astype(float)
        log_data['duration'] = log_data['duration'].astype(float)
        log_data['delay'] = log_data['delay'].astype(float)
    except Exception as e:
        print(f"データの型変換エラー: {e}")
        return np.array([]), np.array([])
    
    max_time = log_data['elapsed_time'].max()
    segments = []
    
    start_time = 0.0
    while start_time < max_time + time_window_sec:
        end_time = start_time + time_window_sec
        window_data = log_data[(log_data['elapsed_time'] >= start_time) & (log_data['elapsed_time'] < end_time)]
        
        if len(window_data) == 0:
            feature_vector = [0.0, time_window_sec, 0.0, 0.0, 1.0] 
        else:
            duration_mean = window_data['duration'].mean()
            delay_mean = window_data['delay'].mean()
            apm = len(window_data) / time_window_sec * 60
            duration_var = window_data['duration'].var()
            if np.isnan(duration_var): duration_var = 0.0
            total_active_duration = window_data['duration'].sum()
            stop_time_ratio = (time_window_sec - total_active_duration) / time_window_sec
            stop_time_ratio = max(0.0, min(1.0, stop_time_ratio))
            
            feature_vector = [duration_mean, delay_mean, apm, duration_var, stop_time_ratio]
            
        segments.append(feature_vector)
        start_time = end_time

    X = np.array(segments)
    lengths = np.array([len(X)])
    
    return X, lengths

# --- 2.2 HMM学習/分類クラス ---
class HMMGameEmotionClassifier:
    """キーログ特徴量から4つのゲーム感情ラベルを推定するHMM分類器"""
    
    def __init__(self, n_components: int = N_STATES, n_features: int = N_FEATURES):
        self.model = hmm.GaussianHMM(
            n_components=n_components, 
            covariance_type="diag", 
            n_iter=100,
            init_params="stmc",
            transmat_prior=1.0,
            startprob_prior=1.0
        )
        self.state_names = EMOTIONAL_STATES

    def train(self, X: np.ndarray, lengths: np.ndarray):
        """HMMモデルを訓練します。"""
        print(f"HMMモデルを訓練中... (隠れ状態数: {self.model.n_components})")
        if X.shape[1] != N_FEATURES:
            print(f"❌ エラー: 訓練データの特徴量次元が一致しません ({X.shape[1]} != {N_FEATURES})")
            return
        try:
            self.model.fit(X, lengths)
            print("✅ HMMモデルの訓練が完了しました。")
        except Exception as e:
            print(f"❌ HMMの訓練中にエラーが発生しました: {e}")

    def predict_emotion_sequence(self, X: np.ndarray) -> List[str]:
        """観測データから、最も適合する感情ラベルの時系列を推定します。"""
        if self.model.n_iter == 0:
             print("警告: モデルが訓練されていません。")
             return []
        logprob, state_sequence = self.model.decode(X, algorithm="viterbi")
        return [self.state_names[state] for state in state_sequence]

    def save_model(self, filename: str = MODEL_FILENAME):
        """訓練済みモデルを保存します。"""
        joblib.dump(self.model, filename)
        joblib.dump(self.state_names, filename.replace('.pkl', '_states.pkl'))
        print(f"✅ モデルと状態名を '{filename}' に保存しました。")

    @classmethod
    def load_model(cls, filename: str = MODEL_FILENAME):
        """保存されたモデルを読み込みます。"""
        if not os.path.exists(filename):
            raise FileNotFoundError(f"モデルファイル '{filename}' が見つかりません。")
        instance = cls()
        instance.model = joblib.load(filename)
        instance.state_names = joblib.load(filename.replace('.pkl', '_states.pkl'))
        print(f"✅ モデルを '{filename}' から正常に読み込みました。")
        return instance

### ダミーデータバージョン

### 訓練データバージョン

In [3]:
# --- 3.1: 訓練データの準備バージョン ---

# ★★★ 訓練データCSVのパスを指定してください ★★★
HMM_TRAIN_DATA_PATH = 'kiyotaANOTEr.csv' 

# ★★★ アノテーションとセッションIDの列名を指定してください ★★★
EMOTION_COL = 'True_Emotion' 
SESSION_ID_COL = 'Session_ID' 

print(f"--- 訓練データ準備中: {HMM_TRAIN_DATA_PATH}を読み込み ---")

try:
    df_train = pd.read_csv(HMM_TRAIN_DATA_PATH)
except FileNotFoundError:
    print(f"❌ エラー: 訓練データファイル '{HMM_TRAIN_DATA_PATH}' が見つかりません。")
    # 実行を中断
    # exit() 

    
    
# 特徴量となる列名を定義
# 感情ラベル列とセッションID列以外の全ての列を特徴量と見なします。
feature_columns = [col for col in df_train.columns 
                   if col not in [EMOTION_COL, SESSION_ID_COL]]

# 感情ラベルの順序をHMMの状態順に合わせるためのマッピング
state_to_index = {name: i for i, name in enumerate(EMOTIONAL_STATES)}

# --- データの前処理と結合 ---

# セッションIDでデータをグループ化
grouped = df_train.groupby(SESSION_ID_COL)

X_train_list = []
lengths_train_list = []

for session_id, session_df in grouped:
    # 特徴量データ (X) を抽出
    X_session = session_df[feature_columns].values
    
    # 感情ラベル (教師信号) を抽出 (今回は教師信号はHMMの構造学習には不要だが、検証用に重要)
    # y_session = session_df[EMOTION_COL].map(state_to_index).values 

    X_train_list.append(X_session)
    lengths_train_list.append(len(X_session))

X_train = np.vstack(X_train_list)
lengths_train = np.array(lengths_train_list)

print(f"✅ データ結合完了。総セグメント数: {len(X_train)}、総セッション数: {len(lengths_train)}")

# --- 3.2: モデルの訓練と保存 ---

if X_train.size == 0:
    print("エラー: 訓練データが空です。処理を中止します。")
else:
    # 1. モデルの訓練と保存
    classifier = HMMGameEmotionClassifier()
    classifier.train(X_train, lengths_train)
    
    MODEL_FILENAME = 'kiyota.pkl'
    classifier.save_model(MODEL_FILENAME)
    
    print(f"\n✅ モデルの訓練と保存が完了しました。推定の準備ができました。")

Fitting a model with 55 free scalar parameters with only 30 data points will result in a degenerate solution.


--- 訓練データ準備中: kiyotaANOTEr.csvを読み込み ---
✅ データ結合完了。総セグメント数: 6、総セッション数: 6
HMMモデルを訓練中... (隠れ状態数: 4)


Some rows of transmat_ have zero sum because no transition from the state was ever observed.
Model is not converging.  Current: 14.836418718881328 is not greater than 17.02512101363289. Delta is -2.188702294751563


✅ HMMモデルの訓練が完了しました。
✅ モデルと状態名を 'kiyota.pkl' に保存しました。

✅ モデルの訓練と保存が完了しました。推定の準備ができました。


In [4]:
# --- 4.1: 推定データの指定 ---
# ★★★ ここを推定したいキーログCSVのパスに置き換えてください ★★★
TEST_CSV_PATH = '/Users/sakumasin/Documents/GitHub/kenkyu/mizuki2025/keylogmizuki/sakuma.csv' 

# 【注意】TEST_CSV_PATHのデータは、訓練データと同じ列名(持続時間(s)など)が必要です。

# --- 4.2: モデルの読み込み ---
try:
    loaded_classifier = HMMGameEmotionClassifier.load_model(MODEL_FILENAME)
except FileNotFoundError as e:
    print(f"❌ エラー: モデルファイルが見つかりません。ブロック3を実行してモデルを訓練・保存してください。")
    # ここで実行を中断
    # raise e

# --- 4.3: 特徴量の抽出 ---
print(f"\n--- 推定開始: ファイル '{TEST_CSV_PATH}' の特徴量を抽出中 ---")

try:
    df_test = pd.read_csv(TEST_CSV_PATH)
except FileNotFoundError:
    print(f"❌ エラー: ファイル '{TEST_CSV_PATH}' が見つかりません。")
    df_test = pd.DataFrame()

if not df_test.empty:
    X_test, _ = extract_features_from_keylog(df_test, time_window_sec=30.0)
else:
    X_test = np.array([])

if X_test.size == 0:
    print("推定をスキップ: テストデータの特徴量抽出に失敗しました。")
else:
    # --- 4.4: 感情の推定と結果表示 ---
    estimated_emotions = loaded_classifier.predict_emotion_sequence(X_test)
    
    print("\n--- 最終的な感情推定結果（30秒間隔） ---")
    for i, emotion in enumerate(estimated_emotions):
        time_start = i * 30
        time_end = (i + 1) * 30
        print(f"時間窓 {time_start}-{time_end}秒: {emotion}")

✅ モデルを 'kiyota.pkl' から正常に読み込みました。

--- 推定開始: ファイル '/Users/sakumasin/Documents/GitHub/kenkyu/mizuki2025/keylogmizuki/sakuma.csv' の特徴量を抽出中 ---


ValueError: transmat_ rows must sum to 1 (got row sums of [0. 0. 0. 0.])