# Introduction into Data Science - Assignment Part II

This is the second part of the assignment in IDS 2023/2024.

This part of the assignment consists of five questions — each of these questions is contained in a separate Jupyter notebook:
- [Question 1: Data Preprocessing](Q1_Preprocessing_Visualization.ipynb)
- [Question 2: Association Rules](Q2_Frequent_Itemsets_Association_Rules.ipynb)
- [Question 3: Process Mining](Q3_Process_Mining.ipynb)
- [Question 4: Text Mining](Q4_Text_Mining.ipynb)
- [Question 5: Big Data](Q5_Big_Data.ipynb)

Additional required files are in two folders.
- [datasets](datasets/)
- [scripts](scripts/)

Please use the provided notebook to work on the questions. When you are done, upload your version of each of the notebooks to Moodle. Your submission will, therefore, consist of five jupyter notebook and _no_ additional file. Any additionally provided files will not be considered in grading.
Enter your commented Python code and answers in the corresponding cells. Make sure to answer all questions in a clear and explicit manner and discuss your outputs. _Please do not change the general structure of this notebook_. You can, however, add additional markdown or code cells if necessary. Please **DO NOT CLEAR THE OUTPUT** of the notebook you are submitting! Additionally, please ensure that the code in the notebook runs if placed in the same folder as all of the provided files, delivering the same outputs as the ones you submit in the notebook. This includes being runnable in the bundled conda environment.

*Please make sure to include the names and matriculation numbers of all group members in the provided slots in each of the notebooks.* If a name or a student id is missing, the student will not receive any points.

Hint 1: **Plan your time wisely.** A few parts of this assignment may take some time to run. It might be necessary to consider time management when you plan your group work. Also, do not attempt to upload your assignment at the last minute before the deadline. This often does not work, and you will miss the deadline. Late submissions will not be considered.

Hint 2: RWTHMoodle allows multiple submissions, with every new submission overwriting the previous one. **Partial submissions are possible and encouraged.** This might be helpful in case of technical issues with RWTHMoodle, which may occur close to the deadline.

Hint 3: As a technical note. Some IDEs such as DataSpell may automatically strip jupyter notebook cell metadata. If you are able, please re-add it from the source notebooks before submission. This is necessary for our grading.

Enter your group number and members with matriculation numbers below.

In [1]:
GROUP_NO = 123 # group number
GROUP_MEMBERS = {
    123456: "firstname lastname", # mat. no. : name,
    234567: "firstname lastname",
    345678: "firstname lastname",
}

---

In [2]:
# required imports
# do not edit!

import asyncio
import itertools
import typing
from datetime import timedelta, datetime

import numpy as np
import pandas as pd

import plotly.express as px
from plotly.graph_objects import FigureWidget

# The streaming_utils.py file has to be in the scripts/ directory next to this notebook
from scripts.streaming_utils import CreationEvent, wiki_stream, EventStreams, Switch, auto_off, run_loop

# Question 5: Big Data (21 pts)

In this section, we will be processing live wikipedia edits in a streaming fashion.
API consumption and event handling are common use-cases for streaming approaches.

The Wikimedia Foundation provides publicly accessible event streams about updates on the wikis and web resources they operate.\
Refer to the following for documentation about the endpoints: [API documentation](https://stream.wikimedia.org/?doc#/streams/get_v2_stream_revision_create).

Some important things to note that hold for the entire task: 
1. We are using some advanced python constructs in the provided code such as asynchronous co-routines. You do not have to understand or adapt such code; simply use the functionality and stay within the `# YOUR CODE HERE` regions.
2. Some pre-implemented code is separated into the file `streaming_utils.py`. You are encouraged to look at it to see how the used methods are defined. For the following imports to work, it has to be in the same directory as this notebook.
3. You do not have to "go beyond" the provided stream objects. When we ask for e.g. "the number of edits", we are always referring to what the `wiki_edits_stream` provides. You do not have to tune the stream settings or filters to somehow attempt to determine what you think is the "true" value.
4. The API wrapper we use here sometimes prints errors (in red) when an API response appears to be incompletely transmitted. This can happen, you do not have to fix this or correct for it; simply ignore it.
5. You can freely edit cells with a `# configuration` comment; you do not have to recreate the initial state. In the same manner, you can create additional cells for your debugging etc. Just make sure to not change the structure of explicit solution cells that contain `# YOUR CODE HERE` or "__Student Answer:__ _your answer here_".

With that out of the way, let's dive in!

## Exploring the Event Stream (0 pts)

Get yourself familiar with the provided utility code that connects to the API and receives all edit and page creation events that occur on wikis operated by the Wikimedia foundation. \
To do so, have a look at the following provided code snippet.
It's a simple processing loop that prints for each event the following information in a readable manner:
- the (wiki) domain
- the edited/created page title
- the username ("user_text") and user account age in days (using "user_registration_dt"), if available
- revision content type and length

This example shows you how to iterate over the event stream using our utility code. The event processing loop runs continuously in the background, however after `AUTO_STOP_TIME`, the loop is automatically terminated to stop infinite loops and breaking the notebook with the huge amount of console output. This variable is used in all tasks and you can edit it as you like for your testing.
You can also manually terminate the background processing with the `Switch` object returned by `run_loop`.
The following code cells show some exemplary usage. Note that this co-routine-based execution swallows exceptions which means your function might raise an exception that does not show up in the notebook console. You can assess whether an exception occurred (e.g., due to a programming mistake) via calling `[your task].exception()` as you can see a few cells below.

In [3]:
# configuration
AUTO_STOP_TIME = timedelta(seconds=10)

In [4]:
async def simple_printing(switch: Switch):
    with wiki_stream(streams=['page-create', 'revision-create']) as wiki_edits_stream:
        wiki_edits_stream: EventStreams = wiki_edits_stream
        print('Listening...')

        # `wiki_edits_stream` is a blocking event source
        # that means this for-loop will wait until events arrive from the server
        # that also means it won't terminate on its own, which is why we use the `switch` object (the AUTO_STOP termination uses it too)
        for e in wiki_edits_stream:
            # this check makes sure that we can gracefully stop this endless iteration over future events if the `switch` object's state is toggled `off`
            # the switch object can be manipulated in separate cells, as you can see below
            if switch.off:
                break
            
            change = typing.cast(CreationEvent, e)  # if you have a nice python IDE, this will help with auto-completion
            
            reg_date = change['performer'].get('user_registration_dt')

            age = (pd.Timestamp.now(tz='UTC') - pd.to_datetime(reg_date,
                                                              utc=True)).days if reg_date is not None else 'unknown'
            print(
                f"""Page "{change['page_title']}" on {change['meta']['domain']} edited by {change['performer']['user_text']} (account age: {age} days) details: {change.get('rev_content_model')}:{change.get('rev_len')}""")
            
            # this line is here for technical reasons related to co-routines
            # it is good practice to signal that execution may be paused here, to not excessively hog the CPU
            # you do not have to concern yourself with this; it is always part of the given code skeleton
            await asyncio.sleep(0)

        print('Stopping')

In [5]:
# run it like this
printing_task, printing_auto_stop_task, printing_switch = run_loop(simple_printing, AUTO_STOP_TIME)

Switching off in 0:00:10
Listening...
Page "2022_Desert_Cup_T20I_Series" on en.wikipedia.org edited by Batagur baska (account age: 98 days) details: wikitext:21058
Page "Q111252519" on www.wikidata.org edited by Florentyna (account age: 4074 days) details: wikibase-item:11533
Page "Q87621692" on www.wikidata.org edited by Emijrpbot (account age: 4106 days) details: wikibase-item:10586
Page "Q76900260" on www.wikidata.org edited by Cewbot (account age: 2856 days) details: wikibase-item:35862
Page "Хемлок_(Охајо)" on sr.wikipedia.org edited by Dcirovicbot (account age: 3807 days) details: wikitext:4978
Page "Q78935039" on www.wikidata.org edited by Emijrpbot (account age: 4106 days) details: wikibase-item:8702
Page "Jesús_Cuadrado" on ca.wikipedia.org edited by Enric (account age: unknown days) details: wikitext:3316
Page "Talk:Burevestnik_(1906)" on en.wikipedia.org edited by Cewbot (account age: 3315 days) details: wikitext:371
Page "Q111194231" on www.wikidata.org edited by Florentyna

In [None]:
# check state by printing the task
printing_task

In [None]:
# stop gracefully
printing_switch.turn_off()

In [None]:
# stop forcefully
printing_task.cancel()

In [None]:
# when the task has terminated, you can explicitly check for a possible exception (just printing the task like above also includes this info)
# when you get no output at all from your code, it's a good idea to take a look at this to debug possible programming errors
# print(printing_task.exception())

## Computing Running Statistics (6.5 pts)

We have learned that storing the entire event stream in memory is often not feasible. Many typical analysis tasks, or in this case, simple statistics, can be reformulated such that they do not require the entire dataset but can make do with a much smaller internal state.

An example is a running (weighted) average. The usual formulation for weights $w_i$ and data points $x_i$ ($i \in \{1,\ldots,N\}$) is
$$\mu = \frac{1}{\sum_{i}w_i} \sum_{i} w_i x_i$$

As a prerequisite for our stream processing, we need a function to compute a running average incrementally.

### a)
Complete the implementation of `compute_running_average` in the following code cell.

_Hint: Notice that calling `compute_running_average` with the weights set to the running stream length $N$ (number of elements condensed into the running average), reduces to $\mathit{compute\_running\_average}(\mu, v, N, 1) = \frac{N}{N+1}\cdot \mu + \frac{1}{N+1} \cdot v$._

In [6]:
def compute_running_average(running_average: float, new_value: float, running_weight: float, new_weight: float) -> \
        tuple[float, float]:
    """
    Computes the updated running weighted average for new value `new_value`.
    Specifically, returns a tuple of the new running average and new running weight.
    @param running_average intermediate result up to the last element
    @param new_value new element
    @param running_weight accumulated weight up to the last element
    @param new_weight weight of the new element
    @returns (new_running_average, new_running_weight)
    """
    # YOUR CODE HERE
    new_running_average = ((running_weight * running_average) + new_value) / (running_weight + new_weight)
    new_running_weight = running_weight + new_weight
    return (new_running_average, new_running_weight)


### b)
Describe how this algorithm's space usage scales. Assume $N$ for the length of the stream up to this point and constant space for primitive datatypes like integers, floats, etc. \[in contrast to unbounded numbers that scale logarithmically in size with their value\]. You may use Big $\mathcal{O}$-notation, but you do not have to. \
_Hint: The standard version needs all $w_i$ and $x_i$ values, so $2N$ variables in total. In Big $\mathcal{O}$-notation that is $\mathcal{O}(N)$._

__Student Answer:__ _your answer goes here_

### c)

We can see that there are a lot of edits happening, but can we get some numbers on that? \
**Specifically, in this task, we only care about edits made by non-bot (presumably human) users where the content of the page actually changed due to the revision. Therefore, you have to filter the events that are counted.**

To measure the rate, we want to make use of the running average function implemented in part i) to calculate the number of (filtered) edits per second.

Instead of processing events individually, we use batch processing with a batching interval as configured by `BATCH_INTERVAL`. \
At the end of each batch, the rate of edits should be calculated and incorporated into the running average. Use the true, i.e., measured, duration of the batch as the weight, not the static `BATCH_INTERVAL`.
Both the edit rate within the batch and the overall average should be printed.

The output may look something like this: `[16:05:14.779062] actual batch duration: 5.17s|edits per second: {batch=9.87 | overall=8.33}`.

_Hint: Check the [API documentation](https://stream.wikimedia.org/?doc#/streams/get_v2_stream_revision_create) to find relevant fields._


In [7]:
# configuration
BATCH_INTERVAL = timedelta(seconds=2)
AUTO_STOP_TIME = timedelta(seconds=10)

In [8]:
import warnings
warnings.filterwarnings('ignore')

In [9]:
async def edit_rate(switch: Switch):
    with wiki_stream(streams=['page-create', 'revision-create']) as wiki_edits_stream:
        wiki_edits_stream: EventStreams = wiki_edits_stream
        
        print('Listening...')
        running_avg = 0
        running_weight = 0
        num_batches = AUTO_STOP_TIME / BATCH_INTERVAL
        batchnum = 1
        while switch.on and batchnum <= num_batches:
            batch_start_time = datetime.utcnow()
            print('Starting batch')
            num_edits = 0
            print("batchnum: ", batchnum, "with start time :", batch_start_time)                                                             
            for e in wiki_edits_stream:
                #print('hi')
                if switch.off:
                    break
                num_edits += 1
                if ((datetime.utcnow() - batch_start_time).total_seconds() >= BATCH_INTERVAL.total_seconds()):
                    break
                await asyncio.sleep(0)

            duration = (datetime.utcnow() - batch_start_time)
            #running_weight += duration.total_seconds()
            running_avg, running_weight = compute_running_average(running_avg, num_edits, running_weight, duration.total_seconds() )
            
            print('Finished batch at time: ', datetime.utcnow(), "duration :", (datetime.utcnow() - batch_start_time))
            print('average number of edits per second: ', (num_edits / duration.total_seconds()))
            print('running average is: ', running_avg)
            batchnum += 1
            await asyncio.sleep(0)
    print('Stopping...')

time0 = 

In [10]:
AUTO_STOP_TIME = timedelta(seconds=20)
BATCH_INTERVAL = timedelta(seconds=5)
edit_rate_task, edit_rate_auto_stop_task, edit_rate_switch = run_loop(edit_rate, AUTO_STOP_TIME)

Switching off in 0:00:20
Listening...
Starting batch
batchnum:  1 with start time : 2024-01-29 14:50:29.145908
Finished batch at time:  2024-01-29 14:50:34.200298 duration : 0:00:05.054391
average number of edits per second:  19.389104708090105
running average is:  19.389104708090105
Starting batch
batchnum:  2 with start time : 2024-01-29 14:50:34.200532
Finished batch at time:  2024-01-29 14:50:39.251216 duration : 0:00:05.050685
average number of edits per second:  23.561188505545495
running average is:  21.474381557603202
Starting batch
batchnum:  3 with start time : 2024-01-29 14:50:39.251455


{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/File:Campanula_scheuchzeri004.jpg","request_id":"510109f7-c6d3-4696-bff6-9f2118372a04","id":"59284c8e-db9e-4cc7-ae34-46d5c78ee483","dt":"2024-01-29T14:50:40Z","domain":"commons.wikimedia.org","stream":"mediawiki.revision-create","topic":"codfw.mediawiki.revision-create","partition":0,"offset":591104577},"database":"commonswiki","page_id":20404149,"page_title":"File:Campanula_scheuchzeri004.jpg","page_namespace":6,"rev_id":846827004,"rev_timestamp":"2024-01-29T14:50:40Z","rev_sha1":"5moh86ja5rr2yzo7p2z1nwotjkkl9k6","rev_minor_edit":false,"rev_len":7681,"rev_content_model":"wikitext","rev_content_format":"text/x-wiki","performer":{"user_text":"SchlurcherBot","user_groups":["bot","ipblock-exempt","*","user","autoconfirmed"],"user_is_bot":true,"user_id":771189,"user_registration_dt":"2009-07-27T17:51:36Z","user_edit_count":123929470},"page_is_redirect":false,"comment":"/* wbeditentity-update:0| 

Finished batch at time:  2024-01-29 14:50:44.255075 duration : 0:00:05.003620
average number of edits per second:  21.584385855272295
running average is:  21.510812227751718
Starting batch
batchnum:  4 with start time : 2024-01-29 14:50:44.255181
Finished batch at time:  2024-01-29 14:50:49.355857 duration : 0:00:05.100677
average number of edits per second:  24.114474350531527
running average is:  22.167954717202214
Stopping...
Switching off now


In [None]:
# check statebatch_start_time
edit_rate_task

In [None]:
# manually stop
edit_rate_switch.turn_off()

## Visualizing Streams with Limited Memory Usage (5.5 pts)
To visualize the variability of the edit rate better, we want to continuously update a line plot instead of this endless printed output.\
To not have an unbounded plot size, only the values for the last few updates should be kept in memory.

### d)
Which type of "Streaming Data Range" variant introduced in the lecture does this plotting correspond to?

__Student Answer:__ _your answer goes here_

### e)
Complete the implementation of the method `visualized_edit_rate` (a few cells below) to continuously update the following `FigureWidget` `fw` every `UPDATE_INTERVAL` seconds with the last `UPDATE_HISTORY` measured edit rates.
To do this, adapt the batch-based edit rate algorithm from _c)_. The update interval corresponds to the batching interval. \
**To simplify this task and make it less dependent on _c)_, do not filter the edits here. You also do not need the running average here: only the within-batch edit rates are relevant.**

Do not worry about the figure updating functionality itself, simply use the provided `figure_update_method(time: list[datetime], values: list[float])` in the code skeleton.


In [11]:
# configuration
UPDATE_HISTORY = 10
UPDATE_INTERVAL = timedelta(seconds=5)

In [12]:
# recommended to not edit

initialization_dummy = pd.DataFrame([[datetime.now(), 0.0]], columns=['update time', 'edit rate'])
f = px.line(initialization_dummy, x='update time', y='edit rate', markers=True,
            title=f'The running edit rate (last {UPDATE_HISTORY} values)')
f.update_layout(yaxis_title_text='edit rate [1/s]')
fw = FigureWidget(f)

display(initialization_dummy)

def upd_fw(time: list[datetime], values: list[float]):
    """updates the above created `FigureWidget` fw with `time` and `values` for the x- and y-axis respectively"""
    print(time)
    print(values)
    fw.data[0].x = np.array(time)
    fw.data[0].y = np.array(values)


Unnamed: 0,update time,edit rate
0,2024-01-29 15:51:04.499654,0.0


hi
[datetime.datetime(2024, 1, 29, 14, 51, 14, 704410)]
[95]
Finished batch at time:  2024-01-29 14:51:19.727810 duration : 0:00:05.023402
average number of edits per second:  18.92375995588971
Starting batch
batchnum:  2 with start time : 2024-01-29 14:51:19.727993


{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/File:CH-NB_Photoglob-Wehrli_EAD-WEHR-15191-B.tif","request_id":"2611cf95-307c-4700-b9e1-7e605f5b61f9","id":"96f829fa-881d-4b40-ba3d-0fd1a3312bbb","dt":"2024-01-29T14:51:19Z","domain":"commons.wikimedia.org","stream":"mediawiki.revision-create","topic":"codfw.mediawiki.revision-create","partition":0,"offset":591105523},"database":"commonswiki","page_id":144725214,"page_title":"File:CH-NB_Photoglob-Wehrli_EAD-WEHR-15191-B.tif","page_namespace":6,"rev_id":846827095,"rev_timestamp":"2024-01-29T14:51:19Z","rev_sha1":"kjl8yws79ggnjo57n954r1jy4o20jc5","rev_minor_edit":false,"rev_len":1152,"rev_content_model":"wikitext","rev_content_format":"text/x-wiki","performer":{"user_text":"Swiss National Library","user_groups":["aut
Unterminated string starting at: line 1 column 817 (char 816)


hi
[datetime.datetime(2024, 1, 29, 14, 51, 14, 704410), datetime.datetime(2024, 1, 29, 14, 51, 19, 727993)]
[95, 115]
Finished batch at time:  2024-01-29 14:51:24.894532 duration : 0:00:05.166541
average number of edits per second:  22.272034110685425
Starting batch
batchnum:  3 with start time : 2024-01-29 14:51:24.894679
hi
[datetime.datetime(2024, 1, 29, 14, 51, 14, 704410), datetime.datetime(2024, 1, 29, 14, 51, 19, 727993), datetime.datetime(2024, 1, 29, 14, 51, 24, 894679)]
[95, 115, 117]
Finished batch at time:  2024-01-29 14:51:30.116424 duration : 0:00:05.221747
average number of edits per second:  22.4213270129465
Starting batch
batchnum:  4 with start time : 2024-01-29 14:51:30.116558
hi
[datetime.datetime(2024, 1, 29, 14, 51, 14, 704410), datetime.datetime(2024, 1, 29, 14, 51, 19, 727993), datetime.datetime(2024, 1, 29, 14, 51, 24, 894679), datetime.datetime(2024, 1, 29, 14, 51, 30, 116558)]
[95, 115, 117, 108]
Finished batch at time:  2024-01-29 14:51:35.137927 duration : 

In [None]:
"""
!pip install -U ipywidgets==7.5
import getpass
import os

password = getpass.getpass()
command = "sudo -S jupyter nbextension enable --py widgetsnbextension --sys-prefix" # can be any command but don't forget -S as it enables input from stdin
os.popen(command, 'w').write(password+'\n')
#!jupyter nbextension enable --py widgetsnbextension --sys-prefix
"""


In [13]:
fw

FigureWidget({
    'data': [{'hovertemplate': 'update time=%{x}<br>edit rate=%{y}<extra></extra>',
              'legendgroup': '',
              'line': {'color': '#636efa', 'dash': 'solid'},
              'marker': {'symbol': 'circle'},
              'mode': 'lines+markers',
              'name': '',
              'orientation': 'v',
              'showlegend': False,
              'type': 'scatter',
              'uid': '78ee1fc8-0655-410c-8e8d-0047a5923b52',
              'x': array([datetime.datetime(2024, 1, 29, 15, 51, 4, 499654)], dtype=object),
              'xaxis': 'x',
              'y': array([0.]),
              'yaxis': 'y'}],
    'layout': {'legend': {'tracegroupgap': 0},
               'template': '...',
               'title': {'text': 'The running edit rate (last 10 values)'},
               'xaxis': {'anchor': 'y', 'domain': [0.0, 1.0], 'title': {'text': 'update time'}},
               'yaxis': {'anchor': 'x', 'domain': [0.0, 1.0], 'title': {'text': 'edit rate [1/s]

For reference, here's what your plot could look like.

![edit_rate_viz.png](attachment:2de1fd31-d186-4cd3-a304-5f6c89df63a1.png)

In [14]:
async def visualized_edit_rate(switch: Switch,
                               figure_update_method: typing.Callable[[list[datetime], list[float]], None]):
    with wiki_stream(streams=['page-create', 'revision-create']) as wiki_edits_stream:
        wiki_edits_stream: EventStreams = wiki_edits_stream
        times = []
        vals = []
        print('Listening...')
        num_batches = AUTO_STOP_TIME / BATCH_INTERVAL
        batchnum = 1
        while switch.on and batchnum <= num_batches:
            batch_start_time = datetime.utcnow()
            print('Starting batch')
            num_edits = 0
            print("batchnum: ", batchnum, "with start time :", batch_start_time)                                                             
            for e in wiki_edits_stream:
                if switch.off:
                    break
                num_edits += 1
                if ((datetime.utcnow() - batch_start_time).total_seconds() >= BATCH_INTERVAL.total_seconds()):
                    break
                await asyncio.sleep(0)
            
            print('hi')
            duration = (datetime.utcnow() - batch_start_time)
            times.append(batch_start_time)
            vals.append(num_edits)
            figure_update_method(times, vals)
            print('Finished batch at time: ', datetime.utcnow(), "duration :", (datetime.utcnow() - batch_start_time))
            print('average number of edits per second: ', (num_edits / duration.total_seconds()))
            batchnum += 1
            await asyncio.sleep(0)
    print('Stopping...')

In [15]:
# configuration
AUTO_STOP_TIME = timedelta(seconds=20)
UPDATE_HISTORY = timedelta(seconds=15)
BATCH_INTERVAL = timedelta(seconds=5)
UPDATE_INTERVAL = timedelta(seconds=5)

v_edit_rate_task, v_edit_rate_auto_stop_task, v_edit_rate_switch = run_loop(visualized_edit_rate, AUTO_STOP_TIME,
                                                                            figure_update_method=upd_fw)
#print(times)
#print(vals)

Switching off in 0:00:20
Listening...
Starting batch
batchnum:  1 with start time : 2024-01-29 14:51:14.704410


In [None]:
fw

In [None]:
# check state
v_edit_rate_task

In [None]:
# manually stop
v_edit_rate_switch.turn_off()

## Streaming Algorithms (5 pts)

Next, we're going slightly deeper into the context of these edit events.

We want to see which wiki domains receive a lot of updates.
More precisely, for a parameter $k\in\mathbb{N}$ and with $N\in\mathbb{N}$ being the length of the stream up to this point, return all domains that had at least $N/k$ edits. 
This problem is also referred to as _Heavy Hitter Detection_ and belongs to the family of stream summary algorithms.

k = free parameter



n = length of stream



return all domains with n/k edits

### a)

Though there are specialized approximate algorithms with probabilistic guarantees out there, we simplify the problem a bit and consider the simple _[Misra-Gries Summary](https://en.wikipedia.org/wiki/Misra%E2%80%93Gries_summary)_.

In pseudocode courtesy of the above-linked Wikipedia page:
```
algorithm misra-gries:[4]
    input: 
        A positive integer k
        A finite sequence s taking values in the range 1,2,...,m
    output: An associative array A with frequency estimates for each item in s
    
    A := new (empty) associative array
    while s is not empty:
        take a value i from s
        if i is in keys(A):
            A[i] := A[i] + 1
        else if |keys(A)| < k - 1:
            A[i] := 1
        else:
            for each K in keys(A):
                A[K] := A[K] - 1
                if A[K] = 0:
                    remove K from keys(A)
    return A
```

Reformulate the above algorithm into our streaming setting using the following code skeleton:


In [16]:
# required imports
# do not edit!

import asyncio
import itertools
import typing
from datetime import timedelta, datetime

import numpy as np
import pandas as pd

import plotly.express as px
from plotly.graph_objects import FigureWidget

# The streaming_utils.py file has to be in the scripts/ directory next to this notebook
from scripts.streaming_utils import CreationEvent, wiki_stream, EventStreams, Switch, auto_off, run_loop

In [17]:
def update_misra_gries_summary(running_counts: dict[str, int], new_element: str, k: int) -> None:
    """
    Updates `running_counts` in-place according to the Misra-Gries Summary algorithm.
    :param running_counts: a dictionary of running counts - this is the summary
    :param new_element: the next observed element
    :param k: the parameter $k$ that defines the heavy-hitter threshold  
    """
    if new_element in running_counts:
        # Increment the count if the element is already in the summary
        running_counts[new_element] += 1
    elif len(running_counts) < k - 1:
        # If the summary has less than k-1 elements, add the new element
        running_counts[new_element] = 1
    else:
        # Decrement counts for all elements, and remove elements with count 0
        keys_to_remove = []
        for key in running_counts:
            running_counts[key] -= 1
            if running_counts[key] == 0:
                keys_to_remove.append(key)
        for key in keys_to_remove:
            del running_counts[key]
    return running_counts

### b)

Finally, apply your implementation to the edit stream and update the following `FigureWidget` with the current summary of `k` heavy-hitters every `UPDATE_INTERVAL` seconds. 


In [18]:
# configuration
UPDATE_INTERVAL = timedelta(seconds=5)
k = 10

In [None]:
# recommended to not edit

initialization_dummy_heavy_hitters = pd.DataFrame([['placeholder', 0]], columns=['domain', 'count'])
f = px.bar(initialization_dummy_heavy_hitters, x='domain', y='count', title=f'The k={k} heavy-hitter domains')

fw_heavy_hitters = FigureWidget(f)

def upd_fw_heavy_hitters(summary: dict[str, int]) -> None:
    elements, counts = list(zip(*summary.items()))
    sorted_idx = np.argsort(counts)[::-1]

    fw_heavy_hitters.data[0].x = np.take(elements, sorted_idx)
    fw_heavy_hitters.data[0].y = np.take(counts, sorted_idx)


commons.wikimedia.org
commons.wikimedia.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
en.wikisource.org
en.wikisource.org
en.wikipedia.org
en.wikipedia.org
de.wikipedia.org
de.wikipedia.org
www.wikidata.org
www.wikidata.org
commons.wikimedia.org
commons.wikimedia.org
www.wikidata.org
www.wikidata.org
en.wikipedia.org
en.wikipedia.org
www.wikidata.org
www.wikidata.org
commons.wikimedia.org
commons.wikimedia.org
commons.wikimedia.org
commons.wikimedia.org
www.wikidata.org
www.wikidata.org
sr.wikipedia.org
sr.wikipedia.org
en.wikipedia.org
en.wikipedia.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
es.wikipedia.org
es.wikipedia.org
en.wikipedia.org
en.wikipedia.org
pt.wikipedia.org
pt.wikipedia.org
en.wikipedia.org
en.wikipedia.org
www.wikidata.org
www.wikidata.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
www.wikidata.org
www.wikidata.org
commons.wikimedia.org
commons.wikimedia.org
commons.wikimedia.org
commons.

{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://en.wikipedia.org/wiki/Minister_of_Housing_and_Local_Government_(Malaysia)","request_id":"8488b314-3b34-48d4-a305-5d4a68d37d4c","id":"064f238d-b6e9-4c1f-a1a9-277da933b7ba","dt":"2024-01-29T14:52:50Z","domain":"en.wikipedia.org","stream":"mediawiki.revision-create","topic":"codfw.mediawiki.revision-create","partition":0,"offset":591107723},"database":"enwiki","page_id":53476119,"page_title":"Minister_of_Housing_and_Local_Government_(Malaysia)","page_namespace":0,"rev_id":1200456254,"rev_timestamp":"2024-01-29T14:52:50Z","rev_sha1":"2do125sxl6qnqwaf5xqwbi8riixrwlm","rev_minor_edit":false,"rev_len":29289,"rev_content_model":"wikitext","rev_content_format":"text/x-wiki","performer":{"user_text":"Nampong123","user_groups":["extendedconfirmed","*","user","autoconfirmed"],"user_is_bot":false,"user_id":45649963,"user_registration_dt":"2023-03-29T07:27:14Z","user_edit_count":7406},"page_is_redirect":false,"comment":"/* Local go

commons.wikimedia.org
commons.wikimedia.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
www.wikidata.org
www.wikidata.org
sr.wikipedia.org
sr.wikipedia.org
id.wikipedia.org
id.wikipedia.org
de.wikipedia.org
de.wikipedia.org
commons.wikimedia.org
commons.wikimedia.org
en.wikipedia.org
en.wikipedia.org
commons.wikimedia.org
commons.wikimedia.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
en.wikipedia.org
en.wikipedia.org
mai.wikipedia.org
mai.wikipedia.org
ku.wikipedia.org
ku.wikipedia.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
commons.wikimedia.org
commons.wikimedia.org
en.wikipedia.org
en.wikipedia.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
www.wikidata.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
en.wikipedia.org
commons.wikimedia.org
commons.wikimedia.org
www.wikidata.org
www.wikidata.org
en.wikipedia.org
en.wikipedia.org
www.wikidata.org
www.wikidata.

In [20]:
fw_heavy_hitters

FigureWidget({
    'data': [{'alignmentgroup': 'True',
              'hovertemplate': 'domain=%{x}<br>count=%{y}<extra></extra>',
              'legendgroup': '',
              'marker': {'color': '#636efa', 'pattern': {'shape': ''}},
              'name': '',
              'offsetgroup': '',
              'orientation': 'v',
              'showlegend': False,
              'textposition': 'auto',
              'type': 'bar',
              'uid': 'c47779f8-ef99-49f2-a558-229421ca88f5',
              'x': array(['placeholder'], dtype=object),
              'xaxis': 'x',
              'y': array([0]),
              'yaxis': 'y'}],
    'layout': {'barmode': 'relative',
               'legend': {'tracegroupgap': 0},
               'template': '...',
               'title': {'text': 'The k=10 heavy-hitter domains'},
               'xaxis': {'anchor': 'y', 'domain': [0.0, 1.0], 'title': {'text': 'domain'}},
               'yaxis': {'anchor': 'x', 'domain': [0.0, 1.0], 'title': {'text': 'coun

For reference, here's what your plot could look like.

![heavy_hitter_domains.png](attachment:51fd7866-9293-4fb8-9c5c-d61d21bfbcf4.png)

In [21]:
async def heavy_hitters(switch: Switch, k_heavy_hitters_update_method: typing.Callable[[dict[str, int], str, int], None],
                        figure_update_method: typing.Callable[[dict[str, int]], None]):
    with wiki_stream(streams=['page-create', 'revision-create']) as wiki_edits_stream:
        wiki_edits_stream: EventStreams = wiki_edits_stream

        print('Listening...')
        heavy_hitters_dict={}
        while switch.on:
            for e in wiki_edits_stream:
                if switch.off:
                    break
                change = typing.cast(CreationEvent, e)
                key = change['meta']['domain']
                print(key)
                if key is not None:
                    print(change['meta']['domain'])
                    heavy_hitters_dict = k_heavy_hitters_update_method(heavy_hitters_dict, key, k)
                figure_update_method(heavy_hitters_dict)
                await asyncio.sleep(0) # do not change!
    print('Stopping...')

In [22]:
AUTO_STOP_TIME = timedelta(seconds=30)
heavy_hitters_dict = {}
heavy_hitters_task, heavy_hitters_auto_stop_task, heavy_hitters_switch = run_loop(heavy_hitters, AUTO_STOP_TIME,
                                                                                  k_heavy_hitters_update_method=update_misra_gries_summary,
                                                                                  figure_update_method=upd_fw_heavy_hitters)

Switching off in 0:00:30
Listening...


In [None]:
# check state
heavy_hitters_task

In [None]:
# manually stop
heavy_hitters_switch.turn_off()

## Heavy-Hitters via Map-Reduce (4pts)
As a thought experiment, let's assume that instead of this approximation you wanted to calculate the exact heavy hitters and were ready to save the streamed data (over a certain time frame) to disk.\
Specifically, we want to consider how these k-heavy-hitters could be computed in a **Map-Reduce** fashion with the data stored in a HDFS.

To warm up, take a look at the following example as a way to structure and express a sequence of map-reduce tasks. Read this thoroughly, so that you can answer the next question properly. Note that though we use python code combined with pseudo-code/comments, the cell is not intended to be executed in this notebook. It is merely to describe the functions as precisely as possible without requiring an extensive technical setup.

Some more details:
Recall the map/reduce definitions presented in the lecture.
$$
\begin{align*}
\mathit{map} &\in K_1 \times V_1 \to (K_2 \times V_2)^* \\
\mathit{reduce} &\in K_2 \times (V_2)^* \to (K_3 \times V_3)^*
\end{align*}
$$

We extend them slightly _with context_ to make them more practical and match the reality of Hadoop Map-Reduce programming.
$$
\begin{align*}
\mathit{map} &\in \mathit{Context} \times (K_1 \times V_1) \to (K_2 \times V_2)^* \\
\mathit{reduce} &\in \mathit{Context} \times (K_2 \times (V_2)^*) \to (K_3 \times V_3)^*
\end{align*}
$$
where $Context$ is a constant size dictionary that allows you to pass in a fixed number of arguments into your functions.

Additionally, we specifiy the input and output files for each map-reduce stage. The files are also merely symbolic: they are not actual files to be created.

The following cell contains a simple example that raises numbers (keyed by an integer index) to a parameterized value that is followed by a summation over all indices.
Note the correspondence of the function signatures to the mathematical definitions. You can use `str` (strings) and `int` (integers) as key/value domains ($K_i, V_j, \ldots$).

In [24]:
# describe file(s) you need
# note that these files are merely symbolical, they are not required to be required as this is not executable code
# "their typing" is just illustrative
# distributed file vector.txt: list[tuple[int, int]]
# contains tuples (i, x): tuple[int, int] where i is an index, and x an integer

# distributed file intermediate.txt: list[tuple[str, int]]
# distributed file result.txt: list[tuple[str, int]]

# describe how the context would have to be initialized. This does not have to be a concrete value; it just has be known/computed beforehand.
Context = {}
Context['n'] = 2 

# note this function signature's equivalence to $\mathit{map_1} \in \mathit{Context} \times (int \times int) \to (int \times int)^*$
def map_1(Context, key_value_pair: tuple[int, int]) -> list[tuple[int, int]]:
    k, v = key_value_pair
    return [(k, v)]

def reduce_1(Context, key_values_pair: tuple[int, list[int]]) -> list[tuple[int, int]]:
    k, vs = key_values_pair
    # if we assume unique indices, this summation is superflous; there's only ever exactly one value in each list
    return [(k, sum(v ** Context['n'] for v in vs))]

# run (map_1, reduce_1) with input vector.txt and output intermediate.txt
# intermediate.txt now contains pairs of (i, $x^n$): tuple[int, int] 

def map_2(Context, key_value_pair: tuple[int, int]) -> list[tuple[str, int]]:
    k, v = key_value_pair
    return [('result', v)]

def reduce_2(Context, key_values_pair: tuple[str, list[int]]) -> list[tuple[str, int]]:
    k, vs = key_values_pair
    return [('result', sum(v for v in vs))]

# run (map_2, reduce_2) with input intermediate.txt and output result.txt

# result.txt now contains just one key-value pair: ('result', $\sum_i x_i^n$): tuple[str, int]
# if you had another stage of map-reduce, you could assign it to the context (pseudo-code) as follows
# ('result', value) = result.txt
# Context['res'] = value
# there is no rigorous syntax to use here, just make sure to be clear and do not make any computations here

### a)

Now that we've warmed up:

Assume that the domains are simply stored along with their recording time (irrelevant here) as the key.\
$k$ is known and present in your context, i.e., `Context['k'] = k`. \
Consider the situation in which you do not know how many events $N$ were processed, i.e., $N$ **is unknown a-priori**.

**Describe the map-reduce stages you need to eventually determine the $k$ heavy-hitter domains. Use the same code + pseudo-code style as above.**

The initial input is:
```
# distributed file domains.txt: list[tuple[str, str]]
# contains tuples (t, d): tuple[str, str] where t is the irrelevant key (as a string), and d is a domain name
```


In [None]:
# distributed file domains.txt: list[tuple[str, str]]
# contains tuples (t, d): tuple[str, str] where t is the irrelevant key (as a string), and d is a domain name

# distributed file intermediate.txt: list[tuple[str, int]]
# distributed file result.txt: list[tuple[str, int]]

#describe how the context would have to be initialized
Context = {}
Context['k'] = k  

def map_1(Context, key_value_pair: tuple[str, str]) -> list[tuple[str, int]]:
    _, domain = key_value_pair
    return [(domain, 1)]

def reduce_1(Context, key_values_pair: tuple[str, list[int]]) -> list[tuple[str, int]]:
    domain, counts = key_values_pair
    return [(domain, sum(counts))]

#run (map_1, reduce_1) with input domains.txt and output intermediate.txt
#intermediate.txt now contains pairs of (domain, count): tuple[str, int]

def map_2(Context, key_value_pair: tuple[str, int]) -> list[tuple[str, int]]:
    domain, count = key_value_pair
    return [('result', (domain, count))]

def reduce_2(Context, key_values_pair: tuple[str, list[tuple[str, int]]]) -> list[tuple[str, int]]:
    _, domain_counts = key_values_pair
    sorted_domain_counts = sorted(domain_counts, key=lambda x: x[1], reverse=True)[:Context['k']]
    return [('result', sorted_domain_counts)]

#run (map_2, reduce_2) with input intermediate.txt and output result.txt
#result.txt now contains just one key-value pair: ('result', [(domain_1, count_1), (domain_2, count_2), ..., (domain_k, count_k)])


This approach first counts the occurrences of each domain in the input data and gives it a count of 1 (dropping the useless timestamp information.). Then, it adds the domain counts together, and then sorts domains based on their counts, and finally outputs the top-k heavy-hitter domains along with their counts.





