In [1]:
import hyperloglog
import random
import string
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import math

## One example

In [138]:
# Parameters
k = 32  # Number of HLL centers
m = 100000  # Total set size with multiplicities
p = 1000 # Total unique elements
precision = 0.01

In [125]:
def random_string(length=30):
    return ''.join(random.choices(string.ascii_letters, k=length))

# items = [random_string() for _ in range(m)]
pool = [random_string() for _ in range(p)]  # or any base size

# Sample m items from this pool with replacement
items = random.choices(pool, k=m)

### Get the estimate as if processing everything at one center

In [126]:
test_one_center = hyperloglog.HyperLogLog(precision)

In [127]:
for item in items:
    test_one_center.add(item)
one_center_cardinality = len(test_one_center)
print(f"Processing everything at one center: {one_center_cardinality}")

Processing everything at one center: 1002


### Processing at k different centers

In [139]:
# Create HLLs
centers = [hyperloglog.HyperLogLog(precision) for _ in range(k)]

In [140]:
check = {i:0 for i in range(k)}

In [141]:
# Assign each item to a random HLL
for item in items:
    center_idx = random.randint(0, k - 1)
    centers[center_idx].add(item)
    check[center_idx] += 1

In [142]:
# check

In [143]:
# # Optionally, inspect each center's estimate
# estimates = [len(center) for center in centers]
# for i, est in enumerate(estimates):
#     print(f"Center {i}: Estimated count = {est}")

# # Total estimated cardinality across all centers
# total_estimated = sum(estimates)
# print(f"\nSum of individual HLL estimates: {total_estimated}")
# print(f"Actual number of items: {m}")

In [144]:
coordinator = hyperloglog.HyperLogLog(precision)

In [145]:
num_iterations = int(math.log(k, 2))
number_bits = len(coordinator.M)
index = 0
bits = math.ceil(number_bits // num_iterations)

In [146]:
updated_centers = centers

In [147]:
for _ in range(num_iterations):
    print(index)
    intermediate_M = [[0 for _ in range(0, index)] + center.M[index:min(index+bits,number_bits)] + [0 for _ in range(min(index+bits,number_bits),number_bits)] for center in updated_centers]

    for i in range(len(updated_centers)):
        coordinator.update_M(intermediate_M[i])
    
    top_half_indices = sorted(
        range(len(updated_centers)),
        key=lambda i: np.mean(centers[i].M[:min(index+bits,number_bits)]) if len(centers[i].M) >= 10 else -np.inf,
        reverse=True
    )[:len(updated_centers) // 2]

    updated_centers = [centers[i] for i in top_half_indices]
    
    print(len(updated_centers))
    index = min(index+bits,number_bits)

distributed_cardinality = len(coordinator)
print(f"Processing in a distributed manner: {distributed_cardinality}")

0
16
3276
8
6552
4
9828
2
13104
1
Processing in a distributed manner: 1001


In [149]:
print(f"The difference between the estimates of processing all the items at once center and in a distributed setting: {one_center_cardinality - distributed_cardinality}")

The difference between the estimates of processing all the items at once center and in a distributed setting: 1


### Ablation results
- Keep the number of distinct elements p and number of centers k constant and vary the set size m
- Keep the number of distinct elements p and set size m constant and vary the number of centers k
- k,m constant; p vary
