# Notebook to move data to HDFS and apply MapReduce

- Hadoop runs in pseudo-distributed mode.
- This tutorial works with the <a href="https://grouplens.org/datasets/movielens/">MovieLens</a> dataset and a few other files.
- I downloaded the files and put them in the repository, enjoy!
    - ml-100k.zip
    - log_server.log.zip
    - OrgulhoePreconceito.txt
    - amigos_facebook.csv
- Download these files and move them to HDFS.

## TASK 1: count the number of ratings per rating value

### 1. Start HDFS and verify that it is running

In [None]:
print("Initializing HDFS...")
!start-dfs.sh
print("Verifying that HDFS started:")
!jps

### 2. Start YARN and verify that it is running

In [None]:
print("Starting YARN to manage jobs and resources...")
!start-yarn.sh
print("Verifying that YARN started:")
!jps
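Reading the `jps` output by eye works, but the check can also be scripted. The sketch below is a hypothetical helper (`missing_daemons` is not part of Hadoop) and it assumes the five daemons a pseudo-distributed setup usually shows after `start-dfs.sh` and `start-yarn.sh`: NameNode, DataNode and SecondaryNameNode for HDFS, ResourceManager and NodeManager for YARN.

```python
# Hypothetical helper: report which expected Hadoop daemons are absent
# from the output of the `jps` command (one "<pid> <MainClass>" per line).
EXPECTED_DAEMONS = {
    "NameNode", "DataNode", "SecondaryNameNode",  # started by start-dfs.sh
    "ResourceManager", "NodeManager",             # started by start-yarn.sh
}

def missing_daemons(jps_output: str) -> set:
    """Return the expected daemon names not listed in `jps_output`."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) >= 2:          # skip blank or malformed lines
            running.add(parts[1])    # second field is the main class name
    return EXPECTED_DAEMONS - running
```

In the notebook it could be called as `missing_daemons(subprocess.run(["jps"], capture_output=True, text=True).stdout)` after `import subprocess`; an empty set means every expected daemon is up.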

### 3. List the HDFS root directory

In [None]:
!hdfs dfs -ls /

### 4. Create a directory to work with MapReduce

In [None]:
!hdfs dfs -mkdir /mapred

- Check that the directory was created

In [None]:
!hdfs dfs -ls /

### 5. Unzip "ml-100k.zip" and use the file "u.data"
The file is tab-separated:
- First column is ID (the ID of the user who gave the rating);
- Second column is ID_MOVIE;
- Third column is Rating (1 to 5);
- Fourth column is Timestamp (Unix seconds).
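Before writing the mapper it can help to see one record split into these four fields. This is a minimal sketch; the `Rating` type and `parse_u_data_line` function are hypothetical names, and the sample line is made up, not taken from the real file.

```python
from datetime import datetime, timezone
from typing import NamedTuple

class Rating(NamedTuple):
    user_id: int         # column 1: ID
    movie_id: int        # column 2: ID_MOVIE
    rating: int          # column 3: Rating
    timestamp: datetime  # column 4: Timestamp, converted from Unix seconds

def parse_u_data_line(line: str) -> Rating:
    """Split one tab-separated u.data line into typed fields."""
    user_id, movie_id, rating, ts = line.rstrip("\n").split("\t")
    return Rating(int(user_id), int(movie_id), int(rating),
                  datetime.fromtimestamp(int(ts), tz=timezone.utc))

# Made-up sample record
print(parse_u_data_line("7\t42\t4\t881250949\n"))
```

The mapper in step 7 only needs the third field, so it keeps the values as strings instead of converting them.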

In [None]:
!unzip ml-100k.zip

### 6. Put the file in HDFS

In [None]:
!cd ml-100k && hdfs dfs -put u.data /mapred
!hdfs dfs -ls /mapred

### 7. Apply MapReduce to count the number of ratings per rating value

- Write the job as a Python file and run it on the Hadoop cluster.

In [None]:
# If you don't have the mrjob package, install it with the command below
# !pip install mrjob
# mrjob configuration file, if needed: '/home/hadoop/.mrjob.conf'

In [None]:
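%%writefile MovieEvaluateMR.py
from mrjob.job import MRJob

class MovieEvaluateMR(MRJob):
    def mapper(self, key, line):
        # u.data is tab-separated: user ID, movie ID, rating, timestamp
        (ID, ID_MOVIE, rating, Timestamp) = line.split('\t')
        # Emit one count per rating record, keyed by the rating value
        yield rating, 1

    def reducer(self, rating, occurrences):
        # Total number of records with this rating value
        yield rating, sum(occurrences)

if __name__ == '__main__':
    MovieEvaluateMR.run()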
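Because mrjob jobs are plain Python, the map, shuffle/sort and reduce phases can be imitated in memory to check the logic before submitting to the cluster. The sketch below is a simplified stand-in, not mrjob's own runner: `run_locally` is a hypothetical helper, and it copies the mapper and reducer logic of `MovieEvaluateMR` into plain functions.

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    # Same logic as MovieEvaluateMR.mapper: one (rating, 1) pair per record
    _user_id, _movie_id, rating, _timestamp = line.split("\t")
    yield rating, 1

def reducer(rating, occurrences):
    # Same logic as MovieEvaluateMR.reducer
    yield rating, sum(occurrences)

def run_locally(lines):
    """Simulate map -> shuffle/sort -> reduce for a small list of lines."""
    mapped = [pair for line in lines for pair in mapper(line)]
    mapped.sort(key=itemgetter(0))  # Hadoop sorts and groups pairs by key
    results = {}
    for key, group in groupby(mapped, key=itemgetter(0)):
        for out_key, out_value in reducer(key, (v for _, v in group)):
            results[out_key] = out_value
    return results

# Made-up records: ratings 3, 5 and 3
print(run_locally(["1\t10\t3\t100", "2\t10\t5\t101", "3\t11\t3\t102"]))
# {'3': 2, '5': 1}
```

The keys stay strings, exactly as `line.split` returns them, which matches what the mrjob mapper yields.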

- Move the file to the ./movie_evaluate directory

In [None]:
!mkdir -p movie_evaluate
!mv MovieEvaluateMR.py ./movie_evaluate/

- Execute the MapReduce job

In [None]:
!python ./movie_evaluate/MovieEvaluateMR.py hdfs:///mapred/u.data -r hadoop > ./movie_evaluate/result.txt
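By default mrjob writes each output pair as a JSON-encoded key, a tab, and a JSON-encoded value, so each line of `result.txt` should hold a quoted rating such as `"3"` followed by its count. A minimal sketch for loading that file back into Python, assuming the default JSON output protocol and hashable keys; `parse_mrjob_output` is a hypothetical name.

```python
import json

def parse_mrjob_output(lines):
    """Turn mrjob JSON-protocol output lines into a {key: value} dict."""
    results = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue                      # ignore blank lines
        key_json, value_json = line.split("\t", 1)
        results[json.loads(key_json)] = json.loads(value_json)
    return results
```

For example, `with open("./movie_evaluate/result.txt") as f: counts = parse_mrjob_output(f)`; iterating over `sorted(counts.items())` then lists the ratings in order.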

## TASK 2: average number of friends by age

### 1. Put the file in HDFS and check

In [None]:
!hdfs dfs -put amigos_facebook.csv /mapred
!hdfs dfs -ls /mapred
!jps

### 2. Apply MapReduce to compute the average number of friends by age

In [None]:
%%writefile FriendsAgeMR.py
from mrjob.job import MRJob

class FriendsAgeMR(MRJob):
    def mapper(self, _, line):
        # Each CSV row: ID, name, age, number of friends
        (ID, name, age, nFriends) = line.split(',')
        yield age, float(nFriends)

    def reducer(self, age, nFriends):
        # nFriends is an iterator, so count and sum in a single pass
        count = 0
        total = 0
        for x in nFriends:
            count += 1
            total += x

        yield age, (total / count)

if __name__ == '__main__':
    FriendsAgeMR.run()
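The reducer counts and sums in the same loop because mrjob hands it the values as an iterator that can be consumed only once; calling `sum()` and then counting in a second pass would see no values the second time. The sketch below reproduces that with a plain generator; `average` is a hypothetical helper and the friend counts are made up.

```python
def average(values):
    """One-pass mean, accumulating count and total like FriendsAgeMR.reducer."""
    count = 0
    total = 0.0
    for x in values:
        count += 1
        total += x
    # A reducer is only called for keys with at least one value, so count > 0
    return total / count

friends = (float(n) for n in ["100", "250", "40"])  # generator, like reducer input
print(average(friends))  # 130.0
print(list(friends))     # [] : the generator is already exhausted
```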

- Move the file to the ./friends_by_age directory

In [None]:
!mkdir -p friends_by_age
!mv FriendsAgeMR.py ./friends_by_age/

- Execute the MapReduce job

In [None]:
!python ./friends_by_age/FriendsAgeMR.py hdfs:///mapred/amigos_facebook.csv -r hadoop > ./friends_by_age/result.txt

## TASK 3: count words (without preprocessing)

### 1. Put the file in HDFS and check

In [None]:
!hdfs dfs -put OrgulhoePreconceito.txt /mapred
!hdfs dfs -ls /mapred
!jps

### 2. Apply MapReduce to count words

In [None]:
%%writefile WordCount.py
from mrjob.job import MRJob

class WordCountMR(MRJob):

    def mapper(self, _, line):
        # Split on whitespace only: punctuation stays attached to words
        words = line.split()
        for word in words:
            yield word.lower(), 1

    def reducer(self, word, occurrences):
        yield word, sum(occurrences)

if __name__ == '__main__':
    WordCountMR.run()
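Splitting only on whitespace means punctuation stays attached, so the same word can be counted under several keys. A quick local check with `collections.Counter` shows the effect; `count_words_split` is a hypothetical helper that mirrors the mapper above, and the sentence is made up.

```python
from collections import Counter

def count_words_split(text):
    """Mirror WordCountMR locally: whitespace split, then lowercase."""
    counts = Counter()
    for line in text.splitlines():
        for word in line.split():
            counts[word.lower()] += 1
    return counts

counts = count_words_split("Elizabeth sorriu. Elizabeth, porém, não disse nada.")
print(counts["elizabeth"], counts["elizabeth,"])  # 1 1
```

The two spellings end up as separate keys, which is the problem the regular expression in Task 4 addresses.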

- Move the file to the ./word_count directory

In [None]:
!mkdir -p word_count
!mv WordCount.py ./word_count/

- Execute the MapReduce job

In [None]:
!python ./word_count/WordCount.py hdfs:///mapred/OrgulhoePreconceito.txt -r hadoop > ./word_count/result.txt

- Show the result

In [None]:
!cat ./word_count/result.txt

## TASK 4: count words (with preprocessing)

In [None]:
%%writefile WordCountWithRegex.py
from mrjob.job import MRJob
import re

# A word is a run of word characters (letters, digits, underscore) or apostrophes
word_regex = re.compile(r"[\w']+")

class WordCountWithRegexMR(MRJob):

    def mapper(self, _, line):
        words = word_regex.findall(line)
        for word in words:
            yield word.lower(), 1

    def reducer(self, word, occurrences):
        yield word, sum(occurrences)

if __name__ == '__main__':
    WordCountWithRegexMR.run()
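`[\w']+` keeps runs of word characters and apostrophes and drops everything else, so trailing commas and periods no longer create separate keys. In Python 3, `\w` matches Unicode letters by default, so accented Portuguese words stay whole. A local sketch on a made-up sentence; `count_words_regex` is a hypothetical helper.

```python
import re
from collections import Counter

word_regex = re.compile(r"[\w']+")  # same pattern as WordCountWithRegexMR

def count_words_regex(text):
    """Mirror WordCountWithRegexMR locally: regex tokens, then lowercase."""
    counts = Counter()
    for line in text.splitlines():
        for word in word_regex.findall(line):
            counts[word.lower()] += 1
    return counts

sample = "Elizabeth sorriu. Elizabeth, porém, não disse nada."
print(word_regex.findall(sample))
# ['Elizabeth', 'sorriu', 'Elizabeth', 'porém', 'não', 'disse', 'nada']
print(count_words_regex(sample)["elizabeth"])  # 2
```

Because the apostrophe is part of the character class, a contraction such as "d'água" is also counted as a single word.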

- Move the file to the ./word_count directory

In [None]:
!mkdir -p word_count
!mv WordCountWithRegex.py ./word_count/

- Execute the MapReduce job

In [None]:
!python ./word_count/WordCountWithRegex.py hdfs:///mapred/OrgulhoePreconceito.txt -r hadoop > ./word_count/result_with_regex.txt

- Show the result

In [None]:
!cat ./word_count/result_with_regex.txt

## TASK 5: count words (with preprocessing and sorting)

In [None]:
%%writefile WordCountWithRegexOrderMR.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# Two MapReduce steps: (1) count words, (2) sort words by count

word_regex = re.compile(r"[\w']+")

class WordCountWithRegexOrderMR(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words, reducer=self.reducer_count_words),
            MRStep(mapper=self.mapper_make_counts_key, reducer=self.reducer_output_words)
        ]

    def mapper_get_words(self, _, line):
        words = word_regex.findall(line)
        for word in words:
            yield word.lower(), 1

    def reducer_count_words(self, word, occurrences):
        yield word, sum(occurrences)

    def mapper_make_counts_key(self, word, count):
        # Zero-pad the count so Hadoop's string sort matches numeric order
        # (4 digits: counts of 10000 or more would sort out of place)
        yield '%04d' % int(count), word

    def reducer_output_words(self, count, words):
        # The output is globally sorted only when a single reducer runs
        for word in words:
            yield count, word

if __name__ == '__main__':
    WordCountWithRegexOrderMR.run()
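The second step works because Hadoop sorts map output by key before the reducer sees it, and the keys here are strings: without padding, `"10"` would sort before `"9"`. A local sketch of both steps, assuming a single reducer so that the sort is global; `sorted_word_counts` is a hypothetical helper, not part of mrjob.

```python
import re
from collections import Counter

word_regex = re.compile(r"[\w']+")

def sorted_word_counts(text):
    """Local sketch of WordCountWithRegexOrderMR's two steps."""
    # Step 1 (mapper_get_words + reducer_count_words): count lowercase words
    counts = Counter(w.lower() for line in text.splitlines()
                     for w in word_regex.findall(line))
    # Step 2 (mapper_make_counts_key): re-key by zero-padded count, then sort
    # by that string key as the shuffle phase does
    keyed = [('%04d' % n, w) for w, n in counts.items()]
    return sorted(keyed, key=lambda pair: pair[0])

print(sorted(["9", "10", "100"]))                # ['10', '100', '9']
print(sorted('%04d' % n for n in (9, 10, 100)))  # ['0009', '0010', '0100']
print(sorted_word_counts("a b a c a b"))
# [('0001', 'c'), ('0002', 'b'), ('0003', 'a')]
```

Within one count, Hadoop does not guarantee the order of the words; the local sketch simply keeps insertion order for ties.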

- Move the file to the ./word_count directory

In [None]:
!mkdir -p word_count
!mv WordCountWithRegexOrderMR.py ./word_count/

- Execute the MapReduce job

In [None]:
!python ./word_count/WordCountWithRegexOrderMR.py hdfs:///mapred/OrgulhoePreconceito.txt -r hadoop > ./word_count/result_with_regex_order.txt

- Show the result

In [None]:
!cat ./word_count/result_with_regex_order.txt

# the end