<center><h1>BUSS6002 - Data Science in Business</h1></center>

#### Pre-Tutorial Checklist

1. Complete all tasks from week 9
2. Read up to exercise 1
3. Install mockr library


# Tutorial 10 - MapReduce


## What is MapReduce? 

MapReduce is a programming model for performing calculations in parallel rather than in series. Instead of relying on a supercomputer to work through every calculation extremely quickly, we can have many ordinary processors each work on part of the problem at the same time. The Map phase breaks the problem into smaller, independent calculations that can be solved individually, and assigns their outputs to queues. The Reduce phase combines the outcomes of these smaller computations to produce a solution to the original task. MapReduce is most often used for processing data, but it is flexible enough to handle more complex tasks such as linear regression, which we will demonstrate.

A MapReduce program consists of two primary steps:

**Map**: performs filtering and sorting into queues

**Reduce**: performs a summary operation on each queue

<img src="img/mapreduce.gif">

*Image sourced from https://www.ibm.com/developerworks/cloud/library/cl-openstack-deployhadoop/*
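To make the two phases concrete, here is a minimal single-process sketch of the model in plain Python. It is not how mockr is implemented: `run_mapreduce` and the toy sales records are hypothetical, and a real framework would run the map and reduce calls on separate workers rather than in one loop.

```python
from collections import defaultdict

def run_mapreduce(records, map_fn, reduce_fn):
    """Hypothetical helper: run map -> shuffle -> reduce sequentially in one process."""
    # Map phase: each record independently yields zero or more (key, value) pairs
    queues = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            # Shuffle: append each value to the queue for its key
            queues[key].append(value)
    # Reduce phase: summarise each queue independently
    results = []
    for key, values in queues.items():
        results.extend(reduce_fn(key, values))
    return results

# Hypothetical toy data: (store, sale amount) records
sales = [("north", 10), ("south", 5), ("north", 7), ("east", 3), ("south", 1)]

def map_sales(record):
    store, amount = record
    yield (store, amount)

def reduce_sales(store, amounts):
    yield (store, sum(amounts))

print(run_mapreduce(sales, map_sales, reduce_sales))
# [('north', 17), ('south', 6), ('east', 3)]
```

Every call to `map_sales` and every call to `reduce_sales` is independent of the others, which is what allows a real framework to spread them across many machines.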

## Why Do We Need MapReduce?

As the volume of data acquired by businesses and individuals continues to grow, it becomes difficult to store and process this data quickly enough to extract useful insights. Most naive algorithms cannot cope with such volumes of data, either because they require too much computation time or because they require too much memory.

The "divide and conquer" style of MapReduce can resolve this issue. By carefully decomposing the problem, we can distribute the processing load over many machines. This is known as horizontal scaling. This distributed computing style is more robust, as we are no longer reliant on one large computer for our processing. MapReduce implementations can be designed to be fault tolerant, so that machines can be disabled or disconnected without affecting the results. Additionally, the hardware requirements of each machine are reduced.

## MapReduce in Python

There are many MapReduce libraries in Python. We will use "mockr", which is a very simple library.

mockr allows us to write our map and reduce functions in normal Python. You can install it via:

    pip install mockr
    
<div style="margin-bottom: 0px;"><img width=20 style="display: block; float: left;  margin-right: 20px;" src="img/docs.png"> <h3 style="padding-top: 0px;">Documentation - mockr</h3></div>
https://pypi.org/project/mockr/
    

## Example: Counting Words

Let's start with the canonical MapReduce example: counting words in a corpus.

First we define the map and reduce functions. Then we can run the MapReduce job to count the words.

In [3]:
import re
from mockr import run_stream_job

# This regular expression matches words
# Test it at https://www.regexpal.com
WORD_RE = re.compile(r"[\w']+")

def map_fn(chunk):
    # Use the regex to find all words in each chunk
    # The chunk is a line of text because we are
    # using run_stream_job
    for word in WORD_RE.findall(chunk):
        # Emit a (key, value) pair: the word is the key and
        # the value is its count. We emit once per
        # occurrence, so the value is always 1.
        yield (word.lower(), 1)

def reduce_fn(key, values):
    # Receives all the values for each key (unique word)
    # then sums them together for the total count
    yield (key, sum(values))

# "\n" is the newline character, which separates the lines of text
input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"

# run_stream_job expects a newline delimited string, map function and reduce function
# and returns a list of results
# https://mockr.readthedocs.io/en/latest/api.html#mockrmockmr.run_stream_job
results = run_stream_job(input_str, map_fn, reduce_fn)

print(results)

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


## Yield

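The `yield` keyword turns a function into a generator. Instead of building a complete list and returning it, the function produces its values lazily, one at a time, as the caller iterates over it, and a generator can only be iterated over once. This lets a map function emit any number of key/value pairs per input without holding them all in memory. Note that `yield` itself does not decide which reducer receives each pair; that routing is done by the MapReduce framework during the shuffle stage.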
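A tiny standalone example of this behaviour; `emit_pairs` is a hypothetical helper written in the same style as `map_fn`:

```python
def emit_pairs(words):
    # A generator: the body only runs when the caller asks for the next value
    for word in words:
        yield (word, 1)

gen = emit_pairs(["a", "b", "a"])
print(next(gen))  # ('a', 1)
print(list(gen))  # [('b', 1), ('a', 1)] (the values not yet consumed)
print(list(gen))  # [] (a generator can only be iterated over once)
```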

<div style="margin-bottom: 30px;"><img width=48 style="display: block; float: left;  margin-right: 20px;" src="img/question-mark-button.png"> <h3 style="padding-top: 15px;">Exercise 1 - Wordcount MapReduce</h3></div>

Given a line of text that's n words long with k unique words as input:
- How many keys are returned by the `map_fn` function?
- How many keys are returned after the output from `map_fn` has been passed through `reduce_fn`?

## Explanation

1. Each line of text is sent to a mapper.

2. Each mapper:
    - Splits the line into words
    - For each word, yields a key/value pair:
        - Key: the word
        - Value: the number of times it occurs (we emit once per occurrence, so this is always 1)

3. The mapped key/value pairs are sent to the shuffler.

4. The key/value pairs are shuffled:
    - For each key, collect the corresponding list of values

5. Each key and its list of values is routed to a reducer:
    - Each call to the reduce function handles a single key (one reducer worker may process many keys)

6. Reduce:
    - Sum the counts for the key

## Simulation of MapReduce Stages

Below is a simulation of the output at each stage.

### Text is broken into lines

In [4]:
input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"

# Split the string into lines and store in a list
lines_of_text = input_str.split("\n")

print(lines_of_text)

['Hello!', 'This is a sample string.', 'It is very simple.', 'Goodbye!']


### Mapping

We need to apply `map_fn` to each line of text and collect the results.

In [5]:
# We will store the output of map_fn in here
word_count_lists = []

# For every line of text
for line in lines_of_text:
    # Apply the map function (split and count words)
    # Save the result as a list in our list
    word_count_lists.append(list(map_fn(line)))

# Show the result of mapping
print(word_count_lists)

[[('hello', 1)], [('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1)], [('it', 1), ('is', 1), ('very', 1), ('simple', 1)], [('goodbye', 1)]]


This is a nested list of lists, which we can flatten into a normal list using itertools:

In [9]:
import itertools

# word_count_lists is a list of lists
# Flatten the list of words to make it simpler by chaining lists together
word_count_list_flat = list(itertools.chain.from_iterable(word_count_lists))

print(word_count_list_flat)

[('hello', 1), ('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('is', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


### Shuffle and Sort

This stage is handled by mockr, but it is important to understand!

We need to group all the counts for each word together. We will store this in a dictionary (key/value data type).

In [10]:
# SHUFFLE/SORT STAGE
from collections import defaultdict

# Create a dictionary where the default value is a list
word_tuple_dict = defaultdict(list)

for kv_pair in word_count_list_flat:
    # For each unique key, append the (word, count) tuple to that key's list
    word_tuple_dict[kv_pair[0]].append(kv_pair)

# Print it in a nice format:
for k, v in word_tuple_dict.items():
    print(str(k) +": " + str(v))

hello: [('hello', 1)]
this: [('this', 1)]
is: [('is', 1), ('is', 1)]
a: [('a', 1)]
sample: [('sample', 1)]
string: [('string', 1)]
it: [('it', 1)]
very: [('very', 1)]
simple: [('simple', 1)]
goodbye: [('goodbye', 1)]
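A dictionary is not the only way to implement this stage. Frameworks such as Hadoop sort the mapper output by key, so that identical keys end up next to each other; this is where the "sort" in "shuffle and sort" comes from. The sketch below mimics that approach with `sorted` and `itertools.groupby`. The mapper output is copied from the flattened list above so the block runs on its own.

```python
import itertools
from operator import itemgetter

# Mapper output (same pairs as word_count_list_flat, hard-coded so this runs standalone)
mapped = [('hello', 1), ('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1),
          ('it', 1), ('is', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]

# Sort by key so that equal keys become adjacent
mapped_sorted = sorted(mapped, key=itemgetter(0))

# groupby yields one group per run of equal keys; keep only the values
grouped = {key: [value for _, value in group]
           for key, group in itertools.groupby(mapped_sorted, key=itemgetter(0))}

print(grouped)
# {'a': [1], 'goodbye': [1], 'hello': [1], 'is': [1, 1], 'it': [1], 'sample': [1], 'simple': [1], 'string': [1], 'this': [1], 'very': [1]}
```

Note that `groupby` only merges adjacent items, so the sort step is essential: without it, the two `'is'` pairs would form two separate groups.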


### Reduce

Now we need to apply `reduce_fn` to our grouped data: collect all the counts for each word into a list and pass it to `reduce_fn`.

In [11]:
# REDUCE STAGE
results = []

for k, v in word_tuple_dict.items():
    # Get the counts from the list of k/v pairs
    vals_list = [t[1] for t in v]
    
    # Apply the reduce_fn to the word and counts pair
    # reduce_fn will yield a (key, value) tuple
    # inside a generator object which we convert to a list
    results.append(list(reduce_fn(k, vals_list)))
    
print(results)

[[('hello', 1)], [('this', 1)], [('is', 2)], [('a', 1)], [('sample', 1)], [('string', 1)], [('it', 1)], [('very', 1)], [('simple', 1)], [('goodbye', 1)]]


Again, we flatten the list using itertools.

In [12]:
# Flatten the results to make them more readable
results_flat = list(itertools.chain.from_iterable(results))

print(results_flat)

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


## Bigger Data Set

This was a small example. Let's look at a bigger example: counting all the words in Moby Dick.

In [4]:
from mockr import run_stream_job

# Open the MobyDick text file and read it as a single string;
# the with block closes the file automatically afterwards
with open("MobyDick.txt", 'r') as input_file:
    input_str = input_file.read()

# Run map reduce using the same map_fn and reduce_fn that we defined before
results = run_stream_job(input_str, map_fn, reduce_fn)

# Show the first 20 results
print(results[:20])

[('the', 14697), ('project', 91), ('gutenberg', 92), ('ebook', 10), ('of', 6742), ('moby', 89), ('dick', 88), ('or', 797), ('whale', 1101), ('by', 1222), ('herman', 4), ('melville', 4), ('this', 1438), ('is', 1746), ('for', 1643), ('use', 49), ('anyone', 6), ('anywhere', 16), ('at', 1335), ('no', 591)]
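When the data still fits on one machine, a quick sanity check for a MapReduce word count is to compare it with Python's single-machine `collections.Counter`, using the same regular expression as `map_fn`. A minimal sketch on the small sample string from the first example (the Moby Dick file is not reproduced here):

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")
input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"

# Single-machine word count with the same tokenisation and lower-casing as map_fn
counts = Counter(word.lower() for word in WORD_RE.findall(input_str))

# MapReduce output from the first example (copied from its printed result)
mr_results = [('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1),
              ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]

print(dict(mr_results) == dict(counts))  # True
print(counts.most_common(1))             # [('is', 2)]
```

To rank the Moby Dick results in the same way, the list returned by `run_stream_job` can be sorted by count, e.g. `sorted(results, key=lambda kv: kv[1], reverse=True)[:10]`.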


## Scaling 

Imagine we want to perform this same process on every tweet ever posted, or on all the text on the web. Could we do this on a personal computer in a reasonable time? No! We would need a cluster of computers (workers) running MapReduce.

In the word count example, any line of text can be sent to any map worker in the cluster, because each line is processed independently. A set of reduce workers then each handle one or more keys.
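Deciding which reduce worker receives which key is commonly done by hash partitioning, which is Hadoop's default: every worker applies the same hash function to a key, so all pairs with that key land on the same reducer, while each reducer can still handle many keys. A minimal sketch, assuming 3 reducers; `partition` is a hypothetical helper, and `zlib.crc32` is used because Python's built-in `hash()` for strings changes between runs.

```python
import zlib

def partition(key, n_reducers):
    """Hypothetical helper: assign a key to one of n_reducers workers."""
    # zlib.crc32 is deterministic across runs, unlike hash() for str
    return zlib.crc32(key.encode("utf-8")) % n_reducers

mapped = [('hello', 1), ('this', 1), ('is', 1), ('a', 1), ('is', 1), ('goodbye', 1)]
n_reducers = 3

# Each reducer worker receives every pair whose key hashes to it
buckets = {r: [] for r in range(n_reducers)}
for key, value in mapped:
    buckets[partition(key, n_reducers)].append((key, value))

for r, pairs in buckets.items():
    print(r, pairs)
```

The exact bucket each word lands in depends on the hash function, but both `('is', 1)` pairs always end up in the same bucket, which is all the reduce step needs.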

## Example: Linear Regression

Many machine learning problems can be decomposed into subproblems that can be solved independently, which means we can use MapReduce to solve them.

Recall from week 6 that we can calculate the optimal coefficients for linear regression explicitly using linear algebra. For feature matrix X:

$$
X = \begin{bmatrix}1&x_{11}&x_{12}&\cdots&x_{1p}\\ 1&x_{21}&x_{22}&\cdots&x_{2p}\\ \vdots&\vdots&\vdots&\ddots&\vdots\\ 1&x_{n1}&x_{n2}&\cdots&x_{np}\end{bmatrix}
$$

And label vector y:

$$
y = \begin{bmatrix}y_1\\ y_2\\ \vdots\\ y_n\end{bmatrix}
$$

Assuming $X^TX$ is invertible, we can use the formula

$$ \beta = \left( X^T X\right)^{-1} X^T \mathbf y$$

The trick to using MapReduce for this problem is noticing that $X^TX$ and $X^Ty$ for the whole dataset can be written as the sums of $X_i^TX_i$ and $X_i^Ty_i$ respectively, where $X_i$ and $y_i$ contain the rows of $X$ and the entries of $y$ for a single observation or a subset (chunk) of observations.
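Stacking the data as $K$ row blocks shows why this works; it is just block matrix multiplication, using the same $X_i$, $y_i$ notation:

$$
X = \begin{bmatrix}X_1\\ X_2\\ \vdots\\ X_K\end{bmatrix}, \quad
\mathbf y = \begin{bmatrix}y_1\\ y_2\\ \vdots\\ y_K\end{bmatrix}
\quad\Longrightarrow\quad
X^TX = \begin{bmatrix}X_1^T & X_2^T & \cdots & X_K^T\end{bmatrix}\begin{bmatrix}X_1\\ X_2\\ \vdots\\ X_K\end{bmatrix} = \sum_{i=1}^{K} X_i^TX_i,
\qquad
X^T\mathbf y = \sum_{i=1}^{K} X_i^Ty_i
$$

Each term in these sums depends on only one block, so each block can be processed by a separate mapper. When every block is a single observation ($K = n$), $X_i$ is the $i$-th row of $X$ and $y_i$ is the $i$-th entry of $\mathbf y$.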

We can decompose linear regression by dividing the data into subsets and computing the two matrix products, $X_i^TX_i$ and $X_i^Ty_i$, on each subset (map). These partial products are then summed, and the summed $X^TX$ is inverted to solve for $\beta$ (reduce). See the slides for more details.
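The same map/reduce split can be sketched in plain NumPy without mockr. This is a minimal illustration on hypothetical synthetic data (the sizes, seed and `true_beta` are made up), using `np.array_split` to form 4 chunks. One design difference from the tutorial code: it calls `np.linalg.solve` instead of explicitly inverting $X^TX$, which is generally more numerically stable and gives the same answer for well-conditioned problems.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical synthetic data: n observations, p features, known coefficients
n, p = 200, 3
X_vals = rng.normal(size=(n, p))
X = np.column_stack((np.ones(n), X_vals))  # add the intercept column of 1s
true_beta = np.array([2.0, -1.0, 0.5, 3.0])
y = X @ true_beta + rng.normal(scale=0.1, size=n)

# "Map": each chunk independently computes its own X_i^T X_i and X_i^T y_i
chunks = zip(np.array_split(X, 4), np.array_split(y, 4))
partials = [(Xc.T @ Xc, Xc.T @ yc) for Xc, yc in chunks]

# "Reduce": sum the partial products, then solve (X^T X) beta = X^T y
XtX = sum(xtx for xtx, _ in partials)
Xty = sum(xty for _, xty in partials)
beta_mr = np.linalg.solve(XtX, Xty)

# Compare with a single-machine least-squares fit on the full data
beta_full, *_ = np.linalg.lstsq(X, y, rcond=None)
print(np.allclose(beta_mr, beta_full))  # True
```

Changing the number of chunks from 4 to any other value should leave `beta_mr` unchanged up to floating-point rounding, which is exactly why the computation can be distributed.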

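In a real distributed MapReduce system, such as Hadoop Streaming, data passed between the map and reduce steps is typically plain text, so a DataFrame or matrix would first have to be converted to a text format such as JSON. mockr lets us skip this step: the code below passes NumPy matrices directly from the map function to the reduce function.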
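As a rough illustration of what that conversion involves, the sketch below serializes a partial $X^TX$ result to a single line of JSON and parses it back. The record layout (`"key"`, `"XtX"`) and the matrix values are hypothetical. NumPy arrays are not JSON-serializable directly, so `tolist()` converts them to nested Python lists first.

```python
import json
import numpy as np

# Hypothetical partial result produced by one mapper
XtX = np.array([[4.0, 2.0], [2.0, 3.0]])

# Serialize: convert the array to nested lists, then to one line of text
line = json.dumps({"key": "result", "XtX": XtX.tolist()})
print(line)  # {"key": "result", "XtX": [[4.0, 2.0], [2.0, 3.0]]}

# Deserialize on the reducer side and rebuild the array
record = json.loads(line)
XtX_back = np.array(record["XtX"])
print(np.array_equal(XtX, XtX_back))  # True
```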

The following code estimates the parameters of a linear regression of `Price` on all the other columns of the BatonRouge dataset.

In [6]:
import pandas as pd
import numpy as np

def map_linear_fn(chunk):
    # Get the dependent variable y
    y = chunk['Price'].values

    # Get the independent/feature variables
    # which is everything except the price column
    X_vals = chunk[chunk.columns.difference(['Price'])].values

    # Get the number of data points
    m = chunk.shape[0]

    # Insert a column of "1"s for the intercept term 
    X = np.column_stack((np.ones(m), X_vals))

    # Convert to matrix to make multiplication easier
    X = np.asmatrix(X)

    # Calculate required multiplications
    XtX = X.T*X
    Xty = X.T * y.reshape(m,1)

    # Yield the result
    yield("result", [XtX, Xty])

def reduce_linear_fn(key, values):

    # Create lists to accumulate the matrices/vectors in
    XtX_list = []
    Xty_list = []

    for result_list in values:
        XtX_list.append(result_list[0])
        Xty_list.append(result_list[1])

    # Sum up all the XtX matrices
    XtX = np.asmatrix(sum(XtX_list))

    # Sum up all the Xty vectors
    Xty = sum(Xty_list)

    # Solve the linear regression objective
    betas = np.linalg.inv(XtX) * Xty

    yield (key, betas)

In [7]:
from mockr import run_pandas_job

df = pd.read_excel("BatonRouge.xls")

results = run_pandas_job(df, map_linear_fn, reduce_linear_fn, n_chunks=4)

print(results)

[('result', matrix([[-4.91975489e+04],
        [-4.40598830e+02],
        [ 3.98305235e+04],
        [-2.56557205e+04],
        [-2.13140882e+01],
        [-2.94581106e+03],
        [ 7.81085610e+03],
        [-1.63093297e+03],
        [ 8.54451540e+01],
        [ 1.06961744e+03],
        [ 5.62042730e+04]]))]


In [29]:
# Simplify the result to make it more readable
params_mr = np.array(results[0][1]).ravel()

print(params_mr)

[-4.91975489e+04 -4.40598830e+02  3.98305235e+04 -2.56557205e+04
 -2.13140882e+01 -2.94581106e+03  7.81085610e+03 -1.63093297e+03
  8.54451540e+01  1.06961744e+03  5.62042730e+04]


### Comparison with sklearn parameters

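Note that even though we separated the data into multiple chunks, the final result matches sklearn's estimates! (The two can differ in the last few decimal places, because the floating-point sums are computed in a different order, but they agree to the precision shown.)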

In [28]:
from sklearn.linear_model import LinearRegression

lr_obj = LinearRegression()

features = df[df.columns.difference(['Price'])]
target = df['Price']

lr_obj.fit(features, target)

params_sk = np.append(np.array(lr_obj.intercept_), lr_obj.coef_)

print(params_sk)

[-4.91975489e+04 -4.40598830e+02  3.98305235e+04 -2.56557205e+04
 -2.13140882e+01 -2.94581106e+03  7.81085610e+03 -1.63093297e+03
  8.54451540e+01  1.06961744e+03  5.62042730e+04]


<div style="margin-bottom: 30px;"><img width=48 style="display: block; float: left;  margin-right: 20px;" src="img/question-mark-button.png"> <h3 style="padding-top: 15px;">Exercise 2 - Linear Regression and MapReduce</h3></div>

1. In the equation $ \beta = \left( X^T X\right)^{-1} X^T \mathbf y$:
    - What are the dimensions of $X^TX$?
    - What about $X^Ty$?

2. Check using NumPy that the product $$X^TX = \begin{bmatrix}1&2&3\\ 4&5&6\end{bmatrix}\begin{bmatrix}1&4\\ 2&5\\ 3&6\end{bmatrix}$$ can be written as the sum

$$\begin{bmatrix}3\\ 6\end{bmatrix} \begin{bmatrix}3& 6\end{bmatrix} + \begin{bmatrix}2\\ 5\end{bmatrix} \begin{bmatrix}2& 5\end{bmatrix} + \begin{bmatrix}1\\ 4\end{bmatrix} \begin{bmatrix}1& 4\end{bmatrix}$$