# Raw Data Visualizer 測定データの生値を可視化

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import glob
import ipywidgets as widgets
from ipywidgets import interactive_output
from IPython.display import display

%matplotlib inline

In [2]:
def process_all_logs(data_dir):
    """指定されたディレクトリ内の全てのCSVを読み込み、一つのDataFrameにまとめる"""
    all_files = glob.glob(os.path.join(data_dir, "*.csv"))
    if not all_files:
        print(f"エラー: ディレクトリ '{data_dir}' にCSVファイルが見つかりません。")
        return None

    df_list = []
    for filename in all_files:
        try:
            basename = os.path.basename(filename)
            parts = basename.replace('.csv', '').split('_')
            df = pd.read_csv(filename)
            df['SubjectID'] = parts[0]
            df['Block'] = int(parts[1].replace('Block', ''))
            df_list.append(df)
        except Exception as e:
            print(f"ファイル {filename} の読み込み中にエラー: {e}")

    if not df_list:
        return None

    return pd.concat(df_list, ignore_index=True)

# ログファイルが保存されているディレクトリを指定して、データを読み込む
LOG_DIR = '../../data/RawDatas/'  # あなた環境に合わせてパスを修正してください
full_df = process_all_logs(LOG_DIR)

if full_df is not None:
    print("データの読み込みが完了しました。")
    # SubjectIDを文字列として扱うように明示的に型変換
    full_df['SubjectID'] = full_df['SubjectID'].astype(str)

データの読み込みが完了しました。


In [3]:
def plot_time_range_trajectory(df, subject_id, block_num, start_time, end_time):
    """指定された被験者・ブロック・時間範囲でハンドル軌跡をプロットする"""
    if df is None:
        return

    # ウィジェットから渡される値がNoneでないことを確認
    if subject_id is None or block_num is None or start_time is None or end_time is None:
        return

    # 型変換
    try:
        subject_id = str(subject_id)
        block_num = int(block_num)
        start_time = float(start_time)
        end_time = float(end_time)
    except ValueError:
        print("型変換に失敗しました。ウィジェットの値を確認してください。")
        return

    # 時間範囲の妥当性チェック
    if start_time >= end_time:
        print("開始時刻は終了時刻より小さい値を設定してください。")
        return

    # 指定された条件でデータをフィルタリング
    mask = (df['SubjectID'] == subject_id) & (df['Block'] == block_num)
    block_data = df[mask]

    if block_data.empty:
        print(f"データが見つかりません: Subject='{subject_id}', Block={block_num}")
        return

    # 時刻カラムがあるかチェック（データ構造に応じて調整）
    if 'Time' in block_data.columns:
        time_col = 'Time'
    elif 'time' in block_data.columns:
        time_col = 'time'
    elif 'Timestamp' in block_data.columns:
        time_col = 'Timestamp'
    else:
        # 時刻カラムがない場合、インデックスを時刻として使用
        block_data = block_data.reset_index()
        block_data['TimeIndex'] = block_data.index * 0.01  # 仮定: 10ms間隔
        time_col = 'TimeIndex'
        print("時刻カラムが見つからないため、インデックスベースの時刻を使用します。")

    # 時間範囲でフィルタリング
    time_mask = (block_data[time_col] >= start_time) & (block_data[time_col] <= end_time)
    time_filtered_data = block_data[time_mask]

    if time_filtered_data.empty:
        print(f"指定された時間範囲 ({start_time}s - {end_time}s) にデータがありません。")
        print(f"利用可能な時間範囲: {block_data[time_col].min():.2f}s - {block_data[time_col].max():.2f}s")
        return

    print(f"時間範囲フィルタリング後のデータ数: {len(time_filtered_data)}")

    # Trialの切り替わりタイミングを検出
    trial_transitions = []
    if 'CurrentTrial' in time_filtered_data.columns:
        # CurrentTrialの値が変わる点を検出
        trial_changes = time_filtered_data['CurrentTrial'].diff() != 0
        trial_change_indices = time_filtered_data[trial_changes].index

        for idx in trial_change_indices:
            if idx in time_filtered_data.index:
                transition_time = time_filtered_data.loc[idx, time_col]
                trial_num = time_filtered_data.loc[idx, 'CurrentTrial']
                trial_transitions.append((transition_time, trial_num))

        print(f"検出されたTrial切り替わり: {len(trial_transitions)}個")
        for time_point, trial_num in trial_transitions:
            print(f"  時刻 {time_point:.2f}s: Trial {trial_num}")

    # プロットの準備（3つのサブプロットに変更）
    plt.figure(figsize=(15, 5))

    # サブプロット1: X位置の時系列
    plt.subplot(1, 3, 1)
    plt.plot(time_filtered_data[time_col], time_filtered_data['HandlePosX'], 'b-', linewidth=2, label='Handle X')

    # StartTargetとEndTargetの全データポイントをプロット
    plt.plot(time_filtered_data[time_col], time_filtered_data['TargetStartPosX'], 'g--',
             linewidth=2, alpha=0.7, label='Start Target X')
    plt.plot(time_filtered_data[time_col], time_filtered_data['TargetEndPosX'], 'r--',
             linewidth=2, alpha=0.7, label='End Target X')

    # Trial切り替わりタイミングに縦線を追加
    for i, (transition_time, trial_num) in enumerate(trial_transitions):
        color = 'orange' if i % 2 == 0 else 'magenta'
        plt.axvline(x=transition_time, color=color, linestyle=':', linewidth=2, alpha=0.8)
        # 縦線の上部にTrial番号を表示
        y_max = plt.ylim()[1]
        plt.text(transition_time, y_max * 0.95, f'T{int(trial_num)}',
                rotation=90, ha='right', va='top', fontsize=9, color=color)

    plt.title(f'X Position vs Time\nSubject={subject_id}, Block={block_num}, Time: {start_time}s - {end_time}s')
    plt.xlabel('Time [s]')
    plt.ylabel('X Position [m]')
    plt.grid(True)
    plt.legend()

    # サブプロット2: Y位置の時系列
    plt.subplot(1, 3, 2)
    plt.plot(time_filtered_data[time_col], time_filtered_data['HandlePosY'], 'b-', linewidth=2, label='Handle Y')

    # StartTargetとEndTargetの全データポイントをプロット
    plt.plot(time_filtered_data[time_col], time_filtered_data['TargetStartPosY'], 'g--',
             linewidth=2, alpha=0.7, label='Start Target Y')
    plt.plot(time_filtered_data[time_col], time_filtered_data['TargetEndPosY'], 'r--',
             linewidth=2, alpha=0.7, label='End Target Y')

    # Trial切り替わりタイミングに縦線を追加
    for i, (transition_time, trial_num) in enumerate(trial_transitions):
        color = 'orange' if i % 2 == 0 else 'magenta'
        plt.axvline(x=transition_time, color=color, linestyle=':', linewidth=2, alpha=0.8)
        # 縦線の上部にTrial番号を表示
        y_max = plt.ylim()[1]
        plt.text(transition_time, y_max * 0.95, f'T{int(trial_num)}',
                rotation=90, ha='right', va='top', fontsize=9, color=color)

    plt.title('Y Position vs Time')
    plt.xlabel('Time [s]')
    plt.ylabel('Y Position [m]')
    plt.grid(True)
    plt.legend()

    # サブプロット3: 速度プロット
    plt.subplot(1, 3, 3)
    if len(time_filtered_data) > 1:
        # 速度計算（差分近似）
        dt = time_filtered_data[time_col].diff().mean()
        vx = time_filtered_data['HandlePosX'].diff() / dt
        vy = time_filtered_data['HandlePosY'].diff() / dt
        speed = np.sqrt(vx**2 + vy**2)

        plt.plot(time_filtered_data[time_col], speed, 'purple', linewidth=2, label='Handle Speed')

        # Trial切り替わりタイミングに縦線を追加
        for i, (transition_time, trial_num) in enumerate(trial_transitions):
            color = 'orange' if i % 2 == 0 else 'magenta'
            plt.axvline(x=transition_time, color=color, linestyle=':', linewidth=2, alpha=0.8)
            # 縦線の上部にTrial番号を表示
            y_max = plt.ylim()[1]
            plt.text(transition_time, y_max * 0.95, f'T{int(trial_num)}',
                    rotation=90, ha='right', va='top', fontsize=9, color=color)

        plt.title('Handle Speed vs Time')
        plt.xlabel('Time [s]')
        plt.ylabel('Speed [m/s]')
        plt.grid(True)
        plt.legend()
    else:
        plt.text(0.5, 0.5, 'Insufficient data\nfor speed calculation',
                ha='center', va='center', transform=plt.gca().transAxes)
        plt.title('Handle Speed vs Time')

    plt.tight_layout()
    plt.show()


if full_df is not None:
    # 時間範囲プロット用のウィジェット作成
    subject_dropdown_time = widgets.Dropdown(description='被験者ID:')
    block_dropdown_time = widgets.Dropdown(description='ブロック:')

    # 時間範囲の動的設定のためのスライダー
    start_time_slider = widgets.FloatSlider(
        description='開始時刻:',
        min=0.0,
        max=1500.0,  # 初期値、後で動的に更新
        step=0.1,
        value=0.0
    )

    end_time_slider = widgets.FloatSlider(
        description='終了時刻:',
        min=0.0,
        max=1500.0,  # 初期値、後で動的に更新
        step=0.1,
        value=5.0
    )

    # ドロップダウンの連動ロジック（時間範囲用）
    def update_block_options_time(*args):
        selected_subject = subject_dropdown_time.value
        if selected_subject:
            blocks = sorted(full_df[full_df['SubjectID'] == selected_subject]['Block'].unique())
            block_dropdown_time.options = blocks

    def update_time_range(*args):
        selected_subject = subject_dropdown_time.value
        selected_block = block_dropdown_time.value
        if selected_subject and selected_block:
            # 選択された被験者・ブロックのデータを取得
            mask = (full_df['SubjectID'] == selected_subject) & (full_df['Block'] == selected_block)
            block_data = full_df[mask]

            if not block_data.empty:
                # 時刻カラムを特定
                time_col = None
                for col in ['Time', 'time', 'Timestamp']:
                    if col in block_data.columns:
                        time_col = col
                        break

                if time_col is None:
                    # インデックスベースの時刻を使用
                    max_time = len(block_data) * 0.01  # 仮定: 10ms間隔
                    min_time = 0.0
                else:
                    min_time = block_data[time_col].min()
                    max_time = block_data[time_col].max()

                # スライダーの範囲を更新
                start_time_slider.min = min_time
                start_time_slider.max = max_time
                start_time_slider.value = min_time

                end_time_slider.min = min_time
                end_time_slider.max = max_time
                end_time_slider.value = max_time

    # イベントハンドラーを設定
    subject_dropdown_time.observe(update_block_options_time, 'value')
    block_dropdown_time.observe(update_time_range, 'value')

    # インタラクティブプロット
    interactive_time_plot = interactive_output(
        plot_time_range_trajectory,
        {'df': widgets.fixed(full_df),
         'subject_id': subject_dropdown_time,
         'block_num': block_dropdown_time,
         'start_time': start_time_slider,
         'end_time': end_time_slider
        }
    )

    # 初期値設定
    subject_ids = sorted(full_df['SubjectID'].unique())
    subject_dropdown_time.options = subject_ids

    # UIの表示
    time_controls = widgets.VBox([
        widgets.HBox([subject_dropdown_time, block_dropdown_time]),
        widgets.HBox([start_time_slider, end_time_slider])
    ])

    print("\\n" + "="*60)
    print("時間範囲指定による軌跡可視化")
    print("="*60)
    print("被験者・ブロック・時間範囲を選択してハンドル軌跡を表示:")
    display(time_controls, interactive_time_plot)

時間範囲指定による軌跡可視化
被験者・ブロック・時間範囲を選択してハンドル軌跡を表示:


VBox(children=(HBox(children=(Dropdown(description='被験者ID:', options=('h.nakamura', 'r.morishita', 'r.yanase',…

Output()

In [4]:
def plot_trial_trajectory(df, subject_id, block_num, trial_id):
    """
    指定された被験者・ブロック・試行IDの軌跡をプロットする
    'TRIAL_RUNNING'状態のデータのみを対象とする
    """
    if df is None or subject_id is None or block_num is None or trial_id is None:
        return

    try:
        subject_id = str(subject_id)
        block_num = int(block_num)
        trial_id = int(trial_id)
    except (ValueError, TypeError):
        print("型変換に失敗しました。ウィジェットの値を確認してください。")
        return

    # 条件に合うデータをフィルタリング（列名を修正）
    mask = (df['SubjectID'] == subject_id) & \
           (df['Block'] == block_num) & \
           (df['CurrentTrial'] == trial_id)

    trial_data = df[mask]

    if trial_data.empty:
        print(f"データが見つかりません: Subject='{subject_id}', Block={block_num}, Trial={trial_id}")
        return

    # 実際のデータに含まれる状態を確認
    print(f"利用可能な状態: {trial_data['TrialState'].unique()}")

    # TRIAL_RUNNING状態が存在するかチェック、なければ別の状態を使用
    if 'TRIAL_RUNNING' in trial_data['TrialState'].values:
        running_data = trial_data[trial_data['TrialState'] == 'TRIAL_RUNNING']
    else:
        # TRIAL_RUNNING状態がない場合は全データを使用
        running_data = trial_data
        print("TRIAL_RUNNING状態が見つからないため、全データを使用します。")

    if running_data.empty:
        print("プロット用のデータが見つかりません。")
        return

    # ターゲット座標は試行内で一定なので、最初の行から取得
    start_target_x = running_data['TargetStartPosX'].iloc[0]
    start_target_y = running_data['TargetStartPosY'].iloc[0]
    end_target_x = running_data['TargetEndPosX'].iloc[0]
    end_target_y = running_data['TargetEndPosY'].iloc[0]

    # プロット準備
    plt.figure(figsize=(8, 8))

    # 軌道をプロット
    plt.plot(running_data['HandlePosX'], running_data['HandlePosY'], 'b-', linewidth=2.5, alpha=0.8, label='Handle Trajectory')

    # 軌道の開始点と終了点をプロット
    plt.plot(running_data['HandlePosX'].iloc[0], running_data['HandlePosY'].iloc[0], 'go', markersize=12, label='Actual Start')
    plt.plot(running_data['HandlePosX'].iloc[-1], running_data['HandlePosY'].iloc[-1], 'ro', markersize=12, label='Actual End')

    # ターゲットをプロット
    plt.plot(start_target_x, start_target_y, 'gX', markersize=15, mew=2, label='Start Target')
    plt.plot(end_target_x, end_target_y, 'rX', markersize=15, mew=2, label='End Target')

    # グラフの体裁を整える
    plt.title(f'Handle Trajectory (X vs Y)\nSubject={subject_id}, Block={block_num}, Trial={trial_id}')
    plt.xlabel('X Position [m]')
    plt.ylabel('Y Position [m]')
    plt.grid(True)
    plt.axis('equal')  # X軸とY軸のスケールを合わせる
    plt.legend(loc='best')
    plt.show()


# --- インタラクティブウィジェットの作成 ---

if 'full_df' in locals() and full_df is not None:
    # ドロップダウンウィジェットを作成
    subject_dropdown = widgets.Dropdown(description='被験者ID:')
    block_dropdown = widgets.Dropdown(description='ブロック:')
    trial_dropdown = widgets.Dropdown(description='試行ID:')

    # ドロップダウンの連動ロジック
    def update_block_options(*args):
        selected_subject = subject_dropdown.value
        if selected_subject:
            blocks = sorted(full_df[full_df['SubjectID'] == selected_subject]['Block'].unique())
            block_dropdown.options = blocks

    def update_trial_options(*args):
        selected_subject = subject_dropdown.value
        selected_block = block_dropdown.value
        if selected_subject and selected_block:
            mask = (full_df['SubjectID'] == selected_subject) & (full_df['Block'] == selected_block)
            trials = sorted(full_df[mask]['CurrentTrial'].unique())
            trial_dropdown.options = trials

    # イベントハンドラを登録
    subject_dropdown.observe(update_block_options, 'value')
    block_dropdown.observe(update_trial_options, 'value')

    # インタラクティブプロットの出力先を定義
    interactive_plot = interactive_output(
        plot_trial_trajectory,
        {
            'df': widgets.fixed(full_df),
            'subject_id': subject_dropdown,
            'block_num': block_dropdown,
            'trial_id': trial_dropdown
        }
    )

    # 初期値を設定
    subject_ids = sorted(full_df['SubjectID'].unique())
    if subject_ids:
        subject_dropdown.options = subject_ids
    else:
        print("データ内に 'SubjectID' が見つかりません。")

    # UIの表示
    controls = widgets.VBox([
        widgets.HBox([subject_dropdown, block_dropdown, trial_dropdown])
    ])

    print("\n" + "="*60)
    print("試行ごとの軌跡可視化")
    print("="*60)
    print("被験者・ブロック・試行IDを選択してハンドル軌跡を表示:")
    display(controls, interactive_plot)


試行ごとの軌跡可視化
被験者・ブロック・試行IDを選択してハンドル軌跡を表示:


VBox(children=(HBox(children=(Dropdown(description='被験者ID:', options=('h.nakamura', 'r.morishita', 'r.yanase',…

Output()

In [5]:
def plot_handle_speed_components(df, subject_id, block_num, trial_id):
    """
    指定された被験者・ブロック・試行IDのハンドル速度（X、Y成分）をプロットする
    CSVファイルのHandleVelX, HandleVelYを使用
    """
    if df is None or subject_id is None or block_num is None or trial_id is None:
        return

    try:
        subject_id = str(subject_id)
        block_num = int(block_num)
        trial_id = int(trial_id)
    except (ValueError, TypeError):
        print("型変換に失敗しました。ウィジェットの値を確認してください。")
        return

    # 条件に合うデータをフィルタリング
    mask = (df['SubjectID'] == subject_id) & \
           (df['Block'] == block_num) & \
           (df['CurrentTrial'] == trial_id)

    trial_data = df[mask]

    if trial_data.empty:
        print(f"データが見つかりません: Subject='{subject_id}', Block={block_num}, Trial={trial_id}")
        return

    # 速度カラムの存在確認
    if 'HandleVelX' not in trial_data.columns or 'HandleVelY' not in trial_data.columns:
        print("速度データ（HandleVelX, HandleVelY）が見つかりません。")
        print(f"利用可能なカラム: {trial_data.columns.tolist()}")
        return

    # 時刻カラムを特定
    time_col = None
    for col in ['Timestamp', 'Time', 'time']:
        if col in trial_data.columns:
            time_col = col
            break

    if time_col is None:
        # インデックスベースの時刻を使用
        trial_data = trial_data.reset_index(drop=True)
        trial_data['TimeIndex'] = trial_data.index * 0.01  # 仮定: 10ms間隔
        time_col = 'TimeIndex'
        print("時刻カラムが見つからないため、インデックスベースの時刻を使用します。")

    # 速度データを取得
    vx = trial_data['HandleVelX']
    vy = trial_data['HandleVelY']

    # 速度の大きさを計算
    speed = np.sqrt(vx**2 + vy**2)

    # プロット準備（2×2のサブプロット）
    plt.figure(figsize=(15, 10))

    # サブプロット1: X方向とY方向の速度成分
    plt.subplot(2, 2, 1)
    plt.plot(trial_data[time_col], vx, 'r-', linewidth=2, label='X方向速度', alpha=0.8)
    plt.plot(trial_data[time_col], vy, 'b-', linewidth=2, label='Y方向速度', alpha=0.8)
    plt.title(f'Handle Speed Components (X & Y)\nSubject={subject_id}, Block={block_num}, Trial={trial_id}')
    plt.xlabel('Time [s]')
    plt.ylabel('Speed [m/s]')
    plt.grid(True)
    plt.legend()

    # サブプロット2: 速度の大きさ
    plt.subplot(2, 2, 2)
    plt.plot(trial_data[time_col], speed, 'purple', linewidth=2, label='速度の大きさ', alpha=0.8)
    plt.title('Handle Speed Magnitude')
    plt.xlabel('Time [s]')
    plt.ylabel('Speed [m/s]')
    plt.grid(True)
    plt.legend()

    # サブプロット3: X方向速度のみ
    plt.subplot(2, 2, 3)
    plt.plot(trial_data[time_col], vx, 'r-', linewidth=2, label='X方向速度')
    plt.title('X-Direction Speed')
    plt.xlabel('Time [s]')
    plt.ylabel('Speed [m/s]')
    plt.grid(True)
    plt.legend()

    # サブプロット4: Y方向速度のみ
    plt.subplot(2, 2, 4)
    plt.plot(trial_data[time_col], vy, 'b-', linewidth=2, label='Y方向速度')
    plt.title('Y-Direction Speed')
    plt.xlabel('Time [s]')
    plt.ylabel('Speed [m/s]')
    plt.grid(True)
    plt.legend()

    plt.tight_layout()
    plt.show()

    # 統計情報を表示
    print(f"\\n速度統計情報:")
    print(f"X方向速度 - 平均: {vx.mean():.4f} m/s, 標準偏差: {vx.std():.4f} m/s, 最大: {vx.max():.4f} m/s")
    print(f"Y方向速度 - 平均: {vy.mean():.4f} m/s, 標準偏差: {vy.std():.4f} m/s, 最大: {vy.max():.4f} m/s")
    print(f"速度の大きさ - 平均: {speed.mean():.4f} m/s, 標準偏差: {speed.std():.4f} m/s, 最大: {speed.max():.4f} m/s")


# --- ハンドル速度プロット用のインタラクティブウィジェット ---

if 'full_df' in locals() and full_df is not None:
    # ドロップダウンウィジェットを作成
    subject_dropdown_speed = widgets.Dropdown(description='被験者ID:')
    block_dropdown_speed = widgets.Dropdown(description='ブロック:')
    trial_dropdown_speed = widgets.Dropdown(description='試行ID:')

    # ドロップダウンの連動ロジック
    def update_block_options_speed(*args):
        selected_subject = subject_dropdown_speed.value
        if selected_subject:
            blocks = sorted(full_df[full_df['SubjectID'] == selected_subject]['Block'].unique())
            block_dropdown_speed.options = blocks

    def update_trial_options_speed(*args):
        selected_subject = subject_dropdown_speed.value
        selected_block = block_dropdown_speed.value
        if selected_subject and selected_block:
            mask = (full_df['SubjectID'] == selected_subject) & (full_df['Block'] == selected_block)
            trials = sorted(full_df[mask]['CurrentTrial'].unique())
            trial_dropdown_speed.options = trials

    # イベントハンドラを登録
    subject_dropdown_speed.observe(update_block_options_speed, 'value')
    block_dropdown_speed.observe(update_trial_options_speed, 'value')

    # インタラクティブプロットの出力先を定義
    interactive_speed_plot = interactive_output(
        plot_handle_speed_components,
        {
            'df': widgets.fixed(full_df),
            'subject_id': subject_dropdown_speed,
            'block_num': block_dropdown_speed,
            'trial_id': trial_dropdown_speed
        }
    )

    # 初期値を設定
    subject_ids = sorted(full_df['SubjectID'].unique())
    if subject_ids:
        subject_dropdown_speed.options = subject_ids
    else:
        print("データ内に 'SubjectID' が見つかりません。")

    # UIの表示
    speed_controls = widgets.VBox([
        widgets.HBox([subject_dropdown_speed, block_dropdown_speed, trial_dropdown_speed])
    ])

    print("\\n" + "="*60)
    print("ハンドル速度成分の可視化")
    print("="*60)
    print("被験者・ブロック・試行IDを選択してハンドル速度（X、Y成分）を表示:")
    display(speed_controls, interactive_speed_plot)

ハンドル速度成分の可視化
被験者・ブロック・試行IDを選択してハンドル速度（X、Y成分）を表示:


VBox(children=(HBox(children=(Dropdown(description='被験者ID:', options=('h.nakamura', 'r.morishita', 'r.yanase',…

Output()