# Practice Session 08: Data streams


Author: <font color="blue">Rubén Vera Martínez</font>

E-mail: <font color="blue">ruben.vera01@estudiant.upf.edu</font>

Date: <font color="blue">29/11/2022</font>

In [1]:
import io
import nltk
import gzip
import random
import statistics
import secrets
import re

# 0. Dataset and how to iterate

In [2]:
# Leave this code as-is

INPUT_FILE = "movie_lines.tsv.gz"

In [3]:
# Leave this code as-is

# Producer in Python that reads a filename by words
def read_by_words(filename, max_words=-1, report_every=-1):
    
    # Open the input file
    with gzip.open(INPUT_FILE, "rt", encoding='utf8') as file:
        
        # Initialize counter of words to stop at max_words
        counter = 0
    
        # Regular expression to identify words having 3 letters or more and beginning with a-z
        word_expr = re.compile('^[a-z]{2,}$', re.IGNORECASE)

        # Iterate through lines in the file
        for line in file:
            
            elements = line.split("\t")
            
            text = ""
            if len(elements) >= 5:
                text = elements[4].strip()
                                        
            if counter > max_words and max_words != -1:
                break
                
            for word in nltk.word_tokenize(text):
                          
                if word_expr.match(word):
                    counter += 1
                    
                    # Report
                    if (report_every != -1) and (counter % report_every == 0):
                        if max_words == -1:
                            print("- Read %d words so far" % (counter))
                        else:
                            print("- Read %d/%d words so far" % (counter, max_words))

                    # Produce the word in lowercase
                    yield word.lower()

In [4]:
#nltk.download('punkt')

In [5]:
# Leave this code as-is

# Iterate through the file
for word in read_by_words(INPUT_FILE, max_words=300000, report_every=100000):
    # Prints 1/10000 of words
    if random.random() < 0.0001:
        print("Current word '%s'" % (word)) 

Current word 'of'
Current word 'you'
Current word 'do'
Current word 'and'
Current word 'did'
Current word 'and'
Current word 'man'
Current word 'me'
Current word 'me'
- Read 100000/300000 words so far
Current word 'you'
Current word 'who'
Current word 'something'
Current word 'ca'
Current word 'your'
Current word 'like'
- Read 200000/300000 words so far
Current word 'got'
Current word 'shit'
Current word 'on'
Current word 'databases'
Current word 'big'
Current word 'sort'
Current word 'me'
- Read 300000/300000 words so far


# 1. Determine approximately the top-5 words

In [6]:
def add_to_reservoir(reservoir, item, max_reservoir_size):
    #If reservoir is not full, we just add the item
    if(len(reservoir) < reservoir_size):    
        reservoir.append(item)
    #If it's full, we discard randomly one item and put there the new item.
    else:
        reservoir[random.randint(0,max_reservoir_size-1)] = item

    assert(len(reservoir) <= max_reservoir_size)

In [7]:
def reservoir_sampling(filename, reservoir_size, max_words=-1, report_every=-1):
    reservoir = []

    words_read = 0
    
    for word in read_by_words(filename, max_words=max_words, report_every=report_every):
        words_read+=1
        #If it's not full, call the function
        if(len(reservoir) < reservoir_size):
            add_to_reservoir(reservoir, word, reservoir_size)
        else:
        #If it's full, with probability 1-s/n we ignore the item
            rand_value = random.random()
            if rand_value < 1-reservoir_size/words_read:
                continue
            #with probability s/n we add the item discarding another item.
            else:
                add_to_reservoir(reservoir, word, reservoir_size)
        
    return (words_read, reservoir)

In [8]:
# Leave this code as-is

reservoir_size = 1000
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=1000000, report_every=100000)

print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

- Read 100000/1000000 words so far
- Read 200000/1000000 words so far
- Read 300000/1000000 words so far
- Read 400000/1000000 words so far
- Read 500000/1000000 words so far
- Read 600000/1000000 words so far
- Read 700000/1000000 words so far
- Read 800000/1000000 words so far
- Read 900000/1000000 words so far
- Read 1000000/1000000 words so far
Number of items seen    : 1000028
Number of items sampled : 1000


In [9]:
# Leave this code as-is

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)

most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:10]
for absolute_frequency, word in most_frequent_items:
    print("%d %s" % (absolute_frequency, word))

39 you
35 to
25 the
19 it
16 and
15 of
14 that
14 just
13 with
13 do


In [10]:
freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)

most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:10]
for absolute_frequency, word in most_frequent_items:
    #Relative frequency = absolute/size-->in percentage: *100
    print((absolute_frequency/len(reservoir)*100), "%", word)

3.9 % you
3.5000000000000004 % to
2.5 % the
1.9 % it
1.6 % and
1.5 % of
1.4000000000000001 % that
1.4000000000000001 % just
1.3 % with
1.3 % do


In [11]:
#We do the reservoir sampling with size 100 and 3000 and print the absolute and relative frequency of top 5 words of each.
reservoir_size = 100
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=1000000, report_every=100000)
print("Reservoir size: 3000\n ")
print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)
    
most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:5]
for absolute_frequency, word in most_frequent_items:
    print("Absolute freq: %d, Relative freq: %f, word: %s" % (absolute_frequency, absolute_frequency/len(freq), word))
    

    
reservoir_size = 3000
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=1000000, report_every=100000)
print("Reservoir size: 100\n ")
print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)
    
most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:5]
for absolute_frequency, word in most_frequent_items:
    print("Absolute freq: %d, Relative freq: %f, word: %s" % (absolute_frequency, absolute_frequency/len(freq), word))
    
    

- Read 100000/1000000 words so far
- Read 200000/1000000 words so far
- Read 300000/1000000 words so far
- Read 400000/1000000 words so far
- Read 500000/1000000 words so far
- Read 600000/1000000 words so far
- Read 700000/1000000 words so far
- Read 800000/1000000 words so far
- Read 900000/1000000 words so far
- Read 1000000/1000000 words so far
Reservoir size: 3000
 
Number of items seen    : 1000028
Number of items sampled : 100
Absolute freq: 4, Relative freq: 0.048780, word: you
Absolute freq: 3, Relative freq: 0.036585, word: we
Absolute freq: 3, Relative freq: 0.036585, word: that
Absolute freq: 3, Relative freq: 0.036585, word: of
Absolute freq: 2, Relative freq: 0.024390, word: what
- Read 100000/1000000 words so far
- Read 200000/1000000 words so far
- Read 300000/1000000 words so far
- Read 400000/1000000 words so far
- Read 500000/1000000 words so far
- Read 600000/1000000 words so far
- Read 700000/1000000 words so far
- Read 800000/1000000 words so far
- Read 900000/100

In [15]:
#We do the same as before but with reservoir_size = 1000, 1500 and 3000.
reservoir_size = 1000
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=-1, report_every=-1)
print("Reservoir size: ", reservoir_size, "\n ")
print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)
    
most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:3]
for absolute_frequency, word in most_frequent_items:
    print("Absolute freq: %d, Relative freq: %f, word: %s" % (absolute_frequency, absolute_frequency/len(freq), word))
    

reservoir_size = 1500
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=-1, report_every=-1)
print("Reservoir size: ", reservoir_size, "\n ")
print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)
    
most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:3]
for absolute_frequency, word in most_frequent_items:
    print("Absolute freq: %d, Relative freq: %f, word: %s" % (absolute_frequency, absolute_frequency/len(freq), word))


reservoir_size = 5000
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, reservoir_size, max_words=-1, report_every=-1)
print("Reservoir size: ", reservoir_size, "\n ")
print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)
    
most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:3]
for absolute_frequency, word in most_frequent_items:
    print("Absolute freq: %d, Relative freq: %f, word: %s" % (absolute_frequency, absolute_frequency/len(freq), word))

Reservoir size:  1000 
 
Number of items seen    : 2944884
Number of items sampled : 1000
Absolute freq: 58, Relative freq: 0.123932, word: you
Absolute freq: 28, Relative freq: 0.059829, word: to
Absolute freq: 26, Relative freq: 0.055556, word: the
Reservoir size:  1500 
 
Number of items seen    : 2944884
Number of items sampled : 1500
Absolute freq: 69, Relative freq: 0.112195, word: you
Absolute freq: 61, Relative freq: 0.099187, word: the
Absolute freq: 34, Relative freq: 0.055285, word: do
Reservoir size:  5000 
 
Number of items seen    : 2944884
Number of items sampled : 5000
Absolute freq: 232, Relative freq: 0.164190, word: you
Absolute freq: 157, Relative freq: 0.111111, word: the
Absolute freq: 136, Relative freq: 0.096249, word: to


As we can see in the above output, with reservoir_size = 1000 we obtain different top3 words than for reservoir_size = 1500 but we obtain the same than for reservoir_size = 5000, so that would be the minimum size of reservoir. In the case of 1000, I founded some execution that founded the same 3 words, but it was not as consistent as it was on 1500 reservoir size. So, for this reasons I'd use reservoir_size = 1500.

# 2. Determine approximately the distinct number of words

In [13]:
# Leave this code as-is

def count_trailing_zeroes(number):
    count = 0
    while number & 1 == 0:
        count += 1
        number = number >> 1
    return count

In [14]:
# Leave this code as-is

def random_hash_function():
    # We use a cryptographically safe generator for the salt of our hash function
    salt = secrets.token_bytes(32)
    return lambda string: hash(string + str(salt))

In [15]:
number_of_passes = 10
estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=1000000, report_every=-1):
        #For each word we compute a hush value of the random hash function and then append to an array, the number of trailing zeroes
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    #Then, do 2^R to estimate the number of distinct words
    estimate = pow(2,(max(R)))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))

Estimate on pass 1: 131072 distinct words
Estimate on pass 2: 65536 distinct words
Estimate on pass 3: 32768 distinct words
Estimate on pass 4: 16384 distinct words
Estimate on pass 5: 32768 distinct words
Estimate on pass 6: 131072 distinct words
Estimate on pass 7: 16384 distinct words
Estimate on pass 8: 8192 distinct words
Estimate on pass 9: 65536 distinct words
Estimate on pass 10: 16384 distinct words


In [16]:
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

* Average of estimates: 51609.6
* Median  of estimates: 32768.0


In [17]:
print("NUMBER OF PASSES = 10:\n")
number_of_passes = 10
estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=1000000, report_every=-1):
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    estimate = 2^(max(R))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=1000000, report_every=-1):
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    estimate = 2^(max(R))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=1000000, report_every=-1):
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    estimate = 2^(max(R))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

print("NUMBER OF PASSES = 20:\n")
number_of_passes = 20
estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=1000000, report_every=-1):
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    estimate = 2^(max(R))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=1000000, report_every=-1):
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    estimate = 2^(max(R))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=1000000, report_every=-1):
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    estimate = 2^(max(R))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

print("UNLIMITED MAX WORDS:\n")

number_of_passes = 10
estimates = []

for i in range(number_of_passes):
    hash_function = random_hash_function()
    R = []
    for word in read_by_words(INPUT_FILE, max_words=-1, report_every=-1):
        hash_value = hash_function(word)
        R.append(count_trailing_zeroes(hash_value))
    estimate = 2^(max(R))
    estimates.append(estimate)
    print("Estimate on pass %d: %d distinct words" % (i+1, estimate))
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

NUMBER OF PASSES = 10:

Estimate on pass 1: 16 distinct words
Estimate on pass 2: 13 distinct words
Estimate on pass 3: 14 distinct words
Estimate on pass 4: 22 distinct words
Estimate on pass 5: 13 distinct words
Estimate on pass 6: 19 distinct words
Estimate on pass 7: 18 distinct words
Estimate on pass 8: 15 distinct words
Estimate on pass 9: 12 distinct words
Estimate on pass 10: 13 distinct words
* Average of estimates: 15.5
* Median  of estimates: 14.5
Estimate on pass 1: 12 distinct words
Estimate on pass 2: 18 distinct words
Estimate on pass 3: 13 distinct words
Estimate on pass 4: 15 distinct words
Estimate on pass 5: 13 distinct words
Estimate on pass 6: 12 distinct words
Estimate on pass 7: 16 distinct words
Estimate on pass 8: 19 distinct words
Estimate on pass 9: 14 distinct words
Estimate on pass 10: 12 distinct words
* Average of estimates: 14.4
* Median  of estimates: 13.5
Estimate on pass 1: 13 distinct words
Estimate on pass 2: 13 distinct words
Estimate on pass 3: 15

<font size="+2" color="#003300">I hereby declare that, except for the code provided by the course instructors, all of my code, report, and figures were produced by myself.</font>