In [10]:
# --- Promotion vs Rest：品牌詞過濾 + 稀疏矩陣索引修正 + 輸出規則 --- #
import re, numpy as np, pandas as pd, yaml
from collections import Counter
from sklearn.feature_extraction.text import CountVectorizer, ENGLISH_STOP_WORDS

PATH = "../data/raw/_SELECT_A_object_id_A_complex_id_A_vote_reason_id_B_reason_A_dat_202110291714.csv"
TEXT_COL = "review_text"   # 若不同請改

df = pd.read_csv(PATH)
df[TEXT_COL] = df[TEXT_COL].fillna("").astype(str)

# ---------- 標籤：8 vs 非8 ----------
df = df[df["vote_reason_id"].notna()].copy()
df["is_promo"] = (df["vote_reason_id"] == 8).astype(int)
print("樣本數（0=非推銷, 1=推銷）:", Counter(df["is_promo"]))

# ---------- 停用詞/黑名單 ----------
domain_stop = {
    "apartment","apartments","complex","property","community","management","staff",
    "people","place","area","experience","review","reviews","resident","residents",
    "living","leasing","office","unit","building","maintenance","manager","team"
}
filler_stop = {
    "comment","comments","said","told","like","love","great","nice","friendly","amazing",
    "good","bad","awesome","awful","helpful","quiet","loud","money","quot","amp"
}
# 地名/設施/品牌樣式（避免被誤認為推銷話術）
facility_stop = {
    # 常見設施/地標
    "dog","park","dog park","grocery","store","grocery store","pool","gym","parking","rules",
    "shopping","center","shopping center","school","bus","stop","train","station","downtown","mall",
    # 社區名稱常見後綴
    "square","landing","club","farm","glen","heights","village","meadows","court","estates","homes",
    "apartments","apartments","at","on"
}
# 已觀察到的品牌/專有名詞（依你的輸出補幾個）
proper_noun_like = {
    "lawrence","weaver","timber","grady","polo"
}
num_noise = {str(i) for i in range(0,1000)}

custom_stops = list(set(ENGLISH_STOP_WORDS) | domain_stop | filler_stop | facility_stop | proper_noun_like | num_noise)

def clean(s: str) -> str:
    s = s.lower()
    # 移除會被 regex 直接偵測的元素，避免干擾片語抽取
    s = re.sub(r"http\S+|www\.\S+"," ", s)
    s = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", " ", s)
    s = re.sub(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", " ", s)
    s = re.sub(r"[^a-z\s']", " ", s)
    return re.sub(r"\s+", " ", s).strip()

texts = df[TEXT_COL].map(clean)

# ---------- 只抓話術：bigram/trigram ----------
vec = CountVectorizer(
    stop_words=custom_stops,
    lowercase=True,
    ngram_range=(2,3),      # 抓 "buy now", "visit website"
    min_df=5,               # 視資料量可調：特徵太少→降到3；太雜→拉到8~10
    max_df=0.6,
    token_pattern=r"(?u)\b[a-z][a-z']+\b"
)
X = vec.fit_transform(texts)
terms = np.array(vec.get_feature_names_out())

# ---------- Log-odds with prior（8 vs 非8）----------
y_p = df["is_promo"].values.astype(int)
mask_p = (y_p == 1)
mask_r = (y_p == 0)

promo_ct = (X[mask_p].sum(axis=0).A1 + 0.5)  # +0.5 平滑
rest_ct  = (X[mask_r].sum(axis=0).A1 + 0.5)
promo_total = promo_ct.sum()
rest_total  = rest_ct.sum()

logit_promo = np.log(promo_ct / (promo_total - promo_ct))
logit_rest  = np.log(rest_ct  / (rest_total  - rest_ct))
delta = logit_promo - logit_rest   # 越大越像促銷

order = np.argsort(delta)[::-1]

# 統一的過濾器：去數字/過短/含停用/含設施或品牌樣式
bad_tokens = set().union(*(map(set, [w.split() for w in (facility_stop | proper_noun_like)])))
def keep(term):
    if any(ch.isdigit() for ch in term): return False
    toks = term.split()
    if len(toks) < 2: return False
    if any(t in bad_tokens for t in toks): return False
    if any(t in ENGLISH_STOP_WORDS for t in toks): return False
    return True

promo_terms = [t for t in terms[order] if keep(t)][:60]
print("Promotion（前 20 更聚焦）=>", promo_terms[:20])

# ---------- Off-topic：2 vs 非2 ----------
df["is_off"] = (df["vote_reason_id"] == 2).astype(int)
y_o = df["is_off"].values.astype(int)
mask_o = (y_o == 1)
mask_not_o = (y_o == 0)

off_ct     = (X[mask_o].sum(axis=0).A1 + 0.5)
notoff_ct  = (X[mask_not_o].sum(axis=0).A1 + 0.5)
off_total  = off_ct.sum()
notoff_total = notoff_ct.sum()

logit_off    = np.log(off_ct    / (off_total    - off_ct))
logit_notoff = np.log(notoff_ct / (notoff_total - notoff_ct))
delta_off = logit_off - logit_notoff
order_off = np.argsort(delta_off)[::-1]
off_terms = [t for t in terms[order_off] if keep(t)][:60]
print("Off-topic（前 20 更聚焦）=>", off_terms[:20])

# ---------- Promotion 強化 regex（硬規則） ----------
promotion_patterns = [
    r"http[s]?://", r"\.com\b", r"\.net\b", r"\.ru\b",
    r"\bbit\.ly\b", r"\btinyurl\b",
    r"\bpromo\b", r"\bdiscount\b", r"\bbuy now\b", r"\bfree trial\b",
    r"\bvisit (my|our) (site|website)\b", r"\bcontact (me|us)\b",
    r"\bcall (now|us)\b", r"\boffer ends\b",
    r"\bapply (now|today)\b", r"\blease (now|today)\b",
    r"\bmove[-\s]?in special\b", r"\bspecial offer\b",
    r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"
]

rules_yaml = {
    "rules": [
        {
            "id": "RULE_PROMOTION",
            "reason_id": 8,
            "description": "Promotional or business content",
            "keywords": promo_terms[:30],
            "pattern": "|".join(promotion_patterns),
            "weight": 0.7,
            "enabled": True
        },
        {
            "id": "RULE_OFF_TOPIC",
            "reason_id": 2,
            "description": "Irrelevant or unhelpful content",
            "keywords": off_terms[:30],
            "weight": 0.4,
            "enabled": True
        }
    ]
}

out_path = "rules_generated.yml"   # 輸出在 Notebook 同層
with open(out_path, "w") as f:
    yaml.safe_dump(rules_yaml, f, sort_keys=False, allow_unicode=True)

print(f"✅ 產出 {out_path}")


樣本數（0=非推銷, 1=推銷）: Counter({0: 31048, 1: 1135})
Promotion（前 20 更聚焦）=> ['comminity appliances especially', 'appliances especially washer', 'accomidating noisy kitchens', 'kitchens pretty comminity', 'accomidating noisy', 'okay accomidating', 'okay accomidating noisy', 'pretty comminity appliances', 'noisy kitchens', 'noisy kitchens pretty', 'comminity appliances', 'especially washer', 'especially washer dryers', 'appliances especially', 'pretty comminity', 'years issues addressed', 'live quite located', 'forget human things', 'promptly central', 'think restrictions greater']
Off-topic（前 20 更聚焦）=> ['did did did', 'orchard crossing', 'experienced columbia', 'activities communicate messages', 'swimming condo activities', 'communicate messages', 'swimming condo', 'condo activities communicate', 'locations facilities swimming', 'locations facilities', 'communicate messages hear', 'pache locations facilities', 'condo activities', 'activities communicate', 'messages hear noise', 'pache location

In [11]:
# --- Post-process terms: 清理 → 加白名單 → 輸出乾淨規則 --- #
import re, yaml

# 1) 黑名單：地名/品牌/設施/家電/噪音拼字（可依結果持續擴充）
brand_geo_suffix = {
    "square","landing","club","farm","glen","heights","village","meadows","court","estates","homes"
}
proper_nouns_seen = {"lawrence","weaver","timber","grady","polo"}  # 你剛剛輸出有看到的
facilities = {
    "dog","park","dog park","grocery","store","grocery store","pool","gym","parking","rules",
    "shopping","center","shopping center","school","bus","stop","train","station","downtown","mall"
}
appliance_kitchen = {
    "appliance","appliances","washer","washers","dryer","dryers","kitchen","kitchens","stove","fridge",
    "microwave","dishwasher","oven","sink","countertop","appliances especially","especially washer","noisy kitchens"
}
noise_tokens = {"apos","comminity","accomidating","okay accomidating"}  # 明顯錯字/殘字

black_tokens = brand_geo_suffix | proper_nouns_seen | facilities | appliance_kitchen | noise_tokens

def is_bad_term(term: str) -> bool:
    toks = term.split()
    # 短、含數字、含停用黑名詞就砍
    if len(toks) < 2:
        return True
    if any(ch.isdigit() for ch in term):
        return True
    if any(t in black_tokens for t in toks):
        return True
    # 如果最後一個 token 是品牌/地名後綴，也砍
    if toks[-1] in brand_geo_suffix:
        return True
    return False

# 2) 促銷白名單（真正像商業話術的片語）
promo_whitelist = [
    "highly recommend","special offer","move in special","move-in special","apply now","lease today",
    "tour today","schedule a tour","limited time","act fast","free application","application fee waived",
    "waived application fee","call now","contact us","buy now","free trial","refer a friend","hassle free",
    "added bonus","best community","stop by","book a tour"
]

# 3) Off-topic 白名單（偏題/不可驗證的常見片語；當輔助訊號）
off_whitelist = [
    "dogs barking","noise level","bed bugs","moving soon","stay away","doesnt work","does not work",
    "let me know","nothing to do","off topic","not related","waste of time"
]

# 4) 把你模型算到的清單清乾淨，再加白名單
def clean_list(cands, whitelist, topk=30):
    base = [t for t in cands if not is_bad_term(t)]
    # 置頂白名單，再接上清理後的自動詞
    merged = list(dict.fromkeys(whitelist + base))  # 去重保序
    return merged[:topk]

promo_clean = clean_list(promo_terms, promo_whitelist, topk=30)
off_clean   = clean_list(off_terms,   off_whitelist,   topk=30)

print("Promotion（清理後，前 20）=>", promo_clean[:20])
print("Off-topic（清理後，前 20）=>", off_clean[:20])

# 5) Promotion 的硬規則 regex（仍保留，拉高 precision）
promotion_patterns = [
    r"http[s]?://", r"\.com\b", r"\.net\b", r"\.ru\b",
    r"\bbit\.ly\b", r"\btinyurl\b",
    r"\bpromo\b", r"\bdiscount\b", r"\bbuy now\b", r"\bfree trial\b",
    r"\bvisit (my|our) (site|website)\b", r"\bcontact (me|us)\b",
    r"\bcall (now|us)\b", r"\boffer ends\b",
    r"\bapply (now|today)\b", r"\blease (now|today)\b",
    r"\bmove[-\s]?in special\b", r"\bspecial offer\b",
    r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"
]

rules_yaml = {
    "rules": [
        {
            "id": "RULE_PROMOTION",
            "reason_id": 8,
            "description": "Promotional or business content",
            "keywords": promo_clean,
            "pattern": "|".join(promotion_patterns),
            "weight": 0.7,
            "enabled": True
        },
        {
            "id": "RULE_OFF_TOPIC",
            "reason_id": 2,
            "description": "Irrelevant or unhelpful content",
            "keywords": off_clean,
            "weight": 0.35,  # ← Off-topic 比較抽象，先給低一點
            "enabled": True
        }
    ]
}

out_path = "rules_generated.yml"  # 留在 Notebook 同層
with open(out_path, "w") as f:
    yaml.safe_dump(rules_yaml, f, sort_keys=False, allow_unicode=True)

print(f"✅ 規則已輸出到 {out_path}")


Promotion（清理後，前 20）=> ['highly recommend', 'special offer', 'move in special', 'move-in special', 'apply now', 'lease today', 'tour today', 'schedule a tour', 'limited time', 'act fast', 'free application', 'application fee waived', 'waived application fee', 'call now', 'contact us', 'buy now', 'free trial', 'refer a friend', 'hassle free', 'added bonus']
Off-topic（清理後，前 20）=> ['dogs barking', 'noise level', 'bed bugs', 'moving soon', 'stay away', 'doesnt work', 'does not work', 'let me know', 'nothing to do', 'off topic', 'not related', 'waste of time', 'did did did', 'orchard crossing', 'experienced columbia', 'activities communicate messages', 'swimming condo activities', 'communicate messages', 'swimming condo', 'condo activities communicate']
✅ 規則已輸出到 rules_generated.yml


In [16]:
# ==== Seeded bootstrapping for 6/7/9 lexicons ====
import re, numpy as np, pandas as pd, yaml
from collections import Counter
from sklearn.feature_extraction.text import CountVectorizer, ENGLISH_STOP_WORDS

PATH = "../data/raw/_SELECT_A_object_id_A_complex_id_A_vote_reason_id_B_reason_A_dat_202110291714.csv"
TEXT_COL = "review_text"

df = pd.read_csv(PATH)
df[TEXT_COL] = df[TEXT_COL].fillna("").astype(str)

def clean_text(s):
    s = str(s).lower()
    s = re.sub(r"http\S+|www\.\S+"," ", s)
    s = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", " ", s)
    s = re.sub(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", " ", s)
    s = re.sub(r"[^a-z\s']", " ", s)
    return re.sub(r"\s+", " ", s).strip()

df["clean"] = df[TEXT_COL].map(clean_text)

# ---- 種子（可再擴）----
SEEDS = {
    6: {  # Toxic / Threats / Hate
        "kw": [
            "kill you","hurt you","harm you","go to hell","call you names","hate you",
            "racist","sexist","pervert","creep","harass","harassment","threat","violent","violence",
            "bitch","asshole","fuck","wtf","moron","idiot"
        ],
        "rgx": [
            r"\bfuck\b", r"\bbitch\b", r"\basshole\b",
            r"\bgo to hell\b", r"\bkill (you|him|her)\b", r"\bhate (you|them)\b",
            r"\b(threat|harass|harassment)\b"
        ]
    },
    7: {  # Privacy / Personal info
        "kw": [
            "private info","personal info","share address","shared phone number","social security",
            "ssn","license plate","full name","email address","home address","leaked info"
        ],
        "rgx": [
            r"\b\d{3}-\d{2}-\d{4}\b",        # SSN-like
            r"\b\d{3}-\d{3}-\d{4}\b",        # phone
            r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}",
            r"\bapt\s?#?\d+\b", r"\bunit\s?#?\d+\b", r"\broom\s?#?\d+\b"
        ]
    },
    9: {  # COVID / Pandemic
        "kw": [
            "covid","coronavirus","pandemic","mask mandate","quarantine","contact tracing",
            "vaccine","vaccination","unvaccinated","social distancing","lockdown"
        ],
        "rgx": [
            r"\bcovid\b", r"\bcovid-19\b", r"\bcoronavirus\b",
            r"\b(mask|masks|mask mandate)\b", r"\bvaccine\b", r"\bvaccin(e|ation|ated)\b",
            r"\bquarantine\b", r"\blockdown\b", r"\bsocial distancing\b"
        ]
    }
}

# ---- 黑名單（更強）----
brand_geo = {
    "square","landing","club","farm","glen","heights","village","meadows","court","estates","homes",
    "lawrence","weaver","timber","grady","polo","orchard","wilshire","promenade","columbia","medina","sykesville","curran", "mrs", "realestate", "cf", "wilshire", "promenade"
}
facilities = {
    "dog","park","dog park","grocery","store","grocery store","pool","gym","parking","rules",
    "school","bus","stop","train","station","downtown","mall","stairs","hallway","landscaping"
}
appliances = {"appliance","appliances","washer","washers","dryer","dryers","kitchen","kitchens","fridge","microwave","oven","sink"}
typo_noise = {"apos","comminity","accomidating","ack","lim","gaddy","half", "ladies"}
lease_noise = {"accepted", "approved", "terms", "deposit", "unavailable", "mention", "break", "plan"}
custom_stops = list(set(ENGLISH_STOP_WORDS) | brand_geo | facilities | appliances | typo_noise | lease_noise)

def make_seed_mask(texts, seeds):
    t = texts.str.lower()
    mask = pd.Series(False, index=texts.index)
    # keyword 直接包含
    for k in seeds["kw"]:
        mask = mask | t.str.contains(re.escape(k))
    # regex
    for pat in seeds["rgx"]:
        mask = mask | t.str.contains(pat, regex=True, na=False)
    return mask.values

# 統一的抽詞函式（先 seed → 再 log-odds）
def extract_with_seed(label_id, topk=50):
    seeds = SEEDS[label_id]
    seed_mask = make_seed_mask(df["clean"], seeds)
    pos_n = int(seed_mask.sum())
    if pos_n < 20:
        print(f"[warn] label {label_id}: seed 命中太少（{pos_n}），請擴充 SEEDS")
    neg_mask = ~seed_mask

    # 向量化（unigram + bigram）
    vec = CountVectorizer(
        stop_words=custom_stops,
        lowercase=True,
        ngram_range=(1,2),
        min_df=5,
        max_df=0.8,
        token_pattern=r"(?u)\b[a-z][a-z']+\b"
    )
    X = vec.fit_transform(df["clean"])
    terms = np.array(vec.get_feature_names_out())

    # log-odds（seed 命中的集合 vs 其他）
    ct_pos = X[seed_mask].sum(axis=0).A1 + 0.5
    ct_neg = X[neg_mask].sum(axis=0).A1 + 0.5
    tot_pos, tot_neg = ct_pos.sum(), ct_neg.sum()
    logit_pos = np.log(ct_pos / (tot_pos - ct_pos))
    logit_neg = np.log(ct_neg / (tot_neg - ct_neg))
    delta = logit_pos - logit_neg
    order = np.argsort(delta)[::-1]

    # 過濾：去數字、去專有名詞/設施/家電、不要極短/殘片
    bad_tokens = brand_geo | facilities | appliances | typo_noise
    def keep(term):
        if any(ch.isdigit() for ch in term): return False
        toks = term.split()
        if len(toks) == 1 and len(toks[0]) <= 2: return False
        if any(t in bad_tokens for t in toks): return False
        return True

    picked = [terms[i] for i in order if keep(terms[i])]
    # 把 seeds 的 kw 置頂，當白名單，然後接上擴充詞
    whitelist = [k for k in seeds["kw"] if " " in k or len(k) >= 4]  # 片語優先
    merged = list(dict.fromkeys(whitelist + picked))
    return merged[:topk]

lexicons = {
    "toxic": extract_with_seed(6, topk=60),
    "privacy": extract_with_seed(7, topk=60),
    "covid": extract_with_seed(9, topk=60)
}

for nm in lexicons:
    print(f"=== {nm} cleaned top 20 ===\n", lexicons[nm][:20], "\n")

# 建議 regex（強證據）
suggested_patterns = {
    "toxic": r"\b(fuck|bitch|asshole|go to hell|kill (you|him|her)|hate (you|them)|threat|harass|harassment)\b",
    "privacy": r"(\b\d{3}-\d{2}-\d{4}\b|\b\d{3}-\d{3}-\d{4}\b|[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}|\b(apt|unit|room)\s?#?\d+\b)",
    "covid": r"\b(covid|covid-19|coronavirus|mask|mask mandate|vaccine|vaccin(e|ation|ated)|quarantine|lockdown|social distancing)\b"
}

# 匯出
out = {"lexicons": {}}
for nm in ["toxic","privacy","covid"]:
    out["lexicons"][nm] = {
        "keywords": lexicons[nm][:30],
        "suggested_pattern": suggested_patterns[nm]
    }

with open("rules_lexicons.yml", "w") as f:
    yaml.safe_dump(out, f, sort_keys=False, allow_unicode=True)

print("✅ 產出 rules_lexicons.yml（seed→擴充後）")


  mask = mask | t.str.contains(pat, regex=True, na=False)


=== toxic cleaned top 20 ===
 ['kill you', 'hurt you', 'harm you', 'go to hell', 'call you names', 'hate you', 'racist', 'sexist', 'pervert', 'creep', 'harass', 'harassment', 'threat', 'violent', 'violence', 'bitch', 'asshole', 'fuck', 'moron', 'idiot'] 

=== privacy cleaned top 20 ===
 ['private info', 'personal info', 'share address', 'shared phone number', 'social security', 'license plate', 'full name', 'email address', 'home address', 'leaked info', 'personal information', 'handle personal', 'private information', 'soss', 'work event', 'carelessness', 'house drunkard', 'try importantly', 'files knows', 'sentimentally'] 

=== covid cleaned top 20 ===
 ['covid', 'coronavirus', 'pandemic', 'mask mandate', 'quarantine', 'contact tracing', 'vaccine', 'vaccination', 'unvaccinated', 'social distancing', 'lockdown', 'explain conditions', 'pandemic going', 'serve emails', 'said administrative', 'cause failed', 'quote went', 'originally confirmed', 'quote cause', 'went honestly'] 

✅ 產出 rul