In [1]:
import os
import glob
import json
import argparse
import gc
import re
import warnings
import numpy as np
import torch
from bs4 import BeautifulSoup
from tqdm import tqdm
from colorama import init, Fore, Style

import ebooklib
from ebooklib import epub
from nltk.tokenize import sent_tokenize
import nltk
from kiwipiepy import Kiwi
from sentence_transformers import SentenceTransformer, util

In [2]:
from dataclasses import dataclass

In [3]:
@dataclass
class Chapter:
    """챕터 정보를 담는 데이터 클래스"""

    idx: int
    text: str
    sents: list[str]
    embeddings: list[float] | None = None

    def __init__(self, idx, text, sents):
        self.idx = idx
        self.text = text
        self.sents = sents
        self.embedding = None

In [4]:
# Initialization
# autoreset=True makes colorama reset colours after every print call.
init(autoreset=True)
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    # nltk.download() signals failure through its boolean return value rather
    # than by raising, so check it: otherwise a failed download (offline
    # machine, SSL certificate problems, ...) only shows up in NLTK's own
    # console output, and the notebook itself never flags it.
    if not nltk.download('punkt'):
        print(
            f"{Fore.RED}[Error] Could not download the NLTK 'punkt' tokenizer; "
            f"English sentence splitting may fail until it is installed.{Style.RESET_ALL}"
        )

# NOTE(review): this silences *all* Python warnings for the rest of the session.
warnings.filterwarnings('ignore')

[nltk_data] Error loading punkt: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed:
[nltk_data]     unable to get local issuer certificate (_ssl.c:992)>


In [5]:
@dataclass
class Config:
    """Run-time options consumed by ``HierarchicalAligner``.

    The dataclass-generated constructor is used, so options can be passed as
    keyword arguments (``Config(threshold=0.7)``). Every field has a default,
    which keeps the older ``cfg = Config(); cfg.threshold = ...`` style working.
    The defaults mirror the ones declared on the notebook's command-line parser.
    """

    # Directory scanned for ``en_*.epub`` / ``kr_*.epub`` pairs.
    input_dir: str = "data"
    # Torch device string handed to the embedding model (resolved at class definition).
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    # Minimum similarity for an aligned sentence pair to be written to the dataset.
    threshold: float = 0.65
    # Largest number of sentences that may be merged on one side of a match.
    max_merge_n: int = 5
    # Score bonus added per extra merged sentence.
    merge_bonus: float = 0.001

In [6]:
class HierarchicalAligner:
    """Two-stage aligner for English/Korean EPUB pairs.

    Chapters are aligned first with a monotonic dynamic program over chapter
    embeddings; inside every matched chapter pair the sentences are aligned
    with a second dynamic program that also allows 1:k and k:1 merges.
    Accepted sentence pairs are written as JSON lines, and every decision is
    written to a human-readable log.

    ``args`` is any object exposing ``input_dir``, ``device``, ``threshold``,
    ``max_merge_n`` and ``merge_bonus``.
    """

    # Score added when a chapter / sentence is left unmatched in the DP.
    CHAPTER_SKIP_PENALTY = -0.2
    SENTENCE_SKIP_PENALTY = -0.3
    # A chapter pair picked by the DP is kept only if its similarity exceeds this.
    MIN_CHAPTER_SIM = 0.3

    def __init__(self, args):
        """Store the options and load the sentence splitter and embedding model."""
        self.args = args
        self.device = args.device

        print(f"{Fore.CYAN}[System] Loading Models...{Style.RESET_ALL}")
        self.kiwi = Kiwi()
        self.model = SentenceTransformer('sentence-transformers/LaBSE', device=self.device)
        self.model.eval()

        # Number of sentence pairs written for the book currently being processed.
        self.total_matches = 0

    def clean_text(self, text: str) -> str:
        """Strip markup from an HTML document and collapse all whitespace runs.

        Script, style, navigation, superscript, footer, header and table
        elements are removed entirely before the text is extracted.
        Returns ``""`` for empty input.
        """
        if not text:
            return ""
        soup = BeautifulSoup(text, 'html.parser')
        for tag in soup(["script", "style", "nav", "sup", "footer", "header", "table"]):
            tag.decompose()
        text = soup.get_text()
        return re.sub(r'\s+', ' ', text).strip()

    def load_epub_chapters(self, filepath: str, lang: str) -> "list[Chapter]":
        """Read an EPUB and return its spine documents as ``Chapter`` objects.

        Documents whose cleaned text is shorter than 50 characters, or that
        yield no sentence longer than one character, are dropped. ``lang ==
        'en'`` selects the NLTK sentence tokenizer; any other value uses Kiwi.
        Returns an empty list (after printing an error) if the file cannot be
        read.
        """
        try:
            book = epub.read_epub(filepath)
        except Exception as e:
            print(f"{Fore.RED}[Error] Failed to read {filepath}: {e}{Style.RESET_ALL}")
            return []

        chapters = []
        chap_idx = 0

        for item_id, _ in book.spine:
            item = book.get_item_with_id(item_id)
            if not item:
                continue
            if item.get_type() != ebooklib.ITEM_DOCUMENT:
                continue

            content = item.get_content().decode('utf-8', errors='ignore')
            cleaned = self.clean_text(content)
            if len(cleaned) < 50:
                continue

            if lang == 'en':
                sents = sent_tokenize(cleaned)
            else:
                sents = [s.text for s in self.kiwi.split_into_sents(cleaned)]

            sents = [s for s in sents if len(s.strip()) > 1]

            if sents:
                # chap_idx counts only the chapters that are actually kept.
                chapters.append(Chapter(chap_idx, cleaned, sents))
                chap_idx += 1

        return chapters

    def get_chapter_embedding(self, chapter: "Chapter"):
        """Embed a chapter using the concatenation of its first ten sentences."""
        summary = " ".join(chapter.sents[:10])
        return self.model.encode(summary, convert_to_tensor=True, device=self.device)

    def align_chapters(self, en_chapters, kr_chapters):
        """Monotonically align EN chapters with KR chapters.

        A dynamic program chooses, for every prefix pair, between matching the
        two current chapters (score: their cosine similarity) and skipping one
        of them (score: ``CHAPTER_SKIP_PENALTY``). Ties keep the option tried
        first: match, then skip EN, then skip KR.

        Returns the matched ``(en_chapter, kr_chapter)`` tuples in reading
        order; matches whose similarity is not above ``MIN_CHAPTER_SIM`` are
        discarded. Returns ``[]`` if either side is empty.
        """
        n_en = len(en_chapters)
        n_kr = len(kr_chapters)

        if n_en == 0 or n_kr == 0:
            return []

        print(f"{Fore.YELLOW}[Chapter Align] Aligning {n_en} EN chapters with {n_kr} KR chapters...{Style.RESET_ALL}")

        en_embs = torch.stack([self.get_chapter_embedding(c) for c in en_chapters])
        kr_embs = torch.stack([self.get_chapter_embedding(c) for c in kr_chapters])

        sim_matrix = util.cos_sim(en_embs, kr_embs).cpu().numpy()

        dp = np.full((n_en + 1, n_kr + 1), -np.inf)
        # Action per cell -- 0: none, 1: match, 2: skip EN, 3: skip KR.
        backtrack = np.zeros((n_en + 1, n_kr + 1), dtype=int)
        dp[0, 0] = 0

        skip_penalty = self.CHAPTER_SKIP_PENALTY

        for i in range(n_en + 1):
            for j in range(n_kr + 1):
                if i == 0 and j == 0:
                    continue

                best_score = -np.inf
                best_action = 0

                # Match
                if i > 0 and j > 0:
                    score = dp[i-1, j-1] + sim_matrix[i-1, j-1]
                    if score > best_score:
                        best_score, best_action = score, 1

                # Skip EN
                if i > 0:
                    score = dp[i-1, j] + skip_penalty
                    if score > best_score:
                        best_score, best_action = score, 2

                # Skip KR
                if j > 0:
                    score = dp[i, j-1] + skip_penalty
                    if score > best_score:
                        best_score, best_action = score, 3

                dp[i, j] = best_score
                backtrack[i, j] = best_action

        matched_pairs = []
        curr_i, curr_j = n_en, n_kr

        while curr_i > 0 or curr_j > 0:
            action = backtrack[curr_i, curr_j]

            if action == 1:  # Match
                if sim_matrix[curr_i-1, curr_j-1] > self.MIN_CHAPTER_SIM:
                    matched_pairs.append((en_chapters[curr_i-1], kr_chapters[curr_j-1]))
                curr_i -= 1
                curr_j -= 1
            elif action == 2:  # Skip EN
                curr_i -= 1
            elif action == 3:  # Skip KR
                curr_j -= 1
            else:
                # No recorded action: nothing left to walk back through.
                break

        matched_pairs.reverse()
        return matched_pairs

    def align_sentences_in_chapter(self, en_sents, kr_sents, json_file, log_file):
        """Align the sentences of one chapter pair and write the results.

        The dynamic program may skip a sentence on either side, match 1:1, or
        merge up to ``args.max_merge_n`` consecutive sentences on one side
        against a single sentence on the other. A merged group is represented
        by the mean of its sentence embeddings, and each merge earns
        ``args.merge_bonus * (k - 1)``. Ties keep the option tried first:
        skip EN, skip KR, 1:1, 1:k (k ascending), k:1 (k ascending).

        Every EN-side decision is written to ``log_file``; pairs whose
        similarity (without the merge bonus) reaches ``args.threshold`` are
        also written to ``json_file`` as one JSON object per line and counted
        in ``self.total_matches``. Skipped KR sentences are not logged.
        Does nothing if either sentence list is empty.
        """
        if not en_sents or not kr_sents:
            return

        en_emb = self.model.encode(en_sents, convert_to_tensor=True, device=self.device)
        kr_emb = self.model.encode(kr_sents, convert_to_tensor=True, device=self.device)

        n_en = len(en_sents)
        n_kr = len(kr_sents)
        max_n = self.args.max_merge_n
        merge_bonus = self.args.merge_bonus
        skip_penalty = self.SENTENCE_SKIP_PENALTY

        sim_matrix = util.cos_sim(en_emb, kr_emb).cpu().numpy()

        # Similarities for merged groups, computed once per merge width instead
        # of once per DP cell (the window mean does not depend on the other
        # side's position).
        #   one_to_k[k][i, j] = cos(en[i],            mean(kr[j : j + k]))
        #   k_to_one[k][i, j] = cos(mean(en[i : i + k]), kr[j])
        one_to_k = {}
        k_to_one = {}
        for k in range(2, max_n + 1):
            if n_kr >= k:
                # unfold(0, k, 1) -> (n_kr - k + 1, dim, k): every window of k rows.
                kr_windows = kr_emb.unfold(0, k, 1).mean(dim=-1)
                one_to_k[k] = util.cos_sim(en_emb, kr_windows).cpu().numpy()
            if n_en >= k:
                en_windows = en_emb.unfold(0, k, 1).mean(dim=-1)
                k_to_one[k] = util.cos_sim(en_windows, kr_emb).cpu().numpy()

        dp = np.full((n_en + 1, n_kr + 1), -np.inf)
        # The step taken into each cell is stored as (EN sentences consumed,
        # KR sentences consumed). Unlike packed codes such as 10 + k / 20 + k,
        # this stays unambiguous for any max_merge_n.
        step_en = np.zeros((n_en + 1, n_kr + 1), dtype=int)
        step_kr = np.zeros((n_en + 1, n_kr + 1), dtype=int)
        dp[0, 0] = 0

        for r in range(n_en + 1):
            for c in range(n_kr + 1):
                if r == 0 and c == 0:
                    continue
                best_score = -np.inf
                best_step = (0, 0)

                # (1) Skip EN
                if r > 0:
                    s = dp[r-1, c] + skip_penalty
                    if s > best_score:
                        best_score, best_step = s, (1, 0)

                # (2) Skip KR
                if c > 0:
                    s = dp[r, c-1] + skip_penalty
                    if s > best_score:
                        best_score, best_step = s, (0, 1)

                if r > 0 and c > 0:
                    # (3) 1:1 match
                    s = dp[r-1, c-1] + sim_matrix[r-1, c-1]
                    if s > best_score:
                        best_score, best_step = s, (1, 1)

                    # (4) 1:k match -- one EN sentence against k merged KR sentences
                    for k in range(2, min(max_n, c) + 1):
                        s = dp[r-1, c-k] + one_to_k[k][r-1, c-k] + merge_bonus * (k - 1)
                        if s > best_score:
                            best_score, best_step = s, (1, k)

                    # (5) k:1 match -- k merged EN sentences against one KR sentence
                    for k in range(2, min(max_n, r) + 1):
                        s = dp[r-k, c-1] + k_to_one[k][r-k, c-1] + merge_bonus * (k - 1)
                        if s > best_score:
                            best_score, best_step = s, (k, 1)

                dp[r, c] = best_score
                step_en[r, c], step_kr[r, c] = best_step

        # --- Backtracking ---
        path = []
        curr_r, curr_c = n_en, n_kr

        while curr_r > 0 or curr_c > 0:
            d_en = int(step_en[curr_r, curr_c])
            d_kr = int(step_kr[curr_r, curr_c])
            if d_en == 0 and d_kr == 0:
                break

            if d_kr == 0:  # Skip EN
                path.append({'type': 'SKIP_EN', 'en_idx': [curr_r-1], 'kr_idx': [], 'sim': 0})
            elif d_en > 0:  # 1:1, 1:k or k:1 match
                # Report the raw similarity (merge bonus excluded).
                if d_en == 1 and d_kr == 1:
                    sim = sim_matrix[curr_r-1, curr_c-1]
                elif d_en == 1:
                    sim = one_to_k[d_kr][curr_r-1, curr_c-d_kr]
                else:
                    sim = k_to_one[d_en][curr_r-d_en, curr_c-1]
                path.append({
                    'type': f'{d_en}:{d_kr}',
                    'en_idx': list(range(curr_r-d_en, curr_r)),
                    'kr_idx': list(range(curr_c-d_kr, curr_c)),
                    'sim': float(sim),
                })
            # d_en == 0 is a skipped KR sentence: intentionally not recorded.

            curr_r -= d_en
            curr_c -= d_kr

        path.reverse()

        # --- Write results ---
        for item in path:
            en_text_full = " ".join(en_sents[i] for i in item['en_idx'])

            if item['type'] == 'SKIP_EN':
                log_msg = f"[SKIP] (Score: {item['sim']:.4f})\n EN: {en_text_full}\n KR: (No Match)\n"
                log_file.write(log_msg + "-"*50 + "\n")
                continue

            kr_text_full = " ".join(kr_sents[i] for i in item['kr_idx'])
            sim_score = item['sim']
            accepted = sim_score >= self.args.threshold
            status = "MATCH" if accepted else "REJECT"

            log_msg = f"[{status}] (Score: {sim_score:.4f} | Type: {item['type']})\n EN: {en_text_full}\n KR: {kr_text_full}\n"
            log_file.write(log_msg + "-"*50 + "\n")

            if accepted:
                data = {
                    "system": "전문 번역가 스타일로 번역하세요.",
                    "user": en_text_full,
                    "assistant": kr_text_full
                }
                json_file.write(json.dumps(data, ensure_ascii=False) + "\n")
                self.total_matches += 1

    def process_pair(self, en_path, kr_path):
        """Align one EN/KR EPUB pair and write ``dataset_<title>.jsonl`` / ``.log``.

        The title is the EN file name without its leading ``en_`` and trailing
        ``.epub``. Only those two affixes are removed, so titles that happen to
        contain ``en_`` or ``.epub`` elsewhere are left intact. Both output
        files are created in the current working directory.
        """
        book_title = os.path.basename(en_path).removeprefix('en_').removesuffix('.epub')
        output_json = f"dataset_{book_title}.jsonl"
        output_log = f"dataset_{book_title}.log"

        print(f"\n{Fore.BLUE}=== Processing: {book_title} ==={Style.RESET_ALL}")

        en_chapters = self.load_epub_chapters(en_path, 'en')
        kr_chapters = self.load_epub_chapters(kr_path, 'kr')

        print(f" - Loaded Chapters: EN={len(en_chapters)}, KR={len(kr_chapters)}")

        matched_chapters = self.align_chapters(en_chapters, kr_chapters)
        print(f" - Matched Chapters: {len(matched_chapters)} pairs")

        # The counter is per book.
        self.total_matches = 0
        with open(output_json, 'w', encoding='utf-8') as f_json, \
             open(output_log, 'w', encoding='utf-8') as f_log:

            f_log.write(f"=== Log for {book_title} (Max Merge: {self.args.max_merge_n}) ===\n\n")

            pbar = tqdm(matched_chapters, desc="Aligning Chapters")
            for en_chap, kr_chap in pbar:
                f_log.write(f"### Chapter Pair: EN[{en_chap.idx}] - KR[{kr_chap.idx}] ###\n")
                self.align_sentences_in_chapter(en_chap.sents, kr_chap.sents, f_json, f_log)

        print(f"{Fore.GREEN} >> Finished {book_title}. Total Matches: {self.total_matches}{Style.RESET_ALL}")
        print(f" >> Log saved to: {output_log}")

    def run(self):
        """Process every ``en_*.epub`` in ``args.input_dir`` that has a ``kr_`` twin.

        The Korean file name is derived from the EN *file name* only (leading
        ``en_`` swapped for ``kr_``); directory components are left untouched,
        so an input directory whose path contains ``en_`` is handled correctly.
        """
        en_files = sorted(glob.glob(os.path.join(self.args.input_dir, "en_*.epub")))
        for en_file in en_files:
            directory, en_name = os.path.split(en_file)
            kr_file = os.path.join(directory, "kr_" + en_name.removeprefix("en_"))
            if os.path.exists(kr_file):
                self.process_pair(en_file, kr_file)
            else:
                print(f"Skipping {en_file} (No pair found)")

In [None]:
# Command-line entry point.
# NOTE(review): inside a Jupyter kernel, sys.argv usually carries kernel flags
# that parse_args() would reject -- presumably why this cell is unexecuted; confirm.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_dir', default='data')
    parser.add_argument('--device', default='cuda' if torch.cuda.is_available() else 'cpu')
    parser.add_argument('--threshold', type=float, default=0.65, help='Similarity threshold')

    # [New option] settings for merging N sentences
    parser.add_argument('--max_merge_n', type=int, default=5, help='Max sentences to merge (default: 5)')
    parser.add_argument('--merge_bonus', type=float, default=0.001, help='Bonus for merging sentences')

    args = parser.parse_args()

    # exist_ok=True replaces the separate existence check, so there is no
    # window in which the directory can appear between the check and the call.
    os.makedirs(args.input_dir, exist_ok=True)

    aligner = HierarchicalAligner(args)
    aligner.run()

In [7]:
# Notebook run: build the options object by hand, then align every book pair.
config = Config()
# Writing into the instance dict is equivalent to assigning each attribute in turn.
vars(config).update(
    input_dir="data",
    device='cuda' if torch.cuda.is_available() else 'cpu',
    threshold=0.65,
    max_merge_n=5,
    merge_bonus=0.001,
)

aligner = HierarchicalAligner(config)
aligner.run()

[System] Loading Models...


Quantization is not supported for ArchType::neon. Fall back to non-quantized model.



=== Processing: Harry Potter and the Goblet of Fire ===
 - Loaded Chapters: EN=38, KR=43
[Chapter Align] Aligning 38 EN chapters with 43 KR chapters...
 - Matched Chapters: 38 pairs


Aligning Chapters: 100%|████████████████████████████████████████████████████████████████████████████████| 38/38 [20:17<00:00, 32.05s/it]


 >> Finished Harry Potter and the Goblet of Fire. Total Matches: 5566
 >> Log saved to: dataset_Harry Potter and the Goblet of Fire.log

=== Processing: Harry Potter and the Sorcerors Stone ===
 - Loaded Chapters: EN=18, KR=22
[Chapter Align] Aligning 18 EN chapters with 22 KR chapters...
 - Matched Chapters: 17 pairs


Aligning Chapters: 100%|████████████████████████████████████████████████████████████████████████████████| 17/17 [08:50<00:00, 31.22s/it]


 >> Finished Harry Potter and the Sorcerors Stone. Total Matches: 4485
 >> Log saved to: dataset_Harry Potter and the Sorcerors Stone.log
