# PyTorch LSTM을 이용한 감정 분석 모델 구현

- 데이터 준비 및 전처리
  - 데이터 다운로드 및 로드
  - 전처리
  - 데이터셋 분할 및 DataLoader 생성
- 모델 구축
  - LSTM 기반 감정 분석 모델 정의
- 모델 학습
  - 손실 함수 및 최적화기 설정
  - 모델 학습 루프 구현
- 모델 평가
  - 테스트 데이터로 평가
- 예측 및 결과 확인
  - 임의 문장으로 예측 수행

## 데이터 준비 및 전처리

데이터 다운로드 및 로드

In [None]:
!wget https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xf aclImdb_v1.tar.gz
import os

def load_data(path):
    texts = []
    labels = []
    for label_type in ['neg', 'pos']:
        dir_name = os.path.join(path, label_type)
        for fname in os.listdir(dir_name):
            if fname.endswith('.txt'):
                with open(os.path.join(dir_name, fname), 'r', encoding='utf-8') as f:
                    texts.append(f.read())
                labels.append(0 if label_type == 'neg' else 1)
    return texts, labels

# 훈련 데이터 로드
train_texts, train_labels = load_data('aclImdb/train')

# 테스트 데이터 로드
test_texts, test_labels = load_data('aclImdb/test')

전처리

In [None]:
import re
from nltk.tokenize import word_tokenize
import nltk
nltk.download('punkt_tab')

def preprocess(text):
    # HTML 태그 제거
    text = re.sub(r"<.*?>", "", text)
    # 알파벳 이외의 문자 제거
    text = re.sub(r"[^a-zA-Z]", " ", text)
    # 소문자 변환
    text = text.lower()
    # 토큰화
    tokens = word_tokenize(text)
    return tokens

# 훈련 데이터 전처리
train_tokens = [preprocess(text) for text in train_texts]

# 테스트 데이터 전처리
test_tokens = [preprocess(text) for text in test_texts]

In [None]:
from collections import Counter

# 모든 단어를 모아서 빈도수 계산
all_tokens = [token for tokens in train_tokens for token in tokens]
word_counts = Counter(all_tokens)

# 가장 많이 등장한 단어 순으로 정렬하여 단어 사전 생성
vocab = ['<PAD>', '<UNK>'] + [word for word, count in word_counts.items() if count >= 5]

# 단어와 인덱스를 매핑
word_to_idx = {word: idx for idx, word in enumerate(vocab)}
idx_to_word = {idx: word for word, idx in word_to_idx.items()}

vocab_size = len(vocab)
print(f"단어 사전 크기: {vocab_size}")

In [None]:
# 최대 시퀀스 길이 설정
max_seq_len = 200

def tokens_to_indices(tokens_list, word_to_idx, max_seq_len):
    sequences = []
    for tokens in tokens_list:
        seq = [word_to_idx.get(token, word_to_idx['<UNK>']) for token in tokens]
        if len(seq) < max_seq_len:
            seq += [word_to_idx['<PAD>']] * (max_seq_len - len(seq))
        else:
            seq = seq[:max_seq_len]
        sequences.append(seq)
    return sequences

# 훈련 데이터 변환
train_sequences = tokens_to_indices(train_tokens, word_to_idx, max_seq_len)

# 테스트 데이터 변환
test_sequences = tokens_to_indices(test_tokens, word_to_idx, max_seq_len)

데이터셋 분할 및 DataLoader 생성

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        sequence = torch.tensor(self.sequences[idx], dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.float)
        return sequence, label

In [None]:
# 데이터셋 생성
train_dataset = TextDataset(train_sequences, train_labels)
test_dataset = TextDataset(test_sequences, test_labels)

# DataLoader 생성
batch_size = 64

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

## 모델 구축

In [None]:
import torch.nn as nn

class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size, num_layers):
        super(SentimentLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x: [batch_size, seq_len]
        embedded = self.embedding(x)
        # embedded: [batch_size, seq_len, embed_size]
        lstm_out, (hidden, cell) = self.lstm(embedded)
        # lstm_out: [batch_size, seq_len, hidden_size]
        # hidden: [num_layers, batch_size, hidden_size]
        out = self.fc(hidden[-1])
        # out: [batch_size, output_size]
        out = self.sigmoid(out)
        return out.squeeze()

## 모델 학습

손실 함수 및 최적화기 설정

In [None]:
# 하이퍼파라미터 설정
embed_size = 128
hidden_size = 128
output_size = 1
num_layers = 2
num_epochs = 5
learning_rate = 0.001

# 모델 초기화
model = SentimentLSTM(vocab_size, embed_size, hidden_size, output_size, num_layers)

# 손실 함수와 최적화기 정의
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

모델 학습 루프 구현

In [None]:
model.train()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(num_epochs):
    total_loss = 0
    for sequences, labels in train_loader:
        sequences, labels = sequences.to(device), labels.to(device)

        # 기울기 초기화
        optimizer.zero_grad()

        # 모델 예측
        outputs = model(sequences)

        # 손실 계산
        loss = criterion(outputs, labels)

        # 역전파
        loss.backward()

        # 가중치 업데이트
        optimizer.step()

        total_loss += loss.item()
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}")

## 모델 평가

In [None]:
model.eval()

with torch.no_grad():
    correct = 0
    total = 0
    for sequences, labels in test_loader:
        sequences, labels = sequences.to(device), labels.to(device)
        outputs = model(sequences)
        predicted = (outputs >= 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    print(f"테스트 정확도: {100 * correct / total:.2f}%")

## 예측 및 결과 확인

In [None]:
def predict_sentiment(text):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    tokens = preprocess(text)
    seq = [word_to_idx.get(token, word_to_idx['<UNK>']) for token in tokens]
    if len(seq) < max_seq_len:
        seq += [word_to_idx['<PAD>']] * (max_seq_len - len(seq))
    else:
        seq = seq[:max_seq_len]
    sequence = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device)
    model.eval()
    with torch.no_grad():
        output = model(sequence)
        prediction = '긍정' if output.item() >= 0.5 else '부정'
        print(f"입력 문장: {text}")
        print(f"예측 확률: {output.item():.4f}")
        print(f"예측 결과: {prediction}")

임의 문장으로 예측 수행

In [None]:
# 예시 문장
test_sentence = "This movie was fantastic! I really enjoyed it."
predict_sentiment(test_sentence)

In [None]:
torch.save(model.state_dict(), "lstm_5epoch.pt")

In [None]:
model2 = SentimentLSTM(vocab_size, embed_size, hidden_size, output_size, num_layers)
model2.load_state_dict(torch.load("lstm_5epoch.pt"))
model2.eval()

def predict_sentiment(text):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    tokens = preprocess(text)
    seq = [word_to_idx.get(token, word_to_idx['<UNK>']) for token in tokens]
    if len(seq) < max_seq_len:
        seq += [word_to_idx['<PAD>']] * (max_seq_len - len(seq))
    else:
        seq = seq[:max_seq_len]
    sequence = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device)
    model2.eval()
    with torch.no_grad():
        output = model(sequence)
        prediction = '긍정' if output.item() >= 0.5 else '부정'
        print(f"입력 문장: {text}")
        print(f"예측 확률: {output.item():.4f}")
        print(f"예측 결과: {prediction}")

test_sentence = "This movie was fantastic! I really enjoyed it."
predict_sentiment(test_sentence)