In [5]:
!pip install torch

import torch
import torch.nn as nn
import torch.nn.functional as F

import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"



In [17]:
class MultiHeadAttention(nn.Module):
  def __init__(self, embedding_dim, num_heads = 8):
    super(MultiHeadAttention, self).__init__()
    self.embedding_dim = embedding_dim
    self.num_heads = num_heads

    assert embedding_dim % self.num_heads == 0

    self.projection_dim = embedding_dim // num_heads
    self.query_dense = nn.Linear(embedding_dim, embedding_dim)
    self.key_dense = nn.Linear(embedding_dim, embedding_dim)
    self.value_dense = nn.Linear(embedding_dim, embedding_dim)
    self.dense = nn.Linear(embedding_dim, embedding_dim)

  def scaled_dot_product_attention(self, query, key, value):
    matmul_qk = torch.matmul(query, key.transpose(-2, -1))
    depth = torch.tensor(key.shape[-1], dtype=torch.float32)
    logits = matmul_qk / torch.sqrt(depth)
    attention_weights = F.softmax(logits, dim=-1)
    output = torch.matmul(attention_weights, value)
    return output, attention_weights

  def split_heads(self, x, batch_size):
    x = x.view(batch_size, -1, self.num_heads, self.projection_dim)
    return x.transpose(1, 2)

  def forward(self, inputs):
    batch_size = inputs.size(0)
    query = self.query_dense(inputs)
    key = self.key_dense(inputs)
    value = self.value_dense(inputs)
    query = self.split_heads(query, batch_size)
    key = self.split_heads(key, batch_size)
    value = self.split_heads(value, batch_size)
    scaled_attention, _ = self.scaled_dot_product_attention(query, key, value)
    scaled_attention = scaled_attention.transpose(1, 2)
    concat_attention = scaled_attention.reshape(batch_size, -1, self.embedding_dim)
    outputs = self.dense(concat_attention)
    return outputs

In [18]:
class TransformerBlock(nn.Module):
  def __init__(self, embedding_dim, num_heads, dff, rate = 0.1):
    super(TransformerBlock, self).__init__()
    self.att = MultiHeadAttention(embedding_dim, num_heads)
    self.ffn = nn.Sequential(
        nn.Linear(embedding_dim, dff),
        nn.ReLU(),
        nn.Linear(dff, embedding_dim)
    )
    self.layernorm1 = nn.LayerNorm(embedding_dim, eps = 1e-6)
    self.layernorm2 = nn.LayerNorm(embedding_dim, eps = 1e-6)
    self.dropout1 = nn.Dropout(rate)
    self.dropout2 = nn.Dropout(rate)

  def forward(self, inputs):
    attn_output = self.att(inputs)
    attn_output = self.dropout1(attn_output)
    out1 = self.layernorm1(inputs + attn_output)
    ffn_output = self.ffn(out1)
    ffn_output = self.dropout2(ffn_output)
    return self.layernorm2(out1 + ffn_output)

In [19]:
class TokenAndPositionEmbedding(nn.Module):
  def __init__(self, max_len, vocab_size, embedding_dim):
    super(TokenAndPositionEmbedding, self).__init__()
    self.token_emb = nn.Embedding(vocab_size, embedding_dim)
    self.pos_emb = nn.Embedding(max_len, embedding_dim)

  def forward(self, x):
    positions = torch.arange(0, x.size(1), dtype=torch.long).unsqueeze(0).to(x.device)
    positions = self.pos_emb(positions)
    x = self.token_emb(x)
    return x + positions

In [20]:
import pandas as pd

train = pd.read_csv("/content/drive/MyDrive/랩인턴/sentimental_analysis/train.csv")
test = pd.read_csv("/content/drive/MyDrive/랩인턴/sentimental_analysis/test (1).csv")

train_new_row = train.columns
test_new_row = test.columns

train.columns = ['y_1', 'y_2', 'x']
train = train.append(pd.Series(), ignore_index=True)
train.loc[0] = train_new_row

test.columns = ['y_1', 'y_2', 'x']
test = test.append(pd.Series(), ignore_index=True)
test.loc[0] = test_new_row

train.dropna(inplace = True)
test.dropna(inplace= True)

train.reset_index(drop=True)
test.reset_index(drop=True)

X_train = list(train['x'])
y_train = list(train['y_1'])

X_test = list(test['x'])
y_test = list(test['y_1'])

y_train[0] = 1
y_test[0] = 0

  train = train.append(pd.Series(), ignore_index=True)
  train = train.append(pd.Series(), ignore_index=True)
  test = test.append(pd.Series(), ignore_index=True)
  test = test.append(pd.Series(), ignore_index=True)


In [21]:
!pip install transformers
from transformers import BertTokenizer

max_len = 512
vocab_size = 22000

tokenizer = BertTokenizer(vocab_file = "/content/drive/MyDrive/랩인턴/translator1/wiki-vocab.txt", max_length = max_len)



In [None]:
#@title 기본 제목 텍스트
pad_token_id = tokenizer.pad_token_id

train_x = []
test_x = []

for x in X_train:
  x = tokenizer.encode(x, max_length  = max_len, truncation=True)
  rest = max_len - len(x)
  x = torch.tensor(x + [pad_token_id] * rest)
  train_x.append(x)

for x in X_test:
  x = tokenizer.encode(x, max_length= max_len, truncation=True)
  rest = max_len - len(x)
  x = torch.tensor(x + [pad_token_id] * rest)
  test_x.append(x)

In [22]:
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

class CustomDataset(Dataset):
  def __init__(self, data, targets, tokenizer):
    self.data =data
    self.targets = targets
    self.tokenizer = tokenizer

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    text = self.data[idx]
    tokens = self.tokenizer.encode(text, add_special_tokens=True)
    label = 1 if self.targets[idx] == 1.0 else 0
    return torch.tensor(tokens), torch.tensor(label)

def collate_fn(batch):
  inputs, targets = zip(*batch)
  inputs = pad_sequence(inputs, padding_value=0, batch_first=True)
  targets = torch.stack(targets, dim=0)
  return inputs, targets

In [23]:
train_dataset = CustomDataset(X_train, y_train, tokenizer)
test_dataset = CustomDataset(X_test, y_test, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn = collate_fn)

In [10]:
embedding_dim = 32
num_heads = 2
dff = 32

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class MyModel(nn.Module):
  def __init__(self, max_len, vocab_size, embedding_dim, num_heads, dff):
    super(MyModel, self).__init__()
    self.embedding_layer = TokenAndPositionEmbedding(max_len, vocab_size, embedding_dim)
    self.transformer_block = TransformerBlock(embedding_dim, num_heads, dff)
    self.dropout1 = nn.Dropout(0.1)
    self.linear1 = nn.Linear(embedding_dim, 20)
    self.dropout2 = nn.Dropout(0.1)
    self.linear2 = nn.Linear(20, 2)

  def forward(self, inputs):
    x = self.embedding_layer(inputs)
    x = self.transformer_block(x)
    x = torch.mean(x, dim=1)
    x = x.squeeze(-1)
    x = self.dropout1(x)
    x = self.linear1(x)
    x = self.dropout2(x)
    x = self.linear2(x)
    return F.log_softmax(x, dim=-1)

model = MyModel(max_len, vocab_size, embedding_dim, num_heads, dff).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

for epoch in range(2):
  model.train()
  total_loss = 0.0
  total_samples = 0
  print("epoch: ", epoch)
  for inputs, targets in train_loader:
    inputs = inputs.to(device)
    targets = targets.to(device)

    optimizer.zero_grad()
    output = model(inputs)

    loss = criterion(output, targets)
    loss.backward()
    optimizer.step()

    total_loss += loss.item() * len(inputs)
    total_samples += len(inputs)
  epoch_loss = total_loss / total_samples
  print("Epoch %d, Training Loss: %.4f" %(epoch + 1, epoch_loss))

epoch:  0
Epoch 1, Training Loss: 0.5812
epoch:  1
Epoch 2, Training Loss: 0.5615


In [11]:
model.eval()
with torch.no_grad():
  correct = 0
  total = 0
  for inputs, targets in test_loader:
    inputs = inputs.to(device)
    targets = targets.to(device)

    output = model(inputs)
    _, predicted = torch.max(output, dim=1)
    total += targets.size(0)
    correct += (predicted == targets).sum().item()

accuracy = correct / total
print("테스트 정확도: %.4f" % accuracy)

테스트 정확도: 0.7022


In [None]:
def predict_sentiment(text):
  # token_ids = tokenizer(text)
  # token_ids = tokenizer.convert_tokens_to_ids(token_ids)
  # token_ids = torch.tensor(token_ids).unsqueeze(0).to(device)

  token_ids = tokenizer.encode(text)
  token_ids = torch.tensor(token_ids).unsqueeze(0).to(device)
  token_ids = pad_sequence(token_ids, padding_value=0, batch_first=True)

  with torch.no_grad():
    output = model(token_ids)
    _, preds = torch.max(output, dim=1)

    print(preds)

  if preds.item() == 1:
    print("긍정 리뷰입니다.")
  else:
    print("부정 리뷰입니다.")

In [None]:
predict_sentiment("너무 좋아요 ㅎㅎ")

tensor([1], device='cuda:0')
긍정 리뷰입니다.


In [None]:
X_test[2]

'제품도 빨리 배송해주시고 꼼꼼하게 잘챙겨주셨어요'

*2*. Multi class classification

In [24]:
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

class CustomDataset(Dataset):
  def __init__(self, data, targets, tokenizer):
    self.data =data
    self.targets = targets
    self.tokenizer = tokenizer

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    text = self.data[idx]
    tokens = self.tokenizer.encode(text, add_special_tokens=True)
    label = torch.tensor(self.targets[idx], dtype=torch.long)
    return torch.tensor(tokens), label

def collate_fn(batch):
  inputs, targets = zip(*batch)
  inputs = pad_sequence(inputs, padding_value=0, batch_first=True)
  targets = torch.stack(targets, dim=0)
  return inputs, targets

In [25]:
X_train = list(train['x'])
y_train = list(train['y_2'])

X_test = list(test['x'])
y_test = list(test['y_2'])

y_train[0] = 5
y_test[0] = 2

In [35]:
for i in range(len(y_train)):
  y = y_train[i]
  y_train[i] = y -1

for i in range(len(y_test)):
  y = y_test[i]
  y_test[i] = y - 1

In [None]:
y_train

In [26]:
train_dataset = CustomDataset(X_train, y_train, tokenizer)
test_dataset = CustomDataset(X_test, y_test, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn = collate_fn)

In [37]:
embedding_dim = 32
num_heads = 2
dff = 32

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class MyModel(nn.Module):
  def __init__(self, max_len, vocab_size, embedding_dim, num_heads, dff):
    super(MyModel, self).__init__()
    self.embedding_layer = TokenAndPositionEmbedding(max_len, vocab_size, embedding_dim)
    self.transformer_block = TransformerBlock(embedding_dim, num_heads, dff)
    self.dropout1 = nn.Dropout(0.1)
    self.linear1 = nn.Linear(embedding_dim, 20)
    self.dropout2 = nn.Dropout(0.1)
    self.linear2 = nn.Linear(20, 5)

  def forward(self, inputs):
    x = self.embedding_layer(inputs)
    x = self.transformer_block(x)
    x = torch.mean(x, dim=1)
    x = x.squeeze(-1)
    x = self.dropout1(x)
    x = self.linear1(x)
    x = self.dropout2(x)
    x = self.linear2(x)
    return F.log_softmax(x, dim=-1)

model = MyModel(max_len, vocab_size, embedding_dim, num_heads, dff).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

for epoch in range(2):
  model.train()
  total_loss = 0.0
  total_samples = 0
  print("epoch: ", epoch)
  for inputs, targets in train_loader:
    inputs = inputs.to(device)
    targets = targets.to(device)

    optimizer.zero_grad()
    output = model(inputs)

    loss = criterion(output, targets)
    loss.backward()
    optimizer.step()

    total_loss += loss.item() * len(inputs)
    total_samples += len(inputs)
  epoch_loss = total_loss / total_samples
  print("Epoch %d, Training Loss: %.4f" %(epoch + 1, epoch_loss))

epoch:  0
Epoch 1, Training Loss: 1.1491
epoch:  1
Epoch 2, Training Loss: 1.1241


In [41]:
import numpy as np

def calculate_rmse(predictions, targets):
  mse = np.mean((predictions - targets) ** 2)
  rmse = np.sqrt(mse)
  return rmse

predictions = []
targets_list = []

model.eval()
with torch.no_grad():
  for inputs, targets in test_loader:
    inputs = inputs.to(device)
    targets = targets.to(device)

    output = model(inputs)
    _, predicted = torch.max(output, dim=1)
    predictions.extend(predicted.cpu().numpy())
    targets_list.extend(targets.cpu().numpy())

predictions = np.array(predictions)
targets = np.array(targets_list)
rmse = calculate_rmse(predictions, targets)

print("RMSE: ", rmse)

RMSE:  1.8568295394694192


In [42]:
def predict_score(text):
  token_ids = tokenizer.encode(text)
  token_ids = torch.tensor(token_ids).unsqueeze(0).to(device)
  token_ids = pad_sequence(token_ids, padding_value=0, batch_first=True)

  with torch.no_grad():
    output = model(token_ids)
    predicted_class = torch.argmax(output, dim=1)
    predicted_score = predicted_class.item() + 1
  return predicted_score

In [45]:
predict_score("별로예요")

2