#  Import

In [1]:
print("hello")


hello


In [2]:
import os
import sys

import numpy as  np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset as TorchDataset, DataLoader, Subset
import time

from PIL import Image
import numpy as np

from deepverse import ParameterManager
from deepverse.scenario import ScenarioManager
from deepverse import Dataset

from deepverse.visualizers import ImageVisualizer, LidarVisualizer

# Settings

In [3]:
# Scenes 2000
## Subcarriers 64

scenarios_name = "DT31"
config_path = f"scenarios/{scenarios_name}/param/config.m"
param_manager = ParameterManager(config_path)

params = param_manager.get_params()

param_manager.params["scenes"] =list(range(100))
param_manager.params["comm"]["OFDM"]["selected_subcarriers"] = list(range(64))

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"현재 사용 중인 장치: {device}")


현재 사용 중인 장치: cuda:0


# Generate a dataset

In [4]:
dataset = Dataset(param_manager)

Generating camera dataset: ⏳ In progress
[F[KGenerating camera dataset: ✅ Completed (0.00s)
Generating LiDAR dataset: ⏳ In progress
[F[KGenerating LiDAR dataset: ✅ Completed (0.00s)
Generating mobility dataset: ⏳ In progress
[F[KGenerating mobility dataset: ✅ Completed (0.00s)
Generating comm dataset: ⏳ In progress


                                                                    

[F[KGenerating comm dataset: ✅ Completed (0.94s)
Generating radar dataset: ⏳ In progress


                                                                    

[F[KGenerating radar dataset: ✅ Completed (197.67s)




# location dataset 
 지금 실험에서는 안쓰임

In [5]:
# comm = dataset.comm_dataset
# location =  comm

# location = [
#     {
#         "bs_loc": d["bs_loc"],                      # (3,)
#         "ue_loc": np.asarray(d["ue_loc"]).squeeze() # (3,)  (원래 (1,3)이면 squeeze)
#     }
#     for row in comm.data      # row: [dict] 형태
#     for d in row              # d: dict
# ]

# print(ue_location)  #)

# communication  dataset

In [6]:
# UE 정보
comm = dataset.comm_dataset
ch = comm.data[0][0]['ue'][0]
print(ch.coeffs.shape)  

(1, 16, 64)


# preprocessing

In [7]:
def get_coeffs_from_frame(frame, ue_idx=0):
    ue_obj = frame["ue"]

    # 케이스1) list/tuple이면 ue_idx로 선택
    if isinstance(ue_obj, (list, tuple)):
        ch_obj = ue_obj[ue_idx]
    else:
        # 케이스2) 단일 OFDMChannel이면 그대로 사용
        ch_obj = ue_obj

    # coeffs는 dict key가 아니라 attribute일 확률이 매우 큼
    if hasattr(ch_obj, "coeffs"):
        return ch_obj.coeffs

    # 혹시 dict라면 마지막 보험
    if isinstance(ch_obj, dict) and "coeffs" in ch_obj:
        return ch_obj["coeffs"]

    raise TypeError(f"Cannot get coeffs. ue type={type(ue_obj)}, ch type={type(ch_obj)}")


In [8]:
def get_train_min_max_realimag(frames, train_idx, us_idx=0):

    rmin, rmax =  float('inf'), float('-inf')
    imin, imax =  float('inf'), float('-inf')

    print("Calculating min/max over training set...")

    for t  in train_idx:
        frame  = frames[t]
        cooeffs  = get_coeffs_from_frame(frame, us_idx)  # (N_subcarriers, )

        rmin = min(rmin, float(cooeffs.real.min()))
        rmax = max(rmax, float(cooeffs.real.max()))
        imin = min(imin, float(cooeffs.imag.min()))
        imax = max(imax, float(cooeffs.imag.max()))

    print(f"Done. rmin={rmin}, rmax={rmax}, imin={imin}, imax={imax}")
    return (rmin, rmax), (imin, imax)

In [9]:
def preprocess_channel_coeffs_minmax(coeffs_np, r_min, r_max, i_min, i_max, device=device, eps=1e-16):
    # Convert Numpy to Tensor
    coeffs = torch.from_numpy(coeffs_np).to(torch.complex128)
    
    r = coeffs.real
    i = coeffs.imag
    
    # Min-Max Scaling [0, 1]
    # Add eps to denominator to prevent division by zero
    r_scaled = (r - r_min) / max(r_max - r_min, eps)
    i_scaled = (i - i_min) / max(i_max - i_min, eps)
    
    # Concat (Maintains shape like (..., 2*subcarriers))
    H = torch.cat([r_scaled, i_scaled], dim=-1).to(device)
    return H

In [10]:
# 사용예시
H = preprocess_channel_coeffs_minmax(ch.coeffs, r_min=-0.5, r_max=0.5, i_min=-0.5, i_max=0.5)
print(H.shape)  # (1, 16, 128) 64 subcar

torch.Size([1, 16, 128])


### image dataset

In [11]:
sensor = dataset.camera_dataset.sensors["unit1_cam1"]
path0 = sensor.files[0]
img = Image.open(path0).convert("RGB")
arr = np.array(img)

print("path:", path0)
print("PIL size (W,H):", img.size)
print("np shape:", arr.shape, "dtype:", arr.dtype)  # 보통 (H,W,3), uint8


path: scenarios/DT31/RGB_images/unit1_cam1/7.png
PIL size (W,H): (1920, 1080)
np shape: (1080, 1920, 3) dtype: uint8


In [12]:
IMG_SIZE = 224

def preprocess_img(path, img_size=IMG_SIZE, device=device):
    # 1) load (H,W,3) uint8
    img = Image.open(path).convert("RGB")
    arr = np.array(img)

    # 2) numpy -> torch, (3,H,W), float32
    x = torch.from_numpy(arr).permute(2, 0, 1).contiguous().double()  # (3,H,W), double for ImageNet stats
    x = x / 255.0  # [0,1]

    # 3) add batch dim -> (1,3,H,W)
    x = x.unsqueeze(0)

    # 4) resize -> (1,3,224,224)
    x = F.interpolate(x, size=(img_size, img_size),
                      mode="bilinear", align_corners=False)

    # 5) normalize (ImageNet)
    mean = torch.tensor([0.485, 0.456, 0.406], dtype=x.dtype).view(1, 3, 1, 1)
    std  = torch.tensor([0.229, 0.224, 0.225], dtype=x.dtype).view(1, 3, 1, 1)
    x = (x - mean) / std

    # 6) move to device (GPU)
    x = x.to(device, non_blocking=True)

    return x  # (1,3,224,224) on device


In [13]:
# 사용예시
cd = dataset.camera_dataset
sensor = cd.sensors['unit1_cam1']
path0 = sensor.files[0]
img = preprocess_img(path0, device=device)
print(img.shape, img.device)  # torch.Size([1,3,224,224]) cuda:0
print(path0)

torch.Size([1, 3, 224, 224]) cuda:0
scenarios/DT31/RGB_images/unit1_cam1/7.png


# Dataset 구현

In [14]:
def flatten_comm_frames(comm):
    frames = []
    for row in comm.data:
        for d in row:
            frames.append(d)
    return frames

class MultiModalNextStepDatasetGPU(TorchDataset):
    def __init__(self, comm_frames, cam_files, ue_idx=0, past_len=15, device=device,
                 # Arguments for statistical values (initialized with default values)
                 r_min=0.0, r_max=1.0, i_min=0.0, i_max=1.0):
        
        self.comm_frames = comm_frames
        self.cam_files = list(cam_files)
        self.ue_idx = ue_idx
        self.past_len = past_len
        self.device = device
        
        # Save statistical values
        self.r_min, self.r_max = r_min, r_max
        self.i_min, self.i_max = i_min, i_max

        self.N = min(len(self.comm_frames), len(self.cam_files))
        self.valid_start = past_len - 1
        self.valid_end = self.N - 2 

    def __len__(self):
        return self.valid_end - self.valid_start + 1

    def __getitem__(self, idx):
        t = self.valid_start + idx

        # 1. Image Past (Apply Preprocessing)
        img_list = []
        for k in range(t - self.past_len + 1, t + 1):
            img_path = self.cam_files[k]
            img_k = preprocess_img(img_path, device=self.device).squeeze(0)
            img_list.append(img_k)
        img = torch.stack(img_list, dim=0)  # Shape: (past_len

        
        # 2. Channel Past (Apply Scaling)
        ch_list = []
        for k in range(t - self.past_len + 1, t + 1):
            coeffs_np = get_coeffs_from_frame(self.comm_frames[k], ue_idx=self.ue_idx)
            # Use the newly defined Min-Max preprocessing function
            h = preprocess_channel_coeffs_minmax(
                coeffs_np, 
                self.r_min, self.r_max, self.i_min, self.i_max, 
                device=self.device
            ).reshape(-1)
            ch_list.append(h)
        channel_past = torch.stack(ch_list, dim=0)

        # 3. Target (Apply Scaling) - Target must also be scaled for model training!
        coeffs_np_next = get_coeffs_from_frame(self.comm_frames[t + 1], ue_idx=self.ue_idx)
        target = preprocess_channel_coeffs_minmax(
            coeffs_np_next, 
            self.r_min, self.r_max, self.i_min, self.i_max, 
            device=self.device
        ).reshape(-1)

        return channel_past, img, target

# DataLoader 구현

In [15]:
comm_frames = flatten_comm_frames(dataset.comm_dataset)
sensor = dataset.camera_dataset.sensors["unit1_cam1"]

ds = MultiModalNextStepDatasetGPU(
    comm_frames=comm_frames,
    cam_files=sensor.files,
    ue_idx=0,
    past_len=16,
    device=device
)

loader = DataLoader(
    ds,
    batch_size=8,
    shuffle=True,
    num_workers=0,     
    pin_memory=False   # ✅ 의미 없음 (이미 GPU)
)

ch, img, y = next(iter(loader))
print(ch.shape, img.shape, y.shape)
print(ch.device, img.device, y.device)


torch.Size([8, 16, 2048]) torch.Size([8, 16, 3, 224, 224]) torch.Size([8, 2048])
cuda:0 cuda:0 cuda:0


# GRU/LSTM Early fusion

In [16]:
class ImgFrameEncoderVec(nn.Module):
    """(B*T,3,224,224) -> (B*T,d_img)"""
    def __init__(self, d_img=128):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2, padding=2), nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1), nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=2, padding=1), nn.ReLU(),
            nn.AdaptiveAvgPool2d((1,1)),
        )
        self.proj = nn.Linear(128, d_img)

    def forward(self, x):
        z = self.conv(x).flatten(1)  # (B*T,128)
        return self.proj(z)          # (B*T,d_img)

class RNN_EarlyFusion_Forecaster(nn.Module):
    """
    ch : (B,T,F_in=2048)
    img: (B,T,3,224,224)
    yhat: (B,F_out)
    """
    def __init__(self, F_in, F_out, rnn_type="gru", hidden=256, num_layers=2, d_img=128, dropout=0.1):
        super().__init__()
        self.img_enc = ImgFrameEncoderVec(d_img=d_img)

        self.in_proj = nn.Sequential(
            nn.Linear(F_in + d_img, hidden),
            nn.GELU(),
        )

        rnn_type = rnn_type.lower()
        if rnn_type == "gru":
            self.rnn = nn.GRU(hidden, hidden, num_layers=num_layers, batch_first=True,
                              dropout=dropout if num_layers > 1 else 0.0)
        elif rnn_type == "lstm":
            self.rnn = nn.LSTM(hidden, hidden, num_layers=num_layers, batch_first=True,
                               dropout=dropout if num_layers > 1 else 0.0)
        else:
            raise ValueError("rnn_type must be 'gru' or 'lstm'")

        self.head = nn.Linear(hidden, F_out)

    def forward(self, ch, img):
        B, T, _ = ch.shape
        img_ = img.view(B*T, *img.shape[2:])             # (B*T,3,224,224)
        img_feat = self.img_enc(img_).view(B, T, -1)     # (B,T,d_img)

        x = torch.cat([ch, img_feat], dim=-1)            # (B,T,F_in+d_img)
        x = self.in_proj(x)                              # (B,T,hidden)

        out, _ = self.rnn(x)                             # (B,T,hidden)
        z = out[:, -1, :]                                # last
        return self.head(z)                              # (B,F_out)


# Fine-tuning
data shape 맞추기 위해

In [17]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F

# from lwm_multi_model import multi_modal_lwm  # 너가 올린 backbone

# class FinetuneChannelPredictor(nn.Module):
#     """
#     직접 구현한 lwm_multi_model(channel + image)에 맞는 파인튜닝 모델
#     Input:
#       ch:  (B, T, F_in)  e.g., (B,16,2048) 16: past Length, 2048: feature dim
#       img: (B, 3, 224, 224)
#     Output:
#       yhat: (B, F_out)  e.g., (B,2048)
#     """
#     def __init__(
#         self,
#         backbone: nn.Module,
#         F_in: int,
#         F_out: int,
#         pool: str = "last",          # "last" or "mean"
#         freeze_image: bool = False,
#         freeze_backbone: bool = False,
#         element_length: int = 16,    # 채널 벡터 차원 (backbone 기대값)
#         d_model: int = 64            # backbone 내부 feature dim
#     ):
#         super().__init__()
#         self.backbone = backbone
#         self.pool = pool

#         # backbone이 기대하는 channel feature dim = ELEMENT_LENGTH
#         # (backbone 내부 Channel_Embedding: Linear(ELEMENT_LENGTH -> D_MODEL))
#         if element_length is None:
#             element_length = backbone.channel_embedding.element_length
#         if d_model is None:
#             d_model = backbone.channel_embedding.d_model

#         # 입력 차원 정렬: F_in -> ELEMENT_LENGTH
#         self.in_proj = nn.Sequential(
#             nn.Linear(F_in, 512),
#             nn.GELU(),
#             nn.Linear(512, 128),
#             nn.GELU(),
#             nn.Linear(128, element_length)
#         )

#         # 출력 head: D_MODEL -> F_out
#         self.head = nn.Linear(d_model, F_out)

#         if freeze_image:
#             for p in self.backbone.image_embedding.parameters():
#                 p.requires_grad = False

#         if freeze_backbone:
#             for p in self.backbone.parameters():
#                 p.requires_grad = False
#             # 그래도 projection/head는 학습되게 다시 켜기
#             for p in self.in_proj.parameters():
#                 p.requires_grad = True
#             for p in self.head.parameters():
#                 p.requires_grad = True

#     def forward(self, ch, img):
#         # ch: (B,T,F_in) -> (B,T,ELEMENT_LENGTH)
#         ch = self.in_proj(ch)

#         # backbone: (B,T,D_MODEL)/
#         tokens = self.backbone(ch, img)

#         # pooling -> (B,D_MODEL)
#         if self.pool == "last":
#             z = tokens[:, -1, :]
#         elif self.pool == "mean":
#             z = tokens.mean(dim=1)
#         else:
#             raise ValueError(f"Unknown pool={self.pool}")

#         # head -> (B,F_out)
#         yhat = self.head(z)
#         return yhat
    


## NMSE(dB)

In [18]:
@torch.no_grad()
def nmse_db(yhat: torch.Tensor, y: torch.Tensor, eps: float = 1e-16) -> torch.Tensor:
    # yhat, y: (B,F)
    num = torch.sum((yhat - y) ** 2, dim=1)
    den = torch.sum(y ** 2, dim=1).clamp_min(eps)
    nmse = num / den
    return 10.0 * torch.log10(nmse.clamp_min(eps)).mean()


# Train/Val split

In [19]:
n = len(ds)
n_train = int(0.75 * n)
train_idx = list(range(0, n_train))
val_idx = list(range(n_train, n))

train_ts = [ds.valid_start + i for i in train_idx]

(real_min,  real_max), (imag_min, imag_max) = get_train_min_max_realimag(
    comm_frames, train_ts, us_idx=0
)

ds.r_min = real_min
ds.r_max = real_max
ds.i_min = imag_min
ds.i_max = imag_max

print("Dataset statistical values set in the dataset.")

train_ds = Subset(ds, train_idx)
val_ds   = Subset(ds, val_idx)

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True,  num_workers=0)
val_loader   = DataLoader(val_ds,   batch_size=32, shuffle=False, num_workers=0)
F_in = 2048  # 

# Verify
ch, img, y = next(iter(train_loader))
F_out = y.shape[-1]
print("\n=== Data Check ===")
print(f"y stats | min: {y.min().item():.4f}, max: {y.max().item():.4f}")
print("If scaling worked correctly, values should be within [0, 1].")

Calculating min/max over training set...
Done. rmin=-1.0646139639174899e-06, rmax=1.0917989155343993e-06, imin=-1.0719516685941772e-06, imax=1.0643868111374237e-06
Dataset statistical values set in the dataset.

=== Data Check ===
y stats | min: 0.0318, max: 0.9671
If scaling worked correctly, values should be within [0, 1].


In [20]:
comm_frames = flatten_comm_frames(dataset.comm_dataset)
cam_files = list(dataset.camera_dataset.sensors["unit1_cam1"].files)

print("len(comm_frames):", len(comm_frames))
print("len(cam_files):", len(cam_files))
print("first comm frame keys:", list(comm_frames[0].keys()))
print("first cam file:", cam_files[0])


len(comm_frames): 100
len(cam_files): 1114
first comm frame keys: ['bs_loc', 'ue', 'ue_loc', 'bs']
first cam file: scenarios/DT31/RGB_images/unit1_cam1/7.png


In [21]:
len(val_loader)

1

# Model generate and also check

In [22]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print("device:", device)

# # 배치 하나로 F_in/F_out 자동 확정
# ch, img, y = next(iter(train_loader))
# F_in  = ch.shape[-1]
# F_out = y.shape[-1]
# print("Detected:", "F_in=", F_in, "F_out=", F_out)
# print("Batch devices:", ch.device, img.device, y.device)

# # backbone + finetune model
# backbone = multi_modal_lwm().to(device)

# model = FinetuneChannelPredictor(
#     backbone=backbone,
#     F_in=F_in,
#     F_out=F_out,
#     pool="last",            # "mean"으로 바꿔도 됨
#     freeze_image=False,     # 원하면 True (이미지 인코더 고정)
#     freeze_backbone=False,  # 원하면 True (proj/head만 학습)
#     element_length=16,
#     d_model=64
# ).to(device)

# # sanity forward
# model.eval()
# with torch.no_grad():
#     # ds가 이미 cuda 텐서 반환이면 아래 .to(device) 생략 가능
#     yhat = model(ch.to(device), img.to(device))
# print("yhat:", yhat.shape, "y:", y.shape)



# Train/ Eval 함수 (AMP + grad clip)

In [23]:


def train_one_epoch(model, loader, optimizer, device, grad_clip=1.0):
    model.train()

    total_loss = 0.0
    total_nmse = 0.0
    n = 0

    for ch, img, y in loader:
        # float64(Double) 타입으로 확실하게 캐스팅하여 GPU로 이동
        ch = ch.to(device, dtype=torch.float64, non_blocking=True)
        img = img.to(device, dtype=torch.float64, non_blocking=True)
        y  = y.to(device, dtype=torch.float64, non_blocking=True)

        optimizer.zero_grad(set_to_none=True)

        # amp.autocast 제거됨
        yhat = model(ch, img)
        loss = F.mse_loss(yhat, y)

        # scaler 없이 바로 backward 호출
        loss.backward()

        # scaler.unscale_ 없이 바로 gradient clipping 적용
        if grad_clip is not None and grad_clip > 0:
            nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

        # scaler.step 대신 바로 optimizer.step() 호출
        optimizer.step()

        total_loss += loss.item()
        total_nmse += nmse_db(yhat.detach(), y).item()
        n += 1

    return total_loss / max(n, 1), total_nmse / max(n, 1)


@torch.no_grad()
def evaluate(model, loader, device):
    model.eval()

    total_loss = 0.0
    total_nmse = 0.0
    n = 0

    for ch, img, y in loader:
        # float64(Double) 타입으로 확실하게 캐스팅
        ch = ch.to(device, dtype=torch.float64, non_blocking=True)
        img = img.to(device, dtype=torch.float64, non_blocking=True)
        y  = y.to(device, dtype=torch.float64, non_blocking=True)

        yhat = model(ch, img)
        loss = F.mse_loss(yhat, y)

        total_loss += loss.item()
        total_nmse += nmse_db(yhat, y).item()
        n += 1

    return total_loss / max(n, 1), total_nmse / max(n, 1)

In [24]:
# define fit model GRU/ LSTM
def fit_model(
    model: nn.Module,
    train_loader,
    val_loader,
    device,
    epochs: int,
    ckpt_path: str,
    lr: float = 1e-4,
    weight_decay: float = 1e-4,
    grad_clip: float = 1.0,
):
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    print("trainable params:", sum(p.numel() for p in trainable_params))

    optimizer = torch.optim.AdamW(trainable_params, lr=lr, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

    best_val = float("inf")

    for epoch in range(1, epochs + 1):
        t0 = time.time()

        tr_loss, tr_nmse = train_one_epoch(model, train_loader, optimizer, device=device, grad_clip=grad_clip)
        va_loss, va_nmse = evaluate(model, val_loader, device=device)

        scheduler.step()

        dt = time.time() - t0
        print(
            f"[{epoch:02d}/{epochs}] "
            f"train loss={tr_loss:.6f}, nmse(dB)={tr_nmse:.4f} | "
            f"val loss={va_loss:.6f}, nmse(dB)={va_nmse:.4f} | "
            f"{dt:.1f}s"
        )

        if va_loss < best_val:
            best_val = va_loss
            torch.save(
                {
                    "epoch": epoch,
                    "model_state": model.state_dict(),
                    "optimizer_state": optimizer.state_dict(),
                    "best_val": best_val,
                    "F_in": F_in,
                    "F_out": F_out,
                },
                ckpt_path
            )
            print(f"  ↳ saved {ckpt_path}")

    return best_val
 

#  Optiimizer  / Scheduler 설정

In [25]:
# # requires_grad=True인 파라미터만 학습
# trainable_params = [p for p in model.parameters() if p.requires_grad]
# print("trainable params:", sum(p.numel() for p in trainable_params))

# optimizer = torch.optim.AdamW(trainable_params, lr=1e-4, weight_decay=1e-4)

# # (선택) cosine scheduler
# epochs = 1000
# scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)


# 학습 루프 + checkpoint 저장

In [None]:
print("\n=== Final Check on GRU Model ===")
ch, img, y = next(iter(train_loader))
print("y abs mean:", y.abs().mean().item())
print("y abs max :", y.abs().max().item())
print("y power   :", (y**2).mean().item())

with torch.no_grad():
    yhat = model_gru(ch.to(device), img.to(device))  # 여기만 변경
print("yhat abs mean:", yhat.abs().mean().item())
print("yhat abs max :", yhat.abs().max().item())
print("yhat power   :", (yhat**2).mean().item())


print("\n=== LSTM Evaluation on a Batch ===")

ch, img, y = next(iter(train_loader))
print("y abs mean:", y.abs().mean().item())
print("y abs max :", y.abs().max().item())
print("y power   :", (y**2).mean().item())

with torch.no_grad():
    yhat = model_lstm(ch.to(device), img.to(device))  # 여기만 변경
print("yhat abs mean:", yhat.abs().mean().item())
print("yhat abs max :", yhat.abs().max().item())
print("yhat power   :", (yhat**2).mean().item())

y abs mean: 0.49622291326522827
y abs max : 1.0
y power   : 0.287660151720047


NameError: name 'model' is not defined

# 데이터 입력 및 형태

In [None]:
@torch.no_grad()
def debug_batch_and_forward(loader, device, model=None, name="model", max_print=1):
    """
    - loader에서 배치 하나 뽑아서 shape/device/dtype 확인
    - model이 있으면 forward까지 해서 yhat shape 확인
    - img가 (B,T,3,H,W)인지 (B,3,H,W)인지 자동 판별
    """
    batch = next(iter(loader))

    # (ch, img, y) 형태를 가정
    ch, img, y = batch

    def info(x, label):
        print(f"{label:>4}: shape={tuple(x.shape)} dtype={x.dtype} device={x.device}")

    print("\n=== one batch tensor info ===")
    info(ch, "ch")
    info(img, "img")
    info(y,  "y")

    # img 차원 설명
    if img.dim() == 5:
        print("img format: (B,T,3,H,W)")
    elif img.dim() == 4:
        print("img format: (B,3,H,W)")
    else:
        print(f"img format: unexpected dim={img.dim()}")

    # forward check
    if model is not None:
        model.eval()
        yhat = model(ch.to(device), img.to(device))
        info(yhat, f"{name}_yhat")
        return (ch, img, y, yhat)

    return (ch, img, y)


In [None]:
print("=== dataset sizes ===")
print("N(comm_frames):", len(comm_frames))
print("N(cam_files)  :", len(cam_files))
print("N(min)        :", min(len(comm_frames), len(cam_files)))
print("past_len      :", ds.past_len)
print("len(ds)       :", len(ds))
print("len(train_ds) :", len(train_ds))
print("len(val_ds)   :", len(val_ds))
print("len(train_loader):", len(train_loader))
print("len(val_loader)  :", len(val_loader))

# GRU
debug_batch_and_forward(train_loader, device, model_gru, name="GRU")

# LSTM
debug_batch_and_forward(train_loader, device, model_lstm, name="LSTM")


=== dataset sizes ===
N(comm_frames): 100
N(cam_files)  : 7012
N(min)        : 100
past_len      : 16
len(ds)       : 84
len(train_ds) : 63
len(val_ds)   : 21
len(train_loader): 2
len(val_loader)  : 1

=== one batch shapes ===
ch : (32, 16, 2048)  -> (B,T,F_in)
img: (32, 16, 3, 224, 224)  -> (B,3,224,224)
y  : (32, 2048)  -> (B,F_out)
yhat: (32, 2048)  -> (B,F_out)
this forward predicted vectors: 32 (=B)
each vector predicts elements: 2048 (=F_out)
