In [None]:
# -*- coding: utf-8 -*-
# CNN(左右) → Transformer(時系列) → 回帰（Aパターン実装）
# 学習/検証/保存/推論まで一気通貫

import os
import math
import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.preprocessing import StandardScaler


# ===============================
# 1) データ読み込み・前処理
# ===============================
data_pairs = [
    # 釘宮くん
    ('./data/20250517old_data/20241212test4/Opti-track/Take 2024-12-12 03.06.59 PM.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_152700_left.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_152700_right.csv'),
    # 百田くん
    ('./data/20250517old_data/20241212test4/Opti-track/Take 2024-12-12 03.45.00 PM.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_160501_left.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_160501_right.csv'),
    # 渡辺(me)
    ('./data/20250517old_data/20241212test4/Opti-track/Take 2024-12-12 04.28.00 PM.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_164800_left.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_164800_right.csv'),
    # にるぱむさん
    ('./data/20250517old_data/20241212test4/Opti-track/Take 2024-12-12 05.17.59 PM.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_173800_left.csv',
     './data/20250517old_data/20241212test4/InsoleSensor/20241212_173800_right.csv')
]

test_data = ('./data/20250517old_data/20241115test3/Opti-track/Take 2024-11-15 03.50.00 PM.csv', 
              './data/20250517old_data/20241115test3/insoleSensor/20241115_155500_left.csv', 
              './data/20250517old_data/20241115test3/insoleSensor/20241115_155500_right.csv')

        # # 立ちっぱなし
        # ('./data/20250517old_data/20241115test3/Opti-track/Take 2024-11-15 03.20.00 PM.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_152500_left.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_152500_right.csv')
        # # お辞儀
        # ('./data/20250517old_data/20241115test3/Opti-track/Take 2024-11-15 03.26.00 PM.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_153100_left.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_153100_right.csv')
        # # 体の横の傾け
        # ('./data/20250517old_data/20241115test3/Opti-track/Take 2024-11-15 03.32.00 PM.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_153700_left.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_153700_right.csv')
        # # 立つ座る
        # ('./data/20250517old_data/20241115test3/Opti-track/Take 2024-11-15 03.38.00 PM.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_154300_left.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_154300_right.csv')
        # # スクワット
        # ('./data/20250517old_data/20241115test3/Opti-track/Take 2024-11-15 03.44.00 PM.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_154900_left.csv',
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_154900_right.csv')
        # # 総合(test3)
        # ('./data/20250517old_data/20241115test3/Opti-track/Take 2024-11-15 03.50.00 PM.csv', 
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_155500_left.csv', 
        #  './data/20250517old_data/20241115test3/insoleSensor/20241115_155500_right.csv')

# 学習用の配列を貯める（左・右を分けて保持）
X_left_list, X_right_list, y_list = [], [], []

for opti_path, left_path, right_path in data_pairs:
    left_df = pd.read_csv(left_path).fillna(0.0)
    right_df = pd.read_csv(right_path).fillna(0.0)
    skel_df = pd.read_csv(opti_path).fillna(0.0)

    # 時間長さの揃え（必要ならmin長に合わせる）
    T = min(len(left_df), len(right_df), len(skel_df))
    left_df = left_df.iloc[:T]
    right_df = right_df.iloc[:T]
    skel_df = skel_df.iloc[:T]

    X_left_list.append(left_df.values)
    X_right_list.append(right_df.values)
    y_list.append(skel_df.values)

X_left = np.concatenate(X_left_list, axis=0)     # [N, D_left]
X_right = np.concatenate(X_right_list, axis=0)   # [N, D_right]
y_train = np.concatenate(y_list, axis=0)         # [N, out_dim]

# スケーリング（左右・出力で個別にfit）
x_left_scaler = StandardScaler()
x_right_scaler = StandardScaler()
y_scaler = StandardScaler()

X_left_scaled = x_left_scaler.fit_transform(X_left)
X_right_scaled = x_right_scaler.fit_transform(X_right)
y_train_scaled = y_scaler.fit_transform(y_train)

print("X_left_scaled.shape :", X_left_scaled.shape)
print("X_right_scaled.shape:", X_right_scaled.shape)
print("y_train_scaled.shape:", y_train_scaled.shape)

# テスト読み込み
opti_path_test, left_path_test, right_path_test = test_data
left_df_test = pd.read_csv(left_path_test).fillna(0.0)
right_df_test = pd.read_csv(right_path_test).fillna(0.0)
skel_df_test = pd.read_csv(opti_path_test).fillna(0.0)

Tt = min(len(left_df_test), len(right_df_test), len(skel_df_test))
left_df_test = left_df_test.iloc[:Tt]
right_df_test = right_df_test.iloc[:Tt]
skel_df_test = skel_df_test.iloc[:Tt]

X_left_test = left_df_test.values
X_right_test = right_df_test.values
y_test = skel_df_test.values

X_left_test_scaled = x_left_scaler.transform(X_left_test)
X_right_test_scaled = x_right_scaler.transform(X_right_test)
y_test_scaled = y_scaler.transform(y_test)

# ===============================
# 2) スライディング窓の作成
# ===============================
def create_sequences_dual(XL, XR, Y, window_size):
    """ 左右それぞれの特徴列から窓を切り出して [B, S, D] を作る。
        マスクは (有効=1/欠損=0) を想定。ここでは0埋めではないので全て1。
        将来、欠損を0埋め運用する場合はルールを差し替えて下さい。
    """
    Xl_seq, Xr_seq, y_seq, mask_seq = [], [], [], []
    N = len(Y)
    for i in range(N - window_size + 1):
        Xl_window = XL[i:i+window_size]
        Xr_window = XR[i:i+window_size]
        y_window = Y[i + window_size - 1]  # 1ステップ先を予測（末尾フレームの姿勢）

        # 欠損マスクの雛形（ここでは全1）
        mask_window = np.ones((window_size,), dtype=np.int64)

        Xl_seq.append(Xl_window)
        Xr_seq.append(Xr_window)
        y_seq.append(y_window)
        mask_seq.append(mask_window)

    return (np.array(Xl_seq), np.array(Xr_seq),
            np.array(y_seq), np.array(mask_seq))

window_size = 10
Xl_tr, Xr_tr, y_tr, m_tr = create_sequences_dual(X_left_scaled, X_right_scaled, y_train_scaled, window_size)
Xl_te, Xr_te, y_te, m_te = create_sequences_dual(X_left_test_scaled, X_right_test_scaled, y_test_scaled, window_size)

print("Train seq shapes:",
      Xl_tr.shape, Xr_tr.shape, y_tr.shape, m_tr.shape)
print("Test seq shapes:",
      Xl_te.shape, Xr_te.shape, y_te.shape, m_te.shape)


# ===============================
# 3) Dataset / DataLoader
# ===============================
class PostureDualDataset(Dataset):
    def __init__(self, Xl, Xr, y, mask):
        self.Xl = torch.tensor(Xl, dtype=torch.float32)     # [B, S, Dl]
        self.Xr = torch.tensor(Xr, dtype=torch.float32)     # [B, S, Dr]
        self.y  = torch.tensor(y,  dtype=torch.float32)     # [B, Dy]
        self.m  = torch.tensor(mask, dtype=torch.long)      # [B, S] 1=valid, 0=pad

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.Xl[idx], self.Xr[idx], self.y[idx], self.m[idx]

batch_size = 128
train_ds = PostureDualDataset(Xl_tr, Xr_tr, y_tr, m_tr)
test_ds  = PostureDualDataset(Xl_te, Xr_te, y_te, m_te)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=False)
test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False, drop_last=False)





X_left_scaled.shape : (47963, 41)
X_right_scaled.shape: (47963, 41)
y_train_scaled.shape: (47963, 63)
Train seq shapes: (47954, 10, 41) (47954, 10, 41) (47954, 63) (47954, 10)
Test seq shapes: (2990, 10, 41) (2990, 10, 41) (2990, 63) (2990, 10)


In [None]:

# ===============================
# 4) モデル: CNN(左右) → Transformer
# ===============================
class ConvBlock1D(nn.Module):
    """ 短時間パターン抽出用の軽量1D-CNNブロック """
    def __init__(self, in_dim, hid_dim=128, out_dim=256, k=5, p=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(in_dim, hid_dim, kernel_size=k, padding=k//2),
            nn.ReLU(),
            nn.Conv1d(hid_dim, out_dim, kernel_size=k, padding=k//2),
            nn.ReLU(),
            nn.Dropout(p),
            nn.Conv1d(out_dim, out_dim, kernel_size=k, padding=k//2),
            nn.ReLU()
        )
        self.proj = nn.Identity()

    def forward(self, x):  # x: [B, S, D] -> permute to [B, D, S]
        x = x.transpose(1, 2)                  # [B, D, S]
        h = self.net(x)                        # [B, C, S]
        return h.transpose(1, 2)               # [B, S, C]


class PositionalEncoding(nn.Module):
    """ 標準的なサイン波位置埋め込み（学習不要・軽量） """
    def __init__(self, d_model, max_len=10000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)     # [L, D]
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float()
                             * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)[:,:pe[:,0::2].shape[1]]
        pe[:, 1::2] = torch.cos(position * div_term)[:,:pe[:,1::2].shape[1]]
        self.register_buffer('pe', pe.unsqueeze(0))  # [1, L, D]

    def forward(self, x):  # x: [B, S, D]
        S = x.size(1)
        return x + self.pe[:, :S, :]


class CNNTransformerRegressor(nn.Module):
    def __init__(
        self,
        in_left, in_right,               # 左/右 入力次元
        d_cnn=256,                       # 各CNN出力次元
        d_model=512,                     # Transformer埋め込み次元
        nhead=8, num_layers=6,
        dim_ff=1024, dropout=0.1,
        out_dim=63                       # 出力（J×3）
    ):
        super().__init__()
        # 左右のCNN分岐
        self.cnn_left  = ConvBlock1D(in_left,  hid_dim=128, out_dim=d_cnn, k=5, p=dropout)
        self.cnn_right = ConvBlock1D(in_right, hid_dim=128, out_dim=d_cnn, k=5, p=dropout)

        # 融合射影 → d_model
        self.fuse = nn.Sequential(
            nn.Linear(d_cnn * 2, d_model),
            nn.LayerNorm(d_model),
            nn.Dropout(dropout)
        )

        self.posenc = PositionalEncoding(d_model)

        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_ff,
            dropout=dropout, activation='gelu', batch_first=True, norm_first=True
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)

        # 回帰ヘッド
        self.head = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, out_dim)
        )

    @staticmethod
    def last_valid_token(h, mask):
        """ mask: [B,S] (1=valid, 0=pad)
            各バッチで最後の有効トークンを取り出す（CLS相当）
        """
        # 長さSの累積和で最後の有効位置を得る
        lengths = mask.sum(dim=1)                      # [B]
        idx = torch.clamp(lengths - 1, min=0)          # [B]
        # gatherで取り出す
        B, S, D = h.shape
        arange = torch.arange(B, device=h.device)
        return h[arange, idx, :]                       # [B, D]

    def forward(self, xl, xr, mask):
        # xl/xr: [B,S,Dl/Dr], mask: [B,S] (1=valid, 0=pad)
        hl = self.cnn_left(xl)     # [B,S,C]
        hr = self.cnn_right(xr)    # [B,S,C]
        h = torch.cat([hl, hr], dim=-1)          # [B,S,2C]
        h = self.fuse(h)                          # [B,S,d_model]
        h = self.posenc(h)                        # 位置埋め込み

        # Transformerのpaddingマスクは True=pad なので反転
        src_key_padding_mask = (mask == 0)        # [B,S], bool
        h = self.encoder(h, src_key_padding_mask=src_key_padding_mask)

        cls = self.last_valid_token(h, mask)      # [B,d_model]
        yhat = self.head(cls)                     # [B,out_dim]
        return yhat


# ===============================
# 5) 学習設定
# ===============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

in_left_dim  = Xl_tr.shape[2]
in_right_dim = Xr_tr.shape[2]
out_dim      = y_tr.shape[1]

model = CNNTransformerRegressor(
    in_left=in_left_dim, in_right=in_right_dim,
    d_cnn=256, d_model=512, nhead=8, num_layers=6,
    dim_ff=1024, dropout=0.1, out_dim=out_dim
).to(device)

criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=30)

num_epochs = 30


# ===============================
# 6) 学習ループ
# ===============================
for epoch in range(1, num_epochs + 1):
    model.train()
    total_loss = 0.0
    for xl, xr, y, m in tqdm(train_loader, desc=f"Train {epoch}/{num_epochs}"):
        xl = xl.to(device)
        xr = xr.to(device)
        y  = y.to(device)
        m  = m.to(device)

        optimizer.zero_grad()
        yhat = model(xl, xr, m)
        loss = criterion(yhat, y)

        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()

    scheduler.step()
    avg_train = total_loss / len(train_loader)

    # 検証
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for xl, xr, y, m in test_loader:
            xl = xl.to(device); xr = xr.to(device)
            y  = y.to(device);  m  = m.to(device)
            yhat = model(xl, xr, m)
            loss = criterion(yhat, y)
            val_loss += loss.item()
    avg_val = val_loss / len(test_loader)

    print(f"[Epoch {epoch:02d}] Train {avg_train:.5f} | Val {avg_val:.5f}")

# 保存
torch.save(model.state_dict(), "weight/1dcnn_trans_pose_regressor.pth")

In [22]:
# ===============================
# 7) 推論（テスト全区間）
# ===============================
model.eval()
preds = []

with torch.no_grad():
    # バッチ推論
    for i in range(0, len(Xl_te), batch_size):
        xl = torch.tensor(Xl_te[i:i+batch_size], dtype=torch.float32, device=device)
        xr = torch.tensor(Xr_te[i:i+batch_size], dtype=torch.float32, device=device)
        m  = torch.tensor(m_te[i:i+batch_size],  dtype=torch.long,    device=device)
        yhat = model(xl, xr, m)                            # [B, out_dim]
        preds.append(yhat.cpu().numpy())

pred_scaled = np.concatenate(preds, axis=0)                # [N_seq, out_dim]
pred_original = y_scaler.inverse_transform(pred_scaled)

# 列名（21関節×xyz想定、必要に応じて編集）
J = pred_original.shape[1] // 3
cols = [f'{ax}.{j*2 + 1}' for j in range(J) for ax in ['X','Y','Z']]

os.makedirs("output", exist_ok=True)
pd.DataFrame(pred_original, columns=cols).to_csv(
    "output/1DCNNTrans_predicted_skeleton.csv", index=False
)

print("Saved:", "output/1DCNNTrans_predicted_skeleton.csv")

Saved: output/1DCNNTrans_predicted_skeleton.csv
