# Heavy-Hitter Streaming: Algorithms 3.1 / 3.2 / 3.3 — Implementations & Demos

擬似コードを Python で実装し、**同一データ**で 3 手法の挙動差を比較する。

- Algorithm 3.1 = Misra–Gries（k-1 カウンタ）
- Algorithm 3.2 = Lossy Counting（Δ=⌊n/k⌋、バケット境界で一括削除）
- Algorithm 3.3 = SpaceSaving（最小カウンタ置換、常に k 個保持）


## Implementations

In [None]:
from typing import Dict, Iterable, List, Tuple, Any

def top_k_from_counts(counts: Dict[Any, int], k: int) -> List[Tuple[Any, int]]:
    """Return the ``k`` highest-count ``(item, count)`` pairs from ``counts``.

    Pairs are ordered by count (descending); pairs with equal counts are
    ordered by the item itself (ascending), so items whose counts tie must be
    mutually comparable.
    """
    def _rank(pair: Tuple[Any, int]) -> Tuple[int, Any]:
        item, freq = pair
        return -freq, item

    ranked = list(counts.items())
    ranked.sort(key=_rank)
    return ranked[:k]

class MisraGries:
    def __init__(self, k: int):
        assert k >= 2
        self.k = k
        self.n = 0
        self.T: Dict[Any, int] = {}
    def update(self, item: Any):
        self.n += 1
        if item in self.T:
            self.T[item] += 1
            return
        if len(self.T) < self.k - 1:
            self.T[item] = 1
            return
        to_del = []
        for j in list(self.T.keys()):
            self.T[j] -= 1
            if self.T[j] == 0:
                to_del.append(j)
        for j in to_del:
            del self.T[j]
    def consume(self, stream: Iterable[Any]):
        for x in stream:
            self.update(x)
        return self
    def candidates(self) -> Dict[Any, int]:
        return dict(self.T)
    def topk(self, k: int | None = None) -> List[Tuple[Any, int]]:
        return top_k_from_counts(self.T, k or len(self.T))

class LossyCounting:
    def __init__(self, k: int):
        assert k >= 2
        self.k = k
        self.n = 0
        self.delta = 0
        self.T: Dict[Any, int] = {}
    def _maybe_prune(self):
        new_delta = self.n // self.k
        if new_delta != self.delta:
            self.delta = new_delta
            for j, c in list(self.T.items()):
                if c < self.delta:
                    del self.T[j]
    def update(self, item: Any):
        self.n += 1
        if item in self.T:
            self.T[item] += 1
        else:
            self.T[item] = 1 + self.delta
        self._maybe_prune()
    def consume(self, stream: Iterable[Any]):
        for x in stream:
            self.update(x)
        return self
    def candidates(self) -> Dict[Any, int]:
        return dict(self.T)
    def topk(self, k: int | None = None) -> List[Tuple[Any, int]]:
        return top_k_from_counts(self.T, k or len(self.T))

class SpaceSaving:
    def __init__(self, k: int):
        assert k >= 1
        self.k = k
        self.n = 0
        self.T: Dict[Any, int] = {}
    def update(self, item: Any):
        self.n += 1
        if item in self.T:
            self.T[item] += 1
            return
        if len(self.T) < self.k:
            self.T[item] = 1
            return
        j, cj = min(self.T.items(), key=lambda x: x[1])
        del self.T[j]
        self.T[item] = cj + 1
    def consume(self, stream: Iterable[Any]):
        for x in stream:
            self.update(x)
        return self
    def candidates(self) -> Dict[Any, int]:
        return dict(self.T)
    def topk(self, k: int | None = None) -> List[Tuple[Any, int]]:
        return top_k_from_counts(self.T, k or len(self.T))

## Evaluation helpers

In [None]:
from collections import Counter
import pandas as pd

def run_all(stream, k=5):
    """Run the three sketches and an exact count over the same stream.

    ``stream`` may be any iterable, including a one-shot iterator or generator.
    It is materialized into a list once, so every algorithm sees identical
    data.  This costs O(n) extra memory, which is acceptable for these demos.

    Args:
        stream: iterable of hashable items.
        k: sketch parameter passed to every algorithm.

    Returns:
        ``(mg, lc, ss, truth)``: the MisraGries, LossyCounting and SpaceSaving
        instances after consuming the stream, and a ``Counter`` of exact
        frequencies.
    """
    # Each consume() and Counter() iterates its input; without this copy a
    # generator would be exhausted by the first sketch and the rest would see
    # an empty stream.
    data = list(stream)
    mg = MisraGries(k).consume(data)
    lc = LossyCounting(k).consume(data)
    ss = SpaceSaving(k).consume(data)
    truth = Counter(data)
    return mg, lc, ss, truth

def compare(mg, lc, ss, truth, k_show=10):
    """Tabulate per-item estimates and absolute errors of the three sketches.

    Rows cover every item tracked by any sketch plus the ``k_show`` most common
    items in ``truth``.  An item a sketch does not track is reported with an
    estimate of 0.  Rows are sorted by true frequency (descending), with ties
    broken by ``str(item)`` so the table is reproducible across runs.  The
    table is then truncated to ``k_show`` rows.

    Args:
        mg, lc, ss: sketches exposing ``candidates() -> dict`` (as returned
            by ``run_all``).
        truth: ``collections.Counter`` of exact frequencies.
        k_show: number of rows to keep.

    Returns:
        pandas.DataFrame with columns item, truth, MG, LC, SS,
        abs_err_MG, abs_err_LC, abs_err_SS (empty but with these columns
        when there is nothing to show).
    """
    # Snapshot each sketch once: candidates() copies the table on every call.
    estimates = {'MG': mg.candidates(), 'LC': lc.candidates(), 'SS': ss.candidates()}

    keys = {x for x, _ in truth.most_common(k_show)}
    for est in estimates.values():
        keys.update(est)

    rows = []
    for key in keys:
        t = truth.get(key, 0)
        row = {'item': key, 'truth': t}
        row.update({name: est.get(key, 0) for name, est in estimates.items()})
        row.update({f'abs_err_{name}': abs(est.get(key, 0) - t)
                    for name, est in estimates.items()})
        rows.append(row)

    # Set iteration order depends on hash randomization, so a secondary key is
    # needed for items with equal true counts to appear in a stable order.
    rows.sort(key=lambda r: (-r['truth'], str(r['item'])))

    # Explicit columns fix the column order and keep the schema when rows is
    # empty (previously sorting an empty frame raised KeyError on 'truth').
    columns = ['item', 'truth', 'MG', 'LC', 'SS',
               'abs_err_MG', 'abs_err_LC', 'abs_err_SS']
    return pd.DataFrame(rows[:k_show], columns=columns)


## Demo 1 — 合成ストリーム（heavy を混ぜた一般ケース）
A, B を heavy にし、そのほかは中小頻度。k=5。3 アルゴリズムを同一ストリームで比較。

In [None]:
import random
random.seed(1)

# Two heavy hitters (A, B) plus a few mid/low-frequency items, then shuffled.
frequencies = [('A', 120), ('B', 90), ('C', 40), ('D', 35), ('E', 30), ('F', 20)]
stream = [sym for sym, cnt in frequencies for _ in range(cnt)]
random.shuffle(stream)

mg, lc, ss, truth = run_all(stream, k=5)
print('n =', len(stream))
for header, sketch in (('MG top:', mg), ('LC top:', lc), ('SS top:', ss)):
    print(header, sketch.topk(5))
compare(mg, lc, ss, truth, k_show=10)

## Demo 2 — Zipf っぽい分布
長めのストリーム（n=2000、20 種類、出現確率 ∝ 1/順位）で裾の長い分布を作る。k=10。

In [None]:
random.seed(2)

# 20 symbols 'A'..'T' with probability proportional to 1/rank (Zipf-like).
items = [chr(code) for code in range(ord('A'), ord('A') + 20)]
weights = [1 / rank for rank in range(1, 21)]
total = sum(weights)
probs = [w / total for w in weights]
stream = random.choices(items, probs, k=2000)

mg, lc, ss, truth = run_all(stream, k=10)
print('n =', len(stream))
for header, sketch in (('MG top:', mg), ('LC top:', lc), ('SS top:', ss)):
    print(header, sketch.topk(10))
compare(mg, lc, ss, truth, k_show=15)

## Demo 3 — 交互出現（最小カウンタ置換の有利さ）
大量のレア要素 R_i と heavy H を交互に挿入。k=20。SpaceSaving が強いパターン。

In [None]:
rare = [f'R{i}' for i in range(1000)]
# Interleave: H, R0, H, R1, ..., H, R999 -- H makes up exactly half the stream.
stream = [tok for r in rare for tok in ('H', r)]

mg, lc, ss, truth = run_all(stream, k=20)
print('n =', len(stream))
for header, sketch in (('MG top:', mg), ('LC top:', lc), ('SS top:', ss)):
    print(header, sketch.topk(5))
compare(mg, lc, ss, truth, k_show=10)

## Notes
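- Misra–Gries は表が満杯のとき全カウンタを一斉にデクリメントするため、候補がリセットされやすい。推定値は常に真の頻度以下（過小評価）で、誤差は ⌊n/k⌋ 以下に抑えられる。
- Lossy Counting は Δ=⌊n/k⌋ が変わるバケット境界で低頻度エントリをまとめて削除する（遅延削除）。新規エントリを 1+Δ で登録するため推定値は過大側で、誤差は Δ 以下。保持エントリ数に固定上限はなく入力に依存する。データスパース時に有利。
- SpaceSaving は常に k 個のカウンタを保持し、新要素は最小カウンタの要素を置換してその値 +1 を引き継ぐ。推定値は過大側で、誤差は ⌊n/k⌋ 以下。交互パターンや多種類ノイズに強く、heavy の追跡が頑健。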