# Big Data Platform
## Assignment 2: MapReduce

**By:**  

Omri Newman, 806646    

<br><br>

**The goal of this assignment is to:**
- Understand and practice the details of MapReduceEngine

**Instructions:**
- Students will form teams of two people each, and submit a single homework for each team.
- The same score for the homework will be given to each member of your team.
- Your solution is in the form of a Jupyter notebook file (with extension ipynb).
- Images/Graphs/Tables should be submitted inside the notebook.
- The notebook should be runnable and properly documented. 
- Please answer all the questions and include all your code.
- You are expected to submit a clear and pythonic code.
- You can change functions signatures/definitions.

**Submission:**
- Submission of the homework will be done via Moodle by uploading a Jupyter notebook.
- The homework needs to be entirely in English.
- The deadline for submission is on Moodle.
- Late submission won't be allowed.
  
  
- In case of identical code submissions - both groups will get a Zero. 
- Some groups might be selected randomly to present their code.

**Requirements:**  
- Python 3.6 should be used.  
- You should implement the algorithms by yourself using only basic Python libraries (such as numpy, pandas, etc.)

<br><br><br><br>

**Grading:**
- Q1 - 5 points - Initial Steps
- Q2 - 50 points - MapReduceEngine
- Q3 - 30 points - Implement the MapReduce Inverted index of the JSON documents
- Q4 - 5 points - Testing Your MapReduce
- Q5 - 10 points - Final Thoughts 

`Total: 100`

**Prerequisites**

In [1]:
# example
# !pip install --quiet zipfile36

**Imports**

In [2]:
# general
import os
import shutil
import time
import random
import warnings
import threading # you can use easier threading packages
import concurrent.futures  # `import concurrent` alone does not load the futures submodule
import csv
import sqlite3

# ml
import numpy as np
import scipy as sp
import pandas as pd

# visual
import seaborn as sns
import matplotlib.pyplot as plt

# notebook
from IPython.display import display

**Hide Warnings**

In [3]:
warnings.filterwarnings('ignore')

**Disable Autoscrolling**

In [4]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


**Set Random Seed**

In [5]:
random.seed(123)
np.random.seed(123)  # the CSV generation uses np.random, which random.seed does not affect

<br><br><br><br>
# Question 1
## Initial Steps

Write Python code to create 20 different CSV files in this format:  `myCSV[Number].csv`, where each file contains 10 records. 

The schema is `('firstname', 'secondname', 'city')`  

Values should be randomly chosen from the lists: 
- `firstname` : `[John, Dana, Scott, Marc, Steven, Michael, Albert, Johanna]`  
- `city` : `[New York, Haifa, München, London, Palo Alto, Tel Aviv, Kiel, Hamburg]`  
- `secondname`: any value  

In [6]:
firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto', 'TelAviv', 'Kiel', 'Hamburg']
secondname = ['Taggar', 'Newman', 'Berl', 'Fogel', 'Shapira', 'Levi', 'Cohen']  # secondname may be any value

records = 10
for i in range(1, 21):
    # Each file gets `records` rows; every column is sampled independently (with replacement)
    pd.DataFrame({'firstname': np.random.choice(firstname, records),
                  'secondname': np.random.choice(secondname, records),
                  'city': np.random.choice(city, records)}).to_csv(f'myCSV{i}.csv', index=False)

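Use Python to create the `mapreducetemp` and `mapreducefinal` folders

In [7]:
# exist_ok=True lets this cell be re-run without raising FileExistsError
os.makedirs('mapreducetemp', exist_ok=True)
os.makedirs('mapreducefinal', exist_ok=True)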

<br><br><br>
# Question 2
## MapReduceEngine

Write Python code to create an SQLite database with the following table

`TableName: temp_results`   
`schema: (key:TEXT,value:TEXT)`

In [8]:
conn = sqlite3.connect('mydb.db')

In [9]:
c = conn.execute('''CREATE TABLE IF NOT EXISTS temp_results(
                    key TEXT,
                    value TEXT
                    )''')
conn.commit()

Checking the schema:

In [10]:
pd.read_sql('SELECT * FROM temp_results', conn).head()

| key | value |
|-----|-------|


1. **Create a Python class** `MapReduceEngine` with a method `def execute(input_data, map_function, reduce_function)`, such that:
    - `input_data`: is an array of elements
    - `map_function`: is a reference to a Python function that returns a list where each entry is of the form (key, value)
    - `reduce_function`: is a reference to a Python function that returns a list where each entry is of the form (key, value)

<br><br>

**Implement** the following functionality in the `execute(...)` function:

<br>

1. For each key from the `input_data`, start a new Python thread that executes `map_function(key)`
<br><br>
2. Each thread will store the results of `map_function` in `mapreducetemp/part-tmp-X.csv`, where X is a unique number per thread.
<br><br>
3. Keep a list of all threads and check whether they have completed.
<br><br>
4. Once all threads have completed, load the content of all CSV files into the `temp_results` table in SQLite.

    Remark: The easiest way is to loop over all CSV files, load each one into Pandas first, and then load it into SQLite:  
    `data = pd.read_csv(path_to_csv)`  
    `data.to_sql('temp_results', sql_conn, if_exists='append', index=False)`
<br><br>

5. **Write an SQL statement** that generates a list, sorted by key, of the form `(key, value)`, where value is the concatenation of ALL values in the value column that match a specific key. For example, if the table has the records
<table>
    <tbody>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV1.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Dana</td>
                <td style="text-align:center">myCSV5.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV7.csv</td>
            </tr>
    </tbody>
</table>

    Then the SQL statement will return `('John', 'myCSV1.csv, myCSV7.csv')`  
    Remark: use GROUP_CONCAT together with GROUP BY and ORDER BY
<br><br><br>
6. **Start a new thread** for each value from the generated list in the previous step, to execute `reduce_function(key,value)` 
<br>    
7. Each thread will store results of reduce_function into `mapreducefinal/part-X-final.csv` file  
<br>
8. Keep a list of all threads and check whether they have completed  
<br>
9. Once all threads have completed, print `MapReduce Completed` on the screen; otherwise print `MapReduce Failed` 
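Steps 4 and 5 can be tried in isolation with a minimal standalone sketch. It assumes an in-memory database (`demo_conn`, a hypothetical name chosen so it does not clobber the notebook's `conn`) and loads the three example rows from the table above with plain `executemany` instead of pandas, so it runs on its own. SQLite does not guarantee the order of the values inside each concatenated string, so `'myCSV1.csv, myCSV7.csv'` is one possible result for John.

```python
import sqlite3

# In-memory stand-in for mydb.db, holding the example rows from the table above
demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('CREATE TABLE temp_results (key TEXT, value TEXT)')
demo_conn.executemany('INSERT INTO temp_results VALUES (?, ?)',
                      [('John', 'myCSV1.csv'), ('Dana', 'myCSV5.csv'), ('John', 'myCSV7.csv')])

# One row per key with all of its values joined by ', ', sorted by key
rows = demo_conn.execute('''SELECT key, GROUP_CONCAT(value, ', ') AS value
                            FROM temp_results
                            GROUP BY key
                            ORDER BY key''').fetchall()
print(rows)
demo_conn.close()
```

The two-argument form `GROUP_CONCAT(value, ', ')` sets the separator; with one argument SQLite joins the values with a plain comma.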



**Answer** - We chose to start the threads using `concurrent.futures`. The `with` statement ensures that all threads finish before execution continues to the next step. Still, we define a `check_threads` function that iterates over all threads and checks whether they finished their tasks successfully. Each task function (`write_tmp`, `write_final`) returns `True`, so checking whether the threads finished successfully is equivalent to checking that each thread's `result()` is `True`. We check the threads after the mapping step and after the reducing step.

We added an optional argument (`logs`) that prints a live log of the threads. A sample is included in the bonus section of Q4.

In [11]:
# implement all of the class here

class MapReduceEngine:
    @staticmethod
    def write_tmp(map_function, document, thread_n, logs=False):
        """Map task: run map_function on one input element and save its (key, value) pairs."""
        pairs = map_function(document)
        # `columns` keeps a valid key,value header even if the mapper returns no pairs
        pd.DataFrame(pairs, columns=['key', 'value']).to_csv(
            f'mapreducetemp/part-tmp-{thread_n}.csv', index=False)
        if logs:
            print(f'thread {thread_n} created tmp ')
        return True

    @staticmethod
    def write_final(reduce_function, key, values, thread_n, logs=False):
        """Reduce task: run reduce_function on one (key, values) group and save the result."""
        result_key, result_value = reduce_function(key, values)
        pd.DataFrame({'key': [result_key], 'value': [result_value]}).to_csv(
            f'mapreducefinal/part-{thread_n}-final.csv', index=False)
        if logs:
            print(f'thread {thread_n} created final ')
        return True

    @staticmethod
    def check_threads(threads):
        """Return True only if every thread finished without raising and returned True."""
        try:
            return all(thread.result() for thread in threads)
        except Exception:
            return False

    @staticmethod
    def execute(input_data, map_function, reduce_function, conn, logs=False):
        # Map step: one thread per input element, each mapping AND writing its own temp file.
        # Leaving the `with` block waits for every submitted thread to finish.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            threads = [executor.submit(MapReduceEngine.write_tmp, map_function, document, i, logs)
                       for i, document in enumerate(input_data, 1)]
        if not MapReduceEngine.check_threads(threads):
            return 'MapReduce Failed'
        if logs:
            print('**all threads finished tmp**\n')

        # Load every temp CSV into the temp_results table
        for file in os.listdir('mapreducetemp/'):
            if file.endswith('.csv'):
                pd.read_csv(f'mapreducetemp/{file}').to_sql('temp_results', conn,
                                                             if_exists='append', index=False)

        # Shuffle step: one (key, concatenated values) row per key, sorted by key
        sql = '''SELECT key, GROUP_CONCAT(value) AS value
                 FROM temp_results
                 GROUP BY key
                 ORDER BY key'''
        reduce_input = conn.execute(sql).fetchall()

        # Reduce step: one thread per key
        with concurrent.futures.ThreadPoolExecutor() as executor:
            threads_2 = [executor.submit(MapReduceEngine.write_final, reduce_function, key, values, i, logs)
                         for i, (key, values) in enumerate(reduce_input, 1)]
        if not MapReduceEngine.check_threads(threads_2):
            return 'MapReduce Failed'
        return 'MapReduce Completed'
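The thread-completion check described in the answer above relies on two `concurrent.futures` behaviours: leaving the `with` block waits for every submitted task, and `Future.result()` re-raises any exception the task raised. The standalone sketch below shows both; `task` and its simulated failure for `n == 3` are hypothetical, and `check_all` is an illustrative helper rather than part of the engine.

```python
import concurrent.futures


def check_all(futures):
    """Return True only if every future finished without raising and returned True."""
    try:
        return all(future.result() for future in futures)
    except Exception:
        return False


def task(n):
    """Hypothetical task that reports success with True but fails for n == 3."""
    if n == 3:
        raise ValueError('simulated failure')
    return True


with concurrent.futures.ThreadPoolExecutor() as executor:
    ok_futures = [executor.submit(task, n) for n in (1, 2)]
    bad_futures = [executor.submit(task, n) for n in (1, 2, 3)]
# Leaving the `with` block calls executor.shutdown(wait=True), so every future is done here
print(all(f.done() for f in ok_futures + bad_futures))  # True
print(check_all(ok_futures), check_all(bad_futures))    # True False
```

Catching the exception inside the check means a crashed thread is reported as a failure instead of stopping the notebook.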

<br><br><br><br>

# Question 3
## Implement the MapReduce Inverted index of the JSON documents

Implement a function `inverted_map(document_name)` which reads the CSV document from the local disk and returns a list that contains entries of the form (key_value, document name).

For example, if the myCSV4.csv document has values like:  
`{'firstname': 'John', 'secondname': 'Rambo', 'city': 'Palo Alto'}`

Then the `inverted_map('myCSV4.csv')` function will return a list:  
`[('firstname_John', 'myCSV4.csv'), ('secondname_Rambo', 'myCSV4.csv'), ('city_Palo Alto', 'myCSV4.csv')]`

**Answer** - We first read the header row of the file, then iterate over the remaining rows, mapping each of a row's entries to the corresponding column name using a dictionary. Finally, we append a `(column_value, document_name)` tuple for every entry to the result list.

In [12]:
def inverted_map(document_name):
    lst = []
    with open(document_name, 'r') as f:
        title_row = f.readline().strip().split(',')
        for line in f:
            dic = dict(zip(title_row, line.strip().split(',')))
            lst.extend([(f'{key}_{value}', document_name) for key, value in dic.items()])
    return lst
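Splitting lines on `','` works for these generated files, but it would break on a quoted field that contains a comma. A possible alternative, sketched here under that assumption, uses `csv.DictReader` from the `csv` module the notebook already imports; it reads the header row and yields each data row as a `{column: value}` dict, which is the same mapping the answer builds by hand. The name `inverted_map_dictreader` is hypothetical.

```python
import csv


def inverted_map_dictreader(document_name):
    """Hypothetical alternative to inverted_map built on csv.DictReader."""
    pairs = []
    with open(document_name, newline='') as f:
        # DictReader consumes the header row and yields each data row as {column: value}
        for row in csv.DictReader(f):
            pairs.extend((f'{column}_{value}', document_name) for column, value in row.items())
    return pairs
```

For a file with header `firstname,secondname,city` and the row `John,Rambo,Palo Alto`, it returns the same three pairs as the example in the question.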

Write a reduce function `inverted_reduce(value, documents)`, where the field "documents" contains a list of all CSV documents per given value.   
This list might have duplicates.   
The reduce function will return a new list without duplicates.

For example,  
calling the function `inverted_reduce('firstname_Albert', 'myCSV2.csv, myCSV5.csv,myCSV2.csv')`   
will return a list `['firstname_Albert', 'myCSV2.csv,myCSV5.csv']`

**Answer** - In the following implementation, we use a `set` to filter out repeated values. Keep in mind that a set does not preserve the original order; in this case, order does not matter. We also strip surrounding spaces by mapping `str.strip` over the split `documents` input.

The reason for removing the spaces is that, for example, `'myCSV2.csv, myCSV2.csv'` and `'myCSV2.csv,myCSV2.csv'` should produce the same output.

In [13]:
def inverted_reduce(value, documents):
    """Return [value, comma-separated unique documents]; the order of documents is not preserved."""
    unique_docs = set(map(str.strip, documents.split(',')))
    return [value, ','.join(unique_docs)]
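If a deterministic output order is preferred, one option is `dict.fromkeys`, which drops duplicates while keeping first-seen order. This is a sketch of a hypothetical variant (`inverted_reduce_ordered`), not the version used above; note that dict insertion order is only guaranteed by the language from Python 3.7 (in 3.6 it is a CPython implementation detail).

```python
def inverted_reduce_ordered(value, documents):
    """Hypothetical variant of inverted_reduce that keeps first-seen document order."""
    # dict keys are unique and keep insertion order, so duplicates are dropped deterministically
    unique_docs = dict.fromkeys(doc.strip() for doc in documents.split(','))
    return [value, ','.join(unique_docs)]


print(inverted_reduce_ordered('firstname_Albert', 'myCSV2.csv, myCSV5.csv,myCSV2.csv'))
# ['firstname_Albert', 'myCSV2.csv,myCSV5.csv']
```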

<br><br><br><br>
# Question 4
## Testing Your MapReduce

**Create Python list** `input_data` : `['myCSV1.csv', ..., 'myCSV20.csv']`

In [14]:
input_data = [f'myCSV{i}.csv' for i in range(1, 21)]

**Submit MapReduce as follows:**

In [15]:
mapreduce = MapReduceEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, conn)  # You can try with logs=True
print(status)

MapReduce Completed


Make sure that `MapReduce Completed` is printed and that the `mapreducefinal` folder contains the result files.
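One way to inspect the result files is a small helper that reads every `part-X-final.csv` back into sorted `(key, value)` pairs. This is a sketch; `load_final_results` is a hypothetical helper, and it uses the `csv` module so that quoted values such as `"myCSV1.csv,myCSV2.csv"` (pandas quotes values containing commas when writing) are parsed correctly.

```python
import csv
import glob
import os


def load_final_results(folder='mapreducefinal'):
    """Collect the (key, value) rows of every part-*-final.csv file in `folder`, sorted by key."""
    rows = []
    for path in glob.glob(os.path.join(folder, 'part-*-final.csv')):
        with open(path, newline='') as f:
            # Each reducer file has a key,value header followed by its data row(s)
            rows.extend((row['key'], row['value']) for row in csv.DictReader(f))
    return sorted(rows)
```

For example, after running the engine, `results = load_final_results()` and the check `len({key for key, _ in results}) == len(results)` confirm that every key ended up in exactly one final file.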

**Use Python to delete all temporary data from the mapreducetemp folder and delete the SQLite database:**

In [18]:
shutil.rmtree('mapreducetemp/', ignore_errors=True)
os.mkdir('mapreducetemp')
conn.close()
os.remove('mydb.db')

### Bonus - not required
We do the same thing, but with `logs=True` to see the threads in action. The order in which the threads finish isn't necessarily consistent (e.g., thread 3 can finish before thread 2), and some log lines even interleave. This behaves much like independent workers running on different machines.

In [19]:
conn = sqlite3.connect('mydb.db')
c = conn.execute('''CREATE TABLE IF NOT EXISTS temp_results(
                    key TEXT,
                    value TEXT
                    )''')
conn.commit()

In [20]:
mapreduce = MapReduceEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, conn, logs=True)
print(status)

# Deleting again
shutil.rmtree('mapreducetemp/', ignore_errors=True)
os.mkdir('mapreducetemp')
conn.close()
os.remove('mydb.db')

thread 1 created tmp 
thread 2 created tmp 
thread 3 created tmp 
thread 4 created tmp 
thread 5 created tmp 
thread 6 created tmp thread 7 created tmp 

thread 8 created tmp 
thread 9 created tmp 
thread 10 created tmp 
thread 11 created tmp 
thread 12 created tmp 
thread 13 created tmp 
thread 14 created tmp 
thread 15 created tmp 
thread 16 created tmp 
thread 17 created tmp 
thread 18 created tmp 
thread 19 created tmp 
thread 20 created tmp 
**all threads finished tmp**

thread 1 created final thread 2 created final 

thread 3 created final 
thread 6 created final thread 4 created final 
thread 5 created final 

thread 7 created final 
thread 8 created final 
thread 9 created final 
thread 10 created final 
thread 12 created final thread 11 created final 

thread 13 created final 
thread 14 created final 
thread 16 created final thread 15 created final 

thread 17 created final 
thread 18 created final thread 19 created final 

thread 20 created final 
thread 22 created final 
thr

<br><br><br><br>

# Question 5
## Final Thoughts

The phase where `MapReduceEngine` reads all temporary files generated by the mappers and sorts them to provide each reducer with a specific key is called the **shuffle step**.

Please explain **clearly** what would be the main problem of MapReduce when processing Big Data, if there is no shuffle step at all, meaning reducers will directly read responses from the mappers.

**Answer**:

The main problem of processing Big Data with MapReduce without a shuffle step is that the same key can reach several reducers, so the output may contain repeated keys that each carry only a partial result. The shuffle step is what routes every occurrence of a key to a single reducer.  

For example, when computing the count of each unique word in a document without a shuffle, several reducers may each hold a partial count for the same word. An additional aggregation pass would then be required to produce the total count for the repeated keys, which in turn increases latency.

Shuffling (partitioning) keys across reducers is what guarantees that the reduce step fully aggregates every key in the data.
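The word-count example can be simulated in a few lines of plain Python. This is a toy sketch under stated assumptions: the mapper outputs are made up, and the partitioner `len(word) % NUM_REDUCERS` is a hypothetical stand-in for the key hash that real systems typically use. Without a shuffle, `'big'` ends up at both reducers with partial counts; with the shuffle, every count for a word meets at one reducer.

```python
from collections import Counter, defaultdict

# Hypothetical mapper outputs for a word count: each mapper emits (word, 1) for its own chunk
mapper_outputs = [
    [('big', 1), ('data', 1), ('big', 1)],  # mapper 0
    [('data', 1), ('big', 1)],              # mapper 1
]
NUM_REDUCERS = 2


def reduce_counts(pairs):
    """Sum the counts per word over the pairs one reducer received."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


def partition(word):
    """Toy partitioner (assumption): route a key by its length instead of a real hash."""
    return len(word) % NUM_REDUCERS


# Without shuffle: reducer i reads mapper i's output directly
no_shuffle = [reduce_counts(pairs) for pairs in mapper_outputs]

# With shuffle: every pair goes to the reducer chosen by its key
partitions = defaultdict(list)
for pairs in mapper_outputs:
    for word, count in pairs:
        partitions[partition(word)].append((word, count))
with_shuffle = [reduce_counts(partitions[r]) for r in range(NUM_REDUCERS)]

print(no_shuffle)    # [[('big', 2), ('data', 1)], [('big', 1), ('data', 1)]]
print(with_shuffle)  # [[('data', 2)], [('big', 3)]]
```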

<br><br><br><br>
Good Luck :)