<a href="https://colab.research.google.com/github/mas-oliveira/SIC-Machine-Learning/blob/main/lab2/SPBD_Labs_mapreduce1_exercise_solution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#@title Install Hadoop on Google Colab
!curl -s https://raw.githubusercontent.com/smduarte/spbd-2324/main/lab1/install_hadoop.sh | bash

# Python MapReduce Exercise

In this notebook, you will write a MapReduce program that counts the number of occurrences of each word in a text.

In this exercise, Hadoop runs in standalone mode and reads data from the local filesystem.


### Download the dataset

In [2]:
!wget -q -O os_maias.txt https://www.dropbox.com/s/n24v0z7y79np319/os_maias.txt?dl=0

## WordCount Example
Read the words from input and count the number of occurrences of each word.


### Mapper
Complete with the code for the mapper.

In [3]:
%%file mapper_words.py
#!/usr/bin/env python

import sys
# string.punctuation provides the ASCII punctuation characters
import string

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # remove ASCII punctuation and the guillemets « »
    line = line.translate(str.maketrans('', '', string.punctuation+'«»'))
    # split the line into words
    words = line.split()
    # emit one "word<TAB>1" pair per word
    for w in words:
        print('%s\t1' % w)

Writing mapper_words.py
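
To see what the mapper emits without running Hadoop, the same tokenization can be tried on a single line. This is a minimal sketch: the helper name `tokenize` and the sample sentence are made up for illustration and are not part of the exercise files.

```python
import string

# Same translation table as mapper_words.py: delete ASCII punctuation and « ».
PUNCT_TABLE = str.maketrans('', '', string.punctuation + '«»')

def tokenize(line):
    """Return the words the mapper would emit for one input line."""
    return line.strip().translate(PUNCT_TABLE).split()

# Hypothetical sample line, not taken from the dataset.
sample = '«Olá», disse-lhe o Carlos.'
for word in tokenize(sample):
    print('%s\t1' % word)
```

Because the hyphen is part of `string.punctuation`, hyphenated forms are fused into a single token (`disse-lhe` becomes `disselhe`), which is consistent with entries such as `precipitouse` in the job output. The mapper also keeps the original case, so `O` and `o` are counted as different words.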


### Reducer

In [4]:
%%file reducer_words.py
#!/usr/bin/env python

import sys

lastWord = None
lastCounter = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper_words.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    count = int(count)

    if word != lastWord:
        if lastWord:
            print('%s\t%d' % (lastWord, lastCounter))
        lastWord = word
        lastCounter = count
    else:
        lastCounter += count

if lastWord:
    print('%s\t%d' % (lastWord, lastCounter))

Writing reducer_words.py
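
The reducer only compares each word with the previous one, so it relies on its input arriving sorted by key. The sketch below expresses the same idea with `itertools.groupby`, which also groups only consecutive equal keys. It is an illustrative alternative, not the exercise's reducer; the function name `reduce_counts` and the sample pairs are assumptions.

```python
from itertools import groupby

def reduce_counts(pairs):
    """Sum counts per word; `pairs` must already be sorted by word."""
    results = []
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        results.append((word, sum(count for _, count in group)))
    return results

# Hypothetical mapper output after the sort phase.
sorted_pairs = [('prato', 1), ('prato', 1), ('prazer', 1)]
print(reduce_counts(sorted_pairs))
```

If the pairs were not sorted, the same word would show up in several separate groups, and the reducer in `reducer_words.py` would likewise print it more than once.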


### Hadoop standalone mode execution


Hadoop refuses to start a job whose output directory already exists, so the directory must be removed first.

In [5]:
!rm -rf results_words

#### Submitting the job

The _hadoop_ command is used to submit the MapReduce job. In standalone mode the job runs locally rather than on a cluster.

In [6]:
!hadoop jar /usr/local/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-*streaming*.jar -files mapper_words.py,reducer_words.py -mapper mapper_words.py -reducer reducer_words.py -input os_maias.txt -output results_words

2023-09-20 14:50:57,392 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-09-20 14:50:57,703 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-09-20 14:50:57,703 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-09-20 14:50:57,740 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-09-20 14:50:58,332 INFO mapred.FileInputFormat: Total input files to process : 1
2023-09-20 14:50:58,383 INFO mapreduce.JobSubmitter: number of splits:1
2023-09-20 14:50:59,155 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1995652993_0001
2023-09-20 14:50:59,155 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-09-20 14:51:00,037 INFO mapred.LocalDistributedCacheManager: Localized file:/content/mapper_words.py as file:/tmp/hadoop-root/mapred/local/job_local1995652993_0001_d5719a48-1e5f-4404-8862-ce1203f26366/mapper_words.py
2023-09-20 14:51:00,083 INFO mapred.LocalDistri
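
Conceptually, the streaming job pipes each input line to the mapper, sorts the mapper output by key, and pipes the sorted pairs to the reducer. The following sketch imitates these three steps in plain Python. It is a simplification under stated assumptions: the function names are hypothetical, the mapper stand-in skips the punctuation removal, and everything is held in memory.

```python
def map_words(lines):
    """Mapper stand-in: emit one (word, 1) pair per whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield word, 1

def reduce_sorted(pairs):
    """Reducer stand-in: sum counts over runs of equal keys."""
    last_word, total = None, 0
    for word, count in pairs:
        if word != last_word:
            if last_word is not None:
                yield last_word, total
            last_word, total = word, 0
        total += count
    if last_word is not None:
        yield last_word, total

def run_local(lines):
    """Map, sort by key (the shuffle step), then reduce."""
    return list(reduce_sorted(sorted(map_words(lines))))

# Hypothetical two-line input.
print(run_local(['b a', 'a c a']))
```

The `sorted` call plays the role of the shuffle phase: without it, the reducer would see `a` in two separate runs and report it twice.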

#### Checking the results
The result is stored in the directory results_words.

In [7]:
!cat results_words/part-*

Streaming output truncated to the last 5000 lines.
praticar	1
praticas	1
pratico	8
praticável	1
prato	23
pratos	10
praxes	1
prazer	25
prazeres	3
praça	18
praças	2
precata	1
precaução	2
precauções	1
preceito	1
preceptor	2
preciosa	5
preciosamente	3
preciosidade	2
preciosidades	4
precioso	3
precipitada	1
precipitadamente	6
precipitandose	2
precipitara	1
precipitarse	1
precipitava	1
precipitação	1
precipitouse	4
precisa	10
precisado	1
precisamente	2
precisamos	3
precisando	1
precisas	2
precisava	9
precisavam	3
precisavase	1
preciso	8
precisos	2
precisou	1
precisão	3
precoce	1
preconceitos	1
precursor	1
predestinado	1
prefere	1
preferem	1
preferia	9
preferiam	1
preferido	1
preferiria	4
preferiu	1
preferível	5
prefiro	3
prega	2
pregada	3
pregadas	1
pregados	1
pregalhe	1
pregando	2
pregar	3
pregas	6
pregavam	1
preghiera	1
prego	6
pregou	1
preguiça	9
preguiçando	2
preguiças	2
preguiçavam	1
preguiçosa	4
preguiçosamente	7
preguiçoso	1
pregões	1
prejudicarao	1
prejudicava	1

## Sorting
The results above are ordered by word, not by frequency. Let's sort them by frequency, with the most frequent words first.

### Mapper
Complete with the code for the mapper.

In [8]:
%%file mapper_sort.py
#!/usr/bin/env python

import sys

# upper bound on word frequency; keys are computed as MAX_FREQ - freq
MAX_FREQ = 10000

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into word and frequency
    word, freq = line.split('\t', 1)
    # emit the zero-padded complement of the frequency as the key, so that
    # ascending key order corresponds to descending frequency
    print('%04d\t%s' % (MAX_FREQ - int(freq), word))

Writing mapper_sort.py
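
The mapper's key trick can be checked in isolation. Keys are compared as text, so the sketch below subtracts each frequency from the constant 10000 and zero-pads the result to four digits, which makes ascending text order match descending frequency. The helper name `sort_key` is hypothetical; the three frequencies are taken from the outputs shown in this notebook.

```python
MAX_FREQ = 10000  # same bound as in mapper_sort.py

def sort_key(freq):
    """Zero-padded complement of freq: ascending text order = descending frequency."""
    return '%04d' % (MAX_FREQ - freq)

# Word frequencies that appear in the outputs of this notebook.
counts = {'praxes': 1, 'de': 8311, 'prato': 23}

# Sorting the (key, word) pairs as text stands in for Hadoop's sort phase.
keys = sorted((sort_key(freq), word) for word, freq in counts.items())
for key, word in keys:
    print('%s\t%s\t%d' % (key, word, MAX_FREQ - int(key)))
```

The padding matters because an unpadded key such as `5` would sort after `1689` when compared as text. The scheme also assumes every frequency is between 1 and 10000; a frequency above 10000 would produce a negative key and break the ordering.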


### Reducer

In [9]:
%%file reducer_sort.py
#!/usr/bin/env python

import sys

# must match the constant used in mapper_sort.py
MAX_FREQ = 10000

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into key (complemented frequency) and word
    freq, word = line.split('\t', 1)
    # print the word and its frequency (the trick is that the reducer receives the keys sorted in increasing order)
    print('%s\t%d' % (word, MAX_FREQ - int(freq)))

Writing reducer_sort.py


### Hadoop standalone mode execution


As before, the output directory must be removed, because Hadoop refuses to start a job whose output directory already exists.

In [10]:
!rm -rf results_sort

#### Submitting the job

The _hadoop_ command is used to submit the MapReduce job.

Note that the results of the previous MapReduce step are the input for the sorting step.

In [11]:
!hadoop jar /usr/local/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-*streaming*.jar -files mapper_sort.py,reducer_sort.py -mapper mapper_sort.py -reducer reducer_sort.py -input results_words/part-* -output results_sort

2023-09-20 14:51:06,183 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-09-20 14:51:06,365 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-09-20 14:51:06,365 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-09-20 14:51:06,392 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-09-20 14:51:06,692 INFO mapred.FileInputFormat: Total input files to process : 1
2023-09-20 14:51:06,722 INFO mapreduce.JobSubmitter: number of splits:1
2023-09-20 14:51:07,038 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local844281911_0001
2023-09-20 14:51:07,038 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-09-20 14:51:07,482 INFO mapred.LocalDistributedCacheManager: Localized file:/content/mapper_sort.py as file:/tmp/hadoop-root/mapred/local/job_local844281911_0001_c5961013-b57e-43eb-870f-ce5c62088cfd/mapper_sort.py
2023-09-20 14:51:07,518 INFO mapred.LocalDistribute

#### Checking the results
The result is stored in the directory results_sort.

In [12]:
!head -10 results_sort/part-*

de	8311
a	6736
o	6615
que	4986
e	4533
um	3026
com	2794
do	2571
da	2202
uma	2170
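
For a quick sanity check on small inputs, the two jobs can be compared against an in-memory count. The sketch below is not MapReduce: it swaps in `collections.Counter` and reuses the tokenization from `mapper_words.py`. The function name `top_words` and the sample lines are assumptions made for illustration.

```python
import string
from collections import Counter

# Same translation table as mapper_words.py: delete ASCII punctuation and « ».
PUNCT_TABLE = str.maketrans('', '', string.punctuation + '«»')

def top_words(lines, n=10):
    """Count words with the mapper's tokenization and return the n most frequent."""
    counter = Counter()
    for line in lines:
        counter.update(line.strip().translate(PUNCT_TABLE).split())
    return counter.most_common(n)

# Hypothetical in-memory input.
print(top_words(['a b a.', 'b, a c'], n=2))
```

Any iterable of lines works as input, including an open file handle for `os_maias.txt` (assuming the file is UTF-8 encoded). Words with equal frequency may appear in a different order than in the Hadoop output.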
