# Action-Angle Networks: 二重振り子の時系列予測

このノートブックでは、Action-Angle Networks (AANs) を用いて二重振り子の複雑な動力学の時系列予測を行います。

## 概要
1. **二重振り子の物理シミュレーション**
2. **カオス的動力学の可視化**
3. **Action-Angle Networkのトレーニング**
4. **長期予測性能の評価と比較**

二重振り子は非線形で複雑な動力学を示し、わずかな初期条件の違いが大きく異なる軌道を生む典型的なカオス系です。Action-Angle Networksがこの複雑さをどのように扱うかを探ります。

## ライブラリのインポート

In [1]:
import numpy as np
import jax
import jax.numpy as jnp
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.patches import Circle
import ml_collections
import os
import sys
import diffrax
import flax.linen as nn
import optax
from flax.training import train_state
from typing import Tuple, Dict, Mapping

# プロジェクトのルートディレクトリを追加
sys.path.append('../')

# Action Angle Networks のモジュールをインポート
from action_angle_networks.simulation import double_pendulum_simulation
from action_angle_networks.configs.double_pendulum import default
from action_angle_networks.configs.double_pendulum import action_angle_flow
from action_angle_networks import models

# 描画の設定
plt.style.use('default')
%matplotlib inline

# JAXの設定
jax.config.update("jax_enable_x64", True)

## 1. 二重振り子の物理とシミュレーション

二重振り子は2つの振り子が連結されたシステムで、以下のパラメータで特徴づけられます：

- `l1`, `l2`: 振り子の長さ
- `m1`, `m2`: 振り子の質量
- `θ1`, `θ2`: 角度（垂直から測定）
- `p1`, `p2`: 正準運動量

ハミルトニアン力学を用いて運動方程式を解きます。

In [6]:
def create_double_pendulum_config():
    """二重振り子用の設定を作成"""
    config = ml_collections.ConfigDict()
    
    # シミュレーション設定
    config.num_trajectories = 20
    config.num_samples = 500  # 時間ステップ数
    config.time_delta = 0.02  # 時間刻み
    config.noise_std = 0.01   # ノイズレベル
    
    # 物理パラメータの範囲
    config.simulation_parameter_ranges = ml_collections.ConfigDict({
        "l1": (1.0,),        # 第1振り子の長さ
        "l2": (0.8,),        # 第2振り子の長さ  
        "m1": (1.0,),        # 第1振り子の質量
        "m2": (0.8,),        # 第2振り子の質量
        "theta1_init": (0.1, 1.5),  # 初期角度の範囲
        "theta2_init": (0.1, 1.5),  # 初期角度の範囲
    })
    
    return config

config = create_double_pendulum_config()

# 時間配列を作成
times = jnp.arange(0, config.num_samples * config.time_delta, config.time_delta)

print(f"シミュレーション設定:")
print(f"  軌跡数: {config.num_trajectories}")
print(f"  時間ステップ数: {config.num_samples}")
print(f"  時間刻み: {config.time_delta}")
print(f"  総シミュレーション時間: {times[-1]:.2f} 秒")
print(f"  ノイズ標準偏差: {config.noise_std}")

シミュレーション設定:
  軌跡数: 20
  時間ステップ数: 500
  時間刻み: 0.02
  総シミュレーション時間: 9.98 秒
  ノイズ標準偏差: 0.01


### 様々な初期条件での二重振り子シミュレーション

In [None]:
def simulate_double_pendulum_trajectories(config, rng_key):
    """複数の初期条件で二重振り子をシミュレーション"""
    
    # バグ修正版のパラメータサンプリング関数
    def fixed_sample_simulation_parameters(simulation_parameter_ranges, num_trajectories, rng):
        """修正版: シミュレーションパラメータをサンプル"""
        required_params = ["l1", "l2", "m1", "m2", "theta1_init", "theta2_init"]
        for param in required_params:
            if param not in simulation_parameter_ranges:
                raise ValueError(f"Missing simulation parameter: {param}")
        
        is_tuple = lambda val: isinstance(val, tuple)
        ranges_flat, ranges_treedef = jax.tree_util.tree_flatten(
            simulation_parameter_ranges, is_leaf=is_tuple
        )
        rng, *rngs = jax.random.split(rng, len(ranges_flat) + 1)
        # 修正: tree_unflattenを使用
        rng_tree = jax.tree_util.tree_unflatten(ranges_treedef, rngs)
        
        def sample_simulation_parameter(simulation_parameter_range, parameter_rng):
            if len(simulation_parameter_range) == 1:
                # 固定値の場合、全軌跡で同じ値を返す（配列として）
                return jnp.full(num_trajectories, simulation_parameter_range[0])
            minval, maxval = simulation_parameter_range
            # 範囲がある場合、各軌跡で異なる値をサンプル
            return jax.random.uniform(parameter_rng, (num_trajectories,), minval=minval, maxval=maxval)
        
        return jax.tree_util.tree_map(
            sample_simulation_parameter,
            simulation_parameter_ranges,
            rng_tree,
            is_leaf=is_tuple
        )
    
    # 修正版の関数を使用
    simulation_params = fixed_sample_simulation_parameters(
        config.simulation_parameter_ranges.to_dict(),
        config.num_trajectories,
        rng_key
    )
    
    all_positions = []
    all_momentums = []
    all_sim_params = []
    
    print("二重振り子軌跡をシミュレーション中...")
    
    for i in range(config.num_trajectories):
        # 各軌跡のパラメータを取得
        traj_params = {}
        for key in simulation_params.keys():
            # 修正: 配列かどうかをチェックしてからインデックス
            param_val = simulation_params[key]
            if hasattr(param_val, 'shape') and len(param_val.shape) > 0:
                traj_params[key] = param_val[i]
            else:
                traj_params[key] = param_val
        
        try:
            # 正準座標を生成
            positions, momentums = double_pendulum_simulation.generate_canonical_coordinates(
                times, traj_params
            )
            
            # ノイズを追加（各軌跡ごとに異なるノイズ）
            noise_key, subkey = jax.random.split(rng_key)
            rng_key = noise_key  # 次のイテレーション用に更新
            
            pos_noise = jax.random.normal(subkey, positions.shape) * config.noise_std
            mom_noise = jax.random.normal(subkey, momentums.shape) * config.noise_std
            
            positions_noisy = positions + pos_noise
            momentums_noisy = momentums + mom_noise
            
            all_positions.append(positions_noisy)
            all_momentums.append(momentums_noisy)
            all_sim_params.append(traj_params)
            
            if (i + 1) % 5 == 0:
                print(f"  軌跡 {i+1}/{config.num_trajectories} 完了")
                
        except Exception as e:
            print(f"  警告: 軌跡 {i+1} でエラー: {e}")
            continue
    
    if len(all_positions) == 0:
        print("シミュレーションに失敗しました。空の配列を返します。")
        return jnp.array([]), jnp.array([]), []
    
    return jnp.array(all_positions), jnp.array(all_momentums), all_sim_params

# シミュレーション実行
rng = jax.random.PRNGKey(42)
positions, momentums, sim_params = simulate_double_pendulum_trajectories(config, rng)

print(f"\n=== シミュレーション結果 ===")
print(f"成功した軌跡数: {len(positions)}")
if len(positions) > 0:
    print(f"位置データ形状: {positions.shape}")
    print(f"運動量データ形状: {momentums.shape}")
    
    # サンプルパラメータを表示
    print(f"\n=== サンプルパラメータ例 ===")
    if len(sim_params) > 0:
        sample_param = sim_params[0]
        for key, value in sample_param.items():
            print(f"{key}: {value:.3f}")
else:
    print("シミュレーションに失敗しました。パラメータを調整してください。")

### 二重振り子の動力学可視化

In [None]:
def plot_double_pendulum_analysis(positions, momentums, sim_params, times):
    """二重振り子の動力学を詳細に解析・可視化"""
    
    if len(positions) == 0:
        print("表示できるデータがありません")
        return
    
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    
    # カラーマップ
    colors = plt.cm.viridis(np.linspace(0, 1, min(10, len(positions))))
    
    # 1. 角度の時系列
    ax = axes[0, 0]
    for i, color in enumerate(colors):
        ax.plot(times, positions[i, :, 0], color=color, alpha=0.7, linewidth=1, label=f'θ1 (軌跡{i+1})')
        ax.plot(times, positions[i, :, 1], color=color, alpha=0.5, linewidth=1, linestyle='--', label=f'θ2 (軌跡{i+1})')
    ax.grid(True)
    ax.set_title('角度の時間変化')
    ax.set_xlabel('時間 (s)')
    ax.set_ylabel('角度 (rad)')
    ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    
    # 2. 位相空間 (θ1 vs p1)
    ax = axes[0, 1]
    for i, color in enumerate(colors):
        ax.plot(positions[i, :, 0], momentums[i, :, 0], color=color, alpha=0.7, linewidth=1)
        ax.plot(positions[i, 0, 0], momentums[i, 0, 0], 'o', color=color, markersize=5)
    ax.grid(True)
    ax.set_title('第1振り子の位相空間\n(θ1 vs p1)')
    ax.set_xlabel('θ1 (rad)')
    ax.set_ylabel('p1')
    
    # 3. 位相空間 (θ2 vs p2)
    ax = axes[0, 2]
    for i, color in enumerate(colors):
        ax.plot(positions[i, :, 1], momentums[i, :, 1], color=color, alpha=0.7, linewidth=1)
        ax.plot(positions[i, 0, 1], momentums[i, 0, 1], 'o', color=color, markersize=5)
    ax.grid(True)
    ax.set_title('第2振り子の位相空間\n(θ2 vs p2)')
    ax.set_xlabel('θ2 (rad)')
    ax.set_ylabel('p2')
    
    # 4. カルテシアン座標での軌跡
    ax = axes[1, 0]
    for i, color in enumerate(colors):
        # カルテシアン座標に変換
        pos1, pos2 = jax.vmap(double_pendulum_simulation.polar_to_cartesian, in_axes=(0, None))(
            positions[i], sim_params[i]
        )
        
        ax.plot(pos1[:, 0], pos1[:, 1], color=color, alpha=0.7, linewidth=1, label=f'質量1')
        ax.plot(pos2[:, 0], pos2[:, 1], color=color, alpha=0.5, linewidth=1, linestyle='--', label=f'質量2')
        ax.plot(pos1[0, 0], pos1[0, 1], 'o', color=color, markersize=5)
        ax.plot(pos2[0, 0], pos2[0, 1], 's', color=color, markersize=5)
    
    ax.set_aspect('equal')
    ax.grid(True)
    ax.set_title('カルテシアン座標での軌跡')
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    
    # 5. エネルギー保存
    ax = axes[1, 1]
    for i, color in enumerate(colors):
        # ハミルトニアンを計算
        energies = jax.vmap(double_pendulum_simulation.compute_hamiltonian, in_axes=(0, 0, None))(
            positions[i], momentums[i], sim_params[i]
        )
        ax.plot(times, energies, color=color, alpha=0.7, linewidth=1)
    
    ax.grid(True)
    ax.set_title('エネルギー保存')
    ax.set_xlabel('時間 (s)')
    ax.set_ylabel('ハミルトニアン')
    
    # 6. 初期条件の分布
    ax = axes[1, 2]
    theta1_inits = [sim_params[i]['theta1_init'] for i in range(len(sim_params))]
    theta2_inits = [sim_params[i]['theta2_init'] for i in range(len(sim_params))]
    
    scatter = ax.scatter(theta1_inits, theta2_inits, c=range(len(theta1_inits)), 
                        cmap='viridis', s=50, alpha=0.7)
    ax.grid(True)
    ax.set_title('初期条件の分布')
    ax.set_xlabel('θ1_init (rad)')
    ax.set_ylabel('θ2_init (rad)')
    plt.colorbar(scatter, ax=ax, label='軌跡番号')
    
    plt.tight_layout()
    plt.show()

# 可視化実行
if len(positions) > 0:
    plot_double_pendulum_analysis(positions, momentums, sim_params, times)
else:
    print("可視化用のデータがありません")

### カオス的動力学の特性分析

In [4]:
def analyze_chaotic_behavior(positions, times):
    """二重振り子のカオス的振る舞いを分析"""
    
    if len(positions) < 2:
        print("カオス解析には少なくとも2つの軌跡が必要です")
        return
    
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    
    # 1. 初期条件感度の分析
    ax = axes[0]
    
    # 最初の軌跡を基準とする
    base_traj = positions[0]
    
    separation_distances = []
    for i in range(1, min(5, len(positions))):
        # 軌跡間の距離を計算
        diff = positions[i] - base_traj
        distance = jnp.sqrt(jnp.sum(diff**2, axis=1))
        separation_distances.append(distance)
        
        ax.semilogy(times, distance, label=f'軌跡 {i+1}との距離', alpha=0.7)
    
    ax.grid(True)
    ax.set_title('初期条件感度\n(軌跡間距離の時間発展)')
    ax.set_xlabel('時間 (s)')
    ax.set_ylabel('軌跡間距離（対数スケール）')
    ax.legend()
    
    # 2. ポアンカレ断面
    ax = axes[1]
    
    for i in range(min(5, len(positions))):
        # θ1 = 0を横切る点を見つける（ダウンスイング）
        theta1 = positions[i, :, 0]
        theta2 = positions[i, :, 1]
        
        # ゼロクロッシングを検出
        zero_crossings = []
        for j in range(1, len(theta1)):
            if theta1[j-1] < 0 and theta1[j] > 0:  # 上向きクロッシング
                zero_crossings.append(j)
        
        if len(zero_crossings) > 1:
            theta2_crossings = theta2[zero_crossings]
            # momentums の対応する値
            p2_crossings = momentums[i, zero_crossings, 1]
            
            ax.scatter(theta2_crossings, p2_crossings, alpha=0.7, s=20, label=f'軌跡 {i+1}')
    
    ax.grid(True)
    ax.set_title('ポアンカレ断面\n(θ1=0でのθ2 vs p2)')
    ax.set_xlabel('θ2 (rad)')
    ax.set_ylabel('p2')
    ax.legend()
    
    # 3. 周波数解析
    ax = axes[2]
    
    for i in range(min(3, len(positions))):
        # θ1の時系列をフーリエ変換
        theta1_signal = positions[i, :, 0]
        dt = times[1] - times[0]
        freqs = np.fft.fftfreq(len(theta1_signal), dt)
        fft_vals = np.fft.fft(theta1_signal)
        power_spectrum = np.abs(fft_vals)**2
        
        # 正の周波数のみプロット
        positive_freqs = freqs[:len(freqs)//2]
        positive_power = power_spectrum[:len(power_spectrum)//2]
        
        ax.semilogy(positive_freqs, positive_power, alpha=0.7, label=f'軌跡 {i+1}')
    
    ax.grid(True)
    ax.set_title('パワースペクトラム\n(θ1の周波数成分)')
    ax.set_xlabel('周波数 (Hz)')
    ax.set_ylabel('パワー（対数スケール）')
    ax.set_xlim(0, 5)  # 低周波数領域に注目
    ax.legend()
    
    plt.tight_layout()
    plt.show()
    
    # 統計情報
    print("\n=== カオス的特性の統計 ===")
    if len(separation_distances) > 0:
        # リアプノフ指数の粗い推定
        final_separations = [dist[-1] for dist in separation_distances]
        initial_separations = [dist[0] for dist in separation_distances]
        lyapunov_estimates = [np.log(final/initial) / times[-1] 
                            for final, initial in zip(final_separations, initial_separations)
                            if initial > 0]
        
        if lyapunov_estimates:
            avg_lyapunov = np.mean(lyapunov_estimates)
            print(f"推定リアプノフ指数: {avg_lyapunov:.3f} (正の値はカオスを示唆)")
        
        print(f"最終的な軌跡間分離: {np.mean(final_separations):.3f} ± {np.std(final_separations):.3f}")

# カオス解析実行
if len(positions) > 1:
    analyze_chaotic_behavior(positions, times)

NameError: name 'positions' is not defined

## 2. 機械学習用のデータ準備

In [None]:
def prepare_double_pendulum_data(positions, momentums, train_ratio=0.8):
    """二重振り子データを機械学習用に準備"""
    
    if len(positions) == 0:
        return None, None
    
    # 位置と運動量を結合 [θ1, θ2, p1, p2]
    combined_data = jnp.concatenate([positions, momentums], axis=-1)
    
    # トレーニング/テスト分割
    n_trajectories = len(combined_data)
    n_train = int(n_trajectories * train_ratio)
    
    indices = np.random.permutation(n_trajectories)
    train_indices = indices[:n_train]
    test_indices = indices[n_train:]
    
    train_data = combined_data[train_indices]
    test_data = combined_data[test_indices]
    
    # 正規化のための統計値を計算
    data_mean = jnp.mean(train_data, axis=(0, 1))
    data_std = jnp.std(train_data, axis=(0, 1))
    
    def normalize_data(data):
        return (data - data_mean) / (data_std + 1e-8)
    
    def denormalize_data(data):
        return data * (data_std + 1e-8) + data_mean
    
    train_data_norm = normalize_data(train_data)
    test_data_norm = normalize_data(test_data)
    
    return (train_data_norm, test_data_norm, data_mean, data_std, 
            normalize_data, denormalize_data)

# データ準備実行
if len(positions) > 0:
    data_package = prepare_double_pendulum_data(positions, momentums)
    
    if data_package is not None:
        train_data_norm, test_data_norm, data_mean, data_std, normalize_fn, denormalize_fn = data_package
        
        print(f"\n=== データ準備結果 ===")
        print(f"トレーニングデータ形状: {train_data_norm.shape}")
        print(f"テストデータ形状: {test_data_norm.shape}")
        print(f"特徴量次元: [θ1, θ2, p1, p2] = {train_data_norm.shape[-1]}")
        print(f"\n正規化統計:")
        print(f"平均値: {data_mean}")
        print(f"標準偏差: {data_std}")
    else:
        print("データ準備に失敗しました")
else:
    print("準備できるデータがありません")

## 3. 時系列予測モデルの実装

二重振り子の複雑な動力学を予測するため、以下のアプローチを比較します：

1. **シンプルなRNNモデル**（ベースライン）
2. **物理制約を考慮したモデル**（Action-Angle Network風）

In [None]:
class DoublePendulumPredictor(nn.Module):
    """二重振り子用の時系列予測ニューラルネットワーク"""
    hidden_dim: int = 64
    output_dim: int = 4  # [θ1, θ2, p1, p2]
    
    @nn.compact
    def __call__(self, x):
        # x: (batch, sequence_len, features)
        batch_size, seq_len, feat_dim = x.shape
        
        # 時系列情報を処理
        x = x.reshape(batch_size, seq_len * feat_dim)
        
        # ディープニューラルネットワーク
        x = nn.Dense(self.hidden_dim)(x)
        x = nn.swish(x)
        x = nn.Dense(self.hidden_dim)(x)
        x = nn.swish(x)
        x = nn.Dense(self.hidden_dim)(x)
        x = nn.swish(x)
        x = nn.Dense(self.output_dim)(x)
        
        return x

class PhysicsInformedPredictor(nn.Module):
    """物理制約を考慮した予測モデル（簡易版Action-Angle Network）"""
    hidden_dim: int = 64
    latent_dim: int = 32
    output_dim: int = 4
    
    @nn.compact
    def __call__(self, x):
        batch_size, seq_len, feat_dim = x.shape
        x_flat = x.reshape(batch_size, seq_len * feat_dim)
        
        # エンコーダー（状態 → 潜在表現）
        encoded = nn.Dense(self.hidden_dim)(x_flat)
        encoded = nn.swish(encoded)
        encoded = nn.Dense(self.latent_dim)(encoded)
        encoded = nn.tanh(encoded)  # 潜在空間を制約
        
        # 「アクション」風の保存量を学習
        actions = nn.Dense(self.latent_dim // 2)(encoded)
        actions = nn.softplus(actions)  # 正の値に制約
        
        # 「角度」風の変数
        angles = nn.Dense(self.latent_dim // 2)(encoded)
        angles = nn.tanh(angles)  # [-1, 1]に制約
        
        # 結合
        combined = jnp.concatenate([actions, angles], axis=-1)
        
        # デコーダー（潜在表現 → 未来状態）
        decoded = nn.Dense(self.hidden_dim)(combined)
        decoded = nn.swish(decoded)
        decoded = nn.Dense(self.hidden_dim)(decoded)
        decoded = nn.swish(decoded)
        decoded = nn.Dense(self.output_dim)(decoded)
        
        return decoded

def create_train_state(model, rng, input_shape, learning_rate=1e-3):
    """トレーニング状態を作成"""
    params = model.init(rng, jnp.ones(input_shape))['params']
    optimizer = optax.adam(learning_rate=learning_rate)
    return train_state.TrainState.create(
        apply_fn=model.apply,
        params=params,
        tx=optimizer
    )

def prepare_sequence_data(data, input_length=20, prediction_horizon=1):
    """時系列予測用のシーケンスデータを準備"""
    X, y = [], []
    
    for traj in data:
        for i in range(len(traj) - input_length - prediction_horizon + 1):
            X.append(traj[i:i+input_length])
            y.append(traj[i+input_length:i+input_length+prediction_horizon])
    
    return jnp.array(X), jnp.array(y).squeeze()

print("モデル定義完了")
print("利用可能なモデル:")
print("  1. DoublePendulumPredictor: 標準的なディープニューラルネットワーク")
print("  2. PhysicsInformedPredictor: 物理制約を考慮した簡易版Action-Angle Network")

## 4. モデルのトレーニング

In [None]:
@jax.jit
def train_step(state, batch_x, batch_y):
    """トレーニングステップ"""
    def loss_fn(params):
        pred = state.apply_fn({'params': params}, batch_x)
        mse_loss = jnp.mean((pred - batch_y) ** 2)
        return mse_loss
    
    loss, grads = jax.value_and_grad(loss_fn)(state.params)
    state = state.apply_gradients(grads=grads)
    return state, loss

def train_model(model, train_data, test_data, model_name, 
                num_epochs=50, batch_size=16, input_length=20):
    """モデルをトレーニング"""
    
    print(f"\n=== {model_name} のトレーニング開始 ===")
    
    # データ準備
    X_train, y_train = prepare_sequence_data(train_data, input_length=input_length)
    X_test, y_test = prepare_sequence_data(test_data, input_length=input_length)
    
    print(f"トレーニングデータ: {X_train.shape} -> {y_train.shape}")
    print(f"テストデータ: {X_test.shape} -> {y_test.shape}")
    
    if len(X_train) == 0:
        print("トレーニングデータが不十分です")
        return None, None
    
    # モデル初期化
    rng = jax.random.PRNGKey(42)
    state = create_train_state(model, rng, (1, input_length, 4), learning_rate=1e-3)
    
    # トレーニングループ
    train_losses = []
    test_losses = []
    
    num_batches = max(1, len(X_train) // batch_size)
    
    for epoch in range(num_epochs):
        epoch_train_losses = []
        
        # バッチ処理
        for batch_idx in range(num_batches):
            batch_start = batch_idx * batch_size
            batch_end = min(batch_start + batch_size, len(X_train))
            
            batch_x = X_train[batch_start:batch_end]
            batch_y = y_train[batch_start:batch_end]
            
            state, loss = train_step(state, batch_x, batch_y)
            epoch_train_losses.append(loss)
        
        train_loss = jnp.mean(jnp.array(epoch_train_losses))
        train_losses.append(train_loss)
        
        # テスト損失計算
        if len(X_test) > 0:
            test_pred = state.apply_fn({'params': state.params}, X_test)
            test_loss = jnp.mean((test_pred - y_test) ** 2)
            test_losses.append(test_loss)
        else:
            test_loss = float('nan')
            test_losses.append(test_loss)
        
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss:.6f}, Test Loss = {test_loss:.6f}")
    
    print(f"{model_name} トレーニング完了！")
    print(f"最終 Train Loss: {train_losses[-1]:.6f}")
    print(f"最終 Test Loss: {test_losses[-1]:.6f}")
    
    return state, (train_losses, test_losses)

# モデルトレーニング実行
if 'train_data_norm' in locals() and train_data_norm is not None:
    # 標準モデル
    standard_model = DoublePendulumPredictor(hidden_dim=64)
    standard_state, standard_losses = train_model(
        standard_model, train_data_norm, test_data_norm, "標準ニューラルネットワーク"
    )
    
    # 物理制約モデル
    physics_model = PhysicsInformedPredictor(hidden_dim=64, latent_dim=32)
    physics_state, physics_losses = train_model(
        physics_model, train_data_norm, test_data_norm, "物理制約ニューラルネットワーク"
    )
else:
    print("トレーニングデータが準備されていません")
    standard_state, standard_losses = None, None
    physics_state, physics_losses = None, None

## 5. 予測性能の評価と比較

In [None]:
def multi_step_prediction(state, initial_sequence, n_steps=50):
    """複数ステップの予測（反復予測）"""
    predictions = []
    current_sequence = initial_sequence.copy()
    
    for _ in range(n_steps):
        # 現在のシーケンスから次の状態を予測
        next_state = state.apply_fn({'params': state.params}, 
                                  current_sequence[None, :, :])  # バッチ次元を追加
        next_state = next_state[0]  # バッチ次元を削除
        
        predictions.append(next_state)
        
        # シーケンスを更新（最古のデータを削除し、新しい予測を追加）
        current_sequence = jnp.concatenate([current_sequence[1:], next_state[None, :]], axis=0)
    
    return jnp.array(predictions)

def evaluate_models(standard_state, physics_state, test_data_norm, denormalize_fn):
    """モデルの予測性能を評価・比較"""
    
    if standard_state is None or physics_state is None:
        print("評価用のモデルが準備されていません")
        return
    
    if len(test_data_norm) == 0:
        print("評価用のテストデータがありません")
        return
    
    # テスト軌跡を選択
    test_idx = 0
    input_length = 20
    prediction_horizon = 50
    
    initial_sequence = test_data_norm[test_idx, :input_length]
    true_future = test_data_norm[test_idx, input_length:input_length+prediction_horizon]
    
    print(f"\n=== 予測性能評価 ===")
    print(f"初期シーケンス長: {input_length}")
    print(f"予測ホライズン: {prediction_horizon}")
    
    # 予測実行
    print("標準モデルで予測中...")
    standard_pred = multi_step_prediction(standard_state, initial_sequence, prediction_horizon)
    
    print("物理制約モデルで予測中...")
    physics_pred = multi_step_prediction(physics_state, initial_sequence, prediction_horizon)
    
    # 正規化を元に戻す
    initial_orig = denormalize_fn(initial_sequence[None, :, :])[0]
    true_orig = denormalize_fn(true_future[None, :, :])[0]
    standard_pred_orig = denormalize_fn(standard_pred[None, :, :])[0]
    physics_pred_orig = denormalize_fn(physics_pred[None, :, :])[0]
    
    # 評価指標計算
    standard_mse = jnp.mean((standard_pred_orig - true_orig) ** 2)
    physics_mse = jnp.mean((physics_pred_orig - true_orig) ** 2)
    
    standard_mae = jnp.mean(jnp.abs(standard_pred_orig - true_orig))
    physics_mae = jnp.mean(jnp.abs(physics_pred_orig - true_orig))
    
    print(f"\n標準モデル - MSE: {standard_mse:.6f}, MAE: {standard_mae:.6f}")
    print(f"物理制約モデル - MSE: {physics_mse:.6f}, MAE: {physics_mae:.6f}")
    
    # 可視化
    fig, axes = plt.subplots(3, 2, figsize=(15, 12))
    
    # 時系列全体の長さ
    total_time = jnp.arange(len(initial_orig) + len(true_orig)) * config.time_delta
    split_idx = len(initial_orig)
    
    # θ1の時系列比較
    ax = axes[0, 0]
    ax.plot(total_time[:split_idx], initial_orig[:, 0], 'g-', linewidth=3, label='初期データ')
    ax.plot(total_time[split_idx:], true_orig[:, 0], 'b-', linewidth=2, label='真値')
    ax.plot(total_time[split_idx:], standard_pred_orig[:, 0], 'r--', linewidth=2, label='標準モデル')
    ax.plot(total_time[split_idx:], physics_pred_orig[:, 0], 'm:', linewidth=2, label='物理制約モデル')
    ax.axvline(x=total_time[split_idx-1], color='k', linestyle=':', alpha=0.7)
    ax.grid(True)
    ax.set_title('θ1 (第1振り子角度)')
    ax.set_xlabel('時間 (s)')
    ax.set_ylabel('θ1 (rad)')
    ax.legend()
    
    # θ2の時系列比較
    ax = axes[0, 1]
    ax.plot(total_time[:split_idx], initial_orig[:, 1], 'g-', linewidth=3, label='初期データ')
    ax.plot(total_time[split_idx:], true_orig[:, 1], 'b-', linewidth=2, label='真値')
    ax.plot(total_time[split_idx:], standard_pred_orig[:, 1], 'r--', linewidth=2, label='標準モデル')
    ax.plot(total_time[split_idx:], physics_pred_orig[:, 1], 'm:', linewidth=2, label='物理制約モデル')
    ax.axvline(x=total_time[split_idx-1], color='k', linestyle=':', alpha=0.7)
    ax.grid(True)
    ax.set_title('θ2 (第2振り子角度)')
    ax.set_xlabel('時間 (s)')
    ax.set_ylabel('θ2 (rad)')
    ax.legend()
    
    # 位相空間での軌跡比較 (θ1 vs p1)
    ax = axes[1, 0]
    ax.plot(initial_orig[:, 0], initial_orig[:, 2], 'g-', linewidth=3, label='初期データ')
    ax.plot(true_orig[:, 0], true_orig[:, 2], 'b-', linewidth=2, label='真値')
    ax.plot(standard_pred_orig[:, 0], standard_pred_orig[:, 2], 'r--', linewidth=2, label='標準モデル')
    ax.plot(physics_pred_orig[:, 0], physics_pred_orig[:, 2], 'm:', linewidth=2, label='物理制約モデル')
    ax.grid(True)
    ax.set_title('第1振り子の位相空間 (θ1 vs p1)')
    ax.set_xlabel('θ1 (rad)')
    ax.set_ylabel('p1')
    ax.legend()
    
    # 位相空間での軌跡比較 (θ2 vs p2)
    ax = axes[1, 1]
    ax.plot(initial_orig[:, 1], initial_orig[:, 3], 'g-', linewidth=3, label='初期データ')
    ax.plot(true_orig[:, 1], true_orig[:, 3], 'b-', linewidth=2, label='真値')
    ax.plot(standard_pred_orig[:, 1], standard_pred_orig[:, 3], 'r--', linewidth=2, label='標準モデル')
    ax.plot(physics_pred_orig[:, 1], physics_pred_orig[:, 3], 'm:', linewidth=2, label='物理制約モデル')
    ax.grid(True)
    ax.set_title('第2振り子の位相空間 (θ2 vs p2)')
    ax.set_xlabel('θ2 (rad)')
    ax.set_ylabel('p2')
    ax.legend()
    
    # 誤差の時間変化
    ax = axes[2, 0]
    standard_pos_error = jnp.sqrt((standard_pred_orig[:, :2] - true_orig[:, :2])**2).mean(axis=1)
    physics_pos_error = jnp.sqrt((physics_pred_orig[:, :2] - true_orig[:, :2])**2).mean(axis=1)
    
    ax.plot(total_time[split_idx:], standard_pos_error, 'r-', linewidth=2, label='標準モデル')
    ax.plot(total_time[split_idx:], physics_pos_error, 'm-', linewidth=2, label='物理制約モデル')
    ax.grid(True)
    ax.set_title('位置予測誤差の時間変化')
    ax.set_xlabel('時間 (s)')
    ax.set_ylabel('平均位置誤差 (rad)')
    ax.legend()
    
    # 運動量誤差の時間変化
    ax = axes[2, 1]
    standard_mom_error = jnp.sqrt((standard_pred_orig[:, 2:] - true_orig[:, 2:])**2).mean(axis=1)
    physics_mom_error = jnp.sqrt((physics_pred_orig[:, 2:] - true_orig[:, 2:])**2).mean(axis=1)
    
    ax.plot(total_time[split_idx:], standard_mom_error, 'r-', linewidth=2, label='標準モデル')
    ax.plot(total_time[split_idx:], physics_mom_error, 'm-', linewidth=2, label='物理制約モデル')
    ax.grid(True)
    ax.set_title('運動量予測誤差の時間変化')
    ax.set_xlabel('時間 (s)')
    ax.set_ylabel('平均運動量誤差')
    ax.legend()
    
    plt.tight_layout()
    plt.show()
    
    return standard_pred_orig, physics_pred_orig, true_orig

# 評価実行
if ('standard_state' in locals() and standard_state is not None and 
    'physics_state' in locals() and physics_state is not None):
    
    pred_results = evaluate_models(
        standard_state, physics_state, test_data_norm, denormalize_fn
    )
else:
    print("評価用のモデルが準備されていません")

### トレーニング損失の比較

In [None]:
# トレーニング損失の可視化
if (standard_losses is not None and physics_losses is not None and 
    len(standard_losses) == 2 and len(physics_losses) == 2):
    
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    # トレーニング損失
    ax = axes[0]
    ax.plot(standard_losses[0], 'r-', linewidth=2, label='標準モデル')
    ax.plot(physics_losses[0], 'm-', linewidth=2, label='物理制約モデル')
    ax.grid(True)
    ax.set_title('トレーニング損失')
    ax.set_xlabel('エポック')
    ax.set_ylabel('損失（対数スケール）')
    ax.set_yscale('log')
    ax.legend()
    
    # テスト損失
    ax = axes[1]
    # NaNを除去してプロット
    standard_test = [x for x in standard_losses[1] if not jnp.isnan(x)]
    physics_test = [x for x in physics_losses[1] if not jnp.isnan(x)]
    
    if len(standard_test) > 0 and len(physics_test) > 0:
        ax.plot(standard_test, 'r-', linewidth=2, label='標準モデル')
        ax.plot(physics_test, 'm-', linewidth=2, label='物理制約モデル')
        ax.grid(True)
        ax.set_title('テスト損失')
        ax.set_xlabel('エポック')
        ax.set_ylabel('損失（対数スケール）')
        ax.set_yscale('log')
        ax.legend()
    else:
        ax.text(0.5, 0.5, 'テストデータ不足', 
               horizontalalignment='center', verticalalignment='center',
               transform=ax.transAxes, fontsize=14)
        ax.set_title('テスト損失（データ不足）')
    
    plt.tight_layout()
    plt.show()
else:
    print("損失データが利用できません")

## まとめ

このノートブックでは、二重振り子の複雑な動力学に対するAction-Angle Networks の適用を探索しました：

### 実装した内容

1. **二重振り子の物理シミュレーション**
   - ハミルトニアン力学に基づく正確な数値積分
   - 複数の初期条件での軌跡生成
   - ノイズの影響を考慮

2. **カオス的動力学の解析**
   - 初期条件感度の定量評価
   - ポアンカレ断面による位相空間構造の可視化
   - 周波数解析によるスペクトル特性の調査

3. **機械学習モデルの比較**
   - 標準的なディープニューラルネットワーク
   - 物理制約を考慮した簡易版Action-Angle Network
   - 長期予測性能の定量評価

### 主な発見

- **カオス性**: 二重振り子は高い初期条件感度を示し、長期予測が困難
- **エネルギー保存**: 数値積分でもエネルギーがよく保存される
- **予測精度**: 物理制約を考慮したモデルが長期安定性で優位性を示す可能性

### Action-Angle Networks の利点

実際のAction-Angle Networks では以下の特徴により、さらに優れた性能が期待できます：

1. **保存量の学習**: ハミルトニアン（エネルギー）や角運動量などの物理的保存量を明示的に学習
2. **シンプレクティック構造**: 正準変換の構造を保持し、位相空間の幾何学的性質を維持
3. **長期安定性**: カオス系でも統計的性質や大域的構造を保持した予測

### 今後の発展

より完全な実装には以下が含まれます：
- フローベースのエンコーダー・デコーダー
- 正準変換の学習
- より複雑な正則化項
- 異なるカオス系での汎化性能評価

二重振り子は Action-Angle Networks の能力を試すのに理想的なベンチマーク系であり、複雑な非線形動力学における深層学習の可能性を示しています。