In [5]:
import pandas as pd
import numpy as np
import joblib
import os
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from imblearn.over_sampling import BorderlineSMOTE
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import accuracy_score, f1_score, recall_score

# 노트북이 notebooks 폴더에 있으므로 상위 폴더 경로를 사용합니다
data_path = '../data/spotify_churn_dataset.csv'
onnx_output = '../models/spotify_dl_model.onnx'
preprocessor_output = '../models/dl_preprocessor.pkl'
metrics_json = '../data/model_metrics.json'

print("데이터를 불러와 학습 준비를 시작합니다...")
df = pd.read_csv(data_path)
if 'user_id' in df.columns:
    df = df.drop(columns=['user_id'])

# 의미 있는 분석을 위한 파생 변수들을 생성합니다
df['ad_burden'] = df['ads_listened_per_week'] / (df['listening_time'] + 1)
df['satisfaction_score'] = df['songs_played_per_day'] * (1 - df['skip_rate'])
df['time_per_song'] = df['listening_time'] / (df['songs_played_per_day'] + 1)

X = df.drop(columns=['is_churned'])
y = df['is_churned']

# 데이터를 학습, 검증, 테스트용으로 6:2:2 비율로 나눕니다
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=42, stratify=y_train_val)

# 수치형과 범주형 데이터를 각각 처리하는 파이프라인입니다
num_features = ['age', 'listening_time', 'songs_played_per_day', 'skip_rate', 'ads_listened_per_week', 'offline_listening', 'ad_burden', 'satisfaction_score', 'time_per_song']
cat_features = ['gender', 'country', 'subscription_type', 'device_type']

preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), num_features),
        ('cat', OneHotEncoder(drop='first', handle_unknown='ignore'), cat_features)
    ])

X_train_proc = preprocessor.fit_transform(X_train)
X_val_proc = preprocessor.transform(X_val)
X_test_proc = preprocessor.transform(X_test)

# 데이터 불균형을 해결하기 위해 학습 데이터에만 SMOTE를 적용합니다
smote = BorderlineSMOTE(random_state=42)
X_train_res, y_train_res = smote.fit_resample(X_train_proc, y_train)

# 파이토치 모델 학습을 위해 데이터를 텐서로 변환합니다
X_train_t = torch.FloatTensor(X_train_res)
y_train_t = torch.FloatTensor(y_train_res.values).view(-1, 1)
X_val_t = torch.FloatTensor(X_val_proc)
y_val_t = torch.FloatTensor(y_val.values).view(-1, 1)
X_test_t = torch.FloatTensor(X_test_proc)

train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=64, shuffle=True)

# 추천드린 LayerNorm을 적용한 신경망 구조입니다
class ChurnDNN(nn.Module):
    def __init__(self, input_size):
        super(ChurnDNN, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, 512),
            nn.LayerNorm(512),
            nn.SiLU(),
            nn.Dropout(0.4),
            
            nn.Linear(512, 256),
            nn.LayerNorm(256),
            nn.SiLU(),
            nn.Dropout(0.4),
            
            nn.Linear(256, 128),
            nn.LayerNorm(128),
            nn.SiLU(),
            nn.Dropout(0.3),
            
            nn.Linear(128, 64),
            nn.LayerNorm(64),
            nn.SiLU(),
            nn.Dropout(0.3),
            
            nn.Linear(64, 32),
            nn.LayerNorm(32),
            nn.SiLU(),
            nn.Dropout(0.2),
            
            nn.Linear(32, 1),
            nn.Sigmoid()
        )
        
    def forward(self, x):
        return self.layers(x)

input_dim = X_train_res.shape[1]
model = ChurnDNN(input_dim)
optimizer = optim.Adam(model.parameters(), lr=0.0005)
loss_fn = nn.BCELoss()

# 학습을 진행하며 가장 성능이 좋은 시점의 가중치를 저장합니다
print("딥러닝 모델 학습을 시작합니다...")
best_val_loss = float('inf')
for epoch in range(150):
    model.train()
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        loss_fn(model(batch_x), batch_y).backward()
        optimizer.step()
    
    model.eval()
    with torch.no_grad():
        val_l = loss_fn(model(X_val_t), y_val_t)
        if val_l < best_val_loss:
            best_val_loss = val_l
            torch.save(model.state_dict(), 'best_temp.pt')

# 저장된 베스트 가중치를 불러와 ONNX 파일로 변환합니다
model.load_state_dict(torch.load('best_temp.pt'))
model.eval()
dummy_input = torch.randn(1, input_dim)
torch.onnx.export(model, dummy_input, onnx_output, input_names=['input'], output_names=['output'], dynamic_axes={'input': {0: 'batch_size'}})
os.remove('best_temp.pt')

# 최적의 임계값을 찾아 최종 성능을 측정합니다
with torch.no_grad():
    y_scores = model(X_test_t).numpy().flatten()

best_threshold, top_f1 = 0.5, 0
for t in np.arange(0.3, 0.7, 0.01):
    current_f1 = f1_score(y_test, (y_scores >= t).astype(int))
    if current_f1 > top_f1:
        top_f1 = current_f1
        best_threshold = t

# 최종 지표 계산 및 결과 저장
y_final_pred = (y_scores >= best_threshold).astype(int)
final_acc = accuracy_score(y_test, y_final_pred)
final_rec = recall_score(y_test, y_final_pred)

try:
    with open(metrics_json, 'r') as f:
        all_metrics = json.load(f)
except:
    all_metrics = {}

all_metrics['Deep Learning (DNN)'] = {
    'Accuracy': float(final_acc),
    'Recall': float(final_rec),
    'F1-Score': float(top_f1),
    'Best Threshold': float(best_threshold)
}

with open(metrics_json, 'w') as f:
    json.dump(all_metrics, f, indent=4)

joblib.dump(preprocessor, preprocessor_output)
print(f"학습 완료! [정확도: {final_acc:.4f}, 리콜: {final_rec:.4f}, F1: {top_f1:.4f}]")

데이터를 불러와 학습 준비를 시작합니다...
딥러닝 모델 학습을 시작합니다...


  torch.onnx.export(model, dummy_input, onnx_output, input_names=['input'], output_names=['output'], dynamic_axes={'input': {0: 'batch_size'}})


[torch.onnx] Obtain model graph for `ChurnDNN([...]` with `torch.export.export(..., strict=False)`...
[torch.onnx] Obtain model graph for `ChurnDNN([...]` with `torch.export.export(..., strict=False)`... ✅
[torch.onnx] Run decomposition...
[torch.onnx] Run decomposition... ✅
[torch.onnx] Translate the graph into ONNX...
[torch.onnx] Translate the graph into ONNX... ✅
학습 완료! [정확도: 0.7980, 리콜: 0.8962, F1: 0.7383]
