In [10]:
!PIP install captum.attr 

ERROR: Could not find a version that satisfies the requirement captum.attr (from versions: none)
ERROR: No matching distribution found for captum.attr


In [None]:
# 使用 BERT 預測帳號是否為潛在詐騙者：文字語意判斷（深度學習）
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from collections import defaultdict

# Load the three pre-processed forum exports (paths are relative to the working directory).
mobile_df = pd.read_csv("mobile01_處理後.csv")
ptt_df = pd.read_csv("ptt_語料_處理後.csv")
finfo_df = pd.read_csv("finfo_posts_產險_壽險_投資型.csv")

# Tag every row with the platform it came from.
mobile_df['平台'] = 'Mobile01'
ptt_df['平台'] = 'PTT'
finfo_df['平台'] = 'Finfo'

# Unify the account column: prefer the commenter account and fall back to the
# poster account; otherwise use an existing '帳號' / 'author' column, or a placeholder.
for df in [mobile_df, ptt_df, finfo_df]:
    if '留言帳號' in df.columns and '發文者帳號' in df.columns:
        df['帳號'] = df['留言帳號'].fillna(df['發文者帳號'])
    elif '帳號' in df.columns:
        pass  # already in canonical form; the branch only stops the fall-through below
    elif 'author' in df.columns:
        df['帳號'] = df['author']
    else:
        df['帳號'] = '未知帳號'

# Build one 'text' column per frame: comment text followed by post text,
# using whichever of the two columns exist (missing values become '').
for df in [mobile_df, ptt_df, finfo_df]:
    text_cols = []
    if '留言內容' in df.columns:
        text_cols.append(df['留言內容'].fillna(''))
    if '發文內容' in df.columns:
        text_cols.append(df['發文內容'].fillna(''))
    if not text_cols:
        df['text'] = ''
    else:
        df['text'] = text_cols[0]
        for col in text_cols[1:]:
            df['text'] += ' ' + col

# Frames without a scam-keyword count get a constant 0, so all of their rows are labelled 0 below.
for df in [mobile_df, ptt_df, finfo_df]:
    if '詐騙關鍵詞次數' not in df.columns:
        df['詐騙關鍵詞次數'] = 0

# Stack the shared columns of the three platforms into one frame.
combined_df = pd.concat([
    mobile_df[['平台', '帳號', 'text', '詐騙關鍵詞次數']],
    ptt_df[['平台', '帳號', 'text', '詐騙關鍵詞次數']],
    finfo_df[['平台', '帳號', 'text', '詐騙關鍵詞次數']]
], ignore_index=True)

# Drop rows without an account or with blank text.
combined_df = combined_df.dropna(subset=['帳號'])
# .copy() makes the filtered result an independent frame, so the column
# assignment below does not write into a slice (avoids SettingWithCopyWarning).
combined_df = combined_df[combined_df['text'].str.strip() != ''].copy()
# Label is 1 when at least one scam keyword was counted for the row, else 0.
combined_df['label'] = (combined_df['詐騙關鍵詞次數'] > 0).astype(int)

# 80/20 row-level split; accounts are split alongside so predictions can be
# mapped back to accounts later.
train_texts, test_texts, train_labels, test_labels, train_accounts, test_accounts = train_test_split(
    combined_df['text'].values,
    combined_df['label'].values,
    combined_df['帳號'].values,
    test_size=0.2,
    random_state=42
)

# Tokenizer used by ScamDataset below.
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')

# 自訂 Dataset
class ScamDataset(Dataset):
    """Map-style dataset that tokenises all texts once, at construction time.

    Uses the module-level ``tokenizer`` with truncation, padding to the longest
    sequence and ``max_length=128``. Each item is a dict holding one tensor per
    tokenizer output field plus a ``'labels'`` tensor.
    """

    def __init__(self, texts, labels):
        self.labels = labels
        self.encodings = tokenizer(
            list(texts),
            truncation=True,
            padding=True,
            max_length=128,
        )

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for field, rows in self.encodings.items():
            sample[field] = torch.tensor(rows[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Tokenise both splits up front.
train_dataset = ScamDataset(texts=train_texts, labels=train_labels)
test_dataset = ScamDataset(texts=test_texts, labels=test_labels)

# Only the training loader shuffles; the test loader keeps row order so that
# predictions stay aligned with test_accounts.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)

# 定義模型
class BertClassifier(nn.Module):
    """Binary classifier: BERT pooled output -> dropout -> linear -> sigmoid.

    ``forward`` returns one probability per input row as a 1-D tensor of shape
    ``(batch,)``, suitable for ``nn.BCELoss`` against float labels.
    """

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-chinese')
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        """Return the sigmoid probability for each sequence in the batch."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        dropped = self.dropout(pooled_output)
        # squeeze(-1) removes only the trailing size-1 dimension. A bare
        # squeeze() would also drop the batch dimension when the batch holds a
        # single sample, yielding a 0-d tensor that mismatches the labels in
        # BCELoss and cannot be iterated when collecting predictions.
        return torch.sigmoid(self.classifier(dropped)).squeeze(-1)

# Initialise the model, optimiser and loss.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = BertClassifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
loss_fn = nn.BCELoss()

# Training loop (3 epochs).
for epoch in range(3):
    model.train()
    total_loss = 0.0
    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].float().to(device)

        # reshape(-1) keeps the output 1-D even if the model returns a 0-d
        # tensor for a single-sample batch, so it always matches `labels`.
        outputs = model(input_ids, attention_mask).reshape(-1)
        loss = loss_fn(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean loss over the epoch rather than only the last batch's loss.
    print(f"Epoch {epoch+1} 完成，Loss: {total_loss / max(len(train_loader), 1):.4f}")

# Predict on the test split and collect per-row probabilities.
model.eval()
preds, trues, probs = [], [], []
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        outputs = model(input_ids, attention_mask).reshape(-1)
        probs.extend(outputs.cpu().numpy())
        # Threshold at 0.5 to obtain hard labels.
        pred_labels = (outputs > 0.5).int().cpu().numpy()
        preds.extend(pred_labels)
        trues.extend(labels.cpu().numpy())

print("\n分類報告：")
print(classification_report(trues, preds))

# Aggregate per-account risk. test_loader does not shuffle, so `probs` is in
# the same order as `test_accounts`.
account_risk = defaultdict(list)
for acct, prob in zip(test_accounts, probs):
    account_risk[acct].append(prob)

# Keep only accounts with at least two test rows, ranked by mean probability.
account_avg_risk = [(acct, np.mean(scores), len(scores)) for acct, scores in account_risk.items() if len(scores) >= 2]
account_risk_df = pd.DataFrame(account_avg_risk, columns=['帳號', '平均風險機率', '樣本數']).sort_values(by='平均風險機率', ascending=False)

print("\nTop 20 潛在高風險帳號：")
print(account_risk_df.head(20))

In [11]:
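# Predict whether an account is a potential scammer (intended for the full dataset).
# Note: despite the "DistilBERT" naming used in this cell, the checkpoint loaded is the full-size BERT model 'ckiplab/bert-base-chinese'.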
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, accuracy_score, f1_score
from collections import defaultdict
import matplotlib.pyplot as plt
import gc
import joblib


# Pick the GPU when available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("使用裝置：", device)

# Load the three pre-processed forum exports.
mobile_df = pd.read_csv("mobile01_處理後.csv")
ptt_df = pd.read_csv("ptt_語料_處理後.csv")
finfo_df = pd.read_csv("finfo_posts_產險_壽險_投資型.csv")

# Tag every row with its source platform.
mobile_df['平台'] = 'Mobile01'
ptt_df['平台'] = 'PTT'
finfo_df['平台'] = 'Finfo'

# Normalise account, text and keyword-count columns in each frame.
for df in [mobile_df, ptt_df, finfo_df]:
    # Prefer the commenter account, falling back to the poster account.
    if '留言帳號' in df.columns and '發文者帳號' in df.columns:
        df['帳號'] = df['留言帳號'].fillna(df['發文者帳號'])
    elif '帳號' in df.columns:
        pass  # already in canonical form; the branch only stops the fall-through below
    elif 'author' in df.columns:
        df['帳號'] = df['author']
    else:
        df['帳號'] = '未知帳號'

    # Ensure both text columns exist (as '' when absent), then join comment and post text.
    for text_col in ('留言內容', '發文內容'):
        if text_col not in df.columns:
            df[text_col] = ''
    df['text'] = df['留言內容'].fillna('') + ' ' + df['發文內容'].fillna('')

    # Frames without a keyword count get 0, so all of their rows are labelled 0 below.
    if '詐騙關鍵詞次數' not in df.columns:
        df['詐騙關鍵詞次數'] = 0

# Stack the shared columns of the three platforms.
combined_df = pd.concat([
    mobile_df[['平台', '帳號', 'text', '詐騙關鍵詞次數']],
    ptt_df[['平台', '帳號', 'text', '詐騙關鍵詞次數']],
    finfo_df[['平台', '帳號', 'text', '詐騙關鍵詞次數']]
], ignore_index=True)

combined_df = combined_df.dropna(subset=['帳號'])
# .copy() makes the filtered result an independent frame, so the column
# assignment below does not write into a slice (avoids SettingWithCopyWarning).
combined_df = combined_df[combined_df['text'].str.strip() != ''].copy()
# Label is 1 when at least one scam keyword was counted for the row, else 0.
combined_df['label'] = (combined_df['詐騙關鍵詞次數'] > 0).astype(int)

# 80/20 row-level split; accounts travel with the rows for later aggregation.
train_texts, test_texts, train_labels, test_labels, train_accounts, test_accounts = train_test_split(
    combined_df['text'].values,
    combined_df['label'].values,
    combined_df['帳號'].values,
    test_size=0.2,
    random_state=42
)
# Keep the untokenised test texts for the per-row prediction export.
test_text_raw = test_texts

# Tokenizer for the checkpoint named by model_name.
model_name = 'ckiplab/bert-base-chinese'
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Dataset 類別
class ScamDataset(Dataset):
    """Map-style dataset that tokenises all texts once, at construction time.

    Uses the module-level ``tokenizer`` with truncation, padding to the longest
    sequence and ``max_length=64``. Each item is a dict holding one tensor per
    tokenizer output field plus a ``'labels'`` tensor.
    """

    def __init__(self, texts, labels):
        self.labels = labels
        self.encodings = tokenizer(
            list(texts),
            truncation=True,
            padding=True,
            max_length=64,
        )

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for field, rows in self.encodings.items():
            sample[field] = torch.tensor(rows[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Tokenise both splits up front.
train_dataset = ScamDataset(texts=train_texts, labels=train_labels)
test_dataset = ScamDataset(texts=test_texts, labels=test_labels)

# Only the training loader shuffles; the test loader keeps row order so that
# predictions stay aligned with test_accounts / test_text_raw.
train_loader = DataLoader(dataset=train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=4, shuffle=False)

# 模型定義
class DistilBertClassifier(nn.Module):
    """Binary classifier: encoder first-token state -> dropout -> linear -> sigmoid.

    The encoder is loaded with ``AutoModel.from_pretrained(model_name)``, so the
    architecture is whatever ``model_name`` points to; the class name is kept
    for compatibility with the rest of the notebook.

    ``forward`` returns one probability per input row as a 1-D tensor of shape
    ``(batch,)``, suitable for ``nn.BCELoss`` against float labels.
    """

    def __init__(self):
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        """Return the sigmoid probability for each sequence in the batch."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Hidden state of the first token of every sequence.
        pooled_output = outputs.last_hidden_state[:, 0]
        dropped = self.dropout(pooled_output)
        # squeeze(-1) removes only the trailing size-1 dimension. A bare
        # squeeze() would also drop the batch dimension when the batch holds a
        # single sample, yielding a 0-d tensor that mismatches the labels in
        # BCELoss and cannot be iterated when collecting predictions.
        return torch.sigmoid(self.classifier(dropped)).squeeze(-1)

model = DistilBertClassifier().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
loss_fn = nn.BCELoss()

# Per-epoch metrics, plotted after training.
history = {'loss': [], 'accuracy': [], 'f1': []}

# Training (3 epochs) with an evaluation pass on the test split after every epoch.
for epoch in range(3):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].float().to(device)

        # reshape(-1) keeps the output 1-D even if the model returns a 0-d
        # tensor for a single-sample batch (likely with batch_size=4), so it
        # always matches `labels`.
        outputs = model(input_ids, attention_mask).reshape(-1)
        loss = loss_fn(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Release Python garbage and cached GPU memory between epochs.
    gc.collect()
    torch.cuda.empty_cache()
    print(f"Epoch {epoch+1} / 3，Loss: {total_loss / len(train_loader):.4f}")

    # Evaluate after every epoch.
    model.eval()
    preds, trues, probs = [], [], []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask).reshape(-1)
            probs.extend(outputs.cpu().numpy())
            # Threshold at 0.5 to obtain hard labels.
            pred_labels = (outputs > 0.5).int().cpu().numpy()
            preds.extend(pred_labels)
            trues.extend(labels.cpu().numpy())

    acc = accuracy_score(trues, preds)
    f1 = f1_score(trues, preds)
    history['loss'].append(total_loss / len(train_loader))
    history['accuracy'].append(acc)
    history['f1'].append(f1)

    print(f"Accuracy: {acc:.4f} | F1-score: {f1:.4f}")

    # Option A: per-account risk statistics, saved per epoch. test_loader does
    # not shuffle, so `probs` is in the same order as `test_accounts`.
    account_risk = defaultdict(list)
    for acct, prob in zip(test_accounts, probs):
        account_risk[acct].append(prob)

    # Keep only accounts with at least two test rows, ranked by mean probability.
    account_avg_risk = [(acct, np.mean(scores), len(scores)) for acct, scores in account_risk.items() if len(scores) >= 2]
    account_risk_df = pd.DataFrame(account_avg_risk, columns=['帳號', '平均風險機率', '樣本數']).sort_values(by='平均風險機率', ascending=False)
    account_risk_df.to_csv(f"account_risk_epoch{epoch+1}.csv", index=False, encoding='utf-8-sig')

    # Option B: row-level predictions, saved per epoch.
    detailed_df = pd.DataFrame({
        '帳號': test_accounts,
        '原始文字': test_text_raw,
        '真實標籤': trues,
        '預測機率': probs,
        '預測結果': preds
    })
    detailed_df.to_csv(f"predictions_epoch{epoch+1}.csv", index=False, encoding='utf-8-sig')

# Save the final weights and tokenizer. torch.save writes the state_dict in
# PyTorch's own checkpoint format, so the ".pt" file can be restored with
# torch.load + model.load_state_dict; a joblib pickle could not be read that way.
torch.save(model.state_dict(), "distilbert_scam_model.pt")
tokenizer.save_pretrained("distilbert_tokenizer")

# Plot loss and metrics per epoch.
plt.figure(figsize=(12, 4))
plt.subplot(1, 3, 1)
plt.plot(history['loss'], marker='o')
plt.title("Loss")
plt.xlabel("Epoch")

plt.subplot(1, 3, 2)
plt.plot(history['accuracy'], marker='o')
plt.title("Accuracy")
plt.xlabel("Epoch")

plt.subplot(1, 3, 3)
plt.plot(history['f1'], marker='o')
plt.title("F1-score")
plt.xlabel("Epoch")

plt.tight_layout()
plt.savefig("training_metrics.png")
plt.show()

# Ranking from the last epoch's evaluation.
print("\nTop 20 潛在高風險帳號：")
print(account_risk_df.head(20))


使用裝置： cpu


Some weights of BertModel were not initialized from the model checkpoint at ckiplab/bert-base-chinese and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


KeyboardInterrupt: 

In [None]:
# 安裝必要套件（第一次執行需要）
!pip install transformers datasets scikit-learn --quiet

# 載入套件
import pandas as pd
from datasets import Dataset
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
import torch

# === 1. Prepare the data ===
# Expects a CSV with two columns: 'text' and 'label' (0 = normal, 1 = scam).
df = pd.read_csv('data.csv')

# Stratified 80/20 train/validation split.
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df['label'],
    random_state=42,
)

# Wrap both splits as HuggingFace Dataset objects.
train_dataset, val_dataset = (Dataset.from_pandas(split) for split in (train_df, val_df))

# === 2. Load the tokenizer ===
# NOTE(review): 'distilbert-base-uncased' is an English checkpoint; if data.csv
# holds Chinese text, a Chinese or multilingual checkpoint is presumably needed — confirm.
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

def tokenize_function(example):
    """Tokenise ``example['text']`` with the module-level tokenizer.

    Pads to ``max_length`` and truncates, both at 128 tokens. It is applied
    with ``Dataset.map(..., batched=True)`` in this cell.
    """
    tokenizer_options = {'padding': 'max_length', 'truncation': True, 'max_length': 128}
    return tokenizer(example['text'], **tokenizer_options)

# Tokenise both splits in batches.
train_dataset = train_dataset.map(tokenize_function, batched=True)
val_dataset = val_dataset.map(tokenize_function, batched=True)

# === 3. Model ===
# Two-class sequence-classification head on top of DistilBERT.
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2)

# === 4. Training arguments ===
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=8,           # sized for a GTX 1060
    per_device_eval_batch_size=16,
    # `evaluation_strategy` was renamed to `eval_strategy` in transformers; the
    # notebook installs transformers unpinned and its Colab cell already uses
    # the new name, so use it here too.
    eval_strategy='epoch',
    save_strategy='epoch',                   # must match the evaluation strategy for load_best_model_at_end
    logging_dir='./logs',
    load_best_model_at_end=True,
    metric_for_best_model='accuracy',        # 'accuracy' key returned by compute_metrics
)

# === 5. 計算指標 ===
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(eval_pred):
    """Turn a ``(logits, labels)`` pair into accuracy / precision / recall / F1.

    Predictions are the argmax over the last logits axis; precision, recall and
    F1 use ``average='binary'``.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=-1)
    prf = precision_recall_fscore_support(labels, predicted, average='binary')
    metrics = {'accuracy': accuracy_score(labels, predicted)}
    metrics.update(zip(('precision', 'recall', 'f1'), prf[:3]))
    return metrics

# === 6. Build the Trainer ===
# Wires together the model, the training arguments, the tokenised
# train/validation splits and the metric callback defined earlier in this cell.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,  # NOTE(review): newer transformers releases prefer `processing_class=` — confirm against the installed version
    compute_metrics=compute_metrics,
)

# === 7. Run fine-tuning ===
trainer.train()

# === 8. Evaluate on the validation split ===
# Last expression of the cell, so its return value is shown as the cell output.
trainer.evaluate()


In [None]:
# ========================================
# ✅ 1. 安裝套件 + 掛載 Google Drive
# ========================================
!pip install transformers datasets scikit-learn seaborn --quiet

from google.colab import drive
drive.mount('/content/drive')

import pandas as pd
import os

# ========================================
# ✅ 2. 載入資料 + 清理欄位
# ========================================
# Read the three CSV exports from the mounted Google Drive.
# df1 = Finfo posts, df2 = Mobile01, df3 = PTT (per the file names).
df1 = pd.read_csv('/content/drive/MyDrive/finfo_posts_產險_壽險_投資型.csv')
df2 = pd.read_csv('/content/drive/MyDrive/mobile01_處理後.csv')
df3 = pd.read_csv('/content/drive/MyDrive/ptt_語料_處理後.csv')

# Strip stray whitespace from the column names so the column lookups below match.
for df in [df1, df2, df3]:
    df.columns = df.columns.str.strip()

def extract_text(row):
    """Build one text string for a row.

    The post body ('發文內容') comes first when present and non-null, followed
    by the comment body ('留言內容'). The 'content' field is used in the comment
    position only when '留言內容' is missing or null. Surrounding whitespace is
    stripped from the result.
    """
    def _text_of(column):
        # str() of the field when it exists and is non-null, otherwise None.
        if column in row and pd.notna(row[column]):
            return str(row[column])
        return None

    pieces = []

    post_body = _text_of('發文內容')
    if post_body is not None:
        pieces.append(post_body)

    comment_body = _text_of('留言內容')
    if comment_body is None:
        comment_body = _text_of('content')
    if comment_body is not None:
        pieces.append(comment_body)

    return ' '.join(pieces).strip()

def prepare_labeled_df(df):
    """Add 'text' and 'label' columns to ``df`` in place and return just those two.

    'text' comes from ``extract_text`` applied row by row; 'label' is 1 when the
    row's '詐騙關鍵詞次數' is greater than 0, otherwise 0.
    """
    def _to_label(keyword_count):
        if keyword_count > 0:
            return 1
        return 0

    df['text'] = df.apply(extract_text, axis=1)
    df['label'] = df['詐騙關鍵詞次數'].apply(_to_label)
    return df[['text', 'label']]

# Training data: the Mobile01 (df2) and PTT (df3) frames, labelled from their keyword counts.
df_train = pd.concat([prepare_labeled_df(df2), prepare_labeled_df(df3)], ignore_index=True)
# The Finfo frame (df1) is treated as unlabeled; work on a copy so df1 itself is not modified.
df_unlabeled = df1.copy()
df_unlabeled['text'] = df_unlabeled.apply(extract_text, axis=1)

# ========================================
# ✅ 3. Tokenizer + Dataset 切分
# ========================================
from datasets import Dataset
from transformers import BertTokenizerFast

tokenizer = BertTokenizerFast.from_pretrained("bert-base-chinese")

def tokenize_function(example):
    """Tokenise ``example["text"]`` with the module-level tokenizer.

    Truncates and pads to ``max_length``, both at 128 tokens. It is applied
    with ``Dataset.map(..., batched=True)`` in this cell.
    """
    tokenizer_options = {"truncation": True, "padding": "max_length", "max_length": 128}
    return tokenizer(example["text"], **tokenizer_options)

# Convert the labelled frame to a HuggingFace Dataset and tokenise it in batches.
raw_dataset = Dataset.from_pandas(df_train)
tokenized_dataset = raw_dataset.map(tokenize_function, batched=True)
# Rename the target column to "labels" and expose only the listed columns as torch tensors.
tokenized_dataset = tokenized_dataset.rename_column("label", "labels")
tokenized_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])

# Seeded 80/20 train/eval split.
split_dataset = tokenized_dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = split_dataset["train"]
eval_dataset = split_dataset["test"]

# ========================================
# ✅ 4. 模型訓練 + EarlyStopping（乾淨寫法）
# ========================================
from transformers import BertForSequenceClassification, TrainingArguments, Trainer, EarlyStoppingCallback
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import numpy as np

# Two-class sequence-classification head on top of bert-base-chinese.
model = BertForSequenceClassification.from_pretrained("bert-base-chinese", num_labels=2)

# Evaluate, save and log once per epoch; checkpoints and logs go to Google Drive.
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/bert_results",
    eval_strategy="epoch",  # newer spelling of the evaluation-strategy argument
    save_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    num_train_epochs=10,
    weight_decay=0.01,
    load_best_model_at_end=True,
    logging_dir="/content/drive/MyDrive/bert_logs",
    logging_strategy="epoch",
    metric_for_best_model="eval_loss",
    save_total_limit=1,
    report_to="none"  # turn off the wandb prompt
)

def compute_metrics(eval_pred):
    """Turn a ``(logits, labels)`` pair into accuracy / precision / recall / F1.

    Predictions are the argmax over the last logits axis; precision, recall and
    F1 use ``average="binary"``.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=-1)
    prf = precision_recall_fscore_support(labels, predicted, average="binary")
    metrics = {"accuracy": accuracy_score(labels, predicted)}
    metrics.update(zip(("precision", "recall", "f1"), prf[:3]))
    return metrics

# Trainer with early stopping (patience of 2 evaluations), using the model,
# arguments, splits and metric function defined earlier in this cell.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

# Run fine-tuning.
trainer.train()

# ========================================
# ✅ 5. 預測 df1 ➜ 輸出 prediction_result.csv
# ========================================
from transformers import TextClassificationPipeline
import torch

# Inference pipeline around the fine-tuned model; returns the score of every class.
pipe = TextClassificationPipeline(
    model=model,
    tokenizer=tokenizer,
    return_all_scores=True,
    device=0 if torch.cuda.is_available() else -1
)

# Truncate exactly as during training (max_length=128). The pipeline does not
# truncate by default, so long posts would exceed the model's maximum sequence
# length and fail inside the forward pass.
results = pipe(df_unlabeled["text"].tolist(), batch_size=16, truncation=True, max_length=128)

# Look scores up by label name instead of list position, so the result does not
# depend on the order in which the pipeline returns the classes.
negative_name = model.config.id2label[0]
positive_name = model.config.id2label[1]
score_maps = [{entry["label"]: entry["score"] for entry in res} for res in results]

# Predicted class is 1 only when the scam score is strictly higher (ties go to 0, as with argmax).
pred_labels = [int(scores[positive_name] > scores[negative_name]) for scores in score_maps]
# Probability of the scam class, rounded to 4 decimals.
pred_probs = [round(scores[positive_name], 4) for scores in score_maps]

df_unlabeled["predicted_label"] = pred_labels
df_unlabeled["predicted_prob"] = pred_probs

# Export text plus predictions to Google Drive (utf-8-sig encoding).
output_path = "/content/drive/MyDrive/prediction_result.csv"
df_unlabeled[["text", "predicted_label", "predicted_prob"]].to_csv(output_path, index=False, encoding="utf-8-sig")

print(f"✅ 預測完成！結果已儲存：{output_path}")

# ========================================
# ✅ 6. 視覺化：分佈圖、機率直方圖
# ========================================
import matplotlib.pyplot as plt
import seaborn as sns

# NOTE(review): the titles and labels contain CJK text and emoji; the default
# matplotlib font may not have these glyphs — confirm a suitable font is configured.

# Predicted-class distribution. order=[0, 1] pins both categories to fixed
# positions; without it, a run that predicts only one class draws a single bar
# at position 0, which the fixed tick labels would name '正常 (0)' even when it
# is class 1.
fig, ax = plt.subplots(figsize=(5, 4))
sns.countplot(x='predicted_label', data=df_unlabeled, order=[0, 1], ax=ax)
ax.set_title("📊 預測類別分佈（0=正常，1=詐騙）")
ax.set_xlabel("預測標籤")
ax.set_ylabel("樣本數")
ax.set_xticks([0, 1])
ax.set_xticklabels(['正常 (0)', '詐騙 (1)'])
plt.show()

# Histogram (with KDE) of the predicted scam probability.
fig, ax = plt.subplots(figsize=(6, 4))
sns.histplot(df_unlabeled['predicted_prob'], bins=20, kde=True, color='orange', ax=ax)
ax.set_title("📊 預測為詐騙的機率分佈")
ax.set_xlabel("詐騙機率")
ax.set_ylabel("樣本數")
plt.show()
