# Assignment 1: MapReduce

__The goal of this assignment is to implement 5 functions related to MapReduce and create 5 queries with them.__

MapReduce is based on functional programming, an approach that encourages the use of functions to build programs.

While MapReduce can effectively process large amounts of data, functional programming is a more general framework.

The purpose of this tutorial is to introduce you to functional programming through the use of MapReduce functions.

Later in the course, we will use other functions to build higher-level abstractions based on Spark and RDDs.

__The MapReduce functions are explained in the example section of this notebook.__

__Grade scale__: 20 points
- __correct function/query__: 2 points
- __incorrect function/query__: 0 points

__Further documentation__:
* https://learnxinyminutes.com/docs/python/
* https://en.wikipedia.org/wiki/MapReduce
* https://en.wikipedia.org/wiki/Functional_programming
* https://www.dataquest.io/blog/introduction-functional-programming-python/

# Core

In [2]:
# define some helper functions with lambda
# a lambda is an anonymous function whose body is a single expression

inc = lambda x: x + 1
dec = lambda x: x - 1
square = lambda x: x * x

add = lambda x, y: x + y
sub = lambda x, y: x - y

ispos = lambda x: x > 0
isneg = lambda x: x < 0 

isodd = lambda x: x % 2 == 1
iseven = lambda x: x % 2 == 0

ident = lambda x: (x, x)
sumall = lambda x: ('sum', x)
lenall = lambda x: ('len', len(x))
leneach = lambda x: (x, len(x))
countall = lambda x: ('count', 1)
counteach = lambda x: (x, 1)
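
To make the key-value helpers concrete, the short sketch below redefines three of them and applies each to a sample input. The input `'big'` is made up for illustration.

```python
# Redefined here so the snippet runs on its own.
counteach = lambda x: (x, 1)
leneach = lambda x: (x, len(x))
countall = lambda x: ('count', 1)

print(counteach('big'))   # ('big', 1): the input itself becomes the key
print(leneach('big'))     # ('big', 3): the value is the length of the input
print(countall('big'))    # ('count', 1): every input shares the same key
```

Helpers ending in `each` produce one key per distinct input, while helpers ending in `all` send every input to a single key.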

# Examples

In [3]:
# map(f: Callable, coll: Iterable) -> Iterable: apply f to every object of coll
# - range(x: int) -> range: generate the integers from 0 to x (excluded)
# - Python's map returns a lazy iterator, so it must be converted to a list before it can be printed

list(map(inc, range(10)))

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [4]:
# filter(f: Callable[..., bool], coll: Iterable) -> Iterable: keep every object of coll that satisfies f
# - we call f a predicate because it must return either True or False (a boolean)
# - f is satisfied when its return value is True

list(filter(iseven, range(10)))

[0, 2, 4, 6, 8]

In [5]:
# reduce(f: Callable, coll: Iterable, init: Any) -> Any: apply f to 2 objects cumulatively, starting with init
# - example: summing a list of numbers is a reduce where f is the addition
# - we must import reduce from functools because it is no longer a builtin in Python 3

from functools import reduce

reduce(add, range(10), 5)

50
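
The three functions compose naturally. As a minimal sketch, the snippet below chains them to sum the squares of the even numbers below 10; the helper lambdas are redefined so the snippet runs on its own.

```python
from functools import reduce

square = lambda x: x * x
iseven = lambda x: x % 2 == 0
add = lambda x, y: x + y

evens = filter(iseven, range(10))    # keeps 0, 2, 4, 6, 8
squares = map(square, evens)         # yields 0, 4, 16, 36, 64
total = reduce(add, squares, 0)      # adds them up, starting from 0

print(total)  # 120
```

Because `filter` and `map` are lazy, no intermediate list is built; values flow through the chain only when `reduce` consumes them.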

# Functions

__You must not use Python's built-in functions (i.e. map, filter, reduce)!__  

I used my secret teaching assistant powers to make the tests fail if you do.

Instead, you must use for loops to create and return a new collection.

In [6]:
def mymap(f, coll):
    """Apply f to every object of coll."""
    ### BEGIN SOLUTION
    result = []
    
    for c in coll:
        result.append(f(c))
        
    return result
    ### END SOLUTION

In [7]:
def myfilter(f, coll):
    """Keep every object x of coll that satisfies f(x) (f(x) returns True)."""
    ### BEGIN SOLUTION
    result = []
    for c in coll:
        # keep the original object, not the predicate's return value
        if f(c):
            result.append(c)

    return result
    ### END SOLUTION

In [8]:
def myreduce(f, coll, init):
    """Apply f to the objects of coll cumulatively, two at a time, starting with init."""
    ### BEGIN SOLUTION
    result = init
    for c in coll:
        result = f(result, c)
    
    return result
    ### END SOLUTION

In [9]:
# you don't need to implement this function, you only need to understand it
# MapReduce requires this function to distribute the mapped values across reducers
# myshuffle will be called by MapReduce between the mapper and reducer calls

def myshuffle(tuples):
    """Group tuple values (second entry) by their key (first entry)."""
    shuffled = dict()
    
    for key, value in tuples:
        if key not in shuffled:
            shuffled[key] = list()
            
        shuffled[key].append(value)
    
    return sorted(shuffled.items())
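
To see what the shuffle step produces, the sketch below repeats `myshuffle` so that it runs on its own and applies it to a few made-up `(key, value)` tuples.

```python
def myshuffle(tuples):
    """Group tuple values (second entry) by their key (first entry)."""
    shuffled = dict()

    for key, value in tuples:
        if key not in shuffled:
            shuffled[key] = list()

        shuffled[key].append(value)

    return sorted(shuffled.items())

# Sample mapper output, invented for illustration.
pairs = [('b', 1), ('a', 1), ('b', 1)]

print(myshuffle(pairs))  # [('a', [1]), ('b', [1, 1])]
```

Each key appears once in the result, paired with the list of all its values, and the groups are sorted by key.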

In [10]:
# mapper(x: Any) -> Tuple[Key, Value] (where Key = Value = Any)

def mymapreduce(mapper, reducer, coll, init):
    """Perform the following steps on coll:
       1. apply mapper to every object of coll
       2. group the output of mapper with the myshuffle function
       3. for each shuffled group, run reducer on its values, starting with init
       4. return the key and its reducer output for each shuffled group in a new coll
    """
    ### BEGIN SOLUTION
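    result = []
    # steps 1-2: map every object to a (key, value) tuple, then group by key
    shuffled = myshuffle(mymap(mapper, coll))

    # steps 3-4: reduce the values of each group, starting with init
    for key, values in shuffled:
        result.append((key, myreduce(reducer, values, init)))

    return result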
    ### END SOLUTION
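
The four steps of the docstring can be traced by hand on a tiny input. The sketch below is not the graded solution: `trace_mapreduce` is a hypothetical helper that inlines the steps with plain loops and prints each intermediate stage. It also illustrates that `init` has to suit the reducer, for instance `0` for an addition and `[]` for a list concatenation.

```python
def trace_mapreduce(mapper, reducer, coll, init):
    """Hypothetical helper: run the four steps and print each stage."""
    # 1. map: one (key, value) tuple per input object
    mapped = [mapper(x) for x in coll]
    print('mapped  :', mapped)

    # 2. shuffle: group the values by key, sorted by key
    groups = {}
    for key, value in mapped:
        groups.setdefault(key, []).append(value)
    shuffled = sorted(groups.items())
    print('shuffled:', shuffled)

    # 3-4. reduce the values of each group, starting from init
    reduced = []
    for key, values in shuffled:
        acc = init
        for v in values:
            acc = reducer(acc, v)
        reduced.append((key, acc))
    print('reduced :', reduced)
    return reduced

words = ['big', 'data', 'big']

# Count each word: integer values, so init is 0.
counts = trace_mapreduce(lambda w: (w, 1), lambda x, y: x + y, words, 0)

# Collect word lengths per word: list values, so init is [].
lengths = trace_mapreduce(lambda w: (w, [len(w)]), lambda x, y: x + y, words, [])
```

The first call ends with `[('big', 2), ('data', 1)]` and the second with `[('big', [3, 3]), ('data', [4])]`.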

In [11]:
# this function is hard to implement and is not required for the rest of the assignment!

def myparmapreduce(pool, mapper, reducer, coll, init):
    """Parallel implementation of mapreduce:
       1. mapper is run in parallel by pool.map()
       2. reducers are run in parallel with pool.map()
    """
    ### BEGIN SOLUTION
    
    
    ### END SOLUTION
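
For readers who want a starting point, here is one possible sketch of the two parallel steps. It is not the reference solution. The notebook does not say which kind of pool is passed in, so this sketch assumes any object with a `map(fn, iterable)` method and uses `concurrent.futures.ThreadPoolExecutor` for the demonstration. Threads are chosen because lambdas cannot be pickled for a process pool. The name `parmapreduce_sketch` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def parmapreduce_sketch(pool, mapper, reducer, coll, init):
    """Hypothetical sketch: pool is any object with a map(fn, iterable) method."""
    # 1. run the mapper in parallel (pool.map preserves the input order)
    mapped = list(pool.map(mapper, coll))

    # shuffle sequentially: group the values by key, sorted by key
    groups = {}
    for key, value in mapped:
        groups.setdefault(key, []).append(value)
    shuffled = sorted(groups.items())

    # 2. run one reduction per group in parallel
    def reduce_group(group):
        key, values = group
        acc = init
        for v in values:
            acc = reducer(acc, v)
        return key, acc

    return list(pool.map(reduce_group, shuffled))

words = 'a problem is a big problem'.split(' ')

with ThreadPoolExecutor(max_workers=4) as pool:
    result = parmapreduce_sketch(pool, lambda w: (w, 1), lambda x, y: x + y, words, 0)

print(result)  # [('a', 2), ('big', 1), ('is', 1), ('problem', 2)]
```

With standard CPython, threads are unlikely to speed up pure-Python work like this, so the sketch shows the structure of the parallel version rather than a performance gain.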

__1. Count word occurrences in the given sentence__
- __hint__: use `sentence.split(' ')` to convert a string to a list of words

In [12]:
sentence = "a problem is a big problem when data is big"

def Q1(s):
    ### BEGIN SOLUTION
    words = s.split(' ')

    result = mymapreduce(counteach, add, words, 0)
    return result
    ### END SOLUTION
    
Q1(sentence)

[('a', 2), ('big', 2), ('data', 1), ('is', 2), ('problem', 2), ('when', 1)]

__2. Count the number of non-blank characters in the given sentence__
- __hint__: aggregate all mapper values in a single key

In [13]:
sentence = "a problem is a big problem when data is big"

def Q2(s):
    ### BEGIN SOLUTION
    chars = list(s)
    mapper = lambda s: ('count', 1) if (s != " ") else ('count', 0)

    result = mymapreduce(mapper, add, chars, 0)
    return result
    ### END SOLUTION
    
Q2(sentence)

[('count', 34)]

__3. Group the given list of records by age__
- __hint__: you can concatenate Python lists with `+`

In [164]:
persons = [{'name': 'Alice', 'age': 22}, {'name': 'Bob',  'age': 22},
           {'name': 'Max',   'age': 24}, {'name': 'Lina', 'age': 24},
           {'name': 'Ethan', 'age': 30}, {'name': 'Eva',  'age': 20}]

def Q3(coll):
    ### BEGIN SOLUTION
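    # wrap each name in a list so that every group reduces to a flat list
    mapper = lambda p: (p['age'], [p['name']])
    # init is 0, so the first value of each group is returned unchanged
    reducer = lambda x, y: x + y if x != 0 else y
    
    result = mymapreduce(mapper, reducer, coll, 0)
    return result
    ### END SOLUTION

Q3(persons)

[(20, ['Eva']), (22, ['Alice', 'Bob']), (24, ['Max', 'Lina']), (30, ['Ethan'])]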

__4. Find the youngest person in the following list__
- __hint__: you can only compare items two by two

In [102]:
persons = [{'name': 'Alice', 'age': 22}, {'name': 'Bob',  'age': 22},
           {'name': 'Max',   'age': 24}, {'name': 'Lina', 'age': 24},
           {'name': 'Ethan', 'age': 30}, {'name': 'Eva',  'age': 20}]

def Q4(coll):
    ### BEGIN SOLUTION
    mapper = lambda s: (s['age'], s['name'])
    reducer = lambda x, y: y
    result = mymapreduce(mapper, reducer, coll, 0)
    return result[0]
    ### END SOLUTION
    
Q4(persons)

(20, 'Eva')
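
The hint suggests a pairwise comparison. As an alternative illustration, and not the solution used above, the sketch below finds the youngest record with the builtin `functools.reduce` and a reducer that keeps the younger of two records. The name `younger` and the shortened list are made up for the example.

```python
from functools import reduce

persons = [{'name': 'Alice', 'age': 22}, {'name': 'Eva', 'age': 20},
           {'name': 'Ethan', 'age': 30}]

# Keep whichever of the two records has the smaller age (a tie keeps the first).
younger = lambda p, q: q if q['age'] < p['age'] else p

youngest = reduce(younger, persons)
print(youngest)  # {'name': 'Eva', 'age': 20}
```

Without an `init` argument, `reduce` starts from the first record, so the list must not be empty.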

__5. Retrieve the persons who are younger than 23 years old__

In [261]:
persons = [{'name': 'Alice', 'age': 22}, {'name': 'Bob',  'age': 22},
           {'name': 'Max',   'age': 24}, {'name': 'Lina', 'age': 24},
           {'name': 'Ethan', 'age': 30}, {'name': 'Eva',  'age': 20}]

def Q5(coll):
    ### BEGIN SOLUTION
    
    mapper = lambda s: ('persons', [s['name']]) if s['age'] < 23 else ('', '')
    reducer = lambda x,y: y+x if x != 0 else y
    result = mymapreduce(mapper, reducer, coll, 0).pop()
    return result
    ### END SOLUTION
    
Q5(persons)

('persons', ['Eva', 'Bob', 'Alice'])
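
This query is essentially a selection, so for comparison here is a sketch of the same idea with the builtin `filter`. It is not the MapReduce solution above, and the predicate name `isyoung` is hypothetical.

```python
persons = [{'name': 'Alice', 'age': 22}, {'name': 'Bob',  'age': 22},
           {'name': 'Max',   'age': 24}, {'name': 'Lina', 'age': 24},
           {'name': 'Ethan', 'age': 30}, {'name': 'Eva',  'age': 20}]

# Predicate: True for records with an age below 23.
isyoung = lambda p: p['age'] < 23

names = [p['name'] for p in filter(isyoung, persons)]
print(names)  # ['Alice', 'Bob', 'Eva']
```

Unlike the reducer above, which prepends each new value, `filter` keeps the records in their input order.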