In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# MIEL

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import os

#

# ============================================================================
# 1. UCI-HAR Dataset (Load from Inertial Signals)
# ============================================================================

class UCIHARDataset(Dataset):
    def __init__(self, data, labels):
        self.data = torch.FloatTensor(data)
        self.labels = torch.LongTensor(labels)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

def load_uci_har(root_path='/content/drive/MyDrive/Colab Notebooks/UCI_HAR_Dataset'):
    """Load UCI-HAR raw inertial signals with fixed paths"""

    def load_signals(folder):
        """Load 9-axis IMU signals from Inertial Signals folder"""
        signal_types = [
            'body_acc_x', 'body_acc_y', 'body_acc_z',
            'body_gyro_x', 'body_gyro_y', 'body_gyro_z',
            'total_acc_x', 'total_acc_y', 'total_acc_z'
        ]

        signals = []
        for signal in signal_types:
            filepath = os.path.join(folder, f'{signal}_{os.path.basename(os.path.dirname(folder))}.txt')
            # UCI HAR 파일명은 보통 'body_acc_x_train.txt' 형식이므로 경로 조정이 필요할 수 있음
            # 여기서는 단순하게 파일명만 호출하도록 가정하거나, 실제 데이터셋 구조에 맞춰야 합니다.
            # 일반적인 구조를 가정하여 파일명 생성:
            suffix = 'train' if 'train' in folder else 'test'
            filepath = os.path.join(folder, f'{signal}_{suffix}.txt')

            try:
                data = np.loadtxt(filepath)
                signals.append(data)
            except OSError:
                print(f"Error loading {filepath}. Check path.")
                return None

        # Stack to (N, 128, 9)
        signals = np.stack(signals, axis=-1)
        return signals

    # Load train data
    train_folder = os.path.join(root_path, 'train', 'Inertial Signals')
    X_train = load_signals(train_folder)
    y_train_path = os.path.join(root_path, 'train', 'y_train.txt')
    y_train = np.loadtxt(y_train_path) - 1

    # Load test data
    test_folder = os.path.join(root_path, 'test', 'Inertial Signals')
    X_test = load_signals(test_folder)
    y_test_path = os.path.join(root_path, 'test', 'y_test.txt')
    y_test = np.loadtxt(y_test_path) - 1

    if X_train is None or X_test is None:
        raise FileNotFoundError("Dataset files not found. Please check the 'root_path'.")

    return X_train, y_train.astype(np.int64), X_test, y_test.astype(np.int64)


# ============================================================================
# 2. Potential Energy Network (Neural Hamiltonian)
# ============================================================================

class PotentialEnergyField(nn.Module):
    """
    Learns U(x,t): potential energy at each spatiotemporal point
    """
    def __init__(self, input_dim=9, hidden_dim=128):
        super().__init__()
        self.energy_net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, 1)  # Scalar potential
        )

    def forward(self, x):
        return self.energy_net(x)


class EnergyGradientFlow(nn.Module):
    """
    Computes ∇U(x,t): energy gradient (force field)
    """
    def __init__(self, input_dim=9, hidden_dim=128):
        super().__init__()
        self.gradient_net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, input_dim)  # Gradient vector
        )

    def forward(self, x):
        return self.gradient_net(x)


# ============================================================================
# 3. Landscape Pattern Encoder (FIXED)
# ============================================================================

class LandscapeGeometryEncoder(nn.Module):
    """
    [Upgrade] Residual Connection이 추가된 인코더
    """
    def __init__(self, input_dim=9, hidden_dim=256):
        super().__init__()

        combined_dim = input_dim * 2 + 1

        # 첫 진입 차원 맞추기
        self.input_proj = nn.Conv1d(combined_dim, hidden_dim, kernel_size=1)

        # Residual Blocks (Conv + Norm + Gelu)
        # 커널 사이즈에 맞는 패딩 설정: (K-1)/2
        self.conv1 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=5, padding=2)
        self.conv3 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=7, padding=3)

        self.norm1 = nn.LayerNorm(hidden_dim)
        self.norm2 = nn.LayerNorm(hidden_dim)
        self.norm3 = nn.LayerNorm(hidden_dim)

        self.dropout = nn.Dropout(0.2) # 드롭아웃 추가
        self.attention = nn.MultiheadAttention(hidden_dim, num_heads=8, batch_first=True)

    def forward(self, energy, gradient, x_original):
        # 1. Combine
        landscape_state = torch.cat([x_original, gradient, energy], dim=-1) # (B, T, 19)

        # 2. Projection (B, 19, T) -> (B, H, T)
        x = self.input_proj(landscape_state.transpose(1, 2))

        # 3. Residual CNN Blocks
        # Block 1
        resid = x
        h1 = self.conv1(x)
        h1 = self.norm1(h1.transpose(1, 2)).transpose(1, 2)
        h1 = F.gelu(h1)
        x = h1 + resid  # Skip Connection

        # Block 2
        resid = x
        h2 = self.conv2(x)
        h2 = self.norm2(h2.transpose(1, 2)).transpose(1, 2)
        h2 = F.gelu(h2)
        x = h2 + resid  # Skip Connection

        # Block 3
        resid = x
        h3 = self.conv3(x)
        h3 = self.norm3(h3.transpose(1, 2)).transpose(1, 2)
        h3 = F.gelu(h3)
        x = h3 + resid  # Skip Connection

        x = self.dropout(x)

        # 4. Self-attention
        h_attn_in = x.transpose(1, 2) # (B, T, H)
        h_attn_out, _ = self.attention(h_attn_in, h_attn_in, h_attn_in)

        return h_attn_out


# ============================================================================
# 4. MIEL-HAR Model
# ============================================================================

class MIELHAR(nn.Module):
    def __init__(self, input_dim=9, hidden_dim=256, num_classes=6):
        super().__init__()

        self.potential_field = PotentialEnergyField(input_dim, hidden_dim)
        self.gradient_flow = EnergyGradientFlow(input_dim, hidden_dim)

        # Encoder now handles the combined dimension internally
        self.landscape_encoder = LandscapeGeometryEncoder(input_dim, hidden_dim)

        self.global_pool = nn.AdaptiveAvgPool1d(1)

        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, num_classes)
        )

        self.energy_reg_weight = 0.01

    def forward(self, x, return_energy=False):
        # x: (B, T, 9)
        energy = self.potential_field(x)   # (B, T, 1)
        gradient = self.gradient_flow(x)   # (B, T, 9)

        # Feed ALL components to the encoder
        landscape_features = self.landscape_encoder(energy, gradient, x) # (B, T, H)

        # Pooling: (B, T, H) -> (B, H, T) -> (B, H, 1)
        global_features = self.global_pool(landscape_features.transpose(1, 2)).squeeze(-1)

        logits = self.classifier(global_features)

        if return_energy:
            return logits, energy, gradient
        return logits

    def compute_energy_loss(self, energy, gradient, labels):
        # Energy smoothness
        energy_diff = energy[:, 1:] - energy[:, :-1]
        smoothness_loss = torch.mean(energy_diff ** 2)

        # Gradient magnitude regularization
        gradient_mag = torch.norm(gradient, dim=-1)
        gradient_loss = torch.mean(gradient_mag) # Changed from abs(mag) since norm is positive

        return smoothness_loss + 0.1 * gradient_loss


# ============================================================================
# 5. Training & Eval (Unchanged logic, cleaner paths)
# ============================================================================

def train_epoch(model, train_loader, optimizer, device):
    model.train()
    total_loss = 0
    all_preds, all_labels = [], []

    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        optimizer.zero_grad()
        logits, energy, gradient = model(batch_x, return_energy=True)

        cls_loss = F.cross_entropy(logits, batch_y)
        energy_loss = model.compute_energy_loss(energy, gradient, batch_y)
        loss = cls_loss + model.energy_reg_weight * energy_loss

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())

    return total_loss / len(train_loader), accuracy_score(all_labels, all_preds)

def evaluate(model, test_loader, device):
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            logits = model(batch_x)
            preds = torch.argmax(logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())

    return accuracy_score(all_labels, all_preds), \
           f1_score(all_labels, all_preds, average='macro'), \
           confusion_matrix(all_labels, all_preds)


# ============================================================================
# 6. Execution Block (Modified for Multiple Epochs)
# ============================================================================

if __name__ == "__main__":
    # Setup
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    DATA_ROOT = '/content/drive/MyDrive/Colab Notebooks/UCI_HAR_Dataset'

    print(f"Using device: {device}")
    print("=" * 60)

    try:
        # 1. Load Data
        X_train, y_train, X_test, y_test = load_uci_har(DATA_ROOT)

        train_dataset = UCIHARDataset(X_train, y_train)
        test_dataset = UCIHARDataset(X_test, y_test)

        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

        # 2. Initialize Model
        model = MIELHAR(input_dim=9, hidden_dim=128, num_classes=6).to(device)

        # [New] 파라미터 수 계산 및 출력
        param_count = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print(f"Model Parameter Count: {param_count:,}") # 천 단위 콤마

        # 3. Optimizer & Scheduler & Loss
        optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

        # [Upgrade] 스케줄러: 에포크가 진행될수록 학습률을 코사인 곡선으로 줄임
        NUM_EPOCHS = 50
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS)

        # [Upgrade] Label Smoothing Loss
        criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

        best_acc = 0.0
        print(f"\nStart Training for {NUM_EPOCHS} epochs with Scheduler...")
        print("-" * 60)

        for epoch in range(NUM_EPOCHS):
            # Train Loop
            model.train()
            total_loss = 0
            for batch_x, batch_y in train_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)

                optimizer.zero_grad()
                logits, energy, gradient = model(batch_x, return_energy=True)

                # Label Smoothing이 적용된 CrossEntropy 사용
                cls_loss = criterion(logits, batch_y)
                energy_loss = model.compute_energy_loss(energy, gradient, batch_y)
                loss = cls_loss + model.energy_reg_weight * energy_loss

                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            # 스케줄러 스텝 (매 에포크 끝난 후)
            scheduler.step()

            # Validation
            test_acc, test_f1, _ = evaluate(model, test_loader, device)

            if test_acc > best_acc:
                best_acc = test_acc
                best_msg = "(*Best)"
                # torch.save(model.state_dict(), 'best_model_upgraded.pth')
            else:
                best_msg = ""

            current_lr = optimizer.param_groups[0]['lr']
            print(f"Epoch [{epoch+1}/{NUM_EPOCHS}] "
                  f"LR: {current_lr:.5f} | "
                  f"Loss: {total_loss/len(train_loader):.4f} | "
                  f"Test Acc: {test_acc:.4f} {best_msg}")

        print("=" * 60)
        print(f"Final Best Accuracy: {best_acc:.4f}")

    except Exception as e:
        import traceback
        traceback.print_exc()

Using device: cuda
Model Parameter Count: 370,960

Start Training for 50 epochs with Scheduler...
------------------------------------------------------------
Epoch [1/50] LR: 0.00100 | Loss: 0.7433 | Test Acc: 0.8728 (*Best)
Epoch [2/50] LR: 0.00100 | Loss: 0.5525 | Test Acc: 0.9186 (*Best)
Epoch [3/50] LR: 0.00099 | Loss: 0.5352 | Test Acc: 0.9220 (*Best)
Epoch [4/50] LR: 0.00098 | Loss: 0.5282 | Test Acc: 0.8982 
Epoch [5/50] LR: 0.00098 | Loss: 0.5288 | Test Acc: 0.9094 
Epoch [6/50] LR: 0.00096 | Loss: 0.5210 | Test Acc: 0.9070 
Epoch [7/50] LR: 0.00095 | Loss: 0.5215 | Test Acc: 0.9277 (*Best)
Epoch [8/50] LR: 0.00094 | Loss: 0.5132 | Test Acc: 0.9213 
Epoch [9/50] LR: 0.00092 | Loss: 0.5095 | Test Acc: 0.9362 (*Best)
Epoch [10/50] LR: 0.00090 | Loss: 0.5025 | Test Acc: 0.9372 (*Best)
Epoch [11/50] LR: 0.00089 | Loss: 0.5029 | Test Acc: 0.9253 
Epoch [12/50] LR: 0.00086 | Loss: 0.5022 | Test Acc: 0.9372 
Epoch [13/50] LR: 0.00084 | Loss: 0.5001 | Test Acc: 0.9270 
Epoch [14/50] L