In [None]:
#!pip install pronouncing
#!pip install phonetics



In [None]:
import os
from getpass import getpass

# SECURITY: never store an API key in the notebook source. A literal key used to
# be assigned on this line; treat that key as leaked and revoke/rotate it.
# The key is now taken from the environment, or asked for without echoing it.
if not os.environ.get("GOOGLE_API_KEY"):
    try:
        _entered_key = getpass("Enter GOOGLE_API_KEY: ").strip()
        if _entered_key:
            os.environ["GOOGLE_API_KEY"] = _entered_key
    except EOFError:
        # Non-interactive run with no key available: keep going so cells that do
        # not need the LLM still work, but make the problem visible.
        print("⚠️ GOOGLE_API_KEY is not set; LLM calls will fail until it is provided.")

In [None]:
import pandas as pd

def load_aoa(path="/content/drive/MyDrive/Colab Notebooks/AoA_ratings_Kuperman_et_al_BRM_with_PoS.xlsx"):
    """Load an age-of-acquisition (AoA) spreadsheet into a ``{word: rating}`` dict.

    The word column is the first column whose label contains "word"; the rating
    column is the first whose label contains "aoa", "mean" or "rating"
    (case-insensitive). If no label matches, "Word" / "Rating.Mean" are tried.

    Args:
        path: Location of the Excel file to read with ``pandas.read_excel``.

    Returns:
        A dict mapping lower-cased words to numeric ratings. Rows whose word is
        missing or whose rating is missing/non-numeric are dropped. When the
        file cannot be read or parsed, a warning is printed and ``{}`` is
        returned instead of raising.
    """
    try:
        sheet = pd.read_excel(path)
        # Column labels are not guaranteed to be strings (numeric headers are
        # possible), so stringify them before doing substring matching.
        word_col = next((c for c in sheet.columns if "word" in str(c).lower()), "Word")
        rating_col = next(
            (c for c in sheet.columns
             if any(key in str(c).lower() for key in ("aoa", "mean", "rating"))),
            "Rating.Mean",
        )
        table = sheet[[word_col, rating_col]].copy()
        # Force ratings to numbers so later comparisons against an age cannot
        # hit a stray string; unparsable cells become NaN and are dropped below.
        table[rating_col] = pd.to_numeric(table[rating_col], errors="coerce")
        table = table.dropna()
        table[word_col] = table[word_col].astype(str).str.lower()
        print(f"✅ AoA entries loaded: {len(table):,}")
        return dict(zip(table[word_col], table[rating_col]))
    except Exception as e:
        # Deliberate best-effort: the rest of the notebook can run without AoA data.
        print(f"⚠️ AoA file not found or error loading: {e} — using fallback.")
        return {}

# Module-level lookup table consulted by aoa_penalty(): lower-cased word -> AoA
# rating, or an empty dict when the spreadsheet could not be loaded.
AOA = load_aoa()


def aoa_penalty(word, age):
    """Print whether ``word`` is suitable for a reader of the given ``age``.

    The lower-cased word is looked up in the module-level ``AOA`` table and the
    result of ``age > rating`` is printed together with the rating. Despite the
    name, no numeric penalty is computed: the outcome is only printed and the
    function returns ``None`` on every path.

    Args:
        word: Word to look up (case-insensitive).
        age: Reader age; anything ``float()`` accepts.

    Returns:
        None. A message is printed for an unknown word or an invalid age.
    """
    val = AOA.get(word.lower())
    if val is None:
        print(f"Word '{word}' has not been found in AoA Dict.")
        return None
    try:
        age = float(age)
    except (TypeError, ValueError):
        # float(None) raises TypeError, float("ten") raises ValueError; both
        # mean the caller did not supply a usable age.
        print(f"Invalid age provided: {age}")
        return None
    print(f'{age > val} : The word is suitable for age greater than:{val:.2f}')
    return None

✅ AoA entries loaded: 31,104


In [None]:
# Mount Google Drive at /content/drive (Colab only).
# NOTE(review): load_aoa()'s default path lives under /content/drive, so on a
# fresh runtime this cell has to run before the AoA loader cell above it.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:


import json, re, time
from typing import Dict, Any, Optional, Tuple

# ---------- Word-wrap helper (≤10 words per line) ----------
def _wrap_by_words(text: str, max_words: int = 10) -> str:
    words = text.split()
    if not words:
        return ""
    lines, cur = [], []
    for w in words:
        cur.append(w)
        if len(cur) >= max_words:
            lines.append(" ".join(cur))
            cur = []
    if cur:
        lines.append(" ".join(cur))
    return "\n".join(lines)

# --- small utility (define if not in scope) ---
def _safe_take_text(gen_response):
    if hasattr(gen_response, "text") and gen_response.text:
        return gen_response.text
    try:
        cands = getattr(gen_response, "candidates", [])
        for c in cands:
            content = getattr(c, "content", None)
            parts = content.get("parts", []) if isinstance(content, dict) else getattr(content, "parts", [])
            for p in parts or []:
                t = getattr(p, "text", None)
                if t: return t
    except Exception:
        pass
    return ""

def _parse_pair_label(lbl: str) -> Tuple[Optional[str], Optional[str], bool, Optional[float]]:
    if not lbl or lbl.strip().lower() == "none":
        return None, None, False, None
    implicit = "[implicit]" in lbl
    m = re.search(r"\(([^,]+),\s*([^)]+)\)", lbl)
    w1, w2 = (m.group(1).strip(), m.group(2).strip()) if m else (None, None)
    ms = re.search(r"similarity:\s*([0-9.]+)", lbl)
    score = float(ms.group(1)) if ms else None
    return w1, w2, implicit, score

# ── LLM prompt for choosing the pair (ignore upstream pairs; pick your own) ───
_CHOOSE_PAIR_INSTRUCTIONS = """
You are assisting a pun/wordplay detector. You will receive:
- The input sentence (text) and age
- The detector's coarse verdict about pun type: one of {"phonetic","semantic","non-joke"}.

Your task:
1) Choose EXACTLY ONE candidate pair using your reasoning (do not rely on earlier candidate strings).
   They must be two different words.
   • If pun_type == "phonetic": you may pick an implicit phonetic partner; it need not both appear in text.
   • If pun_type == "semantic": pick two different words that appear in the given text and are meaning-related for the joke.
2) If pun_type == "non-joke", set chosen_type="none" and return empty pair.
3) Return STRICT JSON (no markdown, no extra text):
{
  "chosen_type": "phonetic" | "semantic" | "none",
  "w1": "<first word or empty>",
  "w2": "<second word or empty>",
  "why": "one short sentence"
}
"""

def llm_choose_pair(text: str, age: float, pun_type_hint: str, retries: int = 1) -> Dict[str, Any]:
    """Ask the LLM to pick one word pair for ``text`` and normalise its JSON reply.

    The prompt is the chooser instructions followed by the JSON-encoded input
    (``text``, ``age``, ``pun_type_hint``). Code fences around the reply are
    removed and the outermost ``{...}`` span, if any, is parsed as JSON.

    Args:
        text: Sentence under analysis.
        age: Age value forwarded to the model.
        pun_type_hint: Coarse pun-type verdict forwarded to the model.
        retries: Extra attempts allowed after an error whose message contains
            "429" or "503"; any other error ends the loop immediately.

    Returns:
        Dict with keys ``chosen_type`` ("phonetic", "semantic" or "none"),
        ``w1``, ``w2`` and ``why``. An unrecognised type becomes "none"; a pair
        whose two words are equal ignoring case is reset to "none" with empty
        words. On failure the type is "none" and ``why`` carries
        "[LLM choose error] " plus the first 160 characters of the error.
    """
    request = json.dumps(
        {"text": text, "age": age, "pun_type_hint": pun_type_hint},
        ensure_ascii=False,
        indent=2,
    )
    prompt = "".join([_CHOOSE_PAIR_INSTRUCTIONS, "\n\nINPUT:\n", request, "\n\nOUTPUT JSON ONLY:"])

    failure = None
    for attempt in range(retries + 1):
        try:
            reply = _safe_take_text(model_llm.generate_content(prompt)).strip()
            # Drop a leading/trailing markdown fence before looking for JSON.
            reply = re.sub(r'^\s*```(?:json)?\s*', '', reply)
            reply = re.sub(r'\s*```\s*$', '', reply).strip()
            braces = re.search(r'\{.*\}', reply, re.DOTALL)
            parsed = json.loads(braces.group(0) if braces else reply)

            kind = str(parsed.get("chosen_type", "none")).lower()
            if kind not in {"phonetic", "semantic", "none"}:
                kind = "none"
            first = (parsed.get("w1") or "").strip()
            second = (parsed.get("w2") or "").strip()
            reason = (parsed.get("why") or "").strip()
            # The same word twice is not a pair.
            if first and second and first.lower() == second.lower():
                kind, first, second = "none", "", ""
            return {"chosen_type": kind, "w1": first, "w2": second, "why": reason}
        except Exception as exc:
            failure = exc
            # Give up unless attempts remain and the error looks transient.
            if attempt >= retries or not any(code in str(exc) for code in ("429", "503")):
                break
            time.sleep(1.5 * (attempt + 1))
    return {"chosen_type": "none", "w1": "", "w2": "", "why": f"[LLM choose error] {str(failure)[:160]}"}

# ── LLM QA constrained to SELECTED pair (for follow-ups & uncommon Qs) ───────
# Preamble for qa_on_selected(): it is placed in front of the user's question
# and a JSON context blob describing the currently selected pair.
_QA_SELECTED_PAIR_INSTRUCTIONS = """
You answer questions about the detected wordplay using ONLY the SELECTED pair and its channel.
Do not invent new pairs. Be concise and plain text.
If asked “what kind of wordplay,” answer with {phonetic|semantic|none} and one short reason.
Do not report any similarity score unless the user explicitly asks for it.
"""

def qa_on_selected(question: str, state, retries: int = 1) -> str:
    """Answer a free-form question with the LLM, restricted to the selected pair.

    The prompt combines the QA instructions, the stripped question and a JSON
    context holding the last sentence, last age, selected type and selected
    pair read from ``state``.

    Args:
        question: The user's question.
        state: Object exposing ``selected``, ``last_text`` and ``last_age``.
        retries: Extra attempts allowed after an error whose message contains
            "429" or "503"; any other error ends the loop immediately.

    Returns:
        The model's reply with surrounding code fences removed, or
        "[LLM error] " plus the first 200 characters of the last error.
    """
    selection = state.selected or {}
    context = {
        "text": state.last_text or "",
        "age": state.last_age,
        "selected_type": selection.get("chosen_type", "none"),
        "selected_pair": [selection.get("w1", ""), selection.get("w2", "")],
        "notes": "Use only the selected pair carried from prior analysis.",
    }
    prompt = "".join([
        _QA_SELECTED_PAIR_INSTRUCTIONS,
        "\n\nQUESTION:\n",
        question.strip(),
        "\n\nCONTEXT:\n",
        json.dumps(context, ensure_ascii=False, indent=2),
        "\n\nANSWER (plain text, concise):",
    ])

    failure = None
    for attempt in range(retries + 1):
        try:
            reply = _safe_take_text(model_llm.generate_content(prompt)).strip()
            reply = re.sub(r'^\s*```(?:json)?\s*', '', reply)
            return re.sub(r'\s*```\s*$', '', reply).strip()
        except Exception as exc:
            failure = exc
            # Give up unless attempts remain and the error looks transient.
            if attempt >= retries or not any(code in str(exc) for code in ("429", "503")):
                break
            time.sleep(1.5 * (attempt + 1))
    return f"[LLM error] {str(failure)[:200]}"

# ── Conversation memory ───────────────────────────────────────
class ConversationState:
    """Mutable memory carried between turns of the conversation.

    Holds the most recently analysed sentence (text, age, feature dict) and the
    currently selected word pair.
    """

    def __init__(self, default_age: float = 10):
        """Start with no sentence, ``default_age`` as the age and an empty selection."""
        self.last_text: Optional[str] = None
        self.last_age: float = float(default_age)
        self.last_feats: Optional[Dict[str, Any]] = None
        # "similarity" is kept for follow-ups, not printed by default.
        self.selected: Dict[str, Any] = dict(
            chosen_type="none",
            w1="",
            w2="",
            why="",
            similarity=None,
        )

    def update_selection(self, chosen_type: str, w1: str, w2: str, why: str, similarity: Optional[float]):
        """Replace the stored selection with a fresh dict built from the arguments."""
        self.selected = dict(
            chosen_type=chosen_type,
            w1=w1,
            w2=w2,
            why=why,
            similarity=similarity,
        )

    def update_sentence(self, text: str, age: float, feats: Dict[str, Any]):
        """Remember the latest sentence, its age (stored as float) and its features."""
        self.last_text = text
        self.last_age = float(age)
        self.last_feats = feats

# Single module-level conversation memory: passed to dispatch() by the
# interactive loop and written to by _analyze_and_select().
STATE = ConversationState(default_age=10)

# ── Age/sentence extraction helpers ───────────────────────────
_AGE_PAT = re.compile(r"age\s*=\s*(\d{1,3})", re.I)

def _extract_age(user: str, fallback: float) -> float:
    m = _AGE_PAT.search(user)
    if m:
        try: return float(m.group(1))
        except: pass
    return float(fallback)

def _extract_sentence(user: str, last_text: Optional[str]) -> Optional[str]:
    qm = re.findall(r"“([^”]+)”|\"([^\"]+)\"", user)
    for grp in qm:
        inside = grp[0] or grp[1]
        if inside and len(inside.split()) >= 2:
            return inside.strip()
    m = re.search(r"\bis\s+(.+?)\s+a\s+pun\??", user, flags=re.I)
    if m:
        cand = m.group(1).strip()
        if len(cand.split()) >= 2: return cand
    m = re.search(r"(?:^|\b)text\s*:\s*(.+)$", user, flags=re.I)
    if m: return m.group(1).strip()
    if len(user.split()) >= 3 and any(p in user for p in [".", "?", "!", "—", ","]):
        return user.strip()
    return last_text

# ── Follow-up patterns ────────────────────────────────────────
# Explicit "<kind> similarity between <w1> and <w2>" queries; groups 1 and 2
# capture the two words. Words are limited to letters a-z (case-insensitive),
# so hyphenated or apostrophised words will not match.
_SIM_PHON_PAT = re.compile(r"phonetic\s+similarity\s+between\s+([a-z]+)\s+and\s+([a-z]+)", re.I)
# The semantic variant also accepts "semantic distance between ...".
_SIM_SEM_PAT  = re.compile(r"semantic\s+(?:similarity|distance)\s+between\s+([a-z]+)\s+and\s+([a-z]+)", re.I)

# ── Core analyze+select pipeline (LLM chooses the pair) ───────
def _analyze_and_select(text: str, age: float) -> str:
    """Analyse ``text``, let the LLM choose a word pair, and remember the result.

    Runs ``extract_features`` and ``analyze_text``, maps a "non-joke" verdict to
    "none", asks ``llm_choose_pair`` for a pair and computes its similarity
    (phonetic or semantic, matching the chosen type) without printing it.

    NOTE(review): results are written to the module-level ``STATE`` rather than
    to a state object supplied by the caller.

    Args:
        text: Sentence to analyse.
        age: Age value passed to the analysis helpers and the LLM.

    Returns:
        A one-line summary: the verdict, the selected pair (if any), the LLM's
        reason and the detector's humor reason, separated by spaces.
    """
    feats = extract_features(text, age)
    verdict = analyze_text(text, age)

    pun_type = str(verdict.get("pun_type", "none")).lower()
    pun_type = "none" if pun_type == "non-joke" else pun_type

    choice = llm_choose_pair(text, age, pun_type)
    chosen_type, w1, w2 = choice["chosen_type"], choice["w1"], choice["w2"]
    has_pair = bool(w1 and w2)

    # compute but don't print similarity unless asked later
    similarity = None
    if has_pair:
        if chosen_type == "phonetic":
            similarity = float(phonetic_similarity(w1, w2))
        elif chosen_type == "semantic":
            similarity = float(semantic_similarity(text, w1, w2))

    STATE.update_sentence(text, age, feats)
    STATE.update_selection(chosen_type, w1, w2, choice.get("why", ""), similarity)

    is_joke = verdict.get("valid_joke", False)
    human = [f"It is {'a pun' if is_joke else 'not a pun'} ({pun_type})."]
    if chosen_type in {"phonetic", "semantic"} and has_pair:
        chan = {"phonetic": "sound", "semantic": "meaning"}[chosen_type]
        human.append(f"Selected {chosen_type} pair: '{w1}'–'{w2}' (channel: {chan}).")
    # Optional explanations: the LLM's reason first, then the detector's.
    for extra in (choice.get("why"), verdict.get("humor_reason")):
        if extra:
            human.append(extra)
    return " ".join(human).strip()

# ── Router (intent checks BEFORE generic analyze) ─────────────
def _no_pair_reply(wanted: str, selection: Dict[str, Any]) -> str:
    """Explain why no pair of the ``wanted`` type is available for a follow-up."""
    current = selection.get("chosen_type") or "none"
    if current in {"phonetic", "semantic"} and current != wanted:
        return f"No {wanted} pair is selected as the joke was classified as {current}."
    return f"No {wanted} pair is selected."


def dispatch(user_query: str, state: ConversationState) -> str:
    """Route one user utterance to the matching handler and return the reply.

    Checks run in this order; the first match wins:
      1. "phonetic similarity between A and B" /
         "semantic similarity|distance between A and B" (scores are printed);
      2. similarity follow-ups about "the chosen pair";
      3. "what kind of wordplay" / "pun type" / "type of pun";
      4. "explain" together with "joke" or "pun";
      5. "Is ... a pun?", a sentence supplied in this utterance, or an explicit
         ``age=`` marker with a remembered sentence -> analyse and select;
      6. anything else while a pair is selected -> LLM QA on that pair;
      7. otherwise a usage hint.

    Args:
        user_query: Raw text typed by the user.
        state: Conversation memory to read (and, for follow-ups, update).

    Returns:
        The reply text.
    """
    q = user_query.strip()
    ql = q.lower()
    age = _extract_age(q, state.last_age)
    # Keep "sentence supplied in THIS utterance" apart from the remembered one.
    # If the remembered sentence counted as new input, every follow-up would be
    # re-analysed and the generic QA branch below could never run.
    new_text = _extract_sentence(q, None)
    text = new_text or state.last_text

    # direct similarity queries (explicit ⇒ allowed to print scores)
    m = _SIM_PHON_PAT.search(ql)
    if m:
        w1, w2 = m.group(1), m.group(2)
        return f"Phonetic similarity between '{w1}' and '{w2}' is {phonetic_similarity(w1, w2):.2f}."
    m = _SIM_SEM_PAT.search(ql)
    if m:
        w1, w2 = m.group(1), m.group(2)
        ctx_text = text or state.last_text or f"{w1} … {w2}."
        sim = semantic_similarity(ctx_text, w1, w2)
        return f"Semantic similarity between '{w1}' and '{w2}' is {sim:.2f} (distance {1-sim:.2f})."

    # follow-ups on the SELECTED pair (explicit similarity ⇒ allowed)
    if "semantic distance between the chosen pair" in ql or "semantic similarity of the chosen pair" in ql:
        sel = state.selected or {}
        if sel.get("chosen_type") != "semantic" or not (sel.get("w1") and sel.get("w2")):
            return _no_pair_reply("semantic", sel)
        sim = semantic_similarity(state.last_text, sel["w1"], sel["w2"])
        state.selected["similarity"] = float(sim)
        return f"Chosen semantic pair '{sel['w1']}'–'{sel['w2']}' has similarity {sim:.2f} (distance {1-sim:.2f})."

    if "phonetic similarity of the chosen pair" in ql or "sound similarity of the chosen pair" in ql:
        sel = state.selected or {}
        if sel.get("chosen_type") != "phonetic" or not (sel.get("w1") and sel.get("w2")):
            return _no_pair_reply("phonetic", sel)
        sim = phonetic_similarity(sel["w1"], sel["w2"])
        state.selected["similarity"] = float(sim)
        return f"Chosen phonetic pair '{sel['w1']}'–'{sel['w2']}' has phonetic similarity {sim:.2f}."

    # “what kind of wordplay / pun type / type of pun” — use selected pair (no scores)
    if ("what kind of wordplay" in ql) or re.search(r"\bpun\s*type\b", ql) or re.search(r"\btype\s+of\s+pun\b", ql):
        sel = state.selected or {}
        if not state.last_text:
            return "Provide a sentence first so I can select a pair (e.g., Is “Water you doing for lunch” a pun? age=10)."
        pt = sel.get("chosen_type", "none")
        if pt in {"phonetic", "semantic"}:
            # "why" is always present but may be empty, so fall back with `or`.
            reason = sel.get("why") or "The selected pair best matches the detected wordplay channel."
            return f"{pt} — {reason}"
        return "none — no convincing pair was selected for wordplay."

    # “explain the joke/pun” — use selected pair (no scores)
    if "explain" in ql and ("joke" in ql or "pun" in ql):
        sel = state.selected or {}
        if not state.last_text:
            return "Provide a sentence first so I can select a pair."
        if sel.get("chosen_type") == "none":
            return "No wordplay pair was selected for the last sentence, so there is nothing to explain."
        return qa_on_selected(q, state)

    # “Is … a pun?”, a new sentence, or a new age for the remembered sentence
    # → analyze+select (no scores in summary)
    asks_if_pun = re.search(r"\bis\s+.+\bpun\??", ql)
    has_new_sentence = bool(new_text) and len(new_text.split()) >= 2
    gives_age = _AGE_PAT.search(q) is not None
    if asks_if_pun or has_new_sentence or (gives_age and text):
        return _analyze_and_select(text, age) if text else "Please include a sentence."

    # Generic uncommon questions → LLM QA using selected pair (no scores)
    if state.last_text and state.selected and state.selected.get("chosen_type") != "none":
        return qa_on_selected(q, state)

    return "Tell me what to do (e.g., Is “Water you doing for lunch” a pun? age=10)."

# ── One-box loop with memory + wrapped output ─────────────────
print("Conversational Agent (memory on) — type 'quit' to exit.")
# Example prompts:
#   • Is "Water you doing for lunch" a pun? age=10
#   • phonetic similarity between x and y
#   • semantic distance between the chosen pair
#   • semantic similarity of the chosen pair
#   • phonetic similarity of the chosen pair
#   • what kind of wordplay is involved?
#   • explain the joke


while True:
    try:
        user = input("\nAsk: ").strip()
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or an interrupted cell ends the session cleanly instead
        # of leaving a traceback.
        print("\nGoodbye!")
        break
    if user.lower() in {"quit", "exit"}:
        print("Goodbye!")
        break
    if not user:
        # Nothing typed: ask again rather than sending an empty query.
        continue
    try:
        raw = dispatch(user, STATE)
    except Exception as err:
        # One failing request must not end the whole conversation.
        raw = f"[error] {type(err).__name__}: {err}"
    print(_wrap_by_words(f"Agent: {raw}", max_words=10))


Conversational Agent (memory on) — type 'quit' to exit.
