### torchtext 라이브러리로 텍스트 분류 <hr>
- 1단계 - 데이터 전처리 : 숫자형식으로 변환하는 것 까지
- 2단계 - 모델 구현

In [67]:
import torch
from torchtext.datasets import AG_NEWS

# DataPipe 타입 >>> iterator 타입 형변환
train_iter = iter(AG_NEWS(split='train'))
test_iter = iter(AG_NEWS(split='test'))

In [68]:
train_label,test_label = [], []
for label,txt in train_iter:
    train_label.append(label)

for label,txt in test_iter:
    test_label.append(label)

In [3]:
import string
import re

preprocessed_sentences = []

# train_iter는 여기서 예시로 제공된 변수입니다.
# 이 코드를 사용하려면 train_iter를 적절히 정의해야 합니다.
import string
import re
from konlpy.tag import Okt

okt = Okt()

preprocessed_sentences = []

train_iter = iter(AG_NEWS(split='train'))

# train_iter는 여기서 예시로 제공된 변수입니다.
# 이 코드를 사용하려면 train_iter를 적절히 정의해야 합니다.
for _, sentence in train_iter:
    # 구두점 제거
    sentence = sentence.translate(str.maketrans('', '', string.punctuation))
    
    # 한글과 영문만 남기기
    sentence = re.sub(r'[^a-zA-Z가-힣]', ' ', sentence)  # 다른 문자들을 공백으로 대체

    sentence = re.sub(r'\b\w{1,2}\b', '', sentence)

    # 토큰화된 결과를 리스트에 추가
    preprocessed_sentences.append(sentence)


In [4]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from collections import Counter
from konlpy.tag import Okt

# 토큰화 인스턴스 생성
tokenizer = Okt()

In [5]:
train_tokens = [tokenizer.morphs(str(txt), stem=True) for txt in preprocessed_sentences]

In [8]:
test_tokens = [tokenizer.morphs(str(txt), stem=True) for label,txt in test_iter]

In [9]:
print(f'[train_tokens]     {len(train_tokens)}개')
print(f'[test_tokens]      {len(test_tokens)}개')
print(f'[train_tokens[0]]  {len(train_tokens[0])}개')
print(f'[test_tokens[0]]   {len(test_tokens[0])}개') 

[train_tokens]     120000개
[test_tokens]      7600개
[train_tokens[0]]  18개
[test_tokens[0]]   28개


In [10]:
# 영어 불용어 제거
from nltk.corpus import stopwords

stop_words = stopwords.words('english')
print('불용어 개수 :', len(stop_words))

불용어 개수 : 179


In [11]:
result = []
for word in train_tokens: 
    if word not in stop_words: 
        result.append(word)

In [12]:
# 단어 사전 생성 함수
# 단어사전 생성하기
def build_vocab(corpus, vocab_size, special_tokens):
    counter = Counter() # 단어 세는 애

    # 단어 / 토큰에 대한 빈도 수 계산
    for tokens in corpus:
        counter.update(tokens)

    # 단어 / 어휘 사전 생성하기
    vocab = special_tokens

    # 빈도수가 높은 단어부터 단어사전에 추가하기. 
    for token, count in counter.most_common(vocab_size):
        vocab.append(token)
    return vocab

In [13]:
vocab = build_vocab(result, vocab_size=5000, special_tokens=['<PAD>', '<UNK>']) # 0, 1

In [14]:
vocab

['<PAD>',
 '<UNK>',
 'the',
 'and',
 'for',
 'that',
 'The',
 'with',
 'its',
 'said',
 'has',
 'Reuters',
 'from',
 'his',
 'will',
 'was',
 'after',
 'have',
 'new',
 'their',
 'over',
 'are',
 'first',
 'more',
 'but',
 'two',
 'Monday',
 'Wednesday',
 'Tuesday',
 'Thursday',
 'this',
 'New',
 'Friday',
 'company',
 'Inc',
 'out',
 'against',
 'into',
 'year',
 'not',
 'than',
 'about',
 'yesterday',
 'last',
 'Iraq',
 'who',
 'were',
 'one',
 'Microsoft',
 'been',
 'they',
 'million',
 'had',
 'United',
 'Corp',
 'years',
 'week',
 'Sunday',
 'would',
 'which',
 'AFP',
 'could',
 'oil',
 'quot',
 'people',
 'today',
 'prices',
 'government',
 'percent',
 'President',
 'says',
 'when',
 'three',
 'time',
 'NEW',
 'Saturday',
 'world',
 'next',
 'game',
 'night',
 'off',
 'YORK',
 'software',
 'win',
 'back',
 'season',
 'China',
 'World',
 'team',
 'may',
 'second',
 'Internet',
 'announced',
 'Bush',
 'billion',
 'market',
 'security',
 'can',
 'some',
 'all',
 'victory',
 'killed'

In [15]:
# 인코딩 : 문자 > 숫자 변환
token2id = {token: idx for idx, token in enumerate(vocab)}

# 인코딩 : 숫자 > 문자 변환
id2token = {idx: token for idx, token in enumerate(vocab)}

In [16]:
# 리뷰에 문자를 정수로 변환 및 단어/어휘 사전에 없는 문자도 처리
UNK_ID =token2id.get('<UNK>') 

trainID = [[token2id.get(token, UNK_ID) for token in text] for text in train_tokens]
testID = [[token2id.get(token, UNK_ID) for token in text] for text in test_tokens]

In [17]:
def pad_sequences(sentences, max_length, pad, start = 'R'):
    result = []
    for sen in sentences:
        sen = sen[:max_length] if start == 'R' else sen[:-max_length] #start 매개변수가 R이면 오른쪽에서 잘라내기
        padd_sen = list(sen) + [pad]*(max_length - len(sen)) if start =='R' else ([pad]*(max_length - len(sen)) + list(sen)) # start 매개변수가 R이면 오른쪽부터 패딩 넣기
        result.append(padd_sen)

    return result

In [18]:
# 학습용, 테스트용 데이터 패딩 처리
PAD_ID = token2id.get('<PAD>')
MAX_LENGTH = 32  # 학습할 때 총 32개의 히든 스테이트를 갖게 됨

# 32개의 단어들만 존재
train_ids = pad_sequences(trainID, MAX_LENGTH, PAD_ID)
test_ids = pad_sequences(testID, MAX_LENGTH, PAD_ID)

In [19]:
print(f'[train_ids] ---> {len(train_ids[0])}개')
print(f'[test_ids] ---> {len(test_ids[0])}개')

[train_ids] ---> 32개
[test_ids] ---> 32개


In [78]:
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd

In [69]:
train = pd.Series(train_label)
train_label = train.replace({1:0,2:1,3:2,4:3})
train_label = train_label.to_list()

In [70]:
pd.Series(train_label).value_counts()

2    30000
3    30000
1    30000
0    30000
Name: count, dtype: int64

In [79]:
test = pd.Series(test_label)
test_label = test.replace({1:0,2:1,3:2,4:3})
test_label = test_label.to_list()

In [80]:
pd.Series(test_label).value_counts()

2    1900
3    1900
1    1900
0    1900
Name: count, dtype: int64

In [100]:
## 데이터셋 생성 : list -> tensor
# 학습용 데이터셋
dataTS = torch.LongTensor(train_ids)
labelTS = torch.tensor(train_label, dtype=torch.float32)


print(dataTS.shape, labelTS.shape)
trainDS = TensorDataset(dataTS, labelTS)


# 테스트용 데이터셋
testdataTS = torch.LongTensor(test_ids)
testlabelTS = torch.tensor(test_label, dtype=torch.float32)
testDS = TensorDataset(testdataTS, testlabelTS)

torch.Size([120000, 32]) torch.Size([120000])


In [101]:
# 데이터 로더 생성
BATCH_SIZE = 32

trainDL = DataLoader(trainDS, batch_size = BATCH_SIZE, shuffle = True)
testDL = DataLoader(testDS, batch_size = BATCH_SIZE, shuffle = True)

In [102]:
import torch
import torch.nn as nn
import numpy as np

In [103]:
class SentenceClassifier(nn.Module):
    def __init__(
            self,
            n_vocab,
            hidden_dim,
            embedding_dim,
            n_layers,
            dropout=0.5,
            bidirectional=True,
            model_type='lstm'
    ):
        super().__init__()

        self.embedding = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=embedding_dim,
            padding_idx=0
        )
        if model_type == 'rnn':
            self.model = nn.RNN(
                input_size=embedding_dim,
                hidden_size=hidden_dim,
                num_layers=n_layers,
                bidirectional=bidirectional,
                dropout=dropout,
                batch_first=True,  # batch_first = True → (batch, seq, feature)
            )
        elif model_type =='lstm':
            self.model = nn.LSTM(
                input_size = embedding_dim,
                hidden_size=hidden_dim,
                num_layers=n_layers,
                bidirectional=bidirectional,
                dropout=dropout,
                batch_first=True,
            )
        if bidirectional:
            self.classifier = nn.Linear(hidden_dim*2, 1)
        else:
            self.classifier = nn.Linear(hidden_dim,1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs):
        embeddings = self.embedding(inputs)
        output, _ = self.model(embeddings)
        last_output = output[:,-1,:]
        last_output = self.dropout(last_output)
        logits = self.classifier(last_output)
        return logits

In [104]:
# 학습 관련 변수
from torch.optim.lr_scheduler import ReduceLROnPlateau

MODEL = SentenceClassifier(len(token2id),64,128,2)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
EPOCHS = 5
LOSS_FN = nn.CrossEntropyLoss()
OPTIMIZER = torch.optim.Adam(MODEL.parameters(), lr=0.001)
SCHEDULER = ReduceLROnPlateau(OPTIMIZER, mode = 'min', patience = 3)

In [108]:
def training(model, DL, loss_fn, optimizer):
    model.train()
    lossList = []
    for i, (ids, label) in enumerate(DL): 
        label = label.to(DEVICE).unsqueeze(1)
        ids = ids.to(DEVICE)

        output = model(ids)
        
        loss = loss_fn(output, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        lossList.append(loss.item())
        
    train_loss = np.mean(lossList)
    print(f'[Train loss] ==> {train_loss:.4f}')
    return train_loss


In [109]:
def testing(model, DL, loss_fn):
    model.eval()
    losses = []
    correct = []
    with torch.no_grad():
        for ids, label in DL:
            label = label.to(DEVICE).unsqueeze(1)
            ids = ids.to(DEVICE)

            output = model(ids)
            loss = loss_fn(output, label) 
            losses.append(loss.item())
            yhat = torch.sigmoid(output) > 0.5
            correct.extend(
                torch.eq(yhat, label).cpu().tolist()
            )
        
    val_loss = np.mean(losses)
    val_acc = np.mean(correct) 
    print(f'[Valid loss] ==> {val_loss}    [Valid Accuracy] ==> {val_acc}')
    return val_loss, val_acc

In [110]:
train_losses = []
test_losses = []
test_acc = []

for epoch in range(EPOCHS):
    train_loss = training(MODEL, trainDL, LOSS_FN, OPTIMIZER)
    val_loss, val_acc = testing(MODEL, testDL, LOSS_FN)
    
    train_losses.append(train_loss)
    test_losses.append(val_loss) 
    test_acc.append(val_acc)

    SCHEDULER.step(val_loss)

    print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss : {train_loss:.4f}, Val Loss : {val_loss:.4f}, Val Acc : {val_acc:.4f}")

    if SCHEDULER.num_bad_epochs >= SCHEDULER.patience:
        print(f'조기 종료 at epoch {epoch}')
        break

[Train loss] ==> 0.0000
[Valid loss] ==> 0.0    [Valid Accuracy] ==> 0.2505263157894737
Epoch 1/5, Train Loss : 0.0000, Val Loss : 0.0000, Val Acc : 0.2505
[Train loss] ==> 0.0000
[Valid loss] ==> 0.0    [Valid Accuracy] ==> 0.2505263157894737
Epoch 2/5, Train Loss : 0.0000, Val Loss : 0.0000, Val Acc : 0.2505
[Train loss] ==> 0.0000
[Valid loss] ==> 0.0    [Valid Accuracy] ==> 0.2505263157894737
Epoch 3/5, Train Loss : 0.0000, Val Loss : 0.0000, Val Acc : 0.2505
[Train loss] ==> 0.0000
[Valid loss] ==> 0.0    [Valid Accuracy] ==> 0.2505263157894737
Epoch 4/5, Train Loss : 0.0000, Val Loss : 0.0000, Val Acc : 0.2505
조기 종료 at epoch 3


In [111]:
# 모델 평가
train_loss_avg = np.mean(train_losses)
test_loss_avg = np.mean(test_losses)
print(f"Train Loss : {train_loss_avg:.4f}  Test Loss: {test_loss_avg:.4f}")

Train Loss : 0.0000  Test Loss: 0.0000
