In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score, accuracy_score, classification_report, recall_score
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from sklearn.metrics import accuracy_score, recall_score, classification_report


# -----------------------
# Dataset definition
# -----------------------
class TabularDataset(Dataset):
    """In-memory dataset pairing a float32 feature matrix with float32 targets.

    Args:
        X: Feature rows as a NumPy array, nested sequence, tensor, or pandas
            DataFrame/Series. Stored as a ``torch.float32`` tensor.
        y: Targets as a NumPy array, sequence, tensor, or pandas
            Series/DataFrame. A 1-D input is stored as a column of shape
            ``(n_samples, 1)``; an input that already has two or more
            dimensions is stored unchanged.

    Raises:
        ValueError: If ``X`` and ``y`` do not have the same number of rows.
    """

    def __init__(self, X, y):
        self.X = torch.tensor(self._unwrap(X), dtype=torch.float32)

        targets = torch.tensor(self._unwrap(y), dtype=torch.float32)
        # Add the column axis only when it is missing, so a single-column
        # DataFrame/2-D array does not end up with an extra third dimension.
        self.y = targets.unsqueeze(1) if targets.dim() == 1 else targets

        if len(self.X) != len(self.y):
            raise ValueError(
                f"X and y must have the same number of rows, "
                f"got {len(self.X)} and {len(self.y)}"
            )

    @staticmethod
    def _unwrap(data):
        """Return the NumPy values of a pandas container; pass anything else through."""
        if isinstance(data, (pd.DataFrame, pd.Series)):
            return data.to_numpy()
        return data

    def __len__(self):
        """Number of samples (rows of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, target)`` pair stored at ``idx``."""
        return self.X[idx], self.y[idx]

class ResidualBlock1D(nn.Module):
    """1-D residual block: two 3-wide convolutions plus a shortcut connection.

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Number of channels produced by the block.
        downsample: When truthy, the first convolution and the shortcut both
            use stride 2; otherwise stride 1.

    The shortcut is a 1x1 convolution whenever the block downsamples or
    changes the channel count, and a plain identity otherwise.
    """

    def __init__(self, in_channels, out_channels, downsample=False):
        super().__init__()
        self.downsample = downsample
        if downsample:
            step = 2
        else:
            step = 1

        # Main path: conv -> BN -> ReLU -> conv -> BN.
        self.conv1 = nn.Conv1d(
            in_channels, out_channels, kernel_size=3, padding=1, stride=step
        )
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(out_channels)

        # Shortcut path: must match the main path's channels and stride so
        # the two tensors can be summed.
        needs_projection = downsample or in_channels != out_channels
        if needs_projection:
            self.proj = nn.Conv1d(in_channels, out_channels, 1, stride=step)
        else:
            self.proj = nn.Identity()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = self.proj(x)
        hidden = self.conv1(x)
        hidden = F.relu(self.bn1(hidden))
        hidden = self.bn2(self.conv2(hidden))
        hidden += shortcut
        return F.relu(hidden)

class ImprovedCNN1DClassifier(nn.Module):
    """1-D CNN feature extractor followed by an MLP head emitting one logit.

    Args:
        input_dim: Number of input features; used only to size ``initial_bn``.

    ``forward`` takes a ``(batch, features)`` tensor, treats the feature axis
    as a single-channel sequence, and returns raw logits of shape
    ``(batch, 1)`` (no sigmoid is applied here).
    """

    def __init__(self, input_dim):
        super().__init__()
        # NOTE(review): this layer is registered but never called in
        # forward(). It is kept so the state_dict keys stay unchanged;
        # confirm whether it was meant to normalise the input.
        self.initial_bn = nn.BatchNorm1d(input_dim)

        # Convolutional trunk: stem -> two downsampling residual stages ->
        # global max pool, flattened to a 256-wide vector per sample.
        stem = [
            nn.Conv1d(1, 64, 3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
        ]
        residual_stages = [
            ResidualBlock1D(64, 128, downsample=True),
            ResidualBlock1D(128, 256, downsample=True),
        ]
        pooling = [nn.AdaptiveMaxPool1d(1), nn.Flatten()]
        self.conv = nn.Sequential(*stem, *residual_stages, *pooling)

        # MLP head: three Linear -> BN -> ReLU -> Dropout stages, then the
        # final single-logit projection.
        head = []
        width = 256
        for out_width, drop_p in ((256, 0.3), (128, 0.3), (32, 0.2)):
            head.extend(
                [
                    nn.Linear(width, out_width),
                    nn.BatchNorm1d(out_width),
                    nn.ReLU(),
                    nn.Dropout(drop_p),
                ]
            )
            width = out_width
        head.append(nn.Linear(width, 1))
        self.mlp = nn.Sequential(*head)

    def forward(self, x):
        """Map ``(batch, features)`` inputs to ``(batch, 1)`` logits."""
        # Insert the channel axis Conv1d expects: (batch, 1, features).
        features = self.conv(x.unsqueeze(1))
        return self.mlp(features)

# -----------------------
# Configuration
# -----------------------
SEED = 42
BATCH_SIZE = 256

# Seed torch so weight initialisation, dropout and batch shuffling are
# repeatable, matching the fixed random_state used for the split and SMOTE.
torch.manual_seed(SEED)

# -----------------------
# Load data
# -----------------------
x_train = pd.read_csv("./x.csv")
y_train = pd.read_csv("./y.csv")

# Collapse the label frame to a Series and make sure classes are integers.
y_train = y_train.squeeze()
y_train = y_train.astype(int)

# Train/validation split.
# The split happens BEFORE scaling and oversampling so that the validation
# fold contains only real, untouched rows: fitting the scaler or SMOTE on the
# full data would leak validation information into training and put synthetic
# samples into the validation set. Stratify to keep the class ratio in both
# folds.
X_train_raw, X_val_raw, y_train_raw, y_val = train_test_split(
    x_train, y_train, test_size=0.1, random_state=SEED, stratify=y_train
)

# Standardise: statistics come from the training fold only.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train_raw)
X_val = scaler.transform(X_val_raw)

# SMOTE: oversample the training fold only.
smote = SMOTE(random_state=SEED)
X_resampled, y_resampled = smote.fit_resample(X_scaled, y_train_raw)
X_train, y_train_split = X_resampled, y_resampled

train_dataset = TabularDataset(X_train, y_train_split)
val_dataset = TabularDataset(X_val, y_val)

# BatchNorm1d raises in training mode when a batch holds a single sample, so
# drop the trailing batch only in the case where it would have exactly one.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=len(train_dataset) % BATCH_SIZE == 1,
)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

# -----------------------
# Model setup
# -----------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImprovedCNN1DClassifier(input_dim=x_train.shape[1]).to(device)

# pos_weight = weight(class 1) / weight(class 0) on the training labels.
# The training fold has already been oversampled, so this ratio is expected
# to be close to 1. Created as float32 to match the model's logits.
labels = np.unique(y_train_split)
class_weights = compute_class_weight(class_weight='balanced', classes=labels, y=y_train_split)
weight_dict = dict(zip(labels, class_weights))
pos_weight = torch.tensor(
    [weight_dict[1] / weight_dict[0]], dtype=torch.float32
).to(device)

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Per-epoch mean batch losses, kept for later inspection/plotting.
train_losses = []
val_losses = []

# -----------------------
# Training
# -----------------------
num_epochs = 100
for epoch in range(1, num_epochs + 1):
    # ---- optimisation pass over the training fold ----
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        logits = model(X_batch)
        loss = criterion(logits, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # ---- validation loss (no gradients, eval-mode BatchNorm/Dropout) ----
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            logits = model(X_batch)
            val_loss += criterion(logits, y_batch).item()
    avg_val_loss = val_loss / len(val_loader)
    val_losses.append(avg_val_loss)
    print(f"[Epoch {epoch}] Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

# Persist the weights from the last epoch.
torch.save(model.state_dict(), './CNN_MLP_final_model.pth')

In [None]:
# -----------------------
# Evaluation
# -----------------------
model.eval()
prob_batches, target_batches = [], []

# Collect sigmoid probabilities and true labels for the whole validation set.
with torch.no_grad():
    for X_batch, y_batch in val_loader:
        X_batch = X_batch.to(device)
        logits = model(X_batch)
        prob_batches.append(torch.sigmoid(logits).cpu().numpy())
        target_batches.append(y_batch.numpy())

all_probs = np.concatenate(prob_batches).ravel()
# The dataset stores labels as float32; cast back to int so the metrics and
# the classification report use integer class labels.
all_targets = np.concatenate(target_batches).ravel().astype(int)

# Threshold tuning: pick the cut-off in [0.10, 0.89] (step 0.01) with the best
# F1. linspace gives exactly 80 grid points; arange with a float step can
# produce one more or fewer because of rounding. If no threshold scores above
# zero, fall back to the conventional 0.5 rather than 0.
# NOTE: the threshold is selected and scored on the same validation data, so
# the reported metrics are optimistic estimates.
best_f1, best_thresh = 0.0, 0.5
for t in np.linspace(0.10, 0.89, 80):
    preds = (all_probs > t).astype(int)
    f1 = f1_score(all_targets, preds, zero_division=0)
    if f1 > best_f1:
        best_f1 = f1
        best_thresh = t

# Final predictions at the selected threshold.
final_preds = (all_probs > best_thresh).astype(int)
acc = accuracy_score(all_targets, final_preds)
report = classification_report(all_targets, final_preds, digits=4)

print(f"\n✅ Best Threshold: {best_thresh:.2f}")
print(f"✅ Accuracy       : {acc:.4f}")
print(f"✅ F1 Score       : {best_f1:.4f}")
print(report)
