In [1]:
import torch
from torch.utils.data import TensorDataset, DataLoader, random_split
from model import  Skeleton2Mesh
import torch.nn as nn
from tqdm import tqdm
from new_model import DHAmodel


In [None]:
# 1. Imports and Hyperparameters
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, random_split, DataLoader
from tqdm import tqdm

# Hyperparameters
INPUT_DIM     = 63    # dimension of input feature vector
TARGET_DIM    = 3     # dimension of target (joint angles per head)
BATCH_SIZE    = 256
LEARNING_RATE = 1e-4
NUM_EPOCHS    = 1000
DEVICE        = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2. Load dataset
X_tensor, y_tensor = torch.load('dataset.pt')
dataset = TensorDataset(X_tensor, y_tensor)

# Split into train/validation
train_size = int(0.9 * len(dataset))
val_size   = len(dataset) - train_size
train_ds, val_ds = random_split(dataset, [train_size, val_size])

# DataLoaders
train_loader = DataLoader(
    train_ds, batch_size=BATCH_SIZE, shuffle=True,
    num_workers=4, pin_memory=True, persistent_workers=True
)
val_loader = DataLoader(
    val_ds, batch_size=BATCH_SIZE, shuffle=False,
    num_workers=4, pin_memory=True, persistent_workers=True
)

# 3. Model, Loss, Optimizer, Scheduler
model     = DHAmodel().to(DEVICE)
criterion = nn.MSELoss()    # use MSE for regression
optimizer = optim.AdamW(
    model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5
)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer, T_0=NUM_EPOCHS, T_mult=2, eta_min=1e-4
)

# 4. Training & Validation Loop with tqdm and 10‑epoch logging
for epoch in tqdm(range(1, NUM_EPOCHS + 1), desc='Epochs'):
    # --- Training Phase ---
    model.train()
    train_loss = 0.0
    for batch_data, batch_targets in train_loader:
        batch_data    = batch_data.to(DEVICE)
        batch_targets = batch_targets.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(batch_data)
        loss    = criterion(outputs, batch_targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * batch_data.size(0)
    train_loss /= train_size

    # --- Validation Phase ---
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for val_data, val_targets in val_loader:
            val_data    = val_data.to(DEVICE)
            val_targets = val_targets.to(DEVICE)

            val_outputs = model(val_data)
            v_loss = criterion(val_outputs, val_targets)
            val_loss += v_loss.item() * val_data.size(0)
    val_loss /= val_size

    # --- Scheduler Step ---
    scheduler.step()

    # --- Logging every 10 epochs ---
    if epoch % 10 == 0:
        print(
            f"Epoch [{epoch:04d}/{NUM_EPOCHS:04d}]  "
            f"Train Loss: {train_loss:.6f}  "
            f"Val Loss:   {val_loss:.6f}"
        )

    # --- Checkpoint Saving every 100 epochs ---
    if epoch % 100 == 0:
        torch.save(model.state_dict(), f"dha_model_epoch{epoch}.pth")

# 5. Final Save
torch.save(model.state_dict(), "dha_model_final.pth")
print("Training complete. Final model saved to dha_model_final.pth")


In [6]:
import torch
import torch.nn as nn
import torch.optim as optim


num_joints    = 20
num_bones     = 19
gsd_dim       = 100
mlp_hidden    = [1024, 1024]
pe_freqs_bone = 5
pe_freqs_order= 2

batch_size    = 256
learning_rate = 1e-4
num_epochs    = 1000
device        = torch.device("cuda" if torch.cuda.is_available() else "cpu")

X_tensor, y_tensor = torch.load('dataset.pt')  # X: [N,182], y: [N]

dataset = TensorDataset(X_tensor, y_tensor)
train_size = int(0.9 * len(dataset))
val_size   = len(dataset) - train_size
train_ds, val_ds = random_split(dataset, [train_size, val_size])
save_interval = 100

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  num_workers=4, pin_memory=True, persistent_workers=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True, persistent_workers=True)

# 4) 모델·옵티마이저·손실함수 초기화
model = Skeleton2Mesh(
    num_joints=num_joints,
    num_bones=num_bones,
    gsd_dim=gsd_dim,
    mlp_hidden=mlp_hidden,
    pe_freqs_bone=pe_freqs_bone,
    pe_freqs_order=pe_freqs_order
).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)
criterion = nn.MSELoss()
scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=num_epochs, T_mult=2, eta_min=1e-4)

# alpha = 5.0
# weights = torch.cat([
#     torch.full((1,), 2),
#     torch.full((3,), alpha),  # first 4 dims have weight alpha
#     torch.ones(4)             # last 4 dims have weight 1.0
# ], dim=0).to(device) 


In [None]:


# 5. Usage example in training loop
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# weights_sum = weights.sum()

for epoch in tqdm(range(1, num_epochs+1)):
   model.train()
   train_loss = 0.0
   for skeletons_flat_batch, angles_batch in train_loader:
       skeletons_flat_batch = skeletons_flat_batch.to(device)
       angles_batch         = angles_batch.to(device)

       preds = model(skeletons_flat_batch)     # [B,8]
       loss  = criterion(preds, angles_batch)

    #    loss_per_sample = (loss_matrix * weights).sum(dim=1) / weights_sum
    #    loss = loss_per_sample.mean()
    
       optimizer.zero_grad()
       loss.backward()
       optimizer.step()

       train_loss += loss.item() * skeletons_flat_batch.size(0)

   train_loss /= train_size

   # 6) 검증
   model.eval()
   val_loss = 0.0
   with torch.no_grad():
       for skeletons_flat_batch, angles_batch in val_loader:
           skeletons_flat_batch = skeletons_flat_batch.to(device)
           angles_batch         = angles_batch.to(device)

           preds = model(skeletons_flat_batch)
           lm = criterion(preds, angles_batch)
        #    lps = (lm * weights).sum(dim=1) / weights_sum
        #    loss_v = lps.mean()

           val_loss += lm.item() * skeletons_flat_batch.size(0)
   val_loss /= val_size

   print(f"Epoch {epoch:03d} | Train Loss: {train_loss:.6f} | Val Loss: {val_loss:.6f}")

# Example variables: epoch (int), model (nn.Module), optimizer (Optimizer), loss_value (float)
# Adjust 'checkpoint_path' as 원하는 파일명 또는 경로로 변경하세요.
   if epoch % save_interval == 0 or epoch == num_epochs:
       checkpoint = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': val_loss
            }

       checkpoint_path = 'checkpoint_euler_epoch{}_ver3.pth'.format(epoch)
       torch.save(checkpoint, checkpoint_path)
       print(f"Checkpoint saved to {checkpoint_path} (epoch {epoch}, loss {val_loss:.6f})")
   scheduler.step()

In [2]:
import torch
import numpy as np

# 1) Load the checkpoint from .pth file
#    If the file contains a dict with 'model_state_dict', use that;
#    otherwise assume it _is_ the state_dict.
checkpoint = torch.load('checkpoint_euler_epoch1000_ver3.pth', map_location='cpu')
state_dict = checkpoint.get('model_state_dict', checkpoint)

# 2) Convert each tensor in the state_dict to a NumPy array
np_params = {}
for name, tensor in state_dict.items():
    # Ensure the tensor is on CPU and convert to numpy
    np_params[name] = tensor.detach().cpu().numpy()

# 3) Save all arrays into a single .npz file
#    Keys in the archive will match the original parameter names.
np.savez('euler_epoch1000_ver3.npz', **np_params)

print(f"Saved {len(np_params)} arrays to euler_epoch1000_ver3.npz")


Saved 32 arrays to euler_epoch1000_ver3.npz


In [9]:

# 2) Load test data
X_test, y_test = torch.load('test.pt',weights_only=False)

# 1) Device 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 3) Hyperparameters / model config
num_joints    = 20
num_bones     = 19
gsd_dim       = 100
mlp_hidden    = [1024, 1024]
pe_freqs_bone = 5
pe_freqs_order= 2

# 4) Instantiate model and load checkpoint
model = Skeleton2Mesh(
    num_joints=num_joints,
    num_bones=num_bones,
    gsd_dim=gsd_dim,
    mlp_hidden=mlp_hidden,
    pe_freqs_bone=pe_freqs_bone,
    pe_freqs_order=pe_freqs_order
).to(device)

# Path to saved checkpoint
checkpoint_path = "checkpoint_euler_epoch1000_ver3.pth"  # 실제 파일명으로 변경

# Load checkpoint
checkpoint = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()  # set to evaluation mode

# If optimizer state or epoch 등 추가 정보가 필요하면
# epoch_loaded = checkpoint.get('epoch')
# loss_loaded = checkpoint.get('loss')

# 5) Prepare test data
#    Assume you have NumPy arrays:
#      skeletons_test_np: shape [N_test, num_joints*3]
#      angles_test_np:    shape [N_test, 8]
#    These must be prepared beforehand.
import numpy as np
# Example placeholders; 실제 데이터를 여기에 할당하세요.
# skeletons_test_np = np.load("skeletons_test.npy")  # [N_test, 63]
# angles_test_np    = np.load("angles_test.npy")     # [N_test, 8]

# Convert to torch.Tensor

# Create DataLoader
batch_size = 64  # 필요에 따라 변경
test_dataset = TensorDataset(X_test, y_test)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

# 6) Define evaluation metrics
mse_loss_fn = nn.MSELoss(reduction='mean')
mae_loss_fn = nn.L1Loss(reduction='mean')

# Optional: per-dimension metrics
num_outputs = y_test.shape[1]  # e.g. 8

# Containers for aggregated results
total_mse = 0.0
total_mae = 0.0
num_samples = 0

# If you want per-output errors, accumulate sums:
sum_sq_errors = torch.zeros(num_outputs, device=device)
sum_abs_errors = torch.zeros(num_outputs, device=device)

# 7) Evaluation loop
with torch.no_grad():
    for skeletons_flat_batch, angles_batch in test_loader:
        skeletons_flat_batch = skeletons_flat_batch.to(device)  # [B, num_joints*3]
        angles_batch         = angles_batch.to(device)         # [B, 8]

        # Forward pass
        preds = model(skeletons_flat_batch)  # [B, 8]

        # Compute batch losses
        batch_mse = mse_loss_fn(preds, angles_batch)  # scalar
        batch_mae = mae_loss_fn(preds, angles_batch)  # scalar

        # Accumulate weighted by batch size
        B = skeletons_flat_batch.size(0)
        total_mse += batch_mse.item() * B
        total_mae += batch_mae.item() * B
        num_samples += B

        # Per-output accumulation
        # Compute squared error and abs error per sample per dimension
        diff = preds - angles_batch  # [B, 8]
        sum_sq_errors += torch.sum(diff * diff, dim=0)    # [8]
        sum_abs_errors += torch.sum(torch.abs(diff), dim=0)  # [8]

# 8) Final metrics
mean_mse = total_mse / num_samples
mean_mae = total_mae / num_samples

# Per-output RMSE and MAE
rmse_per_output = torch.sqrt(sum_sq_errors / num_samples)  # [8]
mae_per_output  = sum_abs_errors / num_samples            # [8]

print(f"Test MSE (mean over all outputs): {mean_mse:.6f}")
print(f"Test MAE (mean over all outputs): {mean_mae:.6f}")
print("Per-output RMSE:", rmse_per_output.cpu().numpy())
print("Per-output MAE: ", mae_per_output.cpu().numpy())

Test MSE (mean over all outputs): 5.143120
Test MAE (mean over all outputs): 1.658323
Per-output RMSE: [2.8750231 2.0619373 1.7064626]
Per-output MAE:  [2.1378648 1.5944549 1.2426484]


In [10]:
for i in range(1,11):
# Path to saved checkpoint
    checkpoint_path = f"checkpoint_euler_epoch{i*100}_ver3.pth"  # 실제 파일명으로 변경
    # Load checkpoint
    checkpoint = torch.load(checkpoint_path, map_location=device)
    print(checkpoint['loss'])


1.206814682346651
0.7473673742849708
0.7024254144227672
0.6089446878829069
0.5842644464923961
0.5384927660721625
0.5220780326945572
0.5117141604423523
0.49837305498610357
0.4976469677222338


In [None]:
import joblib              # joblib is better for sklearn objects
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error

# 1) Load the fitted scaler
scaler = joblib.load('y_scaler.pkl')

# 2) Inverse transform normalized predictions and targets


# 3) Compute MSE on denormalized values
mse_per_output_denorm = mean_squared_error(
    y_true,
    y_pred,
    multioutput='raw_values'
)
avg_mse_denorm = np.mean(mse_per_output_denorm)
print("Denormalized MSE per output:", mse_per_output_denorm)
print(f"Average denormalized MSE: {avg_mse_denorm:.4f}")

# 4) Build comparison DataFrame
num_outputs = y_true.shape[1]
columns = [f"true_{i}" for i in range(num_outputs)] + [f"pred_{i}" for i in range(num_outputs)]
df_compare = pd.DataFrame(
    np.hstack([y_true, y_pred]),
    columns=columns
)

# 5) Inspect a specific row by index
index_to_inspect = 1293  # ← change this to the row you want
if not (0 <= index_to_inspect < len(df_compare)):
    raise IndexError(f"Index out of range: 0 ≤ idx < {len(df_compare)}")

print(f"\nComparison at index {index_to_inspect}:")
print(df_compare.loc[[index_to_inspect]])


In [None]:
import matplotlib.pyplot as plt
import numpy as np

plt.figure(figsize=(8, 6))
plt.hist(mse_per_output, bins=10, edgecolor='black')
plt.title('Histogram of MSE per Output')
plt.xlabel('MSE Value')
plt.ylabel('Number of Outputs')
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
import torch
import numpy as np


def extract_and_save_s2m_weights(pth_path: str,
                                 output_npz_path: str,
                                 num_joints: int,
                                 num_bones: int,
                                 gsd_dim: int,
                                 mlp_hidden: list,
                                 pe_freqs_bone: int,
                                 pe_freqs_order: int):
    """
    Load a Skeleton2Mesh checkpoint from pth_path, extract all model weights
    into NumPy arrays, and save into output_npz_path (.npz).

    Args:
      pth_path: path to the .pth checkpoint (must contain 'model_state_dict' key).
      output_npz_path: path (including filename) where to save the .npz file.
      num_joints, num_bones, gsd_dim, mlp_hidden, pe_freqs_bone, pe_freqs_order:
        the hyperparameters used to instantiate the model exactly as in training.
    """
    # 1) Instantiate the model with the same config used during training
    model = Skeleton2Mesh(
        num_joints=num_joints,
        num_bones=num_bones,
        gsd_dim=gsd_dim,
        mlp_hidden=mlp_hidden,
        pe_freqs_bone=pe_freqs_bone,
        pe_freqs_order=pe_freqs_order
    )
    # 2) Load checkpoint (map to CPU to avoid GPU dependency here)
    checkpoint = torch.load(pth_path, map_location='cpu')
    # Expect checkpoint to have 'model_state_dict'; if your checkpoint saved differently,
    # adjust the key accordingly.
    if 'model_state_dict' in checkpoint:
        state_dict = checkpoint['model_state_dict']
    else:
        # If you saved the entire model state_dict directly:
        state_dict = checkpoint
    model.load_state_dict(state_dict)
    model.eval()

    # 3) Extract weights to NumPy arrays
    np_params = {}
    for name, param in model.state_dict().items():
        # detach and move to CPU, then to NumPy float32
        np_params[name] = param.detach().cpu().numpy().astype(np.float32)

    # 4) Save into .npz
    np.savez(output_npz_path, **np_params)
    print(f"Saved {len(np_params)} arrays to {output_npz_path}")

# Example usage:
# Assume you have a checkpoint at 's2m_epoch100.pth'
# and you used these hyperparameters during training:
num_joints    = 20    # as in your code
num_bones     = 19
gsd_dim       = 100
mlp_hidden    = [256, 256]
pe_freqs_bone = 5
pe_freqs_order= 2

checkpoint_path = 'checkpoint_ori_epoch1800.pth'   # change to your actual checkpoint filename
output_npz_path = 'skeleton2angle_ori.npz'

extract_and_save_s2m_weights(
    pth_path=checkpoint_path,
    output_npz_path=output_npz_path,
    num_joints=num_joints,
    num_bones=num_bones,
    gsd_dim=gsd_dim,
    mlp_hidden=mlp_hidden,
    pe_freqs_bone=pe_freqs_bone,
    pe_freqs_order=pe_freqs_order
)


In [None]:
import numpy as np

# Replace with your actual .npz path
npz_path = 'skeleton2angle_ori.npz'

try:
    data = np.load(npz_path)
    print(f"Loaded .npz file: {npz_path}")
    print(f"Found {len(data.files)} arrays:\n")

    for key in data.files:
        arr = data[key]
        shape = arr.shape
        ndim = arr.ndim

        # Heuristic classification:
        if ndim == 2:
            # 2D weight: likely Linear.weight
            # shape = (out_features, in_features)
            print(f"  - {key}: shape={shape}, dtype={arr.dtype} => likely Linear.weight")
        elif ndim == 1:
            # 1D: could be LayerNorm.weight or LayerNorm.bias or Linear.bias.
            # 추가적으로 이름으로 유추:
            if key.endswith('.weight'):
                # 1D weight: if preceding layer is LayerNorm(prev_dim), then this is LayerNorm.weight.
                print(f"  - {key}: shape={shape}, dtype={arr.dtype} => 1D weight: likely LayerNorm.weight")
            elif key.endswith('.bias'):
                # 1D bias: could be LayerNorm.bias or Linear.bias.
                # 이름만으로 구분이 어려우므로, 앞의 인덱스와 shape를 보고 추론 필요.
                # 예: ffn.0.bias shape=(182,)인데, ffn.0.weight도 (182,)이므로 LayerNorm.bias.
                #     ffn.1.bias shape=(512,)인데 weight shape (512,182)이면 Linear.bias.
                # 간단히 두 가지 모두 가능하다고 표시:
                print(f"  - {key}: shape={shape}, dtype={arr.dtype} => 1D bias: LayerNorm.bias or Linear.bias")
            else:
                print(f"  - {key}: shape={shape}, dtype={arr.dtype} => 1D array: unknown role")
        else:
            print(f"  - {key}: shape={shape}, dtype={arr.dtype} => unexpected ndim={ndim}")

    # 추가로, 1D bias가 Linear.bias인지 LayerNorm.bias인지 정확히 구분하려면:
    print("\nAdditional check for 1D bias vs LayerNorm.bias:")
    for key in data.files:
        if key.endswith('.bias') and data[key].ndim == 1:
            idx = key.split('.')[1]  # 'ffn.<idx>.bias'
            weight_key = f"ffn.{idx}.weight"
            if weight_key in data.files:
                w = data[weight_key]
                if w.ndim == 1 and w.shape == data[key].shape:
                    # weight도 1D, bias shape 같음 → LayerNorm.bias
                    role = "LayerNorm.bias (matched 1D weight)"
                elif w.ndim == 2 and w.shape[0] == data[key].shape[0]:
                    # weight is 2D with out_features equal bias length → Linear.bias
                    role = "Linear.bias (matched 2D weight out_features)"
                else:
                    role = "Unknown bias role"
            else:
                role = "No matching weight key; unknown"
            print(f"  - {key}: shape={data[key].shape} => {role}")

except FileNotFoundError:
    print(f"File not found: {npz_path}. Please ensure the file exists or update the path.")
except Exception as e:
    print(f"An error occurred while loading or processing the npz file: {e}")
