# Setting

In [56]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### 데이터 불러오기

In [57]:
!pip install konlpy



In [58]:
### 라이브러리
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import re
import urllib.request
import random
import torch
import warnings
warnings.filterwarnings('ignore')

from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm

### 폰트 설정
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt

!apt-get install -qq fonts-nanum
font_path = '/usr/share/fonts/truetype/nanum/NanumGothic.ttf'
fm.fontManager.addfont(font_path)
plt.rc('font', family='NanumGothic')

In [59]:
### 라이브러리
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split


### 장치 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [60]:
### 시드 고정
random.seed(2024)
np.random.seed(2024)
torch.manual_seed(2024)
torch.cuda.manual_seed(2024)
torch.cuda.manual_seed_all(2024)
torch.backends.cudnn.deterministic = True

In [61]:
%cd '/content/drive/MyDrive/Euron-interm'

/content/drive/MyDrive/Euron-interm


## From here:

In [62]:
### 데이터
train = pd.read_csv('3_NLP/review_train.csv', encoding='utf-8')
test = pd.read_csv('3_NLP/review_test.csv', encoding='utf-8')
print(train.shape, test.shape)

(2026, 2) (400, 2)


In [63]:
### label encoding
from sklearn.preprocessing import LabelEncoder
n = train.shape[0]
y = pd.concat([train,test])

le = LabelEncoder()
y['keyword2'] = le.fit_transform(y['keyword2'])
train = y[:n]
test = y[n:]

### NLP

In [64]:
### okt
from konlpy.tag import Okt
okt = Okt()

### 불용어 리스트 불러오기
with open('/content/drive/MyDrive/Euron-interm/3_NLP/new_stopwords.txt', 'r', encoding='utf-8') as file:
    stop_words = file.readlines()
stop_words = [word.strip() for word in stop_words]

### TF-IDF
from sklearn.feature_extraction.text import TfidfVectorizer
tfidf_okt = TfidfVectorizer(tokenizer=okt.morphs, ngram_range=(1,2), min_df=2,
                            max_df=0.95, stop_words=stop_words)
tfidf_okt_matrix = tfidf_okt.fit_transform(train['reviews'])
tfidf_okt_matrix_test = tfidf_okt.transform(test['reviews'])

In [65]:
### X/y split
X = tfidf_okt_matrix
y = train['keyword2']
X_test = tfidf_okt_matrix_test
y_test = test['keyword2']

### train/valid split
X_train, X_valid, y_train, y_valid = train_test_split(
    X, y, test_size=0.2, shuffle=True, random_state=2024
)

print(X_train.shape, X_valid.shape, X_test.shape)
print(y_train.shape, y_valid.shape, y_test.shape)

(1620, 92075) (406, 92075) (400, 92075)
(1620,) (406,) (400,)


### Baseline

In [66]:
### 베이스라인 LSTM 모델 구축
class LSTMModel(nn.Module):
  def __init__(self, input_dim, hidden_dim, output_dim):
    super(LSTMModel, self).__init__()
    self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True,
                        dropout=0.4, bidirectional=False)
    self.dense = nn.Linear(hidden_dim, hidden_dim)
    #self.dense1 = nn.Linear(hidden_dim, hidden_dim // 2)  # 추가된 Dense layer
    #self.dense2 = nn.Linear(hidden_dim // 2, hidden_dim // 4)
    self.relu = nn.ReLU()
    #self.leaky_relu = nn.LeakyReLU()
    self.fc = nn.Linear(hidden_dim, output_dim)
  def forward(self, x):
    x, _ = self.lstm(x)
    x = x[:, -1, :]
    x = self.dense(x)
    x = self.relu(x)
    x = self.fc(x)
    return x

In [67]:
### 모델 훈련 함수
def train(model, train_loader, criterion, optimizer, device):
  model.train()
  total_loss = 0
  correct = 0

  for X, y in tqdm(train_loader, desc='Training', leave=False):
    X = X.unsqueeze(1)  # (batch_size, seq_length, input_dim)
    X, y = X.to(device), y.to(device)
    optimizer.zero_grad()
    outputs = model(X)
    loss = criterion(outputs, y)
    loss.backward()
    optimizer.step()
    total_loss += loss.item()
    _, predicted = torch.max(outputs, 1)
    correct += (predicted == y).sum().item()

  return total_loss / len(train_loader), correct / len(train_loader.dataset)

### 모델 평가 함수
def evaluate(model, test_loader, criterion, device):
  model.eval()
  total_loss = 0
  correct = 0
  with torch.no_grad():
    for X, y in test_loader:
      X = X.unsqueeze(1)  # (batch_size, seq_length, input_dim)
      X, y = X.to(device), y.to(device)
      outputs = model(X)
      loss = criterion(outputs, y)
      total_loss += loss.item()
      _, predicted = torch.max(outputs, 1)
      correct += (predicted == y).sum().item()

  return total_loss / len(test_loader), correct / len(test_loader.dataset)

In [68]:
X_tensor = torch.tensor(tfidf_okt_matrix.toarray(), dtype=torch.float32)
y_tensor = torch.tensor(y.values, dtype=torch.long)

### cross-validation
from sklearn.model_selection import KFold
kfold = KFold(n_splits=10, shuffle=True, random_state=2024)
BATCH_SIZE = 32


In [None]:
### 모델 설정
input_dim = tfidf_okt_matrix.shape[1]
hidden_dim = 64
output_dim = 13
lr = 0.001
epochs = 10
weight_decay = 1e-4
early_stopping_rounds = 3  # validation loss가 개선되지 않으면 n epoch 후 조기 중단
MIN_DELTA = 0
fold_val_loss = []
fold_val_acc = []

for fold, (train_indices, val_indices) in enumerate(kfold.split(X_tensor)):
  print(f'Fold {fold+1}/{kfold.n_splits}')

  train_dataset = TensorDataset(X_tensor[train_indices], y_tensor[train_indices])
  valid_dataset = TensorDataset(X_tensor[val_indices], y_tensor[val_indices])

  train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
  valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)

  model = LSTMModel(input_dim, hidden_dim, output_dim)
  model.to(device)

  criterion = nn.CrossEntropyLoss()
  optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

  best_val_loss = float('inf')
  best_model_state = None
  patience_counter = 0

  for epoch in range(epochs):
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    print(f"[Training] Epoch {epoch+1}: Loss = {train_loss:.4f}, Accuracy = {train_acc:.4f}")

    valid_loss, valid_acc = evaluate(model, valid_loader, criterion, device)
    print(f"[Validation] Epoch {epoch+1}: Loss = {valid_loss:.4f}, Accuracy = {valid_acc:.4f}")

    # Early stopping 조건 확인
    if valid_loss < best_val_loss - MIN_DELTA:
      best_val_loss = valid_loss
      best_model_state = model.state_dict()
      patience_counter = 0  # 갱신되면 카운터 초기화
    else:
      patience_counter += 1

    if patience_counter >= early_stopping_rounds:
      print(f"Early stopping at epoch {epoch+1}")
      break

  # 모델 상태를 최상의 validation loss를 보였던 상태로 롤백
  if best_model_state:
    model.load_state_dict(best_model_state)

  fold_val_loss.append(best_val_loss)
  fold_val_acc.append(valid_acc)

print(f'Average Validation Loss: {np.mean(fold_val_loss):.4f}')
print(f'Average Validation Accuracy: {np.mean(fold_val_acc):.4f}')


Fold 1/10




[Training] Epoch 1: Loss = 2.5459, Accuracy = 0.1245
[Validation] Epoch 1: Loss = 2.5246, Accuracy = 0.1724




[Training] Epoch 2: Loss = 2.3267, Accuracy = 0.4213
[Validation] Epoch 2: Loss = 2.0008, Accuracy = 0.5369




[Training] Epoch 3: Loss = 1.4087, Accuracy = 0.6336
[Validation] Epoch 3: Loss = 0.9928, Accuracy = 0.7044




[Training] Epoch 4: Loss = 0.6907, Accuracy = 0.8892
[Validation] Epoch 4: Loss = 0.5822, Accuracy = 0.8571




[Training] Epoch 5: Loss = 0.2901, Accuracy = 0.9879
[Validation] Epoch 5: Loss = 0.3594, Accuracy = 0.9507




[Training] Epoch 6: Loss = 0.1101, Accuracy = 0.9984
[Validation] Epoch 6: Loss = 0.2648, Accuracy = 0.9606




[Training] Epoch 7: Loss = 0.0603, Accuracy = 0.9989
[Validation] Epoch 7: Loss = 0.2268, Accuracy = 0.9704


Training:  42%|████▏     | 24/57 [00:00<00:00, 56.94it/s]

In [None]:
### 모델 평가
y_test = torch.tensor(np.array(y_test), dtype=torch.long)
test_dataset = torch.utils.data.TensorDataset(torch.tensor(X_test.toarray(), dtype=torch.float32), y_test)
test_loader = torch.utils.data.DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)

test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print(f"[Test] Loss = {test_loss:.4f}, Accuracy = {test_acc:.4f}")

In [None]:
def predict_review(model, review, tfidf_vectorizer, okt, device, label_encoder):
    model.eval()
    with torch.no_grad():
        # 입력된 리뷰를 TF-IDF로 변환
        review_tfidf = tfidf_vectorizer.transform([review])
        review_tensor = torch.tensor(review_tfidf.toarray(), dtype=torch.float32).to(device)

        # LSTM이 3D 텐서를 기대하므로, 텐서의 차원을 확장해 줍니다.
        review_tensor = review_tensor.unsqueeze(0)  # (batch_size, seq_len, input_dim) 형태로 변환

        # 모델에 입력하고 예측 수행
        output = model(review_tensor)
        _, predicted_label = torch.max(output, 1)

        # 인코딩된 라벨을 원래의 라벨로 디코딩
        decoded_label = label_encoder.inverse_transform([predicted_label.item()])[0]

        return decoded_label

# 예시: 콘솔에 입력받아 모델로 예측 및 디코딩
for _ in range(5):
  review_input = input("리뷰를 입력하세요: ")

  predicted_label = predict_review(model, review_input, tfidf_okt, okt, device, le)
  print(f"예측된 키워드: {predicted_label}")
