# 스팸메일 데이터셋 로드 & 전처리

In [1]:
import urllib.request
import pandas as pd
import numpy as np
import torch

In [2]:
# Fetch the SMS spam collection CSV, save it next to the notebook, and load it
# into a DataFrame. The last expression previews the first five rows.
spam_csv_url = "https://raw.githubusercontent.com/mohitgupta-omg/Kaggle-SMS-Spam-Collection-Dataset-/master/spam.csv"
spam_csv_path = "spam.csv"
urllib.request.urlretrieve(spam_csv_url, filename=spam_csv_path)
# The file is decoded as latin-1 when it is read.
data = pd.read_csv(spam_csv_path, encoding="latin-1")
data.head(5)

Unnamed: 0,v1,v2,Unnamed: 2,Unnamed: 3,Unnamed: 4
0,ham,"Go until jurong point, crazy.. Available only ...",,,
1,ham,Ok lar... Joking wif u oni...,,,
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...,,,
3,ham,U dun say so early hor... U c already then say...,,,
4,ham,"Nah I don't think he goes to usf, he lives aro...",,,


In [17]:
# Encode the label column ('ham' -> 0, 'spam' -> 1), then split the labels
# (v1) from the message text (v2).
# An explicit per-value lookup is used instead of Series.replace, which emits a
# FutureWarning about silent downcasting on pandas >= 2.2. Values that are not
# in the mapping are left unchanged, so re-running this cell on labels that are
# already encoded is a no-op rather than producing NaN.
label_to_id = {'ham': 0, 'spam': 1}
data['v1'] = data['v1'].map(lambda label: label_to_id.get(label, label))
y_data = data['v1'].tolist()
X_data = data['v2'].tolist()
X_data[:5]

['Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...',
 'Ok lar... Joking wif u oni...',
 "Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's",
 'U dun say so early hor... U c already then say...',
 "Nah I don't think he goes to usf, he lives around here though"]

In [18]:
# Preview the first five encoded labels.
y_data[:5]

[0, 0, 1, 0, 0]

In [19]:
# Tokenize every message by splitting it on whitespace; punctuation stays
# attached to the neighbouring word. The last expression previews two results.
X_data = [message.split() for message in X_data]
X_data[0:2]

[['Go',
  'until',
  'jurong',
  'point,',
  'crazy..',
  'Available',
  'only',
  'in',
  'bugis',
  'n',
  'great',
  'world',
  'la',
  'e',
  'buffet...',
  'Cine',
  'there',
  'got',
  'amore',
  'wat...'],
 ['Ok', 'lar...', 'Joking', 'wif', 'u', 'oni...']]

In [27]:
# Sizes for an 80/20 split: the training count is truncated to an integer and
# the test set takes whatever remains.
n_total = len(X_data)
n_of_train = int(n_total * 0.8)
n_of_test = n_total - n_of_train
print(n_of_train, n_of_test)

4457 1115


In [32]:
# Split into train / test sets at the 80% boundary. The data is sliced in its
# existing order; nothing is shuffled here.
X_train, X_test = X_data[:n_of_train], X_data[n_of_train:]
y_train, y_test = y_data[:n_of_train], y_data[n_of_train:]

In [34]:
# Preview the first two tokenized training sentences.
X_train[:2]

[['Go',
  'until',
  'jurong',
  'point,',
  'crazy..',
  'Available',
  'only',
  'in',
  'bugis',
  'n',
  'great',
  'world',
  'la',
  'e',
  'buffet...',
  'Cine',
  'there',
  'got',
  'amore',
  'wat...'],
 ['Ok', 'lar...', 'Joking', 'wif', 'u', 'oni...']]

In [35]:
# Preview the labels of the first two training sentences.
y_train[:2]

[0, 0]

# ELMO를 이용한 분류

In [None]:
# Install AllenNLP, which provides the Elmo module and the batch_to_ids helper
# imported in the next cell.
!pip install allennlp

In [60]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from allennlp.modules.elmo import Elmo, batch_to_ids

# URLs of the ELMo options (JSON) file and weights (HDF5) file.
options_file = (
    "https://allennlp.s3.amazonaws.com/models/elmo/"
    "2x4096_512_2048cnn_2xhighway/"
    "elmo_2x4096_512_2048cnn_2xhighway_options.json"
)
weight_file = (
    "https://allennlp.s3.amazonaws.com/models/elmo/"
    "2x4096_512_2048cnn_2xhighway/"
    "elmo_2x4096_512_2048cnn_2xhighway_weights.hdf5"
)

class ELMoEmbedding(nn.Module):
    """Thin wrapper around the AllenNLP ``Elmo`` module.

    The encoder is built from the module-level ``options_file`` and
    ``weight_file`` with a single output representation and dropout disabled.
    """

    def __init__(self):
        super().__init__()
        self.elmo = Elmo(
            options_file,
            weight_file,
            num_output_representations=1,
            dropout=0,
        )

    def forward(self, character_ids):
        """Return the first (and only requested) ELMo representation.

        Args:
            character_ids: ELMo character-id tensor, e.g. the output of
                ``batch_to_ids``.

        Returns:
            The first entry of ``'elmo_representations'`` in the Elmo output.
            No pooling or averaging is applied here; that is left to the
            caller.
        """
        elmo_output = self.elmo(character_ids)
        representations = elmo_output['elmo_representations']
        return representations[0]

class ELMoModel(nn.Module):
    """Binary classifier: ELMo embeddings -> mean pooling -> two linear layers.

    The output is a sigmoid probability per sentence, suitable for
    ``nn.BCELoss``.
    """

    def __init__(self):
        super().__init__()
        self.elmo_embedding = ELMoEmbedding()
        self.fc1 = nn.Linear(1024, 256)
        self.fc2 = nn.Linear(256, 1)

    def forward(self, x):
        """Classify a batch of sentences.

        Args:
            x: ELMo character ids of shape (batch, tokens, chars), as produced
                by ``batch_to_ids``. Assumes padding tokens are encoded as
                all-zero character ids (the AllenNLP convention).

        Returns:
            Tensor of shape (batch, 1) with probabilities in [0, 1].
        """
        token_vectors = self.elmo_embedding(x)  # (batch, tokens, 1024)

        # Average only over real tokens. A plain mean over dim=1 would also
        # count padding positions and dilute the sentence vector whenever the
        # sentences in a batch have different lengths. With one sentence per
        # batch (no padding) this equals the plain mean.
        token_mask = (x > 0).any(dim=-1).unsqueeze(-1).to(token_vectors.dtype)
        # clamp guards against division by zero for an all-padding row
        token_counts = token_mask.sum(dim=1).clamp(min=1)
        sentence_vectors = (token_vectors * token_mask).sum(dim=1) / token_counts

        hidden = F.relu(self.fc1(sentence_vectors))
        return torch.sigmoid(self.fc2(hidden))

In [61]:
# Build the classifier (this constructs the ELMo encoder from the configured
# options/weights files) and print its layer structure.
model = ELMoModel()
print(model)

ELMoModel(
  (elmo_embedding): ELMoEmbedding(
    (elmo): Elmo(
      (_elmo_lstm): _ElmoBiLm(
        (_token_embedder): _ElmoCharacterEncoder(
          (char_conv_0): Conv1d(16, 32, kernel_size=(1,), stride=(1,))
          (char_conv_1): Conv1d(16, 32, kernel_size=(2,), stride=(1,))
          (char_conv_2): Conv1d(16, 64, kernel_size=(3,), stride=(1,))
          (char_conv_3): Conv1d(16, 128, kernel_size=(4,), stride=(1,))
          (char_conv_4): Conv1d(16, 256, kernel_size=(5,), stride=(1,))
          (char_conv_5): Conv1d(16, 512, kernel_size=(6,), stride=(1,))
          (char_conv_6): Conv1d(16, 1024, kernel_size=(7,), stride=(1,))
          (_highways): Highway(
            (_layers): ModuleList(
              (0): Linear(in_features=2048, out_features=4096, bias=True)
              (1): Linear(in_features=2048, out_features=4096, bias=True)
            )
          )
          (_projection): Linear(in_features=2048, out_features=512, bias=True)
        )
        (_elmo_lstm): E

In [62]:
# Binary cross-entropy loss on the model's sigmoid output, optimized with Adam
# over all model parameters.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [45]:
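# ELMo input does not need to be padded by hand (batch_to_ids pads each batch itself), but torch's DataLoader with its default collate function raises an error when the token lists in a batch have different lengths, so the DataLoader-based pipeline below is left disabled.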

# from torch.utils.data import DataLoader
# class CustomDataset(torch.utils.data.Dataset):
#     def __init__(self, X, y):
#         self.X = X
#         self.y = y
#     def __len__(self):
#         return len(self.X)
#     def __getitem__(self, idx):
#         return self.X[idx], self.y[idx]

# dataset = CustomDataset(X_train, y_train)
# dataloader = DataLoader(dataset, batch_size=60)
# test_dataset = CustomDataset(X_test, y_test)
# test_dataloader = DataLoader(test_dataset, batch_size=60, shuffle=False)

In [69]:
# Train the classifier one sentence at a time (batch size 1).
# The epoch count is defined once so the loop bound and the progress message
# cannot drift apart.
num_epochs = 5
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
for epoch in range(num_epochs):
    model.train()  # switch to training mode
    total_loss = 0
    for sentence, label in zip(X_train, y_train):
        optimizer.zero_grad()  # reset gradients from the previous step
        # batch_to_ids expects List[List[str]], so wrap the single sentence
        character_ids = batch_to_ids([sentence]).to(device)
        outputs = model(character_ids)[0]  # only one sentence in the batch
        # Build the target directly on the device with the same shape as outputs
        target = torch.tensor([label], dtype=torch.float32, device=device)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # max(..., 1) avoids a ZeroDivisionError when the training set is empty
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/max(len(X_train), 1)}')

Epoch 1/5, Loss: 0.07178671832040222
Epoch 2/5, Loss: 0.04388555014879575
Epoch 3/5, Loss: 0.031172774039077745
Epoch 4/5, Loss: 0.022764607873046374
Epoch 5/5, Loss: 0.01090305675246012


In [70]:
from sklearn.metrics import accuracy_score

# Evaluate on the held-out test set, one sentence at a time.
model.eval()  # switch to evaluation mode

# Predicted and true labels, collected as plain Python ints so that
# accuracy_score receives two matching 1-D label sequences instead of a list
# of shape-(1,) float arrays.
all_predictions = []
all_labels = []

with torch.no_grad():  # no gradients are needed for evaluation
    for sentence, label in zip(X_test, y_test):
        # batch_to_ids expects List[List[str]], so wrap the single sentence
        character_ids = batch_to_ids([sentence]).to(device)
        probability = model(character_ids)[0].item()  # only one sentence in the batch
        # Threshold the probability at 0.5 to get a 0/1 prediction
        all_predictions.append(int(probability >= 0.5))
        all_labels.append(label)

# Compute accuracy
accuracy = accuracy_score(all_labels, all_predictions)
print(f'Test Accuracy: {accuracy * 100:.2f}%')

Test Accuracy: 98.65%
