# All Model saves here

## import

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset, DataLoader, random_split
import DeepMIMOv3
import numpy as np
from pprint import pprint
import matplotlib.pyplot as plt
import time


plt.rcParams['figure.figsize'] = [12, 8]  # default plot size for every figure

## GPU Settings

In [2]:
# Device selection: use CUDA when PyTorch reports it as available, else the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [3]:
import torch
# One line each: the CUDA version PyTorch reports, then the cuDNN version.
print(torch.version.cuda, torch.backends.cudnn.version(), sep="\n")
# Whether a CUDA device can actually be used at runtime (True on a GPU machine).
print("CUDA available:", torch.cuda.is_available())

12.6
90501
CUDA available: True


## DeepMIMOv3 dataset

In [4]:
# Start from DeepMIMOv3's default parameter set; individual entries are
# overridden afterwards.
parameters = DeepMIMOv3.default_params()

In [5]:
## Override the default parameters for this experiment

# Number of dynamic-scenario scenes to load (scene indices 0 .. scene - 1).
scene = 15

# --- Dataset location and scenario -------------------------------------------
# The downloaded 'O2_dyn_3p5' scenario is expected under dataset_folder.
# Previous Colab location, kept for reference:
# parameters['dataset_folder'] = r'/content/drive/MyDrive/Colab Notebooks/LWM'
parameters['dataset_folder'] = '/home/dlghdbs200/LWM'  # local Linux path
parameters['scenario'] = 'O2_dyn_3p5'
parameters['dynamic_scenario_scenes'] = np.arange(scene)

# --- Users and base stations -------------------------------------------------
# User row indices 0..99 (np.arange(100))
parameters['user_rows'] = np.arange(100)
# User subsampling factor
parameters['user_subsampling'] = 0.01
# Activate only the first basestation
parameters['active_BS'] = np.array([1])

# --- Channel / OFDM ------------------------------------------------------------
# Up to 10 multipath paths per user-to-base station channel
parameters['num_paths'] = 10

# NOTE(review): confirm that the installed DeepMIMOv3 version reads the
# 'activate_OFDM' key (check the keys returned by default_params()).
parameters['activate_OFDM'] = 1

parameters['OFDM']['bandwidth'] = 0.05  # 50 MHz
parameters['OFDM']['subcarriers'] = 512  # OFDM with 512 subcarriers
# Keep only subcarrier indices 0..63
parameters['OFDM']['selected_subcarriers'] = np.arange(0, 64, 1)
#parameters['OFDM']['subcarriers_limit'] = 64 # Keep only first 64 subcarriers

# --- Antennas --------------------------------------------------------------------
parameters['ue_antenna']['shape'] = np.array([1, 1])  # Single antenna
parameters['bs_antenna']['shape'] = np.array([1, 32])  # ULA of 32 elements
# Optional antenna rotation / radiation-pattern overrides (disabled):
#parameters['bs_antenna']['rotation'] = np.array([0, 30, 90])
#parameters['ue_antenna']['rotation'] = np.array([[0, 30], [30, 60], [60, 90]])
#parameters['ue_antenna']['radiation_pattern'] = 'isotropic'
#parameters['bs_antenna']['radiation_pattern'] = 'halfwave-dipole'

In [None]:
## dataset setting (chunked on‑the‑fly generation)
import time, gc
from tqdm import tqdm

# Scene indices 0 .. scene - 1, generated `chunk_size` scenes per call so that
# a single generate_data() invocation never has to build every scene at once.
scene_indices = np.arange(scene)
chunk_size = 5
all_data = []

# One generate_data() call per chunk of scene indices
for i in tqdm(range(0, len(scene_indices), chunk_size)):
    chunk = scene_indices[i:i + chunk_size].tolist()
    parameters['dynamic_scenario_scenes'] = chunk

    start = time.time()
    data_chunk = DeepMIMOv3.generate_data(parameters)
    print(f"Scenes {chunk[0]}–{chunk[-1]} generation time: {time.time() - start:.2f}s")

    # Accumulate this chunk's scenes in memory
    all_data += data_chunk

    # Drop the temporary name and run the garbage collector; the scenes
    # themselves stay alive because all_data still references them.
    del data_chunk
    gc.collect()

# Combined dataset: one entry per generated scene, in scene order
dataset = all_data


print(parameters['user_rows'])

## About Information
User : 737
UE antenna : 1
BS antenna : 32  Shape(a+bj)
subcarrier : 64

In [7]:
# Unmasked-data model (GRU)
# Masked data and unmasked data are handled by separate datasets

## Data Preprocessing

In [8]:
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import IterableDataset, DataLoader
import numpy as np
import torch

class UnMaskedChannelSeqDataset(IterableDataset):
    """
    IterableDataset of unmasked channel sequences for next-step prediction.

    Each sample pairs the ``seq_len`` most recent channel vectors of one
    (user, subcarrier) pair with the channel vector of the following scene.
    A channel vector is the real part concatenated with the imaginary part of
    the per-antenna channel, normalized to unit average power and then MinMax
    scaled. The scalers are fitted on this dataset's own power-normalized
    vectors, i.e. on exactly the data they are later applied to.

    Args:
        scenes: Time-ordered sequence of scenes. ``scenes[t][0]['user']['channel']``
            is indexed as ``[user, 0, antenna, subcarrier]`` and must expose
            ``.real`` / ``.imag`` (complex array).
        seq_len: Number of past scenes that form one input sequence.
        eps: Constant added to the mean power to avoid division by zero.

    Raises:
        ValueError: If no valid (sequence, target) pair can be built, e.g.
            because ``len(scenes) <= seq_len`` or every channel is zero.
    """
    def __init__(self, scenes, seq_len: int = 5, eps: float = 1e-9):
        super().__init__()
        self.scenes = scenes
        self.seq_len = seq_len
        self.eps = eps

        # Dimensions are read from the first scene: axis 0 = users,
        # axis 2 = antennas, axis 3 = subcarriers.
        ch0 = scenes[0][0]['user']['channel']
        self.U = ch0.shape[0]                  # Number of users
        self.A = ch0.shape[2]                  # Number of antennas
        self.S = ch0.shape[3]                  # Number of subcarriers
        self.vec_len = 2 * self.A              # Real+imag length after concatenation

        # ----------------------------------------------------------------------
        # Fit the MinMax scalers on the power-normalized samples, so that the
        # fitted ranges match the data transformed in __iter__.
        # ----------------------------------------------------------------------
        X_list, y_list = [], []
        for seq_np, target_np in self._iter_power_normalized():
            X_list.append(seq_np)      # (seq_len, vec_len)
            y_list.append(target_np)   # (vec_len,)

        if not X_list:
            raise ValueError(
                "No valid (sequence, target) pairs: need more than seq_len "
                "scenes and at least one non-zero channel."
            )

        X_all = np.vstack(X_list)  # Shape: (num_samples*seq_len, vec_len)
        y_all = np.stack(y_list)   # Shape: (num_samples, vec_len)

        self.scaler_x = MinMaxScaler().fit(X_all)
        self.scaler_y = MinMaxScaler().fit(y_all)

    def _iter_power_normalized(self):
        """
        Yield power-normalized ``(seq_np, target_np)`` numpy pairs.

        ``seq_np`` has shape (seq_len, vec_len) and ``target_np`` has shape
        (vec_len,). Pairs whose sequence or target is entirely zero are skipped.
        """
        T = len(self.scenes)
        for t in range(self.seq_len, T):
            past = self.scenes[t - self.seq_len : t]
            # Keep the scene under its own name: it is indexed again for every
            # (user, subcarrier) pair and must never be rebound inside the loop.
            target_scene = self.scenes[t]
            for u in range(self.U):
                for s in range(self.S):
                    seq_np = np.stack([
                        self._power_norm(ps[0]['user']['channel'][u, 0, :, s])
                        for ps in past
                    ], axis=0)
                    target_np = self._power_norm(
                        target_scene[0]['user']['channel'][u, 0, :, s]
                    )

                    # Skip sequences or targets that are all zero (invalid data)
                    if not np.any(seq_np) or not np.any(target_np):
                        continue

                    yield seq_np, target_np

    def __iter__(self):
        """
        Yield power-normalized and MinMax-scaled (sequence, target) pairs.

        Each item: (seq_tensor, target_tensor)
        Shapes: seq_tensor (seq_len, vec_len), target_tensor (vec_len,)
        Both tensors are float32.
        """
        for seq_np, target_np in self._iter_power_normalized():
            # Scalers work on 2-D arrays: rows are vectors, columns are features.
            seq_np = self.scaler_x.transform(seq_np)
            target_np = self.scaler_y.transform(target_np.reshape(1, -1)).reshape(-1)

            yield (
                torch.from_numpy(seq_np.astype(np.float32, copy=False)),
                torch.from_numpy(target_np.astype(np.float32, copy=False)),
            )

    def _power_norm(self, h: np.ndarray) -> np.ndarray:
        """
        Convert complex-valued vector to concatenated real-imag vector and normalize power to 1.
        """
        v = np.concatenate([h.real, h.imag]).astype(np.float32)
        power = np.mean(v * v) + self.eps
        return v / np.sqrt(power)

    def __len__(self):
        """
        Upper bound on the number of (sequence, target) pairs.

        All-zero pairs are skipped during iteration, so the number of items
        actually yielded can be smaller than this value.
        """
        return (len(self.scenes) - self.seq_len) * self.U * self.S


In [9]:
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import IterableDataset, DataLoader
import numpy as np
import torch
import random

class MaskedChannelSeqDataset(IterableDataset):
    """
    IterableDataset of masked channel sequences for next-step prediction.

    Candidate samples are built exactly like the unmasked case: the
    ``seq_len`` most recent channel vectors of one (user, subcarrier) pair
    are the input and the following scene's vector is the target. Every
    vector is real/imag concatenated, normalized to unit average power and
    MinMax scaled with scalers fitted on the power-normalized data.

    Masked channel modeling (MCM):
    - Each candidate is selected with probability 0.15; only selected
      candidates are yielded.
    - For a selected candidate one sequence position ``mpos`` is drawn
      uniformly and then
      : 80% probability: the vector at ``mpos`` is replaced by the fixed mask
        vector (all zeros)
      : 10% probability: it is replaced by Gaussian noise ``N(0, noise_std^2)``
      : 10% probability: it is left unchanged

    Args:
        scenes: Time-ordered sequence of scenes. ``scenes[t][0]['user']['channel']``
            is indexed as ``[user, 0, antenna, subcarrier]`` and must expose
            ``.real`` / ``.imag`` (complex array).
        seq_len: Number of past scenes that form one input sequence.
        eps: Constant added to the mean power to avoid division by zero.
        noise_std: Standard deviation of the random replacement vector.

    Raises:
        ValueError: If no valid (sequence, target) pair can be built, e.g.
            because ``len(scenes) <= seq_len`` or every channel is zero.
    """
    def __init__(self, scenes, seq_len: int = 5, eps: float = 1e-9, noise_std: float = 1.0):
        super().__init__()
        self.scenes = scenes
        self.seq_len = seq_len
        self.eps = eps

        # Dimensions are read from the first scene: axis 0 = users,
        # axis 2 = antennas, axis 3 = subcarriers.
        ch0 = scenes[0][0]['user']['channel']
        self.U = ch0.shape[0]                  # Number of users
        self.A = ch0.shape[2]                  # Number of antennas
        self.S = ch0.shape[3]                  # Number of subcarriers
        self.vec_len = 2 * self.A              # Real+imag length after concatenation

        # Masking parameters
        self.mask_value = torch.zeros(self.vec_len, dtype=torch.float32)
        self.noise_std = noise_std

        # ----------------------------------------------------------------------
        # Fit the MinMax scalers on the power-normalized samples, so that the
        # fitted ranges match the data transformed in __iter__.
        # ----------------------------------------------------------------------
        X_list, y_list = [], []
        for seq_np, target_np in self._iter_power_normalized():
            X_list.append(seq_np)      # (seq_len, vec_len)
            y_list.append(target_np)   # (vec_len,)

        if not X_list:
            raise ValueError(
                "No valid (sequence, target) pairs: need more than seq_len "
                "scenes and at least one non-zero channel."
            )

        X_all = np.vstack(X_list)  # Shape: (num_samples*seq_len, vec_len)
        y_all = np.stack(y_list)   # Shape: (num_samples, vec_len)

        self.scaler_x = MinMaxScaler().fit(X_all)
        self.scaler_y = MinMaxScaler().fit(y_all)

    def _iter_power_normalized(self):
        """
        Yield power-normalized ``(seq_np, target_np)`` numpy pairs.

        ``seq_np`` has shape (seq_len, vec_len) and ``target_np`` has shape
        (vec_len,). Pairs whose sequence or target is entirely zero are skipped.
        """
        T = len(self.scenes)
        for t in range(self.seq_len, T):
            past = self.scenes[t - self.seq_len : t]
            # Keep the scene under its own name: it is indexed again for every
            # (user, subcarrier) pair and must never be rebound inside the loop.
            target_scene = self.scenes[t]
            for u in range(self.U):
                for s in range(self.S):
                    seq_np = np.stack([
                        self._power_norm(ps[0]['user']['channel'][u, 0, :, s])
                        for ps in past
                    ], axis=0)
                    target_np = self._power_norm(
                        target_scene[0]['user']['channel'][u, 0, :, s]
                    )

                    # Skip sequences or targets that are all zero (invalid data)
                    if not np.any(seq_np) or not np.any(target_np):
                        continue

                    yield seq_np, target_np

    def __iter__(self):
        """
        Yield masked, power-normalized and MinMax-scaled samples.

        Each item: (seq_tensor, masked_pos_tensor, target_tensor)
        Shapes: seq_tensor (seq_len, vec_len), masked_pos_tensor (1,), target_tensor (vec_len,)
        Only candidates selected with probability 0.15 are yielded.
        """
        for seq_np, target_np in self._iter_power_normalized():
            # 15% selection: unselected candidates are dropped before any
            # scaling work is done for them.
            if random.random() >= 0.15:
                continue

            # Scalers work on 2-D arrays: rows are vectors, columns are features.
            seq_np = self.scaler_x.transform(seq_np)
            target_np = self.scaler_y.transform(target_np.reshape(1, -1)).reshape(-1)

            # clone() so masking never writes into the numpy buffer behind seq
            seq_masked = torch.from_numpy(seq_np.astype(np.float32, copy=False)).clone()
            target = torch.from_numpy(target_np.astype(np.float32, copy=False))

            # Position inside the sequence that is subject to masking
            mpos = random.randrange(self.seq_len)

            # 80/10/10 rule
            r = random.random()
            if r < 0.8:
                # 80%: replace with the fixed mask vector
                seq_masked[mpos] = self.mask_value
            elif r < 0.9:
                # 10%: replace with Gaussian noise of standard deviation noise_std
                seq_masked[mpos] = torch.randn(self.vec_len) * self.noise_std
            # remaining 10%: leave the vector unchanged

            yield seq_masked, torch.tensor([mpos], dtype=torch.long), target

    def _power_norm(self, h: np.ndarray) -> np.ndarray:
        """
        Convert complex-valued vector to concatenated real-imag vector and normalize power to 1.
        """
        v = np.concatenate([h.real, h.imag]).astype(np.float32)
        power = np.mean(v * v) + self.eps
        return v / np.sqrt(power)

    def __len__(self):
        """
        Upper bound on the number of samples (the number of candidate pairs).

        All-zero candidates are skipped and only about 15% of the remaining
        candidates are selected for masking, so the number of items actually
        yielded is random and normally much smaller than this value.
        """
        return (len(self.scenes) - self.seq_len) * self.U * self.S


## Split Train/Val

In [10]:
# Train/validation split of the scene list: the first 60% of the scenes are
# used for training and the remaining 40% for validation.
seq_len = 5          # number of past scenes per input sequence
split_ratio = 0.6    # fraction of scenes assigned to the training set
split_idx = int(split_ratio * len(dataset))

In [11]:
batch_size = 32

# Unmasked datasets: training scenes first, then validation scenes.
# Each dataset fits its own MinMax scalers inside its constructor.
unmasked_train_ds, unmasked_val_ds = (
    UnMaskedChannelSeqDataset(scene_part, seq_len=seq_len)
    for scene_part in (dataset[:split_idx], dataset[split_idx:])
)

# Loaders keep the datasets' own iteration order (shuffle=False).
unmasked_train_loader, unmasked_val_loader = (
    DataLoader(ds, batch_size=batch_size, shuffle=False)
    for ds in (unmasked_train_ds, unmasked_val_ds)
)
# ─────────────────────────────────────────────


In [12]:
# Masked train/validation datasets and loaders (same 6 : 4 scene split)
batch_size = 32

# Training scenes first, then validation scenes.
# Each dataset fits its own MinMax scalers inside its constructor.
masked_train_ds, masked_val_ds = (
    MaskedChannelSeqDataset(scene_part, seq_len=seq_len)
    for scene_part in (dataset[:split_idx], dataset[split_idx:])
)

# Loaders keep the datasets' own iteration order (shuffle=False).
masked_train_loader, masked_val_loader = (
    DataLoader(ds, batch_size=batch_size, shuffle=False)
    for ds in (masked_train_ds, masked_val_ds)
)
# ─────────────────────────────────────────────


## Define Model

LWMWithHead: A wrapper class that uses a pre-trained LWM (Transformer encoder) as the backbone,
             and attaches a new fully-connected (FC) head for downstream tasks
             (regression, classification, etc.).

Changes:
- input_dim: Dimension of the actual input data (e.g., 64)
- patch_length: Patch length expected by the backbone (e.g., 16)
- Replaces the original element_length parameter with these two distinct parameters
- Applies a projection layer (self.input_proj) in forward()


In [15]:
import torch
import torch.nn as nn
from lwm_model import lwm

class LWMWithHead(nn.Module):
    """
    LWM backbone with a linear input projection and a fully-connected head.

    The model consists of three parts, applied in this order:
    1. ``input_proj``: linear map from ``input_dim`` to ``patch_length``.
    2. ``backbone``: an ``lwm`` model, either freshly constructed or loaded
       through ``lwm.from_pretrained``.
    3. ``head``: Linear -> ReLU -> Linear, applied to the first token of the
       backbone's encoder output.

    Args:
        input_dim: Dimension of the actual input data (e.g., 64).
        patch_length: Patch length expected by the backbone (e.g., 16).
        d_model: LWM hidden size; also the input width of the head.
        max_len: Positional encoding max length (used only for a fresh backbone).
        n_layers: Number of encoder layers (used only for a fresh backbone).
        hidden_dim: Hidden width of the FC head.
        out_dim: Output width of the FC head.
        freeze_backbone: If True, backbone parameters get ``requires_grad = False``.
        checkpoint_path: Checkpoint passed to ``lwm.from_pretrained``; ``None``
            builds a randomly initialized backbone instead.
        device: Device handed to the backbone. ``input_proj`` and ``head`` are
            not moved here; the caller moves the whole module if needed.
    """
    def __init__(
        self,
        input_dim: int,                 # Dimension of the actual input data (e.g., 64)
        patch_length: int,              # Patch length expected by the backbone (e.g., 16)
        d_model: int = 64,              # LWM hidden size
        max_len: int = 129,             # Positional encoding max length
        n_layers: int = 12,             # Number of Transformer encoder layers
        hidden_dim: int = 256,          # FC head hidden dimension
        out_dim: int = 64,              # FC head output dimension
        freeze_backbone: bool = True,   # Whether to freeze the backbone
        checkpoint_path: str | None = "./model_weights.pth",
        device: str = "cuda"
    ):
        super().__init__()

        # Maps raw input vectors onto the patch length the backbone consumes.
        self.input_proj = nn.Linear(input_dim, patch_length)

        # Backbone: pre-trained weights when a checkpoint is given,
        # otherwise a randomly initialized model placed on `device`.
        if checkpoint_path is not None:
            backbone = lwm.from_pretrained(
                ckpt_name=checkpoint_path,
                device=device
            )
        else:
            backbone = lwm(
                element_length=patch_length,
                d_model=d_model,
                max_len=max_len,
                n_layers=n_layers
            ).to(device)
        self.backbone = backbone

        # Optionally exclude every backbone weight from gradient updates.
        if freeze_backbone:
            for weight in self.backbone.parameters():
                weight.requires_grad = False

        # Downstream head: d_model -> hidden_dim -> out_dim.
        head_layers = [
            nn.Linear(d_model, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim),
        ]
        self.head = nn.Sequential(*head_layers)

    def forward(self, input_ids: torch.Tensor, masked_pos: torch.Tensor) -> torch.Tensor:
        """
        Run projection, backbone and head.

        Args:
            input_ids: Tensor of shape (B, L, input_dim)
            masked_pos: Tensor of shape (B, num_mask)
        Returns:
            out: Tensor of shape (B, out_dim)
        """
        projected = self.input_proj(input_ids)

        # The backbone returns a pair; only the second element (encoder
        # output) is used here, the first (LM logits) is discarded.
        _logits_lm, encoded = self.backbone(projected, masked_pos)

        # Feature of the first token (used as the CLS feature).
        first_token = encoded[:, 0, :]

        return self.head(first_token)


In [17]:
import torch
import torch.nn as nn

class GRUWithHead(nn.Module):
    """
    Stacked GRU backbone followed by a fully-connected head.

    The GRU output at the last time step is used as the sequence feature and
    passed through Linear -> ReLU -> Linear.

    Args:
        feat_dim: Size of each input vector.
        d_model: GRU hidden size.
        n_layers: Number of stacked GRU layers.
        bidirectional: Whether the GRU runs in both directions.
        dropout: Dropout between GRU layers (forced to 0.0 for a single layer).
        hidden_dim: Hidden width of the FC head.
        out_dim: Output width of the FC head.
        freeze_backbone: If True, GRU parameters do not require gradients.
    """
    def __init__(
        self,
        feat_dim: int = 16,           # Dimension of input features (patch_length / element_length)
        d_model: int = 64,            # GRU hidden size
        n_layers: int = 12,           # Number of GRU layers to stack
        bidirectional: bool = True,   # Whether to use a bidirectional GRU
        dropout: float = 0.1,         # Dropout probability between GRU layers
        hidden_dim: int = 256,        # FC head hidden dimension
        out_dim: int = 64,            # FC head output dimension
        freeze_backbone: bool = False # Whether to freeze GRU backbone weights
    ):
        super().__init__()

        # Inter-layer dropout only exists when there is more than one layer.
        inter_layer_dropout = dropout if n_layers > 1 else 0.0

        # GRU backbone operating on (batch, seq, feature) tensors.
        self.backbone = nn.GRU(
            input_size=feat_dim,
            hidden_size=d_model,
            num_layers=n_layers,
            dropout=inter_layer_dropout,
            bidirectional=bidirectional,
            batch_first=True,
        )

        # Optionally exclude all GRU weights from gradient updates.
        if freeze_backbone:
            self.backbone.requires_grad_(False)

        # The GRU output width doubles when both directions are concatenated.
        num_directions = 2 if bidirectional else 1
        gru_out_dim = num_directions * d_model
        head_layers = [
            nn.Linear(gru_out_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim),
        ]
        self.head = nn.Sequential(*head_layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, feat_dim)
        Returns:
            out: Tensor of shape (batch_size, out_dim)
        """
        # Per-step outputs: (B, seq_len, num_directions * d_model)
        step_outputs, _final_hidden = self.backbone(x)

        # Sequence feature = output at the last time step.
        # NOTE(review): with bidirectional=True the reverse-direction half of
        # this slice has only processed the final input step — confirm that
        # this is the intended sequence summary.
        last_step = step_outputs[:, -1, :]

        return self.head(last_step)


In [None]:
import torch
import torch.nn as nn

# 1) Hyperparameters of the demo GRU
input_size = 10
hidden_size = 20
num_layers = 12
bidirectional = False

# 2) Create the GRU layer
gru = nn.GRU(
    input_size,
    hidden_size,
    num_layers,
    batch_first=True,
    bidirectional=bidirectional,
)

# 3) Print the name and tensor shape of every parameter
for name, param in gru.named_parameters():
    print(f"{name:20s} shape={tuple(param.shape)}")
