# 「冷卻金流」概念驗證 Colab

這個參考「[一年臺灣民眾被詐騙一千億，得覺醒](https://medium.com/@bohachu/%E4%B8%80%E5%B9%B4%E8%87%BA%E7%81%A3%E6%B0%91%E7%9C%BE%E8%A2%AB%E8%A9%90%E9%A8%99%E4%B8%80%E5%8D%83%E5%84%84-%E5%BE%97%E8%A6%BA%E9%86%92-11ba5b6ec18e)」文章的 Colab 檔案，包含以下幾個部分：

*   **風險評分模型 (Risk Scoring Model)**：
一個簡單的機器學習模型，模擬根據交易特徵（如金額、收款帳戶年齡等）來評估風險分數。
*   **延遲佇列服務 (Delayed Queue Service)**：
使用 Redis 來模擬一個延遲訊息佇列，高風險的交易會先進入此佇列，等待一段時間。
*   **前端提示與取消 API (Frontend Notification & Cancellation API)**：
 一個簡單的 Flask API，用來模擬銀行 App 的後端，提供取消交易的端點。
*   **監理合規 (Regulatory Compliance)**：
說明此舉如何與《洗錢防制法》第 8–1 條的「暫緩交易」精神相符。

請注意，這份 Colab 檔案是一個概念性驗證 (Proof of Concept)，旨在演示核心邏輯，並未包含生產環境所需的完整錯誤處理、安全性設計與使用者介面。

In [None]:
# @title 步驟一：環境設定與套件安裝

# Install necessary packages

!pip install xgboost pandas numpy redis flask flask-ngrok

import xgboost as xgb
import pandas as pd
import numpy as np
import redis
import threading
import time
import datetime
from flask import Flask, request, jsonify
from flask_ngrok import run_with_ngrok
import uuid

print("套件安裝完成！")

In [None]:
# @title 步驟二：風險評分模型 (Risk Scoring Model)

# 2.1 模擬訓練資料
# 在真實世界中，這會是大量的歷史交易數據
data = {
    'amount': np.random.randint(100, 100000, 1000),
    'recipient_age_days': np.random.randint(1, 3650, 1000),
    'is_new_recipient': np.random.randint(0, 2, 1000),
    'contains_keywords': np.random.randint(0, 2, 1000),  # 模擬 "高收益"、"老師" 等關鍵字
    'is_fraud': np.random.randint(0, 2, 1000) # 0: 正常, 1: 詐騙
}
df = pd.DataFrame(data)

# 讓詐騙交易的特徵更明顯
df.loc[df['is_fraud'] == 1, 'amount'] = df.loc[df['is_fraud'] == 1, 'amount'] * 1.5
df.loc[df['is_fraud'] == 1, 'recipient_age_days'] = df.loc[df['is_fraud'] == 1, 'recipient_age_days'] / 10
df.loc[df['is_fraud'] == 1, 'is_new_recipient'] = 1
df.loc[df['is_fraud'] == 1, 'contains_keywords'] = 1


X = df.drop('is_fraud', axis=1)
y = df['is_fraud']

# 2.2 訓練 XGBoost 模型
print("正在訓練風險評分模型...")
model = xgb.XGBClassifier(objective="binary:logistic", eval_metric="logloss", use_label_encoder=False)
model.fit(X, y)
print("模型訓練完成！")


# 2.3 風險評分函式
def get_risk_score(transaction_features):
    """
    輸入一筆交易的特徵，回傳其風險分數。
    """
    df_features = pd.DataFrame([transaction_features])
    score = model.predict_proba(df_features)[:, 1][0]
    return score


# 2.4 模擬一筆新的交易並評分
new_transaction = {
    'amount': 80000,
    'recipient_age_days': 5, # 收款帳戶很新
    'is_new_recipient': 1,     # 是新的收款人
    'contains_keywords': 1     # 假設備註有觸發關鍵字
}

risk_score = get_risk_score(new_transaction)
print(f"\n模擬一筆新的可疑交易：{new_transaction}")
print(f"預測的風險分數為: {risk_score:.4f}")

if risk_score > 0.7:
    print("風險分數 > 0.7，此交易將進入延遲佇列。")
else:
    print("風險分數 <= 0.7，此交易將正常處理。")

In [None]:
# @title 步驟三：延遲佇列服務 (Delayed Queue Service)

# 3.1 設定 Redis 連線
# 注意：在 Colab 中，你需要另外啟動一個 Redis 實例。
# 這裡我們使用一個模擬的 Redis Client 來演示邏輯。
# 若要在本地端執行，請確保已安裝並啟動 Redis server。

try:
    # 填寫你的 Redis Server 資訊
    # r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    # r.ping()
    # print("Redis 連線成功！")
    # is_redis_connected = True

    # Colab 中無法直接連線本機 Redis，故使用模擬字典替代
    print("正在使用模擬的 Redis Client (字典)...")
    delayed_queue = {} # 使用字典模擬 Redis sorted set
    is_redis_connected = False

except redis.exceptions.ConnectionError as e:
    print(f"Redis 連線失敗: {e}")
    print("將使用一個模擬的字典來代替 Redis 進行演示。")
    delayed_queue = {} # 使用字典模擬 Redis sorted set
    is_redis_connected = False


# 3.2 延遲佇列相關函式
def add_to_delayed_queue(transaction_id, risk_score):
    """
    將高風險交易加入延遲佇列。
    """
    # 根據風險分數計算延遲時間
    # queue_time = min(24h, 6h × 風險分數 / 0.7)
    delay_hours = min(24, 6 * risk_score / 0.7)
    # Convert delay_hours to a standard Python float
    delay_hours_float = float(delay_hours)
    execute_time = datetime.datetime.now() + datetime.timedelta(hours=delay_hours_float)
    execute_timestamp = execute_time.timestamp()

    if is_redis_connected:
        r.zadd('delayed_queue', {transaction_id: execute_timestamp})
    else:
        # 模擬 Redis sorted set
        delayed_queue[transaction_id] = execute_timestamp

    print(f"交易 {transaction_id} 已加入延遲佇列，預計於 {delay_hours_float:.2f} 小時後 ({execute_time.strftime('%Y-%m-%d %H:%M:%S')}) 處理。")
    return execute_time

def process_delayed_queue():
    """
    模擬一個背景執行的 worker，持續處理到期的交易。
    """
    while True:
        now_timestamp = time.time()

        if is_redis_connected:
            # 從 Redis 取得所有到期的交易
            expired_transactions = r.zrangebyscore('delayed_queue', 0, now_timestamp)
            if expired_transactions:
                for trans_id in expired_transactions:
                    print(f"系統自動處理：交易 {trans_id} 已過冷卻期，執行撥款。")
                    r.zrem('delayed_queue', trans_id) # 從佇列中移除
        else:
            # 從模擬字典中取得所有到期的交易
            to_process = []
            for trans_id, execute_timestamp in delayed_queue.items():
                if execute_timestamp <= now_timestamp:
                    to_process.append(trans_id)

            if to_process:
                for trans_id in to_process:
                     print(f"系統自動處理：交易 {trans_id} 已過冷卻期，執行撥款。")
                     del delayed_queue[trans_id] # 從佇列中移除

        time.sleep(5) # 每 5 秒檢查一次


# 3.3 模擬將高風險交易加入佇列
transaction_id = f"TXN_{uuid.uuid4().hex[:8].upper()}"
if risk_score > 0.7:
    predicted_execute_time = add_to_delayed_queue(transaction_id, risk_score)
else:
    print(f"交易 {transaction_id} 風險低，直接處理。")

# 啟動背景 worker 來處理延遲佇列 (在一個新的執行緒中)
# 注意：在 Colab 中，這個執行緒會在 Cell 執行完畢後停止。
# 在真實應用中，這會是一個獨立運行的服務。
worker_thread = threading.Thread(target=process_delayed_queue, daemon=True)
worker_thread.start()
print("\n背景佇列處理 worker 已啟動...")

In [None]:
# @title 步驟四：前端提示與取消 API (Frontend Notification & Cancellation API)

# 4.1 使用 Flask 建立一個簡單的 API 服務
app = Flask(__name__)
run_with_ngrok(app)  # 讓 Colab 的 Flask App 可以從外部存取

@app.route("/")
def index():
    return "冷卻金流 - 交易取消 API 服務"

@app.route('/cancel_transaction', methods=['POST'])
def cancel_transaction():
    """
    提供給前端 App 呼叫的取消交易 API。
    """
    data = request.get_json()
    trans_id_to_cancel = data.get('transaction_id')

    if not trans_id_to_cancel:
        return jsonify({"status": "error", "message": "缺少 transaction_id"}), 400

    if is_redis_connected:
        # 嘗試從 Redis 的延遲佇列中移除
        removed_count = r.zrem('delayed_queue', trans_id_to_cancel)
    else:
        # 嘗試從模擬字典中移除
        if trans_id_to_cancel in delayed_queue:
            del delayed_queue[trans_id_to_cancel]
            removed_count = 1
        else:
            removed_count = 0

    if removed_count > 0:
        message = f"交易 {trans_id_to_cancel} 已成功取消！"
        print(message)
        return jsonify({"status": "success", "message": message})
    else:
        message = f"取消失敗：交易 {trans_id_to_cancel} 不在延遲佇列中，或可能已被處理。"
        print(message)
        return jsonify({"status": "error", "message": message}), 404

# 4.2 啟動 Flask App
# 啟動後會提供一個 ngrok 的公開網址，可以用來測試 API
print("\n正在啟動 Flask API 服務...")
# 我們會在一個獨立的執行緒中啟動 app，才不會卡住 Colab
flask_thread = threading.Thread(target=lambda: app.run(), daemon=True)
flask_thread.start()
time.sleep(3) # 等待 ngrok 啟動
print("Flask API 服務已啟動！請使用下方的 ngrok 網址進行測試。")


# 4.3 如何測試 API
print("\n--- API 測試指南 ---")
print(f"您可以使用 curl 或 Postman 來測試取消交易的功能。")
print(f"假設要取消的交易 ID 為: {transaction_id}")
print(f"請將下方指令中的 <ngrok_url> 替換為上面顯示的 ngrok 網址。")
print("\nCURL 指令範例:")
print(f"curl -X POST -H \"Content-Type: application/json\" \\")
print(f"     -d '{{\"transaction_id\": \"{transaction_id}\"}}' \\")
print(f"     <ngrok_url>/cancel_transaction")
print("---------------------\n")

# 讓使用者看到佇列在取消前的狀態
print(f"目前的延遲佇列: {delayed_queue}")
print(f"預計在 10 秒後，如果交易未被取消，背景 worker 將會處理它（如果已到期）。")

In [None]:
# @title 步驟五：監理合規 (Regulatory Compliance)

print("""
### 與《洗錢防制法》的關聯

根據提供的文件，此「冷卻金流」機制可以引用《洗錢防制法》第 8–1 條的「暫緩交易」條款作為法理依據。

**《洗錢防制法》第八條（摘錄精神）：**
金融機構對於疑似洗錢之交易，應向指定之機構申報。在某些情況下，執法機關可以要求金融機構「暫緩」該筆交易的執行。

**本機制的合規性考量：**
1.  **目的相符：** 「冷卻金流」的目的是為了防範金融犯罪（特別是詐騙），這與洗錢防制法的立法精神一致。
2.  **主動防禦：** 此機制是金融機構基於風險控管，主動採取的防範措施，可以視為履行其防制洗錢義務的一部分。
3.  **法源依據：** 雖然法條原意可能是指接獲執法機關通知後的「暫緩」，但金融機構基於風險自主決定的「延遲處理」，可以解釋為在履行其注意義務。若能獲得金融主管機關的函釋支持，其適法性將更為明確。

透過這套系統，銀行不僅保護了客戶的資產，也履行了社會責任，並在監管要求上有更堅實的基礎。
""")