# Stream Mining

Stream mining is the practice of data mining a continuous stream of data records. This data stream either arrives too fast or the complete stream is too large such that it is not feasible to process using conventional data mining techniques, even ones that are tailored for big data.

One common aspect of data mining streams is that it has to be processed quickly, usually as soon as the data arrives. We do not have the luxury of first storing and then analyzing the data at a later time that is more convenient. In this notebook we will show the techniques for doing the following analysis on streaming data:

1. **Sampling**
2. **Filtering**
3. **Counting Distinct Elements**
4. **Estimating Moments**
5. **Counting Ones**
6. **Most Commong "Recent" Elements**

Most of the algorithms for stream mining involve summarizing the stream in some way. These techniques utilizes hash functions in their implementation. In most cases, it will be better to just get the "approximate" answer bounded by some error, instead of getting the exact answer which may not be feasible to compute.

## Stream Data Model

<img src="StreamDataModel.png" height=600 width=400>

We show a canonical example of a streaming data management system above. A number of stream is feeeing into the system, each providing data on its own schedule. The data from the stream need be uniform both in the data type and the rate at which the data arrives. The fact that the rate of arrival of stream elements is not under the control of the system is the main distinguishing factor between stream mining and a regular data mining system.

Streams may eventually be saved in a longer term archival storage like a disk or database, but we have to assume that we cannot answer analytics queries using the archived data because retrieving it is a time-consuming process and the latency of the streaming data coming in is much faster. Instead there is a limited working storage which can be accessed faster that we can use, but it has insufficient capacity to store the entire stream.

### Stream Sources

1. Sensor Data
2. Video Cameras
3. Internet Web Traffic
4. Server Logs

### Stream Queries

There are two main types of queries we want to make of streaming data. The first are standing queries which are permanently executing. For example if we have streaming data from temperature sensors, our standing queries could be:

* Output alerts when temperature data passes a certain threshold.
* Average temperature using the last $n$ elements in a stream.
* What's the maximum temperature ever recorded in the stream

All of these are calculations that can be done quickly when new data appears in the stream with no store the stream in its entirety. 

The other form of queries are *adhoc queries*. Here the queries can be one-time or the parameter of the query can be changing based on the need. A common approach would be to store a sliding window of the $n$ most recent data in the working storage of the stream management system. 

### Examples of Stream Processing Systems

<table>
    <tr>
        <td><img src="kafka.png" height=180 width=180></td>
        <td><img src="kinesis.png" height=200 width=200></td>
    </tr>
</table>

Two examples of streaming systems are: Apache Kafka (https://kafka.apache.org) and Amazon Kinesis (https://aws.amazon.com/kinesis/). For the examples in this notebook we will Apache Kafka to simulate data coming from a stream.

In [2]:
import numpy as np
import sys
import hashlib
import random
import math

import dask.array as da
import dask.dataframe as dd

random.seed(10) #Set seed for repeatability

#Hash a string to an integer
def hashInt(val, buckets=sys.maxsize, hashType='default'):
    if hashType == 'default':
        return hash(val) % buckets
    elif hashType == 'md5':
        return hash(hashlib.md5(val.encode('utf-8')).hexdigest()) % buckets

#Hash a string to a bit string
def hashBits(val, hashType='default'):
    if hashType == 'default':
        return bin(hash(val))
    elif hashType == 'md5':
        return bin(hash(hashlib.md5(val.encode('utf-8')).hexdigest()))
    
words = np.array(open('words.txt').read().splitlines())
userIds = np.arange(1, 1000)

### Kafka Example

In [None]:
import kafka

#Build our streams
words = np.array(open('words.txt').read().splitlines()) #copied from /usr/share/dict/words
userIds = np.arange(1, 1000)

producer = kafka.KafkaProducer()

#Create streams of 2 million elements
for i in range(1000000):
    if (i+1) % 100000 == 0:
        print(i+1)
        
    producer.send('sampling', (str(np.random.choice(words))).encode('utf-8'))

    producer.flush()
    
producer.close()

In [None]:
consumer = kafka.KafkaConsumer('test', auto_offset_reset='earliest')
n = 0
for msg in consumer:
    print(msg.value.decode("utf-8"))
    n += 1
    if n == 8:
        break
        
consumer.close()

## Sampling Data in a Stream

We start with the general problem of sampling data. We want to be able to select a subset of the stream to make calculations easier, but also have the results be statistically representative of the stream as a whole. One example might be a search engine wanting to answer the question of "What fraction of the typical user’s queries were repeated over the past month?". Assume also that due to space constraints, we wish to store only $\frac{1}{10}$th of stream elements.

An easy but incorrect approach would be to just generate a random number from 0 to 9 for each incoming data, and only consider data when the random number is equal to 0. However this will give us the wrong answer as for each user we would be drastically reducing the fraction of repeated queries too. If a user has $s$ queries which were done exactly once, and $d$ queries which were done exactly twice, and no queries were done more than 2 times. The correct answer for the fraction of repeated queries for this user is $\frac{d}{s+d}$. However using the sampling method just detailed, the fraction we will get is $\frac{d}{10s + 19d}$.

The right way would be to only consider $\frac{1}{10}$th of all users, but consider all queries for those randomly selected users. Assume further that the possible list of users is so large that isn't feasible to store which users should have their queries tracked and which users shouldn't. We can instead use a hash function that maps the user to 1 of 10 buckets. If the user hashes to the first bucket, then this user is someone whose queries we would want to track. Note that we do not actually need to save the users into the buckets, we just need to know at runtime which bucket the user goes to using the fast hash function. This saves us from having to keep track of all users in the universal set. 

In [7]:
samples = {} 

n = 0
while n < 1000000:
    userId = np.random.choice(userIds)
    word = np.random.choice(words)
    
    #hash to 10 buckets. Process only if hash points to first bucket
    if hashInt(userId, 10) == 0:
        if userId not in samples: 
            samples[userId] = {}
        
        if word not in samples[userId]:
            samples[userId][word] = 0
        
        samples[userId][word] += 1
        
    #Print fraction of repeated queries every 100000 elements
    n += 1
    if n % 100000 == 0:
        fractionRepeats = 0
        for userId in samples:
            repeats = 0
            
            for word in samples[userId]:
                if samples[userId][word] > 1:
                    repeats += 1
    
            fractionRepeats += repeats / len(samples[userId])

        fractionRepeats /= len(samples)
        print("Iterations:", n, 'Average Repeat Per User:', fractionRepeats)

Iterations: 100000 Average Repeat Per User: 8.48824378236143e-05
Iterations: 200000 Average Repeat Per User: 0.0004026243221023727
Iterations: 300000 Average Repeat Per User: 0.0003966167804915212
Iterations: 400000 Average Repeat Per User: 0.0004724339706863245
Iterations: 500000 Average Repeat Per User: 0.00086515906940257
Iterations: 600000 Average Repeat Per User: 0.0010044780481139246
Iterations: 700000 Average Repeat Per User: 0.0012795931598440701
Iterations: 800000 Average Repeat Per User: 0.0015721791194151934
Iterations: 900000 Average Repeat Per User: 0.0018303686818008796
Iterations: 1000000 Average Repeat Per User: 0.002072822397857278


### Fixed Size Samples

For infinite or very large streams, our set of samples will also eventually become too large for our working storage. For cases like this we may want to fix a maximum number of samples we will keep, and randomly replace older samples with newer ones over time. One technique to do this will be to do **Reservoir Sampling**.

Let $s$ be the maximum number of samples we want to keep, and $n$ be the number of samples we have been tracking from the start. 

1. While $n <= s$, keep sampling normally.
2. A new element we want to samples arrives such that $n > s$. With probability $\frac{s}{n}$, keep the new sample. Otherwise discard it.
3. If we're keeping the new sample, discard one of the older samples at random.

After $n$ elements, the sample contains each element seen so far with probability $\frac{s}{n}$. For our user query sample above, it may still be better to think of replacing users instead of their individual queries.

We implement the fixed size sampling algorithm detailed above for $s=20$ while going through our *words* list.

In [8]:
samples = []
maxSamples = 20 #s
samplesTracked = 0 #n

for w in words:
    samplesTracked += 1
    
    if len(samples) < maxSamples:
        samples.append(w)
    else:
        if random.random() < (maxSamples / samplesTracked):
            randIndex = random.randint(0, maxSamples-1)
            samples[randIndex] = w

print("Samples:", len(samples))
print(samples)

Samples: 20
['wrencher', 'wonderfulness', 'Mantuan', 'dimorphic', 'disturbance', 'Sparassis', 'unstow', 'peastone', 'downline', 'manneristically', 'spelterman', 'darned', 'calathiform', 'oblongly', 'excavationist', 'oneirocrit', 'subloreal', 'unspiritedly', 'binomially', 'thioarsenic']


## Filtering Streams

Another common use case is filtering data in a stream. In this case we have a set $S$ of data that we want to filter for, but the size of $S$ is too large to fit in memory. Additionally, it is assumed that the elements in $S$ are only a small fraction of the universal set, such that it will be a big benefit just to be able to quickly verify that a value isn't part of $S$. In that case we can use a **Bloom filter**.

### Bloom Filter

A Bloom filter is a probabilistic data structure that is used for testing whether a value is a member of a set. Instead of getting absolute *True* or *False* values when testing for membership, the Bloom filter instead returns True or False with the following meaning:

1. True - Value is "possibly" in set.
2. False - Value is **definitely** not in set.

In other words the Bloom filter may return a false positive, but never a false negative. In return for this ambiguity the Bloom filter uses far less space than a hash table or a hash set, and is easier to keep in memory for fast lookups. There is a tradeoff between the space needed by the Bloom filter and the probability of getting a false positve, the larger the space utilized the less chance for getting false positives.

In [9]:
#Setup bloom filter and filtering functions

size = 100000
bloomFilter = np.zeros(size, dtype=int)

#Filter for 10% of words
filterSet = np.random.choice(words, int(len(words) / 10))


for key in filterSet:
    #Hash twice using different methods, then set the index for both to 1.
    index1 = hashInt(key, buckets=size)
    index2 = hashInt(key, buckets=size, hashType='md5')

    bloomFilter[index1] = 1
    bloomFilter[index2] = 1

def inBloomFilter(val):
    #Hash twice, then verify that all values of the hash indexes are 1
    index1 = hashInt(val, buckets=size)
    index2 = hashInt(val, buckets=size, hashType='md5')
    
    if bloomFilter[index1] == 1 and bloomFilter[index2] == 1:
        return True
    else:
        return False

In [12]:
n = 0

bloomYes = 0
actualYes = 0


while n < 1000000:
    word = np.random.choice(words)
    
    if inBloomFilter(word):
        bloomYes += 1
        
        if word in filterSet:
            actualYes += 1
            
    n += 1
    if n % 100000 == 0:
        print('Iterations', n, 'Maybe in Filter Set:', bloomYes, 'Really in Filter Set:', actualYes)


Iterations 100000 Maybe in Filter Set: 21455 Really in Filter Set: 9572
Iterations 200000 Maybe in Filter Set: 42892 Really in Filter Set: 19080
Iterations 300000 Maybe in Filter Set: 64396 Really in Filter Set: 28566
Iterations 400000 Maybe in Filter Set: 85674 Really in Filter Set: 37980
Iterations 500000 Maybe in Filter Set: 106912 Really in Filter Set: 47438
Iterations 600000 Maybe in Filter Set: 128308 Really in Filter Set: 56984
Iterations 700000 Maybe in Filter Set: 149718 Really in Filter Set: 66486
Iterations 800000 Maybe in Filter Set: 171132 Really in Filter Set: 76124
Iterations 900000 Maybe in Filter Set: 192420 Really in Filter Set: 85549
Iterations 1000000 Maybe in Filter Set: 213840 Really in Filter Set: 95106


### Probability of False Positives

Let $n$ be the length of the bit-array, $m$ be the size of set $S$, and $k$ be the number of hash functions we're using. The probability of getting a false positive is $(1-e^\frac{-km}{n})^k$. For our example above, with $n=100000$, $m=23588$, and $k=2$, the probabity of a false positive is $14.14\%$


## Counting Distinct Elements in a Stream

Another common problem is counting the number of distinct elements that we have seen in the stream so far. Again the assumption here is that the universal set of all elements is too large to keep in memory, so we'll need to find another way to count how many distinct values we've seen. If we're okay with simply getting an estimate instead of the actual value, we can use the Flajolet-Martin (FM) algorithm.

### Flajolet-Martin Algorithm

In the FM algorithm we hash the elements of a strea into a bit string. A bit string is a sequence of zeros and ones, such as 1011000111010100100. A bit string of length $N$ can hold $2^N$ possible combinations. For the FM algorithm to work, we need $N$ to be large enough such that the bit string will have more possible combinations than there are elements in the universal set. This basically means that there should be no possible collisions when we has the elements into a bit string. 

The idea behind the FM algorithm is that the more distinct elements we see, the higher the likelihood that one of their hash values will be "unusual". The specific "unusualness" we will exploit here is that the bit string ends in many consecutive 0s.

For example, the bit string 1011000111010100100 ends with 2 consecutive zeros. We call this value of 2 the *tail length* of the bit string. Now let $R$ be the maximum tail length of that we have seen of any hashed bit string of the stream. The estimate of the number of distinct elements using FM is simply $2^R$.

In [15]:
n = 0
trueDistinct = set()
maxTailLength = -1

while n < 1000000:
    word = np.random.choice(words)
    trueDistinct.add(word)
    
    bitString = hashBits(word)
    tailLength = 0
    for i in reversed(range(len(bitString))):
        if bitString[i] == '0':
            tailLength += 1
        else:
            break

    if tailLength > maxTailLength:
        maxTailLength = tailLength
        
    n += 1
    if n % 100000 == 0:
        print(n, 'True Distinct:', len(trueDistinct), 'FM Approx:', 2**maxTailLength)
        

100000 True Distinct: 81616 FM Approx: 65536
200000 True Distinct: 134838 FM Approx: 524288
300000 True Distinct: 169837 FM Approx: 524288
400000 True Distinct: 192620 FM Approx: 524288
500000 True Distinct: 207627 FM Approx: 524288
600000 True Distinct: 217436 FM Approx: 524288
700000 True Distinct: 223843 FM Approx: 524288
800000 True Distinct: 228031 FM Approx: 524288
900000 True Distinct: 230650 FM Approx: 524288
1000000 True Distinct: 232459 FM Approx: 524288


In [19]:
#Library function for non-streams
def flajoletMartin(array):
    max_tail_length = 0
    
    for val in array:
        bit_string = bin(hash(hashlib.md5(val.encode('utf-8')).hexdigest()))
        
        i = len(bit_string) - 1
        tail_length = 0
        while i >= 0:
            if bit_string[i] == '0':
                tail_length += 1
            else:
                #neatly handles the '0b' prefix of the binary string too. 
                #Just break when we see "b"
                break
                
            i -= 1
            
        if tail_length > max_tail_length:
            max_tail_length = tail_length
            
    return (2**max_tail_length)

testList = []
n=0
while n < 100000:
    n += 1
    testList.append(np.random.choice(words))

print(flajoletMartin(testList))

65536


### Using Multiple Hash Functions

We can improve the estimate further by using multiple hash functions. With multiple hash functions we first split hash functions into groups, get the maximum tail length for each hash function, then get the average for each group. Lasly, we get the median over all the averages and that will be our estimate.

This way we can get estimates that aren't just powers of 2. If the correct count is between two large powers of 2, for example 7000, it will be impossible to get a good estimate using just one hash function.

## Moments

Computing the "moments" of a stream involves getting the distribution of the different elements in a stream, and is a generalization of the previous problem of counting distinct elements. Let $m_i$ be the number of occurences of element $i$ in a stream. The *kth-order moment* of a stream is the sum over all $i$ of $(m_i)^k$, or $\sum_i(m_i)^k$.

The 0th order moment of a stream is the sum of $(m_i)^0$, or the sum of 1 for each element $i$ found in the stream, or simply the count of distinct elements in the stream. We can use the Flajolet-Martin algorithm detailed above for this.

The 1st moment is the sum of $m_i$'s, or simply the length of the stream.

The 2nd moment is the sum $(m_i)^2$. This measures how uneven the distribution of the elements of the stream is, and is also called the *surprise number*. Again, if the we have the capacity to store $m_i$ for each element of the stream then this is easy to calculate. However if the number of elements if too large we need to look for alternatives such as approximation. One way to estimate the 2nd moment of the stream is to use the Alon-Matias-Szegedy (AMS) algorithm.

### Alon-Matias-Szegedy Algorithm for Second Moments

In the AMS algoritm we calculate a number of variables $X$ as we go through the stream. The more $X$'s we calculate, the more accurate our estimate will be. Each element $X$ contains:

1. $X.element$ - an element of the universal set found in the stream.
2. $X.value$ - the number of times we have seen $X.element$ in the stream since we have started tracking it.

The estimate of the 2nd moment is $n \times (2 \times X.value − 1)$, where $n$ is the length of the stream. We improve the final estimate by averaging the estimate of all $X$'s. One thing to note is we do not start tracking all the $X$ variables at the start of the stream. Instead, we start tracking each $X$ variable at random points as we go through the stream. 

As an example, suppose we have a stream of length $n=15$ and having values *a,b,c,b,d,a,c,d,a,b,d,c,a,a,b*. $m_a=5$, $m_b=4$, $m_c=3$, and $m_d=3$. The surprise number for this stream is therefore $52 + 42 + 32 + 32 = 59$. Let's calculate 3 variables $X_1$, $X_2$, and $X_3$, starting at 3rd, 8th, and 13th positions respectively at "random".

* At position 3 we find *c*, so we set $X_1.element$ to *c* and $X_1.value$ to 1. We'll encounter two more *c* as we go through the stream so the final value of $X_1.value$ is 3.

* At position 8 we find *d*, so $X_2.element=d$ and $X_2.value=1$. At the end of the stream $X_2.value=2, since we only count starting from position 8.

* At position 13 is *a*, so $X_3.element=a$. The final value of $X_3.value$ is 2.

We calculate the estimate as $n \times (2 \times X.value − 1)$, so that's $15×(2×3−1)=75$ for $X_1$ and $15×(2×2−1)=45$ for both $X_2$ and $X_3$. The average of the estimates is $(75+45+45) \div 3 = 55$, which is close to the actual value of $59$.

In [20]:
variables = {}
realDistribution = {}

n = 0

while n < 1000000:
    word = np.random.choice(words)
    
    if word in variables:
        variables[word] += 1
    else:
        if random.random() < 0.1: #add to variables with 10% chance
            variables[word] = 1
    
    #Get the real distribution for validation
    if word not in realDistribution:
        realDistribution[word] = 0
        
    realDistribution[word] += 1
    
    n += 1
    if n % 100000 == 0:
        real2ndMoment = 0
        for w in realDistribution:
            real2ndMoment += realDistribution[w] ** 2
    
        #Calculate 2nd moment using the AMS variables
        approx2ndMoment = 0
        for w in variables:
            approx2ndMoment += n * ((2 * variables[w]) - 1)
            
        approx2ndMoment /= len(variables)

        print('Iteration', n, 'Real 2nd Moment:', real2ndMoment, 'Approx 2nd Moment:', approx2ndMoment)
        

Iteration 100000 Real 2nd Moment: 142268 Approx 2nd Moment: 142076.22356186778
Iteration 200000 Real 2nd Moment: 369446 Approx 2nd Moment: 369975.5602932765
Iteration 300000 Real 2nd Moment: 680670 Approx 2nd Moment: 686277.8972605155
Iteration 400000 Real 2nd Moment: 1077736 Approx 2nd Moment: 1097115.2541452956
Iteration 500000 Real 2nd Moment: 1559600 Approx 2nd Moment: 1598531.5647043167
Iteration 600000 Real 2nd Moment: 2127440 Approx 2nd Moment: 2195491.3622203344
Iteration 700000 Real 2nd Moment: 2778316 Approx 2nd Moment: 2883885.3955207868
Iteration 800000 Real 2nd Moment: 3513340 Approx 2nd Moment: 3668286.941681869
Iteration 900000 Real 2nd Moment: 4334438 Approx 2nd Moment: 4545672.967889112
Iteration 1000000 Real 2nd Moment: 5239120 Approx 2nd Moment: 5536791.019167607


### Higher Order Elements

We can estimate higher-order elements similar to how we estimate 2nd-order elements. The only difference is instead of $n \times (2 \times X.value − 1)$ for the estimate, the formula is now $n \times (X.value^k−(X.value−1)^k)$.

Below we provide an alonMatiasSzegedy() function that takes as argument the moment value to approximate for, the number of variables to track, and the probability of an element in an array becoming a variable.

In [31]:
def alonMatiasSzegedy(array, k_moment=2, num_variables=3, var_prob=0.1):
    variables = {}
    n = len(array)
  
    for val in array:
        if val not in variables:
            if len(variables) < num_variables and random.random() < var_prob:
                variables[val] = 1
        else:
            variables[val] += 1


    amsMoment = 0
    for w in variables:
        amsMoment +=   n * (variables[w]**k_moment - ((variables[w] - 1)**k_moment))

    amsMoment /= len(variables)
    return amsMoment

testList = []
n=0
while n < 100000:
    n += 1
    testList.append(np.random.choice(words))
    
print(alonMatiasSzegedy(testList))

233333.33333333334


## Counting Ones in a Window

Suppose we have a bit stream window of length $N$ and we want to count the number of ones in the last $k$ bits. Again assume that $k$ is too large so we cannot afford to store it all $k$ bits in memory, as would be needed to get an exact count. We can use the Datar-Gionis-Indyk-Motwani (DGIM) algorithm to approximate the count with an error rate of no greater than 50%.

### Datar-Gionis-Indyk-Motwani Algorithm

Assume we have a bit stream window of '10101100111011011000101110110010110'. We give each bit a timestamp starting with 1 for the earliest bit, 2 for the 2nd bit, and so on. We divide the bits into *buckets* consisting of:

1. The timestamp of its most rightmost (most recent) end.
2. The number of ones in the bucket. This number must be a power of 2, and we refer to this as the *size* of the bucket.

Furthermore, the buckets of the bit stream have to follow the following rules:

1. The right end of a bucket is always a bit with a value 1.
2. Every bit with a value of 1 is in some bucket.
3. No bit is in more than one bucket.
4. There are just one or two buckets of any given size.
5. All sizes must be a power of 2.
6. Buckets cannot decrease in size as we move to the left.

Once we've split the bit stream into buckets, we can now estimate the number of 1's for any $k$ in the window. Find the bucket $b$ with the earliest timestamp that includes some of the $k$ most recent bits. The estimate of the number of ones is the sum of the sizes of all buckets to the right of $b$, plus half the size of $b$ itself.

For example, take the stream above having the buckets as shown and with $k=10$. We want to get the count of ones in the 10 rightmost bits, or 0110010110. The bucket with size 4 containing '11101' is the earliest bucket which contains the at least some of last $k$ bits. We take half the size of this bucket, which equals to 2, plus the sizes of the buckets to the right, so 2, 1, and 1. The DGIM estimate for the number of ones is therefore $2+2+1+1=6$, which is close to the actual answer of 5.

Let's try another example, this time counting the ones in the last **20** elements of bit stream ...100011101011010001110111101000001010101001000000000100101001100, divided into the following buckets:

<img src='BitBuckets2.png' heigh="250" width="700">

In [None]:
bitStream = '100011101011010001110111101000001010101001000000000100101001100'

k=20 #Count ones in last k=20 elements
buckets = {}
buckets[2] = 1
buckets[3] = 1
buckets[6] = 2
buckets[11] = 2
buckets[24] = 4
buckets[36] = 4
buckets[41] = 8
buckets[56] = 8

earliestBucket = True
approxCount = 0
for ts in sorted(buckets.keys(), reverse=True):
    if ts > k:
        continue
        
    if earliestBucket:
        approxCount += int(buckets[ts] / 2)
        earliestBucket = False
    else:
        approxCount += buckets[ts]
        
print('Approx Count', approxCount)

realCount = 0

for b in range(k):
    if bitStream[-b-1] == '1':
        realCount += 1

print('Real Count', realCount)

### Updating Buckets with New Data

When a new data comes in from the stream, we may need to update the buckets in a way that everything will still follow the DGIM conditions. When a new bit enters, we take the following steps:

1. Check the leftmost bucket. If the timestamp has has gone past the our window of length $N$,remove it from the list of buckets we are tracking.

2. If the new bit is 0, we do not need to do anything. 

3. If the new bit is 1, we first create a new bucket of size 1 containing the new bit. If there was only one bucket of size 1, then nothing more needs to be done.

4. If there are now 3 buckets of size 1, we combine the 2 leftmost buckets of size into one bucket of size 2.

5. Do step #4 repeatedly for the buckets with sizes greater than 1 until all the buckets follow the DGIM rules once again.

<img src="BitBuckets2Add.png" heigh="250" width="700">


In [34]:
N = 60

#Existing bucket groupings
buckets = {}
buckets[2] = 1
buckets[3] = 1
buckets[6] = 2
buckets[11] = 2
buckets[24] = 4
buckets[36] = 4
buckets[41] = 8
buckets[56] = 8


for key in sorted(buckets.keys(), reverse=True): #add all existing buckets back with their rightmost index incremented by 1
    if key+1 > N: #No need to add if bucket is now out of window
        continue
        
    buckets[key+1] = buckets[key]
    del buckets[key]

buckets[0] = 1 #create a new bucket of size 1 containing the new bit

for size in [1,2,4,8,16]:
    buckets_ = [k for k,v in buckets.items() if v==size]
    if len(buckets_) > 2:
        max_bucket = max(buckets_)
        buckets[max_bucket] = size*2
        
    continue
    
    sizeCount = 0
    sizeIndexes = []
    
    for key in sorted(buckets.keys()):
        if buckets[key] == size:
            sizeCount += 1
            sizeIndexes.append(key)

        elif buckets[key] > size:
            break
    
    #Only need to do something if more than 2 buckets have the same size
    if sizeCount > 2:
        newSize = size * 2
        
        #combine the 2 leftmost buckets of the same size into one bucket of twice the size.
        buckets[sizeIndexes[1]] = newSize
        del buckets[sizeIndexes[2]]

    
print(buckets)

{57: 16, 42: 8, 37: 8, 25: 4, 12: 4, 7: 2, 4: 2, 3: 1, 0: 1}


### Error Bounds

Suppose that the exact count is $c$, and the DGIM estimate involves a leftmost bucket $b$ of size $2^j$. We consider 2 error cases separately, one where the estimate is larger than $c$ and one where the estimate is smaller than $c$.

1. The estimate is smaller than $c$. The worst case scenario is all the 1's in $b$ should have been part of the count, and using only $\frac{1}{2}$ of the size of $b$ only gives us $2^{j-1}$ 1's. However $c$ should be at least $2^{j+1}-1$. Thereforce we conclude that our estimate is at least 50% of $c$.

2. The estimate is larger than $c$. The worst case scenario is only the rightmost bit of bucket $b$ is included, and there is only one bucket of each of the sizes smaller than $b$. Then $c = 1+2^{j−1} +2^{j−2} +···+1=2^j$, and the estimate we get from DGIM is $2^{j−1} + 2^{j−1} + 2^{j−2} + · · · + 1 = 2^j + 2^{j−1} − 1$. Therefor we can conclude that the estimate is no more than 50% greater of $c$. 

We can improve the error bounds further by replacing the condition that there can only be 1 or 2 buckets for each size, with instead there being $r$ or $r-1$ buckets for each size with $r > 2$. 

## Most Common Recent Elements


Previous counting problems have assumed a sliding window that holds the tail-end of a stream for processing. However sometimes we want to consider all the elements in the stream from the start while also distinguising between really old elements and data that is more recent. For example, we want to find out what movies are popular currently. While "currently" is a relatively ambiguous term, we do know that we want to discount movies that were popular decades ago, like *The Godfather*, and emphasize more recent movies like *Avengers: Endgame*. Imagine that we have a stream of movies being watched coming in according to increasing timestamp. A simple way to get the get most common movies would be to calculate the count for each movie, possibly even with some of the techniques above (i.e., counting ones). However, this will only give us an approximation and won't satisfy our preference to discount older movies. One way to do this is through the technique of Decaying Windows.

### Decaying Windows

In decaying windows, we exponentially decay the count for each movie by a constant $c$. $c$ will have a really small value, like $10^{-6}$ or $10^{-9}$. The value for each element is decayed by multiplying the current count by $(1-c)$ at every time step. Given a stream $a_1, a_2,...,a_t$, the exponentially decaying window of this stream is 

\begin{equation}
\sum_{i=0}^{t-1}a_{t-i}(1-c)^i
\end{equation}

When a new element $a_{t+1}$ is added into the stream all we need to do is:

1. Multiply the current sum by $1-c$.
2. Add $a_{t+1}$.

In our movie example, we imagine a separate stream for each movie with a 1 each time that movie is watched, and a 0 each time another movie is watched. The decaying sum of the 1’s measures the current popularity of the movie. To save space, we can completely disregard unpopular movies by dropping movies whose count falls below a certain threshold, such as 0.5. When new data arrives at the stream we do the following:

1. For each movie whose score we are currently tracking, multiply the score by $(1-c)$.
2. If the data is for movie $M$, add 1 to $M$'s score if we are currently tracking it, or start tracking $M$ and initialize it with a score of 1 if we are not.
3. Drop any movies we are tracking if the score is less than our threshold of 0.5.

Due to the exponential decay, the sum of all scores is $\frac{1}{c}$. There cannot be more than $\frac{2}{c}$ movies with score of 0.5 or more, otherwise the sum of scores would be greater than $\frac{1}{c}$. Therefore $\frac{2}{c}$ is the maximum number of scores we would have to track.

Let's provide an example using the MovieLens 100k dataset, a list of movie ratings from September 20, 1997 to April 22, 1998. First, let's map movies to their release timestamp so we can easily sort them by when they were released.

In [35]:
movieStream = {}
movieNames = {}


for line in open('ml-100k/u.item', encoding = "ISO-8859-1"):
    tokens = line.strip().split('|')
    movieNames[tokens[0]] = tokens[1]
    #print(tokens[0], tokens[1])
    
for line in open('ml-100k/u.data'):
    tokens = line.strip().split()
    movie = tokens[1]
    ts = tokens[3]
    
    #handle duplicate timestamps
    while ts in movieStream:
        ts += 'a'
    
    movieStream[ts] = movie

Next we count movie rating with decaying windows. Play with the variable $c$ below by setting it to 0 to see which movies would be most popular by simple raw counts.

In [39]:
scores = {}

c = 0.0001

n = 0

for key in sorted(movieStream.keys()):
    movie = movieStream[key]
    
    remove = set()
    
    for key in scores:
        scores[key] *= (1-c)
        if scores[key] < 0.5 and key != movie:
            remove.add(key)

    
    for rem in remove:
        del scores[rem]
    
    if movie in scores:
        scores[movie] += 1
    else:
        scores[movie] = 1
        
    n += 1
    if n == 100000:
        break

sortedScores = {k: v for k, v in sorted(scores.items(), reverse=True, key=lambda item: item[1])}
count = 0
for movie in sortedScores:
    print(movieNames[movie], sortedScores[movie])
    count += 1
    if count == 10:
        break

Titanic (1997) 67.28333046382443
Contact (1997) 55.42960303711523
Air Force One (1997) 54.49994366275277
Star Wars (1977) 50.654564738716964
Full Monty, The (1997) 49.25542831929427
English Patient, The (1996) 46.814044430751856
Good Will Hunting (1997) 45.61329962426032
Liar Liar (1997) 44.01656947009622
Scream (1996) 43.58306822429063
Fargo (1996) 40.69582727523209


We find that Titanic is the most popular movie by our method, even though by raw count it wouldn't even hit the top 10. This is because Titanic was released late 1997 so the reviews for it would be more recent.

## References

* Jure Leskovec, Anand Rajaraman, and Jeffrey D Ullman. Mining of massive datasets, chapter 4. Cambridge University Press, 3rd edition, 2019. http://www.mmds.org/.

* Bloom, Burton H. "Space/ time trade-offs in hash coding with allowable errors ." Communications of the ACM 13.7 (1970): 422-426.

* Apache Kafka https://kafka.apache.org/

* MovieLens 100K http://grouplens.org/datasets/movielens/