In [1]:
import random

def misra_gries(stream, k):
    """
    Misra–Gries アルゴリズム:
    入力: データストリーム (stream), パラメータ k
    出力: 候補要素とその推定カウント
    """
    counter = {}
    for x in stream:
        if x in counter:
            # 既に候補にある場合はカウントを増加
            counter[x] += 1
        elif len(counter) < k - 1:
            # 空きスロットがあれば新しい要素を追加
            counter[x] = 1
        else:
            # 空きがなく、候補でもない場合：
            # すべてのカウントを -1、ゼロになった要素を削除
            to_del = []
            for a in counter:
                counter[a] -= 1
                if counter[a] == 0:
                    to_del.append(a)
            for a in to_del:
                del counter[a]
    return counter


def verify_exact(stream, candidates):
    """
    二度目の走査で候補要素の真のカウントを計算
    """
    exact = {a: 0 for a in candidates}
    for x in stream:
        if x in exact:
            exact[x] += 1
    return exact


# ===== 実験デモ =====
if __name__ == "__main__":
    # ネットワーク流量を模擬：192.168.0.x のIPをランダム生成
    ips = [f"192.168.0.{random.randint(1,20)}" for _ in range(10000)]

    # 人為的に「高頻度IP」を追加
    ips += ["192.168.0.10"] * 3000
    ips += ["192.168.0.5"] * 2000
    random.shuffle(ips)

    k = 10  # 出現回数が N/k を超える候補を探す
    N = len(ips)

    # 一度目の走査で候補を抽出
    candidates = misra_gries(ips, k)
    # 二度目の走査で真のカウントを検証
    exact = verify_exact(ips, candidates)

    print(f"データストリームの長さ: {N}")
    print(f"候補IP: {candidates}")
    print("検証後の真の出現回数:")
    for ip, cnt in exact.items():
        if cnt > N // k:  # 出現回数が N/k を超えるものが heavy hitter
            print(f"{ip} は {cnt} 回出現 (頻度 {cnt/N:.2%})")


データストリームの長さ: 15000
候補IP: {'192.168.0.10': 2414, '192.168.0.5': 1353, '192.168.0.8': 2, '192.168.0.20': 1}
検証後の真の出現回数:
192.168.0.10 は 3537 回出現 (頻度 23.58%)
192.168.0.5 は 2475 回出現 (頻度 16.50%)


In [1]:
import random

def misra_gries(stream, k):
    counter = {}
    for x in stream:
        if x in counter:
            counter[x] += 1
        elif len(counter) < k - 1:
            counter[x] = 1
        else:
            to_del = []
            for a in counter:
                counter[a] -= 1
                if counter[a] == 0:
                    to_del.append(a)
            for a in to_del:
                del counter[a]
    return counter


def verify_exact(stream, candidates):
    exact = {a: 0 for a in candidates}
    for x in stream:
        if x in exact:
            exact[x] += 1
    return exact


def classify_users(exact, N, k):
    attackers, normal_users = {}, {}
    threshold = N // k
    for ip, cnt in exact.items():
        if cnt > threshold:
            attackers[ip] = cnt
        else:
            normal_users[ip] = cnt
    return attackers, normal_users


# ===== 実験デモ =====
if __name__ == "__main__":
    # 大規模なネットワーク流量を模擬
    N_normal = 200_000   # 正規トラフィック
    normal_ips = [f"10.0.{random.randint(0,255)}.{random.randint(1,254)}"
                  for _ in range(N_normal)]

    # 攻撃トラフィック（特定IPを大量に追加）
    attack_ip1 = "10.0.66.66"
    attack_ip2 = "10.0.99.99"
    ips = normal_ips + [attack_ip1] * 50_000 + [attack_ip2] * 30_000
    random.shuffle(ips)

    N = len(ips)
    k = 50  # 上位候補の数を設定

    # Step 1: 候補抽出
    candidates = misra_gries(ips, k)

    # Step 2: 候補の真の回数を計算
    exact = verify_exact(ips, candidates)

    # Step 3: 攻撃者と正規利用者を分類
    attackers, normal_users = classify_users(exact, N, k)

    print(f"データストリーム総数: {N}")
    print("\n=== 攻撃者IP ===")
    for ip, cnt in attackers.items():
        print(f"{ip} 出現 {cnt} 回 (割合 {cnt/N:.2%})")

    print("\n=== 正規利用者(一部表示) ===")
    for ip, cnt in list(normal_users.items())[:10]:
        print(f"{ip} 出現 {cnt} 回")

データストリーム総数: 280000

=== 攻撃者IP ===
10.0.66.66 出現 50003 回 (割合 17.86%)
10.0.99.99 出現 30002 回 (割合 10.71%)

=== 正規利用者(一部表示) ===
10.0.125.243 出現 5 回
10.0.229.156 出現 6 回
10.0.212.192 出現 3 回
10.0.12.117 出現 6 回
10.0.169.73 出現 1 回
10.0.28.232 出現 2 回
10.0.7.5 出現 6 回
10.0.103.121 出現 4 回
10.0.109.3 出現 3 回
10.0.96.20 出現 3 回
