In [39]:
import json
from pathlib import Path
import stanza
import re
from difflib import get_close_matches
import sys 

sys.path.append(str(Path.cwd().parent))
from utils.regexes import SPEAKER_PAIRS, SPEAKERS, speaker_labels, speaker_labels_restricted

nlp = stanza.Pipeline(lang="en", processors="tokenize,mwt,pos,lemma,ner", verbose=False)

class ErrorDetector:
    """
    Stanza detection only:
      - Print PERSON entities (default)
      - Optionally print ALL entities
      - Optionally print simple Title+Name pairs (POS-based heuristic)
    """
    def __init__(self, text: str, nlp_pipeline):
        self.text = text
        self.doc = nlp_pipeline(text)

    def detect_proper_nouns(self, include_non_person: bool = False, show_title_pairs: bool = True):
        # show all NER entities
        if include_non_person:
            print("All Named Entities (NER):")
            for ent in self.doc.ents:
                print(f"{ent.text:<30} -> {ent.type}")

        # PERSON-only view
        print("\nPERSON entities:")
        person_tags = {"PERSON", "PER"}
        found_person = False
        for ent in self.doc.ents:
            if ent.type in person_tags:
                found_person = True
                print(ent.text)
        if not found_person:
            print("(none)")

        # Lightweight title + name using POS
        if show_title_pairs:
            print("\nTitle + Name pairs (POS-based):")
            title_words = {"Dr.", "Mr.", "Ms.", "Mrs.", "Prof.", "Professor"}
            found_pair = False
            for sent in self.doc.sentences:
                words = sent.words
                for i in range(len(words) - 1):
                    if words[i].text in title_words and words[i + 1].upos == "PROPN":
                        found_pair = True
                        print(f"{words[i].text} {words[i+1].text}")
            if not found_pair:
                print("(none)")

    def process_transcripts(input_folder, only_stem: str | None = None,
                            include_non_person: bool = False, show_title_pairs: bool = True):
        # Resolve and list files
        input_path = Path(input_folder).resolve()
        print(f"[INFO] Input folder: {input_path}")
        files = sorted(input_path.glob("*.txt.json"))
        print(f"[INFO] Found {len(files)} files matching *.txt.json")
        if only_stem:
            files = [f for f in files if f.stem.startswith(only_stem)]
            print(f"[INFO] After filtering by stem '{only_stem}': {len(files)} file(s)")

        if not files:
            print("[WARN] No matching files.")
            return

        # Loop files and print detections
        for file in files:
            print("\n" + "=" * 70)
            print(f"[FILE] {file.name}")
            with open(file, "r", encoding="utf-8") as f:
                data = json.load(f)

            rows = data.get("rows", [])
            if not rows:
                print("[WARN] Skipping: No 'rows' key or it's empty.")
                continue

            text = "\n".join(row.get("content", "") for row in rows)
            det = ErrorDetector(text, nlp)
            det.detect_proper_nouns(include_non_person=include_non_person, show_title_pairs=show_title_pairs)

    def process_string(sample_text: str, include_non_person: bool = True, show_title_pairs: bool = True):
        print("[INFO] Running detection on a literal string…")
        det = ErrorDetector(sample_text, nlp)
        det.detect_proper_nouns(include_non_person=include_non_person, show_title_pairs=show_title_pairs)

    if __name__ == "__main__":
        # Example: adjust to your project layout
        project_root = Path.cwd().parent
        input_folder = project_root / "data" / "s1055-1058" / "REVIEW"

        # Target just one file (two test files. in use are 057_707 and 055_684)
        process_transcripts(
            input_folder=input_folder,
            only_stem="055_684",          # set to None to scan all
            include_non_person=True,       # set False to show only PERSON entities
            show_title_pairs=True
        )

[INFO] Input folder: /Users/admin/Documents/coding_land/HoardingDisorderScripts/data/s1055-1058/REVIEW
[INFO] Found 37 files matching *.txt.json
[INFO] After filtering by stem '055_684': 1 file(s)

[FILE] 055_684.txt.json
All Named Entities (NER):
26:28                          -> CARDINAL
26:42                          -> CARDINAL
26:48                          -> CARDINAL
I’ve                           -> PERSON
27:01                          -> CARDINAL
27:07                          -> CARDINAL
Gotcha                         -> PERSON
two                            -> CARDINAL
27:33                          -> CARDINAL
27:38                          -> CARDINAL
27:41                          -> CARDINAL
28:09                          -> CARDINAL
28:18                          -> CARDINAL
Gotcha                         -> PERSON
28:42                          -> CARDINAL
29:04                          -> CARDINAL
29:12                          -> CARDINAL
29:18                      

In [40]:
sys.path.append(str(Path.cwd().parent))
from utils.regexes import SPEAKER_PAIRS, SPEAKERS, speaker_labels, speaker_labels_restricted

def find_speaker_format_issues(text, speaker_set):
        """
       Method 2.1 Detects speaker label formatting issues:
            1. Speaker label followed by space before colon (e.g., 'Participant :')
            2. Speaker label followed by punctuation or character other than ':' (e.g., 'Participant.', 'Participant-')
        Returns a dictionary with issue types and matching instances.
        """
        issues = {}

        # Pattern 1: Space before colon "Participant :" (colon spacing issue)
        spacing_pattern = re.compile(r'\b(?:' + '|'.join(re.escape(s) for s in speaker_set) + r')\s+:')
        spacing_matches = spacing_pattern.findall(text)
        if spacing_matches:
            issues['spacing_issue'] = spacing_matches

        # Pattern 2: Speaker label followed by something other than colon or space (e.g., 'Participant.' or 'Participant!')
        bad_punct_pattern = re.compile(r'\b(?:' + '|'.join(re.escape(s) for s in speaker_set) + r')[^\s:]')
        punct_matches = bad_punct_pattern.findall(text)
        if punct_matches:
            issues['bad_punctuation'] = punct_matches

        return issues

def find_spelling_variants(text, speaker_set, threshold=0.8):
    '''
    Method 3.1 Finds likely misspellings of speaker labels using fuzzy matching.
    '''
    pattern = re.compile(r'^([A-Z][a-zA-Z0-9_ ]{1,30})(?=\s*:\s*)', re.MULTILINE)
    candidates = pattern.findall(text)

    fuzzy_hits = {}
    for cand in candidates:
        matches = get_close_matches(cand, speaker_set, n=1, cutoff=threshold)
        if matches and matches[0] != cand:
            fuzzy_hits[cand] = matches[0]
    return fuzzy_hits

def find_multi_speaker_lines(text):
    '''
    Method 3.2 Finds lines that contain more than one speaker label.
    '''
    speaker_pattern = r'\b(?:' + '|'.join(re.escape(s) for s in SPEAKERS) + r')\s*:'
    pattern = re.compile(speaker_pattern)
    multi_speaker_lines = []
    for i, line in enumerate(text.splitlines()):
        matches = pattern.findall(line)
        if len(matches) > 1:
            multi_speaker_lines.append((i + 1, line.strip(), matches))
    return multi_speaker_lines


In [41]:
#from pathlib import Path
#import json

# ---- config ----
FILENAME = "057_707.txt.json"   # change if needed

# In a notebook, CWD is usually .../HoardingDisorderScripts/notebooks
project_root = Path.cwd().parent
review_dir = project_root / "data" / "s1055-1058" / "REVIEW"

print(f"[INFO] CWD:        {Path.cwd()}")
print(f"[INFO] Project:    {project_root}")
print(f"[INFO] Review dir: {review_dir}")

if not review_dir.exists():
    raise FileNotFoundError("Review dir not found. Check the path to data/s1055-1058/REVIEW.")

file_path = (review_dir / FILENAME).resolve()
print(f"[INFO] File:       {file_path}")

if not file_path.exists():
    # helpful listing so you can pick the right name
    print("[WARN] File not found. Here are some candidates in REVIEW/:")
    for p in sorted(review_dir.glob("*.txt.json"))[:20]:
        print("  -", p.name)
    raise FileNotFoundError(f"{FILENAME} not found in {review_dir}")

# ---- load JSON ----
with open(file_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# ---- build text ----
text = ""
if isinstance(data.get("full_content"), str) and data["full_content"].strip():
    text = data["full_content"]
elif isinstance(data.get("rows"), list) and data["rows"]:
    text = "\n".join(row.get("content", "") for row in data["rows"] if isinstance(row.get("content"), str))
elif isinstance(data.get("text"), str) and data["text"].strip():
    text = data["text"]

print(f"[INFO] Built text length: {len(text):,} chars")
if not text.strip():
    raise ValueError("Couldn't build text from 'full_content', 'rows', or 'text'.")

# ---- run YOUR existing detectors (they must already be defined/imported) ----
speaker_set = set(SPEAKERS)

fmt_issues   = find_speaker_format_issues(text, speaker_set)
misspellings = find_spelling_variants(text, speaker_set, threshold=0.8)
multi_lines  = find_multi_speaker_lines(text)

total = sum(len(v) for v in fmt_issues.values()) + len(misspellings) + len(multi_lines)

# ---- print summary ----
if total:
    print(f"\nYES — errors found: {total}")

    if fmt_issues.get("spacing_issue"):
        print(f"  • Space-before-colon: {len(fmt_issues['spacing_issue'])}")
        for ln, snip, _ in fmt_issues["spacing_issue"][:10]:
            print(f"      line {ln}: {snip!r}")

    if fmt_issues.get("bad_punctuation"):
        print(f"  • Bad punctuation after label: {len(fmt_issues['bad_punctuation'])}")
        for ln, snip, _ in fmt_issues["bad_punctuation"][:10]:
            print(f"      line {ln}: {snip!r}")

    if misspellings:
        print(f"  • Likely misspellings: {len(misspellings)}")
        for bad, sug in list(misspellings.items())[:10]:
            print(f"      {bad!r} -> {sug!r}")

    if multi_lines:
        print(f"  • Multiple labels on one line: {len(multi_lines)}")
        for ln, line_txt, labels in multi_lines[:5]:
            print(f"      line {ln}: labels={labels} | {line_txt[:120]}")
else:
    print("\nNO ERRORS")


[INFO] CWD:        /Users/admin/Documents/coding_land/HoardingDisorderScripts/notebooks
[INFO] Project:    /Users/admin/Documents/coding_land/HoardingDisorderScripts
[INFO] Review dir: /Users/admin/Documents/coding_land/HoardingDisorderScripts/data/s1055-1058/REVIEW
[INFO] File:       /Users/admin/Documents/coding_land/HoardingDisorderScripts/data/s1055-1058/REVIEW/057_707.txt.json
[INFO] Built text length: 488 chars

NO ERRORS


In [42]:
# Test transcript with all target errors 
test_text = """\
Interviewer: Hi, thanks for joining us.                        # valid
Participant : I'm good, thanks.                                # spacing-before-colon (bad)
Interveiwer: Can you tell me your name?                        # misspelling (bad)
Interviewee. Please describe your symptoms.                    # bad punctuation after label (.)
Speaker- What brings you here today?                           # bad punctuation after label (-)
Interviewr : This has both spelling and spacing issues.        # misspelling + spacing-before-colon (bad)
Interviewer: What's your favorite food? Participant: Pizza.    # multiple labels on one line (bad)
Participant's favorite color is blue.                          # label immediately followed by apostrophe (bad)
Interviewee: All good here.                                    # valid
Interviewee? Could you repeat that?                            # bad punctuation after label (?)
"""

# Use your project’s canonical speakers if available:
try:
    speaker_set = set(SPEAKERS)  # from utils.regexes
except NameError:
    speaker_set = {"Interviewer", "Participant", "Interviewee", "Speaker"}

# --- Run your existing detectors ---
fmt_issues   = find_speaker_format_issues(test_text, speaker_set)
misspellings = find_spelling_variants(test_text, speaker_set, threshold=0.8)  # or 0.85 if you want stricter
multi_lines  = find_multi_speaker_lines(test_text)

total = sum(len(v) for v in fmt_issues.values()) + len(misspellings) + len(multi_lines)

# Print summary
if total:
    print(f"\nYES — errors found: {total}")

    if fmt_issues.get("spacing_issue"):
        print(f"  • Space-before-colon: {len(fmt_issues['spacing_issue'])}")
        for ln, snip, full in fmt_issues["spacing_issue"]:
            print(f"      line {ln}: {snip!r}")

    if fmt_issues.get("bad_punctuation"):
        print(f"  • Bad punctuation after label: {len(fmt_issues['bad_punctuation'])}")
        for ln, snip, full in fmt_issues["bad_punctuation"]:
            print(f"      line {ln}: {snip!r}")

    if misspellings:
        print(f"  • Likely misspellings: {len(misspellings)}")
        for bad, sug in misspellings.items():
            print(f"      {bad!r} -> {sug!r}")

    if multi_lines:
        print(f"  • Multiple labels on one line: {len(multi_lines)}")
        for ln, line_txt, labels in multi_lines:
            print(f"      line {ln}: labels={labels} | {line_txt[:120]}")
else:
    print("\nNO ERRORS")



YES — errors found: 9
  • Space-before-colon: 1


ValueError: too many values to unpack (expected 3)