# Practice Session 08: Data streams

In this session we will take a large corpus of documents and compute some statistics using data streams methods.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

Author: <font color="blue">Judith Camacho </font>

E-mail: <font color="blue">judith.camacho01@estudiant.upf.edu </font>

Date: <font color="blue"> 29/11/2020 </font>

In [11]:
import io
import nltk
import gzip
import random
import statistics
import secrets
import re

In [15]:
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

# 0. Dataset and how to iterate

The input file contain documents in English presented at the European Parliament, collected into the [DCEP](https://ec.europa.eu/jrc/en/language-technologies/dcep) corpus. In this case, we have sampled only documents of type 'REPORT'. What we want is to compute the most frequent words in this dataset. 

During this practice, we will **never** load this file in main memory.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [12]:
# Name of input file
INPUT_FILE = "DCEP-reports-en.txt.gz"


The function `read_by_words` is a [generator](https://wiki.python.org/moin/Generators), that is, a function that behaves as an iterator. This is a common pattern used in stream processing, and in Python is implemented with the `yield` keyword, instead of `return`.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [16]:
# Producer in Python that reads a filename by words
def read_by_words(filename, max_words=-1, report_every=-1):
    
    # Open the input file
    with gzip.open(INPUT_FILE, "rt") as file:
        
        # Initialize counter of words to stop at max_words
        counter = 0
    
        # Regular expression to identify words having 4 letters or more and beginning with a-z
        word_expr = re.compile('^[a-z][a-z0-9]{4,}$', re.IGNORECASE)

        # Iterate through lines in the file
        for line in file:
            
            if counter > max_words and max_words != -1:
                break
                
            for word in nltk.word_tokenize(line):
                if word_expr.match(word):
                    counter += 1
                    
                    # Report
                    if (report_every != -1) and (counter % report_every == 0):
                        if max_words == -1:
                            print("- Read %d words so far" % (counter))
                        else:
                            print("- Read %d/%d words so far" % (counter, max_words))

                    # Produce the word
                    yield word

We will do a first pass over the data. Here we will read only the first 300K words. Try with a larger limit if your computer is fast, with a lower limit if your computer is slow. Find something that makes one pass take about 30 seconds and use it for development.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [17]:
# Iterate through the file
for word in read_by_words(INPUT_FILE, max_words=300000, report_every=100000):
    # Prints 0.005% of words
    if random.random() < 0.00005:
        print("Current word '%s'" % (word)) 

Current word 'improve'
Current word 'develop'
Current word 'which'
Current word 'adopt'
Current word 'PARLIAMENT'
Current word 'Technologies'
Current word 'avoid'
Current word 'Innovation'
- Read 100000/300000 words so far
Current word 'assessment'
Current word 'promote'
- Read 200000/300000 words so far
Current word 'Jeggle'
Current word 'geographic'
Current word 'follows'
Current word 'measures'
- Read 300000/300000 words so far


# 1. Determine approximately the top-5 words

Instead of loading the entire dataset in main memory, we will use reservoir sampling to determine approximately the top-5 words.

**Reservoir sampling**: In reservoir sampling, if we have a reservoir of size S:

* We store the first S elements of the stream
* When the n<sup>th</sup> element arrives (let's call it X<sub>n</sub>):
   * With probability 1 - s/n, we ignore this element.
   * With probability s/n, we:
      * Discard a random element from the reservoir
      * Add element X<sub>n</sub> to the reservoir (calling *add_to_reservoir*)
      
<font size="-1" color="gray">(Remove this cell when delivering.)</font>

Implement a function `add_reservoir(reservoir, item, max_size)` that adds an item to the reservoir, maintaining its size. If the reservoir is already of size *max_size*, a random item is selected and evicted *before* adding the item. It is important to evict an old item *before* adding the new item. Use the following skeleton:

```python

def add_to_reservoir(reservoir, item, max_reservoir_size):
    # YOUR CODE HERE
    assert(len(reservoir) <= max_reservoir_size)
```

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [43]:
def add_to_reservoir(reservoir, item, max_reservoir_size):
    reservoir_size = len(reservoir)
    if (reservoir_size < max_reservoir_size):
        reservoir.append(item)
    else:
        pop_item = random.randint(0,max_reservoir_size-1)
        reservoir.pop(pop_item)
        reservoir.append(item)

    assert(len(reservoir) <= max_reservoir_size)

Create a function to iterate through the file using the reservoir sampling method seen in class. In this function you will decide, for every item, whether to call *add_to_reservoir* or to ignore the item.

You can use the following skeleton:

```python
def reservoir_sampling(filename, reservoir_size, max_words=-1, report_every=-1):
    reservoir = []
    
    words_read = 0
    
    for word in read_by_words(filename, max_words=max_words, report_every=report_every):
    
            # YOUR CODE HERE

    return (words_read, reservoir)
```

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [44]:
def reservoir_sampling(filename, reservoir_size, max_words=-1, report_every=-1):
    reservoir = []
 
    words_read = 0
    size = float(reservoir_size)
    for word in read_by_words(filename, max_words=max_words, report_every=report_every):
        words_read += 1
        prob_ignore = 1 - (size/(size + words_read*1.0))
        # if random number is > than probability to ignore, we should append it
        if random.random() > prob_ignore:
            # if random number is < than s/n, pop random item and the append
            if (1-prob_ignore) > random.random():
                add_to_reservoir(reservoir, word, reservoir_size)
 
 
    return (words_read, reservoir)

Test your function using the following code:

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [125]:
reservoir_size = 500
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=300000, report_every=100000)

print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Number of items seen    : 300006
Number of items sampled : 500


The reservoir contains repeated items. You can compute the absolute frequencies of the top 5 using the following code.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [126]:
INPUT_FILE

'DCEP-reports-en.txt.gz'

In [127]:
def absolute_freq(reservoir, top):
    freq = {}
    for item in reservoir:
        freq[item] = reservoir.count(item)

    most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:top]
    for absolute_frequency, word in most_frequent_items:
        print("%d %s" % (absolute_frequency, word))

In [128]:
absolute_freq(reservoir,10)

14 trade
11 Community
11 Commission
9 European
7 regard
7 having
6 which
6 rules
6 report
6 measures


Write code to compute the 5 most frequent items in the reservoir and their relative frequencies, as percentages.

If you see an item C times in the reservoir, you can estimate its absolute frequency as *C x dataset_size / reservoir_size*, i.e., the number of times it appears in the entire dataset (*dataset_size* is the total number of items read). 

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [129]:
def relative_freq(reservoir, items_seen, top):
    freq = {}
    for item in reservoir:
        freq[item] = (reservoir.count(item) * items_seen / len(reservoir)) / 100

    most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:top]
    for relative_frequency, word in most_frequent_items:
        print("%d%% %s" % (relative_frequency, word))

In [130]:
relative_freq(reservoir, items_seen,5)

84% trade
66% Community
66% Commission
54% European
42% regard


In [131]:
absolute_freq(reservoir,5)

14 trade
11 Community
11 Commission
9 European
7 regard


For various sizes of the reservoir, e.g., 50, 100, 500, ..., list the top-5 words and your estimate of their absolute and relative frequency in the entire dataset.
 
<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [132]:
# with max_words = 600000
reservoir_size = 500
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=600000, report_every=100000)

print("\n Relative frequencies:")
absolute_freq(reservoir,10)

print("\n Absolute frequencies:")
relative_freq(reservoir, items_seen,10)

- Read 100000/600000 words so far
- Read 200000/600000 words so far
- Read 300000/600000 words so far
- Read 400000/600000 words so far
- Read 500000/600000 words so far
- Read 600000/600000 words so far

 Relative frequencies:
17 Community
14 trade
11 Commission
10 Parliament
8 which
8 having
7 regard
6 measures
6 international
6 European

 Absolute frequencies:
204% Community
168% trade
132% Commission
120% Parliament
96% which
96% having
84% regard
72% measures
72% international
72% European


In [133]:
# with max_words = 1000000
reservoir_size = 500
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=1000000, report_every=100000)

print("\n Relative frequencies:")
absolute_freq(reservoir,10)

print("\n Absolute frequencies:")
relative_freq(reservoir, items_seen,10)

- Read 100000/1000000 words so far
- Read 200000/1000000 words so far
- Read 300000/1000000 words so far
- Read 400000/1000000 words so far
- Read 500000/1000000 words so far
- Read 600000/1000000 words so far
- Read 700000/1000000 words so far
- Read 800000/1000000 words so far
- Read 900000/1000000 words so far
- Read 1000000/1000000 words so far

 Relative frequencies:
15 Community
11 Commission
9 which
9 European
8 Parliament
7 trade
6 regard
6 measures
6 having
4 these

 Absolute frequencies:
334% Community
244% Commission
200% which
200% European
178% Parliament
155% trade
133% regard
133% measures
133% having
89% these


Find by trial and error, and include in your report, the minimum reservoir size you need to have somewhat stable results (e.g., the same top-3 words in two consecutive runs of the algorithm).

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [134]:
reservoir_size = 10000
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=300000, report_every=100000)

print("\n Relative frequencies:")
absolute_freq(reservoir,3)

print("\n Absolute frequencies:")
relative_freq(reservoir, items_seen,3)

- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far

 Relative frequencies:
186 Commission
153 European
144 Parliament

 Absolute frequencies:
57% Commission
46% European
44% Parliament


In [135]:
reservoir_size = 10000
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=300000, report_every=100000)

print("\n Relative frequencies:")
absolute_freq(reservoir,3)

print("\n Absolute frequencies:")
relative_freq(reservoir, items_seen,3)

- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far

 Relative frequencies:
187 Commission
163 European
147 Council

 Absolute frequencies:
58% Commission
50% European
45% Council


If I remove the max_words, the algorithm executes forever... This was the las number of words read when I stopped it: - Read 9700000 words so far.
For the reservoir_size, for instance, with size = 10000 (I have tried several values and this is the first one that gives me the same top-3 in 2 executions)I get the same top 3 words in two different executions.



# 2. Determine approximately the distinct number of words

We will estimate the number of distinct words without creating a dictionary or hash table, but instead, we will use the Flajolet-Martin probabilistic counting method.

**Flajolet-Martin probabilistic counting**:

* For several passes
   * Create hash funcion h
   * For every element *u* in the stream:
      * Compute hash value *h(u)*
      * Let *r(u)* be the number of trailing zeroes in *h(u)*
      * Maintain *R* as the maximum value of *r(u)* seen so far
   * Add *2<sup>R</sup>* as an estimate for the number of distinct elements *u* seen
* The final estimate is the average or the median of the estimates found in each pass

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

Use this function to count trailing zeroes in the binary representation of a number.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [136]:
def count_trailing_zeroes(number):
    count = 0
    while number & 1 == 0:
        count += 1
        number = number >> 1
    return count

Use this function to generate a random hash function. Note this generates a function, so you can do `hash_function = random_hash_function()` and then call `hash_function(x)` to compute the hash value of `x`.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [137]:
def random_hash_function():
    # We use a cryptographically safe generator for the salt of our hash function
    salt = secrets.token_bytes(32)
    return lambda string: hash(string + str(salt))

Perform *number_of_passes* passes over the file, reading the entire file on each pass (we don't use the reservoir in this part). In each pass, create a new hash function and use it to hash userids. Keep the maximum number of trailing zeroes seen in the hash value of a userid. 

```python
number_of_passes = 10
estimates = []

for i in range(number_of_passes):
    # YOUR_CODE_HERE: read the file and generate an estimate
    
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
```

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [138]:
number_of_passes = 10
estimates = []
reservoir_size = 500
for i in range(number_of_passes):
    R = 0
    items_read, stream = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=300000, report_every=100000)
    h = random_hash_function()
    for u in stream:
        hu = h(u)
        r = count_trailing_zeroes(hu)
        R = max(r,R)
    estimate = 2^R
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))

- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 1: 5 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 2: 11 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 3: 11 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 4: 14 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 5: 11 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 6: 10 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 7: 11 distinct words
- Read 100000/300000 

The next cell should print the median of the estimates.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [139]:
print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

* Average of estimates: 10.0
* Median  of estimates: 11.0


Now, double the number of passes (20 instead of 10) and re-run. Print the median of your estimates.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [140]:
number_of_passes = 20
estimates = []
reservoir_size = 500
for i in range(number_of_passes):
    R = 0
    items_read, stream = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=300000, report_every=100000)
    h = random_hash_function()
    for u in stream:
        hu = h(u)
        r = count_trailing_zeroes(hu)
        R = max(r,R)
    estimate = 2^R
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))

- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 1: 9 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 2: 9 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 3: 4 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 4: 10 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 5: 10 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 6: 9 distinct words
- Read 100000/300000 words so far
- Read 200000/300000 words so far
- Read 300000/300000 words so far
Estimate on pass 7: 11 distinct words
- Read 100000/300000 wor

In [143]:
print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

* Average of estimates: 8.1
* Median  of estimates: 9.0


Finally, remove the limit of max words that we used when reading the file, and run the algorithm for 10 passes.

If you notice the algorithm will take more than one hour to run, you can put back the limit of max words and increase it until the total running time of the 10 passes is about one hour. In other words, we are not asking you to run this for more than one hour.

<font size="-1" color="gray">(Remove this cell when delivering.)</font>

In [144]:
number_of_passes = 10
estimates = []
reservoir_size = 500
for i in range(number_of_passes):
    R = 0
    items_read, stream = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=100000000, report_every=100000)
    h = random_hash_function()
    for u in stream:
        hu = h(u)
        r = count_trailing_zeroes(hu)
        R = max(r,R)
    estimate = 2^R
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))

- Read 100000/100000000 words so far
- Read 200000/100000000 words so far
- Read 300000/100000000 words so far
- Read 400000/100000000 words so far
- Read 500000/100000000 words so far
- Read 600000/100000000 words so far
- Read 700000/100000000 words so far
- Read 800000/100000000 words so far
- Read 900000/100000000 words so far
- Read 1000000/100000000 words so far
- Read 1100000/100000000 words so far
- Read 1200000/100000000 words so far
- Read 1300000/100000000 words so far
- Read 1400000/100000000 words so far
- Read 1500000/100000000 words so far
- Read 1600000/100000000 words so far
- Read 1700000/100000000 words so far
- Read 1800000/100000000 words so far
- Read 1900000/100000000 words so far
- Read 2000000/100000000 words so far
- Read 2100000/100000000 words so far
- Read 2200000/100000000 words so far
- Read 2300000/100000000 words so far
- Read 2400000/100000000 words so far
- Read 2500000/100000000 words so far
- Read 2600000/100000000 words so far
- Read 2700000/100000

In [145]:
print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

* Average of estimates: 8.9
* Median  of estimates: 10.0


In this case, the median and the average don't seem to be a better estimator (they are very similar to the results using a smaller max_words), which proofs that our method works very well for both large and smaller number of words.


<font size="+2" color="#003300">I hereby declare that, except for the code provided by the course instructors, all of my code, report, and figures were produced by myself.</font>