# BUSS6002 Week 10: MAP REDUCE

Project notes from class: https://www.evernote.com/shard/s175/sh/a49f29a6-8e46-4f42-b801-fa446f41f380/41d5809bb5c8df3efd9ed8492c09133d

In [1]:
# RE: Regular Expressions (Regex)
# This helps us grab parts of a string by specifying a rule ('expression')
import re

# This is used to run the MAP REDUCE job
# It will allocate and distribute jobs to different workers automatically
# You have to write the map and reduce steps yourself
from mockr import run_stream_job

In [2]:
# This regular expression matches words
# Test it at https://www.regexpal.com
# [\w']+ matches one or more consecutive "word" characters (\w: letters, digits
# and underscore) or apostrophes ('), so a contraction such as "don't" stays
# together as one word, while spaces and punctuation such as "." and "!"
# separate the words
WORD_RE = re.compile(r"[\w']+")  # Compiled pattern object; WORD_RE.findall(text) returns the list of words

# In this case, we will be splitting our string up into chunks of lines.
# Example: "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!" will become
#   Chunk 1: "Hello!"
#   Chunk 2: "This is a sample string."
#   Chunk 3: "It is very simple."
#   Chunk 4: "Goodbye!"
# Example: WORD_RE.findall("This is a sample string.")
#   This will give you ["This", "is", "a", "sample", "string"]
def map_fn(chunk):
    """MAP step: turn one chunk of text into (word, 1) pairs.

    Because the job is run with run_stream_job, each chunk is a single
    line of the input text.
    """
    words_in_chunk = WORD_RE.findall(chunk)

    # Lower-case every word so that "This" and "this" share the same key.
    # Each occurrence is emitted separately with a count of 1; a word that
    # appears twice is emitted twice and the reduce step adds the 1s up.
    yield from ((word.lower(), 1) for word in words_in_chunk)

def reduce_fn(key, values):
    """REDUCE step: add up every count received for one key (unique word).

    Yields a single (key, total) pair.
    """
    total = sum(values)
    yield key, total

# "\n" is the newline character which separates the lines of text
input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"

# run_stream_job expects a newline delimited string, map function and reduce function
# and returns a list of results
# https://mockr.readthedocs.io/en/latest/api.html#mockrmockmr.run_stream_job
results = run_stream_job(input_str, map_fn, reduce_fn)

# Each result is a (word, total count) pair, e.g. ('is', 2)
print(results)

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


# Break down the process above one by one

- This part is not necessary for actually getting the MAP REDUCE job done.
- This is for your understanding only.

First, we want to break the string into separate lines.

In [3]:
# The same sample text as before: four lines joined by the newline character
input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"

# Cut the text at every "\n"; each line becomes one element of the list
lines_of_text = input_str.split(sep="\n")

print(lines_of_text)

['Hello!', 'This is a sample string.', 'It is very simple.', 'Goodbye!']


Then, we will do the map job:

- First, each line will be split into words.
    - e.g. 'This is a sample string.' will become ['This', 'is', 'a', 'sample', 'string']
- Then, we will collect the result of the map jobs into a big list.

For example, we can have 4 chunks:
- Chunk 1: "Hello!" -> ['Hello'] -> [('hello', 1)]
- Chunk 2: "This is a sample string." -> ['This', 'is', 'a', 'sample', 'string'] -> [('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1)]
- Chunk 3: "It is very simple." -> ['It', 'is', 'very', 'simple'] -> [('it', 1), ('is', 1), ('very', 1), ('simple', 1)]
- Chunk 4: "Goodbye!" -> ['Goodbye'] -> [('goodbye', 1)]

In [4]:
# Apply the map function (split and count words) to every line of text.
# map_fn is a generator, so list(...) collects the (word, 1) pairs of one
# line; the outer list keeps one inner list per line (chunk).
word_count_lists = [list(map_fn(text_line)) for text_line in lines_of_text]

# Show the result of mapping
print(word_count_lists)

[[('hello', 1)], [('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1)], [('it', 1), ('is', 1), ('very', 1), ('simple', 1)], [('goodbye', 1)]]


Notice that the lists are still separated by chunks. We now need to reorganize and put all the individual key-value pairs (word, count) into one big list.

This would be the beginning and preparation of the reduce job.

In [5]:
import itertools

# word_count_lists is a list of lists (one inner list per chunk).
# Walk through the chunks in order and pull every (word, count) pair
# out into one single flat list.
word_count_list_flat = [
    pair
    for chunk_pairs in word_count_lists
    for pair in chunk_pairs
]

print(word_count_list_flat)

[('hello', 1), ('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('is', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


Now, we need to put the data into the right reduce worker.

That is, all data with the same key must go to the same reduce worker (processor / machine), so that each worker holds every value for its key.

In [6]:
# SHUFFLE/SORT STAGE
from collections import defaultdict

# Group the pairs by word. With defaultdict(list), looking up a word that
# has not been seen yet automatically creates an empty list for it.
word_tuple_dict = defaultdict(list)

for pair in word_count_list_flat:
    # The word (first element of the pair) is the key and plays the role
    # of the reduce machine's name; the whole (word, count) pair is the
    # data that gets put into (appended for) that machine.
    word = pair[0]
    word_tuple_dict[word].append(pair)

# Print it in a nice format, one reduce machine per line:
for machine, pairs in word_tuple_dict.items():
    print(f"{machine}: {pairs}")

hello: [('hello', 1)]
this: [('this', 1)]
is: [('is', 1), ('is', 1)]
a: [('a', 1)]
sample: [('sample', 1)]
string: [('string', 1)]
it: [('it', 1)]
very: [('very', 1)]
simple: [('simple', 1)]
goodbye: [('goodbye', 1)]


Here we will perform the actual reduce job, which is to sum all the values in that worker.

In [7]:
# REDUCE STAGE
# word_tuple_dict maps each word (the REDUCE machine) to the list of
# (word, count) pairs that were sent to it,
# e.g. word = 'is', pairs = [('is', 1), ('is', 1)]
#
# For every machine:
#   1. Keep only the counts (the second element of each pair):
#        [('hello', 1)]          -> [1]
#        [('is', 1), ('is', 1)]  -> [1, 1]
#   2. Apply reduce_fn to the word and its counts, which adds them up:
#        [1]    -> ('hello', 1)
#        [1, 1] -> ('is', 2)
# reduce_fn yields its (key, value) tuple from a generator object,
# so list(...) is used to collect it
results = [
    list(reduce_fn(word, [pair[1] for pair in pairs]))
    for word, pairs in word_tuple_dict.items()
]

print(results)

[[('hello', 1)], [('this', 1)], [('is', 2)], [('a', 1)], [('sample', 1)], [('string', 1)], [('it', 1)], [('very', 1)], [('simple', 1)], [('goodbye', 1)]]


In [8]:
# Every entry of results is itself a list of (word, total) pairs;
# unpack them into one flat list to make the output more readable
results_flat = [pair for reduced in results for pair in reduced]

print(results_flat)

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


# Linear Regression with Map Reduce

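- Linear Regression with Map Reduce gives you EXACT answers (up to floating-point rounding): $X^\top X = \sum_c X_c^\top X_c$ and $X^\top y = \sum_c X_c^\top y_c$, where $c$ runs over the chunks, so summing the per-chunk results reproduces the full-data solution $\hat{\beta} = (X^\top X)^{-1} X^\top y$
- So, you achieve speed up for large datasets without compromising accuracy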

In [9]:
import pandas as pd
import numpy as np

def map_linear_fn(chunk):
    """MAP step for linear regression: summarise one chunk of rows.

    chunk is a pandas DataFrame with a 'Price' column (the dependent
    variable y); every other column is used as an independent/feature
    variable. Yields one ("result", [XtX, Xty]) pair, where X is the
    feature matrix with a leading column of 1s for the intercept term.
    """
    # Number of data points in this chunk
    n_rows = len(chunk)

    # Everything except the price column is a feature
    feature_cols = chunk.columns.difference(['Price'])
    features = chunk[feature_cols].values

    # The dependent variable as a column vector of shape (n_rows, 1)
    price_col = chunk['Price'].values.reshape(n_rows, 1)

    # Put a column of 1s in front for the intercept term. X is stored as an
    # np.matrix so that "*" below is matrix multiplication
    design = np.asmatrix(np.column_stack((np.ones(n_rows), features)))
    design_t = design.T

    # Emit the two products the reduce step needs: X'X and X'y
    yield "result", [design_t * design, design_t * price_col]

def reduce_linear_fn(key, values):
    """REDUCE step for linear regression: combine chunk results and solve.

    values is an iterable of [XtX, Xty] pairs, one per map result. The
    pairs are summed element-wise and the normal equations
    (XtX) * betas = Xty are solved for betas.

    Yields a single (key, betas) pair, where betas is a column np.matrix.

    Raises numpy.linalg.LinAlgError if values is empty or if the summed
    XtX matrix is singular.
    """
    # Materialise the results once so they can be summed twice
    # (values may be a one-shot iterator)
    partials = list(values)
    if not partials:
        # Without this guard the sums below are the scalar 0 and the failure
        # shows up as a confusing linear-algebra error
        raise np.linalg.LinAlgError(
            "reduce_linear_fn needs at least one [XtX, Xty] pair"
        )

    # Sum up all the XtX matrices
    XtX = np.asmatrix(sum(part[0] for part in partials))

    # Sum up all the Xty vectors
    Xty = sum(part[1] for part in partials)

    # Solve the linear regression objective.
    # np.linalg.solve solves XtX * betas = Xty directly instead of forming
    # the explicit inverse first, which is more accurate and cheaper.
    # asmatrix keeps the result an np.matrix, as before.
    betas = np.asmatrix(np.linalg.solve(XtX, Xty))

    yield (key, betas)

In [15]:
from mockr import run_pandas_job

# Load in your data
df = pd.read_excel("BatonRouge.xls")

# Get rid of any columns that are not numerical
# ('Style' is the only column removed here; every other column is kept)
# NOTE: Index.difference returns the remaining labels sorted by default, so
# the column order of df may differ from the order in the spreadsheet
df = df[df.columns.difference(['Style'])]

# Run the mocker job on pandas dataframes
# n_chunks = 4 presumably splits df into 4 chunks, each handed to
# map_linear_fn as a DataFrame -- TODO confirm against the mockr documentation
results = run_pandas_job(df, map_linear_fn, reduce_linear_fn, n_chunks = 4)

print(results)

[('result', matrix([[-4.54226376e+04],
        [-4.32338317e+02],
        [ 3.99294120e+04],
        [-2.60221781e+04],
        [-2.15419771e+01],
        [-1.92386741e+03],
        [ 7.56350149e+03],
        [-1.89659638e+03],
        [ 8.57720047e+01],
        [ 5.81019654e+04]]))]


In [16]:
# results holds a single ('result', betas) pair and betas is a column
# matrix; copy it into a plain array and flatten it to one dimension
# so that it is easier to read
params_mr = np.array(results[0][1]).reshape(-1)

print(params_mr)

[-4.54226376e+04 -4.32338317e+02  3.99294120e+04 -2.60221781e+04
 -2.15419771e+01 -1.92386741e+03  7.56350149e+03 -1.89659638e+03
  8.57720047e+01  5.81019654e+04]


In [17]:
from sklearn.linear_model import LinearRegression

# Fit ordinary least squares on the whole dataset in one go, using the
# same feature columns (everything except 'Price') and the same target
features = df[df.columns.difference(['Price'])]
target = df['Price']

lr_obj = LinearRegression()
lr_obj.fit(features, target)

# Put the intercept in front of the coefficients so that the layout
# matches params_mr
params_sk = np.append(lr_obj.intercept_, lr_obj.coef_)

print(params_sk)

[-4.54226376e+04 -4.32338317e+02  3.99294120e+04 -2.60221781e+04
 -2.15419771e+01 -1.92386741e+03  7.56350149e+03 -1.89659638e+03
  8.57720047e+01  5.81019654e+04]
