
<center><h1>Introduction to Map Reduce</h1></center>

### Overview

1. Recap of functional programming in Python
2. Python's `map` and `reduce` functions
3. Writing parallel code using `map`
4. The Map-Reduce programming model

## Functional programming

Write the following functions, each of which takes a list of numbers and returns a new list with every number multiplied by:

a) 2

b) 4

In [None]:
def double_everything_in(data):
    ## write your code here ##
    return

def quadruple_everything_in(data):
    ## write your code here ##
    return

In [None]:
double_everything_in([1, 2, 3, 4, 5])

In [None]:
quadruple_everything_in([1, 2, 3, 4, 5])

### DRY - Fundamental Programming Concept

- The above code violates the ["do not repeat yourself"](https://en.wikipedia.org/wiki/Don't_repeat_yourself_) principle of good software engineering practice.

- How can we rewrite the code so that it avoids duplication?

In [None]:
##### Fill your code here #######
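
One possible way to remove the duplication is to make the multiplier a parameter. This is only a sketch: the helper name `multiply_everything_in` and its `factor` argument are illustrative choices, not part of the original exercise.

```python
def multiply_everything_in(data, factor):
    """Return a new list with every number in `data` multiplied by `factor`."""
    return [x * factor for x in data]

# The two original functions become thin wrappers around the shared helper.
def double_everything_in(data):
    return multiply_everything_in(data, 2)

def quadruple_everything_in(data):
    return multiply_everything_in(data, 4)

print(double_everything_in([1, 2, 3, 4, 5]))     # [2, 4, 6, 8, 10]
print(quadruple_everything_in([1, 2, 3, 4, 5]))  # [4, 8, 12, 16, 20]
```

The loop over the list now lives in one place, so a change to how we traverse the data only has to be made once.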

- Now consider the following functions, which return:

a) Each number squared

b) Each number doubled

In [None]:
## returns square of the given number "x"
def squared(x):
    ## fill your code here ##
    return

## returns double of the given number "x"
def double(x):
    ## fill your code here ##
    return

## returns a list of squared numbers from "data"
def square_everything_in(data):
    ## fill your code here ##
    return

## returns a list of doubled numbers from "data"
def double_everything_in(data):
    ## fill your code here ##
    return

In [None]:
square_everything_in([1, 2, 3, 4, 5])

In [None]:
double_everything_in([1, 2, 3, 4, 5])

### DRY - Fundamental Programming Concept
- The above code violates the ["do not repeat yourself"](https://en.wikipedia.org/wiki/Don't_repeat_yourself_) principle of good software engineering practice.

- How can we rewrite the code so that it avoids duplication?

- Hint: Functions can be passed to other functions as values.


In [None]:
def apply_f_to_everything_in(f, data):
  ## fill your code here ##
  return
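
As a hedged sketch of where the hint leads, the function below takes the operation `f` as an argument and applies it to each element. It is one possible way to complete the exercise, not the only one; `squared` and `double` are filled in here only so that the example runs on its own.

```python
def squared(x):
    return x * x

def double(x):
    return 2 * x

def apply_f_to_everything_in(f, data):
    """Return a new list containing f(x) for every x in data."""
    return [f(x) for x in data]

# The per-element operation is now just a value we pass in.
print(apply_f_to_everything_in(squared, [1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]
print(apply_f_to_everything_in(double, [1, 2, 3, 4, 5]))   # [2, 4, 6, 8, 10]
```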

### Lambda expressions

- Anonymous functions (lambda expressions) save us from defining a named function each time we want to apply an operation to a list.

In [None]:
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])

# Python's `map` function

- Python has a built-in function `map` that does the same job as our version. In Python 3 it returns a lazy iterator rather than a list, so wrap it in `list` to see the values.



In [None]:
map(lambda x: x*x, [1, 2, 3, 4, 5])

<map at 0x7b51a987bb50>

In [None]:
list(map(lambda x: x*x, [1, 2, 3, 4, 5]))

[1, 4, 9, 16, 25]
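
The `<map at 0x...>` output above appears because, in Python 3, `map` returns an iterator that computes values on demand. The short sketch below (variable names are illustrative) shows one consequence: the iterator can be consumed only once.

```python
squares = map(lambda x: x * x, [1, 2, 3, 4, 5])

first_pass = list(squares)   # consumes the iterator
second_pass = list(squares)  # nothing is left to consume

print(first_pass)   # [1, 4, 9, 16, 25]
print(second_pass)  # []
```

If the results are needed more than once, store the list rather than the `map` object.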

## Implementing reduce

- The `reduce` function is an example of a [fold](https://en.wikipedia.org/wiki/Fold_%28higher-order_function%29).

- There are different ways we can fold data.

- The following implements a *left* fold.


In [None]:
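def foldl(f, data, z):
    # Base case: nothing left to fold, so the accumulator is the answer.
    if len(data) == 0:
        print(z)
        return z
    else:
        head = data[0]
        tail = data[1:]
        print("Folding", head, "with", tail, "using", z)
        # Combine the accumulator with the first element, then recurse on the rest.
        partial_result = f(z, head)
        print("Partial result is", partial_result)
        return foldl(f, tail, partial_result)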

In [None]:
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15


15

## Python's `reduce` function

- Python's `reduce` function is a *left* fold. In Python 3 it lives in the `functools` module rather than being a built-in.

In [None]:
from functools import reduce
reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])

15
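
Addition gives the same answer whichever way we fold, so it hides the direction. A minimal sketch using subtraction makes the left-to-right grouping visible; the `subtract` and `foldr` helpers are illustrative names introduced here for comparison only.

```python
from functools import reduce

def subtract(x, y):
    return x - y

# Left fold without an initial value: ((((1 - 2) - 3) - 4) - 5)
left = reduce(subtract, [1, 2, 3, 4, 5])

# Left fold with an initial value of 0: (((((0 - 1) - 2) - 3) - 4) - 5)
left_with_init = reduce(subtract, [1, 2, 3, 4, 5], 0)

def foldr(f, data, z):
    """A right fold, for contrast: f(x1, f(x2, ... f(xn, z)))."""
    if not data:
        return z
    return f(data[0], foldr(f, data[1:], z))

# Right fold: 1 - (2 - (3 - (4 - (5 - 0))))
right = foldr(subtract, [1, 2, 3, 4, 5], 0)

print(left)            # -13
print(left_with_init)  # -15
print(right)           # 3
```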

# Functional programming and parallelism

- Functional programming lends itself to [parallel programming](https://computing.llnl.gov/tutorials/parallel_comp/#Models).

- The `map` function can easily be parallelised through [data-level parallelism](https://en.wikipedia.org/wiki/Data_parallelism),
    - provided that the function we supply as an argument is *free from* [side-effects](https://en.wikipedia.org/wiki/Side_effect_%28computer_science%29)
        - (which is why we avoid working with mutable data).

- We can see this by rewriting it so:


In [None]:
def perform_computation(f, result, data, i):
    print ("Computing the ", i, "th/st/nd result...")
    # This could be scheduled on a different CPU
    result[i] = f(data[i])

def my_map(f, data):
    result = [None] * len(data)
    for i in range(len(data)):
        perform_computation(f, result, data, i)
    # Wait for other CPUs to finish, and then..
    return result

In [None]:
my_map(lambda x: x * x, [1, 2, 3, 4, 5])

Computing the  0 th/st/nd result...
Computing the  1 th/st/nd result...
Computing the  2 th/st/nd result...
Computing the  3 th/st/nd result...
Computing the  4 th/st/nd result...


[1, 4, 9, 16, 25]

## A multi-threaded `map` function

In [None]:
from threading import Thread

def schedule_computation_threaded(f, result, data, threads, i):
    # Each function evaluation gets its own thread. The thread is only
    # created here; the caller must start() it and later join() it.
    def my_job():
        print("Processing data:", data[i], "... ")
        result[i] = f(data[i])
        print("Finished job #", i)
        print("Result was", result[i])
    threads[i] = Thread(target=my_job)

Use the `schedule_computation_threaded` function to parallelise the `my_map` function above. Call the parallel version `my_map_multithreaded`.

In [None]:
def my_map_multithreaded(f, data):
  ## fill your code here ##
  return

In [None]:
my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])
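
One possible way to complete the exercise is sketched below, assuming one thread per element. The helper is redefined here without the `print` calls so the block runs on its own. Note that in standard CPython builds the global interpreter lock means threads interleave rather than run CPU-bound Python code on several cores at once, so this illustrates the structure of a parallel `map` rather than a speed-up.

```python
from threading import Thread

def schedule_computation_threaded(f, result, data, threads, i):
    # Same shape as the helper above, minus the print calls:
    # create (but do not start) a thread that fills in result[i].
    def my_job():
        result[i] = f(data[i])
    threads[i] = Thread(target=my_job)

def my_map_multithreaded(f, data):
    n = len(data)
    result = [None] * n
    threads = [None] * n
    # Create one thread per element.
    for i in range(n):
        schedule_computation_threaded(f, result, data, threads, i)
    # Start every thread, then wait for all of them to finish.
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return result

print(my_map_multithreaded(lambda x: x * x, [1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]
```

Each thread writes to its own slot `result[i]`, so no two threads touch the same element and the output order matches the input order regardless of which thread finishes first.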

## Map Reduce

- Map Reduce is a _programming model_ for scalable parallel processing.
- Scalable here means that it can work on big data with very large compute clusters.
- There are many implementations: e.g. Apache Hadoop and Apache Spark.
- We can use Map-Reduce with any programming language:
    - Hadoop is written in Java
    - Spark is written in Scala, but has a Python interface.
- Languages that support a *functional programming* style, such as Python or Scala, fit very well with the Map Reduce model:
    - However, we don't *have* to use functional programming.

## Word Count Example

- In this simple example, the input is a set of URLs, and each record is a document.

- Problem: compute how many times each word occurs across the data set.

## Word Count: Map


- The input to $\operatorname{map}$ is a mapping:
    - Key: URL
    - Value: Contents of document

$\left< document1, to \; be \; or \; not \; to \; be \right>$

- In this example, our $\operatorname{map}$ function processes a given URL and produces a mapping:
    - Key: word
    - Value: 1

- So our original data-set will be transformed to:
  
  $\left< to, 1 \right>$
  $\left< be, 1 \right>$
  $\left< or, 1 \right>$
  $\left< not, 1 \right>$
  $\left< to, 1 \right>$
  $\left< be, 1 \right>$

In [None]:
## Add code for "Map" phase here
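
A minimal sketch of the map phase for the example document is shown below. The function name `map_document` and the whitespace-based tokenisation are assumptions made for illustration.

```python
def map_document(url, contents):
    """Emit a (word, 1) pair for every word in the document."""
    # The URL (key) is not needed for counting, only the contents (value).
    return [(word, 1) for word in contents.split()]

pairs = map_document("document1", "to be or not to be")
print(pairs)
# [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
```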

## Word Count: Shuffle

Description: Group together all values that share the same key.

The output should be equivalent to:

$\left< to, [1, 1] \right>$
  $\left< be, [1, 1] \right>$
  $\left< or, [1] \right>$
  $\left< not, [1] \right>$

In [None]:
###### Add code for "Shuffle" here ######
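
The shuffle step can be sketched with a dictionary that collects the values for each key. The function name `shuffle` is illustrative, and the input list is written out by hand so the block runs on its own.

```python
from collections import defaultdict

def shuffle(pairs):
    """Group the values of (key, value) pairs by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

mapped = [("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)]
grouped = shuffle(mapped)
print(grouped)
# {'to': [1, 1], 'be': [1, 1], 'or': [1], 'not': [1]}
```

In a real cluster this grouping also involves moving data between machines so that all values for a key end up on the same reducer; the sketch only shows the logical result.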

## Word Count: Reduce


Description: Reduce the values for each key to the sum of its counts.

The output should be equivalent to:

$\left< to, 2 \right>$
  $\left< be, 2 \right>$
  $\left< or, 1 \right>$
  $\left< not, 1 \right>$
  

In [None]:
###### Add your code for "Reduce" here #######
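
A minimal sketch of the reduce phase, reusing `functools.reduce` from earlier, is shown below. The function name `reduce_counts` is illustrative, and the grouped input is written out by hand so the block runs on its own.

```python
from functools import reduce

def reduce_counts(grouped):
    """Sum the list of counts for every word."""
    return {
        word: reduce(lambda x, y: x + y, counts, 0)
        for word, counts in grouped.items()
    }

grouped = {"to": [1, 1], "be": [1, 1], "or": [1], "not": [1]}
totals = reduce_counts(grouped)
print(totals)
# {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

Each key is reduced independently of the others, which is what allows the reduce phase to be spread across machines.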

# Map Reduce in Hadoop

In [None]:
#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace from `line`
    """
    PLACEHOLDER - 1
    """

    # split the line into "words"
    """
    PLACEHOLDER - 2
    """

    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # "word" with tab-delimited "word count"; the trivial word count is 1
        """
        PLACEHOLDER - 3
        """

## Test your Mapper

`echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py`
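
One possible way to fill in the three mapper placeholders is sketched below. To keep it runnable in a notebook, the logic is wrapped in a hypothetical `map_lines` function and fed from an `io.StringIO` object standing in for standard input; in an actual `mapper.py` the same loop would read from `sys.stdin`.

```python
import io

def map_lines(lines):
    """Yield one tab-delimited "word<TAB>1" record per word."""
    for line in lines:
        line = line.strip()            # PLACEHOLDER - 1: trim whitespace
        words = line.split()           # PLACEHOLDER - 2: split into words
        for word in words:
            yield "%s\t%s" % (word, 1)  # PLACEHOLDER - 3: emit the record

# Stand-in for sys.stdin, using the test sentence from above.
fake_stdin = io.StringIO("foo foo quux labs foo bar quux\n")
for record in map_lines(fake_stdin):
    print(record)
```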

In [None]:
#!/usr/bin/env python
"""reducer.py"""

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py into "word" & "count" variables
    """
    PLACEHOLDER - 4
    """

    # typecast count (currently a string) to int
    try:
        """
        PLACEHOLDER - 5
        """
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print ('%s\t%s' % (current_word, current_count))
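
The sketch below shows one possible way to complete placeholders 4 and 5, using the same running-count approach as the reducer above. It is wrapped in a hypothetical `reduce_lines` function so it can be tried on a small, already sorted list instead of real Hadoop output; in an actual `reducer.py` the lines would come from `sys.stdin`.

```python
def reduce_lines(lines):
    """Sum counts for consecutive identical words; input must be sorted by word."""
    current_word = None
    current_count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # PLACEHOLDER - 4: split "word<TAB>count" into its two parts
        word, _, count = line.partition("\t")
        try:
            count = int(count)         # PLACEHOLDER - 5: string to int
        except ValueError:
            continue                   # skip malformed lines
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield "%s\t%s" % (current_word, current_count)
            current_word = word
            current_count = count
    # Emit the final word, if there was any input at all.
    if current_word is not None:
        yield "%s\t%s" % (current_word, current_count)

mapper_output = ["foo\t1", "foo\t1", "quux\t1", "labs\t1", "foo\t1", "bar\t1", "quux\t1"]
# sorted() plays the role of Hadoop's sort-by-key between map and reduce.
for record in reduce_lines(sorted(mapper_output)):
    print(record)
```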

## Running your MapReduce implementation in Hadoop

```
hadoop jar hadoop-streaming-2.7.3.jar \
-input <input_file> \
-output <output_location> \
-mapper mapper.py \
-reducer reducer.py
```