# v22.1 Adaptive Data Preprocessing

Preprocesses collected Korean data for Neural Sparse training with adaptive processing based on data size.

## Key Features

- **Adaptive Processing**: Automatically selects processing method based on data size
  - < 1M rows: Use pandas (local processing)
  - >= 1M rows: Use Spark via EMR (distributed processing); falls back to pandas if the Spark connection cannot be established
- **Text Cleaning**: HTML removal, whitespace normalization, special character filtering
- **Quality Filtering**: Duplicate removal, length constraints, Korean content ratio check

## Processing Pipeline

| Step | Description | Output |
|------|-------------|--------|
| 1 | Count rows to determine method | Processing strategy |
| 2 | Text cleaning | Clean text fields |
| 3 | Quality filtering | Filtered dataset |
| 4 | Save preprocessed data | JSONL output |

In [None]:
import sys
import os
from pathlib import Path


def find_project_root() -> Path:
    """Locate the project root by walking up from the working directory.

    The working directory itself and then each of its ancestors is checked;
    the first one containing a ``src`` or ``.git`` entry is returned.

    Returns:
        The first matching directory, or the grandparent of the working
        directory when no candidate carries either marker.
    """
    start = Path.cwd()
    markers = ("src", ".git")
    for candidate in (start, *start.parents):
        if any((candidate / marker).exists() for marker in markers):
            return candidate
    return start.parent.parent


# Resolve the project root once; later cells build data and .env paths from it.
PROJECT_ROOT = find_project_root()
# Prepend the root to sys.path (presumably so project-local packages can be
# imported from this notebook -- no such import is visible in this file).
sys.path.insert(0, str(PROJECT_ROOT))

print(f"Project root: {PROJECT_ROOT}")

In [None]:
import json
import re
import logging
from dataclasses import dataclass
from typing import Any, Callable, Iterator, Optional, Protocol
from abc import ABC, abstractmethod

from tqdm.auto import tqdm

# Configure logging
# INFO level and above; each record is rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
# Module-level logger used by the processors and pipeline functions below.
logger = logging.getLogger(__name__)

## 1. Configuration

In [None]:
# Load environment variables
# python-dotenv is optional: without it only the process environment is used.
try:
    from dotenv import load_dotenv
    # load_dotenv reports whether anything was actually loaded, so only claim
    # success when it did.
    if load_dotenv(PROJECT_ROOT / ".env"):
        print("Loaded .env file")
    else:
        print("No variables loaded from .env, using system environment variables")
except ImportError:
    print("python-dotenv not installed, using system environment variables")

# Environment configuration
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "")
EMR_INSTANCE_ID = os.getenv("EMR_INSTANCE_ID", "")
# `or` (instead of a getenv default) also covers a variable that is set but
# blank, e.g. "EMR_SPARK_PORT=" in a .env template, which would otherwise make
# int("") raise ValueError. A non-numeric value still raises ValueError.
EMR_SPARK_PORT = int(os.getenv("EMR_SPARK_PORT") or "15002")

print(f"S3_BUCKET_NAME: {S3_BUCKET_NAME or '(not set)'}")
print(f"EMR_INSTANCE_ID: {EMR_INSTANCE_ID or '(not set)'}")
print(f"EMR_SPARK_PORT: {EMR_SPARK_PORT}")

In [None]:
@dataclass(frozen=True)
class PreprocessingConfig:
    """Configuration for data preprocessing.

    Instances are immutable (``frozen=True``). The path and port defaults are
    evaluated once, when the class is defined, from the module-level
    ``PROJECT_ROOT`` and ``EMR_SPARK_PORT`` values.
    """

    # Thresholds
    # Row count at or above which the Spark processor is selected.
    large_data_threshold: int = 1_000_000  # 1M rows
    # Inclusive character-length bounds applied to every text column.
    min_text_length: int = 2
    max_text_length: int = 512
    # Minimum share of Hangul syllables among non-space characters.
    min_korean_ratio: float = 0.30  # 30% Korean characters

    # Paths
    # Directory holding the input JSONL file.
    input_dir: Path = PROJECT_ROOT / "data" / "huggingface_korean"
    # Directory receiving the preprocessed JSONL output.
    output_dir: Path = PROJECT_ROOT / "data" / "v22.1" / "preprocessed"
    # Key prefix appended to s3://<bucket>/ when the Spark processor writes to S3.
    s3_output_prefix: str = "spark-meta/neural/preprocessed/"

    # Spark settings
    # Host and port used to build the Spark Connect URL (sc://host:port).
    spark_host: str = "localhost"
    spark_port: int = EMR_SPARK_PORT


# Single shared configuration instance used by the rest of the notebook.
config = PreprocessingConfig()

# Ensure output directory exists
config.output_dir.mkdir(parents=True, exist_ok=True)

print(f"Input directory: {config.input_dir}")
print(f"Output directory: {config.output_dir}")
print(f"Large data threshold: {config.large_data_threshold:,} rows")

## 2. Text Cleaning Functions

Shared cleaning logic used by both pandas and Spark implementations.

In [None]:
# Precompiled regex patterns for performance
HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
WHITESPACE_PATTERN = re.compile(r"\s+")
# Negated class: anything NOT listed here is removed. For str patterns ``\w``
# already covers every Unicode letter, digit and underscore, so the explicit
# Hangul/ASCII ranges are redundant but harmless.
SPECIAL_CHAR_PATTERN = re.compile(
    r"[^\w\s가-힣a-zA-Z0-9.,!?;:'\"()\[\]{}\-]+"
)
# Precomposed Hangul syllables only (isolated jamo are not matched).
KOREAN_CHAR_PATTERN = re.compile(r"[가-힣]")


def remove_html_tags(text: str) -> str:
    """Remove HTML tags from text.

    Only ``<...>`` tags are stripped; HTML entities such as ``&nbsp;`` are
    not decoded.

    Args:
        text: Input text possibly containing HTML tags.

    Returns:
        Text with HTML tags removed, or "" for empty/None input.
    """
    if not text:
        return ""
    return HTML_TAG_PATTERN.sub("", text)


def normalize_whitespace(text: str) -> str:
    """Collapse every run of whitespace into one space and trim the ends.

    Args:
        text: Input text with possible whitespace issues.

    Returns:
        Text with normalized whitespace, or "" for empty/None input.
    """
    if not text:
        return ""
    return WHITESPACE_PATTERN.sub(" ", text).strip()


def remove_special_characters(text: str) -> str:
    """Remove characters outside the allowed set.

    Kept: Unicode word characters (letters of any script, digits,
    underscore), whitespace, and the punctuation ``. , ! ? ; : ' " ( ) [ ]
    { } -``. Everything else is deleted.

    Args:
        text: Input text possibly containing special characters.

    Returns:
        Text with special characters removed, or "" for empty/None input.
    """
    if not text:
        return ""
    return SPECIAL_CHAR_PATTERN.sub("", text)


def clean_text(text: str) -> str:
    """Apply full text cleaning pipeline.

    Pipeline:
    1. Remove HTML tags
    2. Remove special characters
    3. Normalize whitespace

    Whitespace is normalized last because deleting a tag or a run of special
    characters that sat between two spaces would otherwise leave a double
    space in the result.

    Args:
        text: Raw input text.

    Returns:
        Cleaned text, or "" for empty/None input.
    """
    if not text:
        return ""
    text = remove_html_tags(text)
    text = remove_special_characters(text)
    return normalize_whitespace(text)


# Test cleaning functions
# Smoke test: results are only printed for visual inspection, nothing is asserted.
test_cases = [
    "<p>안녕하세요</p>  world!",
    "Hello\n\n\tWorld",
    "한글@#$%테스트!!!",
    # "&nbsp;" is an entity, not a tag, so HTML_TAG_PATTERN does not match it.
    "<b>Bold</b> text with &nbsp; entities",
]

print("Text cleaning tests:")
for test in test_cases:
    cleaned = clean_text(test)
    print(f"  '{test}' -> '{cleaned}'")

## 3. Quality Filtering Functions

In [None]:
def calculate_korean_ratio(text: str) -> float:
    """Return the share of Hangul syllables among the non-space characters.

    Only the plain space character is excluded from the denominator.

    Args:
        text: Input text.

    Returns:
        A value between 0.0 and 1.0; 0.0 for empty input or input made up
        solely of spaces.
    """
    if not text:
        return 0.0
    non_space_count = len(text) - text.count(" ")
    if non_space_count == 0:
        return 0.0
    hangul_hits = KOREAN_CHAR_PATTERN.findall(text)
    return len(hangul_hits) / non_space_count


def is_valid_text(
    text: str,
    min_length: int = 2,
    max_length: int = 512,
    min_korean_ratio: float = 0.30,
) -> bool:
    """Decide whether a single text passes the quality criteria.

    Args:
        text: Input text to validate.
        min_length: Smallest accepted character count (inclusive).
        max_length: Largest accepted character count (inclusive).
        min_korean_ratio: Lowest accepted Korean character ratio (inclusive).

    Returns:
        True when the text is non-empty, its length lies within the bounds
        and its Korean ratio reaches the minimum; False otherwise.
    """
    if not text:
        return False

    # Length is checked first so out-of-range texts skip the ratio computation.
    if not (min_length <= len(text) <= max_length):
        return False

    return calculate_korean_ratio(text) >= min_korean_ratio


def is_valid_pair(
    text1: str,
    text2: str,
    min_length: int = 2,
    max_length: int = 512,
    min_korean_ratio: float = 0.30,
) -> bool:
    """Decide whether a text pair passes the quality criteria.

    Args:
        text1: First text in pair.
        text2: Second text in pair.
        min_length: Smallest accepted character count (inclusive).
        max_length: Largest accepted character count (inclusive).
        min_korean_ratio: Lowest accepted Korean character ratio (inclusive).

    Returns:
        True when both texts are individually valid and differ from each
        other; False otherwise.
    """
    # all() stops at the first invalid member, so text2 is not inspected
    # when text1 already fails.
    both_valid = all(
        is_valid_text(candidate, min_length, max_length, min_korean_ratio)
        for candidate in (text1, text2)
    )
    if not both_valid:
        return False

    # A pair whose two sides are the same string is rejected.
    return not (text1 == text2)


# Test quality filters
# Smoke test: results are printed for visual inspection, nothing is asserted.
test_pairs = [
    ("안녕하세요", "반갑습니다"),  # Valid
    ("hi", "there"),  # Invalid: Low Korean ratio
    ("a", "b"),  # Invalid: Too short
    ("동일한텍스트", "동일한텍스트"),  # Invalid: Identical
    ("한글" * 300, "테스트"),  # Invalid: Too long
]

print("\nQuality filter tests:")
for t1, t2 in test_pairs:
    valid = is_valid_pair(
        t1, t2,
        min_length=config.min_text_length,
        max_length=config.max_text_length,
        min_korean_ratio=config.min_korean_ratio
    )
    # Only the first 20 characters are shown; "..." is appended unconditionally.
    print(f"  ('{t1[:20]}...', '{t2[:20]}...') -> {valid}")

## 4. Data Processing Statistics

In [None]:
@dataclass
class ProcessingStats:
    """Row counts recorded after each stage of the preprocessing pipeline."""

    initial_count: int = 0
    after_cleaning: int = 0
    after_dedup: int = 0
    after_identical_filter: int = 0
    after_length_filter: int = 0
    after_korean_filter: int = 0
    final_count: int = 0

    def print_summary(self) -> None:
        """Print a table of the stage counts and the overall retention rate.

        Filter stages also show how many rows they removed relative to the
        preceding stage. The retention line is printed only when the initial
        count is positive.
        """
        rule = "=" * 60
        # (label, rows remaining, rows before the stage or None when no
        # removed-row delta is shown for that line)
        rows = (
            ("  Initial count:            ", self.initial_count, None),
            ("  After cleaning:           ", self.after_cleaning, None),
            ("  After deduplication:      ", self.after_dedup,
             self.after_cleaning),
            ("  After identical filter:   ", self.after_identical_filter,
             self.after_dedup),
            ("  After length filter:      ", self.after_length_filter,
             self.after_identical_filter),
            ("  After Korean filter:      ", self.after_korean_filter,
             self.after_length_filter),
            ("  Final count:              ", self.final_count, None),
        )

        print("\n" + rule)
        print("Processing Statistics")
        print(rule)
        for label, remaining, previous in rows:
            line = f"{label}{remaining:>12,}"
            if previous is not None:
                line += f" (-{previous - remaining:,})"
            print(line)
        print(rule)

        if self.initial_count > 0:
            retention = self.final_count / self.initial_count * 100
            print(f"  Retention rate: {retention:.1f}%")

## 5. Data Processor Protocol and Implementations

An abstract base class (`DataProcessor`) defines the shared interface; `PandasProcessor` and `SparkProcessor` implement it.

In [None]:
class DataProcessor(ABC):
    """Common interface that every preprocessing backend implements."""

    @abstractmethod
    def count_rows(self, input_path: Path) -> int:
        """Return the number of rows contained in ``input_path``."""

    @abstractmethod
    def process(self, input_path: Path, output_path: Path) -> ProcessingStats:
        """Preprocess ``input_path`` into ``output_path`` and report the counts."""

    @abstractmethod
    def cleanup(self) -> None:
        """Release any resources held by the processor."""

In [None]:
class PandasProcessor(DataProcessor):
    """Pandas-based data processor for smaller datasets (< 1M rows).

    The whole input is loaded into one in-memory DataFrame and pushed through
    cleaning, de-duplication, identical-pair removal, length filtering and
    the Korean-ratio filter, in that order.
    """

    def __init__(self, config: PreprocessingConfig):
        """Initialize pandas processor.

        Args:
            config: Preprocessing configuration.
        """
        self.config = config
        logger.info("Initialized PandasProcessor")

    def count_rows(self, input_path: Path) -> int:
        """Count rows by reading file line by line.

        Every line counts as one row, blank lines included.

        Args:
            input_path: Path to input JSONL file.

        Returns:
            Number of lines in file.
        """
        with open(input_path, "r", encoding="utf-8") as f:
            return sum(1 for _ in f)

    def process(self, input_path: Path, output_path: Path) -> ProcessingStats:
        """Process data using pandas.

        Args:
            input_path: Path to input JSONL file.
            output_path: Path to output JSONL file.

        Returns:
            Processing statistics.

        Raises:
            ValueError: If the input has rows but no text columns could be
                identified.
        """
        import pandas as pd

        stats = ProcessingStats()

        # Load data
        logger.info(f"Loading data from {input_path}")
        df = pd.read_json(input_path, lines=True)
        stats.initial_count = len(df)
        logger.info(f"Loaded {stats.initial_count:,} rows")

        # Identify text columns
        text_columns = self._identify_text_columns(df)
        logger.info(f"Text columns: {text_columns}")
        if not text_columns and not df.empty:
            # Without text columns every later step is meaningless; fail with
            # a clear message instead of an obscure error in drop_duplicates.
            raise ValueError(
                f"No text columns found in {input_path}; "
                f"available columns: {list(df.columns)}"
            )

        # Step 1: Text cleaning (missing values become empty strings first)
        logger.info("Step 1: Cleaning text...")
        for col in text_columns:
            df[col] = df[col].fillna("").astype(str).apply(clean_text)
        stats.after_cleaning = len(df)
        logger.info(f"After cleaning: {stats.after_cleaning:,} rows")

        # Step 2: Remove exact duplicates (first occurrence is kept)
        logger.info("Step 2: Removing duplicates...")
        df = df.drop_duplicates(subset=text_columns)
        stats.after_dedup = len(df)
        logger.info(f"After dedup: {stats.after_dedup:,} rows")

        # Step 3: Remove pairs where text1 == text2
        logger.info("Step 3: Removing identical pairs...")
        if len(text_columns) >= 2:
            col1, col2 = text_columns[:2]
            df = df[df[col1] != df[col2]]
        stats.after_identical_filter = len(df)
        logger.info(f"After identical filter: {stats.after_identical_filter:,} rows")

        # Step 4: Length filtering (bounds are inclusive)
        logger.info("Step 4: Applying length filter...")
        for col in text_columns:
            lengths = df[col].str.len()
            df = df[
                lengths.between(
                    self.config.min_text_length, self.config.max_text_length
                )
            ]
        stats.after_length_filter = len(df)
        logger.info(f"After length filter: {stats.after_length_filter:,} rows")

        # Step 5: Korean content ratio check
        # A boolean mask is used instead of a temporary "<col>_korean_ratio"
        # column: assigning a column on an already-filtered frame triggers
        # SettingWithCopyWarning and could overwrite (and then drop) a real
        # input column of the same name.
        logger.info("Step 5: Checking Korean content ratio...")
        for col in text_columns:
            ratios = df[col].apply(calculate_korean_ratio)
            df = df[ratios >= self.config.min_korean_ratio]
        stats.after_korean_filter = len(df)
        stats.final_count = stats.after_korean_filter
        logger.info(f"After Korean filter: {stats.after_korean_filter:,} rows")

        # Save output (force_ascii=False keeps Hangul readable in the file)
        logger.info(f"Saving to {output_path}")
        df.to_json(output_path, orient="records", lines=True, force_ascii=False)
        logger.info(f"Saved {stats.final_count:,} rows")

        return stats

    def _identify_text_columns(self, df) -> list[str]:
        """Identify text columns in dataframe.

        Known pair names are tried in priority order; when none is present
        the first two string-typed columns are used.

        Args:
            df: Input dataframe.

        Returns:
            List of text column names (at most two; may be empty).
        """
        import pandas as pd

        # Priority order of column names
        priority_pairs = [
            ("source", "target"),
            ("text1", "text2"),
            ("anchor", "positive"),
            ("query", "document"),
        ]

        for col1, col2 in priority_pairs:
            if col1 in df.columns and col2 in df.columns:
                return [col1, col2]

        # Fallback: return string columns. Strings may be stored as plain
        # object dtype or as the dedicated pandas string dtype.
        def _is_text_dtype(dtype) -> bool:
            return dtype == "object" or isinstance(dtype, pd.StringDtype)

        return [col for col in df.columns if _is_text_dtype(df[col].dtype)][:2]

    def cleanup(self) -> None:
        """Clean up resources (no-op for pandas)."""

In [None]:
class SparkProcessor(DataProcessor):
    """Spark-based data processor for larger datasets (>= 1M rows).

    Connects to a Spark Connect endpoint on construction and runs the same
    stages as the pandas implementation: cleaning, de-duplication,
    identical-pair removal, length filtering and the Korean-ratio filter.
    """

    def __init__(self, config: PreprocessingConfig):
        """Initialize Spark processor.

        Args:
            config: Preprocessing configuration.

        Raises:
            ConnectionError: If unable to connect to Spark.
        """
        self.config = config
        self.spark = None
        self._connect()

    def _connect(self) -> None:
        r"""Connect to EMR Spark via Spark Connect.

        Prerequisites:
        Run in terminal first:
        aws ssm start-session --target $EMR_INSTANCE_ID \
          --document-name AWS-StartPortForwardingSession \
          --parameters '{"portNumber":["15002"],"localPortNumber":["15002"]}'

        Raises:
            ConnectionError: If unable to connect to Spark. Any failure,
                including a missing pyspark installation, is reported this
                way, with the original exception chained as the cause.
        """
        try:
            from pyspark.sql import SparkSession

            spark_url = f"sc://{self.config.spark_host}:{self.config.spark_port}"
            logger.info(f"Connecting to Spark at {spark_url}")

            self.spark = (
                SparkSession.builder
                .remote(spark_url)
                .appName("korean_neural_sparse_v22.1")
                .getOrCreate()
            )

            # Test connection: reading the version forces a round trip.
            version = self.spark.version
            logger.info(f"Connected to Spark {version}")

        except Exception as e:
            logger.error(f"Failed to connect to Spark: {e}")
            # Chain the cause so the original traceback is preserved.
            raise ConnectionError(
                f"Unable to connect to Spark at {self.config.spark_host}:"
                f"{self.config.spark_port}. "
                f"Ensure SSM port forwarding is active. Error: {e}"
            ) from e

    def count_rows(self, input_path: Path) -> int:
        """Count rows using Spark.

        Args:
            input_path: Path to input JSONL file.

        Returns:
            Number of rows in file.
        """
        df = self.spark.read.json(str(input_path))
        return df.count()

    def process(self, input_path: Path, output_path: Path) -> ProcessingStats:
        """Process data using Spark.

        Each ``count()`` below is a Spark action, so the lineage built up to
        that point is evaluated once per stage.

        Args:
            input_path: Path to input JSONL file.
            output_path: Path to output JSONL file.

        Returns:
            Processing statistics.

        Raises:
            ValueError: If the input has rows but no text columns could be
                identified.
        """
        import glob
        import shutil

        from pyspark.sql import functions as F
        from pyspark.sql.types import StringType, FloatType

        stats = ProcessingStats()

        # Register UDFs (the same Python functions the pandas path uses)
        clean_text_udf = F.udf(clean_text, StringType())
        korean_ratio_udf = F.udf(calculate_korean_ratio, FloatType())

        # Load data
        # NOTE(review): with a remote Spark Connect session the path is
        # presumably resolved on the cluster side -- confirm that the file is
        # reachable there.
        logger.info(f"Loading data from {input_path}")
        df = self.spark.read.json(str(input_path))
        stats.initial_count = df.count()
        logger.info(f"Loaded {stats.initial_count:,} rows")

        # Identify text columns
        text_columns = self._identify_text_columns(df)
        logger.info(f"Text columns: {text_columns}")
        if not text_columns and stats.initial_count > 0:
            # Without text columns no cleaning or filtering would happen.
            raise ValueError(
                f"No text columns found in {input_path}; "
                f"available columns: {list(df.columns)}"
            )

        # Step 1: Text cleaning (nulls become empty strings first)
        logger.info("Step 1: Cleaning text...")
        for col in text_columns:
            df = df.withColumn(
                col,
                clean_text_udf(F.coalesce(F.col(col), F.lit("")))
            )
        stats.after_cleaning = df.count()
        logger.info(f"After cleaning: {stats.after_cleaning:,} rows")

        # Step 2: Remove exact duplicates
        logger.info("Step 2: Removing duplicates...")
        df = df.dropDuplicates(text_columns)
        stats.after_dedup = df.count()
        logger.info(f"After dedup: {stats.after_dedup:,} rows")

        # Step 3: Remove pairs where text1 == text2
        logger.info("Step 3: Removing identical pairs...")
        if len(text_columns) >= 2:
            col1, col2 = text_columns[:2]
            df = df.filter(F.col(col1) != F.col(col2))
        stats.after_identical_filter = df.count()
        logger.info(f"After identical filter: {stats.after_identical_filter:,} rows")

        # Step 4: Length filtering (bounds are inclusive)
        logger.info("Step 4: Applying length filter...")
        for col in text_columns:
            text_length = F.length(F.col(col))
            df = df.filter(
                (text_length >= self.config.min_text_length) &
                (text_length <= self.config.max_text_length)
            )
        stats.after_length_filter = df.count()
        logger.info(f"After length filter: {stats.after_length_filter:,} rows")

        # Step 5: Korean content ratio check
        # Filter on the UDF expression directly: a temporary
        # "<col>_korean_ratio" column could overwrite (and then drop) a real
        # input column of the same name.
        logger.info("Step 5: Checking Korean content ratio...")
        for col in text_columns:
            df = df.filter(
                korean_ratio_udf(F.col(col)) >= self.config.min_korean_ratio
            )
        stats.after_korean_filter = df.count()
        stats.final_count = stats.after_korean_filter
        logger.info(f"After Korean filter: {stats.after_korean_filter:,} rows")

        # Save output
        logger.info(f"Saving to {output_path}")

        # Write to local as single file: coalesce(1) yields one part file
        # inside a temporary directory.
        temp_output = output_path.parent / "temp_spark_output"
        df.coalesce(1).write.mode("overwrite").json(str(temp_output))

        # Move single file to final location
        json_files = sorted(glob.glob(str(temp_output / "*.json")))
        if json_files:
            shutil.move(json_files[0], str(output_path))
            logger.info(f"Saved {stats.final_count:,} rows")
        else:
            # Do not claim success when nothing was produced locally; the S3
            # copy below is still attempted.
            logger.warning(
                f"No JSON part file found in {temp_output}; "
                f"{output_path} was not written"
            )
        shutil.rmtree(str(temp_output), ignore_errors=True)

        # Also save to S3 if bucket is configured
        if S3_BUCKET_NAME:
            s3_path = f"s3://{S3_BUCKET_NAME}/{self.config.s3_output_prefix}"
            logger.info(f"Saving to S3: {s3_path}")
            df.write.mode("overwrite").json(s3_path)

        return stats

    def _identify_text_columns(self, df) -> list[str]:
        """Identify text columns in Spark dataframe.

        Known pair names are tried in priority order; when none is present
        the first two string-typed columns are used.

        Args:
            df: Input Spark dataframe.

        Returns:
            List of text column names (at most two; may be empty).
        """
        from pyspark.sql.types import StringType

        columns = df.columns

        # Priority order of column names
        priority_pairs = [
            ("source", "target"),
            ("text1", "text2"),
            ("anchor", "positive"),
            ("query", "document"),
        ]

        for col1, col2 in priority_pairs:
            if col1 in columns and col2 in columns:
                return [col1, col2]

        # Fallback: return first two string columns. isinstance avoids
        # depending on the textual repr of the type, which is not a stable
        # contract across Spark versions.
        string_cols = [
            f.name for f in df.schema.fields
            if isinstance(f.dataType, StringType)
        ]
        return string_cols[:2]

    def cleanup(self) -> None:
        """Stop Spark session."""
        if self.spark:
            logger.info("Stopping Spark session")
            self.spark.stop()
            self.spark = None

## 6. Adaptive Processor Factory

In [None]:
def create_processor(
    input_path: Path,
    config: PreprocessingConfig,
    force_pandas: bool = False,
    force_spark: bool = False,
) -> tuple[DataProcessor, int]:
    """Create appropriate processor based on data size.

    Spark is chosen when ``force_spark`` is set, or when pandas is not forced
    and the row count reaches ``config.large_data_threshold``. If the Spark
    connection fails (even when Spark was forced), a pandas processor is
    returned instead.

    Args:
        input_path: Path to input file.
        config: Preprocessing configuration.
        force_pandas: Force use of pandas processor.
        force_spark: Force use of Spark processor.

    Returns:
        Tuple of (processor instance, row count).

    Raises:
        ValueError: If both force_pandas and force_spark are True.
    """
    if force_pandas and force_spark:
        raise ValueError("Cannot force both pandas and Spark")

    # Quick row count using file iteration (one row per line)
    logger.info(f"Counting rows in {input_path}...")
    row_count = 0
    with open(input_path, "r", encoding="utf-8") as f:
        for _ in tqdm(f, desc="Counting rows", unit="rows"):
            row_count += 1

    logger.info(f"Total rows: {row_count:,}")

    # Determine processor type
    threshold = config.large_data_threshold
    above_threshold = row_count >= threshold
    use_spark = force_spark or (not force_pandas and above_threshold)

    # The logged comparison must reflect the real relation between row count
    # and threshold, so a forced choice is reported as forced.
    if use_spark:
        if above_threshold:
            logger.info(
                f"Using Spark processor (rows={row_count:,} >= "
                f"threshold={threshold:,})"
            )
        else:
            logger.info(
                f"Using Spark processor (forced; rows={row_count:,} < "
                f"threshold={threshold:,})"
            )
        try:
            processor = SparkProcessor(config)
        except ConnectionError as e:
            logger.warning(f"Spark connection failed: {e}")
            logger.warning("Falling back to Pandas processor")
            processor = PandasProcessor(config)
    else:
        if above_threshold:
            logger.info(
                f"Using Pandas processor (forced; rows={row_count:,} >= "
                f"threshold={threshold:,})"
            )
        else:
            logger.info(
                f"Using Pandas processor (rows={row_count:,} < "
                f"threshold={threshold:,})"
            )
        processor = PandasProcessor(config)

    return processor, row_count

## 7. Main Processing Pipeline

In [None]:
def preprocess_data(
    input_path: Path,
    output_path: Path,
    config: PreprocessingConfig,
    force_pandas: bool = False,
    force_spark: bool = False,
) -> ProcessingStats:
    """Execute the preprocessing pipeline end to end.

    A processor is selected via ``create_processor``, run on the input, its
    statistics are printed, and the processor is cleaned up whether or not
    processing succeeded.

    Args:
        input_path: Path to input JSONL file.
        output_path: Path to output JSONL file.
        config: Preprocessing configuration.
        force_pandas: Force use of pandas processor.
        force_spark: Force use of Spark processor.

    Returns:
        Processing statistics.
    """
    banner = "=" * 60
    for message in (banner, "Starting Data Preprocessing Pipeline", banner):
        logger.info(message)

    # The factory also returns the row count; it is not needed here.
    processor, _ = create_processor(
        input_path, config, force_pandas, force_spark
    )

    try:
        result = processor.process(input_path, output_path)
        result.print_summary()
        return result
    finally:
        # Runs on success and on failure alike.
        processor.cleanup()

## 8. Execute Preprocessing

In [None]:
# Define input and output paths
# Both are derived from the shared `config` instance; later cells reuse
# INPUT_FILE and OUTPUT_FILE.
INPUT_FILE = config.input_dir / "huggingface_synonym_pairs.jsonl"
OUTPUT_FILE = config.output_dir / "combined_clean.jsonl"

print(f"Input file: {INPUT_FILE}")
print(f"Output file: {OUTPUT_FILE}")
print(f"Input exists: {INPUT_FILE.exists()}")

In [None]:
# Check if input file exists
# Fail early with an actionable message instead of an error deep in the pipeline.
if not INPUT_FILE.exists():
    raise FileNotFoundError(
        f"Input file not found: {INPUT_FILE}\n"
        f"Please run 00_data_loading.ipynb first to generate the input data."
    )

# Run preprocessing
# Set force_pandas=True to always use pandas (for testing)
# Set force_spark=True to always use Spark (requires EMR connection)
# Setting both to True raises ValueError in create_processor.
stats = preprocess_data(
    input_path=INPUT_FILE,
    output_path=OUTPUT_FILE,
    config=config,
    force_pandas=False,  # Change to True for local testing
    force_spark=False,   # Change to True to force Spark
)

## 9. Verify Output

In [None]:
import pandas as pd

# Load and inspect output
# df_output is reused by the verification and summary cells below.
if OUTPUT_FILE.exists():
    df_output = pd.read_json(OUTPUT_FILE, lines=True)

    print(f"Output file: {OUTPUT_FILE}")
    print(f"Total rows: {len(df_output):,}")
    print(f"Columns: {list(df_output.columns)}")
    # st_size is in bytes; divide twice by 1024 to report MB.
    print(f"\nFile size: {OUTPUT_FILE.stat().st_size / 1024 / 1024:.2f} MB")

    print("\nSample rows:")
    # `display` is not imported in this file; presumably the IPython/Jupyter
    # builtin -- confirm if this code is run outside a notebook.
    display(df_output.head(5))
else:
    print(f"Output file not found: {OUTPUT_FILE}")

In [None]:
# Statistics by pair type
# Skipped when the output file is absent or has no "pair_type" column.
if OUTPUT_FILE.exists() and "pair_type" in df_output.columns:
    print("\nDistribution by pair_type:")
    pair_type_counts = df_output["pair_type"].value_counts()
    for pair_type, count in pair_type_counts.items():
        # Share of this pair type among all output rows, in percent.
        pct = count / len(df_output) * 100
        print(f"  {pair_type:<25}: {count:>8,} ({pct:.1f}%)")

In [None]:
# Verify Korean content quality
if OUTPUT_FILE.exists():
    # Use "source"/"target" when "source" is present, else the first two
    # columns. text_cols is reused by the length-distribution cell below.
    text_cols = ["source", "target"] if "source" in df_output.columns else df_output.columns[:2].tolist()

    print("\nKorean content ratio verification:")
    for col in text_cols:
        # Guard: "target" may be missing even when "source" exists.
        if col in df_output.columns:
            ratios = df_output[col].apply(calculate_korean_ratio)
            print(f"  {col}:")
            print(f"    Min: {ratios.min():.2f}")
            print(f"    Max: {ratios.max():.2f}")
            print(f"    Mean: {ratios.mean():.2f}")
            print(f"    Median: {ratios.median():.2f}")

In [None]:
# Text length distribution
# Relies on text_cols and df_output defined by the preceding cells.
if OUTPUT_FILE.exists():
    print("\nText length distribution:")
    for col in text_cols:
        if col in df_output.columns:
            # Character count per row for this column.
            lengths = df_output[col].str.len()
            print(f"  {col}:")
            print(f"    Min: {lengths.min()}")
            print(f"    Max: {lengths.max()}")
            print(f"    Mean: {lengths.mean():.1f}")
            print(f"    Median: {lengths.median():.1f}")

## 10. Summary

In [None]:
# Final recap: the configuration used and, when present, the output artifact.
print("\n" + "=" * 60)
print("v22.1 Data Preprocessing Summary")
print("=" * 60)

print(f"\nConfiguration:")
print(f"  Large data threshold: {config.large_data_threshold:,} rows")
print(f"  Min text length: {config.min_text_length} chars")
print(f"  Max text length: {config.max_text_length} chars")
print(f"  Min Korean ratio: {config.min_korean_ratio * 100:.0f}%")

# Relies on df_output loaded by the verification cell above.
if OUTPUT_FILE.exists():
    print(f"\nOutput:")
    print(f"  File: {OUTPUT_FILE}")
    print(f"  Rows: {len(df_output):,}")
    print(f"  Size: {OUTPUT_FILE.stat().st_size / 1024 / 1024:.2f} MB")

    # Only the configured S3 location is printed here; nothing is checked or
    # uploaded in this cell.
    if S3_BUCKET_NAME:
        s3_path = f"s3://{S3_BUCKET_NAME}/{config.s3_output_prefix}"
        print(f"  S3 path: {s3_path}")

print("\n" + "=" * 60)

## Next Steps

1. Run `02_data_augmentation.ipynb` to augment the preprocessed data
2. The preprocessed data will be used as input for training triplet generation