<a href="https://colab.research.google.com/github/wesslen/seamless_sacrebleu_evaluation/blob/main/notebooks/01_chunking_alignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import spacy
from pathlib import Path
from typing import List, Dict, Tuple, Optional
import PyPDF2
from docx import Document
import re
from dataclasses import dataclass
from collections import defaultdict
import math

@dataclass
class ChunkPair:
    """A source chunk paired with the target chunk found at the same position."""

    # Chunk text taken from the source document.
    source: str
    # Chunk text taken from the target document.
    target: str
    # Label set during alignment: 'sentence', 'recursive' or 'unverified'.
    chunk_type: str
    # Score assigned during alignment; stays 0.0 unless a value is supplied.
    alignment_score: float = 0.0

class DocumentReader:
    """Static helpers that load the plain text of .docx and .pdf files."""

    @staticmethod
    def read_docx(file_path: str) -> str:
        """Return the text of each paragraph in a .docx file, one per line."""
        paragraphs = Document(file_path).paragraphs
        return '\n'.join(para.text for para in paragraphs)

    @staticmethod
    def read_pdf(file_path: str) -> str:
        """Return the extracted text of each PDF page, joined by newlines."""
        with open(file_path, 'rb') as handle:
            # Pages are extracted inside the `with` block, while the handle is open.
            page_texts = [page.extract_text() for page in PyPDF2.PdfReader(handle).pages]
        return '\n'.join(page_texts)

    @staticmethod
    def read_document(file_path: str) -> str:
        """Load a document, choosing the reader from the file extension.

        The extension is compared case-insensitively.

        Raises:
            ValueError: If the extension is neither .docx nor .pdf.
        """
        extension = Path(file_path).suffix.lower()
        loaders = {
            '.docx': DocumentReader.read_docx,
            '.pdf': DocumentReader.read_pdf,
        }
        loader = loaders.get(extension)
        if loader is None:
            raise ValueError(f"Unsupported file format: {extension}")
        return loader(file_path)

class SentenceTokenizer:
    """Sentence splitter built on a blank spaCy pipeline with a sentencizer.

    Before splitting, periods inside known abbreviations are swapped for a
    placeholder so they are not mistaken for sentence ends; the periods are
    put back in the returned sentences.
    """

    # Placeholder substituted for '.' inside protected abbreviations.
    _PERIOD_MARKER = '@POINT@'

    def __init__(self, language: str):
        """Initialize tokenizer for a specific language.

        Args:
            language: Language code handed to ``spacy.blank`` and used as the
                key into the abbreviation table ('en', 'es' and 'fr' have
                entries; other codes get no abbreviation protection).
        """
        self.language = language
        # Create blank model
        self.nlp = spacy.blank(language)
        # Add sentencizer component
        self.nlp.add_pipe("sentencizer")

        # Common abbreviations by language
        self.abbreviations = {
            'en': {'Mr.', 'Mrs.', 'Dr.', 'Ms.', 'Prof.', 'Sr.', 'Jr.', 'etc.'},
            'es': {'Sr.', 'Sra.', 'Dr.', 'Dra.', 'Prof.', 'etc.'},
            'fr': {'M.', 'Mme.', 'Dr.', 'Prof.', 'etc.'}
        }

    def protect_abbreviations(self, text: str) -> str:
        """Replace periods in abbreviations with a special marker.

        Args:
            text: Raw text to scan.

        Returns:
            The text with every '.' inside a known abbreviation of
            ``self.language`` replaced by the marker; unchanged when the
            language has no abbreviation list.
        """
        abbreviations = self.abbreviations.get(self.language, set())
        if not abbreviations:
            return text

        # Longest first (ties alphabetical) so the alternation is deterministic.
        ordered = sorted(abbreviations, key=lambda abbr: (-len(abbr), abbr))
        alternatives = '|'.join(re.escape(abbr) for abbr in ordered)
        # Only the left edge is guarded. Every abbreviation ends in '.', and a
        # trailing r'\b' after '.' matches only when a word character follows,
        # so "Dr. Smith" (period then space) would never be protected.
        pattern = r'(?<!\w)(?:' + alternatives + r')'
        return re.sub(
            pattern,
            lambda m: m.group().replace('.', self._PERIOD_MARKER),
            text
        )

    def restore_abbreviations(self, text: str) -> str:
        """Restore the original periods in abbreviations."""
        return text.replace(self._PERIOD_MARKER, '.')

    def tokenize(self, text: str) -> List[str]:
        """Split text into sentences.

        Args:
            text: Raw document text.

        Returns:
            Stripped sentences in document order, with abbreviation periods
            restored. Sentences that are empty after stripping are omitted.
        """
        protected = self.protect_abbreviations(text)
        doc = self.nlp(protected)

        stripped = (str(sent).strip() for sent in doc.sents)
        # Skip whitespace-only spans so they do not become empty sentences.
        return [self.restore_abbreviations(sentence) for sentence in stripped if sentence]

class RecursiveChunker:
    """Pack consecutive sentences into chunks bounded by a character limit.

    Despite the name, chunking is a single greedy left-to-right pass.
    """

    def __init__(self, min_chunk_size: int = 100, max_chunk_size: int = 500):
        """Store the chunk size limits.

        Args:
            min_chunk_size: Stored on the instance; ``chunk_text`` does not
                currently consult it.
            max_chunk_size: Maximum length, in characters, of a chunk built
                from more than one sentence.
        """
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size

    def chunk_text(self, sentences: List[str]) -> List[str]:
        """Group sentences into space-joined chunks without splitting any sentence.

        Args:
            sentences: Sentences in document order.

        Returns:
            Chunks in document order. A sentence longer than
            ``max_chunk_size`` becomes a chunk of its own; every other chunk,
            including the joining spaces, is at most ``max_chunk_size``
            characters long.
        """
        chunks = []
        current_chunk = []
        # Always equals len(' '.join(current_chunk)).
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # If single sentence exceeds max_chunk_size, keep it as a separate chunk
            if sentence_length > self.max_chunk_size:
                if current_chunk:
                    chunks.append(' '.join(current_chunk))
                    current_chunk = []
                    current_length = 0
                chunks.append(sentence)
                continue

            # Count the joining space too; otherwise the finished chunk can
            # be longer than max_chunk_size.
            if current_chunk:
                joined_length = current_length + 1 + sentence_length
            else:
                joined_length = sentence_length

            # If adding this sentence would exceed max_chunk_size, start a new chunk
            if current_chunk and joined_length > self.max_chunk_size:
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                joined_length = sentence_length

            current_chunk.append(sentence)
            current_length = joined_length

        # Add remaining chunk if it exists
        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

class DocumentAligner:
    """Chunk a source and a target document and pair the chunks by position."""

    def __init__(self, source_lang: str = 'es', target_lang: str = 'en'):
        """Create one sentence tokenizer per language and a default chunker.

        Args:
            source_lang: Language code for the source tokenizer.
            target_lang: Language code for the target tokenizer.
        """
        self.source_tokenizer = SentenceTokenizer(source_lang)
        self.target_tokenizer = SentenceTokenizer(target_lang)
        self.chunker = RecursiveChunker()

    def calculate_length_ratio(self, source: str, target: str) -> float:
        """Return len(target) / len(source), or infinity when source is empty."""
        return len(target) / len(source) if source else float('inf')

    def align_chunks(self, source_chunks: List[str], target_chunks: List[str]) -> List["ChunkPair"]:
        """Align chunks based on length ratios and position.

        The i-th source chunk is paired with the i-th target chunk. A pair
        whose length ratio is within ±30% of the overall target/source length
        ratio gets a score of ``1 - relative deviation``; any other pair is
        still emitted, labelled 'unverified' with a score of 0.5, and a
        warning is printed.

        Args:
            source_chunks: Chunks of the source document, in order.
            target_chunks: Chunks of the target document, in order.

        Returns:
            One ChunkPair per position present in both lists. Chunks without
            a counterpart are reported on stdout and left out.
        """
        aligned_pairs = []
        total_source = sum(len(chunk) for chunk in source_chunks)
        total_target = sum(len(chunk) for chunk in target_chunks)
        # With no source text (or no target text) there is no usable expected
        # ratio. 0.0 routes every pair to the 'unverified' branch instead of
        # raising ZeroDivisionError here or in the score computation.
        expected_ratio = total_target / total_source if total_source else 0.0

        for i, (source_chunk, target_chunk) in enumerate(zip(source_chunks, target_chunks)):
            ratio = self.calculate_length_ratio(source_chunk, target_chunk)

            # Check if ratio is within acceptable range (±30% of expected ratio)
            if expected_ratio > 0 and 0.7 * expected_ratio <= ratio <= 1.3 * expected_ratio:
                # NOTE(review): 'sentence' is assigned when the source chunk has
                # at most two whitespace-separated words; confirm this is the
                # intended test for a single-sentence chunk.
                aligned_pairs.append(ChunkPair(
                    source=source_chunk,
                    target=target_chunk,
                    chunk_type='sentence' if len(source_chunk.split()) <= 2 else 'recursive',
                    alignment_score=1.0 - abs(ratio - expected_ratio) / expected_ratio
                ))
            else:
                print(f"Warning: Possible misalignment in chunk {i+1}")
                print(f"Source: {source_chunk[:100]}...")
                print(f"Target: {target_chunk[:100]}...")
                aligned_pairs.append(ChunkPair(
                    source=source_chunk,
                    target=target_chunk,
                    chunk_type='unverified',
                    alignment_score=0.5
                ))

        # Source chunks beyond the end of the target list have nothing to pair with.
        paired_count = min(len(source_chunks), len(target_chunks))
        for i in range(paired_count, len(source_chunks)):
            print(f"Warning: Missing target chunk for source chunk {i+1}")

        # Report surplus target chunks rather than dropping them silently.
        surplus_targets = len(target_chunks) - len(source_chunks)
        if surplus_targets > 0:
            print(f"Warning: {surplus_targets} target chunk(s) have no source chunk and were skipped")

        return aligned_pairs

    def process_documents(self, source_path: str, target_path: str) -> List["ChunkPair"]:
        """Process and align documents.

        Reads both files, splits each into sentences with its own tokenizer,
        chunks the sentences and aligns the resulting chunk lists.

        Args:
            source_path: Path of the source document.
            target_path: Path of the target document.

        Returns:
            The aligned chunk pairs produced by ``align_chunks``.
        """
        # Read documents
        source_text = DocumentReader.read_document(source_path)
        target_text = DocumentReader.read_document(target_path)

        # Get sentences
        source_sentences = self.source_tokenizer.tokenize(source_text)
        target_sentences = self.target_tokenizer.tokenize(target_text)

        # Create chunks
        source_chunks = self.chunker.chunk_text(source_sentences)
        target_chunks = self.chunker.chunk_text(target_sentences)

        # Align chunks
        return self.align_chunks(source_chunks, target_chunks)

def process_alignment_file(input_path: str, output_path: str):
    """Process a JSONL file containing document pairs and create aligned chunks.

    Each non-blank input line is a JSON object with 'source' and 'target'
    keys holding document paths. Every aligned chunk pair is written to the
    output as one JSON object per line with the keys 'source_text',
    'references', 'chunk_type' and 'alignment_score'.

    Args:
        input_path: Path of the JSONL file listing the document pairs.
        output_path: Path of the JSONL file to write; its parent directory is
            created when missing.
    """
    aligner = DocumentAligner()

    # Create output directory if it doesn't exist
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    aligned_pairs = []

    # Read input file
    with open(input_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Skip blank lines (e.g. trailing newlines at the end of the
            # file); json.loads would raise on them.
            if not line.strip():
                continue

            document_pair = json.loads(line)
            source_path = document_pair['source']
            target_path = document_pair['target']

            print("\nProcessing document pair:")
            print(f"Source: {source_path}")
            print(f"Target: {target_path}")

            # Process document pair
            chunk_pairs = aligner.process_documents(source_path, target_path)

            # Convert to required output format
            aligned_pairs.extend(
                {
                    "source_text": chunk_pair.source,
                    "references": [chunk_pair.target],
                    "chunk_type": chunk_pair.chunk_type,
                    "alignment_score": chunk_pair.alignment_score,
                }
                for chunk_pair in chunk_pairs
            )

    # Write output file
    with open(output_path, 'w', encoding='utf-8') as f:
        for record in aligned_pairs:
            json.dump(record, f, ensure_ascii=False)
            f.write('\n')

    print("\nProcessing complete!")
    print(f"Total aligned pairs: {len(aligned_pairs)}")
    print(f"Output written to: {output_path}")

# Example usage in Jupyter notebook
if __name__ == "__main__":
    # Placeholder paths: point these at your own JSONL input and output files.
    input_file, output_file = "document_pairs.jsonl", "aligned_chunks.jsonl"

    process_alignment_file(input_path=input_file, output_path=output_file)