## General instructions

Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel/runtime** (Colab: in the menubar, select *Runtime*$\rightarrow$*Factory Reset Runtime*; Jupyter: in the menubar, select *Kernel*$\rightarrow$*Restart*) and then **run all cells** (Colab: in the menubar, select *Runtime*$\rightarrow$*Run all*; Jupyter: in the menubar, select *Cell*$\rightarrow$*Run All*).

Make sure you fill in any place that says `YOUR CODE HERE` or `"YOUR ANSWER HERE"`, as well as the list of the group members in the following cell.

Enter here the *Group Name* and the list of *Group Members*.

`GROUP NAME`

`GROUP MEMBERS`

To allow evaluation, DO NOT delete or cut the cells containing code and answers. Once you have finished, you can download the notebook (Colab: in the menubar, select *File*$\rightarrow$*Download .ipynb*; Jupyter: in the menubar, select *File*$\rightarrow$*Download as*$\rightarrow$*Notebook (.ipynb)*) and upload it as an assignment on the e-learning platform.

The following cell loads the Google Drive extension for the current notebook when the variable `MOUNT` is `True`. This allows you to mount the Google Drive filesystem for file persistence. The mount point will be `/content/gdrive`.
It also sets the `PATH` variable, so that from now on you can refer to external files by writing:

```python
os.path.join(PATH, filename)
```

This joins `PATH` and the filename into a single path, with the filename placed after `PATH`.
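As a minimal illustration, assuming the non-Colab default `PATH = '.'` and a hypothetical file name `data_00.txt` (both chosen only for this sketch):

```python
import os

PATH = '.'                # default value used when Drive is not mounted
filename = 'data_00.txt'  # hypothetical file name, for illustration only

# os.path.join inserts the separator appropriate for the current OS
full_path = os.path.join(PATH, filename)
print(full_path)  # './data_00.txt' on Linux/macOS
```

Using `os.path.join` rather than string concatenation keeps the same code working whether the files live in the local directory or under the mounted Drive folder.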

In [None]:
import os
MOUNT = False
if 'google.colab' in str(get_ipython()) and MOUNT:
    from google.colab import drive
    drive.mount('/content/gdrive')
    PATH = '/content/gdrive/MyDrive'
else:
    PATH = '.'

# Important warning

**⚠️ Avoid copying, removing, or modifying test cells: if you do, your assignment might be graded incorrectly ⚠️**

---

## Map/Reduce *advanced* simulator

The library `map_reduce.py` provides a simulator of the *map/reduce* paradigm, which simulates the *map/reduce* steps by performing them in parallel.

You can have a look at the file contents if you're interested in the implementation.

The `map_reduce` module can be loaded by issuing the command:

```python
from map_reduce import map_reduce
```

provided that `map_reduce.py` is in **the same** directory as this notebook (or the notebook using it). It is not a system-wide library but a custom one.

The `map_reduce` function is a higher-order function that takes two functions as its arguments, namely the `mapper` and the `reducer`, and returns another function (call it `apply`) that takes an iterable as its argument. The `apply` function will perform all the steps of the *map/reduce* pipeline, namely:

1. it will transform each item of the input iterable (in parallel) by applying the `mapper()` function to that item;
2. it will partition the results of the *map* application into blocks having the same key value;
3. it will send each block to a single `reducer()` function that performs the reduction in parallel.
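The three steps above can be sketched sequentially as follows. This is a hypothetical stand-in written only for illustration: it is not the code of `map_reduce.py`, it runs everything in a single process, and it assumes that the lists returned by the reducers are concatenated into one flat result list. The names `sequential_map_reduce`, `parity_mapper` and `parity_reducer` are made up for this example.

```python
from collections import defaultdict

def sequential_map_reduce(mapper, reducer):
    """Hypothetical sequential stand-in, NOT the actual simulator."""
    def apply(iterable):
        # Step 1: apply the mapper to each item, collecting (key, value) pairs
        pairs = []
        for item in iterable:
            pairs.extend(mapper(item))
        # Step 2: partition the values into blocks sharing the same key
        blocks = defaultdict(list)
        for key, value in pairs:
            blocks[key].append(value)
        # Step 3: hand each block to the reducer and gather the outputs
        results = []
        for key, values in blocks.items():
            results.extend(reducer(key, values))
        return results
    return apply

# Toy usage: sum the numbers separately for odd and even values
def parity_mapper(n):
    return [('even' if n % 2 == 0 else 'odd', n)]

def parity_reducer(key, values):
    return [(key, sum(values))]

apply_parity = sequential_map_reduce(parity_mapper, parity_reducer)
totals = apply_parity([1, 2, 3, 4, 5])
print(sorted(totals))  # [('even', 6), ('odd', 9)]
```

Note that the number of reducer calls equals the number of distinct keys emitted by the mappers.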

You will see an example of adapting the map/reduce functions for counting words and a usage of the map/reduce simulator in the following cells. The final output format is slightly different from the one above and is more similar to an actual output of a map/reduce pipeline (i.e., a list of tuples instead of a dictionary).

The signatures of the two functions are as follows:

* `mapper(item) -> list of tuples (k, v)`
* `reducer(k, list of values) -> list of any`
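As a rough sketch of functions matching these signatures, a word-count pair could look like the one below. The names `word_count_mapper` and `word_count_reducer` are hypothetical, and type annotations (which the simulator may check at runtime through `typeguard`) are omitted here.

```python
def word_count_mapper(line):
    # emit one (word, 1) pair for every word occurrence in the line
    return [(word, 1) for word in line.lower().split()]

def word_count_reducer(word, counts):
    # counts holds all the values that were emitted for this word
    return [(word, sum(counts))]

pairs = word_count_mapper("to be or not to be")
print(pairs[:3])                          # [('to', 1), ('be', 1), ('or', 1)]
print(word_count_reducer("to", [1, 1]))   # [('to', 2)]
```

Both functions return lists, so a mapper can emit zero, one, or many pairs per input item.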

In [None]:
# This will install the typeguard library, which performs runtime checking of the function types
# it's needed in Colab because a new environment is created every time the runtime is restarted
!pip3 install typeguard

## Exercise 1

We aim at determining the top-$k$ salaries of a company. The employee data is stored in multiple files, one for each department, and has the following format:

```
<first_name>;<last_name>;<salary>
```

Since the company is really big, we cannot store in memory the full dataset consisting of all the files and we have to resort to the *map/reduce* paradigm to compute those values company-wide, that is to find the top-$k$ salaries among the *whole* company.

A possible modeling of the problem in terms of the Map/Reduce paradigm is to provide each mapper with the name of a single file and let it read its content and compute the top-$k$ salaries of that file.

As for the reducer, instead, we aim at having just a single reducer (*how can you ensure that a single reducer will be used?*) that selects among the list of top-$k$s that were determined by each mapper.

```
data_00.txt -----> mapper(): top-k of data_00.txt  ------+
                                                          \
data_01.txt -----> mapper(): top-k of data_01.txt  --------+---> reducer : determines the top-k out of the top-ks
                                                          /
                                            ...          /
                                                        /
data_n-1.txt -----> mapper(): top-k of data_n-1.txt ---+
```
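The scheme works because any record in the global top-$k$ must also be in the top-$k$ of its own file. The sketch below checks this property on small in-memory lists; the employee tuples are invented, the salaries are kept distinct to avoid tie-breaking questions, and `heapq.nlargest` is just one possible way to select the largest items. It does not read files or use the simulator.

```python
import heapq

k = 2
# hypothetical "departments", as (first_name, last_name, salary) tuples
chunks = [
    [("Ann", "Lee", 50.0), ("Bob", "Ray", 70.0), ("Carl", "Dow", 40.0)],
    [("Dina", "Fox", 90.0), ("Ed", "Hall", 30.0)],
]

def salary(record):
    return record[2]

# top-k of each chunk, computed independently
local_tops = [heapq.nlargest(k, chunk, key=salary) for chunk in chunks]
# top-k among the candidates coming from all the chunks
candidates = [record for top in local_tops for record in top]
merged = heapq.nlargest(k, candidates, key=salary)

# reference: top-k computed directly on the whole dataset
everything = [record for chunk in chunks for record in chunk]
direct = heapq.nlargest(k, everything, key=salary)

print(merged == direct)  # True
```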

Write the mapper and the reducer functions to this aim.

We want the result in the form of the top-$k$ tuples, that is, including the names of the overall top-$k$ employees. Assume that $k$ is initially set as a global variable.

In [None]:
from map_reduce import map_reduce

k = 5

def mapper(filename):
    # process the file in order to extract the employees having the top k salaries
    # you should ensure that the output goes to the same reducer
    # YOUR CODE HERE
    raise NotImplementedError()

def reducer(key, top_k_items):
    # process the list of top k items to extract the top k out of the top k
    # you might check what's coming from the partitioner by means of print(top_k_items)
    # and process it in the correct way
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# the glob library searches a given directory for files using the OS wildcards, that is, it returns
# a list of all filenames matching the specified pattern

import glob

apply = map_reduce(mapper, reducer)

result = apply(glob.glob('data_*.txt'))

print(result)

# this is the check against the sequential algorithm

def process_line(l):
    l = l.strip().split(';')
    l[2] = float(l[2])
    return tuple(l)

top_k = []
for filename in glob.glob('data_*.txt'):
    with open(filename) as f:
        data = sorted(map(process_line, f), key=lambda t: -t[2])
    top_k += data[:k]
top_k = sorted(top_k, key=lambda t: -t[2])
assert top_k[:k] == result

## Exercise 2

We want to generalize the mapper / reducer to be able to work with different values of $k$.

To this aim we want to make use of functional programming tools and:

1. add an argument `k` to the mapper and the reducer;
2. encapsulate the creation and application of the `apply` function in a function `compute_top_k(k)` that wraps the whole computation and returns the results of the process.
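As a reminder of how partial binding works, here is a generic sketch that is unrelated to the exercise data; the functions `scale` and `double` are hypothetical names used only for illustration.

```python
from functools import partial

def scale(value, factor):
    return value * factor

# bind factor=2 once, obtaining a new one-argument function
double = partial(scale, factor=2)

print(double(21))                    # 42
print(list(map(double, [1, 2, 3])))  # [2, 4, 6]
```

The function returned by `partial` can be passed anywhere a one-argument function is expected, which is what makes it suitable for fixing an extra parameter ahead of time.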

In [None]:
from functools import partial

def compute_top_k(k):
    # the solution should use the partial binding
    # YOUR CODE HERE
    raise NotImplementedError()
    
apply = compute_top_k(5)
result = apply(glob.glob('data_*.txt'))

result