In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split as tts
import numpy as np
import re
import torch.nn.utils.rnn as rnn_utils

# Load the data
df = pd.read_csv("netflix_reviews.csv")  # read the reviews CSV into a DataFrame

# Text preprocessing function
def preprocess_text(text):
    """Normalize one raw review string before tokenization.

    Lower-cases the text, removes every character that is neither a word
    character nor whitespace, removes runs of digits, and strips leading and
    trailing whitespace. Underscores survive because ``\\w`` matches them.

    Args:
        text: The raw review. Any value that is not a ``str`` (such as a
            float NaN or ``None``) is treated as an empty review.

    Returns:
        The cleaned string, or ``""`` for non-string input.
    """
    # Only strings can be cleaned; anything else would crash on `.lower()`,
    # so it is mapped to an empty review instead.
    if not isinstance(text, str):
        return ""
    text = text.lower()  # upper case to lower case
    text = re.sub(r'[^\w\s]', '', text)  # drop punctuation
    text = re.sub(r'\d+', '', text)  # drop digits
    return text.strip()  # trim surrounding whitespace, keep inner spacing

df['content'] = df['content'].apply(preprocess_text)  # clean every review text

X = df['content'].tolist()  # list of reviews
y = df['score'].tolist()  # list of scores

# Convert vocabulary tokens to integers
def text_pipeline(text):
    """Tokenize ``text`` and return the vocabulary index of each token, in order.

    Uses the module-level ``tokenizer`` and ``vocab`` objects.
    """
    indices = []
    for token in tokenizer(text):
        indices.append(vocab[token])
    return indices

def label_pipeline(label):
    """Turn a score into a zero-based class index (``score - 1``) of type ``np.int64``."""
    one_based = np.int64(label)
    return one_based - 1

# Hold out 20% of the samples for testing; the fixed seed keeps the split reproducible
X_tr, X_te, y_tr, y_te = tts(X, y, test_size=0.2, random_state=42)

# Dataset class definition
class ReviewDataset(Dataset):
    """Dataset of (review, score) pairs that are encoded on access.

    ``text_pipeline`` turns a raw review into a list of token ids and
    ``label_pipeline`` turns a raw score into a class index; both are applied
    inside ``__getitem__``, not at construction time.
    """

    def __init__(self, X, y, text_pipeline, label_pipeline):
        # Keep the raw inputs and the two encoders as given.
        self.X, self.y = X, y
        self.text_pipeline, self.label_pipeline = text_pipeline, label_pipeline

    def __len__(self):
        """Return the number of reviews."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` for sample ``idx`` as two ``torch.long`` tensors."""
        encoded = self.text_pipeline(self.X[idx])
        target = self.label_pipeline(self.y[idx])
        sample = (
            torch.tensor(encoded, dtype=torch.long),
            torch.tensor(target, dtype=torch.long),
        )
        return sample

# Tokenizer definition
tokenizer = get_tokenizer('basic_english')

# Helper used to build the vocabulary
def yield_tokens(data_iter):
    """Lazily yield the token list of each text in ``data_iter``.

    Uses the module-level ``tokenizer``.
    """
    yield from map(tokenizer, data_iter)

# Dataset definitions
train_dataset = ReviewDataset(X_tr, y_tr, text_pipeline, label_pipeline)
test_dataset = ReviewDataset(X_te, y_te, text_pipeline, label_pipeline)

# Build the vocabulary from the training split only, so no test-set tokens leak
# into it. "<pad>" is listed first so that it owns index 0, which is the value
# pad_sequence pads with by default in collate_batch; without it, padding would
# share an index with a real token. "<unk>" becomes the default index so that
# tokens unseen during training map to it instead of raising on lookup.
vocab = build_vocab_from_iterator(yield_tokens(X_tr), specials=["<pad>", "<unk>"])
vocab.set_default_index(vocab["<unk>"])

# Data loader definition
BATCH_SIZE = 64

def collate_batch(batch):
    """Collate ``(review, label)`` samples into a single padded batch.

    Returns a tuple of the reviews zero-padded to the longest review in the
    batch (batch dimension first) and the labels as a 1-D ``torch.long`` tensor.
    """
    sequences, targets = zip(*batch)
    return (
        rnn_utils.pad_sequence(sequences, batch_first=True),
        torch.tensor(targets, dtype=torch.long),  # labels are long as well
    )

# Only the training loader shuffles; both loaders pad each batch through collate_batch
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)

# LSTM model definition (4 layers)
class LSTMModel(nn.Module):
    """Four stacked bidirectional LSTMs followed by a two-layer classifier head.

    Every token is embedded individually and the whole token sequence is fed
    through the LSTMs, so word order influences the prediction. (Averaging the
    review into a single vector first would leave the LSTMs a length-1
    sequence to process.)

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Size of each token embedding.
        hidden_dim: Hidden size of each LSTM direction.
        output_dim: Number of classes (size of the returned logits).
        padding_idx: Optional index that is used *only* for padding. When
            given, its embedding stays zero and padded positions are skipped
            by the LSTMs. Leave it as ``None`` if the padding value is also a
            real token id.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, padding_idx=None):
        super().__init__()
        self.padding_idx = padding_idx
        # sparse=True is kept from the original definition; it requires an
        # optimizer that accepts sparse gradients (optim.SGD does).
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx, sparse=True)
        self.lstm1 = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.lstm2 = nn.LSTM(hidden_dim * 2, hidden_dim, batch_first=True, bidirectional=True)
        self.lstm3 = nn.LSTM(hidden_dim * 2, hidden_dim, batch_first=True, bidirectional=True)
        self.lstm4 = nn.LSTM(hidden_dim * 2, hidden_dim, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)  # first fully connected layer
        # Without a non-linearity fc1 and fc2 would collapse into one linear map.
        self.activation = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)  # final output layer

    def forward(self, text):
        """Return class logits of shape ``(batch, output_dim)``.

        Args:
            text: ``torch.long`` tensor of token ids, shape ``(batch, seq_len)``.
        """
        if text.size(1) == 0:
            # A batch made only of empty reviews has width 0, which nn.LSTM
            # cannot process; give every row one filler position instead.
            fill = 0 if self.padding_idx is None else self.padding_idx
            text = text.new_full((text.size(0), 1), fill)

        sequence = self.embedding(text)  # (batch, seq_len, embed_dim)

        if self.padding_idx is not None:
            # Pack so the final hidden states come from the last real token
            # rather than from trailing padding. Lengths must be >= 1 and on CPU.
            lengths = (text != self.padding_idx).sum(dim=1).clamp(min=1).cpu()
            sequence = rnn_utils.pack_padded_sequence(
                sequence, lengths, batch_first=True, enforce_sorted=False
            )

        sequence, _ = self.lstm1(sequence)
        sequence, _ = self.lstm2(sequence)
        sequence, _ = self.lstm3(sequence)
        _, (hidden, _) = self.lstm4(sequence)

        # Concatenate the final forward and backward hidden states of the last LSTM.
        hidden_cat = torch.cat((hidden[-2], hidden[-1]), dim=1)

        return self.fc2(self.activation(self.fc1(hidden_cat)))

# Hyperparameter definitions
VOCAB_SIZE = len(vocab)
EMBED_DIM = 64
HIDDEN_DIM = 128
# label_pipeline maps a score s to class index s - 1, so the largest class
# index is max(y) - 1 and the model needs max(y) outputs. Counting distinct
# scores instead would be too small whenever some score value never occurs.
OUTPUT_DIM = int(max(y))

# Model initialization
model = LSTMModel(VOCAB_SIZE, EMBED_DIM, HIDDEN_DIM, OUTPUT_DIM)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.05)

# Training function definition
def train_model(model, train_dataloader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` passes over ``train_dataloader``.

    Args:
        model: Module called as ``model(reviews)``.
        train_dataloader: Iterable yielding ``(reviews, labels)`` batches. It
            does not need to support ``len()``; batches are counted while
            iterating.
        criterion: Loss called as ``criterion(output, labels)``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over the data.

    Returns:
        A list with the mean batch loss of every epoch, in order.

    Raises:
        ValueError: If an epoch yields no batches, since a mean loss cannot be
            computed.
    """
    model.train()
    epoch_losses = []
    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0
        for reviews, labels in train_dataloader:
            optimizer.zero_grad()  # reset gradients
            output = model(reviews)  # forward pass
            loss = criterion(output, labels)  # compute loss
            loss.backward()  # backward pass
            optimizer.step()  # update parameters
            total_loss += loss.item()
            num_batches += 1
        if num_batches == 0:
            raise ValueError("train_dataloader yielded no batches")
        mean_loss = total_loss / num_batches
        epoch_losses.append(mean_loss)
        print(f'Epoch {epoch + 1}, Loss: {mean_loss}')
    return epoch_losses

# Train the model
num_epochs = 100  # number of training epochs; adjustable
train_model(model, train_dataloader, criterion, optimizer, num_epochs)

# Evaluate the model on the test set
def evaluate_model(model, test_dataloader):
    """Print and return the classification accuracy of ``model`` in percent.

    The model is switched to evaluation mode for the duration of the call, so
    that layers such as dropout behave deterministically, and its previous
    training/evaluation mode is restored afterwards.

    Args:
        model: Module called as ``model(reviews)`` returning per-class scores
            with the class dimension at index 1.
        test_dataloader: Iterable yielding ``(reviews, ratings)`` batches.

    Returns:
        Accuracy as a percentage (``100 * correct / total``).

    Raises:
        ValueError: If the loader yields no samples.
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():  # no gradients are needed for evaluation
            for reviews, ratings in test_dataloader:
                predicted = model(reviews).argmax(dim=1)
                total += ratings.size(0)
                correct += (predicted == ratings).sum().item()
    finally:
        # Leave the caller's model in the mode it was handed over in.
        model.train(was_training)

    if total == 0:
        raise ValueError("test_dataloader yielded no samples")

    accuracy = 100 * correct / total
    print(f'Accuracy: {accuracy}%')
    return accuracy

# Report accuracy on the held-out test split
evaluate_model(model, test_dataloader)



Epoch 1, Loss: 1.4411939945644079
Epoch 2, Loss: 1.438289221480438
Epoch 3, Loss: 1.4380339547635752
Epoch 4, Loss: 1.4379027957395483
Epoch 5, Loss: 1.4380565780014716
Epoch 6, Loss: 1.4382063725702592
Epoch 7, Loss: 1.4380544679563607
Epoch 8, Loss: 1.4380712150306831
Epoch 9, Loss: 1.4379007557553236
Epoch 10, Loss: 1.4382413769744768
Epoch 11, Loss: 1.437915055987778
Epoch 12, Loss: 1.437999902487615
Epoch 13, Loss: 1.4380887988484354
Epoch 14, Loss: 1.4380630917923443
Epoch 15, Loss: 1.4379224646213518
Epoch 16, Loss: 1.4379305858254026
Epoch 17, Loss: 1.4379645823618659
Epoch 18, Loss: 1.4378050892019434
Epoch 19, Loss: 1.4376814013048245
Epoch 20, Loss: 1.437378289349657
Epoch 21, Loss: 1.4371888463407654
Epoch 22, Loss: 1.43679916533187
Epoch 23, Loss: 1.4355940978681676
Epoch 24, Loss: 1.4332314667034474
