In [4]:
# # LSTMおよび線形予測による歩行者軌跡予測
# - 観測: 20フレーム(2秒間)の歩行者の座標
# - 予測: 観測後30フレーム(3秒間)の歩行者の軌跡
# - モデル: TrajectoryLSTM (絶対座標を学習)，TrajectoryLSTM_Relative (相対座標を学習)，LinearPredictionModel (線形予測)
# - 指標: 平均距離誤差(ADE) / 終点距離誤差(FDE)

#!pip -q install torch torchvision torchaudio matplotlib numpy pandas

# %%
import math, random, os, sys, time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import time

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Device: cpu の場合，ランタイム→ランタイムのタイプを変更→T4 GPUに切り替える

ModuleNotFoundError: No module named 'torch'

In [None]:
# ドライブマウント
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd

# merged_human_traj.csvまでのパスを入れれば動きます．マイドライブ直下の場合は以下で動くはず
csv_path = "/content/drive/MyDrive/merged_human_traj.csv"
df = pd.read_csv(csv_path)
df.head()

# マウントできていれば左に並んでいる中のフォルダマークをクリックすると"drive"というフォルダ(ディレクトリ)が見える
# "drive → Mydrive →...→ ~.csv"という感じにファイルを見つけて右クリックでパス取得可能

In [None]:
class TrajectoryDataset(Dataset):
    """
    各 traj_id から連続した obs_len+pred_len のウィンドウ切り出し
    modeはどこを切り出すか，xのみyのみ，もしくはall
    lenは人数を出力

    output：
    obs → (人数, 20, 1 or 2)
    fut → (人数, 30, 1 or 2)
    """
    def __init__(self, df, obs_len=20, pred_len=30, stride=1, normalize=False, mode='all'):

        self.obs_len = obs_len
        self.pred_len = pred_len
        self.seq_len = obs_len + pred_len
        self.normalize = normalize
        self.mode = mode

        self.seqs = []
        for tid, g in df.groupby("traj_id"):
            g = g.sort_values("frame")

            if mode == 'x':
                xy = g[["x"]].to_numpy(dtype=np.float32) # x座標のみ取り出す
            elif mode == 'y':
                xy = g[["y"]].to_numpy(dtype=np.float32) # y座標のみ取り出す
            elif mode == 'all':
                xy = g[["x","y"]].to_numpy(dtype=np.float32)  # x,y座標ともに取り出す
            else:
                raise ValueError(f"Invalid mode: {mode}")

            if len(xy) < self.seq_len:
                continue

            # ウィンドウ切り出し
            for s in range(0, len(xy)-self.seq_len+1, stride):
                subseq = xy[s:s+self.seq_len]
                self.seqs.append(subseq)

        # shapeを揃える (人数, 50, 次元)
        if len(self.seqs) > 0:
            self.seqs = np.stack(self.seqs, axis=0)
        else:
            D = 1 if mode in ['x','y'] else 2
            self.seqs = np.zeros((0, self.seq_len, D), dtype=np.float32)

        D = self.seqs.shape[2]

        # 正規化（全データ一括）
        if self.normalize and len(self.seqs) > 0:
            # self.normalizeがTrueの時，正規化に必要な値を取得
            self.mean = self.seqs[:,:self.obs_len,:].reshape(-1,D).mean(axis=0, keepdims=True)
            self.std  = self.seqs[:,:self.obs_len,:].reshape(-1,D).std(axis=0, keepdims=True) + 1e-6
        else:
            self.mean = np.zeros((1,D), dtype=np.float32)
            self.std  = np.ones((1,D), dtype=np.float32)

    def __len__(self):
        return len(self.seqs)

    def __getitem__(self, idx):
        seq = self.seqs[idx].copy()
        seq = (seq - self.mean) / self.std

        obs = seq[:self.obs_len]          # 座標（正規化後）
        fut = seq[self.obs_len:]          # 座標（正規化後）

        return {
            "obs": torch.from_numpy(obs).float(),         # 入力用
            "fut": torch.from_numpy(fut).float(),         # 予測評価，教師用
        }


#データセットアップ用
def setup_data(ped_data, obs_len=20, pred_len=30, mode='x', max_samples=10000):
    dataset = TrajectoryDataset(ped_data, obs_len=obs_len, pred_len=pred_len, stride=1, normalize=True, mode=mode)

    n = len(dataset)
    if n > max_samples:
        print(f"Dataset over {max_samples}(now {n} samples), randomly subsampling to {max_samples}.")
        # 再現性のためseed固定
        random.seed(0) # ※編集禁止
        indices = random.sample(range(n), max_samples)
        dataset = torch.utils.data.Subset(dataset, indices)
        n = max_samples
    n_train = int(n*0.8)
    n_val = n - n_train
    print(f"Dataset size: total={n}  train={n_train}  val={n_val}")
    train_set, val_set = torch.utils.data.random_split(dataset, [n_train, n_val], generator=torch.Generator().manual_seed(0))
    return train_set, val_set

#評価指標計算用
def ade(pred, gt):
    """入力は (人数, フレーム数, 次元)を想定"""
    # 平均距離誤差（Average Displacement Error）を計算
    return torch.norm(pred - gt, dim=2).mean()


def fde(pred, gt):
    """入力は (人数, フレーム数, 次元)を想定"""
    # 終点距離誤差（Final Displacement Error）を計算
    return torch.norm(pred[:,-1,:] - gt[:,-1,:], dim=1).mean()


In [None]:
# 線形予測モデル
class LinearPredictionModel(nn.Module):
    def __init__(self, obs_len=20, fut_len=30):
        super().__init__()
        self.obs_len = obs_len
        self.fut_len = fut_len

    def forward(self, obs_abs):
        #線形予測を組み立てるのは以下の部分
        B, O, D = obs_abs.shape
        last_movement = obs_abs[:, -1, :] - obs_abs[:, -2, :]  # 最後から最後から２番目を引く
        preds = []
        for t in range(self.fut_len):
            pred = obs_abs[:, -1, :] + last_movement * (t+1)  # 最後のステップに，フレーム数*最後の動きを足していく
            preds.append(pred[:, np.newaxis, :])
        preds = torch.cat(preds, dim=1)  # (B, fut_len, D)
        return preds


# モデル定義
#絶対座標で学習するモデル
class TrajectoryLSTM(nn.Module):
    """
    input: obs_abs フレームの絶対座標 (バッチサイズ, 入力の長さ, 次元)

    output: pred_abs フレーム先の絶対座標 (バッチサイズ, 予測の長さ, 次元)
    """
    def __init__(self, input_size=1, hidden_size=128, num_layers=1, pred_len=30):
        super().__init__()
        self.pred_len = pred_len
        self.input_size = input_size
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.head = nn.Linear(hidden_size, pred_len * input_size)

    def forward(self, obs_abs):               # (B, obs_len, D)
        _, (h, _) = self.encoder(obs_abs)     # h: (num_layers, B, H)
        h_last = h[-1]                        # (B, H)
        out = self.head(h_last)               # (B, pred_len*D)
        pred_abs = out.view(-1, self.pred_len, self.input_size)  # (B, pred_len, D)
        return pred_abs


#相対座標を生成して学習するモデル
class TrajectoryLSTM_Relative(nn.Module):
    """
    input: obs_abs フレームの絶対座標 (バッチサイズ, 入力の長さ, 次元)

    output: pred_abs フレーム先の絶対座標 (バッチサイズ, 予測の長さ, 次元)
    """
    def __init__(self, input_size=1, hidden_size=128, num_layers=1, pred_len=30):
        super().__init__()
        self.pred_len = pred_len
        self.input_size = input_size
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.head = nn.Linear(hidden_size, pred_len * input_size)

    def forward(self, obs_abs):               # (B, obs_len, D)
        # 相対座標に変換
        obs_start = obs_abs[:,0:1,:]          # (B, 1, D)
        obs_rel = obs_abs - obs_start      # (B, obs_len, D)

        _, (h, _) = self.encoder(obs_rel)     # h: (num_layers, B, H)
        h_last = h[-1]                        # (B, H)
        out = self.head(h_last)               # (B, pred_len*D)
        pred_rel = out.view(-1, self.pred_len, self.input_size)  # (B, pred_len, D)

        # 絶対座標に戻す
        pred_abs = pred_rel + obs_start       # (B, pred_len, D)
        return pred_abs


In [None]:
# 学習用ループ
def train_model(ped_data, model_type='absolute', data_mode='x', input_size=1, epochs=50, batch_size=256, lr=0.001, hidden_size=128, obs_len=20, pred_len=30):
    train_set, val_set = setup_data(ped_data, obs_len=obs_len, pred_len=pred_len, mode=data_mode)
    if model_type == 'absolute':
        model = TrajectoryLSTM(hidden_size=hidden_size, input_size=input_size, pred_len=pred_len).to(device)
    elif model_type == 'relative':
        model = TrajectoryLSTM_Relative(hidden_size=hidden_size, input_size=input_size, pred_len=pred_len).to(device)
    else:
        raise ValueError(f"Invalid model_type: {model_type}")
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, drop_last=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, drop_last=False)

    print(f"Training on {len(train_set)} samples, validating on {len(val_set)} samples.")
    print(f"Input mode: {data_mode}, obs_len={obs_len}, pred_len={pred_len}")
    print(f"Training model with {sum(p.numel() for p in model.parameters() if p.requires_grad)} parameters.")

    best_val = float("inf")
    history = []
    for ep in range(1, epochs+1):
        model.train()
        tr_loss = 0.0
        for batch in train_loader:
            obs = batch["obs"].to(device)
            fut = batch["fut"].to(device)

            pred_abs = model(obs)

            loss = ade(pred_abs, fut)  # シンプルにADEを損失に

            opt.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            opt.step()
            tr_loss += loss.item()

        # 検証
        model.eval()
        with torch.no_grad():
            val_ade, val_fde = 0.0, 0.0
            count = 0
            for batch in val_loader:
                obs = batch["obs"].to(device)
                fut = batch["fut"].to(device)
                pred_abs = model(obs)
                val_ade += ade(pred_abs, fut).item() * obs.size(0)
                val_fde += fde(pred_abs, fut).item() * obs.size(0)
                count += obs.size(0)
            val_ade /= count
            val_fde /= count

        history.append((ep, tr_loss/len(train_loader), val_ade, val_fde))
        print(f"[Epoch {ep:02d}] train_loss={tr_loss/len(train_loader):.4f}  val_ADE={val_ade:.4f}  val_FDE={val_fde:.4f}")

        if val_ade < best_val:
            best_val = val_ade
            torch.save({"model": model.state_dict()}, f"/content/traj_lstm_best_{data_mode}_{model_type}.pth")

    return model, history


#各モードテスト用(allの場合も可能)
#x,yを別々に評価したい場合などにも利用可能
def test_each_mode(model, ped_data, mode='x', obs_len=20, pred_len=30):
    print(f"--- Testing mode: {mode} ---")
    _, test_set = setup_data(ped_data, mode=mode, obs_len=obs_len, pred_len=pred_len, max_samples=10000)
    test_loader = DataLoader(test_set, batch_size=1, shuffle=False, drop_last=False)
    model.eval()
    ade_lstm, fde_lstm = 0.0, 0.0
    count = 0
    with torch.no_grad():
        for batch in test_loader:
            obs = batch["obs"].to(device)
            fut = batch["fut"].to(device)

            pred_abs = model(obs)
            ade_lstm += ade(pred_abs, fut).item() * obs.size(0)
            fde_lstm += fde(pred_abs, fut).item() * obs.size(0)

            count += obs.size(0)
    ade_lstm /= count
    fde_lstm /= count
    print(f"RESULT: ADE={ade_lstm:.4f}  FDE={fde_lstm:.4f}  Mode={mode}")
    return (ade_lstm, fde_lstm)


#複数モデルテスト用
def test_separate_modes(model_x, model_y, ped_data, obs_len=20, pred_len=30):
    print("--- Testing separate modes ---")

    model_x.eval()
    model_y.eval()
    _, test_set = setup_data(ped_data, mode='all', obs_len=obs_len, pred_len=pred_len, max_samples=10000)
    test_loader = DataLoader(test_set, batch_size=1, shuffle=False, drop_last=False)
    ade_lstm, fde_lstm = 0.0, 0.0
    count = 0
    with torch.no_grad():
        for batch in test_loader:
            obs = batch["obs"].to(device)
            fut = batch["fut"].to(device)

            pred_x = model_x(obs[:,:,0:1])
            pred_y = model_y(obs[:,:,1:2])
            pred_abs = torch.cat([pred_x, pred_y], dim=2)

            ade_lstm += ade(pred_abs, fut).item() * obs.size(0)
            fde_lstm += fde(pred_abs, fut).item() * obs.size(0)

            count += obs.size(0)
    ade_lstm /= count
    fde_lstm /= count
    print(f"RESULT: ADE={ade_lstm:.4f}  FDE={fde_lstm:.4f}  Mode=separate")
    return (ade_lstm, fde_lstm)

In [None]:
# 線形モデルを用いて予測する
linear_model = LinearPredictionModel()
result = test_each_mode(linear_model, df, mode='all')

In [None]:
# LSTMモデルでx,y別々に学習して予測する
# 入力座標は絶対値
model_x, hist_x = train_model(df, model_type='absolute', data_mode='x', input_size=1, epochs=50, batch_size=256, lr=1e-3, hidden_size=128)
model_y, hist_y = train_model(df, model_type='absolute', data_mode='y', input_size=1, epochs=50, batch_size=256, lr=1e-3, hidden_size=128)
result = test_separate_modes(model_x, model_y, df)

In [None]:
# LSTMモデルでx,yの2次元で学習して予測する
# 入力座標は絶対値
model, hist = train_model(df, model_type='absolute', data_mode='all', input_size=2, epochs=50, batch_size=256, lr=1e-3, hidden_size=128)
result = test_each_mode(model, df, mode='all')

In [None]:
# LSTMモデルでx,y別々に学習して予測する
# 入力座標は相対値
model_x_relative, hist_x_relative = train_model(df, model_type='relative', data_mode='x', input_size=1, epochs=50, batch_size=256, lr=1e-3, hidden_size=128)
model_y_relative, hist_y_relative = train_model(df, model_type='relative', data_mode='y', input_size=1, epochs=50, batch_size=256, lr=1e-3, hidden_size=128)
result = test_separate_modes(model_x_relative, model_y_relative, df)

In [None]:
# LSTMモデルでx,yの2次元で学習して予測する
# 入力座標は相対値
model_relative, hist_relative = train_model(df, model_type='relative', data_mode='all', input_size=2, epochs=50, batch_size=256, lr=1e-3, hidden_size=128)
result = test_each_mode(model_relative, df, mode='all')

In [None]:
# ロスカーブの可視化
ep, tr, va, vf = zip(*hist) # histを見たいモデルのhistに切り替え可能
# ep, tr, va, vf = zip(*hist_relative) # 例えば
plt.figure()
plt.plot(ep, tr, label="train_loss(ADE)", color='blue')
plt.plot(ep, va, label="val_ADE", color='r')
plt.plot(ep, vf, label="val_FDE", color='g')
plt.xlabel("Epoch")
plt.ylabel("Error")
plt.legend()
plt.title("Training History")
plt.show()