<a href="https://colab.research.google.com/github/rickiepark/the-lm-book/blob/main/byte_pair_encoding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<div style="display: flex; justify-content: center;">
    <div style="background-color: #f4f6f7; padding: 15px; width: 80%;">
        <table style="width: 100%">
            <tr>
                <td style="vertical-align: middle;">
                    <span style="font-size: 14px;">
                        A notebook for <a href="https://www.thelmbook.com" target="_blank" rel="noopener">The Hundred-Page Language Models Book</a> by Andriy Burkov<br><br>
                        Code repository: <a href="https://github.com/rickiepark/the-lm-book" target="_blank" rel="noopener">https://github.com/rickiepark/the-lm-book</a>
                    </span>
                </td>
                <td style="vertical-align: middle;">
                    <a href="https://www.thelmbook.com" target="_blank" rel="noopener">
                        <img src="https://thelmbook.com/img/book.png" width="80px" alt="The Hundred-Page Language Models Book">
                    </a>
                </td>
            </tr>
        </table>
    </div>
</div>

# BPE 모델 훈련하기

In [1]:
# 라이브러리 임포트
import os  # 파일과 경로 처리
import urllib.request  # 파일 다운로드
import tarfile  # tar 파일 추출
import pickle  # 토크나이저 저장 및 로딩
import re  # 정규표현식
import time  # 시간 계산
from collections import defaultdict  # 토큰 및 쌍 카운트

def download_file(url, filename):
    """
    중복 다운로드를 막기 위해 파일 존재 여부를 확인하여 로컬에 없는 경우 URL에서 다운로드합니다.

    매개변수:
        url (str): 다운로드할 파일의 URL
        filename (str): 다운로드된 파일을 저장할 로컬 경로

    반환값:
        None: 다운로드 과정에 대한 상태 메시지를 출력합니다.
    """
    # 중복 다운로드를 막기 위해 파일이 존재하는지 확인합니다.
    if not os.path.exists(filename):
        print(f"{url}에서 파일을 다운로드합니다...")
        urllib.request.urlretrieve(url, filename)
        print("다운로드 완료.")
    else:
        print(f"{filename}이 이미 다운로드되어 있습니다.")

def is_within_directory(directory, target):
    """
    경로 탐색 공격(path traversal attack)을 막기 위해
    타깃 경로를 확인하는 보안 검사를 수행합니다.
    추출된 파일이 의도한 디렉토리 안에 있도록 보장합니다.

    매개변수:
        directory (str): 베이스 디렉토리 경로
        target (str): 타깃 경로

    반환값:
        bool: target이 directory 안에 있으면 True, 그렇지 않으면 False
    """
    # 비교를 위해 두 경로를 절대 경로로 바꿉니다.
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    # 포함 관계를 확인하기 위해 공통 경로를 추출합니다.
    prefix = os.path.commonprefix([abs_directory, abs_target])
    return prefix == abs_directory

def safe_extract_tar(tar_file, required_files):
    """
    보안 검사를 통해 tar 파일을 안전하게 추출합니다.
    경로 탐색 공격을 방지하고 필요한 파일만 추출합니다.

    매개변수:
        tar_file (str): tar 압축 파일 경로
        required_files (list): 추출할 파일이름 리스트

    반환값:
        None: 파일을 추출하고 과정을 출력합니다.

    예외:
        Exception: 경로 탐색 공격이 감지되는 경우
    """
    with tarfile.open(tar_file, "r:gz") as tar:
        # 압축된 모든 파일에 대해 보안 검사를 수행합니다.
        for member in tar.getmembers():
            if not is_within_directory('.', member.name):
                raise Exception("Tar 파일에서 경로 탐색 공격이 감지되었습니다.")

        # 지정된 파일만 추출합니다.
        for member in tar.getmembers():
            if any(member.name.endswith(file) for file in required_files):
                # 안전을 위해 파일에서 경로를 삭제합니다.
                member.name = os.path.basename(member.name)
                tar.extract(member, '.')
                print(f"{member.name} 추출")

def create_word_generator(filepath):
    """
    텍스트 파일에서 한 번에 하나의 단어를 반환하는 제너레이터를 만듭니다.
    대규모 텍스트 파일을 메모리 효율적으로 처리하는 방법입니다.

    매개변수:
        filepath (str): 텍스트 파일 경로

    반환값:
        generator: 파일에 있는 개별 단어
    """
    def generator():
        with open(filepath, 'r') as f:
            for line in f:
                for word in line.split():
                    yield word
    return generator()

def download_and_prepare_data(url):
    """
    훈련 데이터셋을 다운로드, 추출, 준비합니다.
    다운로드와 보안 검사를 포함한 데이터 추출을 처리합니다.

    매개변수:
        url (str): 다운로드할 데이터셋 URL

    반환값:
        generator: 훈련 데이터를 위한 단어 제너레이터
    """
    required_files = ["train.txt", "test.txt"]
    filename = os.path.basename(url)

    # 필요한 경우 데이터셋을 다운로드합니다.
    download_file(url, filename)

    # 기존에 없는 경우 필요한 파일을 추출합니다.
    if not all(os.path.exists(file) for file in required_files):
        print("파일 추출...")
        safe_extract_tar(filename, required_files)
        print("추출 완료.")
    else:
        print("'train.txt'와 'test.txt'는 이미 추출되어 있습니다.")

    # 단어 제너레이터를 만들어 반환합니다.
    return create_word_generator("train.txt")

def initialize_vocabulary(corpus):
    """
    단어를 문자로 분할하여 말뭉치에서 초기 어휘사전을 만듭니다.
    단어 경계 문자 '_'를 추가하고 고유한 문자를 추적합니다.

    매개변수:
        corpus (iterable): 처리할 단어의 반복자 또는 리스트

    반환값:
        tuple: (토큰화된 단어를 카운트에 매핑하는 어휘사전 딕셔너리, 말뭉치에 있는 고유한 문자 집합)
    """
    # 단어 카운트와 고유한 문자를 추적합니다.
    vocabulary = defaultdict(int)
    charset = set()

    for word in corpus:
        # 단어 경계 문자를 추가하고 문자로 분할합니다.
        word_with_marker = '_' + word
        characters = list(word_with_marker)
        # 고유 문자 집합을 업데이트합니다.
        charset.update(characters)
        # 공백으로 구분된 문자의 문자열을 만듭니다.
        tokenized_word = " ".join(characters)
        # 토큰화된 단어의 카운트를 증가시킵니다.
        vocabulary[tokenized_word] += 1

    return vocabulary, charset

def get_pair_counts(vocabulary):
    """
    어휘사전에 인접한 기호 쌍의 빈도를 카운트합니다.
    가장 빈번한 쌍을 식별하여 합치는데 사용합니다.

    매개변수:
        vocabulary (dict): 토큰화된 단어와 카운트를 매핑하는 딕셔너리

    Returns:
        defaultdict: 토큰 쌍과 빈도 카운트의 매핑
    """
    pair_counts = defaultdict(int)
    for tokenized_word, count in vocabulary.items():
        # 단어를 토큰으로 분할합니다.
        tokens = tokenized_word.split()
        # 단어 빈도로 인접 쌍의 카운트를 할당합니다.
        for i in range(len(tokens) - 1):
            pair = (tokens[i], tokens[i + 1])
            pair_counts[pair] += count
    return pair_counts

def merge_pair(vocab, pair):
    """
    어휘사전에 있는 특정 기호 쌍을 합칩니다.
    토큰 경계를 정확하게 매치하기 위해 정규 표현식을 사용합니다.

    매개변수:
        vocab (dict): 현재 어휘사전 딕셔너리
        pair (tuple): 병합된 토큰 쌍

    반환값:
        dict: 특정 쌍이 병합된 새로운 어휘사전
    """
    new_vocab = {}
    # 바이그램을 매칭하기 위해 정규 표현식을 만듭니다.
    bigram = re.escape(' '.join(pair))
    pattern = re.compile(r"(?<!\S)" + bigram + r"(?!\S)")

    # 어휘사전에 있는 모든 단어를 병합합니다.
    for tokenized_word, count in vocab.items():
        new_tokenized_word = pattern.sub("".join(pair), tokenized_word)
        new_vocab[new_tokenized_word] = count
    return new_vocab

def byte_pair_encoding(corpus, vocab_size):
    """
    부분단어 어휘사전을 학습하기 위해 BPE 알고리즘을 구현합니다.
    목표하는 어휘사전 크기에 도달할 때까지 가장 빈번하게 등장하는 문자 쌍을 반복적으로 병합합니다.

    매개변수:
        corpus (iterable): BPE가 학습할 단어의 반복자 또는 리스트
        vocab_size (int): 병합을 중지할 목표 어휘사전 크기

    반환값:
        tuple: (최종 어휘사전 딕셔너리, 병합 연산 리스트,
               기본 문자 집합, 모든 토큰 집합)
    """
    # 문자 수준 토큰으로 어휘사전을 초기화합니다.
    vocab, charset = initialize_vocabulary(corpus)
    merges = []
    tokens = set(charset)

    # 목표 어휘사전 크기에 도달할 때까지 계속 쌍을 병합합니다.
    while len(tokens) < vocab_size:
        # 인접한 모든 토큰 쌍의 카운트를 얻습니다.
        pair_counts = get_pair_counts(vocab)
        if not pair_counts:
            break

        # 가장 많이 등장하는 쌍을 찾아 기록합니다.
        most_frequent_pair = max(pair_counts, key=pair_counts.get)
        merges.append(most_frequent_pair)

        # 가장 많이 등장하는 쌍을 병합하여 어휘사전을 업데이트합니다.
        vocab = merge_pair(vocab, most_frequent_pair)

        # 새롭게 병합된 토큰을 토큰 집합에 추가합니다.
        new_token = "".join(most_frequent_pair)
        tokens.add(new_token)

    return vocab, merges, charset, tokens

def tokenize_word(word, merges, charset, unk_token="<UNK>"):
    """
    학습된 BPE 병합을 사용해 단일 단어를 토큰화합니다.
    알지 못하는 문자는 UNK 토큰으로 처리합니다.

    매개변수:
        word (str): 토큰화할 단어
        merges (list): 학습된 병합 연산 리스트
        charset (set): 알려진 문자 집합
        unk_token (str): 알지 못하는 문자를 대체할 토큰

    반환값:
        list: 해당 단어에 대한 토큰 리스트
    """
    # 단어 경계 문자를 추가하고 문자로 분할합니다.
    word = '_' + word
    tokens = [char if char in charset else unk_token for char in word]

    # 순서대로 병합 연산을 적용합니다.
    for left, right in merges:
        i = 0
        while i < len(tokens) - 1:
            if tokens[i:i+2] == [left, right]:
                tokens[i:i+2] = [left + right]
            else:
                i += 1
    return tokens

def build_merge_map(merges):
    """
    토큰 쌍을 병합 결과에 매핑하는 딕셔너리를 만듭니다.
    일관된 토큰화를 위해 병합 순서를 유지합니다.

    매개변수:
        merges (list): 병합 연산의 리스트

    반환값:
        dict: 병합 쌍과 (병합된 토큰, 병합 우선순위) 튜플의 매핑
    """
    merge_map = {}
    # 병합 우선순위로 매핑을 만듭니다.
    for i, (left, right) in enumerate(merges):
        merged_token = left + right
        merge_map[(left, right)] = (merged_token, i)
    return merge_map

def tokenize_word_fast(word, merge_map, vocabulary, charset, unk_token="<UNK>"):
    """
    미리 계산된 병합 매핑을 사용해 토큰화 함수를 최적화합니다.
    원본 알고리즘과 동일하지만 더 빠르게 결과를 생성합니다.

    매개변수:
        word (str): 토큰화할 단어
        merge_map (dict): 토큰 쌍과 병합 결과의 매핑
        vocabulary (dict): 현재 어휘사전 딕셔너리
        charset (set): 알려진 문자 집합
        unk_token (str): 알지 못하는 문자를 대체할 토큰

    반환값:
        list: 해당 단어의 토큰 리스트
    """
    # 단어가 어휘사전에 있는지 확인합니다.
    word_with_prefix = '_' + word
    if word_with_prefix in vocabulary:
        return [word_with_prefix]

    # 단어를 문자로 분할하고 알지 못하는 문자를 대체합니다.
    tokens = [char if char in charset else unk_token for char in word_with_prefix]

    # 더 이상 가능한 병합이 없을 때까지 병합을 계속합니다.
    while True:
        # 가능한 모든 병합 연산을 찾습니다.
        pairs_with_positions = []
        for i in range(len(tokens) - 1):
            pair = (tokens[i], tokens[i + 1])
            if pair in merge_map:
                merged_token, merge_priority = merge_map[pair]
                pairs_with_positions.append((i, pair, merged_token, merge_priority))

        # 더 이상 가능한 병합이 없으면 종룟합니다.
        if not pairs_with_positions:
            break

        # 일관성을 위해 병합 우선순위와 위치로 정렬합니다.
        pairs_with_positions.sort(key=lambda x: (x[3], x[0]))

        # 유효한 첫 번째 병합을 적용합니다.
        pos, pair, merged_token, _ = pairs_with_positions[0]
        tokens[pos:pos+2] = [merged_token]

    return tokens

def save_tokenizer(merges, charset, tokens, filename="tokenizer.pkl"):
    """
    나중을 위해 토크나이저 상태를 피클 파일에 저장합니다.

    매개변수:
        merges (list): 병합 연산 리스트
        charset (set): 알려진 문자 집합
        tokens (set): 모든 토큰 집합
        filename (str): 토크나이저 상태를 저장할 경로

    반환값:
        None: 토크나이저를 디스크에 저장합니다.
    """
    with open(filename, "wb") as f:
        pickle.dump({
            "merges": merges,
            "charset": charset,
            "tokens": tokens
        }, f)

def load_tokenizer(filename="tokenizer.pkl"):
    """
    피클 파일에서 토크나이저 상태를 로드합니다.

    매개변수:
        filename (str): 토크나이저 상태가 저장된 경로

    반환값:
        dict: 토크나이저 구성요소가 담긴 딕셔너리
    """
    with open(filename, "rb") as f:
        return pickle.load(f)

In [2]:
# 설정 파라미터
vocab_size = 5_000  # 어휘사전 크기
max_corpus_size = 500_000  # 처리할 최대 단어 개수
data_url = "https://www.thelmbook.com/data/news"  # 데이터셋

# 훈련 데이터를 다운로드하고 전처리합니다
word_gen = download_and_prepare_data(data_url)

# 말뭉치가 최대 크기에 도달할 때까지 단어를 모읍니다.
word_list = []
for word in word_gen:
    word_list.append(word)
    if len(word_list) >= max_corpus_size:
        break

# BPE 토크나이저를 훈련합니다.
print("BPE 토크나이저 훈련...")
vocab, merges, charset, tokens = byte_pair_encoding(word_list, vocab_size)

# 훈련된 토크나이저를 저장합니다.
print("토크나이저 저장...")
save_tokenizer(merges, charset, tokens)

https://www.thelmbook.com/data/news에서 파일을 다운로드합니다...
다운로드 완료.
파일 추출...


  tar.extract(member, '.')


train.txt 추출
test.txt 추출
추출 완료.
BPE 토크나이저 훈련...
토크나이저 저장...


# 훈련된 BPE 토크나이저 테스트

토크나이저가 훈련되고 나면 이를 로드하여 새로운 텍스트에 적용할 수 있습니다:

In [3]:
print("토크나이저 로딩...")
tokenizer = load_tokenizer()

# 로딩된 토크나이저로 샘플 문장을 토큰화합니다.
sentence = "Let's proceed to the language modeling part."

start_time = time.time()
tokenized_sentence = [tokenize_word(word, tokenizer["merges"], tokenizer["charset"]) for word in sentence.split()]
elapsed = time.time() - start_time
print("\n일반적인 구현으로 토큰화한 문장:")
for word, tokens in zip(sentence.split(), tokenized_sentence):
    print(f"{word} -> {tokens}")
print("--- 소요시간: %s 초 ---" % (elapsed))

merge_map = build_merge_map(tokenizer["merges"])
start_time = time.time()
fast_tokenized_sentence = [tokenize_word_fast(word, merge_map, vocab, tokenizer["charset"]) for word in sentence.split()]
elapsed = time.time() - start_time
print("\n빠른 구현으로 토큰화한 문장:")
for word, tokens in zip(sentence.split(), fast_tokenized_sentence):
    print(f"{word} -> {tokens}")
print("--- 소요시간: %s 초 ---" % (time.time() - start_time))

print("\n어휘사전 크기:", len(tokenizer["tokens"]))

토크나이저 로딩...

일반적인 구현으로 토큰화한 문장:
Let's -> ['_Let', "'", 's']
proceed -> ['_proceed']
to -> ['_to']
the -> ['_the']
language -> ['_language']
modeling -> ['_model', 'ing']
part. -> ['_part', '.']
--- 소요시간: 0.016661643981933594 초 ---

빠른 구현으로 토큰화한 문장:
Let's -> ['_Let', "'", 's']
proceed -> ['_proceed']
to -> ['_to']
the -> ['_the']
language -> ['_language']
modeling -> ['_model', 'ing']
part. -> ['_part', '.']
--- 소요시간: 0.0004608631134033203 초 ---

어휘사전 크기: 5000
