<h1>Data Streaming Algorithms</h1>

<h2>HW1</h2>

<h2>Dry Part</h2>

<h3>Morris algorithm</h3>

To reduce the space requirement to $\log\log\log(n)$ bits, we maintain a counter $X$ whose value is about $\log\log(n)$ (instead of $\log(n)$), so storing it meets the space requirement. To do that, we change the algorithm to increment $X$ with probability $\dfrac{1}{2^{2^X}}$ and return $\log(2^{2^X} - 1)$.
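
As a minimal sketch of the update rule described above, the counter below increments `X` with probability $1/2^{2^X}$, reading the exponent as the current counter value (an assumption about the intended rule). The class name `DoubleExpMorrisCounter` and the `rng` parameter are hypothetical and only serve the illustration; logarithms are taken base 2.

```python
import math
import random


class DoubleExpMorrisCounter:
    """Approximate counter that increments X with probability 1 / 2**(2**X)."""

    def __init__(self, rng=None):
        # rng is a hypothetical parameter so that runs can be made reproducible.
        self.rng = rng if rng is not None else random.Random()
        self.x = 0

    def update(self):
        # Process one stream element: increment X with probability 2**-(2**X).
        if self.rng.random() < 2.0 ** -(2 ** self.x):
            self.x += 1

    def estimate(self):
        # Return log2(2**(2**X) - 1), the value the modified algorithm reports.
        return math.log2(2 ** (2 ** self.x) - 1)


counter = DoubleExpMorrisCounter(random.Random(0))
for _ in range(1000):
    counter.update()
print(counter.x, counter.estimate())
```

Only `X` is stored between updates, which is what keeps the memory footprint small.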

We'll prove that $E[2^{2^X}] = 2^n + 1$, so $2^{2^X} - 1$ is an unbiased estimator of $2^n$. Accordingly, $\log(2^{2^X} - 1) \approx \log(2^n + 1 - 1) = n$ (logarithms are base 2), which is why we return it as the estimate of $n$.

Proof that $E[2^{2^X}] = 2^n + 1$ by induction:

For each $k$, let $X_k$ denote the value of $X$ after processing $k$ elements of the stream.

Base case: for $k = 0$, $X_0 = 0$ because no elements have been processed yet:
$E[2^{2^{X_k}}] = E[2^{2^{X_0}}] = 2 = 2^0 + 1 = 2^k + 1$

Inductive step: assume $E[2^{2^{X_k}}] = 2^k + 1$ for some $k \ge 0$. Then:

$E[2^{2^{X_{k+1}}}] =_1 \sum_{j=0}^{\infty} Pr[X_k = j] \cdot E[2^{2^{X_{k+1}}} | X_k = j] =_2 \sum_{j=0}^{\infty} Pr[X_k = j] \cdot (\dfrac{1}{2^{2^j}} \cdot 2^{2^{j+1}} + (1 - \dfrac{1}{2^{2^j}}) \cdot 2^{2^j}) =_3 \sum_{j=0}^{\infty} Pr[X_k = j] \cdot (2 \cdot 2^{2^j} - 1) =_4 2 \cdot \sum_{j=0}^{\infty} Pr[X_k = j] \cdot 2^{2^j} - \sum_{j=0}^{\infty} Pr[X_k = j] =_5 2 \cdot E[2^{2^{X_k}}] - 1 =_6 2 \cdot (2^k + 1) - 1 = 2^{k+1} + 1$

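1. Law of total expectation.
2. Definition of expectation and the update rule of the modified Morris counter.
3. Simple algebra: $\dfrac{2^{2^{j+1}}}{2^{2^j}} = 2^{2^j}$.
4. Simple algebra: splitting the sum.
5. Definition of expectation, and the probabilities $Pr[X_k = j]$ sum to 1.
6. Induction hypothesis.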
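
The identity can also be checked numerically for small $n$. The sketch below computes the exact distribution of $X_n$ with rational arithmetic and compares the expectation of $2^{2^{X_n}}$ against $2^n + 1$. The helper names `exact_distribution` and `expected_estimator` are hypothetical, and the check assumes the increment probability $1/2^{2^j}$ when the counter equals $j$.

```python
from fractions import Fraction


def exact_distribution(n):
    """Return {j: Pr[X_n = j]} when X increments with probability 1 / 2**(2**j)."""
    dist = {0: Fraction(1)}  # X_0 = 0 with probability 1
    for _ in range(n):
        nxt = {}
        for j, p in dist.items():
            inc = Fraction(1, 2 ** (2 ** j))  # probability of moving from j to j + 1
            nxt[j + 1] = nxt.get(j + 1, Fraction(0)) + p * inc
            nxt[j] = nxt.get(j, Fraction(0)) + p * (1 - inc)
        dist = nxt
    return dist


def expected_estimator(n):
    """Exact value of E[2**(2**X_n)]."""
    return sum(p * 2 ** (2 ** j) for j, p in exact_distribution(n).items())


for n in range(8):
    assert expected_estimator(n) == 2 ** n + 1
```

Using `Fraction` avoids floating-point rounding, so the comparison is an exact equality rather than an approximation.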

<h3>Reservoir sampling</h3>

In class we proved the correctness of the reservoir sampling algorithm for $j > k$. Here we provide the complementary proof for $j \le k$.
<br>
For $j \le k$, $a_{j}$ enters the sample with probability 1, so it is in the final sample exactly when it is never replaced during steps $k+1, ..., n$:
<br>
$P[a_{j}\ is\ sampled] =_{1} P[a_{j}\ is\ not\ replaced\ at\ step\ k+1] \cdot  ... \cdot P[a_{j}\ is\ not\ replaced\ at\ step\ n] =_{2} (1 - P[a_{j}\ is\ replaced\ at\ step\ k+1]) \cdot  ... \cdot (1 - P[a_{j}\ is\ replaced\ at\ step\ n]) =_{3} (1 - \dfrac{1}{k+1}) \cdot  ... \cdot (1 - \dfrac{1}{n}) =_{4} \dfrac{k}{k+1} \cdot  ... \cdot \dfrac{n-1}{n} =_{5} \dfrac{k}{n}$
<br>
1. $a_{j}$ is in the final sample exactly when it is not replaced at step $k+1$, nor at step $k+2$, and so on up to step $n$.
2. Complement rule.
3. $P[a_{j}\ is\ replaced\ at\ step\ i] = \dfrac{k}{i} \cdot \dfrac{1}{k} = \dfrac{1}{i}$; for example, at step $k+1$ this is $\dfrac{k}{k+1} \cdot \dfrac{1}{k}$.

    $\dfrac{k}{i}$: the probability that element $i$ of the stream is sampled is the sample size divided by the element's index. $\dfrac{1}{k}$: the probability that it replaces a specific element of the sample is 1 divided by the sample size.

4. Simple algebra.
5. Telescoping product: each numerator cancels the previous denominator.
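
To make the derivation concrete, here is a minimal sketch of reservoir sampling together with an exact evaluation of the product $(1 - \frac{1}{k+1}) \cdot ... \cdot (1 - \frac{1}{n})$. The function names `reservoir_sample` and `survival_probability` and the `rng` parameter are hypothetical; the replacement rule (sample element $i$ with probability $k/i$, then evict a uniformly chosen slot) is the one assumed in the proof.

```python
import random
from fractions import Fraction


def reservoir_sample(stream, k, rng):
    """Keep a uniform sample of k items from a stream of unknown length."""
    sample = []
    for i, item in enumerate(stream, start=1):
        if i <= k:
            # The first k items always enter the sample.
            sample.append(item)
        elif rng.random() < k / i:
            # Item i is sampled with probability k / i and evicts a uniform slot.
            sample[rng.randrange(k)] = item
    return sample


def survival_probability(k, n):
    """Exact product (1 - 1/(k+1)) * ... * (1 - 1/n) for an element with j <= k."""
    prob = Fraction(1)
    for i in range(k + 1, n + 1):
        prob *= 1 - Fraction(1, i)
    return prob


assert survival_probability(3, 10) == Fraction(3, 10)
print(reservoir_sample(range(1, 101), 5, random.Random(0)))
```

The assertion confirms the telescoping result $\frac{k}{n}$ for $k = 3$, $n = 10$ without any floating-point error.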

<h2>Wet Part</h2>

In [1]:
import numpy as np
import datetime
import threading

np.random.seed(42)

In [2]:
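def fm_alpha(stream_arg):
    # Draw a fresh random "hash": each key in 1..10000 maps to an independent Uniform(0, 1) value.
    hash_function = dict(zip(range(1, 10001), np.random.uniform(0, 1, 10000)))
    # Track the minimum hash value seen over the stream.
    x = 1
    for s in stream_arg:
        h_a = hash_function[s]
        if h_a < x:
            x = h_a
    # Return the distinct-count estimate together with the minimum hash value.
    return int(1 / x), x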

In [3]:
def fm_beta(stream_arg, alpha_instances_arg):
    # Average the minimum hash value over independent fm_alpha runs, then invert the mean.
    x_s = []
    for _ in range(alpha_instances_arg):
        _, min_hash = fm_alpha(stream_arg)
        x_s.append(min_hash)
    return int((1 / np.mean(x_s)) - 1)
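
The `- 1` in the return value of `fm_beta` can be motivated by a short calculation, sketched here under the assumption that the hash values of the $d$ distinct elements are independent and uniform on $[0, 1]$ (as the `np.random.uniform` draw in `fm_alpha` suggests):

$$Pr[\min > t] = (1 - t)^d, \qquad E[\min] = \int_0^1 (1 - t)^d \, dt = \dfrac{1}{d + 1} \quad\Rightarrow\quad d = \dfrac{1}{E[\min]} - 1$$

Averaging the minimum over several independent `fm_alpha` runs approximates $E[\min]$, and inverting that average gives the estimate of $d$.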

In [4]:
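def fm_final(stream_arg, beta_instances_arg, alpha_instances_arg, results):
    # Take the median of independent fm_beta estimates and append it to the shared
    # results list, so the function can be used as a thread target.
    z_t = []
    for _ in range(beta_instances_arg):
        z_t.append(fm_beta(stream_arg, alpha_instances_arg))
    results.append(np.median(z_t))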

In [5]:
# Generates a stream with the values 1 - 10000, each repeated a uniformly random number of times between 10 and 15 (inclusive).
stream = []
for i in range(1, 10001):
    stream.extend([i] * np.random.randint(10, 15 + 1))
np.random.shuffle(stream)
unique, counts = np.unique(stream, return_counts=True)
print(f'unique elements in stream {len(unique)}')
print(f'total elements in stream {sum(counts)}')
uniques, _ = fm_alpha(stream)
print(f'FM alpha unique elements estimation {uniques}')

unique elements in stream 10000
total elements in stream 124999
FM alpha unique elements estimation 34071


In [6]:
experiments_count = 50
study_results = {}
parallel_start = datetime.datetime.now()
for beta_instances in [1]:
    for alpha_instances in [10, 25, 50]:
        results = []
        threads = []
        for i in range(experiments_count):
            thread = threading.Thread(target=fm_final, args=(stream, beta_instances, alpha_instances, results))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()
        key = f'fm_final_{beta_instances}_fm_beta_{alpha_instances}_fm_alpha'
        study_results[key] = results
        print(f'{key} expectation: {np.mean(results)}')
print(f'total time {datetime.datetime.now() - parallel_start}')

fm_final_1_fm_beta_10_fm_alpha expectation: 11690.38
fm_final_1_fm_beta_25_fm_alpha expectation: 9887.26
fm_final_1_fm_beta_50_fm_alpha expectation: 10244.12
total time 0:01:08.496425
