In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import (DataLoader, TensorDataset)
from sklearn.metrics import accuracy_score

In [None]:
class SpacingRNN(nn.Module):
    def __init__(self, config):
        super(SpacingRNN, self).__init__()

        # 음절 -> idx
        self.eumjeol_vocab_size = config['eumjeol_vocab_size']
        self.embedding_size = config['embedding_size']
        self.hidden_size = config['hidden_size']
        self.number_of_labels = config['number_of_labels'] # label의 수 = 3

        self.embedding = nn.Embedding(num_embeddings=self.eumjeol_vocab_size, embedding_dim=self.embedding_size, padding_idx=0)

        self.dropout = nn.Dropout(config['dropout'])

        self.bi_lstm = nn.LSTM(input_size=self.embedding_size, hidden_size=self.hidden_size, num_layers=1, bidirectional=True, batch_first=True)

        # (batch_size, max_length, hidden_size*2) -> (batch_size, max_length, number_of_labels)
        self.linear = nn.Linear(in_features=self.hidden_size*2, out_features=self.number_of_labels)

    def forward(self, inputs):
        # (batch_size, max_length) -> (batch_size, max_length, embedding_size)
        eumjeol_inputs = self.embedding(inputs)

        hidden_outputs, hidden_states = self.bi_lstm(eumjeol_inputs)

        # (batch_size, max_length, hidden_size*2)
        hidden_outputs = self.dropout(hidden_outputs)

        # (batch_size, max_length, hidden_size*2) -> (batch_size, max_length, number_of_labels)
        hypothesis = self.linear(hidden_outputs)

        return hypothesis

---

In [None]:
def read_datas(file_path):
    with open(file_path, 'r', encoding='utf-8') as inFile:
        lines = inFile.readlines()

    datas = []
    for line in lines:
        pieces = line.strip().split('\t')
        eumjeol_sequence, label_sequence = pieces[0].split(), pieces[1].split()
        datas.append((eumjeol_sequence, label_sequence))
    return datas

In [None]:
def read_vocab_data(eumjeol_vocab_data_path):
    label2idx, idx2label = {"<PAD>": 0, "B": 1, "I": 2}, {0: "PAD", 1: "B", 2: "I"}
    eumjeol2idx, idx2eumjeol = {}, {}

    with open(eumjeol_vocab_data_path, 'r', encoding='utf-8') as inFile:
        lines = inFile.readlines()

    for line in lines:
        eumjeol = line.strip()
        eumjeol2idx[eumjeol] = len(eumjeol2idx)
        idx2eumjeol[eumjeol2idx[eumjeol]] = eumjeol

    return eumjeol2idx, idx2eumjeol, label2idx, idx2label

In [None]:
def load_dataset(config):
    datas = read_datas(config["input_data"])
    eumjeol2idx, idx2eumjeol, label2idx, idx2label = read_vocab_data(config["eumjeol_vocab"])

    eumjeol_features, eumjeol_feature_lengths, label_features = [], [], []

    for eumjeol_sequence, label_sequence in datas:
        eumjeol_feature = [eumjeol2idx[eumjeol] for eumjeol in eumjeol_sequence]
        label_feature = [label2idx[label] for label in label_sequence]

        eumjeol_feature_length = len(eumjeol_feature)

        eumjeol_feature += [0] * (config["max_length"] - eumjeol_feature_length)
        label_feature += [0] * (config["max_length"] - eumjeol_feature_length)

        eumjeol_features.append(eumjeol_feature)
        eumjeol_feature_lengths.append(eumjeol_feature_length)
        label_features.append(label_feature)

    eumjeol_features = torch.tensor(eumjeol_features, dtype=torch.long)
    eumjeol_feature_lengths = torch.tensor(eumjeol_feature_lengths, dtype=torch.long)
    label_features = torch.tensor(label_features, dtype=torch.long)

    return eumjeol_features, eumjeol_feature_lengths, label_features, eumjeol2idx, idx2eumjeol,label2idx, idx2label

In [None]:
def train(config):
    model = SpacingRNN(config).cuda()

    eumjeol_features, eumjeol_feature_lengths, label_features, eumjeol2idx, idx2eumjeol,label2idx, idx2label = load_dataset(config)

    train_features = TensorDataset(eumjeol_features, eumjeol_feature_lengths, label_features)
    train_dataloader = DataLoader(train_features, shuffle=True, batch_size=config['batch_size'])

    loss_func = nn.CrossEntropyLoss(ignore_index=0)

    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(config["epoch"]):
        model.train()
        costs = []

        for step, batch in enumerate(train_dataloader):
            batch = tuple(t.cuda() for t in batch)

            # labels : (batch_size, max_length) -> (batch_size*max_length, )
            inputs, input_lengths, labels = batch

            # hypothesis : (batch_size, max_length, number_of_labels) -> (batch_size*max_length, number_of_labels)
            hypothesis = model(inputs)

            cost = loss_func(hypothesis.reshape(-1, len(label2idx)), labels.flatten())

            ''' Eg. hypothesis.reshape(-1, len(label2idx))의 과정
                :  batch_size=2, max_length=2, number_of_labels(len(label2idx))=3

                hypothesis =
                [
                    [  # 첫 번째 배치
                        [0.1, 0.7, 0.2],  # 첫 번째 위치의 예측 확률
                        [0.6, 0.2, 0.2],  # 두 번째 위치의 예측 확률
                    ],
                    [  # 두 번째 배치
                        [0.3, 0.4, 0.3],  # 첫 번째 위치의 예측 확률
                        [0.8, 0.1, 0.1],  # 두 번째 위치의 예측 확률
                    ]
                ]

                reshaped_hypothesis =
                [
                    [0.1, 0.7, 0.2],  # 첫 번째 배치, 첫 번째 위치
                    [0.6, 0.2, 0.2],  # 첫 번째 배치, 두 번째 위치
                    [0.3, 0.4, 0.3],  # 두 번째 배치, 첫 번째 위치
                    [0.8, 0.1, 0.1]   # 두 번째 배치, 두 번째 위치
                ]

                -> 각 배치의 모든 위치에 대한 예측 결과를 담게 됨
            '''

            ''' flatten 함수
                - 다차원 공간 배열을 1차원으로 평탄화해주는 함수

                Eg.
                labels =
                [
                    [1, 0],  # 첫 번째 배치의 레이블 (첫 번째, 두 번째 위치)
                    [2, 1]   # 두 번째 배치의 레이블 (첫 번째, 두 번째 위치)
                ]

                flattened_labels = [1, 0, 2, 1]
            '''

            cost.backward()

            optimizer.step()

            costs.append(cost.data.item())

        torch.save(model.state_dict(), os.path.join(config["output_dir"], "epoch_{0:d}_pt".format(epoch+1)))

        print("Average Cost : {}".format(np.mean(costs)))

In [None]:
def make_sentence(inputs, predicts, labels, idx2eumjeol, idx2label):
    predict_sentence, correct_sentence = "", ""

    for index in range(len(inputs)):
        eumjeol = idx2eumjeol[inputs[index]]
        correct_label = idx2label[labels[index]]
        predict_label = idx2label[predicts[index]]

        if index == 0:
            predict_sentence += eumjeol
            correct_sentence += eumjeol
            continue

        if predict_label == "B":
            predict_sentence += " "
        predict_sentence += eumjeol

        if correct_label == "B":
            correct_sentence += " "
        correct_sentence += eumjeol

    return predict_sentence, correct_sentence

In [None]:
def tensor2list(input_tensor):
    return input_tensor.cpu().detach().numpy().tolist()

In [None]:
def test(config):
    eumjeol_features, eumjeol_feature_lengths, label_features, eumjeol2idx, idx2eumjeol, label2idx, idx2label = load_dataset(config)

    test_features = TensorDataset(eumjeol_features, eumjeol_feature_lengths, label_features)
    test_dataloader = DataLoader(test_features, shuffle=True, batch_size=1)

    model = SpacingRNN(config).cuda()

    model.load_state_dict(torch.load(os.path.join(config["output_dir"], config["model_name"])))

    total_hypothesis, total_labels = [], []

    for step, batch in enumerate(test_dataloader):
        model.eval()

        batch = tuple(t.cuda() for t in batch)

        inputs, input_lengths, labels = batch

        hypothesis = model(inputs)

        hypothesis = torch.argmax(hypothesis, dim=-1)

        input_length = tensor2list(input_lengths[0])
        input = tensor2list(inputs[0])[:input_length]
        label = tensor2list(labels[0])[:input_length]
        hypothesis = tensor2list(hypothesis[0])[:input_length]

        total_hypothesis += hypothesis
        total_labels += label

        if step < 10:
            predict_sentence, correct_sentence = make_sentence(input, hypothesis, label, idx2eumjeol, idx2label)
            print("정답 : " + correct_sentence)
            print("출력 : " + predict_sentence)
            print()

    print("Accuracy : {}".format(accuracy_score(total_labels, total_hypothesis)))

---

In [None]:
if(__name__=="__main__"):
    root_dir = "/content/drive/MyDrive/연구실/기계학습/rnn"
    output_dir = os.path.join(root_dir, "scaling_rnn_output")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    config = {"mode": "train",
              "model_name":"epoch_5_pt",
              "input_data":os.path.join(root_dir, "train.txt"),
              "output_dir":output_dir,
              "eumjeol_vocab": os.path.join(root_dir, "eumjeol_vocab.txt"),
              "label_vocab": os.path.join(root_dir, "label_vocab.txt"),
              "eumjeol_vocab_size": 2458,
              "embedding_size": 100,
              "hidden_size": 100,
              "max_length": 920,
              "number_of_labels": 3,
              "epoch":5,
              "batch_size":64,
              "dropout":0.3
              }

    if(config["mode"] == "test"):
        train(config)
    else:
        test(config)

  model.load_state_dict(torch.load(os.path.join(config["output_dir"], config["model_name"])))


정답 : 은숙도 오빠도 그때부터 과외공부를 했다.
출력 : 은 숙도 오빠도 그 때부터 과 외공부를 했다.

정답 : 레이첼:
출력 : 레 이첼:

정답 : 내가 일어서자 정화가 따라서 일어났다.
출력 : 내가 일어서 자정화가 따라서 일어났다.

정답 : 우선 수운과 해월의 사상은 그 뿌리를 전통 유교에 두고 있는 것으로 생각되는데, 동학은 개혁 유교 학자에 따라서는 세속화된 유교라고 보는 시각도 있다라고 볼 수 있을 정도로 교리가 전통 유교, 그 중에서도 성리학과 관계되는 경우가 많았다.
출력 : 우선 수운과 해 월의 사상은 그 뿌리를 전통 유교에 두고 있는 것으로 생각되는데, 동학은 개혁 유 교학자에 따라서는 세속화된 유교라고 보는 시각도 있다라고 볼 수 있을 정도로 교리가 전통 유교, 그중에서도 성리학과 관계되는 경우가 많았다.

정답 : "그것만은 확실하게 맹세할 수 있어."
출력 : "그 것만은 확실하게 맹세할 수 있어."

정답 : "얼굴이 왜 그래?"
출력 : "얼굴이 왜 그래?"

정답 : 뒤집으면 영어로 같은 것들이 인쇄되어 있었다.
출력 : 뒤집으면 영어로 같은 것들이인 쇄되어 있었다.

정답 : 강한섭이 키스를 하거나 그녀의 몸을 패팅할 때면 그녀는 온 몸이 뜨겁게 달았다.
출력 : 강한섭 이 키스를 하거나 그녀의 몸을 패팅할 때면 그녀는 온몸이 뜨 겁게 달았다.

정답 : "친구집에 있는다고 어제 연락은 해뒀었어요."
출력 : "친 구 집에 있는다고 어제 연락은 해뒀었어요."

정답 : 다시 수건을 뭉쳐서 민신혜의 입 속으로 쑤셔넣었다.
출력 : 다시 수건을 뭉쳐서 민신혜의 입속으로 쑤셔 넣었다.

Accuracy : 0.9197536873874312
