In [1]:
# Wanqi Zhu, June 2018
import numpy as np
import math
import line_profiler
%load_ext line_profiler
import matplotlib.pyplot as plt


First, we show an implementation of $L_p$-norm-sketch over an iterable of elements. Useful for testing correctness and when the data stream needs to be processed all at once.

This uses numpy vectorization to speed up computation by a significant amount. Running over a large data stream is slow, but in most real life practices, we care about the time *per query*, which is very fast ($O(s_1s_2)$)

In the code below, we store a $s_1 X s_2$ grid of elements with their respective counters. At the end, we take rowwise mean (axis=1) and columnwise median (axis=0) to find our final estimate.


In [2]:
def lp_norm_sketch(A, k, n, _lambda, _epsilon):
    # compute the number of counters we need
    c1 = 23  # a constant that can be anything bigger than 4
             # if we want to minimize s1*s2, we can find this constant by searching to make s2 ~= 1
    s1 = math.ceil(c1*(k*n**(1-1/k)-1)/(_lambda**2))
    s2 = math.ceil(2/c1/((1/2-1/c1)**2) *math.log(1/_epsilon))
    #print(s1*s2)
    
    X = np.zeros((s2, s1))  # the random element we care about
    r = np.zeros((s2, s1))  # stores the frequency count
    
    for cnt, a in enumerate(A):
        # since we don't know m in advance, we
        # update X and r as we go
        # when we encounter the m-th element, we update w.p. 1/m
        to_change = np.random.rand(s2, s1) < 1/(1+cnt)
        X[to_change] = a
        r[to_change] = 0  # reset counter as needed
        r[X == a] += 1
    
    return np.median(np.mean((cnt+1)*(r**k - (r-1)**k), axis=1), axis=0)



In [3]:
# generating some arrays for testing
n = 50
m = 1000
A = np.random.randint(n, size=(m, ))
A


array([11, 13,  7,  0, 40, 12, 34, 17,  6, 43, 38, 44, 37, 33,  1,  8, 38,
       38, 35, 48,  4, 36, 26, 35, 11,  0, 47, 48, 24, 32,  5, 12, 32, 40,
       41, 31, 47,  6, 48, 10, 10, 39,  9, 36,  8, 22,  2, 34, 11,  7, 17,
       39, 42, 31, 10, 10, 40,  5,  3, 14,  6, 36, 42,  3, 17, 22, 36, 41,
       37, 33, 45, 24, 48, 38, 24, 16, 34, 26,  6, 19,  5,  6,  4, 47, 44,
       16, 45, 40, 29, 29, 27, 22, 24, 47, 37, 21, 31, 31, 27, 16,  9, 14,
       19, 23, 38, 34, 36, 45, 45, 18,  9, 17, 22, 17, 34, 14,  2,  2, 28,
       48, 34, 18, 22, 11, 39, 32,  2, 46, 43, 31, 38, 31, 27,  3, 43, 37,
       48, 17,  8, 29, 31, 26,  3, 25,  0,  4, 42, 34, 39, 45, 36, 22, 33,
       28, 17, 18, 43, 23,  2, 42, 21, 14, 47, 15,  6,  1, 45, 34, 40, 37,
        4, 22, 26, 30, 49, 37,  0,  2, 12, 25, 47, 47,  3, 13, 49, 38, 45,
       44,  0, 30, 31, 27, 28,  4, 47, 22, 44,  2,  6, 27, 17, 48, 30,  3,
       34, 38, 30, 20, 47, 49, 29, 33, 45, 41, 36, 45, 26, 20, 47, 13, 22,
       28,  9,  0, 37, 25

In [4]:
# finds the true Lp-norm through brute force estimation
# this is fast, but requires us knowing the full data stream in advance (linear space in size of data stream)
uniq, cnts = np.unique(A, return_counts=True)
k = 2
np.sum(cnts**k)


21432

In [5]:
# testing. You should see the result to be similar to the true answer above
[lp_norm_sketch(A, k, n, 0.1, 0.1) for i in range(10)]


[21333.939855096436,
 21380.388394481753,
 21418.301518509943,
 21404.141992258577,
 21292.32143447911,
 21352.73100208423,
 21372.051477156187,
 21576.90144572733,
 21508.684288880802,
 21434.049029013797]

In [6]:
# we can run profiling to see which steps takes the longest. Turns out it's the random number generator!
%lprun -f lp_norm_sketch lp_norm_sketch(A, k, n, 0.1, 0.1)


Now, let's play with testing correctness on lambda epsilon bounds

In [7]:
def test_lp_norm_correctness(A, k, n, _lambda, _epsilon, printOut=True):
    uniq, cnts = np.unique(A, return_counts=True)
    true = np.sum(cnts**k)
    
    estimate = lp_norm_sketch(A, k, n, 0.1, 0.1)
    
    if printOut:
        print("""Testing on array of length {} with parameters k={}, n={}, lambda={}, epsilon={}\n
True value: {:.2f} Estimate: {:.2f} Error range: ({:.2f}, {:.2f}) w.p. {:.2f}\n\nWITHIN RANGE? {}\n\n"""
          .format(len(A), k, n, _lambda, _epsilon, true, estimate, (1-_lambda)*true, (1+_lambda)*true, 1-_epsilon,
          abs(1 - estimate/true) < _lambda))
    
    return true, estimate, abs(1 - estimate/true) < _lambda



In [8]:
test_lp_norm_correctness(A, k, n, 0.2, 0.1)


Testing on array of length 1000 with parameters k=2, n=50, lambda=0.2, epsilon=0.1

True value: 21432.00 Estimate: 21614.68 Error range: (17145.60, 25718.40) w.p. 0.90

WITHIN RANGE? True




(21432, 21614.682237734476, True)

In [None]:
# This checks how many of our estimates are within the expected bound
def run_random_tests(n, m, k, _lambda = 0.1, _epsilon = 0.1, trials=100):
    true = []
    estimate = []
    correct = []
    
    for i in range(trials):
        A = np.random.randint(n, size=(m, ))
        t, e, c = test_lp_norm_correctness(A, k, n, _lambda, _epsilon, printOut=False)
        true.append(t)
        estimate.append(e)
        correct.append(c)
        
    print(sum(correct)/trials, 1-_epsilon)
    
    return true, estimate, correct



In [None]:
run_random_tests(10, 50, 2, trials=10000)



All of the above is useful only when we know the data stream ahead of time -- and the whole point of using sublinear space is that we don't! As such, the following implementation is more applicable.

We can initialize a class and then call *see(value)* to process each element in the stream as it comes along, e.g. along a network router. Then anytime we want an estimate, we call *estimate()*.

For further work, extending this class into an easily usable format, e.g. a module or command line interface, would be useful.

In [None]:
class lp_norm_sketch():
    def __init__(self, k, n, _lambda, _epsilon, stream=None):
        """
            Optionally pass in a pre-existing stream or to sample elements from
        """
        self.k = k
        self.n = n
        self.lambda = _lambda
        self.epsilon = _epsilon
        self.stream = stream
        
        self.s1 = math.ceil(8*k*n**(1-1/k)/(_lambda**2))
        self.s2 = math.ceil(2*math.log(1/_epsilon))

        self.X = np.zeros((s2, s1))  # the random element we care about
        self.r = np.zeros((s2, s1))  # stores the frequency count
        
        self.m = 0
        

    def see(val=None):
        """processes the next element of the stream"""
        if val is None:
            if stream is None:
                raise Exception("No stream or value provided")
            
            val = next(stream)

        self.m += 1

        to_change = np.random.rand(self.s2, self.s1) < 1/m
        self.X[to_change] = a
        self.r[to_change] = 0  # reset counter as needed
        self.r[X == a] += 1
        
        
    def estimate():
        return np.median(np.mean(self.m*(self.r**self.k - (self.r-1)**self.k), axis=1), axis=0)
        

For more testing and graphs of results, see [project.py](project.py).

