# Metody Programowania Równoległego
## Temat: Map Reduce

**Wykonał: Paweł Kruczkiewicz**

## Algorytm sekwencyjny
### Implementacja

W celu efektywnej implementacji użyto biblioteki `smart_open` stworzonej specjalnie do przetwarzania danych z EC2. Użyty kod:

```python
#!/usr/bin/env python
"""word_count_whole_file.py"""

import sys
import time
from smart_open import open

if __name__=="__main__":
    for filename in ["s3://mpr-balis/gutenberg-1G.txt","s3://mpr-balis/gutenberg-5G.txt","s3://mpr-balis/gutenberg-10G.txt"]:
        for i in range(4):
            word_count = {}
            time1 = time.time()
            
            for line in open('s3://mpr-balis/gutenberg-1G.txt', encoding="latin-1"):
                words = line.strip().split()
                for word in words:
                    if word not in word_count:
                        word_count[word] = 0
                    word_count[word] += 1
                
            time_elapsed = round(time.time() - time1, 2)
            print(f"{i+1}\t{filename}\t{time_elapsed}[s]")
```


### Konfiguracja

Powyższy kod dla 3 typów wykonano na maszynie EC2 o typie instancji `m4.large`.

![ec2](ec2_instance_type.png)


Wyniki przedstawiono w dalszej części sprawozdania.


## Algorytm z użyciem paradygmatu *Map Reduce*


### Implementacja

Użyto standardowej implementacji  `mapper.py`

```python
#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
```

oraz `reducer.py`

```python

#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
    
```

### Konfiguracja

Powyższy kod zastosowano w paradygmacie `Map Reduce` na maszynie `EMR` w `Amazon Cloud Service`. Dokładne konfiguracje zostały opisane niżej.

Aby uruchomić i sprawdzić powyższy kod *dla każdego klastra*:
  1. Przesłano plik `gutenberg-1g.txt` za pomocą `scp`.
  2. Rozpakowano go i wrzucono na hadoopa za pomocą komend `hdfs dfs -touchz` oraz `hdfs dfs -appendFileTo`.
  3. Stworzono pliki `mapper.py` oraz `reducer.py` z treścią jak wyżej.
  4. Zamieszczono i użyto skryptu `script.sh` przechwytującego `time` do pliku `time.txt`. 
  5. Przeanalizowano plik `time.txt` i dodano odpowiednie linie do pliku `result.csv` zawierającego wyniki eksperymentu w formie pliku `csv`.

Plik `script.sh`
```bash
for size in 1 5 10
do
        input="books-input/${size}g.txt"
        for test_case_num in {1..3}
        do
                { time hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
                                -files mapper.py,reducer.py\
                                 -mapper mapper.py -reducer reducer.py \
                                -input ${input} -output books-output ; } 2>>time.txt
                hdfs dfs -rm -r books-output
        done
done
```

Wielkość konfiguracji to użyte sumarycznie max 12 core'ów.

#### Pierwsza konfiguracja

Pierwsza konfiguracja to **3 x 4 core'y** (3 instancje typu `m4.xlarge`).

![emr_1](EMR_1.png)

#### Druga konfiguracja

Druga konfiguracja to **6 x 2 core'y** (6 instancji typu `m4.large`).

![emr_2](EMR_2.png)

## Wyniki

### Plik CSV

Dane z poszczególnych plików time przeanalizowano i dodano do pliku `result.csv`.

In [8]:
import pandas as pd
import matplotlib.pyplot as plt

results = pd.read_csv("result.csv")
results.head()

Unnamed: 0,nCores,confId,dataSize,time
0,1,0,1,90.68
1,1,0,1,89.56
2,1,0,1,89.83
3,1,0,1,89.96
4,1,0,5,466.19


Dane następnie zgrupowano wg typu konfiguracji (`confId`) oraz wielkości danych (`time`).  

In [15]:
grouped_res = results.groupby(['confId', 'dataSize'])
mean_res = grouped_res.mean()

mean_res

Unnamed: 0_level_0,Unnamed: 1_level_0,nCores,time
confId,dataSize,Unnamed: 2_level_1,Unnamed: 3_level_1
0,1,1.0,90.0075
0,5,1.0,467.306667
0,10,1.0,953.816667
1,1,3.0,228.81
2,1,6.0,225.45


In [16]:
std_res = grouped_res.std()
std_res

Unnamed: 0_level_0,Unnamed: 1_level_0,nCores,time
confId,dataSize,Unnamed: 2_level_1,Unnamed: 3_level_1
0,1,0.0,0.478287
0,5,0.0,1.664702
0,10,0.0,9.075992
1,1,,
2,1,,


Następnie policzono speedup w zależności od użytych core'ów dla wszystkich trzech konfiguracji. Jako wartość bazowa (w mianowniku) posłużył *czas wykonania algorytmu sekwencyjnego na ec2*

In [24]:
#wyiczenie speedupu

s1 = mean_res.values[0][1]
s5 = mean_res.values[1][1]
s10 = mean_res.values[2][1]


953.8166666666666

In [None]:
# Dodanie kolumny ze speedupem

### Wykresy

#### W zależności od wielkości danych

# wykres dla 1 Gb

# wykres dla 5 GB

# wykres dla 10 GB

### W zależności od konfiguracji

#### Konfiguracja sekwencyjna

#### Konfiguracja równoległa

### Komentarz

COST wyszedł na mocną niekorzyść. EC2 poradziło sobie zdecydowanie lepiej. Spowodowane najprawdopdobniej dodatkowym nakładem na synchronizację danych. Można jedynie zauważyć, że confi