In [2]:
import os, getpass, random, json, time
from pathlib import Path
from typing import List, Tuple, Dict, Optional
from groq import Groq


client = Groq(api_key=os.environ["GROQ_API_KEY"])

# Config
MODEL_NAME = "llama-3.1-8b-instant"  # fallback: "llama3-8b-8192"
ROOT = "/Users/kolosus/Downloads/pii_ner_assignment"
TRAIN_OUT = f"{ROOT}/data/train.jsonl"
DEV_OUT = f"{ROOT}/data/dev.jsonl"

TOTAL_TRAIN = 900
TOTAL_DEV = 180
SEED = 13
BATCH_N = 30  # lines per LLM call
TEMPERATURE = 0.8
TOP_P = 0.9

random.seed(SEED)
Path(f"{ROOT}/data").mkdir(parents=True, exist_ok=True)

In [3]:

# Cell 2 — Labels and tag parsing utilities (compute exact spans after removing tags)

ALLOWED_LABELS = {
    "CREDIT_CARD",
    "PHONE",
    "EMAIL",
    "PERSON_NAME",
    "DATE",
    "CITY",
    "LOCATION",
}

# Collapse multiple spaces while building text so offsets are stable and single-spaced
def _emit_char(buf: List[str], c: str, last_was_space: bool) -> bool:
    if c.isspace():
        if not last_was_space and (not buf or buf[-1] != " "):
            buf.append(" ")
        return True
    else:
        buf.append(c)
        return False

def parse_tagged_text(tagged: str) -> Optional[Tuple[str, List[Dict[str, object]]]]:
    # Streaming parser for <LABEL> ... </LABEL> (no nesting/overlap allowed)
    s = tagged.strip()
    i = 0
    out_chars: List[str] = []
    spans: List[Dict[str, object]] = []
    stack: List[Tuple[str, int]] = []  # (LABEL, start_pos_in_out)
    pos = 0
    last_was_space = False

    while i < len(s):
        if s[i] == "<":
            j = s.find(">", i + 1)
            if j == -1:
                return None
            tag = s[i + 1:j].strip()
            is_close = tag.startswith("/")
            label = tag[1:].strip().upper() if is_close else tag.upper()

            if label not in ALLOWED_LABELS:
                return None

            if not is_close:
                # opening
                stack.append((label, pos))
            else:
                # closing
                if not stack or stack[-1][0] != label:
                    return None
                open_label, start_pos = stack.pop()
                spans.append({"start": start_pos, "end": pos, "label": open_label})
            i = j + 1
        else:
            last_was_space = _emit_char(out_chars, s[i], last_was_space)
            if not last_was_space:
                pos += 1
            i += 1

    if stack:
        return None

    text_clean = "".join(out_chars).strip()
    # Basic sanity checks
    for e in spans:
        if e["start"] >= e["end"]:
            return None
        if e["start"] < 0 or e["end"] > len(text_clean):
            return None

    return text_clean, spans

In [5]:
# Cell 3 — Prompt templates (inline tags, single-space, no punctuation)

SYSTEM_PROMPT = (
    "You generate synthetic training data for PII NER on noisy STT transcripts.\n"
    "Rules:\n"
    "- Use only lowercase. No punctuation. Words separated by single spaces.\n"
    "- STT style: emails use 'first dot last at domain dot com'; numbers may be spelled out; 'oh' for zero; allow 'double nine'.\n"
    "- Allowed entity labels as inline tags: <CREDIT_CARD>..</CREDIT_CARD>, <PHONE>..</PHONE>, <EMAIL>..</EMAIL>, "
    "<PERSON_NAME>..</PERSON_NAME>, <DATE>..</DATE>, <CITY>..</CITY>, <LOCATION>..</LOCATION>.\n"
    "- 1 to 3 entities per line; include about 10 to 15 percent lines with zero entities (no tags).\n"
    "- Output exactly N lines; each line is a single tagged transcript. No JSON, no extra text."
)

def user_prompt(n: int) -> str:
    return (
        f"Output exactly {n} lines.\n"
        "Each line is a single transcript string with inline entity tags from the allowed set.\n"
        "Keep everything lowercase, single spaces, and no punctuation."
    )

In [6]:
# Cell 4 — Groq batch generation and parsing

def generate_tagged_batch(n: int) -> List[str]:
    resp = client.chat.completions.create(
        model=MODEL_NAME,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt(n)},
        ],
        temperature=TEMPERATURE,
        top_p=TOP_P,
        max_tokens=4000,
        stream=False,
    )
    content = resp.choices[0].message.content or ""
    # Split into non-empty lines
    lines = [ln.strip() for ln in content.splitlines() if ln.strip()]
    # If over/under-produced, trim or discard extras
    if len(lines) > n:
        lines = lines[:n]
    return lines

def build_examples_from_tagged(tagged_lines: List[str]) -> List[Dict[str, object]]:
    examples = []
    for tl in tagged_lines:
        parsed = parse_tagged_text(tl)
        if parsed is None:
            continue
        text, ents = parsed
        # Filter trivial issues
        if not (10 <= len(text) <= 300):
            continue
        # Keep 0–3 entities per line (some negatives are desired)
        if len(ents) > 3:
            continue
        examples.append({"text": text, "entities": ents})
    return examples

In [7]:
# Cell 5 — Generate target counts and write JSONL

def generate_dataset(total_needed: int) -> List[Dict[str, object]]:
    pool: List[Dict[str, object]] = []
    seen_texts = set()
    max_calls = (total_needed // BATCH_N) + 20  # cushion
    calls = 0
    while len(pool) < total_needed and calls < max_calls:
        calls += 1
        batch = generate_tagged_batch(BATCH_N)
        parsed = build_examples_from_tagged(batch)
        for ex in parsed:
            t = ex["text"]
            if t in seen_texts:
                continue
            seen_texts.add(t)
            pool.append(ex)
        # light backoff if under-producing valid examples
        if len(parsed) < int(0.6 * BATCH_N):
            time.sleep(0.5)
    return pool[:total_needed]

def save_jsonl(path: str, rows: List[Dict[str, object]]):
    with open(path, "w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

def attach_ids(rows: List[Dict[str, object]], start_idx: int = 0) -> List[Dict[str, object]]:
    out = []
    for i, r in enumerate(rows, start=start_idx):
        out.append({"id": f"utt_{i:05d}", "text": r["text"], "entities": r["entities"]})
    return out

# Generate
total_all = TOTAL_TRAIN + TOTAL_DEV
raw = generate_dataset(total_all)

# Split and save
train_rows = attach_ids(raw[:TOTAL_TRAIN], start_idx=0)
dev_rows = attach_ids(raw[TOTAL_TRAIN:], start_idx=TOTAL_TRAIN)

save_jsonl(TRAIN_OUT, train_rows)
save_jsonl(DEV_OUT, dev_rows)

print(f"Wrote {len(train_rows)} to {TRAIN_OUT}")
print(f"Wrote {len(dev_rows)} to {DEV_OUT}")

APIConnectionError: Connection error.

In [8]:
# Cell 6 — Quick QA: label counts and sample preview

from collections import Counter
lab_counter = Counter()
for r in train_rows + dev_rows:
    for e in r["entities"]:
        lab_counter[e["label"]] += 1

print("Entity counts:", dict(lab_counter))
print("Train/dev sizes:", len(train_rows), len(dev_rows))

# Show a couple of samples
for r in (train_rows[:2] + dev_rows[:2]):
    print(json.dumps(r, ensure_ascii=False))

NameError: name 'train_rows' is not defined

In [14]:
from pathlib import Path

OUT_DIR = Path('/Users/kolosus/Downloads/pii_ner_assignment/models')
DATA_DIR = Path('data/')
models = [
    {"name": "microsoft/MiniLM-L6-H384-uncased", "out_dir": str(OUT_DIR / "minilm_l6h384")},
    {"name": "google/electra-small-discriminator", "out_dir": str(OUT_DIR / "electra_small")},
    {"name": "google/mobilebert-uncased", "out_dir": str(OUT_DIR / "mobilebert")},
]
common = {
    "train": str(DATA_DIR / "train.jsonl"),
    "dev": str(DATA_DIR / "dev.jsonl"),
    "epochs": 3,
    "batch_size": 16,
    "lr": 5e-5,
    "max_length": 128,   # tighter for latency
}

In [18]:
import subprocess

def run_cmd(cmd):
    """Run a shell command and return the result."""
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Error running command: {cmd}")
        print(f"Error output: {result.stderr}")
    else:
        print(f"Successfully ran: {cmd}")
        if result.stdout:
            print(f"Output: {result.stdout}")
    return result

for m in models:
    cmd = f"""
    python {'src/train.py'} \
      --model_name {m['name']} \
      --train {common['train']} \
      --dev {common['dev']} \
      --out_dir {m['out_dir']} \
      --batch_size {common['batch_size']} \
      --epochs {common['epochs']} \
      --lr {common['lr']} \
      --max_length {common['max_length']}
    """.strip()
    _ = run_cmd(cmd)

Error running command: python src/train.py       --model_name microsoft/MiniLM-L6-H384-uncased       --train data/train.jsonl       --dev data/dev.jsonl       --out_dir /Users/kolosus/Downloads/pii_ner_assignment/models/minilm_l6h384       --batch_size 16       --epochs 3       --lr 5e-05       --max_length 128
Error output: /bin/sh: python: command not found

Error running command: python src/train.py       --model_name google/electra-small-discriminator       --train data/train.jsonl       --dev data/dev.jsonl       --out_dir /Users/kolosus/Downloads/pii_ner_assignment/models/electra_small       --batch_size 16       --epochs 3       --lr 5e-05       --max_length 128
Error output: /bin/sh: python: command not found

Error running command: python src/train.py       --model_name google/mobilebert-uncased       --train data/train.jsonl       --dev data/dev.jsonl       --out_dir /Users/kolosus/Downloads/pii_ner_assignment/models/mobilebert       --batch_size 16       --epochs 3       --l