# Phase 1.1: Train Korean SentencePiece Tokenizer

Train a SentencePiece tokenizer on a Korean corpus for vocabulary expansion.

## Contents
1. Setup and Load Corpus
2. Train SentencePiece Tokenizer
3. Load and Analyze Tokenizer
4. Test Tokenization
5. Save Tokenizer Info

In [None]:
# Setup
import sys
import os
sys.path.append("..")

import sentencepiece as spm
from collections import Counter
import json
from tqdm import tqdm
import matplotlib.pyplot as plt

# Directory layout, relative to the notebook's working directory
DATA_DIR = "../data"
RAW_DIR = DATA_DIR + "/raw"
MODEL_DIR = "../models/tokenizer"

# Create the tokenizer output folder up front; harmless if it already exists.
os.makedirs(MODEL_DIR, exist_ok=True)

print("Model directory: " + MODEL_DIR)

---
## 1. Setup and Load Corpus

In [None]:
# Locate the training corpus and report its on-disk size and line count
corpus_path = RAW_DIR + "/korean_corpus_for_tokenizer.txt"

if not os.path.exists(corpus_path):
    # Nothing to measure: point the user at the notebook that produces the file.
    print(f"Corpus file not found: {corpus_path}")
    print("Run phase0_data_preparation/02_collect_korean_medical.ipynb first")
else:
    file_size_gb = os.path.getsize(corpus_path) / (1024**3)
    print(f"Corpus file: {corpus_path}")
    print(f"Size: {file_size_gb:.2f} GB")

    # Iterate the file line by line so the corpus is never loaded whole.
    line_count = 0
    with open(corpus_path, "r", encoding="utf-8") as corpus_file:
        for _ in corpus_file:
            line_count += 1
    print(f"Lines: {line_count:,}")

In [None]:
# Preview corpus: print the first few lines, each cut to a fixed width
PREVIEW_LINES = 5
PREVIEW_CHARS = 200

print("Corpus preview:")
with open(corpus_path, "r", encoding="utf-8") as f:
    # Pairing with range() stops after PREVIEW_LINES lines without reading further.
    for i, line in zip(range(PREVIEW_LINES), f):
        # Drop the line terminator so the ellipsis stays on the same output line.
        text = line.rstrip("\r\n")
        # Only mark the line as truncated when characters were actually cut.
        suffix = "..." if len(text) > PREVIEW_CHARS else ""
        print(f"\nLine {i+1}: {text[:PREVIEW_CHARS]}{suffix}")

---
## 2. Train SentencePiece Tokenizer

Following the EEVE methodology:
- Train an intermediate tokenizer with a 40,000-token vocabulary
- Tokens will be filtered by frequency in the next notebook (`02_filter_tokens.ipynb`)

In [None]:
# SentencePiece training configuration (EEVE-style)
VOCAB_SIZE = 40000  # intermediate vocabulary; a later step filters it down
MODEL_PREFIX = MODEL_DIR + "/korean_sp"

# Keyword arguments handed unchanged to SentencePieceTrainer.train().
training_params = dict(
    input=corpus_path,
    model_prefix=MODEL_PREFIX,
    vocab_size=VOCAB_SIZE,
    character_coverage=0.9995,  # high coverage for Korean
    model_type="bpe",  # BPE, like most LLM tokenizers
    # Fixed ids for the four special tokens.
    pad_id=0,
    unk_id=1,
    bos_id=2,
    eos_id=3,
    num_threads=16,
    train_extremely_large_corpus=True,
    max_sentence_length=16384,
    input_sentence_size=5000000,  # sample 5M sentences for training
    shuffle_input_sentence=True,
)

print("Training parameters:")
for name, setting in training_params.items():
    print(f"  {name}: {setting}")

In [None]:
# Run SentencePiece training with the parameters prepared above
print("\nTraining SentencePiece tokenizer...")
print("This may take 10-30 minutes depending on corpus size.")

# Writes <MODEL_PREFIX>.model and <MODEL_PREFIX>.vocab.
spm.SentencePieceTrainer.train(**training_params)

print("\nTraining complete!")
print("Model saved to: " + MODEL_PREFIX + ".model")
print("Vocab saved to: " + MODEL_PREFIX + ".vocab")

In [None]:
# Verify that training produced both output files and report their sizes
model_file = f"{MODEL_PREFIX}.model"
vocab_file = f"{MODEL_PREFIX}.vocab"

if os.path.exists(model_file):
    model_size_mb = os.path.getsize(model_file) / (1024**2)
    print(f"Model file: {model_file} ({model_size_mb:.2f} MB)")
else:
    # Say so explicitly: a silent cell would look the same as a successful check.
    print(f"WARNING: model file not found: {model_file}")

if os.path.exists(vocab_file):
    vocab_size_mb = os.path.getsize(vocab_file) / (1024**2)
    print(f"Vocab file: {vocab_file} ({vocab_size_mb:.2f} MB)")
else:
    print(f"WARNING: vocab file not found: {vocab_file}")

---
## 3. Load and Analyze Tokenizer

In [None]:
# Load the trained model and show its size and special-token ids
sp = spm.SentencePieceProcessor()
sp.Load(MODEL_PREFIX + ".model")

print(f"Vocabulary size: {sp.GetPieceSize()}")
print("\nSpecial tokens:")

# (label, id) pairs in the order they are reported.
special_ids = (
    ("PAD", sp.pad_id()),
    ("UNK", sp.unk_id()),
    ("BOS", sp.bos_id()),
    ("EOS", sp.eos_id()),
)
for label, token_id in special_ids:
    print(f"  {label}: {token_id} -> '{sp.IdToPiece(token_id)}'")

In [None]:
# Analyze vocabulary composition
def analyze_vocabulary(sp_processor):
    """Group every vocabulary piece into a coarse category.

    Args:
        sp_processor: Loaded tokenizer exposing ``GetPieceSize()``,
            ``IdToPiece(id)``, ``IsControl(id)`` and ``IsUnknown(id)``.

    Returns:
        Dict with the keys ``"korean"``, ``"english"``, ``"numbers"``,
        ``"special"`` and ``"other"``; each value is the list of pieces in
        that category, in vocabulary-id order.
    """
    korean_tokens = []
    english_tokens = []
    number_tokens = []
    special_tokens = []
    other_tokens = []

    for token_id in range(sp_processor.GetPieceSize()):
        piece = sp_processor.IdToPiece(token_id)

        # Remove the SentencePiece word-boundary marker before inspecting characters.
        clean_piece = piece.replace("▁", "")

        # Ask the model which ids are control/unknown tokens: their text
        # (e.g. "<pad>", "</s>") would otherwise be counted as "other".
        is_reserved = sp_processor.IsControl(token_id) or sp_processor.IsUnknown(token_id)

        if is_reserved or not clean_piece:
            # An empty clean_piece means the piece is only the boundary marker.
            special_tokens.append(piece)
        elif any('가' <= c <= '힣' for c in clean_piece):
            # Any Hangul syllable makes the piece Korean, even when mixed with other scripts.
            # NOTE(review): standalone jamo (e.g. "ㅋ") fall outside this range — confirm intended.
            korean_tokens.append(piece)
        elif clean_piece.isascii() and clean_piece.isalpha():
            english_tokens.append(piece)
        elif clean_piece.isdigit():
            number_tokens.append(piece)
        else:
            other_tokens.append(piece)

    return {
        "korean": korean_tokens,
        "english": english_tokens,
        "numbers": number_tokens,
        "special": special_tokens,
        "other": other_tokens,
    }

vocab_analysis = analyze_vocabulary(sp)

# Report each category's size and its share of the full vocabulary.
total_pieces = sp.GetPieceSize()
print("Vocabulary composition:")
for category, tokens in vocab_analysis.items():
    token_count = len(tokens)
    pct = token_count / total_pieces * 100
    print(f"  {category}: {token_count} ({pct:.1f}%)")

In [None]:
# Print the leading tokens of selected categories as a quick sanity check
sample_specs = (
    ("\nSample Korean tokens:", "korean", 30),
    ("\nSample English tokens:", "english", 20),
    ("\nSample other tokens:", "other", 20),
)

for heading, category_key, limit in sample_specs:
    print(heading)
    print(vocab_analysis[category_key][:limit])

In [None]:
# Bar chart of how many tokens fall into each vocabulary category
categories = list(vocab_analysis.keys())
counts = [len(tokens) for tokens in vocab_analysis.values()]

bar_colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7']

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(categories, counts, color=bar_colors)
ax.set_title('Korean Tokenizer Vocabulary Composition')
ax.set_xlabel('Token Category')
ax.set_ylabel('Count')

# Write each count just above its bar (fixed offset of 200 in data units).
for position, height in enumerate(counts):
    ax.text(position, height + 200, str(height), ha='center')

fig.tight_layout()
chart_path = f"{MODEL_DIR}/vocab_composition.png"
fig.savefig(chart_path, dpi=150)
plt.show()

print(f"Chart saved to {chart_path}")

---
## 4. Test Tokenization

In [None]:
# Sample sentences used to eyeball the tokenizer's output
test_sentences = [
    "안녕하세요, 저는 의료 AI 어시스턴트입니다.",
    "환자가 발열과 기침 증상을 호소합니다.",
    "당뇨병은 혈당 조절에 문제가 생기는 대사 질환입니다.",
    "MRI 검사 결과 뇌에 이상 소견이 발견되었습니다.",
    "고혈압 환자는 염분 섭취를 줄여야 합니다.",
]

print("Tokenization test:", "=" * 60, sep="\n")

# Show each sentence alongside its pieces and the matching ids.
for text in test_sentences:
    token_pieces = sp.EncodeAsPieces(text)
    token_ids = sp.EncodeAsIds(text)

    print(
        f"\nOriginal: {text}",
        f"Tokens ({len(token_pieces)}): {token_pieces}",
        f"IDs: {token_ids}",
        sep="\n",
    )

In [None]:
# Compare token counts against the base Gemma tokenizer (optional step)
# NOTE(review): "google/gemma-2b" stands in for MedGemma here; MedGemma's own
# tokenizer may differ — confirm before relying on these ratios.
print("Comparing with MedGemma tokenizer...")

try:
    # Imported inside the try so a missing `transformers` install is reported
    # as an optional-step failure instead of crashing the cell.
    from transformers import AutoTokenizer

    medgemma_tokenizer = AutoTokenizer.from_pretrained("google/gemma-2b")  # Use base Gemma for comparison

    print("\nToken count comparison:")
    print(f"{'Sentence':<50} | {'Korean SP':>10} | {'Gemma':>10} | {'Ratio':>8}")
    print("-" * 85)

    for sentence in test_sentences:
        korean_tokens = len(sp.EncodeAsPieces(sentence))
        # Exclude BOS/EOS so both counts cover the sentence text only;
        # EncodeAsPieces adds no special tokens either.
        gemma_tokens = len(medgemma_tokenizer.encode(sentence, add_special_tokens=False))
        # Ratio > 1 means Gemma needs more tokens than the Korean tokenizer.
        ratio = gemma_tokens / korean_tokens

        # Keep the table aligned by shortening long sentences to 50 characters.
        short_sentence = sentence[:47] + "..." if len(sentence) > 50 else sentence
        print(f"{short_sentence:<50} | {korean_tokens:>10} | {gemma_tokens:>10} | {ratio:>7.2f}x")

    print("\nNote: Ratio = Gemma tokens / Korean SP tokens; higher ratio = more efficient Korean encoding with the Korean tokenizer")

except Exception as e:
    # Deliberately broad: any failure (missing package, download, auth) just skips the comparison.
    print(f"Could not load Gemma tokenizer: {e}")
    print("This comparison is optional.")

---
## 5. Save Tokenizer Info

In [None]:
# Save a JSON summary of the trained tokenizer next to the model files
tokenizer_info = {
    "model_path": f"{MODEL_PREFIX}.model",
    "vocab_path": f"{MODEL_PREFIX}.vocab",
    "vocab_size": sp.GetPieceSize(),
    # Read from the training configuration so the summary cannot drift from
    # the values actually passed to the trainer.
    "model_type": training_params["model_type"],
    "character_coverage": training_params["character_coverage"],
    "special_tokens": {
        "pad_id": sp.pad_id(),
        "unk_id": sp.unk_id(),
        "bos_id": sp.bos_id(),
        "eos_id": sp.eos_id(),
    },
    # Number of pieces per category, as computed by analyze_vocabulary().
    "vocabulary_composition": {
        category: len(tokens) for category, tokens in vocab_analysis.items()
    },
}

info_path = f"{MODEL_DIR}/tokenizer_info.json"
with open(info_path, "w", encoding="utf-8") as f:
    json.dump(tokenizer_info, f, indent=2)

print(f"Tokenizer info saved to {info_path}")
print(json.dumps(tokenizer_info, indent=2))

In [None]:
# Write every Korean piece, one per line, for the frequency-filtering step
korean_tokens_path = MODEL_DIR + "/korean_tokens_all.txt"
korean_pieces = vocab_analysis["korean"]

with open(korean_tokens_path, "w", encoding="utf-8") as out_file:
    out_file.writelines(piece + "\n" for piece in korean_pieces)

print(f"Korean tokens saved to {korean_tokens_path}")
print(f"Total Korean tokens: {len(korean_pieces)}")

In [None]:
# Closing summary: where the artifacts are and which notebooks to run next
banner = "=" * 60
summary_lines = (
    "\n" + banner,
    "Korean Tokenizer Training Complete!",
    banner,
    f"\nTokenizer saved to: {MODEL_DIR}",
    f"Vocabulary size: {sp.GetPieceSize()}",
    f"Korean tokens: {len(vocab_analysis['korean'])}",
    "\nNext steps:",
    "  1. Run 02_filter_tokens.ipynb to filter by frequency",
    "  2. Run 03_merge_tokenizers.ipynb to merge with MedGemma",
)
for summary_line in summary_lines:
    print(summary_line)