## Introduction

In this lab, you will design and implement MapReduce algorithms for a variety of common data processing tasks.

The MapReduce programming model (and a corresponding system) was proposed in a 2004 paper from a team at Google as a simpler abstraction for processing very large datasets in parallel. The goal of this assignment is to give you experience “thinking in MapReduce.” We will use small datasets that you can inspect directly to determine the correctness of your results and to internalize how MapReduce works. In the next assignment, you will have the opportunity to use a MapReduce-based system to process the very large datasets for which it was designed.

You may want to review the material in module 2.16 to make sure you understand the programming model.

## Setup

Download the data files we will use in this assignment from Canvas: Files > Module 2 > lab2_python_mapreduce > data

Optionally, download the correct output to check your answers: Files > Module 2 > lab2_python_mapreduce > correct_output

To keep things simple, place all files in the same directory as this notebook.

## A Simplified MapReduce Framework

Our goal is to practice thinking in terms of MapReduce without grappling with the systems issues associated with installing and configuring a complete distributed MapReduce environment.

To that end, we're providing you with a simple Python MapReduce framework. It produces the same results as a real MapReduce system, but it runs everything sequentially in a single process. The programs we write could be parallelized with MapReduce; here we simply run them locally in Python.

MapReduce is a framework, not a programming language.  The concepts can be used in any programming language.  We're using Python here for convenience.
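Because the model is language-agnostic, the three phases can also be written without any framework class. The sketch below is a minimal, single-process illustration under our own assumptions:

- `run_mapreduce` is a hypothetical helper, not part of the lab's framework.
- Mappers and reducers are generators that yield pairs instead of calling `emit`.
- The shuffle is just a dictionary of lists keyed by the emitted key.

```python
from collections import defaultdict

def run_mapreduce(records, mapper, reducer):
    """Hypothetical helper: map, shuffle (group by key), then reduce, all in one process."""
    groups = defaultdict(list)
    # Map phase: each mapper call yields zero or more (key, value) pairs.
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    # Reduce phase: one reducer call per distinct key, with all of that key's values.
    results = []
    for key, values in groups.items():
        results.extend(reducer(key, values))
    return results

def word_count_mapper(record):
    doc_id, text = record
    for word in text.split():
        yield word, 1

def word_count_reducer(word, counts):
    yield word, sum(counts)

docs = [["d1", "to be or not to be"], ["d2", "to do"]]
print(run_mapreduce(docs, word_count_mapper, word_count_reducer))
```

The lab's `MapReduce` class performs the same grouping inside `emit_intermediate`. The differences are that it reads JSON lines and prints its results instead of returning them.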

The next cell provides a class definition that implements a simple MapReduce framework.

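```python
import json

class MapReduce:
    """Sequential, single-process MapReduce: map every record, group by key, reduce each group."""

    def __init__(self):
        self.intermediate = {}  # key -> list of values emitted by the mapper
        self.result = []        # values emitted by the reducer

    def emit_intermediate(self, key, value):
        # Shuffle step: collect every value emitted under the same key.
        self.intermediate.setdefault(key, []).append(value)

    def emit(self, value):
        self.result.append(value)

    def execute(self, data, mapper, reducer):
        # Map phase: each input line holds one JSON-encoded record.
        for line in data:
            record = json.loads(line)
            mapper(record)

        # Reduce phase: call the reducer once per distinct key.
        for key in self.intermediate:
            reducer(key, self.intermediate[key])

        # Print each result as a single line of JSON.
        jenc = json.JSONEncoder()
        for item in self.result:
            print(jenc.encode(item))
```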

## Example: Word Count

The next cell contains an example of using the class above.

In Part 0, set up the name of the file.  Modify this if you downloaded the assignment data to a different location.

In Part 1, we create a MapReduce object that is used to pass data between the map function and the reduce function; you won't need to use this object directly.

In Part 2, the mapper function tokenizes each document and emits a key-value pair for every token. The key is the word, formatted as a string, and the value is the integer 1 to indicate one occurrence of that word.

In Part 3, the reducer function sums the list of occurrence counts and emits a count for the word. Since the mapper emits the integer 1 for each occurrence, every element of list_of_values is the integer 1.

The list of occurrence counts is summed and a (word, total) tuple is emitted where word is a string and total is an integer.

When you run this cell, you should see the results printed to the screen: a sequence of pairs (word, count) for every word found in the dataset. 

```python
# Part 0
filename = "books.json"  # modify this if needed

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        mr.emit_intermediate(w, 1)

# Part 3
def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts
    total = 0
    for v in list_of_values:
        total += v
    mr.emit((key, total))

# Part 4
with open(filename) as inputdata:
    mr.execute(inputdata, mapper, reducer)
```

```
["[", 11]
["Paradise", 1]
["Lost", 1]
["by", 12]
["John", 1]
["Milton", 1]
["1667", 1]
["]", 11]
["Book", 2]
["I", 25]
["Of", 4]
["Man", 3]
["'", 12]
["s", 7]
["first", 4]
["disobedience", 1]
[",", 101]
["and", 38]
["the", 58]
["fruit", 1]
["that", 13]
["forbidden", 1]
["tree", 1]
["whose", 1]
["mortal", 1]
["taste", 1]
["Brought", 1]
["death", 2]
["into", 1]
["World", 1]
["all", 4]
["our", 2]
["woe", 1]
["With", 1]
["loss", 1]
["of", 35]
["Eden", 1]
["till", 1]
["one", 3]
["greater", 1]
["Restore", 1]
["us", 3]
["regain", 1]
["blissful", 1]
["seat", 1]
["Sing", 3]
["Heavenly", 1]
["Muse", 1]
["on", 4]
["secret", 1]
["top", 1]
["Oreb", 1]
["or", 4]
["Sinai", 1]
["didst", 1]
["inspire", 1]
["That", 4]
["shepherd", 1]
["who", 2]
["taught", 1]
["chosen", 1]
["seed", 1]
["In", 3]
["beginning", 2]
["how", 1]
["heavens", 1]
["earth", 4]
["Rose", 1]
["out", 1]
["Chaos", 1]
[":", 14]
["if", 1]
["Sion", 1]
["hill", 1]
["Delight", 1]
["thee", 2]
["more", 2]
["Siloa", 1]
["brook", 1]
["flowed", 1
```


## Problem 1: Inverted Index
🎒<font color='red'>(5 points)</font>

    
Create an inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears.


### Mapper Input

The input is a two-element list: [document_id, text], where document_id is a string representing a document identifier and text is a string representing the text of the document. The document text may have words in upper or lower case and may contain punctuation. Treat each token as a valid word; that is, you can simply use value.split() to tokenize the string.
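To see what "each token is a word" means in practice, the snippet below calls `str.split()` on a line shaped like the opening of milton-paradise.txt. The exact text is an assumption, reconstructed from the tokens that appear in the word-count output above. With no arguments, `split()` breaks on any run of whitespace. As a result, spaced-out punctuation becomes its own token and case is preserved, so "Of" and "of" are different keys.

```python
# Hypothetical line modeled on the tokens in the word-count output above.
text = "[ Paradise Lost by John Milton 1667 ] Book I Of Man ' s first disobedience"

# No argument: split on runs of whitespace, keep punctuation and case untouched.
tokens = text.split()
print(tokens)
```

This is why the word-count output contains separate entries for `"'"` and `"s"`, and for `"Of"` and `"of"`.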


### Reducer Output

The output should be a (word, document ID list) tuple where word is a string and document ID list is a list of strings.

The correct output can be found in inverted_index.json

```python
# Part 0
filename = "books.json"

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        mr.emit_intermediate(w, key)

# Part 3
def reducer(key, list_of_values):
    # key: word
    # value: list of document ids, one entry per occurrence of the word
    # dict.fromkeys drops repeated ids while keeping first-seen order
    doc_ids = list(dict.fromkeys(list_of_values))
    mr.emit((key, doc_ids))

# Part 4
with open(filename) as inputdata:
    mr.execute(inputdata, mapper, reducer)
```

## Problem 2: Relational Join
🎒<font color='red'>(5 points)</font>

Implement a relational join as a MapReduce query.

Consider the following query:

    SELECT *
    FROM Orders, LineItem
    WHERE Orders.order_id = LineItem.order_id

 
This query produces all pairs of records from a table Orders and a table LineItem such that the order_id of the order matches the order_id of the line item. This *join* operation is ubiquitous in databases.

Your MapReduce query should produce a result similar to this SQL query would return when executed against an appropriate database.

You can consider the two input tables, Orders and LineItem, as one big concatenated bag of records that the map function processes record by record. That is, instead of two files, each holding a separate table, you are given one big file containing both tables. Each record is labeled with the table it came from. For example, here are two lines from records.json:



    ["order", "2", "78002", "O", "46929.18", "1996-12-01", "1-URGENT", "Clerk#000000880", "0", " foxes. pending accounts at the pending, silent asymptot"]
    ["line_item", "2", "106170", "1191", "1", "38", "44694.46", "0.00", "0.05", "N", "O", "1997-01-28", "1997-01-14", "1997-02-02", "TAKE BACK RETURN", "RAIL", "ven requests. deposits breach a"]


The first element of each record indicates which table the record is stored in.  The other elements are the attribute values for this record.  This example shows a key difference between databases and MapReduce: there is no schema in most MapReduce systems!  You can put in any kind of crazy data you wish, but the tradeoff is that the database can't help protect you from bad data -- without a schema, it can't tell what's good and what's bad.


### Map Input

Each input record is a list of strings representing a tuple in the database. Each list element corresponds to a different attribute of the table.

The first item (index 0) in each record is a string that identifies the table the record originates from. This field has two possible values:

- "line_item" indicates that the record is a line item.
- "order" indicates that the record is an order.

The second element (index 1) in each record is the order_id.

LineItem records have 17 attributes including the identifier string.

Order records have 10 attributes including the identifier string.



### Reduce Output

The output should be a joined record: a single list of length 27 that contains the attributes from the order record followed by the fields from the line item record. Each list element should be a string.
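To see where the length of 27 comes from, here is a minimal sketch of the reduce step for a single order_id, applied to the two sample records from records.json shown above. The helper `join_group` is hypothetical. It splits the grouped records by their table tag instead of assuming the order record arrives first.

```python
# The two sample records from records.json quoted above.
order = ["order", "2", "78002", "O", "46929.18", "1996-12-01", "1-URGENT",
         "Clerk#000000880", "0", " foxes. pending accounts at the pending, silent asymptot"]
line_item = ["line_item", "2", "106170", "1191", "1", "38", "44694.46", "0.00", "0.05",
             "N", "O", "1997-01-28", "1997-01-14", "1997-02-02", "TAKE BACK RETURN",
             "RAIL", "ven requests. deposits breach a"]

def join_group(records):
    """Hypothetical reduce step for one order_id: pair every order with every line item."""
    orders = [r for r in records if r[0] == "order"]
    items = [r for r in records if r[0] == "line_item"]
    return [o + li for o in orders for li in items]

# Both records share order_id "2" (index 1), so a mapper keyed on record[1] groups them.
assert order[1] == line_item[1]
joined = join_group([line_item, order])  # arrival order within the group does not matter
print(len(order), len(line_item), len(joined[0]))  # 10 17 27
```

Separating the records by tag makes the join correct even if the shuffle delivers a line item before its order. It also handles an order_id that has several line items, producing one joined row per line item.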


The correct output can be found in join.json

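```python
# Part 0
filename = "records.json"

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: order_id (index 1 of every record)
    # value: the complete record, either an order or a line item
    key = record[1]
    mr.emit_intermediate(key, record)

# Part 3
def reducer(key, list_of_values):
    # key: order_id
    # value: all order and line_item records sharing this order_id
    # Split by table tag instead of assuming the order record comes first.
    orders = [r for r in list_of_values if r[0] == "order"]
    line_items = [r for r in list_of_values if r[0] == "line_item"]
    for order in orders:
        for line_item in line_items:
            mr.emit(order + line_item)  # 10 + 17 = 27 fields

# Part 4
with open(filename) as inputdata:
    mr.execute(inputdata, mapper, reducer)
```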

```
["order", "1", "36901", "O", "173665.47", "1996-01-02", "5-LOW", "Clerk#000000951", "0", "nstructions sleep furiously among ", "line_item", "1", "155190", "7706", "1", "17", "21168.23", "0.04", "0.02", "N", "O", "1996-03-13", "1996-02-12", "1996-03-22", "DELIVER IN PERSON", "TRUCK", "egular courts above the"]
["order", "1", "36901", "O", "173665.47", "1996-01-02", "5-LOW", "Clerk#000000951", "0", "nstructions sleep furiously among ", "line_item", "1", "67310", "7311", "2", "36", "45983.16", "0.09", "0.06", "N", "O", "1996-04-12", "1996-02-28", "1996-04-20", "TAKE BACK RETURN", "MAIL", "ly final dependencies: slyly bold "]
["order", "1", "36901", "O", "173665.47", "1996-01-02", "5-LOW", "Clerk#000000951", "0", "nstructions sleep furiously among ", "line_item", "1", "63700", "3701", "3", "8", "13309.60", "0.10", "0.02", "N", "O", "1996-01-29", "1996-03-05", "1996-01-31", "TAKE BACK RETURN", "REG AIR", "riously. regular, express dep"]
["order", "1", "36901", "O", "173665.47", "1996-01-02"
```

## Problem 3: Counting Friends
🎒<font color='red'>(5 points)</font>

Consider a simple social network dataset consisting of a set of key-value pairs (person, friend) representing a friend relationship between two people. Describe a MapReduce algorithm to count the number of friends for each person.



### Map Input

Each input record is a pair (personA, personB) where personA is a string representing the name of a person and personB is a string representing the name of one of personA's friends. Note that the reverse relationship may or may not hold: the dataset may or may not also contain the record (personB, personA).




### Reduce Output

The output should be a pair (person, friend_count) where person is a string and friend_count is an integer indicating the number of friends associated with person.
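Because only personA is counted, a person's friend_count is the number of records in which that person appears first. Someone who appears only as personB gets no output pair. The sketch below illustrates this with a tiny made-up dataset (the names are hypothetical). It uses `collections.Counter` as a stand-in for the shuffle-and-sum that the framework performs.

```python
from collections import Counter

# Hypothetical edges: Alice lists Bob, but Bob never appears as personA.
edges = [["Alice", "Bob"], ["Alice", "Carol"], ["Carol", "Alice"]]

# Map: emit (personA, 1). Shuffle + reduce: sum the 1s for each person.
friend_count = Counter()
for person, _friend in edges:
    friend_count[person] += 1

print(dict(friend_count))  # {'Alice': 2, 'Carol': 1}
```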

The correct output can be found in friend_count.json

```python
# Part 0
filename = "friends.json"

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: personA
    # value: 1 for each friend listed for personA
    person = record[0]
    mr.emit_intermediate(person, 1)

# Part 3
def reducer(key, list_of_values):
    # key: person
    # value: list of 1s, one per friend record
    mr.emit((key, sum(list_of_values)))

# Part 4
with open(filename) as inputdata:
    mr.execute(inputdata, mapper, reducer)
```

```
["Myriel", 5]
["Napoleon", 1]
["MlleBaptistine", 3]
["MmeMagloire", 1]
["Champtercier", 1]
["Valjean", 16]
```


## Problem 4: Asymmetric Followers

🎒<font color='red'>(5 points)</font>

The relationship "friend" is often symmetric, meaning that if I am your friend, you are my friend. Implement a MapReduce algorithm to check whether this property holds. Generate a list of all non-symmetric friend relationships.




### Map Input

Each input record is a two-element list [personA, personB] where personA is a string representing the name of a person and personB is a string representing the name of one of personA's friends. Note that the reverse relationship may or may not hold: the dataset may or may not also contain the record [personB, personA].




### Reduce Output

The output should be all pairs (friend, person) such that (person, friend) appears in the dataset but (friend, person) does not.
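One way to bring both directions of a relationship to the same reducer call is to key each record by the unordered pair, for example the sorted tuple of the two names. This is one common technique, not necessarily the intended one. The sketch below applies it to three hypothetical records. A group that contains only one direction is asymmetric, and it is reported as (friend, person) per the specification above.

```python
from collections import defaultdict

# Hypothetical edges: (A, B) and (B, A) are symmetric; (A, C) has no reverse.
edges = [["A", "B"], ["B", "A"], ["A", "C"]]

# Map: key each directed edge by its unordered pair so both directions meet.
groups = defaultdict(set)
for person, friend in edges:
    groups[tuple(sorted((person, friend)))].add((person, friend))

# Reduce: a group holding a single direction is asymmetric; emit (friend, person).
asymmetric = [(friend, person)
              for directions in groups.values() if len(directions) == 1
              for person, friend in directions]
print(asymmetric)  # [('C', 'A')]
```

Keying by the unordered pair lets the reducer decide using only its own group. It never needs to look up other keys' values, which a real distributed reducer cannot do.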


Correct output: asymmetric_friendships.json





```python
# Part 0
filename = "friends.json"

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: the unordered pair {personA, personB}, as a sorted tuple
    # value: the directed edge (personA, personB) as it appears in the data
    person, friend = record
    key = tuple(sorted((person, friend)))
    mr.emit_intermediate(key, (person, friend))

# Part 3
def reducer(key, list_of_values):
    # key: unordered pair of names
    # value: the directed edges observed for that pair
    edges = set(list_of_values)
    if len(edges) == 1:
        # Only one direction appears, so the relationship is asymmetric.
        person, friend = edges.pop()
        if person != friend:
            mr.emit((friend, person))

# Part 4
with open(filename) as inputdata:
    mr.execute(inputdata, mapper, reducer)
```


## Problem 5: Genetic data
🎒<font color='red'>(5 points)</font>

Consider a set of key-value pairs where each key is a sequence id and each value is a string of nucleotides, e.g., GCTTCCGAAATGCTCGAA...

Design a MapReduce algorithm to remove the last 10 characters from each string of nucleotides, then remove any duplicates generated.




### Map Input

Each input record is a two-element list (sequence id, nucleotides) where sequence id is a string representing a unique identifier for the sequence and nucleotides is a string representing a sequence of nucleotides.




### Reduce Output

The output from the reduce function should be the unique trimmed nucleotide strings.
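Duplicates can only be removed if identical trimmed strings reach the same reducer call. The sketch below therefore keys on the trimmed string itself, not on the sequence id. It uses the example string from the text plus a hypothetical second sequence that differs only in its last 10 characters.

```python
from collections import defaultdict

# Example string from the text, plus a hypothetical sequence that trims to the same prefix.
sequences = [["s1", "GCTTCCGAAATGCTCGAA"], ["s2", "GCTTCCGA" + "T" * 10]]

# Map: key by the trimmed string; identical trims land in the same group.
groups = defaultdict(list)
for seq_id, nucleotides in sequences:
    groups[nucleotides[:-10]].append(seq_id)

# Reduce: emit each distinct trimmed string once.
unique_trims = list(groups)
print(unique_trims)  # ['GCTTCCGA']
```

Slicing with `[:-10]` never raises an error. A sequence of 10 or fewer characters simply trims to the empty string.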


Correct output: unique_trims.json


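```python
# Part 0
filename = "dna.json"

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: sequence_id
    # value: nucleotides
    sequence_id, nucleotides = record
    # Key on the trimmed string so identical trims meet in the same reducer call.
    mr.emit_intermediate(nucleotides[:-10], sequence_id)

# Part 3
def reducer(key, list_of_values):
    # key: trimmed nucleotide string
    # value: ids of every sequence that produced this trim
    mr.emit(key)  # emit each distinct trim exactly once

# Part 4
with open(filename) as inputdata:
    mr.execute(inputdata, mapper, reducer)
```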

"GGGGTGGCTACCCAGAGGCATGCTCCTCACCCAGCTCCACTGTCCCTACCTGCTGCTGCTGCTGGTGGTGCTGTCATGTCTGGTGAGTGCCGTGCACCCCACAGCACCTGCATGGAGGAGGGTTGGCTGCTCTGTACACAAGTGCTGAGAGCTCTCTGGTTGCTTGCCTACCTGTTTCCCAGCCAAAGGCACCCTCTGCCCAGGTAATGGACTTTTTGTTTGAGAAGTGGAAGCTCTATAGTGACCAGTGCCACCACAACCTAAGCCTGCTGCCCCCACCTACTGGTGAGTCCCACCAAAGACTCCTGTGTCCTGACACCCCGCCTGGAGGTACACTCAGAGACCTTATGGGGATGTAATAGTAATGGCTGCTTTATAATGCCCAGCCACTTGCCCCCAGTTACAGACTGACCTCCAGAGGCAGTGGCTTCCCTAAGGCTGTATGGTCAGGAAACAGTAGAAATGCAGAACTGCCTCAGGGCTGCCCTCATCCCCAGCCAGCTGATGTCTGCTGTCACCGCTCACACTGGGCAGACAGTGAATAGGGACAGGGCAGGGCAGAGAGACTGGGTCTTCCCAGTCTCAGTTGAGGGGGATGAGTGCCTGGGAGGGAGGGAGAAGGATGAGGGAGCTATGCTACGGCTGGGCCTGGAAAAGGTGCCAGCCAAACTGGAGTCTGACATCTGACAAGGAATGTATTACCAGGCAGGAGGGGCCAGTGAGATGGCTCAGCAGATAATGGCCCTTTCCTCTGGGACTCAGTGAAAATGGATAATGATCCTTGCAAGTTGTCCTATGATCCTGTATGCTGGTATACACACTTGTATGCTTCTGAGTGCATGCACATGTGCGCGCGCGCGCACGCGCGCACACACACACACACACACACTAAATAAATGAACAGATAAATGTAAAAAGCTTTTTACAAATTTTTATAAAAGATACATAGAGGAAAGACACAAAATGGGGTCTGTGCACATTTGGGATGGGATATCTTGG

## Problem 6: Matrix Multiply (Tricky!)
🎒<font color='red'>(5 points)</font>

Assume you have two matrices A and B in a sparse matrix format, where each record is of the form i, j, value. Design a MapReduce algorithm to compute the matrix multiplication A x B.



### Map Input

The input to the map function will be one entry of a sparse matrix, represented as a list of the form (matrix, i, j, value), where matrix is a string and i, j, and value are integers.

The first item, matrix, is a string that identifies which matrix the record originates from. This field has two possible values: "a" indicates that the record is from matrix A and "b" indicates that the record is from matrix B.




### Reduce Output

The output from the reduce function will be one entry of the result matrix, represented as a tuple of the form (i, j, value), where each element is an integer.

Now you will have an opportunity to apply what you have learnt so far.



Correct output: multiply.json
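One common single-pass strategy, sketched below, assumes the number of rows of A and columns of B are known in advance. The constants `N_ROWS_A` and `N_COLS_B` are hypothetical, and so are the 2 × 2 matrices used here.

- The mapper replicates each entry of A to every result cell in its row.
- It replicates each entry of B to every result cell in its column.
- The reducer for cell (i, k) matches A and B entries on the shared index j and sums their products.

The function names are prefixed with `mm_` so they do not overwrite the notebook's `mapper` and `reducer`.

```python
from collections import defaultdict

# Assumed dimensions (hypothetical): A has 2 rows, B has 2 columns.
N_ROWS_A = 2
N_COLS_B = 2

def mm_mapper(record):
    """Send each entry to every result cell (i, k) it contributes to."""
    matrix, r, c, value = record
    if matrix == "a":      # A[i][j] is needed by cells (i, 0 .. N_COLS_B - 1)
        for k in range(N_COLS_B):
            yield (r, k), ("a", c, value)
    else:                  # B[j][k] is needed by cells (0 .. N_ROWS_A - 1, k)
        for i in range(N_ROWS_A):
            yield (i, c), ("b", r, value)

def mm_reducer(key, values):
    """Sum A[i][j] * B[j][k] over every j present in both matrices."""
    a = {j: v for tag, j, v in values if tag == "a"}
    b = {j: v for tag, j, v in values if tag == "b"}
    common = a.keys() & b.keys()
    if common:
        i, k = key
        yield (i, k, sum(a[j] * b[j] for j in common))

def run(records):
    groups = defaultdict(list)
    for record in records:
        for key, value in mm_mapper(record):
            groups[key].append(value)
    return sorted(out for key, vals in groups.items() for out in mm_reducer(key, vals))

# A = [[1, 2], [0, 3]] and B = [[4, 0], [5, 6]] in sparse form (zeros omitted).
records = [["a", 0, 0, 1], ["a", 0, 1, 2], ["a", 1, 1, 3],
           ["b", 0, 0, 4], ["b", 1, 0, 5], ["b", 1, 1, 6]]
print(run(records))  # [(0, 0, 14), (0, 1, 12), (1, 0, 15), (1, 1, 18)]
```

The replication trades extra intermediate data for a single pass. Without known dimensions, a two-pass approach (join on j, then sum per cell) is the usual alternative.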



```python
# Part 0
filename = "matrix.json"

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # record: one matrix entry [matrix, i, j, value]
    matrix, i, j, value = record
    # WRITE YOUR CODE HERE: choose a key and call mr.emit_intermediate(key, value)

# Part 3
def reducer(key, list_of_values):
    # WRITE YOUR CODE HERE: compute each result entry and call mr.emit((i, j, value))
    pass

# Part 4
with open(filename) as inputdata:
    mr.execute(inputdata, mapper, reducer)
```