In [None]:
!pip install -U datasets

In [None]:
import os, re
import pandas as pd
from datasets import load_dataset

In [None]:
# Random seed passed to the dataset shuffle so the sampled rows are reproducible.
SEED = 42

# Maximum number of sentence pairs kept per language pair after filtering
# (fewer are kept if the filtered corpus is smaller).
N_TOTAL = 10000

# Split ratios. Train and validation sizes are computed from TRAIN_FRAC and
# VAL_FRAC; the test split receives whatever rows remain, and TEST_FRAC is
# only recorded in meta.txt.
TRAIN_FRAC = 0.80
VAL_FRAC   = 0.10
TEST_FRAC  = 0.10

# Basic length filters (language-agnostic), measured in characters of the
# whitespace-normalized text.
MIN_CHARS = 3
MAX_CHARS_SRC = 500
MAX_CHARS_TGT = 500

In [None]:
# "Untranslated / misaligned" filters
# If target is too similar to source, it often means untranslated or wrong alignment.
# Pairs whose word-level Jaccard similarity is >= this value are dropped,
# so a LOWER value filters more aggressively.
SIMILARITY_JACCARD_THRESHOLD = 0.85   # 0..1

In [None]:
# Korean validation (applied to the target text when the target language is "ko")
KO_MIN_HANGUL_CHARS = 1               # minimum number of Hangul characters required
KO_MIN_HANGUL_RATIO = 0.30            # hangul chars / total non-space chars
KO_MAX_LATIN_RATIO  = 0.50            # ASCII letters / total non-space chars; if mostly latin letters, likely wrong

In [None]:
# Vietnamese validation (Latin script; use diacritics as weak signal + anti-copy)
# Minimum share of non-ASCII characters among non-space characters; only enforced
# for targets with 6 or more whitespace-separated tokens. Set to 0 to disable.
VI_MIN_DIACRITIC_RATIO = 0.005        # small value filters obvious English
# Targets whose Jaccard similarity to the source exceeds this value are rejected.
# In the filtering pipeline the basic filter already drops pairs at
# SIMILARITY_JACCARD_THRESHOLD, so this only has an effect when set below it.
VI_MAX_COPY_RATIO      = 0.90

In [None]:
# Indonesian validation (Latin script; anti-copy check only, no other
# language-specific heuristic is implemented). Targets whose Jaccard similarity
# to the source exceeds this value are rejected.
ID_MAX_COPY_RATIO      = 0.90

In [None]:
# Output folder: root directory under which each language pair gets its own subfolder.
OUT_ROOT = "dataset_splits_opus100_10k"

In [None]:
def basic_clean(text: str) -> str:
    """Normalize whitespace in ``text``.

    Leading/trailing whitespace is removed and every internal run of
    whitespace (spaces, tabs, newlines) is collapsed to a single space.

    Args:
        text: The raw string. Non-string values are converted with ``str()``.
            ``None`` is treated as a missing value.

    Returns:
        The cleaned string. A missing value (``None``) yields ``""`` rather
        than the literal word ``"None"``, so that emptiness checks can reject
        the row instead of letting a bogus sentence through.
    """
    if text is None:
        return ""
    # str.split() with no arguments already ignores leading/trailing whitespace,
    # so no separate strip() is needed.
    return " ".join(str(text).split())

In [None]:
_word_re = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿ]+|[0-9]+", re.UNICODE)

In [None]:
def tokenize_words(text: str):
    """Return the lowercase tokens that ``_word_re`` finds in ``text``.

    Args:
        text: The string to tokenize.

    Returns:
        A list of matched substrings, in order of appearance, taken from the
        lowercased text.
    """
    lowered = text.lower()
    # The pattern has no capture groups, so each match's full text is the token.
    return [found.group(0) for found in _word_re.finditer(lowered)]

In [None]:
def jaccard_similarity(a: str, b: str) -> float:
    """Jaccard similarity between the sets of word tokens of two strings.

    Args:
        a: First string.
        b: Second string.

    Returns:
        ``|A & B| / |A | B|`` where A and B are the unique tokens produced by
        ``tokenize_words``. If neither string yields any token the result is
        1.0; if exactly one of them yields no token the result is 0.0.
    """
    tokens_a = set(tokenize_words(a))
    tokens_b = set(tokenize_words(b))

    # Two token-less strings are treated as identical.
    if not (tokens_a or tokens_b):
        return 1.0
    # One side has tokens and the other has none: nothing can overlap.
    if not (tokens_a and tokens_b):
        return 0.0

    shared = tokens_a.intersection(tokens_b)
    combined = tokens_a.union(tokens_b)
    return len(shared) / max(len(combined), 1)

In [None]:
def ratio_latin_letters(text: str) -> float:
    """Share of non-whitespace characters that are ASCII letters (A-Z, a-z).

    Args:
        text: The string to inspect.

    Returns:
        ``ascii_letters / non_space_chars``, or 0.0 when the text contains no
        non-whitespace character.
    """
    visible = [ch for ch in text if not ch.isspace()]
    if len(visible) == 0:
        return 0.0
    # An ASCII character that is alphabetic is exactly one of A-Z / a-z.
    ascii_letter_count = sum(1 for ch in visible if ch.isascii() and ch.isalpha())
    return ascii_letter_count / len(visible)

In [None]:
def count_hangul(text: str) -> int:
    """Count the Hangul characters in ``text``.

    A character counts as Hangul when its code point lies in one of:
    U+AC00-U+D7A3 (syllables), U+1100-U+11FF (jamo) or
    U+3130-U+318F (compatibility jamo).

    Args:
        text: The string to inspect.

    Returns:
        The number of characters falling in those ranges.
    """
    hangul_ranges = (
        (0xAC00, 0xD7A3),
        (0x1100, 0x11FF),
        (0x3130, 0x318F),
    )
    return sum(
        1
        for ch in text
        if any(low <= ord(ch) <= high for low, high in hangul_ranges)
    )

In [None]:
def hangul_ratio(text: str) -> float:
    """Share of non-whitespace characters that are Hangul.

    Args:
        text: The string to inspect.

    Returns:
        ``count_hangul(text) / non_space_chars``, or 0.0 when the text has no
        non-whitespace character.
    """
    visible_count = sum(1 for ch in text if not ch.isspace())
    if visible_count == 0:
        return 0.0
    return count_hangul(text) / visible_count

In [None]:
def diacritic_ratio(text: str) -> float:
    """Share of non-whitespace characters that are outside ASCII.

    Used as a cheap proxy for "contains Vietnamese diacritics": Vietnamese
    text usually has accented letters while English almost never does. Any
    non-ASCII character is counted, not only accented letters.

    Args:
        text: The string to inspect.

    Returns:
        ``non_ascii_chars / non_space_chars``, or 0.0 when the text has no
        non-whitespace character.
    """
    visible = [ch for ch in text if not ch.isspace()]
    if len(visible) == 0:
        return 0.0
    non_ascii_count = len([ch for ch in visible if not ch.isascii()])
    return non_ascii_count / len(visible)

In [None]:
def pass_basic_filters(src: str, tgt: str) -> bool:
    """Language-agnostic sanity checks for one sentence pair.

    A pair is rejected when either side is empty, either side is shorter than
    ``MIN_CHARS``, the source is longer than ``MAX_CHARS_SRC`` or the target
    longer than ``MAX_CHARS_TGT``, the two sides are equal after stripping, or
    their Jaccard similarity is at least ``SIMILARITY_JACCARD_THRESHOLD``.

    Args:
        src: Source-side text.
        tgt: Target-side text.

    Returns:
        True if the pair survives every check, False otherwise.
    """
    # Both sides must be present.
    if not (src and tgt):
        return False

    src_len, tgt_len = len(src), len(tgt)
    if min(src_len, tgt_len) < MIN_CHARS:
        return False
    if src_len > MAX_CHARS_SRC or tgt_len > MAX_CHARS_TGT:
        return False

    # Exact copies usually mean an untranslated or misaligned row.
    if src.strip() == tgt.strip():
        return False

    # Near-copies (high token overlap) are rejected as well.
    return jaccard_similarity(src, tgt) < SIMILARITY_JACCARD_THRESHOLD

In [None]:
def pass_language_validation(src_lang: str, tgt_lang: str, src: str, tgt: str) -> bool:
    """Check that the target text plausibly is in ``tgt_lang``.

    Only the target side is validated; ``src_lang`` is accepted but not
    consulted by this function.

    Rules per target language:
      * ``"ko"``: at least ``KO_MIN_HANGUL_CHARS`` Hangul characters, a Hangul
        ratio of at least ``KO_MIN_HANGUL_RATIO`` and an ASCII-letter ratio of
        at most ``KO_MAX_LATIN_RATIO``.
      * ``"vi"``: Jaccard similarity to the source of at most
        ``VI_MAX_COPY_RATIO``; additionally, targets with 6 or more
        whitespace-separated tokens must reach ``VI_MIN_DIACRITIC_RATIO``.
      * ``"id"``: Jaccard similarity to the source of at most
        ``ID_MAX_COPY_RATIO``.
      * anything else: accepted.

    Args:
        src_lang: Source language code (unused).
        tgt_lang: Target language code selecting the rule set.
        src: Source-side text.
        tgt: Target-side text.

    Returns:
        True if the target passes the checks for ``tgt_lang``.
    """
    if tgt_lang == "ko":
        # Hangul must be present, reasonably dominant, and not drowned in Latin letters.
        return (
            count_hangul(tgt) >= KO_MIN_HANGUL_CHARS
            and hangul_ratio(tgt) >= KO_MIN_HANGUL_RATIO
            and ratio_latin_letters(tgt) <= KO_MAX_LATIN_RATIO
        )

    if tgt_lang == "vi":
        # Latin script, so no script check is possible: use anti-copy first.
        if jaccard_similarity(src, tgt) > VI_MAX_COPY_RATIO:
            return False
        # Short lines may legitimately have no diacritics; only lines with
        # 6+ tokens are required to show some non-ASCII characters.
        return not (
            diacritic_ratio(tgt) < VI_MIN_DIACRITIC_RATIO
            and len(tgt.split()) >= 6
        )

    if tgt_lang == "id":
        # Latin script as well; only the anti-copy check applies.
        return not jaccard_similarity(src, tgt) > ID_MAX_COPY_RATIO

    # No rule set for this language: accept.
    return True

In [None]:
def make_splits_opus100(pair_cfg: str, src_lang: str, tgt_lang: str, out_dir: str):
    """Build filtered, shuffled train/val/test CSV files for one opus100 pair.

    Steps: load the ``train`` split of ``opus100/<pair_cfg>``, reduce every row
    to whitespace-cleaned ``source``/``target`` columns, drop rows failing
    ``pass_basic_filters`` or ``pass_language_validation``, shuffle with
    ``SEED``, keep at most ``N_TOTAL`` rows, slice them into train/val/test and
    write ``train.csv``, ``val.csv``, ``test.csv`` and ``meta.txt`` to
    ``out_dir`` (created if missing).

    Train and validation sizes are ``int(n_total * TRAIN_FRAC)`` and
    ``int(n_total * VAL_FRAC)``; the test split is whatever remains, so
    ``TEST_FRAC`` is only recorded in ``meta.txt``.

    NOTE(review): no de-duplication is performed, so identical pairs in the
    corpus could end up in different splits - confirm whether that matters.

    Args:
        pair_cfg: opus100 configuration name, e.g. ``"en-ko"``.
        src_lang: Key of the source language inside each row's ``translation``.
        tgt_lang: Key of the target language inside each row's ``translation``.
        out_dir: Directory receiving the CSV and metadata files.

    Returns:
        None. Results are written to disk and a summary is printed.
    """
    print(f"\n=== Loading opus100/{pair_cfg} for {src_lang}->{tgt_lang} ===")
    raw = load_dataset("opus100", pair_cfg, split="train")

    def _to_source_target(example):
        # Reduce a raw row to the two cleaned text columns.
        pair = example["translation"]
        return {
            "source": basic_clean(pair[src_lang]),
            "target": basic_clean(pair[tgt_lang]),
        }

    def _keep(example):
        # A row survives only if it passes both the basic and language checks.
        source_text, target_text = example["source"], example["target"]
        return pass_basic_filters(source_text, target_text) and pass_language_validation(
            src_lang, tgt_lang, source_text, target_text
        )

    cleaned = raw.map(_to_source_target, remove_columns=raw.column_names)
    kept = cleaned.filter(_keep)

    # Seeded shuffle, then cap the sample size.
    shuffled = kept.shuffle(seed=SEED)
    n_total = min(N_TOTAL, len(shuffled))
    sample = shuffled.select(range(n_total))

    # Contiguous slices: [0, n_train) | [n_train, val_end) | [val_end, n_total)
    n_train = int(n_total * TRAIN_FRAC)
    n_val = int(n_total * VAL_FRAC)
    val_end = n_train + n_val

    train_ds = sample.select(range(n_train))
    val_ds = sample.select(range(n_train, val_end))
    test_ds = sample.select(range(val_end, n_total))

    os.makedirs(out_dir, exist_ok=True)

    for file_name, split_ds in (
        ("train.csv", train_ds),
        ("val.csv", val_ds),
        ("test.csv", test_ds),
    ):
        pd.DataFrame(split_ds).to_csv(
            os.path.join(out_dir, file_name), index=False, encoding="utf-8"
        )

    # Record every setting that influenced the output, for reproducibility.
    meta_lines = [
        "dataset=opus100\n",
        f"pair_cfg={pair_cfg}\n",
        f"direction={src_lang}->{tgt_lang}\n",
        f"seed={SEED}\n",
        f"n_total={n_total}\n",
        f"split={TRAIN_FRAC}/{VAL_FRAC}/{TEST_FRAC}\n",
        f"min_chars={MIN_CHARS}\n",
        f"max_chars_src={MAX_CHARS_SRC}\n",
        f"max_chars_tgt={MAX_CHARS_TGT}\n",
        f"jaccard_threshold={SIMILARITY_JACCARD_THRESHOLD}\n",
        f"ko_min_hangul_chars={KO_MIN_HANGUL_CHARS}\n",
        f"ko_min_hangul_ratio={KO_MIN_HANGUL_RATIO}\n",
        f"ko_max_latin_ratio={KO_MAX_LATIN_RATIO}\n",
        f"vi_min_diacritic_ratio={VI_MIN_DIACRITIC_RATIO}\n",
        f"vi_max_copy_ratio={VI_MAX_COPY_RATIO}\n",
        f"id_max_copy_ratio={ID_MAX_COPY_RATIO}\n",
    ]
    with open(os.path.join(out_dir, "meta.txt"), "w", encoding="utf-8") as meta_file:
        meta_file.writelines(meta_lines)

    print(f"After filtering: total={n_total} | train={len(train_ds)} val={len(val_ds)} test={len(test_ds)}")
    print(f"Saved to: {out_dir}")

In [None]:
# Create the three frozen datasets under OUT_ROOT.
os.makedirs(OUT_ROOT, exist_ok=True)

# EN->X is the canonical direction stored on disk. For X->EN the training code
# is expected to swap the source/target columns instead of writing new files.
_pair_specs = (
    # (opus100 config, target language, output subfolder)
    ("en-id", "id", "en_id"),
    ("en-vi", "vi", "en_vi"),
    ("en-ko", "ko", "en_ko"),
)
for _cfg_name, _target_lang, _subdir in _pair_specs:
    make_splits_opus100(_cfg_name, "en", _target_lang, os.path.join(OUT_ROOT, _subdir))

print("\nDONE  Clean + frozen dataset splits created.")
print(f"Folder: {OUT_ROOT}/  (upload this folder to Drive)")

In [None]:
# Colab-only step: mount Google Drive at /content/drive so the generated
# folder can be copied there. `google.colab` is not available outside Colab.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!cp -r dataset_splits_opus100_10k /content/drive/MyDrive/