In [1]:
import re
from pathlib import Path

from tqdm import tqdm

In [2]:
# Dataset split directories, given relative to the current working directory.
train_path = Path("train")
validation_path = Path("validation")
test_path = Path("test")

In [3]:
# Split directory that the stale-output cleanup and file-listing cells operate on.
path = train_path

In [4]:
# Best-effort purge of outputs from a previous run: a failure on one file is
# reported and the loop moves on to the next one.
removed = []
for stale in path.glob("*cleaned.txt"):
    try:
        if not stale.is_file():
            continue
        stale.unlink()
        removed.append(stale)
    except Exception as e:
        print(f"Failed to remove {stale}: {e}")

print(f"Removed {len(removed)} files.")

Removed 0 files.


In [5]:
# Raw inputs only. "*.txt" also matches previously written "*cleaned.txt"
# outputs, and their removal is best-effort, so filter them out here instead of
# relying on every stale output having been deleted (a leftover would otherwise
# be cleaned a second time into "<name>.cleaned.cleaned.txt").
files = sorted(
    src for src in path.glob("*.txt") if not src.name.endswith("cleaned.txt")
)
len(files)

28602

In [6]:
# Placeholder token written in place of numbers that are not spelled out (>= 100).
UNK_TOKEN = "<unk>"


def _num_to_words_lt100(n: int) -> str:
    """0..99 -> written form (US/UK neutral, no hyphens)."""
    ones = [
        "zero",
        "one",
        "two",
        "three",
        "four",
        "five",
        "six",
        "seven",
        "eight",
        "nine",
        "ten",
        "eleven",
        "twelve",
        "thirteen",
        "fourteen",
        "fifteen",
        "sixteen",
        "seventeen",
        "eighteen",
        "nineteen",
    ]
    tens = [
        "",
        "",
        "twenty",
        "thirty",
        "forty",
        "fifty",
        "sixty",
        "seventy",
        "eighty",
        "ninety",
    ]

    if 0 <= n < 20:
        return ones[n]
    t, o = divmod(n, 10)
    return tens[t] if o == 0 else f"{tens[t]} {ones[o]}"


def _strip_gutenberg_boilerplate(text: str) -> str:
    """
    Best-effort removal of Project Gutenberg header/footer.
    If markers aren't found, returns the original text.
    """
    # Common PG markers (case-insensitive)
    start_pat = re.compile(
        r"\*\*\*\s*start of (this|the) project gutenberg", re.IGNORECASE
    )
    end_pat = re.compile(r"\*\*\*\s*end of (this|the) project gutenberg", re.IGNORECASE)

    start_m = start_pat.search(text)
    end_m = end_pat.search(text)

    if start_m:
        # Skip the whole marker line
        after_start = text.find("\n", start_m.start())
        text = text[after_start + 1 if after_start != -1 else start_m.end() :]

    if end_m:
        text = text[: end_m.start()]

    return text


def clean_gutenberg_txt(
    load_path: Path,
    save_path: Path,
    min_words_per_paragraph: int = 32,
) -> None:
    """
    Reads a Project Gutenberg .txt file, cleans it, and writes the result.

    Conditions implemented:
      1) Drops paragraphs with fewer than `min_words_per_paragraph` words
         (counted after cleaning, so a spelled-out number like "twenty one"
         counts as two words).
      2) Removes punctuation except the single quote ('), including any
         literal '<' / '>' present in the source text.
      3) Converts numbers < 100 into words; numbers >= 100 become <unk>.
      4) Converts output to upper case, keeping the <unk> token lower-case.

    Output paragraphs are separated by a blank line.

    Args:
        load_path: Source text file.
        save_path: Destination file; parent directories are created if needed.
        min_words_per_paragraph: Minimum number of words a cleaned paragraph
            must contain to be kept.
    """
    load_path = Path(load_path)
    save_path = Path(save_path)

    # Explicit encoding so results do not depend on the platform default;
    # undecodable bytes are dropped rather than aborting the whole file.
    text = load_path.read_text(encoding="utf-8", errors="ignore")

    text = _strip_gutenberg_boilerplate(text)

    # Normalize newlines
    text = text.replace("\r\n", "\n").replace("\r", "\n")

    # Patterns and the replacement callback are loop-invariant: build them once.
    space_pat = re.compile(r"\s+")
    angle_pat = re.compile(r"[<>]+")
    num_pat = re.compile(r"\b\d+\b")
    # '<' and '>' are whitelisted here only so that inserted <unk> tokens survive.
    punct_pat = re.compile(r"[^A-Za-z0-9\s'<>]+")

    def repl_num(m: re.Match) -> str:
        # Convert numbers: <100 -> words, otherwise -> <unk>
        try:
            n = int(m.group(0))
        except ValueError:
            # Newer Python versions refuse to convert extremely long digit
            # strings; such a run is treated like any other large number.
            return UNK_TOKEN
        return _num_to_words_lt100(n) if n < 100 else UNK_TOKEN

    cleaned_paragraphs = []

    # Split into paragraphs (blank-line separated)
    for p in re.split(r"\n\s*\n+", text):
        p = p.strip()
        if not p:
            continue

        # Collapse internal whitespace
        p = space_pat.sub(" ", p)

        # Drop angle brackets that come from the source text *before* any
        # <unk> token is inserted; otherwise the punctuation filter below,
        # which must keep '<' and '>' for the token, would let them through.
        p = angle_pat.sub(" ", p)

        p = num_pat.sub(repl_num, p)

        # Remove punctuation except apostrophe: everything that is not a
        # letter, digit, whitespace, apostrophe or part of <unk> becomes a space.
        p = punct_pat.sub(" ", p)
        p = space_pat.sub(" ", p).strip()

        # Word-count filter
        if len(p.split()) < min_words_per_paragraph:
            continue

        # Uppercase, then restore the <unk> token to its lower-case form
        p_up = p.upper().replace(UNK_TOKEN.upper(), UNK_TOKEN)

        cleaned_paragraphs.append(p_up)

    save_path.parent.mkdir(parents=True, exist_ok=True)
    save_path.write_text(
        "\n\n".join(cleaned_paragraphs) + ("\n" if cleaned_paragraphs else ""),
        encoding="utf-8",
    )

In [7]:
# Each cleaned book is written next to its source, with ".txt" swapped for
# ".cleaned.txt".
for src in tqdm(files):
    cleaned_target = src.with_suffix(".cleaned.txt")
    clean_gutenberg_txt(src, cleaned_target)

100%|██████████| 28602/28602 [25:27<00:00, 18.73it/s] 


# Concatenate files

In [None]:
def concat_cleaned(folder: "str | Path") -> Path:
    """
    Concatenate every "*cleaned.txt" file in `folder` into one text file.

    Files are processed in sorted path order. Blank lines are dropped and every
    kept line is written with exactly one trailing newline.

    Args:
        folder: Directory containing the cleaned files (string or Path;
            "~" is expanded).

    Returns:
        Path of the combined file, "<folder name>.txt", relative to the
        current working directory.
    """
    d = Path(folder).expanduser().resolve()
    out = Path(f"{d.name}.txt")

    # Regular files whose name ends with "cleaned.txt"; a directory with a
    # matching name would otherwise fail when opened for reading.
    files = sorted(
        p for p in d.iterdir() if p.is_file() and p.name.endswith("cleaned.txt")
    )

    # Explicit encoding keeps the result independent of the platform default.
    with out.open("w", encoding="utf-8") as w:
        for p in files:
            with p.open("r", encoding="utf-8") as r:
                for line in r:
                    if line.strip():
                        w.write(line.rstrip("\n") + "\n")

    return out


# Concatenate the split selected in `path`, i.e. the one the cells above just
# cleaned, instead of a hard-coded split whose cleaned files may be stale.
concat_cleaned(path)

AttributeError: 'str' object has no attribute 'open'