 # <font color='darkblue'>Designing MapReduce Algorithms</font>

## <font color='darkblue'>Author : Manas Thakre</font>

#### <font color='darkblue'>Date : 1-May-2018</font>

<br>

<font color='darkgreen'> _Note: All the datafiles as well as the MapReduce.py module should be placed in the same folder _ </font>
<br>
_________________________________

In [1]:
import imp, sys, os
sys.path;

In [2]:
MapReduce_path = os.path.abspath(os.path.join('..'))
#print(MapReduce_path)
if MapReduce_path not in sys.path:
    sys.path.append(MapReduce_path)
sys.path;

In [3]:
from MapReduce import MapReduce  # Importing Mapreduce module (refer the repository's MapReduce.py file)

## <font color='darkgreen'><u>Situation 1: Inverted Dictionary</u></font>


Create an Inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears.

**Mapper Input**
The input is a 2 element list: [document_id, text], where document_id is a string representing a document identifier and text is a string representing the text of the document. The document text may have words in upper or lower case and may contain punctuation. You should treat each token as if it was a valid word; that is, you can just use value.split() to tokenize the string.

**Reducer Output**
The output should be a (word, document ID list) tuple where word is a String and document ID list is a list of Strings.



In [4]:
mr = MapReduce()

In [5]:
def mapper(record):
    # key: document identifier
    # value: document contents split in words
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        mr.emit_intermediate(w, key)

In [6]:
def reducer(key, list_of_values):
    # key: word
    # value: list of documents
    lis = []
    for v in list_of_values:
        if v not in lis:
            lis.append(v)
    mr.emit((key, lis))

In [7]:
inputdata_1 = open("books.json")
mr.execute(inputdata_1, mapper, reducer)

["[", ["milton-paradise.txt", "edgeworth-parents.txt", "austen-emma.txt", "chesterton-ball.txt", "bible-kjv.txt", "chesterton-thursday.txt", "blake-poems.txt", "shakespeare-caesar.txt", "whitman-leaves.txt", "melville-moby_dick.txt"]]
["Paradise", ["milton-paradise.txt"]]
["Lost", ["milton-paradise.txt"]]
["by", ["milton-paradise.txt", "edgeworth-parents.txt", "austen-emma.txt", "chesterton-ball.txt", "chesterton-thursday.txt", "blake-poems.txt", "shakespeare-caesar.txt", "whitman-leaves.txt", "melville-moby_dick.txt"]]
["John", ["milton-paradise.txt"]]
["Milton", ["milton-paradise.txt"]]
["1667", ["milton-paradise.txt"]]
["]", ["milton-paradise.txt", "edgeworth-parents.txt", "austen-emma.txt", "chesterton-ball.txt", "bible-kjv.txt", "chesterton-thursday.txt", "blake-poems.txt", "shakespeare-caesar.txt", "whitman-leaves.txt", "melville-moby_dick.txt"]]
["Book", ["milton-paradise.txt", "bible-kjv.txt"]]
["I", ["milton-paradise.txt", "austen-emma.txt", "chesterton-ball.txt", "chesterton-

["expression", ["chesterton-ball.txt"]]
["men", ["chesterton-ball.txt", "chesterton-thursday.txt"]]
["be", ["chesterton-ball.txt", "bible-kjv.txt"]]
["stars", ["chesterton-ball.txt"]]
["professor", ["chesterton-ball.txt"]]
["himself", ["chesterton-ball.txt"]]
["invented", ["chesterton-ball.txt"]]
["machine", ["chesterton-ball.txt"]]
["also", ["chesterton-ball.txt"]]
["everything", ["chesterton-ball.txt"]]
["King", ["bible-kjv.txt"]]
["James", ["bible-kjv.txt"]]
["Bible", ["bible-kjv.txt"]]
["Old", ["bible-kjv.txt"]]
["Testament", ["bible-kjv.txt"]]
["First", ["bible-kjv.txt"]]
["Moses", ["bible-kjv.txt"]]
["Called", ["bible-kjv.txt"]]
["Genesis", ["bible-kjv.txt"]]
["1", ["bible-kjv.txt"]]
["created", ["bible-kjv.txt"]]
["heaven", ["bible-kjv.txt"]]
["2", ["bible-kjv.txt"]]
["And", ["bible-kjv.txt", "blake-poems.txt"]]
["without", ["bible-kjv.txt", "shakespeare-caesar.txt"]]
["form", ["bible-kjv.txt"]]
["void", ["bible-kjv.txt"]]
["darkness", ["bible-kjv.txt"]]
["upon", ["bible-kjv.txt

## <font color='darkgreen'><u>Situation 2: Relational Join</u></font>

Implement a relational join as a MapReduce query.Consider the following query:

<font color='red'>
SELECT * <br>
FROM Orders, LineItem <br>
WHERE Order.order_id = LineItem.order_id<br>
</font>

Your MapReduce query should produce the same result as this SQL query executed against an appropriate database. You can consider the two input tables, Order and LineItem, as one big concatenated bag of records that will be processed by the map function record by record.

**Mapper Input**
Each input record is a list of strings representing a tuple in the database. Each list element corresponds to a different attribute of the table. The first item (index 0) in each record is a string that identifies the table the record originates from. This field has two possible values:

"line_item" indicates that the record is a line item.
"order" indicates that the record is an order.
The second element (index 1) in each record is the order_id.

LineItem records have 17 attributes including the identifier string.Order records have 10 elements including the identifier string.

**Reducer Output**
The output should be a joined record: a single list of length 27 that contains the attributes from the order record followed by the fields from the line item record. Each list element should be a string.

In [8]:
mr = MapReduce()
def mapper(record):
    # key: table identifier
    # value: contents of table
    source = record[0]
    order_id = record[1]
    mr.emit_intermediate(order_id, record)

In [9]:
def reducer(key, list_of_values):
    orders = []
    line_items = []

    for record in list_of_values:
        source = record[0]
        if source == "order":
            orders.append(list(record))
        elif source =="line_item":
            line_items.append(list(record))    

    for order in orders:
        for item in line_items:
            mr.emit(order + item)

In [10]:
inputdata_2 = open("records.json")
mr.execute(inputdata_2, mapper, reducer)

["order", "1", "36901", "O", "173665.47", "1996-01-02", "5-LOW", "Clerk#000000951", "0", "nstructions sleep furiously among ", "line_item", "1", "155190", "7706", "1", "17", "21168.23", "0.04", "0.02", "N", "O", "1996-03-13", "1996-02-12", "1996-03-22", "DELIVER IN PERSON", "TRUCK", "egular courts above the"]
["order", "1", "36901", "O", "173665.47", "1996-01-02", "5-LOW", "Clerk#000000951", "0", "nstructions sleep furiously among ", "line_item", "1", "67310", "7311", "2", "36", "45983.16", "0.09", "0.06", "N", "O", "1996-04-12", "1996-02-28", "1996-04-20", "TAKE BACK RETURN", "MAIL", "ly final dependencies: slyly bold "]
["order", "1", "36901", "O", "173665.47", "1996-01-02", "5-LOW", "Clerk#000000951", "0", "nstructions sleep furiously among ", "line_item", "1", "63700", "3701", "3", "8", "13309.60", "0.10", "0.02", "N", "O", "1996-01-29", "1996-03-05", "1996-01-31", "TAKE BACK RETURN", "REG AIR", "riously. regular, express dep"]
["order", "1", "36901", "O", "173665.47", "1996-01-02"



## <font color='darkgreen'><u>Situation 3: Count the number of friends</u></font>

Consider a simple social network dataset consisting of a set of key-value pairs (person, friend) representing a friend relationship between two people. Describe a MapReduce algorithm to count the number of friends for each person.

**Mapper Input**
Each input record is a 2 element list [personA, personB] where personA is a string representing the name of a person and personB is a string representing the name of one of personA's friends. Note that it may or may not be the case that the personA is a friend of personB.

**Reducer Output**
The output should be a pair (person, friend_count) where person is a string and friend_count is an integer indicating the number of friends associated with person.

In [11]:
mr = MapReduce()
def mapper(record):
    key = record[0]
    mr.emit_intermediate(key, 1)

In [12]:
def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts
    mr.emit((key, len(list_of_values)))


In [13]:
inputdata_3 = open("friends.json")
mr.execute(inputdata_3, mapper, reducer)

["Myriel", 5]
["Napoleon", 1]
["MlleBaptistine", 3]
["MmeMagloire", 1]
["Champtercier", 1]
["Valjean", 16]


## <font color='darkgreen'><u>Situation 4: Assymetric Friendships</u></font>

The relationship "friend" is often symmetric, meaning that if I am your friend, you are my friend. Implement a MapReduce algorithm to check whether this property holds. Generate a list of all non-symmetric friend relationships.

**Mapper Input**
Each input record is a 2 element list [personA, personB] where personA is a string representing the name of a person and personB is a string representing the name of one of personA's friends. Note that it may or may not be the case that the personA is a friend of personB.

**Reducer Output**
The output should be all pairs (friend, person)

In [14]:
mr = MapReduce()
def mapper(record):
    # key: person
    # value: friend 
    person = record[0]
    friend = record[1]
    mr.emit_intermediate((person, friend), 1)
    mr.emit_intermediate((friend, person), 1)


In [15]:
def reducer(key, list_of_values):
    if len(list_of_values)<2:
         mr.emit(key)


In [16]:
inputdata_4 = open("friends.json")
mr.execute(inputdata_4, mapper, reducer)

["Myriel", "Geborand"]
["Geborand", "Myriel"]
["Myriel", "Count"]
["Count", "Myriel"]
["Myriel", "OldMan"]
["OldMan", "Myriel"]
["Napoleon", "Myriel"]
["Myriel", "Napoleon"]
["MlleBaptistine", "Myriel"]
["Myriel", "MlleBaptistine"]
["MlleBaptistine", "Valjean"]
["Valjean", "MlleBaptistine"]
["MlleBaptistine", "MmeMagloire"]
["MmeMagloire", "MlleBaptistine"]
["MmeMagloire", "Myriel"]
["Myriel", "MmeMagloire"]
["Valjean", "MmeMagloire"]
["MmeMagloire", "Valjean"]
["Valjean", "Labarre"]
["Labarre", "Valjean"]
["Valjean", "Marguerite"]
["Marguerite", "Valjean"]
["Valjean", "MmeDeR"]
["MmeDeR", "Valjean"]
["Valjean", "Isabeau"]
["Isabeau", "Valjean"]
["Valjean", "Fantine"]
["Fantine", "Valjean"]
["Valjean", "Cosette"]
["Cosette", "Valjean"]
["Valjean", "Simplice"]
["Simplice", "Valjean"]
["Valjean", "Woman1"]
["Woman1", "Valjean"]
["Valjean", "Judge"]
["Judge", "Valjean"]
["Valjean", "Woman2"]
["Woman2", "Valjean"]
["Valjean", "Gillenormand"]
["Gillenormand", "Valjean"]
["Valjean", "MlleGil

## <font color='darkgreen'><u>Situation 5: Unique Trimmed Nucleotide sequences</u></font>

Consider a set of key-value pairs where each key is sequence id and each value is a string of nucleotides, e.g., GCTTCCGAAATGCTCGAA....Write a MapReduce query to remove the last 10 characters from each string of nucleotides, then remove any duplicates generated.

**Mapper Input**
Each input record is a 2 element list [sequence id, nucleotides] where sequence id is a string representing a unique identifier for the sequence and nucleotides is a string representing a sequence of nucleotides

**Reducer Output**
The output from the reduce function should be the unique trimmed nucleotide strings.

In [17]:
mr = MapReduce()
def mapper(record):
    # key: sequence id
    # value: nucleotide sequence
    sequenceid = record[0]
    nucleotide = record[1]

    mr.emit_intermediate(sequenceid, nucleotide)

In [18]:
def reducer(key, list_of_values):
    nonrepeated = []
    for elem in list_of_values:
        if elem not in nonrepeated:
            nonrepeated.append(elem)
    
    trimmed=[]
    for nuc in nonrepeated:
        trimmed.append(nuc[:-10])        
        
    return mr.emit(list(set(trimmed)))


In [19]:
inputdata_5 = open("dna.json")
mr.execute(inputdata_5, mapper, reducer)

["GGGGTGGCTACCCAGAGGCATGCTCCTCACCCAGCTCCACTGTCCCTACCTGCTGCTGCTGCTGGTGGTGCTGTCATGTCTGGTGAGTGCCGTGCACCCCACAGCACCTGCATGGAGGAGGGTTGGCTGCTCTGTACACAAGTGCTGAGAGCTCTCTGGTTGCTTGCCTACCTGTTTCCCAGCCAAAGGCACCCTCTGCCCAGGTAATGGACTTTTTGTTTGAGAAGTGGAAGCTCTATAGTGACCAGTGCCACCACAACCTAAGCCTGCTGCCCCCACCTACTGGTGAGTCCCACCAAAGACTCCTGTGTCCTGACACCCCGCCTGGAGGTACACTCAGAGACCTTATGGGGATGTAATAGTAATGGCTGCTTTATAATGCCCAGCCACTTGCCCCCAGTTACAGACTGACCTCCAGAGGCAGTGGCTTCCCTAAGGCTGTATGGTCAGGAAACAGTAGAAATGCAGAACTGCCTCAGGGCTGCCCTCATCCCCAGCCAGCTGATGTCTGCTGTCACCGCTCACACTGGGCAGACAGTGAATAGGGACAGGGCAGGGCAGAGAGACTGGGTCTTCCCAGTCTCAGTTGAGGGGGATGAGTGCCTGGGAGGGAGGGAGAAGGATGAGGGAGCTATGCTACGGCTGGGCCTGGAAAAGGTGCCAGCCAAACTGGAGTCTGACATCTGACAAGGAATGTATTACCAGGCAGGAGGGGCCAGTGAGATGGCTCAGCAGATAATGGCCCTTTCCTCTGGGACTCAGTGAAAATGGATAATGATCCTTGCAAGTTGTCCTATGATCCTGTATGCTGGTATACACACTTGTATGCTTCTGAGTGCATGCACATGTGCGCGCGCGCGCACGCGCGCACACACACACACACACACACTAAATAAATGAACAGATAAATGTAAAAAGCTTTTTACAAATTTTTATAAAAGATACATAGAGGAAAGACACAAAATGGGGTCTGTGCACATTTGGGATGGGATATCTTG

## <font color='darkgreen'><u>Situation 6: Matrix Multiplication</u></font>

Assume you have two matrices A and B in a sparse matrix format, where each record is of the form i, j, value. Design a MapReduce algorithm to compute the matrix multiplication A x B

**Mapper Input**
The input to the map function will be a row of a matrix represented as a list. Each list will be of the form [matrix, i, j, value] where matrix is a string and i, j, and value are integers.The first item, matrix, is a string that identifies which matrix the record originates from. This field has two possible values: "a" indicates that the record is from matrix A and "b" indicates that the record is from matrix B

**Reduce Output**
The output from the reduce function will also be a row of the result matrix represented as a tuple. Each tuple will be of the form (i, j, value) where each element is an integer.

In [20]:
mr = MapReduce()
def mapper(record):    
    if record[0]=='a':
        for k in range(5):  # 5 rows
            key = (record[1],k)
            value = (0, record[2], record[3]) 
            mr.emit_intermediate(key, value)
    else:
        for i in range(5):  
            key = (i,record[2])
            value = (1, record[1], record[3])
            mr.emit_intermediate(key,value)

In [21]:
def reducer(key, list_of_values):
    dict1 = {}
    dict2 = {}
    for item in list_of_values:
        if item[0]==0: 
            dict1[item[1]] = item[2]
        else:
            dict2[item[1]] = item[2]

    result = 0
    for j in range(5):    
        try:
            result = result + dict1[j]*dict2[j]
        except:
            pass
    mr.emit(tuple(list(key)+[result]))

In [22]:
inputdata_6 = open("matrix.json")
mr.execute(inputdata_6, mapper, reducer)

[0, 0, 11878]
[0, 1, 14044]
[0, 2, 16031]
[0, 3, 5964]
[0, 4, 15874]
[1, 0, 4081]
[1, 1, 6914]
[1, 2, 8282]
[1, 3, 7479]
[1, 4, 9647]
[2, 0, 6844]
[2, 1, 9880]
[2, 2, 10636]
[2, 3, 6973]
[2, 4, 8873]
[3, 0, 10512]
[3, 1, 12037]
[3, 2, 10587]
[3, 3, 2934]
[3, 4, 5274]
[4, 0, 11182]
[4, 1, 14591]
[4, 2, 10954]
[4, 3, 1660]
[4, 4, 9981]


__________________________________________________

Please cite and refer this notebook if you use the contents