<a href="https://colab.research.google.com/github/yuhui-0611/ESAA/blob/main/ESAA_YB_WEEK13_2_Code_Review.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **스마트 해운물류 x AI 미션 챌린지 : 이상신호 감지 기반 비정상 작동 진단**

[DACON 코드 공유 원문](https://dacon.io/competitions/official/236590/codeshare/13593?page=1&dtype=recent)

- ID : 샘플별 고유 ID
- X_01 ~ X_52
> 스케일·분포는 피처별로 상이할 수 있음
- target : 고장 진단 target

In [1]:
# 라이브러리 임포트
import copy
import os

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import RobustScaler
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.metrics import f1_score

In [2]:
# Mount Google Drive at /content/drive; the CSV files read later in this
# notebook are loaded from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Environment setup and data loading
RANDOM_SEED = 42

# Seed the PyTorch and NumPy random number generators with the same value.
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Load the training and test tables from the mounted Drive folder.
train, test = (
    pd.read_csv(csv_path)
    for csv_path in (
        '/content/drive/MyDrive/DACON/train_smart.csv',
        '/content/drive/MyDrive/DACON/test_smart.csv',
    )
)

## 1. 피처와 타깃 분리

In [4]:
# Split the raw tables into model inputs and the label column.
# Every column whose name starts with 'X_' is treated as a feature.
feature_cols = list(filter(lambda name: name.startswith('X_'), train.columns))

train_x, test_x = train[feature_cols], test[feature_cols]
train_y = train['target']

## 2. 다중공선성 제거

In [5]:
def remove_multicollinearity(df, threshold=0.95):
    """Drop columns that are highly correlated with an earlier column.

    The absolute pairwise correlation matrix of ``df`` is computed and only
    the entries strictly above the diagonal are inspected, so for every pair
    whose absolute correlation exceeds ``threshold`` it is the later column
    (in column order) that gets removed.

    Args:
        df: DataFrame holding the candidate feature columns.
        threshold: Absolute correlation above which a column is dropped.

    Returns:
        A tuple ``(reduced_df, dropped)`` where ``reduced_df`` is ``df``
        without the dropped columns and ``dropped`` is the list of removed
        column names.
    """
    abs_corr = df.corr().abs()

    # Boolean mask selecting the strict upper triangle (diagonal excluded).
    strictly_upper = np.triu(np.ones(abs_corr.shape, dtype=bool), k=1)
    upper_only = abs_corr.where(strictly_upper)

    # A column is flagged when any entry above the diagonal exceeds the
    # threshold; masked-out (NaN) cells compare as False.
    flagged = (upper_only > threshold).any(axis=0)
    to_drop = upper_only.columns[flagged].tolist()

    return df.drop(columns=to_drop), to_drop

# Decide which features to drop using the training data only, then remove
# exactly the same columns from the test data.
train_x_clean, dropped_features = remove_multicollinearity(train_x, threshold=0.95)
test_x_clean = test_x.drop(dropped_features, axis='columns')

## 3. 피처 선택 및 스케일링 (fit_transform)

- 전체 피처 중 정보량이 많은 상위 20개만 선택

Mutual Information이란?
- X를 알면 Y에 대한 불확실성이 얼마나 줄어드나?
- 비선형 관계도 잡아냄
- 신경망과 궁합이 매우 좋음

In [7]:
K_BEST = 20


def seeded_mutual_info(X, y):
    """Score features with mutual information using a fixed random seed.

    ``mutual_info_classif`` is stochastic; passing ``random_state`` makes the
    scores (and therefore the selected features) the same every time this
    cell runs, independent of the global NumPy RNG state.
    """
    return mutual_info_classif(X, y, random_state=RANDOM_SEED)


# Never request more features than survived the correlation filter.
selector = SelectKBest(
    score_func=seeded_mutual_info,
    k=min(K_BEST, train_x_clean.shape[1]),
)

# Fit on the training data only; the test data is transformed with the
# already-fitted selector.
# NOTE(review): the selector and scaler are fit on the full training set,
# while the later K-fold loop validates on subsets of that same data, so the
# per-fold validation scores may be optimistic.
train_x_selected = selector.fit_transform(train_x_clean, train_y)
test_x_selected = selector.transform(test_x_clean)

# Names of the selected features (get_support() is a boolean mask over columns).
selected_features = train_x_clean.columns[selector.get_support()].tolist()

train_x_selected_df = pd.DataFrame(train_x_selected, columns=selected_features)
test_x_selected_df = pd.DataFrame(test_x_selected, columns=selected_features)

# Outlier handling: scale with RobustScaler, fit on train and applied to test.
scaler = RobustScaler()
train_x_scaled = scaler.fit_transform(train_x_selected_df)
test_x_scaled = scaler.transform(test_x_selected_df)

get_support()란?
- 피처 선택 결과로, "선택된 피처가 True, 버려진 피처가 False"인 마스크(mask)를 반환하는 함수
- fit이 끝난 뒤에만 쓸 수 있음
- 반환값: boolean 배열
> array([ True, False, True, False, False, True])


```
selected_features = train_x_clean.columns[selector.get_support()]
```
이 줄의 의미를 풀면:
- train_x_clean.columns → 전체 피처 이름 배열
- selector.get_support() → True/False 마스크

> True 위치의 컬럼 이름만 골라냄


In [None]:
# MLP model definition
class MLPClassifier(nn.Module):
    """Fully connected classifier built from repeated hidden blocks.

    Each hidden block is Linear -> BatchNorm1d -> ReLU -> Dropout, and a
    final Linear layer maps the last hidden width to ``output_dim``. The
    forward pass returns the raw outputs of that final layer.

    Args:
        input_dim: Number of input features.
        hidden_dims: Iterable with the width of each hidden layer, in order.
        output_dim: Number of output units.
        dropout_rate: Dropout probability used after every hidden block.
    """

    def __init__(self, input_dim, hidden_dims, output_dim, dropout_rate=0.3):
        super().__init__()
        # widths[i] -> widths[i + 1] describes the i-th hidden block.
        widths = [input_dim, *hidden_dims]
        modules = []
        for in_features, out_features in zip(widths[:-1], widths[1:]):
            modules += [
                nn.Linear(in_features, out_features),
                nn.BatchNorm1d(out_features),
                nn.ReLU(),
                nn.Dropout(dropout_rate),
            ]
        # Output layer: last hidden width (or input_dim when there are no
        # hidden layers) to the number of outputs.
        modules.append(nn.Linear(widths[-1], output_dim))
        self.network = nn.Sequential(*modules)

    def forward(self, x):
        """Run ``x`` through the stacked layers and return the outputs."""
        return self.network(x)

In [None]:
# K-Fold training and ensembling
#
# For every stratified fold a fresh MLP is trained with early stopping on the
# validation macro-F1. The best validation score and a snapshot of the
# corresponding weights are collected per fold for the later ensemble.
INPUT_DIM = train_x_scaled.shape[1]
HIDDEN_DIMS = [256, 128, 64, 32]
# NOTE(review): assumes the labels are the integers 0..OUTPUT_DIM-1, as
# required by CrossEntropyLoss -- confirm against the data.
OUTPUT_DIM = train_y.nunique()
DROPOUT_RATE = 0.3
LEARNING_RATE = 0.001
NUM_EPOCHS = 100
PATIENCE = 15  # epochs without validation improvement before stopping
BATCH_SIZE = 128

N_SPLITS = 5
kfold = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_SEED)

fold_results = []  # best validation macro-F1 of each fold
fold_models = []   # state_dict snapshot of the best epoch of each fold

for fold, (train_idx, val_idx) in enumerate(kfold.split(train_x_scaled, train_y), 1):
    X_fold_train = train_x_scaled[train_idx]
    y_fold_train = train_y.iloc[train_idx]
    X_fold_val = train_x_scaled[val_idx]
    y_fold_val = train_y.iloc[val_idx]

    X_train_tensor = torch.FloatTensor(X_fold_train)
    y_train_tensor = torch.LongTensor(y_fold_train.values)
    X_val_tensor = torch.FloatTensor(X_fold_val)
    y_val_tensor = torch.LongTensor(y_fold_val.values)

    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

    # BatchNorm1d cannot compute batch statistics from a single sample in
    # training mode, so the final batch is dropped only in the one case where
    # it would contain exactly one row.
    drop_single_row_batch = (
        len(train_dataset) > BATCH_SIZE and len(train_dataset) % BATCH_SIZE == 1
    )
    train_loader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        drop_last=drop_single_row_batch,
    )
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    model = MLPClassifier(INPUT_DIM, HIDDEN_DIMS, OUTPUT_DIM, DROPOUT_RATE).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    best_val_f1 = 0.0
    best_model_state = None
    patience_counter = 0

    for epoch in range(NUM_EPOCHS):
        # ---- training pass ----
        model.train()
        train_preds = []
        train_labels = []
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            _, predicted = torch.max(outputs, 1)
            train_preds.extend(predicted.cpu().numpy())
            train_labels.extend(batch_y.cpu().numpy())
        train_f1 = f1_score(train_labels, train_preds, average='macro')

        # ---- validation pass ----
        model.eval()
        val_preds = []
        val_labels = []
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                outputs = model(batch_X)
                _, predicted = torch.max(outputs, 1)
                val_preds.extend(predicted.cpu().numpy())
                val_labels.extend(batch_y.cpu().numpy())
        val_f1 = f1_score(val_labels, val_preds, average='macro')

        # ---- early stopping bookkeeping ----
        # The `is None` check guarantees that a snapshot exists even when the
        # validation F1 never rises above 0.0.
        if best_model_state is None or val_f1 > best_val_f1:
            best_val_f1 = val_f1
            # Deep copy: state_dict() returns references to the live
            # parameter tensors, so a shallow dict.copy() would keep changing
            # as training continues and the "best" weights would be lost.
            best_model_state = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1
        if patience_counter >= PATIENCE:
            break

    fold_results.append(best_val_f1)
    fold_models.append(best_model_state)

In [None]:
# Test-set prediction and submission file creation
#
# Each fold's saved weights are loaded into a fresh model, class
# probabilities are predicted for the whole test set, and the probabilities
# are averaged across folds before taking the arg-max class.
test_x_tensor = torch.FloatTensor(test_x_scaled).to(device)
all_fold_predictions = []
for model_state in fold_models:
    model = MLPClassifier(INPUT_DIM, HIDDEN_DIMS, OUTPUT_DIM, DROPOUT_RATE).to(device)
    model.load_state_dict(model_state)
    model.eval()  # inference mode for the Dropout and BatchNorm layers
    with torch.no_grad():
        outputs = model(test_x_tensor)
        probabilities = torch.softmax(outputs, dim=1)
        all_fold_predictions.append(probabilities.cpu().numpy())

# Soft voting: mean probability over folds, then the most likely class.
ensemble_probabilities = np.mean(all_fold_predictions, axis=0)
ensemble_predictions = np.argmax(ensemble_probabilities, axis=1)

# NOTE(review): train/test are read from Google Drive, while this path is
# relative to the current working directory -- confirm the file exists there.
submission = pd.read_csv('./data/sample_submission.csv')
submission['target'] = ensemble_predictions

# Create the output folder first so the write cannot fail on a missing
# directory after the whole training run has finished.
submission_path = './submission/mlp_kfold_ensemble.csv'
os.makedirs(os.path.dirname(submission_path), exist_ok=True)
submission.to_csv(submission_path, index=False)