In [1]:
# Cell 1 — Imports & config
import re
import math
import json
import pickle
import random
import logging
from pathlib import Path
from collections import Counter, defaultdict
from typing import List, Tuple, Union
from IPython.display import display, Markdown

# optional progress bar
try:
    from tqdm.auto import tqdm
except Exception:
    tqdm = None

# Reproducibility / logging: seed the stdlib `random` module (only that module
# is seeded here) and configure the root logger to emit INFO-level records.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
logging.basicConfig(level=logging.INFO)


In [2]:
# Cell 2 — NLTK vocab load (safe)
import nltk
# Probe for the "words" corpus and download it on first use.
# NLTK signals a missing resource with LookupError, so only that case triggers
# a download; any other failure (e.g. a corrupted install) surfaces immediately
# instead of being masked. The result is deliberately not bound to `_`, which
# IPython uses for the last cell output.
try:
    nltk.corpus.words.words()
except LookupError:
    nltk.download("words")

from nltk.corpus import words as nltk_words
# Lower-cased once so that vocabulary membership tests are case-insensitive.
ENGLISH_VOCAB = {w.lower() for w in nltk_words.words()}
print("Loaded English vocab size:", len(ENGLISH_VOCAB))


Loaded English vocab size: 234377


In [3]:
# Cell 3 — Tokenizer + leet normalization + helpers
# Translation table for common "leet" substitutions: each digit/symbol on the
# first row maps to the letter in the same position on the second row.
_LEET_SOURCE = "013457@$!"
_LEET_TARGET = "oleastasi"
LEET_MAP = str.maketrans(dict(zip(_LEET_SOURCE, _LEET_TARGET)))

def leet_normalize(s: str) -> str:
    """Replace leet characters in ``s`` with their letter equivalents and lower-case the result."""
    translated = s.translate(LEET_MAP)
    return translated.lower()

def classify_run(r: str, do_leet: bool = False, use_vocab: bool = True):
    """Classify one character run and return ``(slot, token, template_part)``.

    * all-digit run      -> ``("DIGITS", r, "DIGITS<len>")``
    * all-alphabetic run -> lower-cased (and leet-normalized when ``do_leet``);
      ``("WORD", token, "WORD<len>")`` if ``use_vocab`` is set, the token has at
      least 3 characters and is in ``ENGLISH_VOCAB``; otherwise
      ``("FRAG", token, "FRAG")``
    * anything else      -> ``("SYMBOL", r, "SYMBOL")``

    NOTE(review): ``do_leet`` looks like a no-op here. Leet normalization is
    only applied to runs for which ``str.isalpha()`` is true, while ``LEET_MAP``
    translates digits and punctuation only, so it never finds anything to
    replace. Confirm whether normalization was meant to happen before the
    password is split into runs.
    """
    if r.isdigit():
        return "DIGITS", r, "DIGITS" + str(len(r))

    if not r.isalpha():
        return "SYMBOL", r, "SYMBOL"

    normalized = leet_normalize(r.lower()) if do_leet else r.lower()
    # Order matters: the vocabulary is consulted only for tokens of length >= 3.
    is_word = use_vocab and len(normalized) >= 3 and normalized in ENGLISH_VOCAB
    if is_word:
        return "WORD", normalized, "WORD" + str(len(normalized))
    return "FRAG", normalized, "FRAG"

def tokenize(password: str, do_leet: bool = False, use_vocab: bool = True) -> Tuple[List[str], str]:
    """Split a password into classified runs.

    Surrounding whitespace is stripped, then the password is cut into maximal
    runs of ASCII letters, digits, or other characters, and each run is passed
    to ``classify_run``.

    Returns:
        ``(tokens, template)`` where ``tokens`` is the list of per-run tokens
        and ``template`` is the per-run template parts joined with ``"|"``
        (an empty string when there are no runs).
    """
    stripped = password.strip()
    classified = [
        classify_run(run, do_leet=do_leet, use_vocab=use_vocab)
        for run in re.findall(r'[A-Za-z]+|\d+|[^A-Za-z\d]+', stripped)
    ]
    tokens = [token for _slot, token, _part in classified]
    template = "|".join(part for _slot, _token, part in classified)
    return tokens, template


In [4]:
# Cell 4 — PCFGLite class (improved, trim helper, streaming fit)
class PCFGLite:
    """Lightweight PCFG-style password model.

    The model keeps two kinds of counts:

    * ``template_counts`` - how often each template (e.g. ``"FRAG|DIGITS2"``)
      was seen, with ``total_templates`` as the running total;
    * ``slot_counts`` - for each slot type (``"WORD"``, ``"FRAG"``,
      ``"DIGITS"``, ``"SYMBOL"``) how often each token was seen.

    Probabilities use additive smoothing with parameter ``alpha`` and one extra
    vocabulary entry reserved for unseen items.

    Passwords are parsed the same way ``tokenize`` does it: surrounding
    whitespace is stripped, the remainder is split into runs, and each run is
    classified with ``classify_run``. Template and slot tokens are derived from
    that single parse so they always describe the same runs.
    """

    # Number of input entries between periodic slot-counter trims in fit_list.
    TRIM_INTERVAL = 500000
    # Maximal runs of ASCII letters, digits, or anything else.
    _RUN_RE = re.compile(r'[A-Za-z]+|\d+|[^A-Za-z\d]+')

    def __init__(self, alpha: float = 1.0, do_leet: bool = False):
        """Create an empty model.

        Args:
            alpha: additive-smoothing constant used by the probability methods.
            do_leet: forwarded to ``classify_run`` for every parsed run.
        """
        self.template_counts = Counter()
        self.slot_counts = defaultdict(Counter)
        self.total_templates = 0
        self.alpha = float(alpha)
        self.do_leet = do_leet

    def __repr__(self):
        return f"PCFGLite(total_templates={self.total_templates}, unique_templates={len(self.template_counts)})"

    def _parse(self, password: str, use_vocab: bool = True):
        """Return ``(template, [(slot, token), ...])`` for one password.

        Whitespace around the password is stripped first so that the template
        and the slot tokens are computed from exactly the same runs.
        """
        classified = [
            classify_run(run, do_leet=self.do_leet, use_vocab=use_vocab)
            for run in self._RUN_RE.findall(password.strip())
        ]
        template = "|".join(part for _slot, _token, part in classified)
        slot_tokens = [(slot, token) for slot, token, _part in classified]
        return template, slot_tokens

    def trim_slot_counts(self, top_n=100000):
        """Keep only the ``top_n`` most common tokens in every slot counter."""
        for s in list(self.slot_counts.keys()):
            self.slot_counts[s] = Counter(dict(self.slot_counts[s].most_common(top_n)))

    def fit_list(self, pw_list, max_samples: int = None, verbose: bool = True, use_vocab: bool = True, trim_top_n: int = None):
        """Update the counts from an iterable of passwords (may be a stream).

        Args:
            pw_list: iterable of password strings; falsy entries are skipped.
            max_samples: stop once this many entries have been read
                (``None`` or ``0`` means no limit; skipped entries count).
            verbose: display a Markdown summary when done.
            use_vocab: forwarded to ``classify_run``.
            trim_top_n: when set, every ``TRIM_INTERVAL`` entries each slot
                counter is cut down to its ``trim_top_n`` most common tokens
                to bound memory use.
        """
        for i, pw in enumerate(pw_list):
            if max_samples and i >= max_samples:
                break
            if not pw:
                continue

            template, slot_tokens = self._parse(pw, use_vocab=use_vocab)
            self.template_counts[template] += 1
            self.total_templates += 1
            for slot, token in slot_tokens:
                self.slot_counts[slot][token] += 1

            if trim_top_n and (i + 1) % self.TRIM_INTERVAL == 0:
                self.trim_slot_counts(trim_top_n)

        if verbose:
            display(Markdown(f"**Trained on {self.total_templates} templates. Unique templates: {len(self.template_counts)}**"))

    def fit_file(self, filepath: str, max_lines: int = None, use_vocab: bool = True, show_progress: bool = True, trim_top_n: int = None):
        """Stream passwords from a text file (one per line) into ``fit_list``.

        Args:
            filepath: path of the password file, read as latin-1.
            max_lines: stop after this many lines (``None`` or ``0`` = all).
            use_vocab: forwarded to ``fit_list``.
            show_progress: wrap the stream in tqdm when it is available and
                no ``max_lines`` cap is set.
            trim_top_n: forwarded to ``fit_list``.

        Raises:
            FileNotFoundError: if ``filepath`` does not exist.
        """
        p = Path(filepath)
        if not p.exists():
            raise FileNotFoundError(filepath)

        def iter_lines():
            with p.open("r", encoding="latin-1", errors="ignore") as f:
                for i, line in enumerate(f):
                    if max_lines and i >= max_lines:
                        break
                    # Only line terminators are removed here; _parse strips
                    # any remaining surrounding whitespace.
                    yield line.rstrip("\n\r")

        it = iter_lines()
        if show_progress and tqdm is not None and max_lines is None:
            it = tqdm(it, desc=f"Reading {p.name}")
        self.fit_list(it, max_samples=None, verbose=True, use_vocab=use_vocab, trim_top_n=trim_top_n)

    def template_prob(self, template: str) -> float:
        """Smoothed probability of ``template``.

        Returns ``0.0`` instead of dividing by zero when the denominator is
        zero (empty model with ``alpha == 0``).
        """
        V = len(self.template_counts)
        denom = self.total_templates + self.alpha * (V + 1)
        if denom <= 0:
            return 0.0
        return (self.template_counts[template] + self.alpha) / denom

    def slot_token_prob(self, slot_type: str, token: str) -> float:
        """Smoothed probability of ``token`` within slot ``slot_type``.

        Unknown slot types are treated as an empty counter. Returns ``0.0``
        instead of dividing by zero when the denominator is zero.
        """
        counter = self.slot_counts.get(slot_type, Counter())
        total = sum(counter.values())
        V = len(counter)
        denom = total + self.alpha * (V + 1)
        if denom <= 0:
            return 0.0
        return (counter[token] + self.alpha) / denom

    def score(self, password: str, use_vocab: bool = True) -> float:
        """Natural-log probability of ``password`` under the model.

        The score is ``log P(template)`` plus ``log P(token | slot)`` for each
        run. If any factor is zero (possible only with ``alpha == 0``) the
        result is ``-inf`` rather than a ``math.log`` domain error.
        """
        template, slot_tokens = self._parse(password, use_vocab=use_vocab)
        p = self.template_prob(template)
        if p <= 0.0:
            return float("-inf")
        logp = math.log(p)
        for slot, token in slot_tokens:
            p = self.slot_token_prob(slot, token)
            if p <= 0.0:
                return float("-inf")
            logp += math.log(p)
        return logp

    def top_templates(self, n=30):
        """Return the ``n`` most common ``(template, count)`` pairs."""
        return self.template_counts.most_common(n)

    def top_tokens(self, slot_type: str, n=30):
        """Return the ``n`` most common ``(token, count)`` pairs of a slot type."""
        return self.slot_counts.get(slot_type, Counter()).most_common(n)

    def snapshot(self, top_templates_n=200, top_words_n=500, top_digits_n=200):
        """Return a bounded summary dict: totals plus top templates, words and digit runs."""
        return {
            "total_templates": self.total_templates,
            "unique_templates": len(self.template_counts),
            "top_templates": self.top_templates(top_templates_n),
            "top_words": self.top_tokens("WORD", top_words_n),
            "top_digits": self.top_tokens("DIGITS", top_digits_n),
        }


In [5]:
# Cell 5 — Save / Load helpers (state-only by default)
def save_model(path: Union[str, Path], model: PCFGLite, state_only: bool = True):
    """Pickle a model to ``path``.

    With ``state_only`` (the default) only plain Python data is written: a
    wrapper dict tagged ``"__pcfg_state_v1"`` whose ``"data"`` entry holds the
    template counts, per-slot counts, total, ``alpha`` and ``do_leet``.
    Otherwise the ``PCFGLite`` object itself is pickled.
    """
    target = Path(path)

    # Build the payload before opening the file so a failure here does not
    # leave a truncated file behind.
    if state_only:
        state = {
            "template_counts": dict(model.template_counts),
            "slot_counts": {slot: dict(counts) for slot, counts in model.slot_counts.items()},
            "total_templates": int(model.total_templates),
            "alpha": float(model.alpha),
            "do_leet": bool(model.do_leet),
        }
        payload = {"__pcfg_state_v1": True, "data": state}
    else:
        payload = model

    with target.open("wb") as fh:
        pickle.dump(payload, fh, protocol=pickle.HIGHEST_PROTOCOL)

    logging.info(f"Saved model to {target.resolve()} (state_only={state_only})")

def load_model(path: Union[str, Path], state_only: bool = True) -> PCFGLite:
    """Load a model previously written by ``save_model``.

    Args:
        path: pickle file to read.
        state_only: when true, expect a state dict (either the tagged
            ``{"__pcfg_state_v1": True, "data": {...}}`` wrapper or a bare
            state dict) and rebuild a fresh ``PCFGLite`` from it; when false,
            expect a pickled ``PCFGLite`` instance.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        TypeError: if the unpickled payload does not match the requested mode.
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(p)

    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only load model files that come from a trusted source.
    with p.open("rb") as f:
        data = pickle.load(f)

    if not state_only:
        if not isinstance(data, PCFGLite):
            raise TypeError("Unpickled object is not a PCFGLite instance.")
        return data

    if isinstance(data, dict) and data.get("__pcfg_state_v1"):
        st = data["data"]
    else:
        st = data
    if not isinstance(st, dict):
        # Without this check a full-object pickle would fail below with an
        # opaque AttributeError on `.get`.
        raise TypeError(
            f"Expected a state dict in {p}, got {type(st).__name__}; "
            "if the file holds a full PCFGLite object, load it with state_only=False."
        )

    model = PCFGLite(alpha=st.get("alpha", 1.0), do_leet=st.get("do_leet", False))
    model.template_counts = Counter(st.get("template_counts", {}))
    model.slot_counts = defaultdict(Counter, {k: Counter(v) for k, v in st.get("slot_counts", {}).items()})
    model.total_templates = int(st.get("total_templates", 0))
    return model


In [6]:
# Cell 6 — Demo training & save (small sample fallback)
# Location of the password corpus; when it is missing the demo falls back to sample_pw below.
DATA_PATH = Path("Data-Breach/rockyou.txt")
# Tiny built-in sample so the notebook still runs end-to-end without the corpus.
sample_pw = ["password", "passw0rd", "P@ssw0rd", "john1987", "superman2020!", "aaaa1111bbbb"]

# Train model on either large file or sample
model = PCFGLite(alpha=1.0, do_leet=True)
if DATA_PATH.exists():
    # Full-corpus run: no line cap, progress bar disabled, and slot counters cut
    # back to their 250000 most common tokens at fit_list's periodic trim points.
    model.fit_file(str(DATA_PATH), max_lines=None, use_vocab=True, show_progress=False, trim_top_n=250000)
else:
    logging.info("DATA_PATH not found; training on small sample_pw for demo.")
    model.fit_list(sample_pw, verbose=False)

# Save state-only after training
save_model("pcfg_model_all_state.pkl", model, state_only=True)

# quick inspect
print("Top templates:", model.top_templates(5))
print("Top words:", model.top_tokens("WORD", 10))


**Trained on 14344390 templates. Unique templates: 29295**

INFO:root:Saved model to /Users/twochar/vS/Password-Decryption/pcfg_model_all_state.pkl (state_only=True)


Top templates: [('FRAG', 4051390), ('FRAG|DIGITS2', 1440243), ('FRAG|DIGITS1', 886830), ('FRAG|DIGITS4', 755828), ('FRAG|DIGITS3', 515233)]
Top words: [('love', 23062), ('ever', 18744), ('life', 16089), ('eva', 14720), ('yahoo', 9746), ('baby', 8523), ('may', 7791), ('angel', 7092), ('sexy', 6318), ('alex', 5420)]


In [None]:
# Cell 7 — Build model for passwords >= 6 chars, save
# Minimum password length (in characters) kept for the length-filtered model.
MIN_LEN = 6
# Separate instance so the length-filtered counts stay independent of `model`.
model_ge6 = PCFGLite(alpha=1.0, do_leet=True)

def iter_lines_minlen(path: Union[str, Path], min_len: int = 6, max_lines: int = None):
    """Return an iterator over the non-empty lines of ``path`` that are at least ``min_len`` long.

    The path is validated immediately when this function is called, rather
    than on the first ``next()``, so a missing file fails at the call site.

    Args:
        path: text file with one password per line, read as latin-1.
        min_len: minimum line length (after removing the line terminator).
        max_lines: stop after reading this many lines from the file
            (``None`` or ``0`` means no limit); filtered-out lines count too.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(path)

    def _generate():
        with p.open("r", encoding="latin-1", errors="ignore") as f:
            for i, line in enumerate(f):
                if max_lines and i >= max_lines:
                    break
                pw = line.rstrip("\n\r")
                # Empty lines are always skipped, even when min_len is 0.
                if pw and len(pw) >= min_len:
                    yield pw

    return _generate()

# Train the length-filtered model: stream the corpus when it is available,
# otherwise fall back to the in-notebook sample filtered by the same threshold.
if DATA_PATH.exists():
    model_ge6.fit_list(iter_lines_minlen(DATA_PATH, min_len=MIN_LEN), max_samples=None, verbose=True, use_vocab=True, trim_top_n=250000)
else:
    model_ge6.fit_list([pw for pw in sample_pw if len(pw) >= MIN_LEN], verbose=True)

# Persist only the plain-data state of the filtered model.
save_model("pcfg_model_ge6_state.pkl", model_ge6, state_only=True)
print("Saved ge6 model snapshot. Total ge6 templates:", model_ge6.total_templates)

In [None]:
# Cell 8 — FRAG extraction (streaming) -> TSVs (all + len >=6)
MIN_TOKEN_LEN = 3        # ignore FRAG tokens shorter than this
MIN_PW_LEN = 6           # password length threshold for the second counter
TOP_K_FRAGS = 200000     # number of most common tokens written to each TSV

OUT_TSV_ALL = Path("frag_tokens_all.tsv")
# Despite the "SHORT" name, this file holds tokens from passwords with
# len >= MIN_PW_LEN.
OUT_TSV_SHORT = Path("frag_tokens_len6.tsv")

frag_counter_all = Counter()
frag_counter_short = Counter()


def _iter_frag_passwords():
    """Yield passwords from DATA_PATH when it exists, else from sample_pw.

    The corpus file is opened in a ``with`` block so the handle is closed once
    the stream is exhausted.
    """
    if DATA_PATH.exists():
        with DATA_PATH.open("r", encoding="latin-1", errors="ignore") as f:
            for line in f:
                yield line.rstrip("\n\r")
    else:
        yield from sample_pw


def _write_frag_tsv(out_path, counter, top_k):
    """Write the ``top_k`` most common ``token<TAB>count`` rows of ``counter`` to ``out_path``."""
    with out_path.open("w", encoding="utf8") as f:
        f.write("token\tcount\n")
        for token, cnt in counter.most_common(top_k):
            f.write(f"{token}\t{cnt}\n")


it = _iter_frag_passwords()

for i, pw in enumerate(it):
    if not pw:
        continue

    runs = re.findall(r'[A-Za-z]+|\d+|[^A-Za-z\d]+', pw)

    for r in runs:
        slot, token_for_counts, _ = classify_run(r, do_leet=model.do_leet, use_vocab=True)
        if slot == "FRAG" and len(token_for_counts) >= MIN_TOKEN_LEN:
            # Every qualifying fragment goes into the "all" counter ...
            frag_counter_all[token_for_counts] += 1
            # ... and additionally into the second counter when the whole
            # password is at least MIN_PW_LEN characters long.
            if len(pw) >= MIN_PW_LEN:
                frag_counter_short[token_for_counts] += 1

    if (i + 1) % 1_000_000 == 0:
        logging.info(f"Processed {i+1:,} passwords...")

# Write ALL passwords TSV
_write_frag_tsv(OUT_TSV_ALL, frag_counter_all, TOP_K_FRAGS)
logging.info(f"Wrote frag TSV (all, top {TOP_K_FRAGS}) to {OUT_TSV_ALL.resolve()}")

# Write SHORT (>=6) passwords TSV
_write_frag_tsv(OUT_TSV_SHORT, frag_counter_short, TOP_K_FRAGS)
logging.info(f"Wrote frag TSV (len >=6, top {TOP_K_FRAGS}) to {OUT_TSV_SHORT.resolve()}")

In [None]:
# Cell 9 — Snapshot save (bounded sizes)
# Bounded summary of the all-passwords model.
snap = model.snapshot(top_templates_n=1000, top_words_n=2000, top_digits_n=500)
# ensure_ascii=False writes non-ASCII characters verbatim, so the file encoding
# is pinned to UTF-8 instead of relying on the platform default.
Path("pcfg_all.json").write_text(json.dumps(snap, ensure_ascii=False), encoding="utf-8")

# Same summary for the length-filtered model, annotated with the filter used.
snap_ge6 = model_ge6.snapshot(top_templates_n=1000, top_words_n=2000, top_digits_n=500)
snap_ge6["filter"] = {"min_len": MIN_LEN}
Path("pcfg_of_len6_or_more.json").write_text(json.dumps(snap_ge6, ensure_ascii=False), encoding="utf-8")
logging.info("Saved snapshots.")

In [None]:
# Cell 10 — Quick smoke-test (save -> load -> score)
save_model("tmp_state.pkl", model, state_only=True)
model2 = load_model("tmp_state.pkl", state_only=True)
assert isinstance(model2, PCFGLite)
# The type check alone cannot fail once load_model returns, so also verify
# that the reloaded state matches the original and scores identically.
assert model2.template_counts == model.template_counts
assert model2.slot_counts == model.slot_counts
assert model2.total_templates == model.total_templates
assert model2.alpha == model.alpha and model2.do_leet == model.do_leet
assert model2.score("P@ssw0rd123!") == model.score("P@ssw0rd123!")
print("Roundtrip OK. Example score:", model2.score("P@ssw0rd123!"))


INFO:root:Saved model to /Users/twochar/vS/Password-Decryption/tmp_state.pkl (state_only=True)


Roundtrip OK. Example score: -52.39906723052136
