## 1. Configs (Paths + Threshold)

In [1]:
from pathlib import Path

# -----------------------------
# Input WET (try multiple places)
# -----------------------------
CANDIDATE_WET_PATHS = [
    Path("/mnt/data/test_compression.warc.wet"),   # common in mounted env
    Path("data-v2/test_compression.warc.wet"),
    Path("test_compression.warc.wet"),
]

WET_PATH = next((p for p in CANDIDATE_WET_PATHS if p.exists()), None)
assert WET_PATH is not None, f"Missing WET file. Tried: {CANDIDATE_WET_PATHS}"
print("✅ Using WET:", WET_PATH)

✅ Using WET: data-v2/test_compression.warc.wet


In [2]:
# -----------------------------
# Output directory + files
# -----------------------------
OUT_DIR = Path("data-v3.1")
OUT_DIR.mkdir(parents=True, exist_ok=True)

RAW_EXTRACT_JSONL = OUT_DIR / "wet_raw_extracted.jsonl"
CLEANED_JSONL     = OUT_DIR / "wet_cleaned_filtered.jsonl"
LINEDEDUP_JSONL   = OUT_DIR / "wet_line_deduped.jsonl"       # ✅ new
PAGEFILTER_JSONL  = OUT_DIR / "wet_pagefiltered.jsonl"       # ✅ new stage
DEDUPED_JSONL     = OUT_DIR / "wet_deduped.jsonl"
FINAL_JSONL       = OUT_DIR / "wet_final.jsonl"              # optional stage output
REPORT_JSON       = OUT_DIR / "wet_report.json"

In [3]:
# -----------------------------
# Thresholds (preserve your v2 behavior)
# -----------------------------
MIN_CHARS = 300
MAX_NONASCII_RATIO = 0.25
MAX_DIGIT_RATIO    = 0.30
MAX_PUNCT_RATIO    = 0.35
MIN_STOPWORD_RATIO = 0.05
MAX_REPEAT_LINE_RATIO = 0.30

RANDOM_SEED = 42

In [4]:
# -----------------------------
# New: Cross-document line-level dedup (CCNet-like approximation)
# -----------------------------
ENABLE_LINE_DEDUP = True

# Drop lines that appear in too many documents (boilerplate templates)
LINE_DUP_MIN_LEN = 30         # ignore tiny lines
LINE_DUP_MAX_DOC_FREQ = 3     # was 5     # if a line appears in >= this many docs, remove it
LINE_DEDUP_KEEP_TOPK_LONG_LINES = 0  # set >0 if you want to always keep some longest lines

# -----------------------------
# New: Page-type junk filters (high ROI)
# -----------------------------
ENABLE_PAGEFILTER = True

# cookie/privacy dominance filter (conservative)
COOKIE_TERMS = {
    "cookie", "cookies", "gdpr", "consent", "privacy", "policy",
    "personal data", "legitimate interest", "third party", "preferences"
}
COOKIE_MIN_HITS = 8
COOKIE_MIN_DENSITY = 0.015     # hits / tokens
COOKIE_MAX_UNIQUE_RATIO = 0.40 # low diversity indicates boilerplate

# e-commerce / listing filter (conservative)
ECOM_TERMS = {
    "add to cart", "checkout", "shipping", "returns", "sku", "in stock",
    "out of stock", "buy now", "quantity", "order", "discount", "price"
}
ECOM_MIN_HITS = 6
ECOM_MIN_DENSITY = 0.012

# JS/template dump filter
JS_MARKERS = {"function", "var", "const", "let", "document", "window", "return"}
JS_MIN_DENSITY = 0.02          # marker hits / tokens
JS_MIN_PUNCT_RATIO = 0.30      # reuse your punct ratio notion

In [5]:
# -----------------------------
# Optional: KenLM scoring hook (only used if you enable + have model)
# -----------------------------
ENABLE_KENLM = False
KENLM_MODEL_PATH = None  # e.g., "models/cc.en.klm"
KENLM_MAX_CHARS = 50000
KENLM_SCORE_THRESHOLD = None  # e.g., -120.0 (you must calibrate; leave None to just record scores)

In [6]:
# -----------------------------
# Optional: Reference-quality classifier (trainable proxy)
# -----------------------------
ENABLE_REF_QUALITY_FILTER = False
REF_QUALITY_MODEL_PATH = OUT_DIR / "ref_quality_model.joblib"
REF_QUALITY_THRESHOLD = 0.5  # probability threshold if enabled

In [7]:
# -----------------------------
# Optional: Extended analyses (from your extended notebook)
# -----------------------------
RUN_NEAR_DUP = True
RUN_TOPIC_MODELING = True

NEAR_DUP_SAMPLE_N = 3000    # None = all
MINHASH_NUM_PERM  = 128
LSH_THRESHOLD     = 0.90
SHINGLE_SIZE      = 5

TOPIC_SAMPLE_N = 5000       # None = all
N_TOPICS       = 8
TOP_TERMS      = 12

## 2. Install Dependencies

In [8]:
!pip -q install warcio tqdm langid nltk numpy scikit-learn datasketch joblib

## 3. Imports + NLTK Downloads

In [9]:
import json, re, hashlib, random
from collections import Counter, defaultdict
from urllib.parse import urlparse

import numpy as np
from tqdm import tqdm
from warcio.archiveiterator import ArchiveIterator
import langid

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

random.seed(RANDOM_SEED)

nltk.download("punkt")
nltk.download("stopwords")

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/ostwalaman/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/ostwalaman/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

## 4. Utilities

In [10]:
WS_RE = re.compile(r"\s+")

def normalize_ws(text: str) -> str:
    return WS_RE.sub(" ", text).strip()

def host_from_url(url: str) -> str:
    try:
        return urlparse(url).netloc.lower()
    except Exception:
        return ""

def iter_jsonl(path: Path):
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

def summarize_numeric(values):
    arr = np.array(values, dtype=float) if values else np.array([], dtype=float)
    if arr.size == 0:
        return {"n": 0}
    return {
        "n": int(arr.size),
        "mean": float(arr.mean()),
        "p50": float(np.median(arr)),
        "p90": float(np.quantile(arr, 0.9)),
        "max": float(arr.max())
    }

def analyze_raw(jsonl_path: Path, top_k_hosts=20):
    lengths = []
    hosts = Counter()
    for obj in iter_jsonl(jsonl_path):
        t = obj.get("content", "")
        lengths.append(len(t))
        hosts[obj.get("host", "")] += 1

    report = {
        "docs": len(lengths),
        "length_chars": summarize_numeric(lengths),
        "top_hosts": hosts.most_common(top_k_hosts),
    }
    return report

def preview_wet(wet_path, max_records=60, max_preview_chars=220):
    type_counts = Counter()
    samples = []
    seen = 0
    with open(wet_path, "rb") as stream:
        for rec in ArchiveIterator(stream):
            seen += 1
            type_counts[rec.rec_type] += 1
            if rec.rec_type == "conversion":
                url = rec.rec_headers.get_header("WARC-Target-URI") or ""
                raw = rec.content_stream().read()
                text = raw.decode("utf-8", errors="ignore")
                prev = normalize_ws(text[:max_preview_chars])
                samples.append({"url": url, "chars": len(text), "preview": prev})
            if seen >= max_records:
                break
    return {"type_counts": dict(type_counts), "samples": samples}

wet_preview = preview_wet(WET_PATH, max_records=60)
wet_preview["type_counts"], wet_preview["samples"][:3]

({'warcinfo': 1, 'conversion': 59},
 [{'url': 'http://000af36.netsolhost.com/wordpress1/2004/09/page/2/',
   'chars': 16478,
   'preview': 'September | 2004 | Bob Griendling | Page 2 Menu About Home Experience Op-eds Bio Blog Top Monthly Archives: September 2004 Fantasyland Date: September 27, 2004 Author: Bob Griendling Categories: Uncategorized Funny thing'},
  {'url': 'http://0400425.netsolhost.com/beiseker/calendar-2/action~month/exact_date~1672556400/request_format~html/',
   'chars': 2087,
   'preview': 'Calendar | Village of Beiseker | Page 0400425.netsolhost.com|beiseker|calendar-2|action~month|exact_date~1672556400|request_format~html| Village of Beiseker Crossroads to the Future Search Main menu Skip to primary conte'},
  {'url': 'http://055-237-0928.com/css/0pgxv9khyq5k0ar63xv97/index.html',
   'chars': 4542,
   'preview': "춘천출장만남 최신뉴스▶ 출장샵,출장마사지,출장안마 [새책]종화동안마,익산여대생출장 [새책]비천동안마,서랑동안마 대덕소개팅,웅진동안마 '지하철에서 출장30대소개팅 위험.jpg,성북출장만남 출장대행 콜걸샾 오피콜걸 여대생' 창원성인출장마사지,서대문콜걸 장수군출장만남 출장대행 콜

## 5. Extract WET --> JSONL

In [11]:
def extract_wet_to_jsonl(wet_path: Path, out_jsonl: Path, min_chars: int = 300):
    total = 0
    conv = 0
    kept = 0
    dropped_short = 0

    with open(wet_path, "rb") as stream, open(out_jsonl, "w", encoding="utf-8") as out:
        for rec in tqdm(ArchiveIterator(stream), desc="Extracting WET"):
            total += 1
            if rec.rec_type != "conversion":
                continue

            conv += 1
            url = rec.rec_headers.get_header("WARC-Target-URI") or ""
            raw = rec.content_stream().read()
            text_raw = raw.decode("utf-8", errors="ignore")

            # Preserve your old behavior as "content"
            text_norm = normalize_ws(text_raw)

            if len(text_norm) < min_chars:
                dropped_short += 1
                continue

            obj = {
                "url": url,
                "host": host_from_url(url),
                "source": "wet",
                "content": text_norm,     # ✅ same field you used before
                "content_raw": text_raw,  # ✅ new for line-aware cleaning/dedup
            }
            out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            kept += 1

    stats = {
        "total_records": total,
        "conversion_records": conv,
        "kept_docs": kept,
        "dropped_too_short": dropped_short
    }
    return stats

extract_stats = extract_wet_to_jsonl(WET_PATH, RAW_EXTRACT_JSONL, min_chars=MIN_CHARS)
extract_stats

Extracting WET: 34318it [00:08, 3859.58it/s]


{'total_records': 34318,
 'conversion_records': 34317,
 'kept_docs': 32879,
 'dropped_too_short': 1438}

## 6. Raw Report

In [12]:
raw_report = analyze_raw(RAW_EXTRACT_JSONL)
raw_report

{'docs': 32879,
 'length_chars': {'n': 32879,
  'mean': 7480.538246297028,
  'p50': 3807.0,
  'p90': 13459.600000000002,
  'max': 877216.0},
 'top_hosts': [('cdha.cuny.edu', 14),
  ('courseware.zcu.cz', 13),
  ('turbotax.intuit.com', 10),
  ('diecezja.pl', 9),
  ('alcoholpolicy.niaaa.nih.gov', 8),
  ('businessfig.com', 8),
  ('www.besport.com', 8),
  ('www.library.univ.kiev.ua', 7),
  ('yscholarhub.yonsei.ac.kr', 6),
  ('headquarters.s4.xrea.com', 5),
  ('viavca.in2p3.fr', 5),
  ('5ka-sale.ru', 5),
  ('b-port.com', 5),
  ('bryansk.news', 5),
  ('ca.news.yahoo.com', 5),
  ('andpremium.jp', 4),
  ('arquivo.cienciaviva.pt', 4),
  ('art.ceskatelevize.cz', 4),
  ('burbujasweb.com', 4),
  ('cleanindiajournal.com', 4)]}

## 7. Cleaning

In [13]:
NAV_LINE_RE = re.compile(r"^(menu|home|about|contact|search|skip to|privacy|terms|login|sign in)$", re.I)

def clean_doc_text_keep_lines(text_raw: str):
    """
    Preserves your v2 behavior, but uses raw lines to compute repetition + boilerplate.
    Returns:
      cleaned_text: normalized whitespace join of kept lines
      repeat_ratio: same idea as your v2
      kept_lines: original kept lines (for cross-doc line dedup)
    """
    raw_lines = [ln.strip() for ln in text_raw.splitlines()]
    lines = [ln for ln in raw_lines if ln]

    if not lines:
        return "", 1.0, []

    filtered = []
    for ln in lines:
        low = ln.lower().strip()
        if len(low) <= 2:
            continue
        if len(low) <= 25 and NAV_LINE_RE.match(low):
            continue
        filtered.append(ln)

    if not filtered:
        return "", 1.0, []

    cnt = Counter([ln.lower() for ln in filtered])
    kept = [ln for ln in filtered if cnt[ln.lower()] < 3]  # same as your v2

    # same spirit as your v2
    repeat_ratio = 1.0 - (len(set([ln.lower() for ln in kept])) / max(1, len(kept)))
    cleaned = normalize_ws("\n".join(kept))
    return cleaned, float(repeat_ratio), kept

## 8. Language + Quality Gates

In [14]:
EN_STOPWORDS = set(stopwords.words("english"))

DIGIT_RE = re.compile(r"\d")
PUNCT_RE = re.compile(r"[^\w\s]")

def nltk_tokens(text: str):
    return [t.lower() for t in word_tokenize(text)]

def stopword_ratio_nltk(text: str):
    toks = nltk_tokens(text)
    words = [t for t in toks if t.isalpha()]
    if not words:
        return 0.0
    sw = sum(1 for w in words if w in EN_STOPWORDS)
    return sw / len(words)

def basic_ratios(text: str):
    if not text:
        return {"nonascii": 1.0, "digit": 1.0, "punct": 1.0}
    n = len(text)
    nonascii = sum(1 for c in text if ord(c) > 127) / n
    digit = len(DIGIT_RE.findall(text)) / n
    punct = len(PUNCT_RE.findall(text)) / n
    return {"nonascii": nonascii, "digit": digit, "punct": punct}

def is_english_langid(text: str):
    lang, score = langid.classify(text[:5000])
    return (lang == "en"), {"lang": lang, "lang_score": float(score)}

def quality_pass(text: str, repeat_line_ratio: float):
    if len(text) < MIN_CHARS:
        return False, {"fail": "too_short"}

    r = basic_ratios(text)
    swr = stopword_ratio_nltk(text)

    if r["nonascii"] > MAX_NONASCII_RATIO:
        return False, {"fail":"nonascii", **r, "stopword_ratio": swr, "repeat_line_ratio": repeat_line_ratio}
    if r["digit"] > MAX_DIGIT_RATIO:
        return False, {"fail":"digit", **r, "stopword_ratio": swr, "repeat_line_ratio": repeat_line_ratio}
    if r["punct"] > MAX_PUNCT_RATIO:
        return False, {"fail":"punct", **r, "stopword_ratio": swr, "repeat_line_ratio": repeat_line_ratio}
    if swr < MIN_STOPWORD_RATIO:
        return False, {"fail":"low_stopwords", **r, "stopword_ratio": swr, "repeat_line_ratio": repeat_line_ratio}
    if repeat_line_ratio > MAX_REPEAT_LINE_RATIO:
        return False, {"fail":"too_repetitive", **r, "stopword_ratio": swr, "repeat_line_ratio": repeat_line_ratio}

    return True, {**r, "stopword_ratio": swr, "repeat_line_ratio": repeat_line_ratio}

## 9. Filter + Clean Stage

In [15]:
def filter_and_clean(in_jsonl: Path, out_jsonl: Path):
    stats = Counter()

    with open(out_jsonl, "w", encoding="utf-8") as out:
        for obj in tqdm(iter_jsonl(in_jsonl), desc="Cleaning+Filtering"):
            content_raw = obj.get("content_raw", "")
            content = obj.get("content", "")

            if not content and not content_raw:
                stats["drop_empty"] += 1
                continue

            # line-aware cleaning (uses raw)
            cleaned, rep, kept_lines = clean_doc_text_keep_lines(content_raw if content_raw else content)

            if not cleaned:
                stats["drop_clean_empty"] += 1
                continue

            ok_lang, lang_meta = is_english_langid(cleaned)
            if not ok_lang:
                stats["drop_non_en"] += 1
                continue

            ok_q, q_meta = quality_pass(cleaned, rep)
            if not ok_q:
                stats[f"drop_quality_{q_meta.get('fail','unknown')}"] += 1
                continue

            out_obj = {
                "url": obj.get("url", ""),
                "host": obj.get("host", ""),
                "source": obj.get("source", "wet"),
                "content": cleaned,             # ✅ same as before
                "_kept_lines": kept_lines,      # ✅ temp for cross-doc line dedup
                "meta": {"lang": lang_meta, "quality": q_meta}
            }
            out.write(json.dumps(out_obj, ensure_ascii=False) + "\n")
            stats["kept"] += 1

    return stats

filter_stats = filter_and_clean(RAW_EXTRACT_JSONL, CLEANED_JSONL)
filter_stats

Cleaning+Filtering: 32879it [04:09, 131.66it/s]


Counter({'drop_non_en': 19103,
         'kept': 12470,
         'drop_quality_too_repetitive': 1003,
         'drop_quality_low_stopwords': 219,
         'drop_quality_digit': 53,
         'drop_quality_too_short': 30,
         'drop_quality_nonascii': 1})

## 10. Cross-document line-level dedup

In [16]:
LINE_NORM_RE = re.compile(r"\s+")
URL_RE = re.compile(r"https?://\S+|www\.\S+", re.I)
LONG_NUM_RE = re.compile(r"\d{2,}")     # normalize multi-digit numbers
HEX_RE = re.compile(r"\b[0-9a-f]{8,}\b", re.I)  # ids/hashes
MONEY_RE = re.compile(r"[$€£]\s?\d+(?:[.,]\d+)?")

def normalize_line_for_hash(line: str) -> str:
    """
    Normalize template-variant lines so boilerplate collapses together.
    This makes cross-doc line dedup MUCH more effective.
    """
    s = line.strip().lower()

    # remove/normalize volatile tokens
    s = URL_RE.sub("<url>", s)
    s = MONEY_RE.sub("<money>", s)
    s = HEX_RE.sub("<hex>", s)
    s = LONG_NUM_RE.sub("0", s)  # page=123 -> page=0, dates -> 0-0-0

    # collapse whitespace
    s = LINE_NORM_RE.sub(" ", s)

    # optional: trim very long lines to avoid huge hashes on garbage
    if len(s) > 500:
        s = s[:500]

    return s

def line_level_dedup(in_jsonl: Path, out_jsonl: Path):
    if not ENABLE_LINE_DEDUP:
        # pass-through
        with open(out_jsonl, "w", encoding="utf-8") as out:
            for obj in iter_jsonl(in_jsonl):
                obj.pop("_kept_lines", None)
                out.write(json.dumps(obj, ensure_ascii=False) + "\n")
        return {"skipped": True}

    # Pass 1: count in how many documents each line appears (doc-frequency)
    line_doc_freq = Counter()
    total_docs = 0

    for obj in tqdm(iter_jsonl(in_jsonl), desc="LineDedup pass1 (count)"):
        total_docs += 1
        lines = obj.get("_kept_lines", [])
        seen_in_doc = set()
        for ln in lines:
            if len(ln) < LINE_DUP_MIN_LEN:
                continue
            h = hashlib.md5(normalize_line_for_hash(ln).encode("utf-8", errors="ignore")).hexdigest()
            seen_in_doc.add(h)
        for h in seen_in_doc:
            line_doc_freq[h] += 1

    # Pass 2: remove high-docfreq lines
    stats = Counter()
    with open(out_jsonl, "w", encoding="utf-8") as out:
        for obj in tqdm(iter_jsonl(in_jsonl), desc="LineDedup pass2 (filter)"):
            lines = obj.get("_kept_lines", [])
            kept = []
            removed = 0

            for ln in lines:
                if len(ln) < LINE_DUP_MIN_LEN:
                    kept.append(ln)
                    continue
                h = hashlib.md5(normalize_line_for_hash(ln).encode("utf-8", errors="ignore")).hexdigest()
                if line_doc_freq[h] >= LINE_DUP_MAX_DOC_FREQ:
                    removed += 1
                else:
                    kept.append(ln)

            # optional: always keep some longest lines (if you want recall)
            if LINE_DEDUP_KEEP_TOPK_LONG_LINES and len(kept) == 0 and len(lines) > 0:
                kept = sorted(lines, key=len, reverse=True)[:LINE_DEDUP_KEEP_TOPK_LONG_LINES]

            new_text = normalize_ws("\n".join(kept))
            if len(new_text) < MIN_CHARS:
                stats["drop_post_linededup_too_short"] += 1
                continue

            obj["content"] = new_text
            obj["meta"]["line_dedup"] = {
                "removed_lines": int(removed),
                "orig_lines": int(len(lines)),
                "kept_lines": int(len(kept)),
            }

            obj.pop("_kept_lines", None)  # drop temp
            out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            stats["kept"] += 1

    stats["docs_seen"] = total_docs
    stats["unique_line_hashes"] = len(line_doc_freq)
    stats["high_freq_lines"] = sum(1 for _,df in line_doc_freq.items() if df >= LINE_DUP_MAX_DOC_FREQ)
    return dict(stats)

line_dedup_stats = line_level_dedup(CLEANED_JSONL, LINEDEDUP_JSONL)
line_dedup_stats

LineDedup pass1 (count): 12470it [00:06, 1813.46it/s]
LineDedup pass2 (filter): 12470it [00:09, 1377.23it/s]


{'kept': 12320,
 'drop_post_linededup_too_short': 150,
 'docs_seen': 12470,
 'unique_line_hashes': 389524,
 'high_freq_lines': 5920}

In [17]:
WORD_RE_SIMPLE = re.compile(r"[A-Za-z]+")

def _count_phrase_hits(text_lc: str, phrases: set) -> int:
    hits = 0
    for p in phrases:
        if " " in p:
            hits += text_lc.count(p)
        else:
            # word boundary-ish: count occurrences; rough but OK
            hits += len(re.findall(rf"\b{re.escape(p)}\b", text_lc))
    return hits

def unique_word_ratio(tokens):
    if not tokens:
        return 0.0
    return len(set(tokens)) / len(tokens)

def page_type_filter(in_jsonl: Path, out_jsonl: Path):
    """
    Drops:
      - cookie/privacy boilerplate-dominant pages
      - e-commerce listing/product/checkout pages
      - JS/template dumps
    Runs AFTER line-dedup so boilerplate removal already happened.
    """
    if not ENABLE_PAGEFILTER:
        # pass-through
        with open(out_jsonl, "w", encoding="utf-8") as out:
            for obj in iter_jsonl(in_jsonl):
                out.write(json.dumps(obj, ensure_ascii=False) + "\n")
        return {"skipped": True}

    stats = Counter()

    with open(out_jsonl, "w", encoding="utf-8") as out:
        for obj in tqdm(iter_jsonl(in_jsonl), desc="Page-type filtering"):
            text = obj.get("content", "")
            if not text:
                stats["drop_empty"] += 1
                continue

            text_lc = text.lower()
            tokens = [t.lower() for t in WORD_RE_SIMPLE.findall(text)]
            n_tokens = len(tokens)

            if n_tokens == 0:
                stats["drop_no_tokens"] += 1
                continue

            uniq_ratio = unique_word_ratio(tokens)

            # --- cookie/privacy dominance ---
            cookie_hits = _count_phrase_hits(text_lc, COOKIE_TERMS)
            cookie_density = cookie_hits / n_tokens

            cookie_drop = (
                cookie_hits >= COOKIE_MIN_HITS and
                cookie_density >= COOKIE_MIN_DENSITY and
                uniq_ratio <= COOKIE_MAX_UNIQUE_RATIO
            )

            if cookie_drop:
                stats["drop_cookie_privacy"] += 1
                continue

            # --- e-commerce/listing pages ---
            ecom_hits = _count_phrase_hits(text_lc, ECOM_TERMS)
            ecom_density = ecom_hits / n_tokens

            ecom_drop = (
                ecom_hits >= ECOM_MIN_HITS and
                ecom_density >= ECOM_MIN_DENSITY
            )

            if ecom_drop:
                stats["drop_ecommerce"] += 1
                continue

            # --- JS/template dumps ---
            js_hits = _count_phrase_hits(text_lc, JS_MARKERS)
            js_density = js_hits / n_tokens

            r = basic_ratios(text)  # uses your existing function

            js_drop = (
                js_density >= JS_MIN_DENSITY and
                r["punct"] >= JS_MIN_PUNCT_RATIO
            )

            if js_drop:
                stats["drop_js_dump"] += 1
                continue

            # keep
            obj.setdefault("meta", {})["pagefilter"] = {
                "unique_word_ratio": float(uniq_ratio),
                "cookie_hits": int(cookie_hits),
                "cookie_density": float(cookie_density),
                "ecom_hits": int(ecom_hits),
                "ecom_density": float(ecom_density),
                "js_hits": int(js_hits),
                "js_density": float(js_density),
            }

            out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            stats["kept"] += 1

    return dict(stats)

pagefilter_stats = page_type_filter(LINEDEDUP_JSONL, PAGEFILTER_JSONL)
pagefilter_stats

Page-type filtering: 12320it [00:25, 490.38it/s]


{'kept': 11316,
 'drop_cookie_privacy': 61,
 'drop_ecommerce': 941,
 'drop_js_dump': 2}

## 11. Exact Dedup

In [18]:
def content_hash(text: str) -> str:
    t = normalize_ws(text)
    return hashlib.md5(t.encode("utf-8", errors="ignore")).hexdigest()

def exact_dedup(in_jsonl: Path, out_jsonl: Path):
    seen = set()
    stats = Counter()

    with open(out_jsonl, "w", encoding="utf-8") as out:
        for obj in tqdm(iter_jsonl(in_jsonl), desc="Exact dedup"):
            t = obj.get("content", "")
            h = content_hash(t)
            if h in seen:
                stats["drop_exact_dup"] += 1
                continue
            seen.add(h)
            out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            stats["kept"] += 1

    stats["unique_hashes"] = len(seen)
    return dict(stats)

DEDUP_INPUT = (
    PAGEFILTER_JSONL if (ENABLE_LINE_DEDUP and ENABLE_PAGEFILTER)
    else (LINEDEDUP_JSONL if ENABLE_LINE_DEDUP else CLEANED_JSONL)
)

dedup_stats = exact_dedup(DEDUP_INPUT, DEDUPED_JSONL)
dedup_stats

Exact dedup: 11316it [00:02, 4077.35it/s]


{'kept': 11289, 'drop_exact_dup': 27, 'unique_hashes': 11289}

## 12. Language Distribution Probe

In [19]:
def lang_distribution(jsonl_path: Path, sample_n=20000, seed=42):
    cnt = Counter()
    rows = list(iter_jsonl(jsonl_path))
    random.Random(seed).shuffle(rows)
    rows = rows[:min(sample_n, len(rows))]

    for obj in tqdm(rows, desc="LangID dist"):
        text = obj.get("content", "")
        if not text:
            continue
        lang, score = langid.classify(text[:5000])
        cnt[lang] += 1

    print("Total classified:", sum(cnt.values()))
    print("Top 15 languages:", cnt.most_common(15))
    return cnt

lang_cnt = lang_distribution(RAW_EXTRACT_JSONL, sample_n=20000)
lang_cnt.most_common(10)

LangID dist: 100%|██████████| 20000/20000 [01:23<00:00, 240.25it/s]


Total classified: 20000
Top 15 languages: [('en', 8266), ('ru', 1281), ('de', 1140), ('es', 1057), ('ja', 1037), ('fr', 992), ('zh', 902), ('it', 624), ('pt', 444), ('nl', 423), ('pl', 409), ('la', 292), ('cs', 253), ('id', 252), ('vi', 244)]


[('en', 8266),
 ('ru', 1281),
 ('de', 1140),
 ('es', 1057),
 ('ja', 1037),
 ('fr', 992),
 ('zh', 902),
 ('it', 624),
 ('pt', 444),
 ('nl', 423)]

## 13. KenLM Scoring Hook

In [20]:
!pip -q install https://github.com/kpu/kenlm/archive/master.zip

In [21]:
def try_load_kenlm(model_path):
    try:
        import kenlm
    except Exception as e:
        print("KenLM not available. Install it first if you want this stage.")
        return None
    if model_path is None:
        print("KENLM_MODEL_PATH is None. Provide a path to a .klm model.")
        return None
    try:
        return kenlm.Model(str(model_path))
    except Exception as e:
        print("Failed to load KenLM model:", e)
        return None

def kenlm_score_text(model, text: str):
    # kenlm.Model.score returns log10 probability by default (depends on build)
    # We record the raw score; you must calibrate thresholds on your corpus.
    text = text[:KENLM_MAX_CHARS]
    return float(model.score(text))

def apply_kenlm_filter(in_jsonl: Path, out_jsonl: Path):
    if not ENABLE_KENLM:
        # pass-through
        with open(out_jsonl, "w", encoding="utf-8") as out:
            for obj in iter_jsonl(in_jsonl):
                out.write(json.dumps(obj, ensure_ascii=False) + "\n")
        return {"skipped": True}

    model = try_load_kenlm(KENLM_MODEL_PATH)
    if model is None:
        # pass-through but warn
        with open(out_jsonl, "w", encoding="utf-8") as out:
            for obj in iter_jsonl(in_jsonl):
                out.write(json.dumps(obj, ensure_ascii=False) + "\n")
        return {"skipped": True, "reason": "kenlm_not_loaded"}

    stats = Counter()
    with open(out_jsonl, "w", encoding="utf-8") as out:
        for obj in tqdm(iter_jsonl(in_jsonl), desc="KenLM scoring"):
            t = obj.get("content", "")
            s = kenlm_score_text(model, t)
            obj.setdefault("meta", {}).setdefault("quality", {})["kenlm_score"] = s

            if KENLM_SCORE_THRESHOLD is not None and s < KENLM_SCORE_THRESHOLD:
                stats["drop_kenlm_low_score"] += 1
                continue

            out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            stats["kept"] += 1

    return dict(stats)

kenlm_stats = apply_kenlm_filter(DEDUPED_JSONL, FINAL_JSONL)
kenlm_stats

{'skipped': True}

## 14. Optional Reference-Quality Classifier

In [22]:
import joblib
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction import DictVectorizer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

WORD_RE = re.compile(r"[A-Za-z]+")

def featurize_doc(text: str, host: str = "") -> dict:
    r = basic_ratios(text)
    swr = stopword_ratio_nltk(text)
    words = WORD_RE.findall(text)
    n_words = len(words)
    avg_word_len = (sum(len(w) for w in words) / n_words) if n_words else 0.0
    return {
        "len_chars": len(text),
        "n_words": n_words,
        "avg_word_len": avg_word_len,
        "stopword_ratio": swr,
        "nonascii_ratio": r["nonascii"],
        "digit_ratio": r["digit"],
        "punct_ratio": r["punct"],
        "host_tld": host.split(".")[-1] if host and "." in host else host,
    }

In [23]:
def train_ref_quality_classifier(labeled_rows):
    """
    labeled_rows: list of dicts with keys: content, host, label (0/1)
    """
    X = [featurize_doc(r["content"], r.get("host","")) for r in labeled_rows]
    y = [int(r["label"]) for r in labeled_rows]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y if len(set(y))>1 else None)

    clf = Pipeline([
        ("vec", DictVectorizer(sparse=True)),
        ("lr", LogisticRegression(max_iter=200, n_jobs=None))
    ])
    clf.fit(X_train, y_train)
    preds = clf.predict(X_test)
    print(classification_report(y_test, preds))
    return clf

In [24]:
def apply_ref_quality_filter(in_jsonl: Path, out_jsonl: Path, model_path: Path, threshold: float = 0.5):
    if not ENABLE_REF_QUALITY_FILTER:
        with open(out_jsonl, "w", encoding="utf-8") as out:
            for obj in iter_jsonl(in_jsonl):
                out.write(json.dumps(obj, ensure_ascii=False) + "\n")
        return {"skipped": True}

    if not model_path.exists():
        raise FileNotFoundError(f"Missing ref-quality model at: {model_path}")

    clf = joblib.load(model_path)
    stats = Counter()

    with open(out_jsonl, "w", encoding="utf-8") as out:
        for obj in tqdm(iter_jsonl(in_jsonl), desc="Ref-quality filter"):
            t = obj.get("content", "")
            host = obj.get("host","")
            feats = featurize_doc(t, host)
            proba = float(clf.predict_proba([feats])[0][1])
            obj.setdefault("meta", {})["ref_quality_proba"] = proba

            if proba < threshold:
                stats["drop_ref_quality_low"] += 1
                continue

            out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            stats["kept"] += 1

    return dict(stats)

print("✅ Ref-quality stage ready. To use it:")
print("1) Create labeled_rows (manual or weak labels)")
print("2) clf = train_ref_quality_classifier(labeled_rows)")
print("3) joblib.dump(clf, REF_QUALITY_MODEL_PATH)")
print("4) ENABLE_REF_QUALITY_FILTER=True and run apply_ref_quality_filter(...)")

✅ Ref-quality stage ready. To use it:
1) Create labeled_rows (manual or weak labels)
2) clf = train_ref_quality_classifier(labeled_rows)
3) joblib.dump(clf, REF_QUALITY_MODEL_PATH)
4) ENABLE_REF_QUALITY_FILTER=True and run apply_ref_quality_filter(...)


## 15. Near-duplicate clustering

In [25]:
from datasketch import MinHash, MinHashLSH

def load_docs(jsonl_path: Path, limit=None, seed=42):
    docs = list(iter_jsonl(jsonl_path))
    if limit is not None and len(docs) > limit:
        rng = random.Random(seed)
        rng.shuffle(docs)
        docs = docs[:limit]
    return docs

def word_shingles(text: str, k=5):
    words = [w.lower() for w in WORD_RE.findall(text)]
    if len(words) < k:
        return []
    return [" ".join(words[i:i+k]) for i in range(len(words)-k+1)]

def build_minhash(shingles, num_perm=128):
    m = MinHash(num_perm=num_perm)
    for s in shingles:
        m.update(s.encode("utf-8", errors="ignore"))
    return m

class UnionFind:
    def __init__(self, n):
        self.p = list(range(n))
        self.r = [0]*n
    def find(self, x):
        while self.p[x] != x:
            self.p[x] = self.p[self.p[x]]
            x = self.p[x]
        return x
    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra == rb:
            return
        if self.r[ra] < self.r[rb]:
            self.p[ra] = rb
        elif self.r[ra] > self.r[rb]:
            self.p[rb] = ra
        else:
            self.p[rb] = ra
            self.r[ra] += 1

def near_duplicate_clusters(jsonl_path: Path):
    if not RUN_NEAR_DUP:
        return {"skipped": True}

    docs = load_docs(jsonl_path, limit=NEAR_DUP_SAMPLE_N, seed=RANDOM_SEED)
    texts = [d.get("content","") for d in docs]
    print("Docs for near-dup:", len(texts))

    lsh = MinHashLSH(threshold=LSH_THRESHOLD, num_perm=MINHASH_NUM_PERM)
    minhashes = []
    for i, t in enumerate(tqdm(texts, desc="Building MinHashes")):
        shingles = word_shingles(t, k=SHINGLE_SIZE)
        mh = build_minhash(shingles, num_perm=MINHASH_NUM_PERM) if shingles else build_minhash([""], num_perm=MINHASH_NUM_PERM)
        minhashes.append(mh)
        lsh.insert(str(i), mh)

    uf = UnionFind(len(texts))
    for i, mh in enumerate(tqdm(minhashes, desc="Query LSH")):
        hits = lsh.query(mh)
        for h in hits:
            j = int(h)
            if i != j:
                uf.union(i, j)

    clusters = defaultdict(list)
    for i in range(len(texts)):
        clusters[uf.find(i)].append(i)

    cluster_sizes = sorted([len(v) for v in clusters.values()], reverse=True)
    big = [v for v in clusters.values() if len(v) >= 2]
    print("Total clusters:", len(clusters))
    print("Clusters size>=2:", len(big))
    print("Top 10 cluster sizes:", cluster_sizes[:10])

    # show a few examples
    for idx, members in enumerate(sorted(big, key=len, reverse=True)[:5], 1):
        print(f"\nCluster {idx} size={len(members)}")
        for m in members[:2]:
            print(" -", docs[m].get("url",""), "| preview:", normalize_ws(texts[m][:180]))

    return {
        "docs_used": len(texts),
        "clusters_total": len(clusters),
        "clusters_ge2": len(big),
        "top_cluster_sizes": cluster_sizes[:10]
    }

near_dup_report = near_duplicate_clusters(DEDUPED_JSONL if not ENABLE_KENLM else FINAL_JSONL)
near_dup_report

Docs for near-dup: 3000


Building MinHashes: 100%|██████████| 3000/3000 [00:17<00:00, 167.66it/s]
Query LSH: 100%|██████████| 3000/3000 [00:00<00:00, 185080.93it/s]


Total clusters: 2998
Clusters size>=2: 2
Top 10 cluster sizes: [2, 2, 1, 1, 1, 1, 1, 1, 1, 1]

Cluster 1 size=2
 - https://www.mjtunes.com/modules/mydownloads/singlefile.php?lid=2887 | preview: Michael Jackson - Who Is The Thriller (Mashup) / Remixes Michael Jackson Music database - MjTunes Listen to MjTunes Radio You can VOTE for and RATE your favorite tracks. Vote only
 - http://www.mjtunes.com/modules/mydownloads/singlefile.php?lid=3218 | preview: Michael Jackson - Earth Song (DJ Gnome Dubstep Remix) / Remixes Michael Jackson Music database - MjTunes Listen to MjTunes Radio You can VOTE for and RATE your favorite tracks. Vot

Cluster 2 size=2
 - https://turbotax.intuit.com/reviews/online/premium/?page=1656 | preview: Skip To Main Content Must file by 3/31. Start for free expand navigation options Expert does your taxes Expert does your taxes Do it yourself Do it yourself We’ll guide you step-by
 - https://turbotax.intuit.com/reviews/online/deluxe/?page=10982 | preview: Skip To Main 

{'docs_used': 3000,
 'clusters_total': 2998,
 'clusters_ge2': 2,
 'top_cluster_sizes': [2, 2, 1, 1, 1, 1, 1, 1, 1, 1]}

## 16. Topic Modeling

In [26]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import NMF

def topic_modeling(jsonl_path: Path):
    if not RUN_TOPIC_MODELING:
        return {"skipped": True}

    docs_tm = load_docs(jsonl_path, limit=TOPIC_SAMPLE_N, seed=RANDOM_SEED)
    texts = [d["content"] for d in docs_tm]
    print("Docs for topic modeling:", len(texts))

    if len(texts) < 20:
        print("Not enough documents to run topic modeling reliably.")
        return {"docs_used": len(texts), "skipped": True, "reason": "too_few_docs"}

    vectorizer = TfidfVectorizer(
        max_features=30000,
        min_df=2,
        max_df=0.9,
        ngram_range=(1,2),
        stop_words="english"
    )
    X = vectorizer.fit_transform(texts)

    nmf = NMF(n_components=N_TOPICS, random_state=RANDOM_SEED)
    W = nmf.fit_transform(X)
    H = nmf.components_
    terms = np.array(vectorizer.get_feature_names_out())

    def top_terms(topic_idx, topn=12):
        idx = np.argsort(H[topic_idx])[::-1][:topn]
        return terms[idx].tolist()

    topic_terms = {f"topic_{i}": top_terms(i, TOP_TERMS) for i in range(N_TOPICS)}
    dominant = W.argmax(axis=1)

    print("\nTopic terms:")
    for k, v in topic_terms.items():
        print(k, ":", v)

    print("\nDominant topic counts:")
    dom_cnt = Counter(dominant.tolist())
    for t, c in dom_cnt.most_common():
        print(f"topic_{t}: {c}")

    return {
        "docs_used": len(texts),
        "n_topics": N_TOPICS,
        "topic_terms": topic_terms,
        "dominant_topic_counts": dict(dom_cnt)
    }

topic_report = topic_modeling(DEDUPED_JSONL if not ENABLE_KENLM else FINAL_JSONL)
topic_report

Docs for topic modeling: 5000

Topic terms:
topic_0 : ['services', 'business', 'health', 'research', 'school', 'management', 'university', 'education', 'data', 'information', 'news', 'service']
topic_1 : ['november', 'april', 'january', 'october', 'september', 'june', 'december', 'march', 'july', 'february', 'august', '2019']
topic_2 : ['cart', 'shop', 'accessories', 'products', '00', 'product', 'usd', 'shipping', 'size', 'sale', 'add', '99']
topic_3 : ['function', 'var', 'return', 'data', 'function var', 'document', 'function return', 'window', 'jquery', 'null', 'length', 'const']
topic_4 : ['like', 'just', 'time', 'people', 'best', 'love', 'game', 'life', 'day', 'book', 'know', 'new']
topic_5 : ['00', 'pm', '00 pm', '12', '11', '10', '00 00', 'views', '30', '2024', '14', '20']
topic_6 : ['islands', 'republic', 'guinea', 'saint', 'united', 'eur', 'usd', 'south', 'island', 'arab', 'congo', 'french']
topic_7 : ['search', 'password', 'cookies', 'privacy', 'forum', 'email', 'policy', 'log

{'docs_used': 5000,
 'n_topics': 8,
 'topic_terms': {'topic_0': ['services',
   'business',
   'health',
   'research',
   'school',
   'management',
   'university',
   'education',
   'data',
   'information',
   'news',
   'service'],
  'topic_1': ['november',
   'april',
   'january',
   'october',
   'september',
   'june',
   'december',
   'march',
   'july',
   'february',
   'august',
   '2019'],
  'topic_2': ['cart',
   'shop',
   'accessories',
   'products',
   '00',
   'product',
   'usd',
   'shipping',
   'size',
   'sale',
   'add',
   '99'],
  'topic_3': ['function',
   'var',
   'return',
   'data',
   'function var',
   'document',
   'function return',
   'window',
   'jquery',
   'null',
   'length',
   'const'],
  'topic_4': ['like',
   'just',
   'time',
   'people',
   'best',
   'love',
   'game',
   'life',
   'day',
   'book',
   'know',
   'new'],
  'topic_5': ['00',
   'pm',
   '00 pm',
   '12',
   '11',
   '10',
   '00 00',
   'views',
   '30',
   '2024',


## 17. Final JSON

In [27]:
post_report = analyze_raw(DEDUPED_JSONL if not ENABLE_KENLM else FINAL_JSONL)

final_report = {
    "input_wet": str(WET_PATH),
    "raw_extract_jsonl": str(RAW_EXTRACT_JSONL),
    "cleaned_jsonl": str(CLEANED_JSONL),
    "line_deduped_jsonl": str(LINEDEDUP_JSONL),
    "deduped_jsonl": str(DEDUPED_JSONL),
    "final_jsonl": str(FINAL_JSONL),
    "raw_report": raw_report,
    "extract_stats": extract_stats,
    "filter_stats": dict(filter_stats),
    "line_dedup_stats": line_dedup_stats,
    "exact_dedup_stats": dict(dedup_stats),
    "near_dup_report": near_dup_report,
    "topic_report": topic_report,
    "post_report": post_report,
    "pagefilter_jsonl": str(PAGEFILTER_JSONL),
    "pagefilter_stats": dict(pagefilter_stats),
    "notes": {
        "tokenization": "NLTK word_tokenize + NLTK stopwords used for stopword_ratio",
        "language_id": "langid (same as v2)",
        "dedup_exact": "md5 of whitespace-normalized content",
        "line_level_dedup": "cross-document doc-frequency line removal (CCNet-like approximation)",
        "kenlm": "optional scoring hook (disabled by default)",
        "ref_quality": "optional classifier proxy (disabled by default)",
        "pagefilter": "cookie/privacy + ecommerce + js template page-type filters (conservative)"
    }
}

with open(REPORT_JSON, "w", encoding="utf-8") as f:
    json.dump(final_report, f, indent=2, ensure_ascii=False)

print("✅ Saved report:", REPORT_JSON)
print("✅ Outputs in:", OUT_DIR.resolve())

✅ Saved report: data-v3.1/wet_report.json
✅ Outputs in: /Users/ostwalaman/Desktop/LLaMA Project/data-v3.1
