# MrJob

MrJob è un framework MapReduce in grado di essere eseguito su:

 - Locally using multiprocessing
 - Hadoop Cluster
 - Amazon EMR
 
Si tratta di una potente astrazione fra cluster che si occupa di gestire il file system dell'infrastruttura e nodi che si occupano di processare i dati. Non si dovrà apportare nessuna modifica al codiche che scriveremo se passeremo dall'ambiente locale, a Hadoop o EMR.

MrJob può essere installato usando:
    
    $ pip install mrjob
    
nel vostro environment python.

## Protocols

Grazie ai **Protocols** implementati nel framework, sarà possibile in maniera semplice gestire l'I/O come richiesto Hadoop Streaming. Ogni qualvolta si creerà un processo MrJob si potrà andare a definire encode/decode dei dati attravarso i protocolli ``RawValueProtocol`` (che sfrutta lo standard dei protocolli Hadoop) o ``JSONProtocol`` che utilizza un JSON Encoding per rappresentare dati piu complessi rispetto a semplice testo.

## MrJob Steps


MrJob da la possibilità di creare steps multipli anzichè il classico singolo step MapReduce.

Lo step più classico è il cosidetto **combiner** che viene eseguito subito dopo il mapper ma prima del reducer. Questo step può combinare valori multipli emessi da diversi mapper in un singolo output. Solitamente è identico allo step del **reducer** ma viene eseguito solo un subset degli output dei mapper anzichè da tutti.


Ecco la lista di tutti i possibili steps:

### Main Steps ###
- mapper()
- combiner() --> permette su una stessa macchina inizia a fare l'operazione di reduce senza aspettare tutti
- reducer()

### Initialization ### servono per fare operazione prima della map reduce
- mapper_init()
- combiner_init()
- reducer_init()

### Finalization ### servono per fare operazione dopo la map reduce
- mapper_final()
- combiner_final()
- reducer_final()

### Filtering ###
- mapper_pre_filter()
- combiner_pre_filter()
- reducer_pre_filter()



# MrJob In Practice

Per creare un processo MrJob sarà necessario definire una classe che eredità da ``mrjob.job.MRJob`` e dichiarare i metodi necessari per lo step MapReduce. Ogni metodo deve *emettere* una coppia chiave, valore tramite l'uso di ``yield``.

Questo serve per far si che ogni step MapReduce non debba ricordare il valore emesso, funziona come generatore che restituisce ogni volta un nuovo valore allo step successivo ogni qual volta viene richiamato.

Siccome si sfrutta il paradigma *Hadoop Streaming* ogni processo MrJob dovrà essere un modulo python in grado di partire da linea di comando. In Python per realizzare uno script avviabile da terminale si può utilizzare la seguente best practice:

```
if __name__ == '__main__':
   do_something()
```

Grazie a questo snippet di codice si renderà eseguibile il fie (``__name__`` è uguale a __main__ solo quando lo script python .py è eseguito da linea di comando). Di conseguenza quando lo script verrà eseguito da linea di comando la funzione ``do_something()`` verrà eseguita.

Nel caso di un processo MrJob dovremo sostituire il *do_something()* con ``MrJobProcessName.run()`` per far partire il nostro processo.

La ragione di tutto questo è perchè il nostro file **.py** sarà copiato sul cluster Hadoop per essere eseguito e quindi deve poter essere eseguito come standalone software per essere compatibile con Hadoop Streaming.

# WordCount 

Ecco un primo esempio di WordCount in "pure python". Dato un testo contiamo le occorrenze di ogni parola.

In [None]:
my_text = '''Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque molestie lacus a iaculis tempus.\
Nam lorem nulla, viverra non pulvinar ut, fermentum et tortor. Cras vitae libero sed purus venenatis posuere. \
Proin commodo risus augue, vitae suscipit lectus accumsan sit amet. Praesent eu erat sem. \
Pellentesque interdum porta libero, et ultrices nunc eleifend sit amet. \
In in mauris nec elit ullamcorper ultrices at ac ante. Suspendisse potenti. \
Aenean eu nisl in ante adipiscing imperdiet. Ut pulvinar lectus quis feugiat adipiscing. \
Nunc vulputate mauris congue diam ultrices aliquet. Nulla pharetra laoreet est quis vestibulum. \
Quisque feugiat pharetra sagittis. Phasellus nulla massa, sodales a suscipit blandit, facilisis eu augue. \
Cras mi massa, ullamcorper nec tristique at, convallis quis eros. \
Mauris non fermentum lacus, vitae tristique tellus. In volutpat metus augue, nec laoreet ante hendrerit vitae. \
Vivamus id lacus nec orci tristique vulputate.
'''

# Usiamo una Regular Expression per identificare le parole
import re
WORD_REGEXP = re.compile(r"[\w']+")

def wordCount(input, result):
    '''
    Funzione per calcolare le occorrenze di parole in un testo
    PARAM:
        input: Testo su cui cercare le parole
        result: Dizionario in cui scrivere il risultato
    '''
    # Cerchiamo tutte le words
    words = WORD_REGEXP.findall(input)
    # Per ogni Word
    for word in words:
        # Verificare se abbiamo gia contato occorrenze di quella parola
        if word not in result:
            # Parola non ancora analizzata-> Init chiave nel dizionario
            result[word] = 0
        # Somma dell'occorrenza per la parola specifica
        result[word] += 1

# Dizionario su cui scriveremo i risultati
result = {}   
# Chiamo la funzione
wordCount(my_text, result)
# Stampo per ogni parola il numero di occorrenze
for word, occurences in result.items():
    print word, occurences
    


Risolviamo lo stesso esercizio utilizzando MrJob (**Attenzione**: eseguire lo script tramite linea di comando)

In [None]:
# Importiamo la classe MrJob
from mrjob.job import MRJob
import re
WORD_REGEXP = re.compile(r"[\w']+")

# Inizializziamo la classe MRWordFreqCount che eredita da MRJob
class MRWordFreqCount(MRJob):
    
    # Dichiariamo il mapper
    def mapper(self, _, line):
        # Nel nostro caso non avremo una chiave ma solo un value, che corrispondera a una riga del testo in input
        words = WORD_REGEXP.findall(line)
        for word in words:
            # Per ogni parola restituiamo la parola in minuscolo e il numero di occorrenze, ovviamente 1
            yield word.lower(), 1

    # Dichiariamo il reducer
    def reducer(self, word, counts):
        # Per ogni parola contiamo il numero delle occorrenze
        yield word, sum(counts)


if __name__ == '__main__':
    try:
        MRWordFreqCount.run()
    except TypeError:
        print 'MrJob cannot work inside iPython Notebook as it is not saved as a standalone .py file'
        


# Starting MrJob

Supponendo di aver salvato il precedente script in un file **wordcount.py** si può eseguire lo script da terminale:

```
$ python wordcount.py lorem.txt
```

Dove lorem.txt è un file di testo contenente il testo usato in input:

```
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque molestie lacus a iaculis tempus. Nam lorem nulla, viverra non pulvinar ut, fermentum et tortor. Cras vitae libero sed purus venenatis posuere. Proin commodo risus augue, vitae suscipit lectus accumsan sit amet. Praesent eu erat sem. Pellentesque interdum porta libero, et ultrices nunc eleifend sit amet. In in mauris nec elit ullamcorper ultrices at ac ante. Suspendisse potenti. Aenean eu nisl in ante adipiscing imperdiet. Ut pulvinar lectus quis feugiat adipiscing.
Nunc vulputate mauris congue diam ultrices aliquet. Nulla pharetra laoreet est quis vestibulum. Quisque feugiat pharetra sagittis. Phasellus nulla massa, sodales a suscipit blandit, facilisis eu augue. Cras mi massa, ullamcorper nec tristique at, convallis quis eros. Mauris non fermentum lacus, vitae tristique tellus. In volutpat metus augue, nec laoreet ante hendrerit vitae. Vivamus id lacus nec orci tristique vulputate.
```

Quando si esegue lo script vedremo qualcosa del tipo:

```
Creating temp directory /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/script.alexcomu.20161010.152629.080892
Running step 1 of 1...

```

E l'output finale:

```
Streaming final output from /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/script.alexcomu.20161010.152629.080892/output...
"a"	2
"ac"	1
"accumsan"	1
"adipiscing"	3
"aenean"	1
"aliquet."	1
```

Da notare che MRJob si occupa di copiare input e output dei vari step all'interno di cartelle temporanee (in questo caso sul mio pc siccome non ho eseguito lo script su un cluster hadoop) e poi di rispondere con l'output.

Si può inoltre notare che l'encoding dell'output è nel formato Hadoop Streaming, le parole e il numero di occorrenze sono separate fra loro da un carattere TAB "\t"

## Aiuto

Per salvare in automatico il risultato all'interno di un file possiamo usare il semplice redirect shell:

```
$ python wordcount.py lorem.txt > result
```

# Cosa può essere emesso?

Al momento stiamo contando le parole, cioè il classico esempio per spiegare come funziona MapReduce, ma cosa capita se vogliamo emettere un output diverso che non derivi strettamente dall'input?

MapReduce non fa nessuna assunzione sul dato emesso, ne le chiavi ne i valori devono essere correlati all'input in nessun modo. Per esempio, il mapper può restituire qualsiasi dato esso voglia.

Quindi, se vogliamo calcolare qualche statistica sul test come parole, caratteri e frasi, possiamo farlo molto semplicemente:

In [None]:
from mrjob.job import MRJob
import re
WORD_REGEXP = re.compile(r"[\w']+")

class MRTextInfo(MRJob):
    def mapper(self, _, line):
        for phrase in line.split('.'):
            yield 'phrases', 1
            words = WORD_REGEXP.findall(phrase)
            for word in words:
                yield 'words', 1
                yield 'characters', len(word)

    def reducer(self, key, counts):
        yield key, sum(counts)


if __name__ == '__main__':
    try:
        MRTextInfo.run()
    except TypeError:
        print 'MrJob cannot work inside iPython Notebook as it is not saved as a standalone .py file'

L'output sarà il seguente:

Creating temp directory /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/script.alexcomu.20161010.153948.901276

Running step 1 of 1...

Streaming final output from /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/script.alexcomu.20161010.153948.901276/output...

    "characters"	791
    "phrases"	39
    "words"	140

Removing temp directory /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/script.alexcomu.20161010.153948.901276...




## MultiStep Jobs

Ci sono casi in cui conviene eseguire step multipli su un singolo input per fornire l'output desiderato.

Se si vogliono creare step multipli, anzichè creare un metodo mapper e reducer, si andrà a specificare quali step eseguire tramite il metodo ``steps()`` che verrà chiamato in automatico da MrJob per eseguire metodi custom invece che i due classici utilizzati in precedenza.

Nel prossimo esempio creeremo un processo MrJob per scoprire le parole piu frequenti all'interno del testo. Il primo step sarà esattamente identico a quello del wordcounter, successivamente filtreremo le parole per selezionare la piu usata.

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_REGEXP = re.compile(r"[\w']+")


class MRMostFreqWord(MRJob):
    # Dichiaro i vari Step
    # Una lista di oggetti MRStep()
    def steps(self):
        return [
            MRStep(mapper=self.mapper_wordcount,
                    reducer=self.reducer_wordcount),
            MRStep(mapper=self.mapper_freq,
                    reducer=self.reducer_freq),
            MRStep(mapper=self.mapper_most,
                    reducer=self.reducer_most)
        ]

    def mapper_wordcount(self, _, line):
        words = WORD_REGEXP.findall(line)
        # Conto le parole di lunghezza maggiore a 3
        for w in words:
            if len(w)>3:
                yield w.lower(), 1

    def reducer_wordcount(self, word, counts):
        yield word, sum(counts)  # Conto le occorrenze di ogni parola per ricavare la frequenza

    def mapper_freq(self, word, total):
        if total > 1:  # Valuto solo le parole che compaiono piu di una volta
            yield total, word  # Le raggruppo per numero di occorrenze

    def reducer_freq(self, total, words):
        # Avra' all'interno di total il numero di occorrenze
        # e all'interno di words un generatore che resituira le singole parole
        yield total, words.next()  # .next() restituisce il primo elemento, emetto quindi una sola parola per ogni frequenza

    def mapper_most(self, freq, word):
        yield 'most_used', [freq, word]  # Raggruppo le parole per una lista di tuple (frequenza, parola)

    def reducer_most(self, _, freqs):
        yield 'most_used', max(freqs)  # restituisco il max della lista di tuple
        # in questo caso il max verra calcolato su tutti i primi elementi delle tuple
        # di conseguenza il max() dara come risultato la frequenza piu alta e la parola associata


if __name__ == '__main__':
    try:
        MRMostFreqWord.run()
    except TypeError:
        print 'MrJob cannot work inside iPython Notebook as it is not saved as a standalone .py file'

Siccome stiamo eseguendo uno processo multi step avremo un output del tipo:

    Creating temp directory /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/script.alexcomu.20161011.132906.601852
    Running step 1 of 3...
    Running step 2 of 3...
    Running step 3 of 3...

E avremo come risultato:

```
"most_used"	[4, "vitae"]
```


# Let's Play! 

### Esercizio 1

Utilizzando il file **lorem.txt** calcolare quante parole iniziano per ciascuna lettera dell'alfabeto.

### Esercizio 2

Utilizzando il file **lorem.txt** calcolare quante parole iniziano per ciascuna lettera dell'alfabeto e contare il max e il min.

### Esercizio 3

Scrivere un processo MapReduce che restituisca un report delle parole piu frequenti raggruppati per lunghezza della parola.

Esempio:

```
5 Caratteri -> hello -> 8 occorrenze
3 Caratteri -> cat -> 4 occorrenze
2 Caratteri -> at -> 7 occorrenze
```

# Eseguire job programmaticamente

Eseguire i job da linea di comando è un ottimo modo per testare i nostri algoritmi, molto spesso però è necessario visualizzare i dati calcolati. Dobbiamo quindi essere in grado di eseguire job MapReduce direttamente dal nostro software, gestire quindi input / output e inviare gli eventuali risultati al layer HTML per la visualizzazione.

Tutto questo è fattibile tramite i **MrJob Runners**, i quali permettono l'esecuzione programmatica di job MapReduce. Attenzione sempre a mantenere in due file separati **.py** il processo MrJob e il runner.

Quindi il runner sarà all'interno della nostra soluzione software, mentre la classe MrJob sarà un modulo separato che potrà essere inviato ad Hadoop per l'esecuzione.

Ecco un esempio di runner per il nostro WordFreqCount:



In [None]:
from wordcount import MRWordFreqCount

mr_job = MRWordFreqCount()
mr_job.stdin = open('lorem.txt').readlines()

with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        key, value = mr_job.parse_output_line(line)
        print 'Word:', key, 'Count:', value


Il parametro ``stdin`` verrà utilizzato per immagazzinare i file di input.
Siccome Hadoop gestisce l'input come linee separate abbiamo bisogno di passare una lista di stringhe, motivo per cui usiamo il metodo **readlines()**.

Il runner metterà a disposizione il metodo ``stream_output``, il quale restituirà un generatore che ritornerà come singolo elemtno un output Hadoop Streaming. L'output dovrà essere analizzato secondo il protocollo, quindi dovremo chiamare la funzione ``parse_output_line`` **chiave** e **valore**.

A questo punto potremo utilizzare i valore come meglio preferiamo, nel nostro caso li stiamo stampando in output.
Si può notare che inoltre l'output non avrà nessun log derivato da MrJob ma sarà solo il risultato finale dell'operazione.

Quindi se faremo partire il runner senza stampare nulla non avremo nessun output.

# Esercizio - Atleti olimpiadi


Struttura del dataset:

    age,birthdate,gender,height,name,weight,gold_medals,silver_medals,bronze_medals,total_medals,sport,country

Esempio di record:

    17,1996-04-12,Male,1.72,Aaron Blunck,68,0,0,0,0,Freestyle Skiing,United States
    
## Esercizio 4

Utilizzando il dataset, calcolare per ogni età il numero di atleti.

## Esercizio 5

Partendo dall'esercizio precedente, restituire come output l'eta' con il numero maggiore di occorrenze.

## Esercizio 6

Calcolare il numero di uomini e donne per ogni eta'.

## Esercizio 7

Calcolare per ogni nazione il numero di medaglie totali e per ciascuna tipologia, rispettivamente per oro, argento e bronzo. Successivamente stampare in output la nazione con piu medaglie in totale.



# Exercise - MovieData DB

* Download MovieData DB (100K) from http://grouplens.org/datasets/movielens/

I already downloaded the dataset inside the folder ``/PATH/TO/MRJOB/ROOT/examples/_dataset``. Unzip the folder and let's start!

We have (from ``u.info`` file):

    943 users
    1682 films
    100000 ratings
    
We'll use the file u.data which contains (splitted by TAB):

    user id | film id | rating | timestamp
       299     144        4      877881320


### Exercise 8 - Rating Counter

Count occurrences of rating value from movie DB.

### Exercise 9 - Most Rated Movie

Count occurrences of each movie rating from movie DB and find the most rated.

### Exercise 10 - Quick Lookup

Add to the exercise 2 the information about the movie. (use file ``u.item``)



# Exercise - Fake Friends DB

Inside folder ``/PATH/TO/MRJOB/ROOT/examples/_dataset`` you will find a csv file called ``fakefriends.csv``. Inside this file there is fake list of users with the relative friends. This is the format:

    ID, Name, Age, Number of Friends
    0,Will,33,385

### Exercise 11 - User with max friends

Find the user which has the MAX number of friends and to the same for the MIN.

### Exercise 12 - Friends Avarage per Age

Calculate For each Age the Avarage of friends.