In [None]:
# ------------------------- Install Dependencies -------------------------
# Colab setup cell: installs the Python packages imported further down (-q keeps pip quiet).
!pip install -q transformers sentence-transformers language_tool_python

# Upgrade Java (required for grammar checker)
# Removes every openjdk-11-* package, refreshes the apt index, then installs OpenJDK 17.
!sudo apt-get remove openjdk-11-* -y
!sudo apt-get update
!sudo apt install openjdk-17-jdk -y

import os
# Point JAVA_HOME at the OpenJDK 17 location for this kernel process.
# NOTE(review): presumably this is what language_tool_python uses to find Java — confirm.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
# Print the active Java version as a sanity check.
!java -version

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.5/55.5 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.6/55.6 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m38.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m32.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m46.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import os, zipfile, shutil, re
from pathlib import Path
from google.colab import drive
import language_tool_python
from sentence_transformers import SentenceTransformer, util
import torch
import html
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from IPython.display import display, clear_output
import ipywidgets as widgets
import nltk
import traceback
import gc


In [None]:
# Download the NLTK 'punkt' tokenizer data into the runtime.
# NOTE(review): no cell visible in this notebook calls nltk afterwards — confirm this is still needed.
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
# Mount Google Drive at /content/drive; the Drive-backed paths used by this notebook live under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# ------------------------- Paths & Config -------------------------
# Directory holding the source archives, read as "<year>_<model>_ALL_PARAPHRASED.zip".
BASE_PARAPHRASES_DIR = Path("/content/drive/MyDrive/THESIS/LLM_PARAPHRASES_SANITIZED/PARAPHRASES")
# Directory into which those archives are extracted.
WORKING_DIR = Path("/content/drive/MyDrive/THESIS/LLM_PARAPHRASES_SANITIZED/WORKING")
# Directory intended for the re-zipped, reviewed output.
DONE_DIR = Path("/content/drive/MyDrive/THESIS/LLM_PARAPHRASES_SANITIZED/DONE")
# Archive of the original audit opinions, unpacked on demand into ORIGINAL_EXTRACT.
ORIGINAL_ZIP = Path("/content/drive/MyDrive/THESIS/bankruptcies-audit-opinion.zip")
ORIGINAL_EXTRACT = Path("/content/original_audits")
# Local scratch copy of the extracted paraphrases; this is the tree that gets cleaned, reviewed and edited.
TEMP_CLEANED_DIR = Path("/content/temp_cleaned")

# Review thresholds applied by find_problem_cases():
# flag when the paraphrase has at least this many fewer whitespace-separated words than the original
token_drop_threshold = 150
# flag when the GPT-2 perplexity is strictly greater than this
max_perplexity = 1000.0
# flag when the number of LanguageTool matches is strictly greater than this
max_grammar_errors = 5
# flag when the cosine similarity to the original is strictly below this
min_similarity = 0.7

# Shared, load-once resources used by the utility functions.
tool = language_tool_python.LanguageTool('en-US')
gpt2_tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
# GPT-2 in eval mode, placed on the GPU when one is available and on the CPU otherwise.
gpt2_model = GPT2LMHeadModel.from_pretrained('gpt2').eval().to('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE(review): embed_model is not referenced by any function visible in this notebook;
# the utility cell loads the same checkpoint again as similarity_model — confirm whether both are needed.
embed_model = SentenceTransformer('all-MiniLM-L6-v2')



In [None]:
# Problematic expressions
# Each entry is (compiled case-insensitive regex, human-readable label).
# find_problem_cases() flags a paraphrase on the first pattern that matches, and
# highlight_bad_words_html() uses the label as the tooltip of the highlighted match.
# The "\b<prefix>\w*\b" patterns match ANY word starting with the prefix, so they are
# deliberately broad: e.g. r'\bvers\w*\b' also matches "versus", r'\bmean\w*\b' also
# matches "means", and r'\btrain\w*\b' also matches "trainee".
bad_word_families = [
    (re.compile(r'\bparaphras\w*\b', re.IGNORECASE), "paraphrase word family"),
    (re.compile(r'\brephras\w*\b', re.IGNORECASE), "rephrase word family"),
    (re.compile(r'\bcaret\w*\b', re.IGNORECASE), "caret word family"),
    (re.compile(r'\basterisk\w*\b', re.IGNORECASE), "asterisk word family"),
    (re.compile(r'\brevis\w*\b', re.IGNORECASE), "revise word family"),
    (re.compile(r'\bvers\w*\b', re.IGNORECASE), "version word family"),
    (re.compile(r'\bmean\w*\b', re.IGNORECASE), "meaning word family"),
    # No word boundaries here: matches the phrase anywhere, including inside longer tokens.
    (re.compile(r'machine learning', re.IGNORECASE), "'machine learning'"),
    # Because of IGNORECASE this also matches the standalone lowercase token "ai".
    (re.compile(r'\bAI\b', re.IGNORECASE), "'AI'"),
    (re.compile(r'\btrain\w*\b', re.IGNORECASE), "train word family")
]


In [None]:
# ------------------------ Utility Functions ------------------------

# Load sentence transformer model once (you can replace this with your desired model)
# compute_similarity() reads this module-level name on every call.
# NOTE(review): the config cell already loads the same "all-MiniLM-L6-v2" checkpoint as
# embed_model, so the weights are currently held in memory twice — confirm this is intended.
similarity_model = SentenceTransformer("all-MiniLM-L6-v2")

def compute_similarity(a, b):
    """Return the cosine similarity between the embeddings of ``a`` and ``b``.

    Both inputs are encoded with the module-level ``similarity_model`` (as
    tensors) and the resulting similarity is returned as a Python scalar via
    ``.item()``.
    """
    vec_a, vec_b = (
        similarity_model.encode(item, convert_to_tensor=True) for item in (a, b)
    )
    score = util.pytorch_cos_sim(vec_a, vec_b)
    return score.item()

def sanitize_text(text):
    """Strip non-printable characters from ``text`` while preserving whitespace.

    Non-string input is converted with ``str()`` first.

    ``str.isprintable()`` is False for newline, carriage return, tab and every
    non-ASCII space, so filtering on it alone deletes line breaks and glues the
    neighbouring words together ("opinion.\\nIn" -> "opinion.In"), which skews
    tokenization downstream. Therefore:

    * ``\\n``, ``\\r`` and ``\\t`` are kept unchanged;
    * any other whitespace character that is not printable (form feed,
      vertical tab, non-breaking space, ...) is replaced by a single space;
    * all remaining non-printable characters (NUL, BEL, zero-width marks, ...)
      are dropped.

    Returns:
        The sanitized string.
    """
    if not isinstance(text, str):
        text = str(text)

    kept = []
    for ch in text:
        if ch.isprintable() or ch in "\n\r\t":
            kept.append(ch)
        elif ch.isspace():
            # Keep the word boundary the exotic whitespace character represented.
            kept.append(" ")
        # Anything else is a non-printable, non-whitespace character: drop it.
    return "".join(kept)


def compute_perplexity(text):
    """Return the GPT-2 perplexity of ``text``.

    The text is tokenized with ``gpt2_tokenizer`` (truncated to at most 1024
    tokens), moved to the device of ``gpt2_model``, and scored with the token
    ids doubling as labels. The result is ``exp(loss)`` as a Python scalar.
    Gradients are disabled for the forward pass.
    """
    token_ids = gpt2_tokenizer(
        text, return_tensors="pt", truncation=True, max_length=1024
    ).input_ids.to(gpt2_model.device)

    with torch.no_grad():
        mean_loss = gpt2_model(token_ids, labels=token_ids).loss

    return torch.exp(mean_loss).item()

def is_valid_input(text):
    """Report whether ``text`` can be tokenized by ``gpt2_tokenizer``.

    The tokenizer is invoked with the same arguments used for perplexity
    scoring (PyTorch tensors, truncation to 1024 tokens); its output is
    discarded.

    Returns:
        True when tokenization succeeds; False when it raises, after printing
        a short diagnostic containing the first 60 characters of the input.
    """
    try:
        gpt2_tokenizer(text, return_tensors="pt", truncation=True, max_length=1024)
    except Exception as e:
        # Convert with str() before slicing: the inputs most likely to fail
        # tokenization are non-strings (e.g. None), and slicing those directly
        # would raise TypeError from inside this handler instead of returning False.
        print(f"[Invalid text] {str(text)[:60]}... — {e}")
        return False
    return True



def safe_compute_perplexity(text):
    """Compute perplexity for ``text`` without letting failures propagate.

    The text is passed through ``sanitize_text`` and then scored with
    ``compute_perplexity``.

    Returns:
        The perplexity, or None when scoring raised an exception or produced
        NaN. A diagnostic line is printed in both failure cases.
    """
    import math  # local import: math is not among the notebook's top-level imports

    text = sanitize_text(text)
    try:
        perplexity = compute_perplexity(text)
    except Exception as e:
        if torch.cuda.is_available():
            # Release cached GPU memory so one failed item does not starve the next.
            torch.cuda.empty_cache()
            gc.collect()
        print(f"[❌] Perplexity failed for input: {repr(text[:60])}... — {e}")
        return None

    if math.isnan(perplexity):
        # NaN compares False against every threshold, so returning it would let
        # an unscorable text pass a "perplexity > limit" check unnoticed.
        # NOTE(review): presumably this arises when the text tokenizes to fewer
        # than two tokens, leaving nothing to predict — confirm.
        print(f"[❌] Perplexity failed for input: {repr(text[:60])}... — result is NaN")
        return None
    return perplexity



def compute_grammar_errors(text):
    """Return how many matches the module-level LanguageTool ``tool`` reports for ``text``."""
    matches = tool.check(text)
    return len(matches)

def safe_compute_similarity(a, b):
    """Compute the similarity of ``a`` and ``b`` without letting failures propagate.

    Both inputs go through ``sanitize_text`` before being handed to
    ``compute_similarity``. If that call raises, the CUDA cache is emptied and
    a garbage collection is run (only when CUDA is available), a diagnostic
    showing the first 60 characters of each input is printed, and None is
    returned. Otherwise the similarity score is returned.
    """
    clean_a, clean_b = sanitize_text(a), sanitize_text(b)
    try:
        score = compute_similarity(clean_a, clean_b)
    except Exception as err:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            gc.collect()
        print(f"[❌] Similarity failed for inputs:\nA: {repr(clean_a[:60])}\nB: {repr(clean_b[:60])}\n— {err}")
        return None
    return score

def extract_originals():
    """Unpack ``ORIGINAL_ZIP`` into ``ORIGINAL_EXTRACT`` once per runtime.

    Does nothing when ``ORIGINAL_EXTRACT`` already exists. Otherwise the
    directory is created (including missing parents) and the archive is
    extracted into it.

    If extraction fails or is interrupted, the partially filled directory is
    removed before the error is re-raised. Without that cleanup the existence
    check above would treat the partial tree as complete on every later call,
    and missing originals would go unnoticed.

    Raises:
        Whatever ``zipfile`` raises for a missing or corrupt archive.
    """
    if ORIGINAL_EXTRACT.exists():
        return

    ORIGINAL_EXTRACT.mkdir(parents=True)
    try:
        with zipfile.ZipFile(ORIGINAL_ZIP, 'r') as z:
            z.extractall(ORIGINAL_EXTRACT)
    except BaseException:
        # BaseException so that interrupting the cell (KeyboardInterrupt)
        # also rolls back the half-extracted directory.
        shutil.rmtree(ORIGINAL_EXTRACT, ignore_errors=True)
        raise

def load_original(year, cik):
    """Return the original audit-opinion text for ``year``/``cik``, or None.

    Ensures the originals archive has been extracted, then looks for
    ``audit_opinion.txt`` under the year/CIK sub-folder of the extracted tree.
    Undecodable bytes are ignored when reading. None is returned when the file
    does not exist.
    """
    extract_originals()
    audit_file = (
        ORIGINAL_EXTRACT
        / "AUDIT_OPINION_ITEM_7_ONLY_YES_BANKRUPTCY/YES_BANKRUPTCY"
        / str(year)
        / str(cik)
        / "audit_opinion.txt"
    )
    if not audit_file.exists():
        return None
    return audit_file.read_text(errors='ignore')

def clean_symbols_from_text(text):
    """Return ``text`` with every asterisk and caret character removed."""
    for symbol in ('*', '^'):
        text = text.replace(symbol, '')
    return text

In [None]:
# ------------------------- Auto-Clean Phase -------------------------
def auto_clean_symbols(year, model):
    """Write a copy of one paraphrase archive with '*' and '^' stripped from every .txt file.

    The archive ``<year>_<model>_ALL_PARAPHRASED.zip`` is extracted under
    ``WORKING_DIR`` unless that extraction folder already exists. For every
    sub-directory of the extraction, each ``*.txt`` file is read (ignoring
    undecodable bytes), stripped of asterisks and carets, and written under the
    same relative name into a freshly recreated
    ``TEMP_CLEANED_DIR/<year>_<model>_CLEANED`` folder. A summary line with the
    number of files that actually changed is printed.

    NOTE(review): nothing in the visible notebook calls this function.

    Returns:
        Path of the cleaned output directory.
    """
    src_zip = BASE_PARAPHRASES_DIR / f"{year}_{model}_ALL_PARAPHRASED.zip"
    extract_dir = WORKING_DIR / f"{year}_{model}_ALL_PARAPHRASED"
    cleaned_dir = TEMP_CLEANED_DIR / f"{year}_{model}_CLEANED"

    # Always start from an empty output folder.
    if cleaned_dir.exists():
        shutil.rmtree(cleaned_dir)
    cleaned_dir.mkdir(parents=True, exist_ok=True)

    # An existing extraction is reused as-is.
    if not extract_dir.exists():
        with zipfile.ZipFile(src_zip, 'r') as archive:
            archive.extractall(extract_dir)

    cleaned_count = 0
    for cik_dir in extract_dir.iterdir():
        if cik_dir.is_dir():
            target_dir = cleaned_dir / cik_dir.name
            target_dir.mkdir(parents=True, exist_ok=True)
            for txt_file in cik_dir.glob("*.txt"):
                raw = txt_file.read_text(errors='ignore')
                stripped = raw.replace("*", "").replace("^", "")
                cleaned_count += 1 if stripped != raw else 0
                (target_dir / txt_file.name).write_text(stripped)

    print(f"🧼 Auto-clean complete: {cleaned_count} files had '*' or '^' removed.")
    return cleaned_dir

In [None]:
# ------------------------ Main Logic ------------------------
def clean_and_prepare(year, model):
    """Extract one paraphrase archive and build a symbol-cleaned working copy.

    Steps:
      1. Re-extract ``<year>_<model>_ALL_PARAPHRASED.zip`` under ``WORKING_DIR``,
         deleting any previous extraction first.
      2. Replace ``TEMP_CLEANED_DIR`` with a copy of that extraction.
      3. In every sub-directory of the copy, rewrite each ``paraphrase_*.txt``
         whose content changes when '*' and '^' are removed.

    Prints how many files were rewritten. Returns nothing.
    """
    zip_path = BASE_PARAPHRASES_DIR / f"{year}_{model}_ALL_PARAPHRASED.zip"
    extract_dir = WORKING_DIR / f"{year}_{model}_ALL_PARAPHRASED"

    # Step 1: fresh extraction.
    if extract_dir.exists():
        shutil.rmtree(extract_dir)
    with zipfile.ZipFile(zip_path, 'r') as archive:
        archive.extractall(extract_dir)

    # Step 2: fresh scratch copy.
    if TEMP_CLEANED_DIR.exists():
        shutil.rmtree(TEMP_CLEANED_DIR)
    shutil.copytree(extract_dir, TEMP_CLEANED_DIR)

    # Step 3: strip symbols, touching only files that actually change.
    paraphrase_files = (
        pf_file
        for cik_dir in TEMP_CLEANED_DIR.iterdir()
        if cik_dir.is_dir()
        for pf_file in cik_dir.glob("paraphrase_*.txt")
    )
    cleaned_count = 0
    for pf_file in paraphrase_files:
        before = pf_file.read_text(errors='ignore')
        after = clean_symbols_from_text(before)
        if before == after:
            continue
        pf_file.write_text(after)
        cleaned_count += 1

    print(f"\n🧼 Auto-clean complete: {cleaned_count} files had '*' or '^' removed.")


def find_problem_cases(year, model):
    """Scan the cleaned paraphrases and collect the ones that need manual review.

    Every sub-directory of ``TEMP_CLEANED_DIR`` is treated as one CIK. CIKs
    whose original audit opinion cannot be loaded (or is empty) are skipped.
    Each ``paraphrase_*.txt`` is then checked in this order, stopping at the
    first hit:

      1. a match of any pattern in ``bad_word_families``;
      2. at least ``token_drop_threshold`` fewer whitespace-separated words
         than the original;
      3. perplexity could not be computed;
      4. similarity could not be computed;
      5. perplexity above ``max_perplexity``;
      6. more than ``max_grammar_errors`` grammar matches;
      7. similarity below ``min_similarity``.

    Directories and files are visited in sorted order so the review sequence
    is the same on every run.

    Args:
        year: Year used to locate the originals; copied into each case.
        model: Model name; only copied into each case.

    Returns:
        A list of dicts with keys ``year``, ``model``, ``cik``, ``idx``,
        ``path``, ``orig``, ``text`` and ``reason``.
    """
    extract_dir = TEMP_CLEANED_DIR
    file_prefix = "paraphrase_"
    cases = []

    for cik_dir in sorted(extract_dir.iterdir()):
        if not cik_dir.is_dir():
            continue

        orig = load_original(year, cik_dir.name)
        if not orig:
            continue

        orig_len = len(orig.split())

        for pf_file in sorted(cik_dir.glob("paraphrase_*.txt")):
            text = pf_file.read_text(errors='ignore')
            reason = None

            # 🚩 Check for bad word families
            for pattern, label in bad_word_families:
                if pattern.search(text):
                    reason = f"🚩 {label} found"
                    break  # No need to check other issues if it's already flagged

            # ⚠️ Length check
            if not reason and (orig_len - len(text.split()) >= token_drop_threshold):
                reason = "⚠️ Length discrepancy detected"

            # ⚠️ NLP checks (safe). Each metric is computed only when an earlier
            # check has not already decided the outcome; the priority of the
            # reported reasons is unchanged.
            if not reason:
                perp = safe_compute_perplexity(text)
                if perp is None:
                    reason = "⚠️ Perplexity calculation failed"
                else:
                    sim = safe_compute_similarity(orig, text)
                    if sim is None:
                        reason = "⚠️ Similarity calculation failed"
                    elif perp > max_perplexity:
                        reason = f"⚠️ High perplexity ({perp:.1f})"
                    else:
                        # Grammar checking runs last so it is skipped whenever
                        # a previous check has already flagged the text.
                        gram = compute_grammar_errors(text)
                        if gram > max_grammar_errors:
                            reason = f"⚠️ Grammar issues ({gram})"
                        elif sim < min_similarity:
                            reason = f"⚠️ Low semantic similarity ({sim:.2f})"

            if reason:
                cases.append({
                    'year': year,
                    'model': model,
                    'cik': cik_dir.name,
                    # Everything between "paraphrase_" and ".txt". Taking only the
                    # first character would report paraphrase_10.txt as "1".
                    'idx': pf_file.stem[len(file_prefix):],
                    'path': pf_file,
                    'orig': orig,
                    'text': text,
                    'reason': reason
                })

    print(f"🔍 Found {len(cases)} problematic paraphrases.")
    return cases

In [None]:
# ----------- HTML Highlighter Function -----------
def highlight_bad_words_html(text):
    """Render ``text`` as HTML with every bad-word-family match highlighted in yellow.

    The text is HTML-escaped first, then each pattern in ``bad_word_families``
    is applied in turn to the escaped string; every match is wrapped in a
    ``<span>`` whose ``title`` is the family label. Newlines become ``<br>``.
    """
    def make_wrapper(label):
        # Build the substitution callback for one word family.
        def wrap(match):
            return f'<span style="background-color: yellow;" title="{label}">{html.escape(match.group(0))}</span>'
        return wrap

    markup = html.escape(text)  # escape before injecting any tags
    for pattern, label in bad_word_families:
        markup = pattern.sub(make_wrapper(label), markup)
    return '<br>'.join(markup.split('\n'))


# ----------- Updated Interactive Editor UI -----------
def interactive_editor(problem_cases):
    """Step through flagged paraphrases in a small ipywidgets review UI.

    For each case the UI prints a header (position, year/model, CIK, file
    name), the flag reason and the first 300 characters of the original, then
    shows an editable text area with Previous / Save / Delete / Next buttons.
    Cases whose reason starts with "🚩" additionally get a read-only preview
    with the bad-word matches highlighted.

    * Save writes the text area content to the case's file and updates the
      in-memory case text.
    * Delete removes the case's file if it exists.
    * Next past the last case clears the output and prints a summary of all
      modifications made in this session.

    Args:
        problem_cases: List of dicts with the keys ``year``, ``model``,
            ``cik``, ``idx``, ``path`` (a ``Path``), ``orig``, ``text`` and
            ``reason``. Entries are updated in place on save.
    """
    idx = 0
    total = len(problem_cases)
    if total == 0:
        # Nothing to show; indexing the first case below would raise IndexError.
        print("✅ No problematic paraphrases to review.")
        return

    mods = []
    out = widgets.Output()

    # Widgets
    textarea = widgets.Textarea(layout=widgets.Layout(width='100%', height='150px'))
    html_view = widgets.HTML(layout=widgets.Layout(width='100%', height='150px', overflow='auto', border='1px solid gray', padding='5px'))

    btn_prev = widgets.Button(description="Previous")
    btn_save = widgets.Button(description="Save")
    btn_delete = widgets.Button(description="Delete")
    btn_next = widgets.Button(description="Next")

    # UI updater
    def show_case(i):
        """Redraw the whole UI for case number ``i``."""
        clear_output(wait=True)
        # Drop "Saved."/"Deleted." messages left over from the previously shown
        # case, so they are not mistaken for the status of this one.
        out.clear_output()

        c = problem_cases[i]
        print(f"{i+1}/{total} | {c['year']}-{c['model']} | CIK {c['cik']} | paraphrase_{c['idx']}.txt")
        print(c['reason'])
        print("\n--- Original (first 300 chars) ---\n", c['orig'][:300])

        textarea.value = c['text']

        children = []

        if c['reason'].startswith("🚩"):
            # Only show highlight preview if it's a bad word issue
            html_view.value = highlight_bad_words_html(c['text'])
            children.append(widgets.Label("Highlighted Preview (bad words only):"))
            children.append(html_view)

        children.append(widgets.Label("Editable Text:"))
        children.append(textarea)
        children.append(widgets.HBox([btn_prev, btn_save, btn_delete, btn_next]))
        children.append(out)

        display(widgets.VBox(children))

    # Button callbacks
    def on_save(b):
        """Persist the edited text to disk and refresh the preview."""
        c = problem_cases[idx]
        with open(c['path'], 'w') as f:
            f.write(textarea.value)
        # Update current case's text and refresh preview
        problem_cases[idx]['text'] = textarea.value
        html_view.value = highlight_bad_words_html(textarea.value)
        mods.append(f"✅ Modified: {c['cik']} paraphrase_{c['idx']}")
        with out: print("Saved.")

    def on_delete(b):
        """Delete the current case's file if it is still present."""
        c = problem_cases[idx]
        if c['path'].exists():
            c['path'].unlink()
            mods.append(f"❌ Deleted: {c['cik']} paraphrase_{c['idx']}")
        with out: print("Deleted.")

    def on_next(b):
        """Advance to the next case, or finish with a summary after the last one."""
        nonlocal idx
        if idx < total - 1:
            idx += 1
            show_case(idx)
        else:
            clear_output()
            print("✅ Done reviewing!")
            if mods:
                print("\n===== MODIFICATIONS SUMMARY =====\n", "\n".join(mods))

    def on_prev(b):
        """Go back one case; does nothing on the first case."""
        nonlocal idx
        if idx > 0:
            idx -= 1
            show_case(idx)

    # Event bindings
    btn_save.on_click(on_save)
    btn_delete.on_click(on_delete)
    btn_next.on_click(on_next)
    btn_prev.on_click(on_prev)

    # Initial display
    show_case(idx)

In [None]:
def zip_edited(src_dir, year, model):
    """Zip the reviewed folder into ``DONE_DIR`` as ``<year>_<model>_ALL_PARAPHRASED.zip``.

    The destination directory comes from the module-level ``DONE_DIR``
    constant rather than a second hard-coded copy of the path, so changing the
    configuration cell is enough to redirect the output. An existing archive
    with the same name is deleted first.

    Args:
        src_dir: Directory whose contents become the root of the archive.
        year: Year component of the archive name.
        model: Model component of the archive name.
    """
    dst = DONE_DIR / f"{year}_{model}_ALL_PARAPHRASED.zip"
    if dst.exists():
        dst.unlink()
    # make_archive appends ".zip" itself, so hand it the path without the suffix.
    shutil.make_archive(str(dst.with_suffix('')), 'zip', root_dir=src_dir)
    print("📦 Zipped edited folder to", dst)


In [None]:
# ------------------------ RUN ------------------------
# Selects the archive "<year>_<model>_ALL_PARAPHRASED.zip" to process.
year = 2012  # Change as needed
model = "MISTRAL"  # Change as needed

# Extract + strip symbols, then collect everything that needs a manual look.
clean_and_prepare(year, model)
cases = find_problem_cases(year, model)
# Only open the review UI when there is something to review.
if cases:
    interactive_editor(cases)
else:
    print("✅ All paraphrases look good!")

✅ Done reviewing!


In [None]:
# Package the reviewed working copy; run this only after finishing the interactive review above.
zip_edited(Path("/content/temp_cleaned"), year, model)


📦 Zipped edited folder to /content/drive/MyDrive/THESIS/LLM_PARAPHRASES_SANITIZED/DONE/2012_MISTRAL_ALL_PARAPHRASED.zip
