# Cross-Bank Joint Radar: Detecting Mule Accounts with GNNs and Federated Learning (Colab Demo)

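This Colab notebook is based on the article "[一年臺灣民眾被詐騙一千億，得覺醒](https://medium.com/@bohachu/%E4%B8%80%E5%B9%B4%E8%87%BA%E7%81%A3%E6%B0%91%E7%9C%BE%E8%A2%AB%E8%A9%90%E9%A8%99%E4%B8%80%E5%8D%83%E5%84%84-%E5%BE%97%E8%A6%BA%E9%86%92-11ba5b6ec18e)" (roughly, "Taiwanese people are defrauded of 100 billion a year; it is time to wake up"). It demonstrates the core idea of the "cross-bank joint radar": applying a **Graph Neural Network (GNN)** together with **Federated Learning (FL)** so that banks can jointly detect and disrupt the mule-account networks that fraud and money laundering depend on, while protecting each bank's customer privacy.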

---

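### Core workflow:
1.  **Data simulation and format unification**: We simulate transaction flows across several banks, containing normal customer transactions plus a hidden fraud and money-laundering network. All account IDs are hashed to follow privacy-protection principles.
2.  **Graph construction and feature engineering**: All transaction records are converted into one large graph, where each node is a bank account and each edge is a transaction.
3.  **Graph neural network (GNN) model**: We build a `GraphSAGE`-based GNN that learns from node features and graph structure and predicts the probability that each account is a mule account.
4.  **Federated learning simulation**: Two banks (`Bank_A`, `Bank_B`) train locally on their own data, and their model updates (gradients or weights) are then combined through "secure aggregation" to update a global model. The process is designed so that raw transaction data never leaves a bank.
5.  **Detection and blocking**: The trained global model scores the risk of every account. When a score exceeds a threshold (for example `0.8`), blocking logic is triggered, such as blacklisting or an immediate freeze.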

---

## Step 1: Environment Setup and Package Installation

First we install PyTorch Geometric, the library the GNN model is built on, along with a few supporting packages.

In [None]:
# Install PyTorch Geometric. Colab already ships with PyTorch, and SAGEConv needs only the
# core torch-geometric package, so the optional compiled extensions (torch-scatter etc.) are skipped.
!pip install torch-geometric
!pip install pandas faker networkx matplotlib scikit-learn

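## Step 2: Data Simulation and Format Unification

We create a fictional transaction network containing:
- **Normal accounts**: trade with each other at random.
- **Mule accounts (the "money bag")**: form a star- or tree-shaped structure in which funds flow from peripheral accounts into a few core accounts, a typical money-laundering pattern.

All account numbers are hashed with `SHA-256` to mimic the privacy-protection measures of a real deployment.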
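
Plain SHA-256 over a small, predictable account-number space can be reversed by hashing every candidate number, so a real deployment would probably want a keyed hash instead. The following is a minimal sketch of that idea, assuming the participating banks share a secret key; `SHARED_KEY` and `keyed_hash_account` are hypothetical names and are not used elsewhere in this notebook.

```python
import hashlib
import hmac

# Hypothetical shared secret; in practice it would come from a key-management system.
SHARED_KEY = b"demo-key-not-for-production"

def keyed_hash_account(acct: str, key: bytes = SHARED_KEY) -> str:
    """Return the HMAC-SHA256 hex digest of an account ID under `key`."""
    return hmac.new(key, acct.encode(), hashlib.sha256).hexdigest()

token_a = keyed_hash_account("ACC_00001")
token_b = keyed_hash_account("ACC_00001")
token_other_key = keyed_hash_account("ACC_00001", key=b"another-key")

print(token_a == token_b)          # same key, same account: tokens match
print(token_a == token_other_key)  # different key: tokens differ
```

Because every bank holding the key derives the same token for the same account, cross-bank edges can still be joined, while an outsider without the key cannot rebuild the mapping by brute force.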

In [None]:
import pandas as pd
import numpy as np
from faker import Faker
import hashlib
import random

# Initialize Faker and fix the random seed so the simulation is reproducible
fake = Faker()
random.seed(42)

# --- Parameters ---
NUM_NORMAL_ACCOUNTS = 1000
NUM_MULE_ACCOUNTS = 50  # mule accounts
NUM_TRANSACTIONS = 5000

# --- Generate accounts ---
normal_accounts = [f'ACC_{i:05d}' for i in range(NUM_NORMAL_ACCOUNTS)]
mule_accounts = [f'MULE_{i:03d}' for i in range(NUM_MULE_ACCOUNTS)]
all_accounts = normal_accounts + mule_accounts
random.shuffle(all_accounts)

# --- Map each account to a bank (simulates cross-bank activity) ---
account_to_bank = {
    acc: f"Bank_{'A' if random.random() > 0.5 else 'B'}"
    for acc in all_accounts
}

# --- Generate transactions ---
transactions = []

# 1. Normal transactions between random pairs of normal accounts
for _ in range(int(NUM_TRANSACTIONS * 0.8)):
    from_acc, to_acc = random.sample(normal_accounts, 2)
    transactions.append({
        'from_acct': from_acc,
        'to_acct': to_acc,
        'amount': round(random.uniform(100, 50000), 2),
        'is_fraud': 0
    })

# 2. Fraud / money-laundering transactions (star structure)
money_collectors = random.sample(mule_accounts, 5)  # 5 core collector accounts
spreader_mules = [acc for acc in mule_accounts if acc not in money_collectors]

for _ in range(int(NUM_TRANSACTIONS * 0.2)):
    # Funds flow from a peripheral mule account into a core collector account
    from_acc = random.choice(spreader_mules)
    to_acc = random.choice(money_collectors)
    transactions.append({
        'from_acct': from_acc,
        'to_acct': to_acc,
        'amount': round(random.uniform(10000, 100000), 2),
        'is_fraud': 1
    })

# --- Convert to a DataFrame ---
df = pd.DataFrame(transactions)

# --- Hash account IDs for privacy ---
def hash_account(acct):
    return hashlib.sha256(acct.encode()).hexdigest()

df['from_acct_hash'] = df['from_acct'].apply(hash_account)
df['to_acct_hash'] = df['to_acct'].apply(hash_account)

print("Preview of the simulated transactions:")
print(df[['from_acct_hash', 'to_acct_hash', 'amount', 'is_fraud']].head())

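## Step 3: Graph Construction and Feature Engineering

We now convert the transaction data into a PyTorch Geometric `Data` object. We also compute a few basic statistical features for each account (node):
- **Degree**: the number of outgoing and incoming transactions of the account, counted per transaction rather than per distinct counterparty.
- **Total transaction amount**: the sum of all amounts the account sent and received (the transaction counts are the degrees above).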
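
To make the three features concrete, here is a minimal sketch that computes them for a handful of toy transactions in plain Python. The account names and the helper `node_features` are made up for illustration.

```python
from collections import defaultdict

# Toy transactions: (sender, receiver, amount). Account names are made up.
toy_transactions = [
    ("A", "C", 100.0),
    ("B", "C", 200.0),
    ("A", "C", 50.0),
    ("C", "D", 300.0),
]

def node_features(transactions):
    """Return {account: [out_degree, in_degree, total_amount]}, counting every transaction."""
    feats = defaultdict(lambda: [0, 0, 0.0])
    for sender, receiver, amount in transactions:
        feats[sender][0] += 1         # one more outgoing transaction
        feats[receiver][1] += 1       # one more incoming transaction
        feats[sender][2] += amount    # the amount counts toward both endpoints
        feats[receiver][2] += amount
    return dict(feats)

toy_features = node_features(toy_transactions)
for account, (out_deg, in_deg, total) in sorted(toy_features.items()):
    print(account, out_deg, in_deg, total)
```

Account `C` ends up with an in-degree of 3 and the largest total amount, which is the kind of signature a core collector account would show in the simulated star structure.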

In [None]:
import torch
from torch_geometric.data import Data
import networkx as nx
import matplotlib.pyplot as plt

# Map each account hash to a node index (and back)
all_hashes = pd.concat([df['from_acct_hash'], df['to_acct_hash']]).unique()
hash_to_idx = {h: i for i, h in enumerate(all_hashes)}
idx_to_hash = {i: h for i, h in enumerate(all_hashes)}
num_nodes = len(all_hashes)

# Build the edges (one directed edge per transaction)
source_nodes = [hash_to_idx[h] for h in df['from_acct_hash']]
target_nodes = [hash_to_idx[h] for h in df['to_acct_hash']]
edge_index = torch.tensor([source_nodes, target_nodes], dtype=torch.long)

# --- Feature engineering ---
# 3 features per node: out_degree, in_degree, total_amount (sent + received)
features = np.zeros((num_nodes, 3))
labels = np.zeros(num_nodes)

for row in df.itertuples(index=False):
    from_idx = hash_to_idx[row.from_acct_hash]
    to_idx = hash_to_idx[row.to_acct_hash]

    features[from_idx, 0] += 1           # out_degree
    features[to_idx, 1] += 1             # in_degree
    features[from_idx, 2] += row.amount  # total_amount, sender side
    features[to_idx, 2] += row.amount    # total_amount, receiver side

# --- Build the labels ---
mule_hashes = {hash_account(acc) for acc in mule_accounts}
for h, i in hash_to_idx.items():
    if h in mule_hashes:
        labels[i] = 1  # 1 = mule account

# --- Standardize the features (zero mean, unit variance per column) ---
mean = features.mean(axis=0)
std = features.std(axis=0)
features = (features - mean) / (std + 1e-6)

# --- Build the PyG Data object ---
graph_data = Data(
    x=torch.tensor(features, dtype=torch.float),
    edge_index=edge_index,
    y=torch.tensor(labels, dtype=torch.long)
)

print("PyG graph data object:")
print(graph_data)

# --- Visualize the fraud network (sampled) ---
def visualize_sample_graph(df, mule_accounts, money_collectors):
    mule_set = set(mule_accounts)
    collector_set = set(money_collectors)

    G = nx.DiGraph()
    sample_df = df.sample(n=200, random_state=42)
    for _, row in sample_df.iterrows():
        G.add_edge(row['from_acct'], row['to_acct'])

    node_colors = []
    for node in G.nodes():
        if node in collector_set:
            node_colors.append('red')      # core collector
        elif node in mule_set:
            node_colors.append('orange')   # peripheral mule
        else:
            node_colors.append('skyblue')  # normal account

    plt.figure(figsize=(15, 15))
    pos = nx.spring_layout(G, k=0.5)
    nx.draw(G, pos, with_labels=False, node_color=node_colors, node_size=50, arrowsize=10, width=0.5)
    # English title: matplotlib's default font cannot render CJK characters
    plt.title("Sampled transaction network (red/orange = mule accounts)")
    plt.show()

visualize_sample_graph(df, mule_accounts, money_collectors)

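## Step 4: GNN Model and Federated Learning Simulation

### 4.1 Defining the GNN model
We use two `GraphSAGE` convolution layers. The second layer outputs the two class scores directly (normal account vs. mule account), which `log_softmax` turns into log-probabilities.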
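
As intuition for what a GraphSAGE layer does, the sketch below implements a simplified neighbor-mean layer on scalar node features in plain Python. It is an illustration of the aggregation idea only, not PyTorch Geometric's `SAGEConv` implementation; the function `sage_mean_layer` and the scalar weights `w_self` and `w_neigh` are assumptions made for this example.

```python
def sage_mean_layer(x, edges, w_self, w_neigh):
    """One simplified GraphSAGE-style layer on scalar node features.

    x: list of floats, one feature value per node
    edges: list of (source, target) pairs; messages flow source -> target
    """
    incoming = {i: [] for i in range(len(x))}
    for src, dst in edges:
        incoming[dst].append(x[src])

    out = []
    for i, h in enumerate(x):
        neigh = incoming[i]
        # Mean of incoming neighbor features; 0.0 when the node has no incoming edges
        neigh_mean = sum(neigh) / len(neigh) if neigh else 0.0
        out.append(max(0.0, w_self * h + w_neigh * neigh_mean))  # ReLU
    return out

# Star pattern: nodes 0, 1, 2 all send to node 3
x = [1.0, 2.0, 3.0, 10.0]
edges = [(0, 3), (1, 3), (2, 3)]
h = sage_mean_layer(x, edges, w_self=0.5, w_neigh=1.0)
print(h)
```

Only node 3 receives messages here, so its output mixes its own feature with the mean of its senders, while the three senders are updated from their own features alone.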

In [None]:
import torch.nn.functional as F
from torch_geometric.nn import SAGEConv

class GNN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        self.conv1 = SAGEConv(in_channels, hidden_channels)
        self.conv2 = SAGEConv(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

# Model hyperparameters
model = GNN(
    in_channels=graph_data.num_node_features,
    hidden_channels=64,
    out_channels=2  # binary classification
)
print(model)

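### 4.2 Simulating federated learning
This is the key part of the whole pipeline. Each account belongs to one bank, and each bank computes its training loss only on its own accounts, mimicking a setting where data stays inside the bank. Note that this simplified simulation still runs message passing over the full graph for both banks; a real deployment would restrict each bank to the transactions it can actually see.

**Simulation flow**:
1.  **Initialize**: create a global model.
2.  **Distribute**: at the start of each round, every bank (participant) downloads the latest global model weights.
3.  **Local training**: each bank trains the model for one or more epochs on its own local data.
4.  **Secure aggregation**: each bank uploads its model update (for example, its weights). A central coordinator **averages** the updates to produce the new global model. **This step is the core of federated learning: it pools what the participants have learned without exposing any participant's raw data.** In this demo the step is simulated with a plain average and no cryptographic protection.
5.  **Repeat**: repeat steps 2-4 until the model converges.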
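
The averaging in step 4 can be sketched in plain Python as follows. This is a minimal illustration of federated averaging weighted by each client's sample count; the function `fed_avg`, the parameter names, and the toy numbers are assumptions for this example. With equal sample counts it reduces to the simple mean that this notebook uses.

```python
def fed_avg(client_weights, client_sizes):
    """Average per-client parameter dicts, weighting each client by its sample count."""
    total = sum(client_sizes)
    first = client_weights[0]
    return {
        key: [
            sum(w[key][i] * n for w, n in zip(client_weights, client_sizes)) / total
            for i in range(len(first[key]))
        ]
        for key in first
    }

# Toy parameters for two banks (flat lists stand in for weight tensors)
bank_a = {"layer.weight": [1.0, 2.0], "layer.bias": [0.0]}
bank_b = {"layer.weight": [3.0, 6.0], "layer.bias": [4.0]}

weighted = fed_avg([bank_a, bank_b], client_sizes=[300, 100])
unweighted = fed_avg([bank_a, bank_b], client_sizes=[1, 1])
print(weighted)
print(unweighted)
```

Weighting by sample count lets a bank with more accounts pull the global model further toward its local solution, which matters when the banks differ a lot in size.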

In [None]:
import copy
from torch.optim import Adam

# --- Assign graph nodes to banks ---
# Node indices are keyed by hashed account ID, so map each original account through
# hash_account() and account_to_bank to find the bank that owns the node.
bank_A_nodes = set()
bank_B_nodes = set()

for original_account, bank in account_to_bank.items():
    account_hash = hash_account(original_account)
    if account_hash in hash_to_idx:  # accounts with no transactions are not in the graph
        node_idx = hash_to_idx[account_hash]
        if bank == 'Bank_A':
            bank_A_nodes.add(node_idx)
        else:
            bank_B_nodes.add(node_idx)

# Safety net: every node should already have a bank, but assign any leftovers to Bank_A.
unassigned_nodes = set(range(num_nodes)) - (bank_A_nodes | bank_B_nodes)
bank_A_nodes |= unassigned_nodes

# An edge is visible to a bank if either endpoint is one of its accounts.
edge_pairs = edge_index.T.tolist()
edge_mask_A = torch.tensor([(s in bank_A_nodes or t in bank_A_nodes) for s, t in edge_pairs])
edge_mask_B = torch.tensor([(s in bank_B_nodes or t in bank_B_nodes) for s, t in edge_pairs])

# Per-bank views of the graph. edge_subgraph filters edges only: all nodes are kept,
# so node indices still line up with the global graph.
data_bank_A = graph_data.edge_subgraph(edge_mask_A)
data_bank_B = graph_data.edge_subgraph(edge_mask_B)

print(f"Bank A data: {data_bank_A}")
print(f"Bank B data: {data_bank_B}")


# --- Federated learning hyperparameters ---
NUM_ROUNDS = 20
LOCAL_EPOCHS = 5
LEARNING_RATE = 0.01

# 1. Initialize the global model
global_model = GNN(graph_data.num_node_features, 64, 2)

def train_local(model, data, node_mask, epochs, optimizer, criterion):
    """Train `model` for `epochs` full-batch steps, computing the loss only on `node_mask` nodes.

    Returns the model's state_dict and the mean loss over the local epochs.
    """
    model.train()
    total_loss = 0.0
    for _ in range(epochs):
        optimizer.zero_grad()
        out = model(data.x, data.edge_index)
        # Restrict the loss to the nodes owned by this bank
        loss = criterion(out[node_mask], data.y[node_mask])
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return model.state_dict(), total_loss / epochs


# The model already outputs log-probabilities (log_softmax), so NLLLoss is the matching loss.
criterion = torch.nn.NLLLoss()

# --- Start the federated learning loop ---
print("\nStarting federated training...")


# Boolean masks selecting each bank's own nodes for the loss (fixed across rounds)
bank_A_mask = torch.tensor([i in bank_A_nodes for i in range(num_nodes)], dtype=torch.bool)
bank_B_mask = torch.tensor([i in bank_B_nodes for i in range(num_nodes)], dtype=torch.bool)

for round_num in range(NUM_ROUNDS):
    # 2. Distribute: each bank starts the round from a copy of the global model
    local_model_A = copy.deepcopy(global_model)
    local_model_B = copy.deepcopy(global_model)

    # 3. Local training, with a fresh optimizer per bank per round.
    # Simplification: both banks run message passing over the full graph;
    # only the loss is restricted to their own nodes.
    optimizer_A = Adam(local_model_A.parameters(), lr=LEARNING_RATE)
    optimizer_B = Adam(local_model_B.parameters(), lr=LEARNING_RATE)

    weights_A, loss_A = train_local(local_model_A, graph_data, bank_A_mask, LOCAL_EPOCHS, optimizer_A, criterion)
    weights_B, loss_B = train_local(local_model_B, graph_data, bank_B_mask, LOCAL_EPOCHS, optimizer_B, criterion)

    # 4. Aggregation, simulated as an unweighted average of the two banks' weights
    global_weights = {key: (weights_A[key] + weights_B[key]) / 2.0 for key in weights_A}
    global_model.load_state_dict(global_weights)

    if (round_num + 1) % 5 == 0:
        print(f'Round {round_num + 1}/{NUM_ROUNDS} done, Bank A local loss: {loss_A:.4f}, Bank B local loss: {loss_B:.4f}')

print("\nFederated training complete!")
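
The loop above averages the banks' weights in the clear. As a toy illustration of the pairwise-masking idea behind secure aggregation, the sketch below has one bank add a shared random mask and the other subtract it, so the coordinator sees only masked vectors while their average stays correct. The function `mask_updates` and the seed standing in for a jointly derived secret are assumptions for this example; real protocols add key agreement and dropout handling, which are omitted here.

```python
import math
import random

def mask_updates(update_a, update_b, seed):
    """Bank A adds a shared random mask, Bank B subtracts it; the masks cancel in the sum."""
    rng = random.Random(seed)  # stands in for a secret both banks derived together
    mask = [rng.uniform(-100.0, 100.0) for _ in update_a]
    masked_a = [u + m for u, m in zip(update_a, mask)]
    masked_b = [u - m for u, m in zip(update_b, mask)]
    return masked_a, masked_b

update_a = [0.10, -0.20, 0.30]
update_b = [0.50, 0.40, -0.10]
masked_a, masked_b = mask_updates(update_a, update_b, seed=7)

# The coordinator only ever sees the masked vectors, yet their average is the true average.
aggregated = [(a + b) / 2 for a, b in zip(masked_a, masked_b)]
expected = [(a + b) / 2 for a, b in zip(update_a, update_b)]
matches = all(math.isclose(x, y, abs_tol=1e-9) for x, y in zip(aggregated, expected))
print(matches)
```

Each masked vector on its own looks like noise at the scale of the mask, which is what keeps an individual bank's update hidden from the coordinator in this simplified picture.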

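## Step 5: Detection and Blocking

After training, we use the final global model to score **every** account. If the predicted probability of being a mule account exceeds `0.8`, the account is flagged and the downstream blocking logic is triggered. Note that the evaluation below runs on the same accounts the model was trained on, so the reported metrics are optimistic.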
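
The choice of threshold trades precision against recall. A minimal sketch of that trade-off on made-up scores and labels follows; `precision_recall_at` and the toy data are illustrative and not produced by the model.

```python
def precision_recall_at(scores, labels, threshold):
    """Precision and recall when every score above `threshold` is flagged as a mule account."""
    flagged = [s > threshold for s in scores]
    tp = sum(1 for f, y in zip(flagged, labels) if f and y == 1)
    fp = sum(1 for f, y in zip(flagged, labels) if f and y == 0)
    fn = sum(1 for f, y in zip(flagged, labels) if not f and y == 1)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Made-up risk scores and ground truth (1 = mule account)
toy_scores = [0.95, 0.85, 0.60, 0.90, 0.10, 0.30]
toy_labels = [1, 1, 1, 0, 0, 0]

strict = precision_recall_at(toy_scores, toy_labels, threshold=0.8)
loose = precision_recall_at(toy_scores, toy_labels, threshold=0.5)
print(strict)
print(loose)
```

In this toy data, lowering the threshold from 0.8 to 0.5 catches the remaining mule account without flagging any additional normal account; on real data a lower threshold usually costs precision.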

In [None]:
from sklearn.metrics import classification_report

# Predict with the final global model
global_model.eval()
with torch.no_grad():
    predictions = global_model(graph_data.x, graph_data.edge_index)
    # The model outputs log-probabilities; exponentiate to get the mule-class probability
    probabilities = torch.exp(predictions)[:, 1]
    predicted_classes = predictions.argmax(dim=1)

# --- Risk scoring and blocking logic ---
RISK_THRESHOLD = 0.8
blacklist = []

for i, prob in enumerate(probabilities):
    if prob > RISK_THRESHOLD:
        account_hash = idx_to_hash[i]
        blacklist.append(account_hash)
        # In a real system, this is where blocking APIs would be called (placeholder names):
        # API.add_to_blacklist(account_hash)
        # API.freeze_account(account_hash)
        # API.notify_165(account_hash)  # 165 is Taiwan's anti-fraud hotline

print(f"Detected {len(blacklist)} high-risk accounts (risk > {RISK_THRESHOLD})!")
print("Blacklist (hashes):", blacklist[:10])  # show only the first 10

# --- Evaluate the model ---
print("\n--- Model evaluation ---")
report = classification_report(graph_data.y.numpy(), predicted_classes.numpy(), target_names=['normal account', 'mule account'])
print(report)

# --- Check the precision of the blacklist ---
correctly_identified = 0
for h in blacklist:
    if h in mule_hashes:
        correctly_identified += 1

precision = correctly_identified / len(blacklist) if blacklist else 0
print(f"\nShare of blacklisted accounts that are true mule accounts (precision): {precision:.2%}")
print(f"Of {len(mule_hashes)} mule accounts in total, the model flagged {len(blacklist)} high-risk accounts, {correctly_identified} of them correctly.")

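## Conclusion

Through a simplified simulation, this Colab notebook walks through the technical core of the "cross-bank joint radar":

1.  **Privacy protection**: Account identifiers are hashed, and the design keeps raw transaction data inside each bank, in line with regulatory requirements.
2.  **Collective intelligence**: With federated learning, banks share model insights rather than data and jointly build a "radar" with a global view. Cross-bank laundering networks that no single bank can see on its own become visible to the global model.
3.  **Targeted detection**: A GNN can learn the structural signature of mule accounts in the transaction network, which enables automated detection. The accuracy reported here comes from simulated data and is measured on the training nodes, so it should be read as an illustration rather than a benchmark.

The broader claim is that such a mechanism could catch roughly 80% of collector ("cash-out") accounts and cut off the fraud rings' money bag; this notebook does not validate that figure on real data. It is also an example of financial technology (FinTech) and regulatory technology (RegTech) working together against financial crime.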