# Practice Session 08: Data streams

We will take a large corpus of documents and compute some statistics using data streams methods.

Author: Tània Pazos

E-mail: tania.pazos01@estudiant.upf.edu

Date: 20/11/24

In [5]:
import io
import nltk
import gzip
import random
import statistics
import secrets
import re

# 0. Dataset and how to iterate

The input file contains lines of dialogue from a set of movies in the [Movie Dialog Corpus](https://www.kaggle.com/datasets/Cornell-University/movie-dialog-corpus). We will use the file `movie_lines.tsv`, which contains the text of the dialogue: about 3 million words in about 300,000 lines of dialogue.

During this practice, we will never load this file in memory.
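
As a minimal sketch of what "never loading the file in memory" can look like, the generator below yields one line at a time from a gzip-compressed file. The helper name `stream_lines` and the small temporary file are assumptions made for illustration; they are not part of the practice code.

```python
import gzip
import os
import tempfile


def stream_lines(path):
    """Yield lines one at a time; only the current line is held in memory."""
    with gzip.open(path, "rt", encoding="utf8") as handle:
        for line in handle:
            yield line.rstrip("\n")


# Build a tiny synthetic gzip file so the sketch runs without the real corpus
tmp_dir = tempfile.mkdtemp()
demo_path = os.path.join(tmp_dir, "demo.tsv.gz")
with gzip.open(demo_path, "wt", encoding="utf8") as handle:
    handle.write("id1\tuser1\tmovie1\tALICE\tThis is a short line.\n")
    handle.write("id2\tuser2\tmovie1\tBOB\tThis is another one.\n")

# Count lines and check the field layout without materializing the file
line_count = 0
for line in stream_lines(demo_path):
    line_count += 1
    assert len(line.split("\t")) == 5  # the dialogue text is the fifth field

print(line_count)  # 2
```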

In [6]:
# Leave this code as-is

INPUT_FILE = "data/movie_lines.tsv.gz"

In [7]:
# Leave this code as-is

POS_NOUN = 'NN'
POS_VERB = 'VB'
POS_ADJECTIVE = 'JJ'

# Producer in Python that reads a file by words that are nouns
def read_by_parts_of_speech(filename, parts_of_speech, max_words=-1, report_every=-1):
    
    # Open the input file
    with gzip.open(filename, "rt", encoding='utf8') as file:
        
        # Initialize counter of words to stop at max_words
        counter = 0
    
        # Iterate through lines in the file
        for line in file:
            
            elements = line.split("\t")
            
            text = ""
            if len(elements) >= 5:
                text = elements[4].strip()
                                        
            if counter > max_words and max_words != -1:
                break
                
            for sentence in nltk.sent_tokenize(text):
                
                tagged = nltk.pos_tag(nltk.word_tokenize(sentence))
                for word in [part[0] for part in tagged if part[1] in parts_of_speech]:
                
                    counter += 1

                    # Report
                    if (report_every != -1) and (counter % report_every == 0):
                        if max_words == -1:
                            print("- Read %d words so far" % (counter))
                        else:
                            print("- Read %d/%d words so far" % (counter, max_words))

                    # Produce the word in lowercase
                    yield word.lower()

We will do a first pass over the data. Here we will read only the first 30K adjectives.

In [8]:
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/taniapazospuig/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/taniapazospuig/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [46]:
for word in read_by_parts_of_speech(INPUT_FILE, [POS_ADJECTIVE], max_words=30000, report_every=10000):
    # Prints 1/1000 of words
    if random.random() < 0.001:
        print("Current adjective '%s'" % (word)) 

Current noun 'amazin'
Current noun 'sick'
Current noun 'fine'
Current noun 'much'
Current noun 'mannish'
Current noun 'second'
Current noun 'pro'
Current noun 'dramatic'
Current noun 'bad'
Current noun 'sick'
Current noun 'real'
- Read 10000/30000 words so far
Current noun 'good'
Current noun 'own'
Current noun 'long'
Current noun 'fucking'
Current noun 'ta'
Current noun 'little'
Current noun 'sixteen'
Current noun 'american'
Current noun 'full'
Current noun 'current'
Current noun 'nice'
- Read 20000/30000 words so far
Current noun 'new'
Current noun 'serious'
Current noun 'good'
Current noun 'certain'
Current noun 'sorry'
Current noun 'evil'
Current noun 'double-'
Current noun 'ready'
Current noun 'pretty'
Current noun 'little'
Current noun 'many'
Current noun 'little'
Current noun 'worth'
Current noun 'uh'
Current noun 'interesting'
Current noun 'american'
Current noun 'short'
Current noun 'fucking'
Current noun 'cockamamie'
Current noun 'last'
Current noun 'chief'
Current noun 'sout

# 1. Determine approximately the top-10 words

Instead of loading the entire dataset in main memory, we will use reservoir sampling to determine approximately the top-10 words.

The following function uses reservoir sampling to maintain a random sample of fixed size S from a stream of data where the total number of items N is unknown or very large. 

In [15]:
def add_to_reservoir(reservoir, item, max_reservoir_size):
    # If the reservoir is full, evict an old random item first
    if len(reservoir) >= max_reservoir_size:
        index_to_replace = random.randint(0, len(reservoir) - 1)
        reservoir[index_to_replace] = item
    else:
        # If the reservoir is not full, just add the new item
        reservoir.append(item)

The function `reservoir_sampling` iterates through a file using the reservoir sampling method.

In [27]:
def reservoir_sampling(filename, parts_of_speech, reservoir_size, max_words=-1, report_every=-1):
    reservoir = []
    words_read = 0
    
    # Iterate through words from the file
    for word in read_by_parts_of_speech(filename, parts_of_speech, max_words=max_words, report_every=report_every):
        if max_words != -1 and words_read >= max_words:
            break  # Stop once max_words words have been read
            
        words_read += 1
        
        if len(reservoir) < reservoir_size:
            # Reservoir is not full, just add the word
            add_to_reservoir(reservoir, word, reservoir_size)
        else:
            # Reservoir is full: with probability reservoir_size / words_read,
            # replace a randomly chosen item with the new word
            if random.random() < reservoir_size / words_read:
                add_to_reservoir(reservoir, word, reservoir_size)
                
         
        # Report progress every report_every words
        if report_every != -1 and words_read % report_every == 0:
            if max_words == -1:
                print(f"- Read {words_read} words so far")
            else:
                print(f"- Read {words_read}/{max_words} words so far")
            
    return words_read, reservoir
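
To check the sampling logic in isolation, the same idea can be sketched on a plain in-memory stream. This is a self-contained illustration, not the notebook's function: the name `reservoir_sample`, the fixed seed, and the stream of integers are assumptions. If the sample is uniform, the first and the last item of the stream should each survive in roughly `size / stream_length` of the trials.

```python
import random


def reservoir_sample(stream, size, rng):
    """Keep a uniform random sample of `size` items from an iterable."""
    reservoir = []
    for seen, item in enumerate(stream, start=1):
        if len(reservoir) < size:
            # Reservoir not full yet: always keep the item
            reservoir.append(item)
        elif rng.random() < size / seen:
            # Keep the new item with probability size/seen,
            # overwriting a uniformly chosen slot
            reservoir[rng.randrange(size)] = item
    return reservoir


rng = random.Random(0)  # fixed seed, only to make the sketch repeatable
stream_length, size, trials = 200, 20, 2000

first_kept = 0
last_kept = 0
for _ in range(trials):
    sample = reservoir_sample(range(stream_length), size, rng)
    first_kept += 0 in sample
    last_kept += (stream_length - 1) in sample

# Both shares should be close to size / stream_length = 10%
print(f"first item kept in {first_kept / trials:.1%} of trials")
print(f"last item kept in  {last_kept / trials:.1%} of trials")
```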

In [28]:
# Leave this code as-is

reservoir_size = 1500
(items_seen, reservoir) = reservoir_sampling(INPUT_FILE, [POS_ADJECTIVE], reservoir_size, max_words=30000, report_every=10000)

print("Number of items seen    : %d" % items_seen)
print("Number of items sampled : %d" % len(reservoir) )

- Read 10000/30000 words so far
- Read 10000/30000 words so far
- Read 20000/30000 words so far
- Read 20000/30000 words so far
- Read 30000/30000 words so far
- Read 30000/30000 words so far
Number of items seen    : 30000
Number of items sampled : 1500


The reservoir contains repeated items. We compute the absolute frequencies of the top 20 words with the code below.

In [30]:
# Leave this code as-is

freq = {}
for item in reservoir:
    freq[item] = reservoir.count(item)

most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:20]
for absolute_frequency, word in most_frequent_items:
    print("%d %s" % (absolute_frequency, word))

61 good
38 little
31 other
29 much
29 last
29 big
22 sorry
22 right
22 first
17 only
17 dead
17 bad
16 young
15 wrong
15 same
15 real
15 next
14 whole
14 sure
14 nice
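
The counting cell above calls `reservoir.count` once per item, which takes time quadratic in the reservoir size. A possible single-pass alternative uses `collections.Counter`. The sketch below runs on a toy list made up for illustration (`toy_reservoir` is not part of the notebook); note that `most_common` may order ties differently from the sorted list used above.

```python
from collections import Counter

# Toy reservoir, made up for illustration
toy_reservoir = ["good", "little", "good", "big", "good", "little"]

# One pass over the reservoir instead of one count() call per item
toy_freq = Counter(toy_reservoir)

for word, absolute_frequency in toy_freq.most_common(2):
    print("%d %s" % (absolute_frequency, word))
# 3 good
# 2 little
```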


Now, we compute the relative frequencies of the top 20 words.

In [36]:
total_items = len(reservoir)

for absolute_frequency, word in most_frequent_items:
    relative_frequency = (absolute_frequency / total_items) * 100  # Convert to percentage
    print(f"{relative_frequency:.2f}% {word}")

4.07% good
2.53% little
2.07% other
1.93% much
1.93% last
1.93% big
1.47% sorry
1.47% right
1.47% first
1.13% only
1.13% dead
1.13% bad
1.07% young
1.00% wrong
1.00% same
1.00% real
1.00% next
0.93% whole
0.93% sure
0.93% nice


If we see an item C times in the reservoir, we can estimate the item appears *C x dataset_size / reservoir_size* times in the entire dataset (*dataset_size* is the size of the entire dataset). 

The code below lists the top-5 words and the estimates of their relative and absolute frequencies in the entire dataset for various reservoir sizes: 50, 100, 500, 1000, 1500, and 2000.

In [48]:
def estimate_frequency_in_dataset(item_count_in_reservoir, reservoir_size, dataset_size):
    return item_count_in_reservoir * dataset_size / reservoir_size
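
As a worked instance of the formula, using numbers already reported above: "good" appeared 61 times in a reservoir of 1,500 items sampled from 30,000 adjectives. The sketch restates the function so that it runs on its own.

```python
def estimate_frequency_in_dataset(item_count_in_reservoir, reservoir_size, dataset_size):
    return item_count_in_reservoir * dataset_size / reservoir_size


# 61 occurrences in a reservoir of 1,500 items, out of 30,000 adjectives read
estimated = estimate_frequency_in_dataset(61, 1500, 30000)
print(estimated)  # 1220.0

# The relative frequency is the same as in the reservoir itself
print(f"{100 * estimated / 30000:.2f}%")  # 4.07%
```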

In [51]:
# Range of reservoir sizes to test
reservoir_sizes = [50, 100, 500, 1000, 1500, 2000]
max_words = 200000
dataset_size = 3000000 # Assume 3 million words in the dataset

for reservoir_size in reservoir_sizes:
    # Run reservoir sampling with the given reservoir size
    print(f"\nTesting reservoir size: {reservoir_size}")
    
    items_seen, reservoir = reservoir_sampling(INPUT_FILE, [POS_ADJECTIVE], reservoir_size, max_words=max_words)
    
    # Compute absolute frequencies in the reservoir
    freq = {}
    for item in reservoir:
        freq[item] = reservoir.count(item)

    # Sort items by frequency in descending order and get the top 5
    most_frequent_items = sorted([(frequency, word) for word, frequency in freq.items()], reverse=True)[:5]
    
    # Print the estimated frequencies for the top-5 words in the entire dataset
    for absolute_frequency, word in most_frequent_items:
        estimated_frequency = estimate_frequency_in_dataset(absolute_frequency, reservoir_size, dataset_size)
        relative_frequency = (estimated_frequency / dataset_size) * 100  # Convert to percentage
        
        # Print only the estimates for the entire dataset
        print(f"Relative Frequency in dataset: {relative_frequency:.2f}% | Absolute Frequency in dataset: {estimated_frequency:.0f} | Word: {word}")


Testing reservoir size: 50
Relative Frequency in dataset: 10.00% | Absolute Frequency in dataset: 300000 | Word: good
Relative Frequency in dataset: 6.00% | Absolute Frequency in dataset: 180000 | Word: true
Relative Frequency in dataset: 4.00% | Absolute Frequency in dataset: 120000 | Word: old
Relative Frequency in dataset: 4.00% | Absolute Frequency in dataset: 120000 | Word: little
Relative Frequency in dataset: 4.00% | Absolute Frequency in dataset: 120000 | Word: late

Testing reservoir size: 100
Relative Frequency in dataset: 3.00% | Absolute Frequency in dataset: 90000 | Word: u
Relative Frequency in dataset: 3.00% | Absolute Frequency in dataset: 90000 | Word: sure
Relative Frequency in dataset: 3.00% | Absolute Frequency in dataset: 90000 | Word: only
Relative Frequency in dataset: 3.00% | Absolute Frequency in dataset: 90000 | Word: good
Relative Frequency in dataset: 3.00% | Absolute Frequency in dataset: 90000 | Word: few

Testing reservoir size: 500
Relative Frequency in

After testing several reservoir sizes, it can be seen that larger reservoirs give more stable estimates of word frequencies for the entire dataset. With small reservoirs (50 and 100), the top-5 words and their frequencies vary significantly across runs. Larger reservoirs (500, 1000, 1500 and 2000) yield more consistent results, with several words appearing in the top-5 list in most runs.

Based on this analysis, a reservoir size of 1500 is recommended, as it strikes a good balance between computational cost and result stability. At this size, the variability of the top-5 words and their frequency estimates across runs is small, making it a reliable choice.
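
One rough way to quantify this stability is to treat the reservoir as a simple random sample of the stream, which is an approximation. Under that assumption, the standard error of an estimated proportion p from S sampled items is about sqrt(p(1 - p) / S). The sketch below evaluates it for a word with a relative frequency of 4%, a value chosen for illustration because it is close to the share observed for "good".

```python
import math


def proportion_standard_error(p, sample_size):
    """Approximate standard error of a proportion estimated from a random sample."""
    return math.sqrt(p * (1 - p) / sample_size)


for size in [50, 100, 500, 1000, 1500, 2000]:
    se = proportion_standard_error(0.04, size)
    print(f"reservoir size {size:>4}: standard error = {100 * se:.2f} percentage points")
```

Under this approximation, the error for 50 items is of the same order as the frequency being estimated, while going from 1500 to 2000 items reduces it only slightly.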

# 2. Determine approximately the distinct number of words

We will estimate the number of distinct words without creating a dictionary or hash table, but instead, we will use the Flajolet-Martin probabilistic counting method. The Flajolet-Martin probabilistic counting method works as follows:

* For several passes
   * Create hash function h
   * For every element *u* in the stream:
      * Compute hash value *h(u)*
      * Let *r(u)* be the number of trailing zeroes in *h(u)*
      * Maintain *R* as the maximum value of *r(u)* seen so far
   * Add *2<sup>R</sup>* as an estimate for the number of distinct elements *u* seen
* The final estimate is the average or the median of the estimates found in each pass
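
The steps above can be sketched end to end on a small synthetic stream. This is an illustration under stated assumptions, not the code used later in this practice: it swaps in a keyed `hashlib.blake2b` hash (one key per pass) so that results are repeatable, treats an all-zero hash as having no trailing zeroes, and uses a made-up stream with exactly 1,000 distinct values. The names `keyed_hash`, `flajolet_martin` and `synthetic_stream` are hypothetical.

```python
import hashlib
import statistics


def keyed_hash(word, key):
    """64-bit hash of `word`; a different `key` gives a different hash function."""
    digest = hashlib.blake2b(word.encode("utf8"), digest_size=8, key=key).digest()
    return int.from_bytes(digest, "big")


def trailing_zeroes(number):
    """Trailing zero bits of a non-negative integer (0 is treated as having none)."""
    count = 0
    while number != 0 and number & 1 == 0:
        count += 1
        number >>= 1
    return count


def flajolet_martin(stream_factory, number_of_passes):
    estimates = []
    for i in range(number_of_passes):
        key = i.to_bytes(8, "big")  # one hash function per pass
        max_r = 0
        for item in stream_factory():
            max_r = max(max_r, trailing_zeroes(keyed_hash(item, key)))
        estimates.append(2 ** max_r)
    return estimates


def synthetic_stream():
    # 20,000 items but only 1,000 distinct values
    for i in range(20_000):
        yield f"word{i % 1000}"


estimates = flajolet_martin(synthetic_stream, number_of_passes=10)
print("estimates:", estimates)
print("average  :", statistics.mean(estimates))
print("median   :", statistics.median(estimates))
```

Each per-pass estimate is a power of two, so individual passes can be off by a large factor; combining several passes is what makes the result usable.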

The function below counts the number of trailing zeroes in the binary representation of a number.

In [52]:
# Leave this code as-is

def count_trailing_zeroes(number):
    count = 0
    while number & 1 == 0:
        count += 1
        number = number >> 1
    return count
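
Note that the loop above does not terminate when `number` is 0, because 0 has no set bit. A hash value of exactly 0 is very unlikely, but a guarded variant is easy to sketch. The name `count_trailing_zeroes_safe` and the choice to return a fixed `width` for 0 are assumptions made for illustration.

```python
def count_trailing_zeroes_safe(number, width=64):
    """Trailing zero bits of an integer; 0 is reported as `width` zeroes."""
    if number == 0:
        return width
    # number & -number isolates the lowest set bit
    return (number & -number).bit_length() - 1


print(count_trailing_zeroes_safe(8))   # 3
print(count_trailing_zeroes_safe(12))  # 2
print(count_trailing_zeroes_safe(-8))  # 3
print(count_trailing_zeroes_safe(0))   # 64
```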

Next, we define a function that generates a random hash function by salting Python's built-in `hash`.

In [53]:
# Leave this code as-is

def random_hash_function():
    # We use a cryptographically safe generator for the salt of our hash function
    salt = secrets.token_bytes(32)
    return lambda string: hash(string + str(salt))

Now, we are ready to perform `number_of_passes` passes over the entire file and compute the estimate for the number of distinct words on each pass. Note that we will count nouns, verbs, and adjectives. We will set the limit of words read to 100000 to control computing time.

In [56]:
number_of_passes = 10
estimates = []

for i in range(number_of_passes):
    # Generate a new hash function for this pass
    hash_function = random_hash_function()
    
    # Maximum number of trailing zeroes observed in this pass
    max_trailing_zeroes = 0
    
    # Read words by parts of speech
    for word in read_by_parts_of_speech(INPUT_FILE, [POS_NOUN, POS_VERB, POS_ADJECTIVE], max_words=100000):
        hashed_value = hash_function(word)  # Compute hash value of the word
        trailing_zeroes = count_trailing_zeroes(hashed_value)  # Count trailing zeroes
        max_trailing_zeroes = max(max_trailing_zeroes, trailing_zeroes)  # Update max
    
    # Calculate the estimate for this pass
    estimate = 2 ** max_trailing_zeroes
    estimates.append(estimate)
    print(f"Estimate on pass {i + 1}: {estimate} distinct words")

Estimate on pass 1: 8192 distinct words
Estimate on pass 2: 524288 distinct words
Estimate on pass 3: 4096 distinct words
Estimate on pass 4: 2048 distinct words
Estimate on pass 5: 2048 distinct words
Estimate on pass 6: 2048 distinct words
Estimate on pass 7: 32768 distinct words
Estimate on pass 8: 8192 distinct words
Estimate on pass 9: 8192 distinct words
Estimate on pass 10: 4096 distinct words


In [57]:
# Leave this code as-is

print("* Average of estimates: %.1f" % statistics.mean(estimates))
print("* Median  of estimates: %.1f" % statistics.median(estimates))

* Average of estimates: 59596.8
* Median  of estimates: 6144.0


Finally, we will repeat this process for nouns, adjectives, and verbs separately. We will perform 3 separate runs of 10 passes each. For each pass, we will compute the noun, verb, and adjective estimate for the number of distinct words. After each run, we will compute the average and the median of the estimates.

This is done in the code below; it is commented out because it takes a few minutes to run. The results obtained are copied into a markdown cell.

In [62]:
'''
number_of_runs = 3
number_of_passes = 10

for run in range(number_of_runs):
   
    estimates_noun_run = []
    estimates_verb_run = []
    estimates_adjective_run = []

    print(f"\n=== Run {run + 1} ===")
    
    for i in range(number_of_passes):
        # Initialize the maximum trailing zeroes
        max_trailing_zeroes_noun = 0
        max_trailing_zeroes_verb = 0
        max_trailing_zeroes_adjective = 0

        # Create a new random hash function for each pass
        hash_function = random_hash_function()

        # Read nouns
        for word in read_by_parts_of_speech(INPUT_FILE, [POS_NOUN], max_words=100000):
            hashed_value = hash_function(word)
            trailing_zeroes = count_trailing_zeroes(hashed_value)
            max_trailing_zeroes_noun = max(max_trailing_zeroes_noun, trailing_zeroes)

        # Read verbs
        for word in read_by_parts_of_speech(INPUT_FILE, [POS_VERB], max_words=100000):
            hashed_value = hash_function(word)
            trailing_zeroes = count_trailing_zeroes(hashed_value)
            max_trailing_zeroes_verb = max(max_trailing_zeroes_verb, trailing_zeroes)

        # Read adjectives
        for word in read_by_parts_of_speech(INPUT_FILE, [POS_ADJECTIVE], max_words=100000):
            hashed_value = hash_function(word)
            trailing_zeroes = count_trailing_zeroes(hashed_value)
            max_trailing_zeroes_adjective = max(max_trailing_zeroes_adjective, trailing_zeroes)

        # Compute estimates for this pass
        estimate_noun = 2 ** max_trailing_zeroes_noun
        estimate_verb = 2 ** max_trailing_zeroes_verb
        estimate_adjective = 2 ** max_trailing_zeroes_adjective

        # Append estimates to the lists
        estimates_noun_run.append(estimate_noun)
        estimates_verb_run.append(estimate_verb)
        estimates_adjective_run.append(estimate_adjective)

        
        print(f"Pass {i + 1}:")
        print(f"  Noun estimate: {estimate_noun}")
        print(f"  Verb estimate: {estimate_verb}")
        print(f"  Adjective estimate: {estimate_adjective}")

    # Compute the average and median for this run
    avg_noun = statistics.mean(estimates_noun_run)
    med_noun = statistics.median(estimates_noun_run)

    avg_verb = statistics.mean(estimates_verb_run)
    med_verb = statistics.median(estimates_verb_run)

    avg_adjective = statistics.mean(estimates_adjective_run)
    med_adjective = statistics.median(estimates_adjective_run)

   
    print(f"\nRun {run + 1} summary:")
    print(f"  * Noun estimates: Average = {avg_noun:.1f}, Median = {med_noun:.1f}")
    print(f"  * Verb estimates: Average = {avg_verb:.1f}, Median = {med_verb:.1f}")
    print(f"  * Adjective estimates: Average = {avg_adjective:.1f}, Median = {med_adjective:.1f}")
'''


=== Run 1 ===

Pass 1:
  Noun estimate: 32768
  Verb estimate: 16384
  Adjective estimate: 32768
  
Pass 2:
  Noun estimate: 1048576
  Verb estimate: 8192
  Adjective estimate: 262144
  
Pass 3:
  Noun estimate: 65536
  Verb estimate: 1024
  Adjective estimate: 16384
  
Pass 4:
  Noun estimate: 2048
  Verb estimate: 2048
  Adjective estimate: 16384
  
Pass 5:
  Noun estimate: 32768
  Verb estimate: 8192
  Adjective estimate: 65536
  
Pass 6:
  Noun estimate: 4096
  Verb estimate: 16384
  Adjective estimate: 16384
  
Pass 7:
  Noun estimate: 8192
  Verb estimate: 32768
  Adjective estimate: 8192
  
Pass 8:
  Noun estimate: 8192
  Verb estimate: 2048
  Adjective estimate: 131072
  
Pass 9:
  Noun estimate: 8192
  Verb estimate: 8192
  Adjective estimate: 32768
  
Pass 10:
  Noun estimate: 8192
  Verb estimate: 1024
  Adjective estimate: 2048

Run 1 summary:
  * Noun estimates: Average = 121856.0, Median = 8192.0
  * Verb estimates: Average = 9625.6, Median = 8192.0
  * Adjective estimates: Average = 58368.0, Median = 24576.0

=== Run 2 ===

Pass 1:
  Noun estimate: 4096
  Verb estimate: 65536
  Adjective estimate: 2048
  
Pass 2:
  Noun estimate: 2048
  Verb estimate: 2048
  Adjective estimate: 2048
  
Pass 3:
  Noun estimate: 8192
  Verb estimate: 4096
  Adjective estimate: 8192
  
Pass 4:
  Noun estimate: 16384
  Verb estimate: 16384
  Adjective estimate: 32768
  
Pass 5:
  Noun estimate: 16384
  Verb estimate: 16384
  Adjective estimate: 65536
  
Pass 6:
  Noun estimate: 32768
  Verb estimate: 16384
  Adjective estimate: 32768
  
Pass 7:
  Noun estimate: 32768
  Verb estimate: 4096
  Adjective estimate: 8192
  
Pass 8:
  Noun estimate: 32768
  Verb estimate: 4096
  Adjective estimate: 8192
  
Pass 9:
  Noun estimate: 4096
  Verb estimate: 4096
  Adjective estimate: 65536
  
Pass 10:
  Noun estimate: 4096
  Verb estimate: 512
  Adjective estimate: 16384

Run 2 summary:
  * Noun estimates: Average = 15360.0, Median = 12288.0
  * Verb estimates: Average = 13363.2, Median = 4096.0
  * Adjective estimates: Average = 24166.4, Median = 12288.0

=== Run 3 ===

Pass 1:
  Noun estimate: 8192
  Verb estimate: 8192
  Adjective estimate: 4096
  
Pass 2:
  Noun estimate: 8192
  Verb estimate: 1024
  Adjective estimate: 2048
  
Pass 3:
  Noun estimate: 4096
  Verb estimate: 131072
  Adjective estimate: 4096
  
Pass 4:
  Noun estimate: 8192
  Verb estimate: 8192
  Adjective estimate: 4096
  
Pass 5:
  Noun estimate: 8192
  Verb estimate: 4096
  Adjective estimate: 4096
  
Pass 6:
  Noun estimate: 2048
  Verb estimate: 8192
  Adjective estimate: 4096
  
Pass 7:
  Noun estimate: 8192
  Verb estimate: 65536
  Adjective estimate: 1024
  
Pass 8:
  Noun estimate: 4096
  Verb estimate: 4096
  Adjective estimate: 8192
  
Pass 9:
  Noun estimate: 4096
  Verb estimate: 4096
  Adjective estimate: 16384
  
Pass 10:
  Noun estimate: 4096
  Verb estimate: 4096
  Adjective estimate: 2048

Run 3 summary:
  * Noun estimates: Average = 5939.2, Median = 6144.0
  * Verb estimates: Average = 23859.2, Median = 6144.0
  * Adjective estimates: Average = 5017.6, Median = 4096.0

From the results of the three runs, it can be seen that the averages are spread out significantly. For instance, for the noun estimates, the averages range from 5939.2 in the third run to 121856.0 in the first run. The median values, on the other hand, are more stable across runs: for nouns, they range from 6144 in the third run to 12288 in the second run. A similar behaviour is observed for the verb and adjective estimates.

All in all, given that probabilistic counting methods can produce skewed distributions with extreme values, the median is a more appropriate measure than the average. This is because the median is more robust, i.e., it is less affected by outliers or extreme values.
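
The effect of a single extreme value can be checked directly on the noun estimates of Run 1, copied from the results above. The sketch recomputes the average and the median with and without the largest estimate.

```python
import statistics

# Noun estimates of Run 1, copied from the results above
noun_estimates = [32768, 1048576, 65536, 2048, 32768, 4096, 8192, 8192, 8192, 8192]

print(f"{statistics.mean(noun_estimates):.1f}")    # 121856.0
print(f"{statistics.median(noun_estimates):.1f}")  # 8192.0

# Drop the single largest estimate and recompute
without_outlier = sorted(noun_estimates)[:-1]
print(f"{statistics.mean(without_outlier):.1f}")    # 18887.1
print(f"{statistics.median(without_outlier):.1f}")  # 8192.0
```

Removing one pass out of ten changes the average by a factor of more than six, while the median stays the same.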

<font size="+2" color="#003300">I hereby declare that, except for the code provided by the course instructors, all of my code, report, and figures were produced by myself.</font>