In [None]:
import os
import numpy as np
import scipy.io
from scipy import signal
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# ==========================================
# 1. 데이터셋 클래스 (전처리 및 마스킹 통합)
# ==========================================
class IntegratedEMGDataset(Dataset):
    def __init__(self, file_paths, target_freq=1000, window_ms=500, target_channels=16):
        """
        Args:
            file_paths (list): .mat 파일 경로 리스트
            target_freq (int): 통일할 샘플링 주파수 (Hz)
            window_ms (int): 윈도우 크기 (ms)
            target_channels (int): 최대 채널 수 (보통 16)
        """
        self.data = []
        self.labels = []
        self.masks = []
        self.window_size = int(target_freq * (window_ms / 1000))
        
        for fp in file_paths:
            self._load_and_process(fp, target_freq, target_channels)

    def _load_and_process(self, file_path, target_freq, target_channels):
        try:
            mat = scipy.io.loadmat(file_path)
        except:
            print(f"File load error: {file_path}")
            return

        # 1-1. 변수명 매핑 (Ninapro, Kaggle 등 데이터셋 대응)
        if 'emg' in mat: raw_emg = mat['emg']
        elif 'data' in mat: raw_emg = mat['data'].T # (Ch, Time) -> (Time, Ch)
        else: return

        if 'stimulus' in mat: raw_label = mat['stimulus']
        elif 'restimulus' in mat: raw_label = mat['restimulus']
        else: raw_label = np.zeros((raw_emg.shape[0], 1))

        # 1-2. 원본 주파수 추정 및 필터링
        # 파일명이나 데이터 특성에 따라 분기 (DB5=200Hz, DB2=2000Hz 등)
        if 'db5' in file_path.lower(): original_freq = 200
        elif 'db1' in file_path.lower(): original_freq = 100
        else: original_freq = 2000 # 기본값
        
        # 필터링 (Nyquist 고려)
        nyq = 0.5 * original_freq
        if nyq > 450: # 고해상도 데이터
            b, a = signal.butter(4, [20/nyq, 450/nyq], btype='band')
            emg_filtered = signal.filtfilt(b, a, raw_emg, axis=0)
        elif nyq > 20: # 저해상도 데이터 (High-pass only)
            b, a = signal.butter(4, 20/nyq, btype='high')
            emg_filtered = signal.filtfilt(b, a, raw_emg, axis=0)
        else:
            emg_filtered = raw_emg

        # 1-3. Resampling
        num_samples = int(len(emg_filtered) * target_freq / original_freq)
        emg_resampled = signal.resample(emg_filtered, num_samples, axis=0)
        label_resampled = signal.resample(raw_label, num_samples, axis=0).round().astype(int)

        # 정규화 (Standardization)
        emg_resampled = (emg_resampled - np.mean(emg_resampled, axis=0)) / (np.std(emg_resampled, axis=0) + 1e-6)

        # 1-4. Sliding Window & Zero-Padding
        curr_channels = emg_resampled.shape[1]
        step = int(self.window_size * 0.5) # 50% overlap

        for i in range(0, len(emg_resampled) - self.window_size, step):
            window_x = emg_resampled[i:i+self.window_size, :]
            window_y = label_resampled[i:i+self.window_size]
            
            # 라벨 결정 (최빈값)
            vals, counts = np.unique(window_y, return_counts=True)
            label = vals[np.argmax(counts)]
            if label == 0: continue # 휴식(Rest) 제외
            
            # Padding & Masking
            # 입력: (Time, Ch) -> 전치 -> (Ch, Time)
            x_tensor = torch.FloatTensor(window_x.T) 
            
            if curr_channels < target_channels:
                pad_len = target_channels - curr_channels
                padding = torch.zeros(pad_len, self.window_size)
                x_padded = torch.cat([x_tensor, padding], dim=0)
                mask = torch.cat([torch.ones(curr_channels), torch.zeros(pad_len)], dim=0)
            else:
                x_padded = x_tensor[:target_channels, :] # 16채널 초과는 자름
                mask = torch.ones(target_channels)

            self.data.append(x_padded)
            self.labels.append(label)
            self.masks.append(mask)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Label을 0부터 시작하게 조정 (필요 시)
        return self.data[idx], torch.tensor(self.labels[idx], dtype=torch.long) - 1, self.masks[idx]

# ==========================================
# 2. 모델 아키텍처 (CWE-AP Model)
# ==========================================
class ChannelAgnosticNetwork(nn.Module):
    def __init__(self, num_classes=17, d_model=128, window_size=500):
        super(ChannelAgnosticNetwork, self).__init__()
        
        # [Stage 1] Shared Encoder (1D-CNN)
        # 모든 채널이 이 레이어를 공유함
        self.shared_encoder = nn.Sequential(
            nn.Conv1d(1, 32, kernel_size=15, padding=7),
            nn.BatchNorm1d(32), nn.ReLU(), nn.MaxPool1d(2),
            
            nn.Conv1d(32, 64, kernel_size=5, padding=2),
            nn.BatchNorm1d(64), nn.ReLU(), nn.MaxPool1d(2),
            
            nn.Conv1d(64, d_model, kernel_size=3, padding=1),
            nn.AdaptiveAvgPool1d(1), # 시간축을 1개로 압축
            nn.Flatten()             # (B*C, d_model)
        )
        
        # [Stage 2] Attention Mechanism
        self.attn_fc = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.Tanh(),
            nn.Linear(d_model // 2, 1)
        )
        
        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, num_classes)
        )

    def forward(self, x, mask):
        """
        x: (Batch, 16, 500)
        mask: (Batch, 16) - 실제 데이터=1, 패딩=0
        """
        B, C, T = x.shape
        
        # 1. Reshape for Shared Encoder
        # (Batch * Channel, 1, Time)
        x_reshaped = x.view(B * C, 1, T)
        
        # 2. Local Feature Extraction
        features = self.shared_encoder(x_reshaped) # (B*C, d_model)
        features = features.view(B, C, -1)         # (B, C, d_model)
        
        # 3. Masked Attention Pooling
        # Attention Score 계산
        attn_scores = self.attn_fc(features).squeeze(-1) # (B, C)
        
        # Masking: 패딩된 채널의 점수를 매우 낮게 설정하여 Softmax에서 0이 되게 함
        if mask is not None:
            mask_bool = (mask == 0) # True where padding exists
            attn_scores = attn_scores.masked_fill(mask_bool, -1e9)
            
        attn_weights = torch.softmax(attn_scores, dim=1).unsqueeze(-1) # (B, C, 1)
        
        # Weighted Sum (Channel Dimension 통합)
        # (B, C, d) * (B, C, 1) -> sum(dim=1) -> (B, d)
        global_feature = torch.sum(features * attn_weights, dim=1)
        
        # 4. Classification
        out = self.classifier(global_feature)
        return out

# ==========================================
# 3. 실행 및 학습 루프 예시
# ==========================================
if __name__ == "__main__":
    # 1. 데이터 준비 (경로 리스트)
    # 실제 존재하는 .mat 파일 경로들을 리스트에 넣으세요.
    file_list = [
        "S1_E2_A1.mat", 
        # "Subject2_session1.mat", 
        # "Kaggle_User1.mat" 
    ]
    
    # 파일이 실제로 있을 때만 실행
    if os.path.exists(file_list[0]):
        dataset = IntegratedEMGDataset(file_list, target_channels=16)
        
        # Train/Test Split
        train_size = int(0.8 * len(dataset))
        test_size = len(dataset) - train_size
        train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
        
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
        
        # 2. 모델 초기화
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = ChannelAgnosticNetwork(num_classes=17).to(device) # 클래스 수는 데이터셋에 맞게 조정
        
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        
        # 3. 학습 루프
        print("Training Started...")
        for epoch in range(10): # Epoch 수 조절
            model.train()
            total_loss = 0
            
            for batch_x, batch_y, batch_mask in train_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)
                batch_mask = batch_mask.to(device) # 마스크 전달 필수
                
                optimizer.zero_grad()
                
                # 모델 Forward (마스크 포함)
                outputs = model(batch_x, batch_mask)
                loss = criterion(outputs, batch_y)
                
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
            
            print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")
            
        print("Training Finished.")
    else:
        print("데이터 파일 경로를 확인해주세요.")

In [None]:
import scipy.io
import numpy as np
from scipy import signal

def preprocess_ninapro_robust(file_path, target_freq=1000, window_ms=500, overlap_ratio=0.5):
    # 1. .mat 파일 로드
    try:
        mat = scipy.io.loadmat(file_path)
    except Exception as e:
        print(f"파일 로드 실패: {e}")
        return None, None
    
    # 변수명 확인 및 데이터 추출
    if 'emg' in mat:
        emg = mat['emg']
    elif 'data' in mat: # 일부 데이터셋 대응
        emg = mat['data']
    else:
        print(f"Key Error: .mat 파일 내에 'emg' 키가 없습니다. 포함된 키: {mat.keys()}")
        return None, None
        
    # 라벨 추출 (DB5는보통 'stimulus' 또는 'restimulus' 사용)
    if 'stimulus' in mat:
        label = mat['stimulus']
    elif 'restimulus' in mat:
        label = mat['restimulus']
    else:
        label = np.zeros((emg.shape[0], 1))

    # --- [핵심 수정 사항] ---
    # 파일 경로에 'db5'가 보입니다. Ninapro DB5는 200Hz입니다.
    # 만약 DB2라면 2000, DB1이라면 100, DB5라면 200으로 설정해야 합니다.
    if 'db5' in file_path.lower():
        original_freq = 200
    elif 'db1' in file_path.lower():
        original_freq = 100
    else:
        original_freq = 2000 # 기본값 (DB2 등)

    # 2. 필터 적용 (동적 조정)
    # Nyquist 주파수 계산 (샘플링 레이트의 절반)
    nyq = 0.5 * original_freq
    
    # 목표 필터 주파수
    low_cut = 20   # 움직임 잡음 제거 (High-pass)
    high_cut = 450 # 고주파 노이즈 제거 (Low-pass)
    
    b, a = None, None
    
    # 필터 설계 로직 수정:
    # 데이터의 한계(Nyquist)가 450Hz보다 낮으면 Low-pass 필터를 생략하거나 조정해야 함
    if nyq > high_cut:
        # 2000Hz 데이터 등 충분히 고해상도일 때: Band-pass (20~450Hz)
        b, a = signal.butter(4, [low_cut/nyq, high_cut/nyq], btype='band')
        emg_filtered = signal.filtfilt(b, a, emg, axis=0)
    elif nyq > low_cut:
        # 200Hz 데이터(Nyq=100)일 때: High-pass (20Hz)만 적용 (450Hz 필터링 불가)
        # 100Hz 이상 성분은 이미 물리적으로 존재하지 않으므로 Low-pass 불필요
        b, a = signal.butter(4, low_cut/nyq, btype='high')
        emg_filtered = signal.filtfilt(b, a, emg, axis=0)
        print(f"Warning: {original_freq}Hz 데이터라 450Hz 필터는 생략하고 20Hz High-pass만 적용했습니다.")
    else:
        # 100Hz 이하 데이터 등: 필터 적용 위험하므로 원본 사용 혹은 단순 정규화
        emg_filtered = emg
        print("Warning: 샘플링 주파수가 너무 낮아 필터를 적용하지 않았습니다.")

    # 3. Resampling (target_freq로 통일, 예: 1000Hz)
    num_samples = int(len(emg_filtered) * target_freq / original_freq)
    emg_resampled = signal.resample(emg_filtered, num_samples, axis=0)
    label_resampled = signal.resample(label, num_samples, axis=0).round().astype(int)

    # 4. Sliding Window 처리
    window_size = int(target_freq * (window_ms / 1000))
    step_size = int(window_size * (1 - overlap_ratio))
    
    X, Y = [], []
    
    for i in range(0, len(emg_resampled) - window_size, step_size):
        window_data = emg_resampled[i:i+window_size, :] 
        window_label = label_resampled[i:i+window_size]
        
        unique, counts = np.unique(window_label, return_counts=True)
        final_label = unique[np.argmax(counts)]
        
        if final_label == 0: continue 
        
        X.append(window_data.T) # (Channel, Time)
        Y.append(final_label)
        
    return np.array(X), np.array(Y)

# 실행 (경로 확인)
x_data, y_data = preprocess_ninapro_robust('ninapro_db5/ninapro_db_1/S1_E2_A1.mat')

if x_data is not None:
    print(f"\n--- 전처리 결과 ---")
    print(f"데이터 shape: {x_data.shape}") # (Batch, 16, 500)
    print(f"라벨 shape: {y_data.shape}")

In [None]:
# 튜플 형태 제거 및 숫자형 변환
ninapro_df['Restimulus'] = ninapro_df['Restimulus'].apply(lambda x: x[0] if isinstance(x, (list, tuple, np.ndarray)) else x)
ninapro_df['rerepetition'] = ninapro_df['rerepetition'].apply(lambda x: x[0] if isinstance(x, (list, tuple, np.ndarray)) else x)

# 메모리 절약을 위해 데이터 타입 변환 (float32, int8)
ninapro_df.iloc[:, :16] = ninapro_df.iloc[:, :16].astype('float32')
ninapro_df['Restimulus'] = ninapro_df['Restimulus'].astype('int8')
ninapro_df['rerepetition'] = ninapro_df['rerepetition'].astype('int8')

def create_windows(data, window_size=40, step_size=20):
    """
    window_size: 200Hz 기준 40샘플은 200ms입니다.
    step_size: 윈도우 간의 겹침(Overlap) 정도입니다.
    """
    X = []
    y = []
    
    # 데이터 추출 (16개 EMG 컬럼)
    emg_data = data.iloc[:, :16].values
    labels = data['Restimulus'].values
    
    for i in range(0, len(data) - window_size, step_size):
        window_x = emg_data[i : i + window_size]
        # 윈도우 내에서 가장 많이 등장한 레이블을 해당 윈도우의 정답으로 선택
        window_y = np.bincount(labels[i : i + window_size]).argmax()
        
        X.append(window_x)
        y.append(window_y)
        
    return np.array(X), np.array(y)
# 1. Train / Test 세트 분리 (예: 2, 5회차를 Test로 사용)
test_reps = [2, 5]
train_df = ninapro_df[~ninapro_df['rerepetition'].isin(test_reps)]
test_df = ninapro_df[ninapro_df['rerepetition'].isin(test_reps)]

# 2. 윈도우 데이터 생성
# window_size=40 (200ms), step_size=10 (50% overlap 등 설정 가능)
X_train, y_train = create_windows(train_df, window_size=40, step_size=10)
X_test, y_test = create_windows(test_df, window_size=40, step_size=10)

print(f"Train 데이터 형태: {X_train.shape}") # (샘플 수, 40, 16)
print(f"Test 데이터 형태: {X_test.shape}")   # (샘플 수, 40, 16)