### Config

Add to `/etc/hosts` the following:
```
127.0.0.1       datanode
```

We do this because the Hadoop *namenode* (which we talk to for HDFS) returns the hostname of the datanode (i.e., `datanode`), but that hostname only resolves inside the docker-compose network. This happens internally in the `kazoo` library, so mapping the name to `127.0.0.1` on the host is the most straightforward solution.
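
To confirm the entry is in place, the hosts file can be checked programmatically. The following is a minimal sketch; the helper name `hosts_maps` is hypothetical and not part of the project, and it works on the file's text so it can be tried without touching `/etc/hosts`.

```python
def hosts_maps(hosts_text, hostname, ip="127.0.0.1"):
    """Return True if the hosts-file text maps `hostname` to `ip`."""
    for line in hosts_text.splitlines():
        # Drop trailing comments and surrounding whitespace.
        entry = line.split("#", 1)[0].strip()
        if not entry:
            continue
        # A hosts line is an address followed by one or more names.
        address, *names = entry.split()
        if address == ip and hostname in names:
            return True
    return False


sample = "127.0.0.1       localhost\n127.0.0.1       datanode\n"
print(hosts_maps(sample, "datanode"))
```

On a real machine, one would pass the contents of `/etc/hosts` instead of `sample`.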

# Ecosystem

Please use `light` theme.

![system-architecture](images/docker_compose.png)

We have a `docker-compose` network of many different services. They are all managed by `mapreduce.cluster.LocalCluster`, which uses different `docker-compose` commands for different purposes (for instance, `.scale`, `.clear`, `.shutdown_cluster`, etc.). Moreover, as users we submit jobs to the system using `.mapreduce` and get back `concurrent.futures` futures, but more on that later... In order to use `LocalCluster` one must authenticate with `Auth` (just use `username`='admin', `password`='admin' 😊).

The only requirement for the MapReduce system to work is an accessible (`IP`, `PORT`) pair for `Zookeeper` and for `HDFS` (in order to submit a job), and that's it!

Services:

1. `HDFS` : The Hadoop Distributed Filesystem, as the name suggests, works as a distributed filesystem. We chose `HDFS` because we wanted to be able to deploy the system effortlessly in a real cluster (assumptions about a shared filesystem would make deployment more difficult).
2. `Zookeeper` : The `Zookeeper` service (replicated 3 times) is mainly used for its extremely helpful recipes. We found the following quite handy:
   1. Distributed Mutual Exclusion recipe
   2. Setting up `Watcher` callbacks for different purposes (`ChildrenWatch` - watches for z-node children updates, `DataWatch` - watches for specific z-node data updates).
   3. Distributed sequential ID generator recipe
3. `Worker` : The workers perform the Tasks. We have three endpoints, i.e., `/map-task`, `/shuffle-task` and `/reduce-task`.
4. `Master` : The masters, each able to handle MapReduce jobs in parallel (`threading`), are responsible for the proper execution of the jobs. They asynchronously send `POST` requests to the workers' endpoints and wait for the results. Moreover, they are responsible for the fault tolerance of the distributed system (handling `worker` deaths and other `master` deaths). Notably, the code for the masters is <ins>entirely</ins> `callbacks`. <ins>Things happen, and the master handles them accordingly</ins> (in coordination with other masters through Zookeeper recipes)!

(Note: Yes, the `GIL` exists, but each master is I/O bound: it spends most of its time waiting for things to happen. Hence, `threading` is totally fine, if not better than the alternatives.)
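
The point about I/O-bound threads can be illustrated with the standard library alone. This is a sketch, not the masters' actual code: `fake_request` is a hypothetical stand-in for a blocking `POST` to a worker, and `time.sleep` plays the role of waiting on the network (it releases the GIL, as a blocking socket read does).

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(delay):
    # Stand-in for a blocking network call; sleeping releases the GIL.
    time.sleep(delay)
    return delay


def run_concurrently(n_requests, delay):
    """Issue n_requests blocking calls from separate threads and time them."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_requests) as pool:
        results = list(pool.map(fake_request, [delay] * n_requests))
    return results, time.perf_counter() - start


results, elapsed = run_concurrently(n_requests=4, delay=0.2)
print(f"{len(results)} waits of 0.2s finished in {elapsed:.2f}s")
```

Run sequentially, the four waits would take about 0.8 seconds. With threads the waits overlap, so the total should stay close to a single wait.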

### Authenticate

Use the `Auth` in-between interface for fast authentication.

In [1]:
from mapreduce.authentication.auth import Auth

In [2]:
auth = Auth(username='admin', password='admin')

In [3]:
auth.is_authenticated()

True

### Initialize the docker-compose network 

As mentioned before, we use `LocalCluster` as the docker-compose network management and job-submission service.

In [4]:
from mapreduce.cluster.local_cluster import LocalCluster

In [5]:
cluster = LocalCluster(
    auth=auth,
    n_workers=4,
    n_masters=1,
    initialize=True,
    verbose=False
)

As a sanity check, we can run `docker ps` in a terminal.

The `LocalCluster` has a `LocalMonitoring` instance with methods for printing the state of the cluster in a more readable manner.

Let's use it and print the `Zookeeper` z-nodes current state and filesystem.

In [6]:
cluster.local_monitoring.print_zoo()


----------------- Zoo Masters -----------------
Master 79351b322643 :  MasterInfo(state='nothing')

----------------- Zoo Workers -----------------
Worker a7ea090e4453 :  WorkerInfo(state='idle')
Worker dfe82c5cec86 :  WorkerInfo(state='idle')
Worker c8865f541685 :  WorkerInfo(state='idle')
Worker 5e3c9efb87dd :  WorkerInfo(state='idle')

----------------- Zoo Map Tasks -----------------

----------------- Zoo Shuffle Tasks -----------------

----------------- Zoo Reduce Tasks -----------------

----------------- Zoo Jobs ---------------------


------------- Dead Worker Tasks ---------------------


For HDFS (will contain nothing right now):

In [7]:
cluster.local_monitoring.print_hdfs('jobs')

## MapReduce first job submission

We first need to define the input data, the map function and the reduce function. We assume that the map and reduce functions have the following signatures:

`map([x1, x2, ...]) -> [(k1, v1), (k2, v2), ...]`

`reduce([v1, v2, ...]) -> y`

where every element is arbitrary (any data structure).
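
To make the contract concrete, here is a single-process sketch of the map, shuffle and reduce steps. It is only a local reference, not the distributed implementation. The name `local_mapreduce`, the `word_lengths` example and the choice to return a sorted list of `(key, result)` pairs are assumptions made for illustration.

```python
from collections import defaultdict


def local_mapreduce(data, map_func, reduce_func):
    """Single-process reference for the map -> shuffle -> reduce contract."""
    # Map: one call over the whole input yields (key, value) pairs.
    pairs = map_func(data)

    # Shuffle: group the values by key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)

    # Reduce: collapse each key's list of values into a single result.
    # Sorting assumes the keys are mutually comparable.
    return sorted((key, reduce_func(values)) for key, values in groups.items())


def word_lengths(words):
    # Emit (length, 1) for every word.
    return [(len(word), 1) for word in words]


print(local_mapreduce(["to", "be", "or", "not"], word_lengths, sum))
# [(2, 3), (3, 1)]
```

Running a job through such a local reference first is a cheap way to validate `map_func` and `reduce_func` before submitting them to the cluster.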

Let's assume that our objective is to count how many times each character appears in a list of words.

In [8]:
data = ['dasdsagf', 'mike', 'george', 'gertretr123', 'dsadsajortriojtiow']

def map_func(data):
    result = []
    for string in data:
        for char in string:
            result.append((char, 1))
    return result

def reduce_func(data):
    return sum(data)

For a quick sanity check:

In [9]:
map_func(['dasdsagf'])

[('d', 1),
 ('a', 1),
 ('s', 1),
 ('d', 1),
 ('s', 1),
 ('a', 1),
 ('g', 1),
 ('f', 1)]

We are now ready to submit the job to the MapReduce distributed system. We will use `.mapreduce` from `LocalCluster`. Note that it returns a `concurrent.futures` future object, which represents a computation that hasn't necessarily completed yet. It's essentially a <ins>promise</ins> to hold the result of a computation that might still be ongoing, hence the name "future".
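
For readers unfamiliar with futures, the behaviour can be tried with the standard library alone. This sketch uses a thread pool and a hypothetical `slow_square` function in place of a real MapReduce job.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.1)  # Pretend this is a long-running job.
    return x * x


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_square, 7)  # Returns immediately.
    print(future.done())                  # Most likely False: still running.
    print(future.result())                # Blocks until the value is ready: 49
```

The same two calls, `.done()` and `.result()`, are the ones that matter for the futures returned by `.mapreduce`.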

In [10]:
future = cluster.mapreduce(
    data=data, 
    map_func=map_func, 
    reduce_func=reduce_func, 
    requested_n_workers=4
)

In [11]:
future

<Future at 0x7f8ccebe4150 state=running>

Let's inspect what is happening behind the scenes.

In [12]:
cluster.local_monitoring.print_zoo()


----------------- Zoo Masters -----------------
Master 79351b322643 :  MasterInfo(state='nothing')

----------------- Zoo Workers -----------------
Worker a7ea090e4453 :  WorkerInfo(state='idle')
Worker dfe82c5cec86 :  WorkerInfo(state='idle')
Worker c8865f541685 :  WorkerInfo(state='idle')
Worker 5e3c9efb87dd :  WorkerInfo(state='idle')

----------------- Zoo Map Tasks -----------------
Task 0_0 :  Task(state='completed', worker_hostname='a7ea090e4453', received=True)
Task 0_1 :  Task(state='completed', worker_hostname='dfe82c5cec86', received=True)
Task 0_2 :  Task(state='completed', worker_hostname='c8865f541685', received=True)
Task 0_3 :  Task(state='completed', worker_hostname='5e3c9efb87dd', received=True)

----------------- Zoo Shuffle Tasks -----------------
Task 0 :  Task(state='completed', worker_hostname='a7ea090e4453', received=True)

----------------- Zoo Reduce Tasks -----------------
Task 0_9_10_11_12 :  Task(state='completed', worker_hostname='c8865f541685', received=

In [13]:
cluster.local_monitoring.print_hdfs('jobs')

job_0/
  data.pickle
  map_func.pickle
  map_results/
    0.pickle
    1.pickle
    2.pickle
    3.pickle
  map_tasks/
    0.pickle
    1.pickle
    2.pickle
    3.pickle
  reduce_func.pickle
  reduce_results/
    0_1_2_3_4.pickle
    13_14_15_16.pickle
    5_6_7_8.pickle
    9_10_11_12.pickle
  shuffle_results/
    0.pickle
    1.pickle
    10.pickle
    11.pickle
    12.pickle
    13.pickle
    14.pickle
    15.pickle
    16.pickle
    2.pickle
    3.pickle
    4.pickle
    5.pickle
    6.pickle
    7.pickle
    8.pickle
    9.pickle
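
The listing shows 17 shuffle outputs (one per distinct character) grouped into four reduce results: `0_1_2_3_4`, `5_6_7_8`, `9_10_11_12` and `13_14_15_16`. A contiguous, near-equal split reproduces exactly these groups. The sketch below shows one such rule; whether the system uses this exact rule is an assumption, and `split_contiguous` is a hypothetical name.

```python
def split_contiguous(n_items, n_chunks):
    """Split range(n_items) into n_chunks contiguous, near-equal chunks."""
    base, extra = divmod(n_items, n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional item.
        size = base + (1 if i < extra else 0)
        chunks.append(list(range(start, start + size)))
        start += size
    return chunks


for chunk in split_contiguous(n_items=17, n_chunks=4):
    print("_".join(map(str, chunk)))
```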


We can get the result of the computation using `.result()`:

In [14]:
future.result()

[('1', 1),
 ('2', 1),
 ('3', 1),
 ('a', 4),
 ('d', 4),
 ('r', 6),
 ('s', 4),
 ('t', 4),
 ('w', 1),
 ('e', 5),
 ('f', 1),
 ('g', 4),
 ('i', 3),
 ('j', 2),
 ('k', 1),
 ('m', 1),
 ('o', 4)]

## Heavy-Load Computation

Let's subject the system to a lot of concurrent jobs and tasks. But first, we must scale the cluster. (Services that are already alive are not affected by this; they continue their tasks as normal.)

In [15]:
cluster.scale(
    n_masters=3,
    n_workers=10
)

In [16]:
cluster.local_monitoring.print_zoo()


----------------- Zoo Masters -----------------
Master 79351b322643 :  MasterInfo(state='nothing')
Master 01c3a1488d10 :  MasterInfo(state='nothing')
Master 20e7f8650bd1 :  MasterInfo(state='nothing')

----------------- Zoo Workers -----------------
Worker 690436a17ca8 :  WorkerInfo(state='idle')
Worker fce6a7259310 :  WorkerInfo(state='idle')
Worker ff3814d135e7 :  WorkerInfo(state='idle')
Worker 5f34af5dd5d6 :  WorkerInfo(state='idle')
Worker 5e3c9efb87dd :  WorkerInfo(state='idle')
Worker bed77c297462 :  WorkerInfo(state='idle')
Worker a7ea090e4453 :  WorkerInfo(state='idle')
Worker f4b2c1266033 :  WorkerInfo(state='idle')
Worker dfe82c5cec86 :  WorkerInfo(state='idle')
Worker c8865f541685 :  WorkerInfo(state='idle')

----------------- Zoo Map Tasks -----------------
Task 0_0 :  Task(state='completed', worker_hostname='a7ea090e4453', received=True)
Task 0_1 :  Task(state='completed', worker_hostname='dfe82c5cec86', received=True)
Task 0_2 :  Task(state='completed', worker_hostname=

Let's create the same `data` list of strings but increase it in size a bit.

In [17]:
import random
import string

def generate_random_string(str_len):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(str_len))

def generate_random_string_list(n, str_len):
    return [generate_random_string(str_len) for _ in range(n)]

# Generate 100k random strings of length 20
data = generate_random_string_list(n=100_000, str_len=20)

In [18]:
len(data)

100000

In [19]:
data[:10]

['ucaqdxmvfnytdbbxhndh',
 'rdzaemfankgtooejjfux',
 'ipfupzhhyeldvlyqersi',
 'cbcfpnwjzwjsrlrybgkt',
 'wwnpznpidueawznczqkz',
 'yzacoszsnnyzjadbjhpn',
 'jjaommdgpkqrnjbyymul',
 'bczuiizcoabxsclmuoim',
 'iioabdvnveordbfwbcyr',
 'lhwmzjoqzuvguwfcrahq']

Now it is time to submit a few jobs. We will submit the same `data`, `map_func` and `reduce_func` for simplicity. We will submit 10 such jobs and we expect the system to handle them concurrently. Note that we, as the `host`, must upload the inputs (*map func*, *reduce func*, and *data*) to HDFS, which is why the following cell does not finish immediately.

In [20]:
futures = []

for i in range(10):
    print(f"Job {i+1} submitted.", end='\r')
    future = cluster.mapreduce(data, map_func, reduce_func, requested_n_workers=2)
    futures.append(future)

Job 10 submitted.

We will print the futures to see if they are `running` or `finished`.

In [25]:
for future in futures:
    print(future)

<Future at 0x7f8cbc900150 state=finished returned list>
<Future at 0x7f8cbc921090 state=finished returned list>
<Future at 0x7f8cbce1ba50 state=finished returned list>
<Future at 0x7f8c9e1f6050 state=finished returned list>
<Future at 0x7f8c9e1f65d0 state=finished returned list>
<Future at 0x7f8c9e1f6e90 state=finished returned list>
<Future at 0x7f8cbc903250 state=finished returned list>
<Future at 0x7f8cbc9233d0 state=finished returned list>
<Future at 0x7f8c9e1f7d90 state=finished returned list>
<Future at 0x7f8c9cffc810 state=finished returned list>
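
Rather than re-running a cell to poll the states, the standard `concurrent.futures.as_completed` helper yields each future as soon as it finishes. The sketch below uses a thread pool and a hypothetical `fake_job` in place of real jobs. Since `.mapreduce` is described as returning `concurrent.futures` futures, the same pattern should apply to them, though that has not been verified here.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_job(job_id):
    time.sleep(0.05 * (job_id % 3))  # Jobs finish at different times.
    return job_id


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(fake_job, i) for i in range(10)]
    # Collect results in completion order, not submission order.
    finished = [f.result() for f in as_completed(futures)]

print(sorted(finished))
```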


In [26]:
cluster.local_monitoring.print_zoo()


----------------- Zoo Masters -----------------
Master 79351b322643 :  MasterInfo(state='nothing')
Master 01c3a1488d10 :  MasterInfo(state='nothing')
Master 20e7f8650bd1 :  MasterInfo(state='nothing')

----------------- Zoo Workers -----------------
Worker 690436a17ca8 :  WorkerInfo(state='idle')
Worker fce6a7259310 :  WorkerInfo(state='idle')
Worker ff3814d135e7 :  WorkerInfo(state='idle')
Worker 5f34af5dd5d6 :  WorkerInfo(state='idle')
Worker 5e3c9efb87dd :  WorkerInfo(state='idle')
Worker bed77c297462 :  WorkerInfo(state='idle')
Worker a7ea090e4453 :  WorkerInfo(state='idle')
Worker f4b2c1266033 :  WorkerInfo(state='idle')
Worker dfe82c5cec86 :  WorkerInfo(state='idle')
Worker c8865f541685 :  WorkerInfo(state='idle')

----------------- Zoo Map Tasks -----------------
Task 0_0 :  Task(state='completed', worker_hostname='a7ea090e4453', received=True)
Task 0_1 :  Task(state='completed', worker_hostname='dfe82c5cec86', received=True)
Task 1_0 :  Task(state='completed', worker_hostname=

Let's inspect some random results.

In [27]:
futures[1].result()

[('a', 76795),
 ('b', 77274),
 ('c', 76862),
 ('d', 77362),
 ('e', 77463),
 ('f', 76822),
 ('g', 77391),
 ('h', 77010),
 ('i', 77213),
 ('j', 76717),
 ('k', 76443),
 ('l', 76504),
 ('m', 76669),
 ('n', 77162),
 ('o', 76691),
 ('p', 76984),
 ('q', 76731),
 ('r', 77029),
 ('s', 77251),
 ('t', 77328),
 ('u', 76474),
 ('v', 76705),
 ('w', 76546),
 ('x', 76599),
 ('y', 77197),
 ('z', 76778)]

In [28]:
#cluster.clear()

See all results per row.

In [29]:
for row in zip(*[future.result() for future in futures]):
    print(row)
    print()

(('a', 76795), ('a', 76795), ('a', 76795), ('a', 76795), ('a', 76795), ('a', 76795), ('a', 76795), ('a', 76795), ('a', 76795), ('a', 76795))

(('b', 77274), ('b', 77274), ('b', 77274), ('b', 77274), ('b', 77274), ('b', 77274), ('b', 77274), ('b', 77274), ('b', 77274), ('b', 77274))

(('c', 76862), ('c', 76862), ('c', 76862), ('c', 76862), ('c', 76862), ('c', 76862), ('c', 76862), ('c', 76862), ('c', 76862), ('c', 76862))

(('d', 77362), ('d', 77362), ('d', 77362), ('d', 77362), ('d', 77362), ('d', 77362), ('d', 77362), ('d', 77362), ('d', 77362), ('d', 77362))

(('e', 77463), ('e', 77463), ('e', 77463), ('e', 77463), ('e', 77463), ('e', 77463), ('e', 77463), ('e', 77463), ('e', 77463), ('e', 77463))

(('f', 76822), ('f', 76822), ('f', 76822), ('f', 76822), ('f', 76822), ('f', 76822), ('f', 76822), ('f', 76822), ('f', 76822), ('f', 76822))

(('g', 77391), ('g', 77391), ('g', 77391), ('g', 77391), ('g', 77391), ('g', 77391), ('g', 77391), ('g', 77391), ('g', 77391), ('g', 77391))

(('h',
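
Since all ten jobs ran on the same input, their results should be identical, and the counts of each job should add up to the total number of characters (100,000 strings × 20 characters = 2,000,000). A small sketch of such a check follows; `check_results` is a hypothetical helper, shown here on tiny stand-in data.

```python
def check_results(results, n_strings, str_len):
    """Check that all jobs agree and that the counts add up."""
    first = results[0]
    all_equal = all(result == first for result in results)
    total = sum(count for _, count in first)
    return all_equal and total == n_strings * str_len


# Tiny stand-in: two jobs over 3 strings of length 2 (6 characters in total).
job_a = [("a", 4), ("b", 2)]
job_b = [("a", 4), ("b", 2)]
print(check_results([job_a, job_b], n_strings=3, str_len=2))
```

With the real futures, one would pass `[future.result() for future in futures]` together with `n_strings=100_000` and `str_len=20`.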

### Shutdown

Shut down the cluster and clean up the persistent HDFS.

In [None]:
cluster.shutdown_cluster(cleanup=True)