In [None]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau

import numpy as np
import pandas as pd
import time
import os

from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.metrics import (
    classification_report, 
    accuracy_score, 
    f1_score, 
    cohen_kappa_score
)
from sklearn.utils.class_weight import compute_class_weight

In [None]:
# ------------------------------------------
# Experiment-wide settings
# ------------------------------------------
BATCH_SIZE = 64          # mini-batch size of the training DataLoader
NUM_EPOCHS = 100         # upper bound on training epochs (early stopping may end sooner)
LEARNING_RATE = 5e-4     # same value the training cells pass to Adam
PATIENCE = 10            # epochs without validation improvement before stopping

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

print(f"Using device: {DEVICE}")

In [None]:
# 1. Load Data
print("--- Loading Data ---")
df_train = pd.read_csv("dataset/train_interpolated.csv")
df_valid = pd.read_csv("dataset/valid_interpolated.csv")
df_test  = pd.read_csv("dataset/test_interpolated.csv")

# 2. Apply Mapping and Drop Invalid Rows
# CR_ID code -> crop name. Codes missing from this table map to NaN, and the
# corresponding rows are removed by the dropna below.
mapping_crop = {
    27: "Sesame", 2: "Pepper", 8: "Aralia", 1: "Sweet potato",
    17: "Sudangrass", 29: "Soybean", 9: "Perilla", 19: "Greenhouse",
    24: "Yuzu", 23: "Maize", 28: "Kiwi", 22: "Onion",
    16: "Apple", 30: "Grape", 14: "Peach", 10: "Garlic",
    12: "Pear", 13: "Cabbage", 11: "Sapling", 31: "Radish"
}

for df in (df_train, df_valid, df_test):
    df["crop_name"] = df["CR_ID"].map(mapping_crop)
    df.dropna(subset=["crop_name"], inplace=True)

# 3. Define Features
# Column names follow "<band>_<YYYYMM>_<d>" with d in 1..3. The list is built
# band-major (every time step of b02, then every time step of b03, ...), which is
# what lets the later reshape to (N, 1, bands, steps) place one band per row.
months = [f"2021{m:02d}" for m in range(7, 13)]
bands = ['b02','b03','b04','b05','b06','b07','b08','b8a','b11','b12']
features = [f"{b}_{mon}_{d}" for b in bands for mon in months for d in range(1, 4)]

# 4. Prepare X and y
# The encoder is fitted on the training labels only and reused for the other splits.
le = LabelEncoder().fit(df_train["crop_name"])

X_train = df_train[features].values
y_train = le.transform(df_train["crop_name"])

X_valid = df_valid[features].values
y_valid = le.transform(df_valid["crop_name"])

X_test  = df_test[features].values
y_test  = le.transform(df_test["crop_name"])

# 5. Scaling (MinMaxScaler)
# Fit on the training split only; validation/test reuse the training min/max.
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled  = scaler.transform(X_test)

# 6. Tensor Conversion & Reshaping
# Structure: (Batch_Size, 1, Channels/Bands, Sequence_Length)
# 18 steps (6 months * 3 intervals) and 10 bands (see `bands` above)
n_steps = 18
n_features = X_train_scaled.shape[1]
n_channels = n_features // n_steps # 10 with the band list above

# Guard the layout assumptions the reshape relies on, so a change to `months`
# or `bands` cannot silently truncate or scramble the feature matrix.
assert n_steps == len(months) * 3, "n_steps must equal len(months) * 3 intervals"
assert n_features == n_channels * n_steps, "feature count is not a multiple of n_steps"
assert n_channels == len(bands), "derived channel count does not match the band list"

print(f"Feature Info: Channels={n_channels}, Steps={n_steps}, Total Features={n_features}")

def to_tensor_4d(x_scaled, n_channels, n_steps):
    """Turn a 2-D feature matrix into a float32 tensor shaped (N, 1, n_channels, n_steps).

    Args:
        x_scaled: array-like whose rows hold n_channels * n_steps values each.
        n_channels: size of the third output dimension.
        n_steps: size of the last output dimension.

    Returns:
        A new torch.float32 tensor; N is inferred from the number of elements.
    """
    target_shape = (-1, 1, n_channels, n_steps)
    as_float = torch.tensor(x_scaled, dtype=torch.float32)
    return as_float.reshape(target_shape)

# Feature tensors: every scaled matrix becomes (N, 1, n_channels, n_steps).
X_train_t, X_valid_t, X_test_t = [
    to_tensor_4d(split, n_channels, n_steps)
    for split in (X_train_scaled, X_valid_scaled, X_test_scaled)
]

# Integer class indices (int64), as required by CrossEntropyLoss targets.
y_train_t, y_valid_t, y_test_t = [
    torch.tensor(labels, dtype=torch.long)
    for labels in (y_train, y_valid, y_test)
]

# 7. Create DataLoaders
train_ds = TensorDataset(X_train_t, y_train_t)
valid_ds = TensorDataset(X_valid_t, y_valid_t)
test_ds  = TensorDataset(X_test_t, y_test_t)

# Only the training loader shuffles; evaluation loaders keep dataset order and
# use a larger fixed batch of 256.
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
val_loader = DataLoader(
    valid_ds,
    batch_size=256,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)
test_loader = DataLoader(
    test_ds,
    batch_size=256,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)

print("Data processing complete.")

In [None]:
# CNN+MLP

class CNN_MLP(nn.Module):
    """Three-stage temporal Conv1d extractor with a global-average-pooled MLP head.

    Args:
        num_bands: number of input channels of the first convolution.
        seq_len: accepted for interface symmetry; no layer here reads it
            (padding=1 convolutions and adaptive pooling work for any length).
        n_classes: number of output logits.
    """

    @staticmethod
    def _conv_bn_relu(c_in, c_out):
        """Return [Conv1d(k=3, pad=1), BatchNorm1d, ReLU] mapping c_in -> c_out channels."""
        return [
            nn.Conv1d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1),
            nn.BatchNorm1d(c_out),
            nn.ReLU(),
        ]

    def __init__(self, num_bands=12, seq_len=18, n_classes=20):
        super().__init__()

        # Convolutions run along the time axis; expected input is (N, Bands, Seq_Len).
        # Layers are appended in Conv/BN/ReLU order so Sequential indices stay 0..8.
        widths = (num_bands, 64, 128, 256)
        stages = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            stages.extend(self._conv_bn_relu(c_in, c_out))
        self.temporal_cnn = nn.Sequential(*stages)

        # Collapse the time axis to a single value per channel.
        self.global_pool = nn.AdaptiveAvgPool1d(1)

        # MLP head: 256 -> 128 -> 64 -> n_classes with dropout after each hidden layer.
        self.classifier = nn.Sequential(
            nn.Linear(256, 128), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(128, 64), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(64, n_classes),
        )

    def forward(self, x):
        """Map a (N, 1, Bands, Seq_Len) batch to (N, n_classes) logits."""
        feats = self.temporal_cnn(x.squeeze(1))        # (N, 256, Seq_Len)
        pooled = self.global_pool(feats).squeeze(-1)   # (N, 256)
        return self.classifier(pooled)

# Build the CNN+MLP network from the data-derived dimensions, then move it to DEVICE.
model = CNN_MLP(n_channels, n_steps, len(le.classes_))
model = model.to(DEVICE)

print("Model initialized.")

In [None]:
# CNN+MLP TRAIN
#  1. Setup Loss, Optimizer, Scheduler
# 'balanced' class weights are derived from the training labels and passed to the
# loss so that under-represented classes contribute more per sample.
class_weight_array = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights = torch.tensor(class_weight_array, dtype=torch.float32, device=DEVICE)

criterion  = nn.CrossEntropyLoss(weight=class_weights)
optimizer  = torch.optim.Adam(model.parameters(), lr=5e-4, weight_decay=5e-3)
# StepLR: multiply the learning rate by 0.8 every NUM_EPOCHS // 10 epochs.
scheduler  = torch.optim.lr_scheduler.StepLR(optimizer, step_size=max(1, NUM_EPOCHS // 10), gamma=0.8)

# 2. Training Loop with Early Stopping
best_val_loss = np.inf
patience_cnt = 0

print("--- Starting Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    # Train Phase
    model.train()
    train_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        # Accumulate the batch loss scaled by batch size; divided by dataset size below.
        train_loss += loss.item() * xb.size(0)

    # Validation Phase
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            logits = model(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item() * xb.size(0)

    # Calculate average loss
    train_loss /= len(train_loader.dataset)
    val_loss   /= len(val_loader.dataset)

    # Scheduler step
    # StepLR is epoch-driven and takes no metric. Do not pass val_loss here: the
    # positional argument is the deprecated `epoch`, and a loss value there keeps
    # last_epoch // step_size at 0, so the learning rate would never decay.
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']

    print(f"[{epoch:3d}/{NUM_EPOCHS}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | LR: {current_lr:.6g}")

    # Early Stopping Logic
    # An epoch counts as an improvement only if it beats the best loss by more than 1e-4.
    # NOTE(review): the checkpoint is written to "best_model.pt" while the evaluation
    # cell loads "weights/CNN+MLP.pt" - confirm the file is copied/renamed in between.
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "best_model.pt")
        patience_cnt = 0
    else:
        patience_cnt += 1
        if patience_cnt >= PATIENCE:
            print(f"Early stopping triggered at epoch {epoch}")
            break

In [None]:
# CNN+MLP TEST
# 1. Load Best Model
print("--- Loading Best Model for Evaluation ---")
# map_location=DEVICE lets a checkpoint saved on a GPU be restored on a CPU-only machine.
model.load_state_dict(torch.load("weights/CNN+MLP.pt", map_location=DEVICE))
model.eval()

# Calculate Parameters
n_params = sum(p.numel() for p in model.parameters())
print(f"Total Parameters: {n_params}")

# 2. Measure Inference Time & Predict
# The timed region covers host-to-device transfer, forward pass and argmax for the
# whole test loader; perf_counter is a monotonic clock intended for interval timing.
all_preds, all_labels = [], []
start_time = time.perf_counter()

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        logits = model(xb)
        preds = logits.argmax(dim=1).cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

end_time = time.perf_counter()

# 3. Performance Metrics
total_time = end_time - start_time
inference_time_per_sample = (total_time / len(all_labels)) * 1000  # seconds -> ms per sample

print(f"\nInference Time: {inference_time_per_sample:.4f} ms/sample")

accuracy = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average="macro")
kappa = cohen_kappa_score(all_labels, all_preds)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Macro F1: {macro_f1:.4f}")
print(f"Cohen's Kappa: {kappa:.4f}")
print("-" * 60)
print(classification_report(all_labels, all_preds, target_names=le.classes_, digits=4))

In [None]:
#CBAM+MLP
class CBAM1D(nn.Module):
    """1-D CBAM: channel attention followed by attention along the sequence axis.

    Args:
        channels: number of channels C of the (N, C, L) input.
        reduction: bottleneck ratio of the shared channel MLP (hidden = channels // reduction).
        kernel_size: kernel width of the Conv1d producing the per-position gate.
    """

    def __init__(self, channels, reduction=16, kernel_size=7):
        super().__init__()
        bottleneck = channels // reduction

        # Channel attention: avg- and max-pooled descriptors share one MLP.
        self.channel_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.channel_max_pool = nn.AdaptiveMaxPool1d(1)
        self.channel_mlp = nn.Sequential(
            nn.Linear(channels, bottleneck),
            nn.ReLU(),
            nn.Linear(bottleneck, channels),
        )

        # Position-wise attention: 2 pooled maps -> 1 gate map, length preserved
        # by padding=kernel_size // 2.
        self.spatial_conv = nn.Conv1d(2, 1, kernel_size=kernel_size, padding=kernel_size // 2)

    def forward(self, x):
        """Apply both gates to x of shape (N, C, L); output has the same shape."""
        # --- channel gate ---
        desc_avg = self.channel_avg_pool(x).squeeze(-1)   # (N, C)
        desc_max = self.channel_max_pool(x).squeeze(-1)   # (N, C)
        channel_gate = torch.sigmoid(self.channel_mlp(desc_avg) + self.channel_mlp(desc_max))
        x = x * channel_gate.unsqueeze(-1)                # broadcast over L

        # --- position gate, computed from the channel-refined features ---
        mean_map = torch.mean(x, dim=1, keepdim=True)     # (N, 1, L)
        max_map = torch.max(x, dim=1, keepdim=True)[0]    # (N, 1, L)
        stacked = torch.cat([mean_map, max_map], dim=1)   # (N, 2, L)
        position_gate = torch.sigmoid(self.spatial_conv(stacked))
        return x * position_gate

# 2. Main Classifier with CBAM
class CBAM_MLP(nn.Module):
    """Temporal Conv1d extractor refined by CBAM1D, then pooled into an MLP head.

    Args:
        num_bands: number of input channels of the first convolution.
        seq_len: accepted for interface symmetry; no layer here reads it.
        n_classes: number of output logits.
    """

    @staticmethod
    def _conv_bn_relu(c_in, c_out):
        """Return [Conv1d(k=3, pad=1), BatchNorm1d, ReLU] mapping c_in -> c_out channels."""
        return [
            nn.Conv1d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1),
            nn.BatchNorm1d(c_out),
            nn.ReLU(),
        ]

    def __init__(self, num_bands=12, seq_len=18, n_classes=20):
        super().__init__()

        # Temporal feature extraction: num_bands -> 64 -> 256 -> 256 channels.
        layers = []
        for c_in, c_out in ((num_bands, 64), (64, 256), (256, 256)):
            layers += self._conv_bn_relu(c_in, c_out)
        self.temporal_cnn = nn.Sequential(*layers)

        # Attention over the 256 extracted channels and the sequence positions.
        self.cbam = CBAM1D(channels=256, reduction=16, kernel_size=7)

        # Collapse the time axis to one value per channel.
        self.global_pool = nn.AdaptiveAvgPool1d(1)

        # MLP head: 256 -> 256 -> 64 -> n_classes with dropout after each hidden layer.
        self.classifier = nn.Sequential(
            nn.Linear(256, 256), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(256, 64), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(64, n_classes),
        )

    def forward(self, x):
        """Map a (N, 1, Bands, Seq_Len) batch to (N, n_classes) logits."""
        feats = self.temporal_cnn(x.squeeze(1))         # (N, 256, Seq_Len)
        refined = self.cbam(feats)                      # (N, 256, Seq_Len)
        pooled = self.global_pool(refined).squeeze(-1)  # (N, 256)
        return self.classifier(pooled)

# Initialize Model
model_cbam = CBAM_MLP(
    num_bands=n_channels,
    seq_len=n_steps,
    n_classes=len(le.classes_)
).to(DEVICE)

# Later cells that refer to the global `model` must see this network, not the
# CNN_MLP instance built earlier; bind both names to the same object.
model = model_cbam

print("CBAM Model initialized.")

In [None]:
# CBAM+MLP TRAIN
#  1. Setup Loss, Optimizer, Scheduler
# 'balanced' class weights are derived from the training labels and passed to the
# loss so that under-represented classes contribute more per sample.
class_weight_array = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights = torch.tensor(class_weight_array, dtype=torch.float32, device=DEVICE)

criterion  = nn.CrossEntropyLoss(weight=class_weights)
# Train the CBAM network explicitly (`model_cbam`); the bare name `model` may still
# point at the CNN_MLP instance from the earlier section.
optimizer  = torch.optim.Adam(model_cbam.parameters(), lr=5e-4, weight_decay=5e-3)
# StepLR: multiply the learning rate by 0.8 every NUM_EPOCHS // 10 epochs.
scheduler  = torch.optim.lr_scheduler.StepLR(optimizer, step_size=max(1, NUM_EPOCHS // 10), gamma=0.8)

# 2. Training Loop with Early Stopping
best_val_loss = np.inf
patience_cnt = 0

print("--- Starting Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    # Train Phase
    model_cbam.train()
    train_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)

        optimizer.zero_grad()
        logits = model_cbam(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        # Accumulate the batch loss scaled by batch size; divided by dataset size below.
        train_loss += loss.item() * xb.size(0)

    # Validation Phase
    model_cbam.eval()
    val_loss = 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            logits = model_cbam(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item() * xb.size(0)

    # Calculate average loss
    train_loss /= len(train_loader.dataset)
    val_loss   /= len(val_loader.dataset)

    # Scheduler step
    # StepLR is epoch-driven and takes no metric. Do not pass val_loss here: the
    # positional argument is the deprecated `epoch`, and a loss value there keeps
    # last_epoch // step_size at 0, so the learning rate would never decay.
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']

    print(f"[{epoch:3d}/{NUM_EPOCHS}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | LR: {current_lr:.6g}")

    # Early Stopping Logic
    # An epoch counts as an improvement only if it beats the best loss by more than 1e-4.
    # NOTE(review): the checkpoint is written to "CBAM+MLP.pt" in the working directory
    # while the evaluation cell loads "weights/CBAM+MLP.pt" - confirm it is moved there.
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        torch.save(model_cbam.state_dict(), "CBAM+MLP.pt")
        patience_cnt = 0
    else:
        patience_cnt += 1
        if patience_cnt >= PATIENCE:
            print(f"Early stopping triggered at epoch {epoch}")
            break

In [None]:
# CBAM+MLP TEST
# 1. Load Best Model
print("--- Loading Best Model for Evaluation ---")
# Evaluate the CBAM network explicitly (`model_cbam`); the bare name `model` may
# still point at the CNN_MLP instance, whose parameters do not match this checkpoint.
# map_location=DEVICE lets a checkpoint saved on a GPU be restored on a CPU-only machine.
model_cbam.load_state_dict(torch.load("weights/CBAM+MLP.pt", map_location=DEVICE))
model_cbam.eval()

# Calculate Parameters
n_params = sum(p.numel() for p in model_cbam.parameters())
print(f"Total Parameters: {n_params}")

# 2. Measure Inference Time & Predict
# The timed region covers host-to-device transfer, forward pass and argmax for the
# whole test loader; perf_counter is a monotonic clock intended for interval timing.
all_preds, all_labels = [], []
start_time = time.perf_counter()

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        logits = model_cbam(xb)
        preds = logits.argmax(dim=1).cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

end_time = time.perf_counter()

# 3. Performance Metrics
total_time = end_time - start_time
inference_time_per_sample = (total_time / len(all_labels)) * 1000  # seconds -> ms per sample

print(f"\nInference Time: {inference_time_per_sample:.4f} ms/sample")

accuracy = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average="macro")
kappa = cohen_kappa_score(all_labels, all_preds)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Macro F1: {macro_f1:.4f}")
print(f"Cohen's Kappa: {kappa:.4f}")
print("-" * 60)
print(classification_report(all_labels, all_preds, target_names=le.classes_, digits=4))

In [None]:
# CNN+TF
class CNN_TF(nn.Module):
    """Temporal CNN front-end feeding a Transformer encoder; classification reads the CLS token.

    Args:
        num_bands: number of input channels of the first convolution.
        seq_len: input sequence length; sizes the positional embedding
            (seq_len // 2 tokens after max-pooling, plus one CLS token).
        n_classes: number of output logits.
        embed_dim: CNN output width and Transformer model dimension.
        num_heads: attention heads per encoder layer.
        num_layers: number of stacked encoder layers.
        cnn_dropout: Dropout1d probability after each convolution stage.
        cls_dropout: dropout probability inside the classification head.
    """

    def __init__(
        self,
        num_bands=10,
        seq_len=18,
        n_classes=21,
        embed_dim=512,
        num_heads=4,
        num_layers=2,
        cnn_dropout=0.1,
        cls_dropout=0.1
    ):
        super().__init__()

        def conv_stage(c_in, c_out):
            # Conv1d(k=3, pad=1) -> BatchNorm1d -> ReLU -> Dropout1d
            return [
                nn.Conv1d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1),
                nn.BatchNorm1d(c_out),
                nn.ReLU(),
                nn.Dropout1d(cnn_dropout),
            ]

        # 1) Three conv stages, then MaxPool1d(2) halves the sequence length.
        cnn_layers = []
        for c_in, c_out in ((num_bands, 64), (64, 256), (256, embed_dim)):
            cnn_layers.extend(conv_stage(c_in, c_out))
        cnn_layers.append(nn.MaxPool1d(kernel_size=2))
        self.temporal_cnn = nn.Sequential(*cnn_layers)

        # 2) Learnable CLS token and positional embedding (one slot per pooled step + CLS).
        n_tokens = seq_len // 2 + 1
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_emb = nn.Parameter(torch.randn(1, n_tokens, embed_dim))

        # 3) Transformer encoder operating on (B, tokens, embed_dim).
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=embed_dim,
                nhead=num_heads,
                dim_feedforward=512,
                dropout=0.1,
                batch_first=True,
            ),
            num_layers=num_layers,
        )

        # 4) Classification head applied to the encoded CLS token.
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, 128),
            nn.ReLU(),
            nn.Dropout(cls_dropout),
            nn.Linear(128, n_classes),
        )

    def forward(self, x):
        """Map a (B, 1, num_bands, seq_len) batch to (B, n_classes) logits."""
        tokens = self.temporal_cnn(x.squeeze(1)).permute(0, 2, 1)   # (B, reduced_len, embed_dim)

        batch_size = tokens.size(0)
        cls_tok = self.cls_token.expand(batch_size, -1, -1)         # (B, 1, embed_dim)
        seq = torch.cat([cls_tok, tokens], dim=1) + self.pos_emb    # (B, reduced_len + 1, embed_dim)

        encoded = self.transformer(seq)
        return self.classifier(encoded[:, 0, :])                    # logits from the CLS position
    
# Build the CNN+Transformer model. The band count comes from the data-derived
# `n_channels` (as in the other model cells) instead of a hard-coded 10, so the
# first convolution always matches the tensors produced by the data cell.
model = CNN_TF(
    num_bands=n_channels,
    seq_len=n_steps,
    n_classes=len(le.classes_),
    embed_dim=512,
    num_heads=4,
    num_layers=2
).to(DEVICE)


In [None]:
# CNN+TF TRAIN
#  1. Setup Loss, Optimizer, Scheduler
# 'balanced' class weights are derived from the training labels and passed to the
# loss so that under-represented classes contribute more per sample.
class_weight_array = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights = torch.tensor(class_weight_array, dtype=torch.float32, device=DEVICE)

criterion  = nn.CrossEntropyLoss(weight=class_weights)
optimizer  = torch.optim.Adam(model.parameters(), lr=5e-4, weight_decay=5e-3)
# StepLR: multiply the learning rate by 0.8 every NUM_EPOCHS // 10 epochs.
scheduler  = torch.optim.lr_scheduler.StepLR(optimizer, step_size=max(1, NUM_EPOCHS // 10), gamma=0.8)

# 2. Training Loop with Early Stopping
best_val_loss = np.inf
patience_cnt = 0

print("--- Starting Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    # Train Phase
    model.train()
    train_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        # Accumulate the batch loss scaled by batch size; divided by dataset size below.
        train_loss += loss.item() * xb.size(0)

    # Validation Phase
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            logits = model(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item() * xb.size(0)

    # Calculate average loss
    train_loss /= len(train_loader.dataset)
    val_loss   /= len(val_loader.dataset)

    # Scheduler step
    # StepLR is epoch-driven and takes no metric. Do not pass val_loss here: the
    # positional argument is the deprecated `epoch`, and a loss value there keeps
    # last_epoch // step_size at 0, so the learning rate would never decay.
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']

    print(f"[{epoch:3d}/{NUM_EPOCHS}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | LR: {current_lr:.6g}")

    # Early Stopping Logic
    # An epoch counts as an improvement only if it beats the best loss by more than 1e-4.
    # NOTE(review): the checkpoint is written to "CNN+TF.pt" in the working directory
    # while the evaluation cell loads "weights/CNN+TF.pt" - confirm it is moved there.
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "CNN+TF.pt")
        patience_cnt = 0
    else:
        patience_cnt += 1
        if patience_cnt >= PATIENCE:
            print(f"Early stopping triggered at epoch {epoch}")
            break

In [None]:
# CNN+TF TEST
# 1. Load Best Model
print("--- Loading Best Model for Evaluation ---")
# map_location=DEVICE lets a checkpoint saved on a GPU be restored on a CPU-only machine.
model.load_state_dict(torch.load("weights/CNN+TF.pt", map_location=DEVICE))
model.eval()

# Calculate Parameters
n_params = sum(p.numel() for p in model.parameters())
print(f"Total Parameters: {n_params}")

# 2. Measure Inference Time & Predict
# The timed region covers host-to-device transfer, forward pass and argmax for the
# whole test loader; perf_counter is a monotonic clock intended for interval timing.
all_preds, all_labels = [], []
start_time = time.perf_counter()

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        logits = model(xb)
        preds = logits.argmax(dim=1).cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

end_time = time.perf_counter()

# 3. Performance Metrics
total_time = end_time - start_time
inference_time_per_sample = (total_time / len(all_labels)) * 1000  # seconds -> ms per sample

print(f"\nInference Time: {inference_time_per_sample:.4f} ms/sample")

accuracy = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average="macro")
kappa = cohen_kappa_score(all_labels, all_preds)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Macro F1: {macro_f1:.4f}")
print(f"Cohen's Kappa: {kappa:.4f}")
print("-" * 60)
print(classification_report(all_labels, all_preds, target_names=le.classes_, digits=4))

In [None]:
# CBAM+TF

class CBAMChannel1D(nn.Module):
    """Channel-only CBAM gate for (B, C, T) inputs with a learnable residual strength.

    The output is ``x * (1 + alpha * sigmoid(mlp(avg) [+ mlp(max)]))``.

    Args:
        channels: number of channels C.
        reduction: bottleneck ratio of the shared MLP; hidden width is
            ``max(1, channels // reduction)``.
        use_max: when True, a max-pooled descriptor is added to the average-pooled one.
        init_alpha: initial value of the learnable scalar ``alpha``.
    """

    def __init__(self, channels:int, reduction:int=16, use_max:bool=True, init_alpha:float=0.5):
        super().__init__()
        self.use_max = use_max
        bottleneck = max(1, channels // reduction)

        # Pooling over T yields one descriptor per channel: (B, C, 1), squeezed to (B, C).
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        if use_max:
            self.max_pool = nn.AdaptiveMaxPool1d(1)

        # MLP shared by both descriptors.
        self.mlp = nn.Sequential(
            nn.Linear(channels, bottleneck, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(bottleneck, channels, bias=True),
        )

        # Learnable scalar controlling how strongly the gate modulates the input.
        self.alpha = nn.Parameter(torch.tensor(init_alpha, dtype=torch.float32))

        # Kaiming-normal weights and zero biases for both linear layers of the MLP.
        for layer in (self.mlp[0], self.mlp[2]):
            nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')
            nn.init.zeros_(layer.bias)

    def forward(self, x):  # x: (B, C, T)
        """Return x rescaled per channel; the output shape equals the input shape."""
        logits = self.mlp(self.avg_pool(x).squeeze(-1))                   # (B, C)
        if self.use_max:
            logits = logits + self.mlp(self.max_pool(x).squeeze(-1))      # (B, C)

        gate = torch.sigmoid(logits).unsqueeze(-1)                        # (B, C, 1)

        # Residual form: the identity path is always kept, the gate only adds on top.
        return x * (1.0 + self.alpha * gate)

    
class TempCNNTransformerCropClassifier(nn.Module):
    """Temporal CNN -> channel CBAM -> pooled tokens -> Transformer encoder -> CLS head.

    Args:
        num_bands: number of input channels of the first convolution.
        seq_len: accepted but not read; the sequence is always pooled to 9 steps.
        n_classes: number of output logits.
        embed_dim: CNN output width and Transformer model dimension.
        num_heads: attention heads per encoder layer.
        num_layers: number of stacked encoder layers.
        cnn_dropout: dropout probability applied after the CBAM gate.
        cls_dropout: dropout probability inside the classification head.
    """

    def __init__(
        self,
        num_bands=10,
        seq_len=18,
        n_classes=10,
        embed_dim=256,
        num_heads=4,
        num_layers=2,
        cnn_dropout=0.2,
        cls_dropout=0.2
    ):
        super().__init__()

        # ----- CNN: num_bands -> 64 -> 256 -> embed_dim, each Conv1d/BatchNorm1d/ReLU -----
        conv_layers = []
        for c_in, c_out in ((num_bands, 64), (64, 256), (256, embed_dim)):
            conv_layers += [
                nn.Conv1d(in_channels=c_in, out_channels=c_out, kernel_size=3, padding=1),
                nn.BatchNorm1d(c_out),
                nn.ReLU(),
            ]
        self.temporal_cnn = nn.Sequential(*conv_layers)

        # ----- CBAM + Dropout -----
        self.cbam = CBAMChannel1D(embed_dim, reduction=16, use_max=True, init_alpha=0.5)
        self.cnn_dropout = nn.Dropout(cnn_dropout)

        # ----- Pool to shorten the Transformer input: sequence compressed to 9 steps -----
        reduced_len = 9
        self.pool = nn.AdaptiveAvgPool1d(reduced_len)

        # ----- CLS token + positional embedding (9 pooled steps + 1 CLS slot) -----
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_emb = nn.Parameter(torch.randn(1, reduced_len + 1, embed_dim))

        # ----- Transformer encoder (pre-norm, batch-first) -----
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=embed_dim,
                nhead=num_heads,
                dim_feedforward=1024,
                dropout=0.2,
                batch_first=True,
                norm_first=True,
            ),
            num_layers=num_layers,
        )

        # ----- Classifier on the CLS feature -----
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, 64),
            nn.ReLU(),
            nn.Dropout(cls_dropout),
            nn.Linear(64, n_classes),
        )

    def forward(self, x):
        """Map a (B, 1, num_bands, seq_len) batch to (B, n_classes) logits."""
        feats = self.temporal_cnn(x.squeeze(1))          # (B, embed_dim, seq_len)
        feats = self.cnn_dropout(self.cbam(feats))       # gated, then dropout
        tokens = self.pool(feats).permute(0, 2, 1)       # (B, 9, embed_dim)

        # Prepend the CLS token, then add the positional embedding.
        batch_size = tokens.size(0)
        cls_tok = self.cls_token.expand(batch_size, -1, -1)          # (B, 1, embed_dim)
        seq = torch.cat([cls_tok, tokens], dim=1) + self.pos_emb     # (B, 10, embed_dim)

        encoded = self.transformer(seq)                  # (B, 10, embed_dim)
        return self.classifier(encoded[:, 0, :])         # logits from the CLS position



    
# Build the CBAM+Transformer model. The class count and device come from the names
# defined earlier in the notebook (`le`, `DEVICE`); the undefined `n_classes` and
# lowercase `device` previously used here raise NameError on a fresh kernel.
model = TempCNNTransformerCropClassifier(
    num_bands=n_channels,
    seq_len=n_steps,
    n_classes=len(le.classes_),
    embed_dim=512,
    num_heads=4,
    num_layers=2,
    cnn_dropout=0.2,
    cls_dropout=0.2
).to(DEVICE)

# Class weights (compute_class_weight is already imported in the notebook's import cell)
class_weight_array = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
class_weights = torch.tensor(class_weight_array, dtype=torch.float32, device=DEVICE)

# Criterion, Optimizer, Scheduler
# The CBAM+TF training cell builds these same objects again before its loop.
criterion  = nn.CrossEntropyLoss(weight=class_weights)
optimizer  = torch.optim.Adam(model.parameters(), lr=5e-4, weight_decay=5e-3)
scheduler  = torch.optim.lr_scheduler.StepLR(optimizer, step_size=max(1, NUM_EPOCHS // 10), gamma=0.8)



In [None]:
# CBAM+TF TRAIN
#  1. Setup Loss, Optimizer, Scheduler
# 'balanced' class weights are derived from the training labels and passed to the
# loss so that under-represented classes contribute more per sample.
class_weight_array = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights = torch.tensor(class_weight_array, dtype=torch.float32, device=DEVICE)

criterion  = nn.CrossEntropyLoss(weight=class_weights)
optimizer  = torch.optim.Adam(model.parameters(), lr=5e-4, weight_decay=5e-3)
# StepLR: multiply the learning rate by 0.8 every NUM_EPOCHS // 10 epochs.
scheduler  = torch.optim.lr_scheduler.StepLR(optimizer, step_size=max(1, NUM_EPOCHS // 10), gamma=0.8)

# 2. Training Loop with Early Stopping
best_val_loss = np.inf
patience_cnt = 0

print("--- Starting Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    # Train Phase
    model.train()
    train_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        # Accumulate the batch loss scaled by batch size; divided by dataset size below.
        train_loss += loss.item() * xb.size(0)

    # Validation Phase
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            logits = model(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item() * xb.size(0)

    # Calculate average loss
    train_loss /= len(train_loader.dataset)
    val_loss   /= len(val_loader.dataset)

    # Scheduler step
    # StepLR is epoch-driven and takes no metric. Do not pass val_loss here: the
    # positional argument is the deprecated `epoch`, and a loss value there keeps
    # last_epoch // step_size at 0, so the learning rate would never decay.
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']

    print(f"[{epoch:3d}/{NUM_EPOCHS}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | LR: {current_lr:.6g}")

    # Early Stopping Logic
    # An epoch counts as an improvement only if it beats the best loss by more than 1e-4.
    # NOTE(review): the checkpoint is written to "CBAM+TF.pt" in the working directory
    # while the evaluation cell loads "weights/CBAM+TF.pt" - confirm it is moved there.
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "CBAM+TF.pt")
        patience_cnt = 0
    else:
        patience_cnt += 1
        if patience_cnt >= PATIENCE:
            print(f"Early stopping triggered at epoch {epoch}")
            break

In [None]:
# CBAM+TF TEST
# 1. Load Best Model
print("--- Loading Best Model for Evaluation ---")
# map_location=DEVICE lets a checkpoint saved on a GPU be restored on a CPU-only machine.
model.load_state_dict(torch.load("weights/CBAM+TF.pt", map_location=DEVICE))
model.eval()

# Calculate Parameters
n_params = sum(p.numel() for p in model.parameters())
print(f"Total Parameters: {n_params}")

# 2. Measure Inference Time & Predict
# The timed region covers host-to-device transfer, forward pass and argmax for the
# whole test loader; perf_counter is a monotonic clock intended for interval timing.
all_preds, all_labels = [], []
start_time = time.perf_counter()

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        logits = model(xb)
        preds = logits.argmax(dim=1).cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

end_time = time.perf_counter()

# 3. Performance Metrics
total_time = end_time - start_time
inference_time_per_sample = (total_time / len(all_labels)) * 1000  # seconds -> ms per sample

print(f"\nInference Time: {inference_time_per_sample:.4f} ms/sample")

accuracy = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average="macro")
kappa = cohen_kappa_score(all_labels, all_preds)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Macro F1: {macro_f1:.4f}")
print(f"Cohen's Kappa: {kappa:.4f}")
print("-" * 60)
print(classification_report(all_labels, all_preds, target_names=le.classes_, digits=4))

In [None]:
# CNN
class CNN(nn.Module):
    """2-D CNN treating each sample as a single-channel image.

    Three double-convolution blocks (64, 128, 256 channels) are followed by global
    average pooling and a two-layer classifier.

    Args:
        n_classes: number of output logits.
    """

    @staticmethod
    def _double_conv_block(c_in, c_out, pool_size):
        """Return two Conv2d/ReLU/BatchNorm2d groups, then MaxPool2d(pool_size) and Dropout(0.2)."""
        return [
            nn.Conv2d(c_in, c_out, 3, padding=1), nn.ReLU(),
            nn.BatchNorm2d(c_out),
            nn.Conv2d(c_out, c_out, 3, padding=1), nn.ReLU(),
            nn.BatchNorm2d(c_out),
            nn.MaxPool2d(pool_size),
            nn.Dropout(0.2),
        ]

    def __init__(self, n_classes):
        super().__init__()

        # (in_channels, out_channels, pooling window); the last block pools only
        # along the first spatial axis.
        block_specs = (
            (1, 64, (2, 2)),
            (64, 128, (2, 2)),
            (128, 256, (2, 1)),
        )
        feature_layers = []
        for c_in, c_out, pool_size in block_specs:
            feature_layers.extend(self._double_conv_block(c_in, c_out, pool_size))
        self.features = nn.Sequential(*feature_layers)

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(256, 256), nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, n_classes),
        )

    def forward(self, x):
        """Map a (N, 1, H, W) batch to (N, n_classes) logits."""
        return self.classifier(self.features(x))

# Build the 2-D CNN with one logit per encoded crop class, then move it to DEVICE.
model = CNN(n_classes=len(le.classes_))
model = model.to(DEVICE)




In [None]:
# CNN TRAIN
#  1. Setup Loss, Optimizer, Scheduler
# 'balanced' class weights are derived from the training labels and passed to the
# loss so that under-represented classes contribute more per sample.
class_weight_array = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights = torch.tensor(class_weight_array, dtype=torch.float32, device=DEVICE)

criterion  = nn.CrossEntropyLoss(weight=class_weights)
optimizer  = torch.optim.Adam(model.parameters(), lr=5e-4, weight_decay=5e-3)
# StepLR: multiply the learning rate by 0.8 every NUM_EPOCHS // 10 epochs.
scheduler  = torch.optim.lr_scheduler.StepLR(optimizer, step_size=max(1, NUM_EPOCHS // 10), gamma=0.8)

# 2. Training Loop with Early Stopping
best_val_loss = np.inf
patience_cnt = 0

print("--- Starting Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    # Train Phase
    model.train()
    train_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        # Accumulate the batch loss scaled by batch size; divided by dataset size below.
        train_loss += loss.item() * xb.size(0)

    # Validation Phase
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            logits = model(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item() * xb.size(0)

    # Calculate average loss
    train_loss /= len(train_loader.dataset)
    val_loss   /= len(val_loader.dataset)

    # Scheduler step
    # StepLR is epoch-driven and takes no metric. Do not pass val_loss here: the
    # positional argument is the deprecated `epoch`, and a loss value there keeps
    # last_epoch // step_size at 0, so the learning rate would never decay.
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']

    print(f"[{epoch:3d}/{NUM_EPOCHS}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | LR: {current_lr:.6g}")

    # Early Stopping Logic
    # An epoch counts as an improvement only if it beats the best loss by more than 1e-4.
    # NOTE(review): the checkpoint is written to "CNN.pt" in the working directory
    # while the evaluation cell loads "weights/CNN.pt" - confirm it is moved there.
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "CNN.pt")
        patience_cnt = 0
    else:
        patience_cnt += 1
        if patience_cnt >= PATIENCE:
            print(f"Early stopping triggered at epoch {epoch}")
            break

In [None]:
# CNN TEST
# 1. Load Best Model
print("--- Loading Best Model for Evaluation ---")
# map_location=DEVICE lets a checkpoint saved on a GPU be restored on a CPU-only machine.
model.load_state_dict(torch.load("weights/CNN.pt", map_location=DEVICE))
model.eval()

# Calculate Parameters
n_params = sum(p.numel() for p in model.parameters())
print(f"Total Parameters: {n_params}")

# 2. Measure Inference Time & Predict
# The timed region covers host-to-device transfer, forward pass and argmax for the
# whole test loader; perf_counter is a monotonic clock intended for interval timing.
all_preds, all_labels = [], []
start_time = time.perf_counter()

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        logits = model(xb)
        preds = logits.argmax(dim=1).cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

end_time = time.perf_counter()

# 3. Performance Metrics
total_time = end_time - start_time
inference_time_per_sample = (total_time / len(all_labels)) * 1000  # seconds -> ms per sample

print(f"\nInference Time: {inference_time_per_sample:.4f} ms/sample")

accuracy = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average="macro")
kappa = cohen_kappa_score(all_labels, all_preds)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Macro F1: {macro_f1:.4f}")
print(f"Cohen's Kappa: {kappa:.4f}")
print("-" * 60)
print(classification_report(all_labels, all_preds, target_names=le.classes_, digits=4))

In [None]:
# CBAM

class ChannelAttention(nn.Module):
    """Channel-attention gate used by CBAM.

    The input is globally average-pooled and max-pooled over its two spatial
    dimensions; both pooled vectors go through the same two-layer bottleneck
    MLP, the results are summed, and a sigmoid turns the sum into one gate
    value per channel.

    Args:
        in_planes: number of channels of the incoming 4-D tensor.
        ratio: reduction factor of the bottleneck. The hidden width is
            ``in_planes // ratio`` but never less than 1, so small channel
            counts (``in_planes < ratio``) still get an input-dependent gate
            instead of a zero-width layer.

    Returns (forward):
        Tensor of shape ``(b, c, 1, 1)`` with values produced by a sigmoid,
        meant to be multiplied onto the input.
    """

    def __init__(self, in_planes, ratio=16):
        super().__init__()
        # Clamp so that in_planes < ratio does not create Linear(in_planes, 0).
        hidden_planes = max(1, in_planes // ratio)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        # Shared MLP: the same weights process both pooled descriptors.
        self.fc = nn.Sequential(
            nn.Linear(in_planes, hidden_planes),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_planes, in_planes)
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        b, c, _, _ = x.size()
        # (b, c, 1, 1) pooled maps are flattened to (b, c) for the Linear layers.
        avg_out = self.fc(self.avg_pool(x).view(b, c))
        max_out = self.fc(self.max_pool(x).view(b, c))
        out = avg_out + max_out
        return self.sigmoid(out).view(b, c, 1, 1)


class SpatialAttention(nn.Module):
    """Spatial-attention gate used by CBAM.

    Collapses the channel axis twice (mean and max), stacks the two resulting
    single-channel maps, and runs a 7x7 convolution (padding 3, so the spatial
    size is kept) followed by a sigmoid. The output has one channel and the
    same height/width as the input.
    """

    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(in_channels=2, out_channels=1, kernel_size=7, padding=3)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Per-pixel statistics across channels, each of shape (b, 1, h, w).
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True).values
        descriptor = torch.cat((channel_mean, channel_max), dim=1)
        gate_logits = self.conv(descriptor)
        return self.sigmoid(gate_logits)


class CBAM(nn.Module):
    """Channel attention followed by spatial attention, both applied multiplicatively.

    Args:
        channels: channel count of the feature map, forwarded to ChannelAttention.
        ratio: bottleneck reduction factor, forwarded to ChannelAttention.
    """

    def __init__(self, channels, ratio=16):
        super().__init__()
        self.ca = ChannelAttention(channels, ratio)
        self.sa = SpatialAttention()

    def forward(self, x):
        # Re-weight channels first; the spatial gate is computed from the
        # channel-refined features, not from the raw input.
        channel_refined = x * self.ca(x)
        return channel_refined * self.sa(channel_refined)

# ================================================================
# 2) CNN + CBAM (stacked conv stages; no residual/skip connections)
# ================================================================
class CNN_CBAM(nn.Module):
    """Three-stage 2-D CNN classifier with a CBAM module after every stage.

    Each stage is: two (3x3 conv -> ReLU -> BatchNorm) layers, CBAM attention,
    max-pooling, and dropout (p=0.3). Channel widths are 1 -> 64 -> 64 -> 256.
    The first two stages pool by (2, 2); the third pools by (2, 1), i.e. only
    along the first spatial axis. A global average pool and one linear layer
    then produce ``n_classes`` logits.

    Args:
        n_classes: number of output logits.
    """

    @staticmethod
    def _double_conv(in_ch, out_ch):
        """Build two (3x3 conv -> ReLU -> BatchNorm) layers; the first maps in_ch to out_ch."""
        return nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_ch),
            nn.Conv2d(out_ch, out_ch, 3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_ch),
        )

    @staticmethod
    def _pool_dropout(kernel):
        """Build max-pooling with the given kernel followed by Dropout(0.3)."""
        return nn.Sequential(nn.MaxPool2d(kernel), nn.Dropout(0.3))

    def __init__(self, n_classes):
        super().__init__()
        # Sub-modules are created in the same order and under the same
        # attribute names as before, so state_dict keys are unchanged.
        self.conv1 = self._double_conv(1, 64)
        self.cbam1 = CBAM(64)
        self.pool1 = self._pool_dropout((2, 2))

        self.conv2 = self._double_conv(64, 64)
        self.cbam2 = CBAM(64)
        self.pool2 = self._pool_dropout((2, 2))

        self.conv3 = self._double_conv(64, 256)
        self.cbam3 = CBAM(256)
        self.pool3 = self._pool_dropout((2, 1))

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(256, n_classes),
        )

    def forward(self, x):
        stages = (
            (self.conv1, self.cbam1, self.pool1),
            (self.conv2, self.cbam2, self.pool2),
            (self.conv3, self.cbam3, self.pool3),
        )
        for convs, attention, pool in stages:
            x = pool(attention(convs(x)))
        return self.classifier(x)

# ================================================================
# 4) Usage example: build the CBAM model with one logit per class
#    known to the label encoder, then move it to the target device
# ================================================================
model = CNN_CBAM(n_classes=len(le.classes_))
model = model.to(DEVICE)

In [None]:
# CBAM TRAIN
# Train the CBAM model with class-weighted cross-entropy, a step-wise LR
# decay, and early stopping on validation loss. The best checkpoint (lowest
# validation loss) is written to "CBAM.pt".

#  1. Setup Loss, Optimizer, Scheduler
# "balanced" class weights computed from the training labels, passed to the
# loss as per-class weights.
class_weight_array = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights = torch.tensor(class_weight_array, dtype=torch.float32, device=DEVICE)

criterion  = nn.CrossEntropyLoss(weight=class_weights)
# Use the shared config constant rather than repeating the literal 5e-4.
optimizer  = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=5e-3)
# Multiply the LR by 0.8 every NUM_EPOCHS // 10 epochs (at least every epoch).
scheduler  = torch.optim.lr_scheduler.StepLR(optimizer, step_size=max(1, NUM_EPOCHS // 10), gamma=0.8)

# 2. Training Loop with Early Stopping
best_val_loss = np.inf
patience_cnt = 0

print("--- Starting Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    # Train Phase
    model.train()
    train_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the later division by the
        # dataset length accounts for a smaller final batch.
        train_loss += loss.item() * xb.size(0)

    # Validation Phase
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            logits = model(xb)
            loss = criterion(logits, yb)
            val_loss += loss.item() * xb.size(0)

    # Calculate average loss
    train_loss /= len(train_loader.dataset)
    val_loss   /= len(val_loader.dataset)

    # Scheduler step
    # StepLR counts epochs itself and must be stepped without arguments.
    # Passing val_loss would be taken as the (deprecated) `epoch` argument,
    # which pins the schedule at "epoch ~ val_loss" and stops the LR decaying.
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']

    print(f"[{epoch:3d}/{NUM_EPOCHS}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | LR: {current_lr:.6g}")

    # Early Stopping Logic
    # An epoch counts as an improvement only if it beats the best validation
    # loss by more than 1e-4; otherwise it consumes one unit of patience.
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "CBAM.pt")
        patience_cnt = 0
    else:
        patience_cnt += 1
        if patience_cnt >= PATIENCE:
            print(f"Early stopping triggered at epoch {epoch}")
            break

In [None]:
# CBAM TEST
# Evaluate the best CBAM checkpoint on the held-out test split: parameter
# count, wall-clock inference time per sample, and classification metrics.

# 1. Load Best Model
print("--- Loading Best Model for Evaluation ---")
# The CBAM training loop saves its best checkpoint to "CBAM.pt" in the working
# directory, so load from that same path; nothing in this notebook writes to
# "weights/CBAM.pt". map_location lets a checkpoint saved on GPU be restored
# on whatever DEVICE is active now (e.g. a CPU-only machine).
model.load_state_dict(torch.load("CBAM.pt", map_location=DEVICE))
model.eval()

# Calculate Parameters
n_params = sum(p.numel() for p in model.parameters())
print(f"Total Parameters: {n_params}")

# 2. Measure Inference Time & Predict
# NOTE: the timed region covers the whole loop, i.e. DataLoader iteration and
# host<->device copies as well as the forward pass.
all_preds, all_labels = [], []
# perf_counter is the monotonic, high-resolution clock meant for interval timing.
start_time = time.perf_counter()

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        logits = model(xb)
        # Predicted class = index of the largest logit; moved to host for sklearn.
        preds = logits.argmax(dim=1).cpu().numpy()

        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

end_time = time.perf_counter()

# 3. Performance Metrics
total_time = end_time - start_time
# Seconds for the full pass -> milliseconds per sample.
inference_time_per_sample = (total_time / len(all_labels)) * 1000

print(f"\nInference Time: {inference_time_per_sample:.4f} ms/sample")

accuracy = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average="macro")
kappa = cohen_kappa_score(all_labels, all_preds)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Macro F1: {macro_f1:.4f}")
print(f"Cohen's Kappa: {kappa:.4f}")
print("-" * 60)
print(classification_report(all_labels, all_preds, target_names=le.classes_, digits=4))