In [3]:
import os

# root path
ROOT_PATH = os.path.abspath(".") # this makes compatible absolute path both for local and server

# designate root path for the data
DATA_ROOT_PATH = os.path.join(ROOT_PATH, 'data')

# designate path for each dataset files
LYRIC_PATH = os.path.join(DATA_ROOT_PATH, "lyrics_kor.txt")
BILLBOARD_PATH = os.path.join(DATA_ROOT_PATH, "rawdata_김지훈_201500844.tsv")
GEULSTAGRAM_PATH = os.path.join(DATA_ROOT_PATH, "geulstagram.csv")

print(ROOT_PATH)

/Users/noopy/Documents/BERT-PROJECTS/kor-3-line-poetry


In [4]:
from datetime import datetime
from easydict import EasyDict

# Initialize configuration
CFG = EasyDict()

# Dataset Config as constants
CFG.DEBUG = True
CFG.num_workers = 4
CFG.train_batch_size = 16

# Train configuration
CFG.user_name = "snoop2head"
today = datetime.now().strftime("%m%d_%H:%M")
CFG.file_base_name = f"{CFG.user_name}_{today}"
CFG.model_dir = "skt/ko-gpt-trinity-1.2B-v0.5" # designate the model's name registered on huggingface: https://huggingface.co/skt/ko-gpt-trinity-1.2B-v0.5
CFG.max_token_length = 42
CFG.learning_rate = 5e-5
CFG.weight_decay = 1e-2 # https://paperswithcode.com/method/weight-decay

# training steps configurations
CFG.save_steps = 500
CFG.early_stopping_patience = 5
CFG.warmup_steps = 500
CFG.logging_steps = 100
CFG.evaluation_strategy = 'epoch'
CFG.evaluation_steps = 500

# Directory configuration
CFG.result_dir = os.path.join(ROOT_PATH, "results")
CFG.saved_model_dir = os.path.join(ROOT_PATH, "best_models")
CFG.logging_dir = os.path.join(ROOT_PATH, "logs")
CFG.baseline_dir = os.path.join(ROOT_PATH, 'baseline-code')

print(CFG)

{'DEBUG': True, 'num_workers': 4, 'train_batch_size': 16, 'user_name': 'snoop2head', 'file_base_name': 'snoop2head_1116_14:24', 'model_dir': 'skt/ko-gpt-trinity-1.2B-v0.5', 'max_token_length': 42, 'learning_rate': 5e-05, 'weight_decay': 0.01, 'save_steps': 500, 'early_stopping_patience': 5, 'warmup_steps': 500, 'logging_steps': 100, 'evaluation_strategy': 'epoch', 'evaluation_steps': 500, 'result_dir': '/Users/noopy/Documents/BERT-PROJECTS/kor-3-line-poetry/results', 'saved_model_dir': '/Users/noopy/Documents/BERT-PROJECTS/kor-3-line-poetry/best_models', 'logging_dir': '/Users/noopy/Documents/BERT-PROJECTS/kor-3-line-poetry/logs', 'baseline_dir': '/Users/noopy/Documents/BERT-PROJECTS/kor-3-line-poetry/baseline-code'}


In [5]:
import random
import torch
import pandas as pd
import numpy as np

os.environ["TOKENIZERS_PARALLELISM"] = "true"
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

def seed_everything(seed) :
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) # if use multi-GPU
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)
    random.seed(seed)
seed_everything(42)

In [6]:
# read txt file from line by line
def read_txt(path):
    with open(path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    return lines

# make sampling function from the list
def sampling(list_lines:list, n:int) -> list:
    # sampling
    list_lines = np.random.choice(list_lines, n)
    list_lines = list(list_lines)
    return list_lines

In [8]:
import torch
from transformers import GPT2LMHeadModel

CFG.saved_model_dir = "./results"
# model_path = "/Users/noopy/Documents/BERT-PROJECTS/kor-3-line-poetry/results/snoop2head_1114_05_58_loss_0.3655.pt"
CFG.model_dir = "/Users/noopy/Documents/BERT-PROJECTS/kor-3-line-poetry/results/checkpoint-18200"

# Attach Language model Head to the pretrained GPT model
model = GPT2LMHeadModel.from_pretrained(CFG.model_dir) # KoGPT3 shares the same structure as KoGPT2. 

# move the model to device
if torch.cuda.is_available() and CFG.DEBUG == False:
    device = torch.device("cuda:0")
    # model.load_state_dict(torch.load(model_path))
elif CFG.DEBUG == True:
    device = torch.device("cpu")
    # model.load_state_dict(torch.load(model_path, map_location=device))

model.to(device)
model.eval()
print(device)

cpu


In [9]:
if device == torch.device("cuda:0"):
    os.system("nvidia-smi")

In [11]:
from transformers import GPT2Tokenizer, PreTrainedTokenizerFast

CFG.model_dir = "skt/ko-gpt-trinity-1.2B-v0.5" # designate the model's name registered on huggingface: https://huggingface.co/skt/ko-gpt-trinity-1.2B-v0.5

# https://huggingface.co/transformers/preprocessing.html
# Load the Tokenizer: "Fast" means that the tokenizer code is written in Rust Lang
tokenizer = PreTrainedTokenizerFast.from_pretrained(
    CFG.model_dir,
    max_len = CFG.max_token_length,
    padding='max_length',
    add_special_tokens = True,
    return_tensors="pt",
    truncation = True,
    bos_token = "<s>",
    eos_token = "</s>",
    unk_token = "<unk>",
    pad_token = "<pad>",
    mask_token = "<mask>",
)

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPT2Tokenizer'. 
The class this function is called from is 'PreTrainedTokenizerFast'.


In [16]:
def infer_sentence(input_sentence, k, output_token_length):

    # encode the sample sentence
    input_ids = tokenizer.encode(
        input_sentence, 
        add_special_tokens=False, # CLS, SEP, PAD, UNK
        return_tensors="pt"
        )

    # decode the output sequence and print its outcome
    list_decoded_sequences = []
    while len(list_decoded_sequences) < k:
        # generate output sequence from the given encoded input sequence
        output_sequences = model.generate(
            input_ids=input_ids.to(device), 
            do_sample=True, 
            max_length=output_token_length, 
            num_return_sequences=k
            )

        for index, generated_sequence in enumerate(output_sequences):
            generated_sequence = generated_sequence.tolist()
            # remove padding from the generated sequence
            generated_sequence = generated_sequence[:generated_sequence.index(tokenizer.pad_token_id)]
            decoded_sequence = tokenizer.decode(generated_sequence, clean_up_tokenization_spaces=True)
            # print(f"{index} : {decoded_sequence}")
            list_decoded_sequences.append(decoded_sequence)
        list_decoded_sequences = list(set(list_decoded_sequences))
    
    return list_decoded_sequences

input_sentence = "나는 너의"
print(f"Inferred sentences given '{input_sentence}'")
inferred_sentences = infer_sentence(input_sentence, k=10, output_token_length=CFG.max_token_length)
inferred_sentences

Inferred sentences given '나는 너의'


['나는 너의 모든 시간을 알고 싶다',
 '나는 너의 결핍이면 곳곳 너를 두곤 했다.',
 '나는 너의 척추를 꼭꼭 싶어먹었다.',
 '나는 너의 이름을 그 누가 보아도 아름답고 소중한 사람이라 부를 수 있겠다',
 '나는 너의 모든 것을 담고 싶을 만큼 너를 좋아했었다.',
 '나는 너의 마음을 읽지 못했고 늘 같은 말만 했다',
 '나는 너의 그 손을 잡고 싶었지만 차마 두 손으로 너의 머리를 감싸고 말았다',
 '나는 너의 시가 되고 싶다.',
 '나는 너의 마음을 정하지마',
 '나는 너의 그 미소는',
 '나는 너의 머리를 살짝 쥐어박고 말했다']

In [13]:
def make_samhaengshi(input_letter, k, output_token_length):
    list_samhaengshi = []
    for one_letter in input_letter:
        list_decoded_sequences = infer_sentence(one_letter, k=k, output_token_length=output_token_length)
        list_samhaengshi.extend(list_decoded_sequences)
    return list_samhaengshi

make_samhaengshi(input_letter="자탄풍", k=1, output_token_length=CFG.max_token_length)

['자정을 알리는 첫 차 소리', '탄 것 말고 좋은 것 더 드셨음 지금 곁에 계시나.', '풍차가 도는 조각 공원 곱게 수놓인']

In [14]:

def make_residual_samhaengshi(input_letter, k, output_token_length):
    # make letter string into 
    list_samhaengshi = []
    
    # initializing text and index for iteration purpose
    index = 0

    # iterating over the input letter string
    for index, letter_item in enumerate(input_letter):
        # initializing the input_letter
        if index == 0:
            residual_text = letter_item
        else:
            pass
        
        # infer and add to the output
        list_sentences = infer_sentence(residual_text, k, output_token_length) #  옥 -> [옥석은 세월이 가려내리라.], "옥석은 세월이 가려내리라. 수" -> []
        inferred_sentence = list_sentences[0] # first item of the inferred list
        if index != 0:
            # remove previous sentence from the output
            inferred_sentence = inferred_sentence.replace(list_samhaengshi[index-1], "").strip() 
        else:
            pass
        list_samhaengshi.append(inferred_sentence)
        
        # until the end of the input_letter, give the previous residual_text to the next iteration
        if index < len(input_letter) - 1: 
            residual_sentence = list_samhaengshi[index]
            next_letter = input_letter[index + 1]
            residual_text = f"{residual_sentence} {next_letter}" # previous sentence + next letter # "옥석은 세월이 가려내리라. 수"
            # print(residual_text)

        elif index == len(input_letter) - 1: # end of the input_letter
            # Concatenate strings in the list without intersection

            return list_samhaengshi

In [18]:
sample_item = "옥수수수염차" 
inferred_samhaengshi = make_residual_samhaengshi(sample_item, k=1, output_token_length=CFG.max_token_length)
for item in inferred_samhaengshi:
    print(item)
    

옥석은 세월이 가려내리라.
수줍은 아이처럼
수줍은 얼굴 하고 흰 눈만
수줍게 쌓였네.
염라대왕은 청년을 노려보며 착하게 살긴 살았는데 시간을 덧없이 보냈구나.
차돌


### ToDo
- probably make candidate 10 sentences per letter and pick sentences with sentence transformer trained with Next Sentence Prediction Task?
- Filter out similar sentences based on levenstein distance or sentence bert
- remove curse words, person words with pororo or other tools -> either from dataset or inference process

In [12]:
# https://github.com/lovit/levenshtein_finder