# 제품 이상여부 판별 프로젝트


## 1. 데이터 불러오기


### 필수 라이브러리


In [1]:
import os
import notebook
notebook_path = os.path.abspath("")

print(notebook_path)

/home/work/Aimers/hari


In [1]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import KFold
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable

import os
from pprint import pprint
from typing import Any, Dict

from rtdl_revisiting_models import MLP, ResNet, FTTransformer
import scipy.special
import zero
import pickle

In [2]:
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

### 데이터 읽어오기


In [3]:
ROOT_DIR = "Data"
RANDOM_STATE = 881

# Load data
train_data = pd.read_csv(os.path.join(ROOT_DIR, "train.csv"))

### 언더 샘플링


데이타 불균형을 해결하기 위해 언더 샘플링을 진행합니다.


In [4]:
df_normal = train_data[train_data["target"] == "Normal"]
df_abnormal = train_data[train_data["target"] == "AbNormal"]

num_normal = len(df_normal)
num_abnormal = len(df_abnormal)
print(f"  Total: Normal: {num_normal}, AbNormal: {num_abnormal}")

# normal_ratio = 14.5  # 1.0 means 1:1 ratio
normal_ratio = 1.0  # 1.0 means 1:1 ratio
df_normal = df_normal.sample(n=int(num_abnormal * normal_ratio), replace=False, random_state=RANDOM_STATE)
df_concat = pd.concat([df_normal, df_abnormal], axis=0).reset_index(drop=True)
df_concat.value_counts("target")

  Total: Normal: 38156, AbNormal: 2350


target
AbNormal    2350
Normal      2350
dtype: int64

### 데이터 분할


## 3. 모델 학습


### 데이터 전처리


In [5]:
# 데이터 전처리
features = []
target = 'target'
cat_features = []
num_features = []

# 범주형 특성과 수치형 특성 분리 및 전처리
for col in df_concat.columns:
    if col != target:
        if df_concat[col].dtype == 'object':
            le = LabelEncoder()
            df_concat[col] = le.fit_transform(df_concat[col])
            cat_features.append(col)
        else:
            if df_concat[col].nunique() > 1: 
                df_concat[col] = df_concat[col].astype(float)
                num_features.append(col)
                
for col in cat_features[:]:  # 리스트의 복사본을 생성
    if "Workorder" in col:
        cat_features.remove(col)
        
train_cardinalities  = [df_concat[cat].nunique() for cat in cat_features]     

features = num_features + cat_features

X = df_concat[features].values
y = df_concat[target].values

scaler = StandardScaler()
X[:, :len(num_features)] = scaler.fit_transform(X[:, :len(num_features)])

In [6]:
X = df_concat[features].values
y = df_concat["target"].values

In [7]:
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

In [8]:
# Focal Loss 정의
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        BCE_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-BCE_loss)
        F_loss = self.alpha * (1 - pt) ** self.gamma * BCE_loss

        if self.reduction == 'mean':
            return F_loss.mean()
        elif self.reduction == 'sum':
            return F_loss.sum()
        else:
            return F_loss

In [9]:
# FT-Transformer 모델 정의
class FTT(nn.Module):
    def __init__(self, n_cont_features, cat_cardinalities, d_out=2, attention_n_heads=8, n_blocks=6, d_block=192):
        super(FTT, self).__init__()
        self.transformer = FTTransformer(
            n_cont_features=n_cont_features,
            cat_cardinalities=cat_cardinalities,
            d_out=d_out,  
            d_block=d_block,
            n_blocks=n_blocks,
            attention_n_heads=attention_n_heads,
            attention_dropout=0.1,
            ffn_d_hidden=None,
            ffn_d_hidden_multiplier=4 / 3,
            ffn_dropout=0.1,
            residual_dropout=0.0,
        )

    def forward(self, x_num, x_cat):
        return self.transformer(x_num, x_cat)

In [10]:
# # 언더샘플링 및 오버샘플링을 위한 파이프라인 구성
# over = SMOTE(random_state=RANDOM_STATE)
# under = RandomUnderSampler(random_state=RANDOM_STATE)
# pipeline = Pipeline(steps=[('u', under),('o', over)])

In [11]:
# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 4-fold cross-validation setup
kf = KFold(n_splits=4, shuffle=True, random_state=RANDOM_STATE)

f1_scores_val = []
best_model = None
best_f1_score = 0
num_epochs = 10
best_model_path = 'best_model.pt'

Using device: cuda


In [12]:
model = FTT(n_cont_features=len(num_features), cat_cardinalities=train_cardinalities).to(device)

In [13]:
for fold, (train_index, val_index) in enumerate(tqdm(kf.split(X), desc="Cross-Validation Folds")):  # Wrap the outer loop with tqdm
    
    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y[train_index], y[val_index]

    X_train_num = X_train[:, :len(num_features)]
    X_train_cat = X_train[:, len(num_features):]
    X_val_num = X_val[:, :len(num_features)]
    X_val_cat = X_val[:, len(num_features):]
    X_train_num_tensor = torch.tensor(X_train_num, dtype=torch.float32).to(device)
    X_train_cat_tensor = torch.tensor(X_train_cat, dtype=torch.long).to(device)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
    X_val_num_tensor = torch.tensor(X_val_num, dtype=torch.float32).to(device)
    X_val_cat_tensor = torch.tensor(X_val_cat, dtype=torch.long).to(device)
    y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(device)

    train_dataset = torch.utils.data.TensorDataset(X_train_num_tensor, X_train_cat_tensor, y_train_tensor)
    val_dataset = torch.utils.data.TensorDataset(X_val_num_tensor, X_val_cat_tensor, y_val_tensor)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=128, shuffle=False)

    criterion = FocalLoss(alpha=0.25, gamma=2, reduction='mean')  
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
    
    # Training loop
    for epoch in range(num_epochs):  
        model.train()
        running_loss = 0.0
        all_train_labels = []
        all_train_preds = []
        
        for x_num, x_cat, labels in train_loader:
            x_num, x_cat, labels = x_num.to(device), x_cat.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(x_num, x_cat)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_train_preds.extend(preds.cpu().numpy())
            all_train_labels.extend(labels.cpu().numpy())
        
        avg_loss = running_loss / len(train_loader)
        train_f1 = f1_score(all_train_labels, all_train_preds, pos_label=1)
        if (epoch + 1) % 10 == 0:  # Print loss and F1 score every 10 epochs
            tqdm.write(f"Fold {fold+1}, Epoch {epoch+1}, Loss: {avg_loss:.4f}, Train F1: {train_f1:.4f}")


        scheduler.step()
    
    # Validation loop
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for x_num, x_cat, labels in val_loader:  # Wrap the validation batch loop with tqdm
            x_num, x_cat, labels = x_num.to(device), x_cat.to(device), labels.to(device)
            outputs = model(x_num, x_cat)
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    val_f1 = f1_score(all_labels, all_preds, pos_label=1)
    f1_scores_val.append(val_f1)

    if val_f1 > best_f1_score:
        best_f1_score = val_f1
        best_model = model
        torch.save(model.state_dict(), best_model_path)
        acc = accuracy_score(all_preds,all_labels)
        print("best accuracy is updated as ",acc)
        print("best f1 score is updated as ",best_f1_score)

print(f'Average Validation F1-Score: {np.mean(f1_scores_val)}')
print(f'Best Validation F1-Score: {best_f1_score}')

Cross-Validation Folds: 0it [00:19, ?it/s]

Fold 1, Epoch 10, Loss: 0.0422, Train F1: 0.5810


Cross-Validation Folds: 1it [00:19, 19.25s/it]

best accuracy is updated as  0.5651063829787234
best f1 score is updated as  0.5591026747195859


Cross-Validation Folds: 1it [00:36, 19.25s/it]

Fold 2, Epoch 10, Loss: 0.0424, Train F1: 0.5718


Cross-Validation Folds: 2it [00:36, 17.93s/it]

best accuracy is updated as  0.5736170212765958
best f1 score is updated as  0.5662337662337663


Cross-Validation Folds: 2it [00:53, 17.93s/it]

Fold 3, Epoch 10, Loss: 0.0422, Train F1: 0.5737


Cross-Validation Folds: 3it [00:53, 17.52s/it]

best accuracy is updated as  0.5625531914893617
best f1 score is updated as  0.6129518072289156


Cross-Validation Folds: 3it [01:10, 17.52s/it]

Fold 4, Epoch 10, Loss: 0.0423, Train F1: 0.6136


Cross-Validation Folds: 4it [01:10, 17.57s/it]

best accuracy is updated as  0.5702127659574469
best f1 score is updated as  0.6294937637564195
Average Validation F1-Score: 0.5919455029846719
Best Validation F1-Score: 0.6294937637564195





## 4. 제출하기


### 테스트 데이터 예측


테스트 데이터 불러오기


In [14]:
test_data = pd.read_csv(os.path.join(ROOT_DIR, "test.csv"))

In [15]:
features = []
cat_features = []
num_features = []

for col in test_data.columns:
    if test_data[col].dtype == 'object':
        le = LabelEncoder()
        test_data[col] = le.fit_transform(test_data[col])
        cat_features.append(col)
    else:
        if test_data[col].nunique() > 1: 
            test_data[col] = test_data[col].astype(float)
            num_features.append(col)
            
cat_features.remove('Set ID')   

for col in cat_features[:]:  # 리스트의 복사본을 생성
    if "Workorder" in col:
        cat_features.remove(col)
        
test_cardinalities  = [test_data[cat].nunique() for cat in cat_features] 

features = num_features + cat_features

df_test_x = test_data[features]

X_test = df_test_x.values
X_test[:, :len(num_features)] = scaler.transform(X_test[:, :len(num_features)])

X_test_num = X_test[:, :len(num_features)]
X_test_cat = X_test[:, len(num_features):]

X_test_num_tensor = torch.tensor(X_test_num, dtype=torch.float32).to(device)
X_test_cat_tensor = torch.tensor(X_test_cat, dtype=torch.long).to(device)

In [16]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [17]:
# Loading the best model
best_model = FTT(n_cont_features=len(num_features), cat_cardinalities=test_cardinalities).to(device)
best_model.load_state_dict(torch.load(best_model_path, map_location=device))

<All keys matched successfully>

In [18]:
test_dataset = torch.utils.data.TensorDataset(X_test_num_tensor, X_test_cat_tensor)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

In [19]:
# 예측 결과를 저장할 리스트
all_preds = []

with torch.no_grad():
    for X_test_num_batch, X_test_cat_batch in test_loader:
        # 배치를 디바이스로 이동
        X_test_num_batch = X_test_num_batch.to(device)
        X_test_cat_batch = X_test_cat_batch.to(device)
        
        # 예측 수행
        batch_preds = best_model(X_test_num_batch, X_test_cat_batch)
        batch_preds = torch.argmax(batch_preds, dim=1).cpu().numpy()
        
        # 예측 결과를 리스트에 추가
        all_preds.extend(batch_preds)

print("Test Predictions:", all_preds)

Test Predictions: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 

In [20]:
converted_test_preds = ["Normal" if x == 1 else "AbNormal" for x in all_preds]

In [21]:
tmp = set(converted_test_preds)
print(tmp)

{'Normal'}


### 제출 파일 작성


In [22]:
# 리스트 저장하기
with open('preds.pkl', 'wb') as file:
    pickle.dump(all_preds, file)

In [23]:
with open('preds.pkl', 'rb') as file:
    test_preds = pickle.load(file)

In [24]:
# 제출 데이터 읽어오기 (df_test는 전처리된 데이터가 저장됨)
df_sub = pd.read_csv("submission.csv")
df_sub["target"] = converted_test_preds

# 제출 파일 저장
df_sub.to_csv("submission.csv", index=False)

**우측 상단의 제출 버튼을 클릭해 결과를 확인하세요**
