# MapReduce

Map-reduce is a programming model that has its roots in functional programming. In addition to often producing short, elegant code for problems involving lists or collections, this model has proven very useful for large-scale, highly parallel data processing. For concreteness, we will think of map and reduce as operating on lists, but they apply to any collection (sets, etc.).

- **Map** operates on a list of values to produce a new list of values by applying the same computation to each value.

- **Reduce** operates on a list of values to collapse or combine them into a single value (or a small number of values) by repeatedly applying the same combining operation to a running result and the next value.
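As a quick illustration, here is a minimal sketch using Python's `map` and `functools.reduce` (both covered below): map squares each value, then reduce combines the squares into a single sum.

```python
from functools import reduce

values = [1, 2, 3, 4]

# Map: apply the same computation (squaring) to every value.
squares = list(map(lambda v: v * v, values))

# Reduce: repeatedly combine the running result with the next value.
total = reduce(lambda acc, v: acc + v, squares, 0)

print(squares, total)  # [1, 4, 9, 16] 30
```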






## Functional Programming in Python

Let us consider the following code first:


In [1]:
def double_everything_in(data):
    result = []
    for i in data:
        result.append(2 * i)
    return result

def quadruple_everything_in(data):
    result = []
    for i in data:
        result.append(4 * i)
    return result

In [2]:
double_everything_in([1, 2, 3, 4, 5])

[2, 4, 6, 8, 10]

In [3]:
quadruple_everything_in([1, 2, 3, 4, 5])

[4, 8, 12, 16, 20]

- The two functions differ only in the multiplier, so the code violates the ["do not repeat yourself"](https://en.wikipedia.org/wiki/Don't_repeat_yourself_) principle of good software engineering practice.

- How can we rewrite the code so that it avoids duplication?

In [4]:
def multiply_by_x_everything_in(x, data):
    result = []
    for i in data:
        result.append(x * i)
    return result

In [5]:
multiply_by_x_everything_in(2, [1, 2, 3, 4, 5])

[2, 4, 6, 8, 10]

In [6]:
multiply_by_x_everything_in(4, [1, 2, 3, 4, 5])

[4, 8, 12, 16, 20]

- Now consider the following code:

In [7]:
def squared(x):
    return x*x

def double(x):
    return x*2

def square_everything_in(data):
    result = []
    for i in data:
        result.append(squared(i))
    return result

def double_everything_in(data):
    result = []
    for i in data:
        result.append(double(i))
    return result

In [8]:
square_everything_in([1, 2, 3, 4, 5])

[1, 4, 9, 16, 25]

In [9]:
double_everything_in([1, 2, 3, 4, 5])

[2, 4, 6, 8, 10]

- Again, the two `..._everything_in` functions differ only in the function applied to each element, so the code violates the ["do not repeat yourself"](https://en.wikipedia.org/wiki/Don't_repeat_yourself_) principle of good software engineering practice.

- How can we rewrite the code so that it avoids duplication?

### Using Functions as Parameters


In [10]:
def apply_f_to_everything_in(f, data):
    result = []
    for x in data:
        result.append(f(x))
    return result

In [11]:
apply_f_to_everything_in(squared, [1, 2, 3, 4, 5])

[1, 4, 9, 16, 25]

In [12]:
apply_f_to_everything_in(double, [1, 2, 3, 4, 5])

[2, 4, 6, 8, 10]
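The earlier `multiply_by_x_everything_in` turns out to be a special case of `apply_f_to_everything_in`. A minimal sketch, using a hypothetical helper `make_multiplier` (not part of the original code) that returns a new function, a closure that remembers `x`:

```python
def apply_f_to_everything_in(f, data):
    # Same as the version above: apply f to each element and collect the results.
    result = []
    for x in data:
        result.append(f(x))
    return result

def make_multiplier(x):
    # Hypothetical helper: returns a function that multiplies its argument by x.
    def multiply(i):
        return x * i
    return multiply

def multiply_by_x_everything_in(x, data):
    # Recovers the earlier function without duplicating the loop.
    return apply_f_to_everything_in(make_multiplier(x), data)

print(multiply_by_x_everything_in(2, [1, 2, 3, 4, 5]))  # [2, 4, 6, 8, 10]
print(multiply_by_x_everything_in(4, [1, 2, 3, 4, 5]))  # [4, 8, 12, 16, 20]
```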

### Lambda Expressions

- We can use anonymous functions (`lambda` expressions) so that we don't have to define and name a separate function each time.

In [13]:
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])

[1, 4, 9, 16, 25]

### Python's `map` Function

- Python has a built-in function `map`, implemented in C in CPython, which is typically faster than our pure-Python loop.
- `map(func, seq)` applies the function `func` to every element of the sequence `seq`. In Python 3 it returns a lazy iterator (a `map` object) rather than a new list, so we wrap it in `list(...)` to see all the results at once.


In [15]:
list(map(lambda x: x*x, [1, 2, 3, 4, 5]))

[1, 4, 9, 16, 25]
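For comparison, a list comprehension expresses the same mapping and is often considered the more idiomatic Python spelling when the function is a short inline expression. A minimal sketch showing that both give the same list here:

```python
data = [1, 2, 3, 4, 5]

with_map = list(map(lambda x: x * x, data))
with_comprehension = [x * x for x in data]

print(with_map == with_comprehension)  # True
```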

In [6]:
def f(x):
    return x * x

rdd = [2, 6, -3, 7]
res = map(f, rdd)
res  # res is a lazy map object (an iterator), not a list

<map at 0x1712c922220>

In [2]:
# Use * to unpack the iterator's values as separate arguments to print
print(*res)

4 36 9 49
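Because `map` returns a one-shot iterator, its values can be consumed only once: unpacking it with `*` or calling `list` on it exhausts it, and a second pass yields nothing. A minimal sketch:

```python
def f(x):
    return x * x

res = map(f, [2, 6, -3, 7])

first_pass = list(res)   # consumes the iterator
second_pass = list(res)  # nothing left to consume

print(first_pass)   # [4, 36, 9, 49]
print(second_pass)  # []
```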


In [4]:
from operator import mul
rdd1, rdd2 = [2, 6, -3, 7], [1, -4, 5, 3]
res = map(mul, rdd1, rdd2)  # element-wise multiplication of rdd1 and rdd2

print(*res)

2 -24 -15 21
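When `map` is given several iterables, it stops as soon as the shortest one is exhausted, so extra elements in a longer list are silently ignored. A small sketch:

```python
from operator import mul

shorter, longer = [2, 6], [1, -4, 5, 3]

# Only the pairs (2, 1) and (6, -4) are used; 5 and 3 have no partner.
result = list(map(mul, shorter, longer))
print(result)  # [2, -24]
```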


### Implementing Reduce

- The `reduce` function is an example of a [fold](https://en.wikipedia.org/wiki/Fold_%28higher-order_function%29).

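- Folds differ in the order in which they combine elements: a *left* fold combines the initial value with the first element and proceeds left to right, while a *right* fold combines the last element with the initial value first and works back toward the front of the list.

- The following implements a *left* fold.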


In [10]:
# f is the binary operation, data is a list, and z is the initial value (accumulator)
def foldl(f, data, z):
    if len(data) == 0:
        # Nothing left to fold: the accumulator is the result
        print(z)
        return z
    else:
        head = data[0]
        tail = data[1:]
        print("Folding", head, "with", tail, "using", z)
        # Combine the accumulator with the first element, then fold the rest
        partial_result = f(z, head)
        print("Partial result is", partial_result)
        return foldl(f, tail, partial_result)

In [11]:
def add(x, y):
    return x + y

foldl(add, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15


15
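The recursive `foldl` above copies the tail (`data[1:]`) on every call and recurses once per element, so on long lists it does quadratic copying work and can exceed CPython's default recursion limit (typically 1000). A minimal iterative sketch of the same left fold, without the tracing prints; the name `foldl_iterative` is hypothetical:

```python
def foldl_iterative(f, data, z):
    # Left fold: combine the accumulator with each element, left to right.
    acc = z
    for item in data:
        acc = f(acc, item)
    return acc

print(foldl_iterative(lambda x, y: x + y, [1, 2, 3, 4, 5], 0))      # 15
print(foldl_iterative(lambda x, y: x - y, [1, 2, 3, 4, 5], 0))      # -15
print(foldl_iterative(lambda x, y: x + y, list(range(10_000)), 0))  # 49995000
```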

In [12]:
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15


15

In [13]:
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15


-15

In [14]:
(((((0 - 1) - 2) - 3) - 4) - 5)

-15

- Subtraction is neither [commutative](https://en.wikipedia.org/wiki/Commutative_property) nor [associative](https://en.wikipedia.org/wiki/Associative_property), so the order in which we apply the fold matters. Nesting the subtractions from the right gives a different result:

In [15]:
(1 - (2 - (3 - (4 - (5 - 0)))))

3

In [24]:
# This is a right fold: f(data[0], f(data[1], ... f(data[-1], z)))
def foldr(f, data, z):
    if len(data) == 0:
        return z
    else:
        return f(data[0], foldr(f, data[1:], z))

In [25]:
foldl(lambda x, y: x - y,  [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15


-15

In [26]:
foldr(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

3
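A right fold can also be sketched with `functools.reduce` by walking the list from the end and swapping the argument order, so that `f` still receives `(element, accumulated_result)` as in `foldr` above. The name `foldr_via_reduce` is hypothetical:

```python
from functools import reduce

def foldr_via_reduce(f, data, z):
    # Traverse right to left; the accumulator holds the fold of the suffix seen so far.
    return reduce(lambda acc, x: f(x, acc), reversed(data), z)

print(foldr_via_reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0))  # 3
```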

### Python's `reduce` Function

- Python's `reduce` function (in the `functools` module in Python 3, no longer a built-in) is a *left* fold.

In [17]:
from functools import reduce

reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])

15

In [18]:
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

-15

In [19]:
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15


-15
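Without an initial value, `reduce` uses the first element as the starting accumulator, which is why the first call above needed no `0`. For subtraction this changes the result, and an empty sequence then has nothing to start from, so `reduce` raises `TypeError`; passing an initializer makes the empty case well defined. A short sketch:

```python
from functools import reduce

def subtract(x, y):
    return x - y

print(reduce(subtract, [1, 2, 3, 4, 5]))     # ((((1 - 2) - 3) - 4) - 5) = -13
print(reduce(subtract, [1, 2, 3, 4, 5], 0))  # -15, as above

print(reduce(subtract, [], 0))  # 0: the initializer is returned unchanged

try:
    reduce(subtract, [])
except TypeError as exc:
    print("TypeError:", exc)
```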

### Exercise 

- Write a function to compute $n!$ using `reduce`.

In [24]:
from functools import reduce

def factorial(n):
    # The initial value 1 makes factorial(0) == 1 (the empty product)
    return reduce(lambda x, y: x * y, range(1, n + 1), 1)
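One way to sanity-check a `reduce`-based factorial is to compare it against the standard library's `math.factorial` over a range of inputs. This sketch restates the function, swapping the lambda for `operator.mul`, an equivalent two-argument multiply:

```python
import math
from functools import reduce
from operator import mul

def factorial(n):
    # Multiply 1 * 1 * 2 * ... * n; the initializer 1 makes factorial(0) == 1.
    return reduce(mul, range(1, n + 1), 1)

for n in range(10):
    assert factorial(n) == math.factorial(n)

print(factorial(5))  # 120
```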

### Weighted Mean and Variance

If the random variable $X$ is discrete with probability mass function $x_1 \mapsto p_1, x_2 \mapsto p_2, \ldots, x_n \mapsto p_n$, then

$$\operatorname{Var}(X) = \left(\sum_{i=1}^n p_i x_i ^2\right) - \mu^2,$$

where $\mu$ is the mean (expected value), i.e.

$$\mu = \sum_{i=1}^n p_i x_i. $$

In [26]:
X = [5, 1, 2, 3, 1, 2, 5, 4]
P = [0.05, 0.05, 0.15, 0.05, 0.15, 0.2, 0.1, 0.25]

### Exercise 

- Write functions to compute the average value and variance using for loops

### Exercise 

- Write functions to compute the average value and variance using `map` and `reduce`

In [None]:
# Start your code here:




Note: the exercises above are only meant to help you understand the map-reduce process. They are not a good way to compute a variance in Python; use NumPy instead.
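Following that advice, here is a minimal NumPy sketch for the `X` and `P` defined above (restated so the block runs on its own). `np.average` with `weights` computes $\sum_i p_i x_i / \sum_i p_i$, which equals $\mu$ here because the probabilities sum to 1. For the variance it uses the equivalent form $\sum_i p_i (x_i - \mu)^2$ rather than $\sum_i p_i x_i^2 - \mu^2$:

```python
import numpy as np

X = np.array([5, 1, 2, 3, 1, 2, 5, 4])
P = np.array([0.05, 0.05, 0.15, 0.05, 0.15, 0.2, 0.1, 0.25])

mu = np.average(X, weights=P)               # weighted mean
var = np.average((X - mu) ** 2, weights=P)  # weighted variance

print(mu, var)  # approximately 2.8 and 1.96 (up to floating-point rounding)
```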

## MapReduce Programming Model

In this model, users implement an interface consisting of two functions: map and reduce.

### Map

- Records from the data source (lines out of files, rows of a database, etc.) are fed into the map function as key-value pairs, e.g., (filename, line).
- map() produces one or more intermediate values, each paired with an output key, from each input record.





![MapReduce](images/map.jpg)
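To make the map step concrete, here is a minimal sketch of a word-count mapper in plain Python. The function name `map_fn` and the `(filename, line)` input are illustrative assumptions, not the API of any particular MapReduce framework:

```python
def map_fn(filename, line):
    # Input record: (filename, line); filename is unused in this sketch.
    # Output: zero or more (key, value) pairs, here (word, 1) for each word.
    for word in line.lower().split():
        yield (word, 1)

pairs = list(map_fn("doc1.txt", "the cat saw the dog"))
print(pairs)
# [('the', 1), ('cat', 1), ('saw', 1), ('the', 1), ('dog', 1)]
```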

### Reduce

- After the map phase is over, all the intermediate values for a given output key are combined into a list. Frameworks typically do this grouping by sorting the intermediate pairs by key (often called the shuffle and sort step).
- reduce() combines those intermediate values into one or more final values for that same output key.

![MapReduce](images/reduce.jpg)
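A sequential sketch of the grouping and reduce steps for the same word-count example, assuming the intermediate pairs have already been produced by the mappers. Sorting by key and then using `itertools.groupby` collects all values for each key, and the hypothetical `reduce_fn` sums them:

```python
from itertools import groupby
from operator import itemgetter

def reduce_fn(key, values):
    # Combine all intermediate values for one key into a final value.
    return (key, sum(values))

intermediate = [("the", 1), ("cat", 1), ("saw", 1), ("the", 1), ("dog", 1)]

# Shuffle/sort: bring equal keys next to each other, then group them.
intermediate.sort(key=itemgetter(0))
results = [
    reduce_fn(key, [value for _, value in group])
    for key, group in groupby(intermediate, key=itemgetter(0))
]

print(results)  # [('cat', 1), ('dog', 1), ('saw', 1), ('the', 2)]
```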

![MapReduce](images/mapreduce.jpg)

### Parallelism

- map() functions run in parallel, creating different intermediate values from different input data sets.
- reduce() functions also run in parallel, each working on a different output key.
- All values are processed independently.
- Bottleneck: the reduce phase cannot start until the map phase has completely finished.

![MapReduce](images/mapreduce_scenarios.jpg)
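A minimal single-machine sketch of this structure, using `concurrent.futures.ThreadPoolExecutor` as a stand-in for a cluster. This is an assumption for illustration only: threads in CPython do not speed up CPU-bound Python code, so a real system would use separate processes or machines. Each input split is mapped independently, the code waits for every map task before grouping (the barrier described above), and each key is then reduced independently:

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def map_split(lines):
    # Map task: word-count one input split, emitting (word, 1) pairs.
    return [(word, 1) for line in lines for word in line.lower().split()]

def reduce_key(item):
    # Reduce task: sum all counts for one key.
    key, values = item
    return key, sum(values)

splits = [
    ["the cat saw the dog"],
    ["the dog ran"],
]

with ThreadPoolExecutor(max_workers=2) as pool:
    # Map phase: one task per split; list() waits until every map task is done.
    mapped = list(pool.map(map_split, splits))

    # Shuffle: group intermediate values by key.
    grouped = defaultdict(list)
    for pairs in mapped:
        for key, value in pairs:
            grouped[key].append(value)

    # Reduce phase: one task per key, independent of the others.
    counts = dict(pool.map(reduce_key, grouped.items()))

print(sorted(counts.items()))
# [('cat', 1), ('dog', 2), ('ran', 1), ('saw', 1), ('the', 3)]
```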