# Big Data Platform
## Assignment 2: MapReduce

**The goal of this assignment is to:**
- Understand and practice the details of MapReduceEngine

**Instructions:**
- Students will form teams of two people each, and submit a single homework for each team.
- The same score for the homework will be given to each member of your team.
- Your solution is in the form of a Jupyter notebook file (with extension ipynb).
- Images/Graphs/Tables should be submitted inside the notebook.
- The notebook should be runnable and properly documented.
- Please answer all the questions and include all your code.
- You are expected to submit clear, Pythonic code.
- You can change function signatures/definitions.

**Submission:**
- Submission of the homework will be done via Moodle by uploading a Jupyter notebook.
- The homework needs to be entirely in English.
- The deadline for submission is on Moodle.
- Late submission won't be allowed.


- In case of identical code submissions - both groups will get a Zero.
- Some groups might be selected randomly to present their code.

**Requirements:**
- Python 3.6 should be used.
- You should implement the algorithms yourself using only basic Python libraries (such as numpy, pandas, etc.).

<br><br><br><br>

**Grading:**
- Q1 - 5 points - Initial Steps
- Q2 - 50 points - MapReduceEngine
- Q3 - 30 points - Implement the MapReduce Inverted index of the JSON documents
- Q4 - 5 points - Testing Your MapReduce
- Q5 - 10 points - Final Thoughts

`Total: 100`

**Prerequisites**

In [None]:
# example
!pip install --quiet zipfile36

**Imports**

In [None]:
# general
import os
import time
import random
import warnings
import threading # you can use easier threading packages

# ml
import numpy as np
import scipy as sp
import pandas as pd

# visual
import seaborn as sns
import matplotlib.pyplot as plt

# notebook
from IPython.display import display

**Hide Warnings**

In [None]:
warnings.filterwarnings('ignore')

**Disable Autoscrolling**

In [None]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


**Set Random Seed**

In [None]:
random.seed(123)
# The CSV generation below uses np.random.choice, so seed NumPy as well
np.random.seed(123)

<br><br><br><br>
# Question 1
# Initial Steps

Write Python code to create 20 different CSV files in this format:  `myCSV[Number].csv`, where each file contains 10 records.

The schema is `('firstname', 'secondname', 'city')`

Values should be randomly chosen from the lists:
- `firstname` : `[John, Dana, Scott, Marc, Steven, Michael, Albert, Johanna]`
- `city` : `[New York, Haifa, München, London, Palo Alto,  Tel Aviv, Kiel, Hamburg]`
- `secondname`: any value

In [None]:
import string


def generate_random_name():
    """
    Generate a random name: the first character is an uppercase letter and
    the rest is a series of 1 - 11 lowercase letters, giving an output
    string of length 2 - 12.

    Returns:
    str: Randomly generated name
    """
    return random.choice(string.ascii_uppercase) + \
           ''.join(random.choices(string.ascii_lowercase, k=random.choice(range(1,12))))

In [None]:
import string

firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto',  'TelAviv', 'Kiel', 'Hamburg']
# Use list comprehension in order to create a list of 8 randomly generated names
secondname = [generate_random_name() for _ in range(8)] # please use some version of random

print(secondname)

['Bckcxa', 'Riweiigalc', 'Tbilxcdu', 'Axogvt', 'Kufpnvikun', 'Vqrxmrjate', 'Creikbloi', 'Gcqajfnsxqz']


In [None]:
def generate_csvs(number_of_csvs):
    """
    Generate number_of_csvs csv files, each with 10 randomly generated rows
    for the defined columns, by randomly picking a value from the list of
    possible values for each column.

    Parameters:
    number_of_csvs (int): number of csv files to generate
    """
    for i in range(1,number_of_csvs+1):
        df = pd.DataFrame(columns=['firstname', 'secondname', 'city'])
        df['firstname'] = np.random.choice(firstname, 10)
        df['secondname'] = np.random.choice(secondname, 10)
        df['city'] = np.random.choice(city, 10)
        df.to_csv(f'myCSV{i}.csv', index=False)

In [None]:
generate_csvs(20)

Use Python to create the `mapreducetemp` and `mapreducefinal` folders.

In [None]:
os.makedirs("mapreducetemp", exist_ok=True)
os.makedirs("mapreducefinal", exist_ok=True)

<br><br><br>
# Question 2
## MapReduceEngine

Write Python code to create an SQLite database with the following table

`TableName: temp_results`
`schema: (key:TEXT,value:TEXT)`

In [None]:
import sqlite3 as sql

In [None]:
def create_connection(db_file):
    """
    Create an sql connection to the db by the given db file

    Parameters:
    db_file (str): Path of the db file to use to make a connection to the db

    Returns:
    conn (sql.Connection): Connection to db
    """
    conn = sql.connect(db_file)
    return conn

def create_table(conn, create_table_sql):
    """
    Create a table in the db with the given sql execution code

    Parameters:
    conn (sql.Connection): connection to db
    create_table_sql (str): sql execution code for table creation
    """
    c = conn.cursor()
    c.execute(create_table_sql)
    conn.commit()
    c.close()

In [None]:
sql_create_table = """ CREATE TABLE IF NOT EXISTS temp_results (
                           key TEXT,
                           value TEXT
                           ); """

# create a database connection
sql_conn = create_connection("mydb.db")

# create table
create_table(sql_conn, sql_create_table)

1. **Create a Python class** `MapReduceEngine` with the method `def execute(input_data, map_function, reduce_function)`, such that:
    - `input_data`: is an array of elements
    - `map_function`: is a reference to a Python function that returns a list in which each entry has the form (key, value)
    - `reduce_function`: is a reference to a Python function that returns a list in which each entry has the form (key, value)

<br><br>

**Implement** the following functionality in the `execute(...)` function:

<br>

1. For each key from the input_data, start a new Python thread that executes map_function(key)
<br><br>
2. Each thread will store the results of the map_function in mapreducetemp/part-tmp-X.csv, where X is a unique number for each thread.
<br><br>
3. Keep a list of all threads and check whether they have completed.
<br><br>
4. Once all threads have completed, load the content of all CSV files into the temp_results table in SQLite.

    Remark: The easiest way is to loop over all CSV files, load each one into Pandas first, and then load it into SQLite:
    `data = pd.read_csv(path to csv)`
    `data.to_sql('temp_results', sql_conn, if_exists='append', index=False)`
<br><br>

5. **Write an SQL statement** that generates a list, sorted by key, of the form `(key, value)`, where value is the concatenation of ALL values in the value column that match a specific key. For example, if the table has the records
<table>
    <tbody>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV1.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Dana</td>
                <td style="text-align:center">myCSV5.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV7.csv</td>
            </tr>
    </tbody>
</table>

    Then the SQL statement will return `('John', 'myCSV1.csv, myCSV7.csv')`
    Remark: use GROUP_CONCAT together with GROUP BY and ORDER BY
<br><br><br>
6. **Start a new thread** for each value from the list generated in the previous step, to execute `reduce_function(key,value)`
<br>
7. Each thread will store the results of reduce_function in a `mapreducefinal/part-X-final.csv` file
<br>
8. Keep a list of all threads and check whether they have completed
<br>
9. Once all threads have completed, print `MapReduce Completed` on the screen; otherwise, print `MapReduce Failed`
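
Steps 4 and 5 can be tried in isolation before wiring them into the engine. The following is a minimal sketch, assuming a throwaway in-memory SQLite database and the three sample records from the table above; the helper name `group_values_by_key` is hypothetical and not part of the assignment.

```python
import sqlite3


def group_values_by_key(rows):
    """Load (key, value) rows into an in-memory table and group them by key."""
    conn = sqlite3.connect(":memory:")  # assumption: throwaway in-memory database
    conn.execute("CREATE TABLE temp_results (key TEXT, value TEXT)")
    conn.executemany("INSERT INTO temp_results VALUES (?, ?)", rows)
    query = """
        SELECT key, GROUP_CONCAT(value)
        FROM temp_results
        GROUP BY key
        ORDER BY key
    """
    grouped = conn.execute(query).fetchall()
    conn.close()
    return grouped


sample_rows = [("John", "myCSV1.csv"), ("Dana", "myCSV5.csv"), ("John", "myCSV7.csv")]
grouped = group_values_by_key(sample_rows)
print(grouped)
```

Note that `GROUP_CONCAT` joins values with a plain comma by default, and SQLite does not guarantee the order in which the values of one key are concatenated, so downstream code should not rely on that order.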



In [None]:
from tqdm.notebook import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
from threading import current_thread
import logging

In [None]:
# Setup logger file
logging.basicConfig(filename='mapreduce.log', filemode='w+',
                    format='%(asctime)s | %(threadName)s | %(levelname)s | %(message)s',
                    level=logging.INFO)

In [None]:
def thread_wrapper(function, save_path, value):
    """
    A wrapper to run the threads with logging capability. If the thread fails
    raise the exception.

    Parameters:
    function: function to run by thread
    save_path (str): path to save the csv
    value (tuple): value to run the function on and thread_id
    """
    data, thread_id = value
    current_thread().name = f"{function.__name__} thread {thread_id}"
    logging.info('Started')
    try:
        # A tuple is a (key, value) row from the shuffle step, i.e. reduce input
        if isinstance(data, tuple):
            res = function(*data)
            df = pd.DataFrame(res)
            df.to_csv(os.path.join(save_path,
                                   f'part-{thread_id}-final.csv'),
                      index=False)
        # Otherwise it is an input element for the map function
        else:
            res = function(data)
            df = pd.DataFrame(res, columns=['key', 'value'])
            df.to_csv(os.path.join(save_path,
                                   f'part-tmp-{thread_id}.csv'),
                      index=False)
    except Exception as e:
        logging.error(e)
        # Re-raise so the caller sees the failure through the future
        raise

    logging.info('Ended')

def pool_executor_wrapper(function, data, pbar, num_of_threads, save_path, is_map=True):
    """
    A wrapper to execute the pool of threads

    Parameters:
    function: function to let the thread run
    data: data to run the function on
    pbar (tqdm): the progress bar we want to update as threads complete
    num_of_threads (int): the max number of threads we want to run
    save_path (str): path where we want to save our results
    is_map (bool): boolean whether we are running threads on the map function

    Returns:
    str: if a thread fails returns a fail message otherwise returns nothing
    """
    # Define the pool executor and submit one task per element.
    # Note: ProcessPoolExecutor runs the tasks in worker processes, not threads
    with ProcessPoolExecutor(max_workers=num_of_threads) as executor:
        futures = [executor.submit(thread_wrapper, function, save_path, (item, task_id))
                   for task_id, item in enumerate(data, start=1)]

        # Go over the futures as they finish and check the status of each
        for future in as_completed(futures):
            try:
                future.result()
            except Exception:
                return "MapReduce Failed"

            pbar.update(1)


In [None]:
# implement all of the class here

class MapReduceEngine():
    def execute(self, input_data, map_function, reduce_function):
        """
        Execute the mapreduce steps: map, shuffle and sort, reduce
        Parallelize the map and reduce steps by creating threads to run the
        map and reduce function in parallel on the partitioned data

        Parameters:
        input_data (list): Elements to apply map_function on
        map_function: Map function to apply on input_data
        reduce_function: Reduce function on the shuffled and sorted output of
                         map_function

        Returns:
        str: Completion status message
        """

        # Start a thread for each key in input_data and run map_function on it
        num_of_threads = len(input_data)
        mapreducetemp_path = "mapreducetemp"

        # Define a progress bar to visualize progress of threads
        with tqdm(total=num_of_threads, desc = 'Map Progress Bar') as pbar:
            map_status = pool_executor_wrapper(map_function, input_data, pbar,
                                               num_of_threads, mapreducetemp_path)

        if map_status is not None:
            return map_status

        # Load content of all CSV files into temp_results table
        for file in os.listdir(mapreducetemp_path):
            if file.endswith(".csv"):
                data = pd.read_csv(os.path.join(mapreducetemp_path, file))
                data.to_sql('temp_results', sql_conn, if_exists='append', index=False)

        # SQL statement to generate a sorted list by key
        sql_generate_sorted_list = """
                                   SELECT key, GROUP_CONCAT(value)
                                   FROM temp_results
                                   GROUP BY key
                                   ORDER BY key;
                                   """
        sorted_list = sql_conn.execute(sql_generate_sorted_list).fetchall()

        # Start a thread for each value from the generated list
        num_of_threads = len(sorted_list)

        # Define a progress bar to visualize progress of threads
        with tqdm(total=num_of_threads, desc = 'Reduce Progress Bar') as pbar:
            reduce_status = pool_executor_wrapper(reduce_function, sorted_list, pbar,
                                                  num_of_threads, 'mapreducefinal',
                                                  is_map=False)

        if reduce_status is not None:
            return reduce_status

        return "MapReduce Completed"
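
The task description asks for Python threads, whereas the engine above submits its work to a `ProcessPoolExecutor`. As a point of comparison, here is a minimal sketch of the same submit and `as_completed` pattern using `ThreadPoolExecutor`. The helper `run_in_threads` and the toy function passed to it are hypothetical, and this is not meant as a drop-in replacement for the engine.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_threads(function, items):
    """Run function(item) in one thread per item; return (results, failed)."""
    items = list(items)
    results, failed = {}, False
    # Assumption: one worker per item, mirroring "a new thread for each key"
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as executor:
        futures = {executor.submit(function, item): item for item in items}
        for future in as_completed(futures):
            try:
                results[futures[future]] = future.result()
            except Exception:
                # Any failing task marks the whole run as failed
                failed = True
    return results, failed


results, failed = run_in_threads(lambda name: name.upper(), ["a.csv", "b.csv"])
print("MapReduce Failed" if failed else "MapReduce Completed")
```

Threads avoid the need to pickle the submitted function, which can matter for functions defined inside a notebook, but CPU-bound Python code in threads is still serialized by the interpreter lock.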

<br><br><br><br>

# Question 3
## Implement the MapReduce Inverted index of the JSON documents

Implement a function `inverted_map(document_name)` which reads the CSV document from the local disk and returns a list that contains entries of the form (key_value, document name).

For example, if the myCSV4.csv document has values like:
`{'firstname': 'John', 'secondname': 'Rambo', 'city': 'Palo Alto'}`

Then the `inverted_map('myCSV4.csv')` function will return a list:
`[('firstname_John', 'myCSV4.csv'), ('secondname_Rambo', 'myCSV4.csv'), ('city_Palo Alto', 'myCSV4.csv')]`

In [None]:
def inverted_map(document_name):
    """
    Reads the csv document from the local disk and return a list that contains
    entries of the form (key_value, document name)

    Parameters:
    document_name (str): name of the csv document to run the map function on

    Returns:
    entries (list): list of tuples of the defined form
    """
    entries = []
    df = pd.read_csv(document_name)
    for _, row in df.iterrows():
        for col in df.columns:
            # f-string formatting also handles non-string cell values
            entries.append((f"{col}_{row[col]}", document_name))
    return entries

Write a reduce function `inverted_reduce(value, documents)`, where the field "documents" contains a list of all CSV documents for the given value.
This list might have duplicates.
The reduce function will return a new list without duplicates.

For example,
calling the function `inverted_reduce('firstname_Albert', 'myCSV2.csv, myCSV5.csv,myCSV2.csv')`
will return a list `['firstname_Albert', 'myCSV2.csv, myCSV5.csv']`

In [None]:
def inverted_reduce(value, documents):
    """
    Take the two parameters given and put them together in a list and get rid of
    the duplicates in the parameter documents

    Parameters:
    value (str): column name and value in the form of columnname_value
    documents (str): comma-separated names of all csv documents per given value

    Returns:
    list: value followed by the csv document names with no duplicates
    """
    # Strip stray spaces and sort so the output is deterministic
    return [value] + sorted(set(doc.strip() for doc in documents.split(",")))

<br><br><br><br>
# Question 4
## Testing Your MapReduce

**Create a Python list** `input_data` : `['myCSV1.csv', ..., 'myCSV20.csv']`

In [None]:
input_data = [f'myCSV{i}.csv' for i in range(1,21)]

**Submit MapReduce as follows:**

In [None]:
mapreduce = MapReduceEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce)
print(status)

Map Progress Bar:   0%|          | 0/20 [00:00<?, ?it/s]

Reduce Progress Bar:   0%|          | 0/24 [00:00<?, ?it/s]

MapReduce Completed


Make sure that `MapReduce Completed` is printed and that the `mapreducefinal` folder contains the result files.
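
One way to check the second condition is to list the CSV files in the output folder. This is a minimal sketch; the helper name `list_result_files` is hypothetical, and it assumes the result files keep the `.csv` extension used by the engine.

```python
import os


def list_result_files(folder="mapreducefinal"):
    """Return the sorted names of the CSV files in folder (empty if it is missing)."""
    if not os.path.isdir(folder):
        return []
    return sorted(name for name in os.listdir(folder) if name.endswith(".csv"))


result_files = list_result_files()
print(f"{len(result_files)} result file(s) found")
```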

**Use Python to delete all temporary data from the mapreducetemp folder and delete the SQLite database:**

In [None]:
mapreducetemp_path = "mapreducetemp"
for file in os.listdir(mapreducetemp_path):
    os.remove(os.path.join(mapreducetemp_path, file))
# Close the connection first so the database file is no longer in use
sql_conn.close()
os.remove("mydb.db")

<br><br><br><br>

# Question 5
# Final Thoughts

The phase where `MapReduceEngine` reads all temporary files generated by the maps and sorts them to provide each reducer with a specific key is called the **shuffle step**.

Please explain **clearly** what would be the main problem of MapReduce when processing Big Data, if there is no shuffle step at all, meaning reducers will directly read responses from the mappers.

The shuffling process collects related work into one unit. The data from all mappers is grouped by key, sorted by key, and split among the reducers, so each reducer obtains all values associated with the same key. This makes the shuffle step essential for the correctness of the reduce phase.

For example, in a word count process, let’s say you have a list of (key, value) pairs like this:

(hot, 1), (cold, 1), (hot, 1), (rain, 1), (cold, 1)
<br/>
<br/>
The shuffling process will sort and group the pairs by key:

(cold, 1), (cold, 1), (hot, 1), (hot, 1), (rain, 1)
<br/>
<br/>
Now each reducer will work on (one or more) different keys and the final result will be:

Reducer 1 - (cold, 2)

Reducer 2 - (hot, 2)

Reducer 3 - (rain, 1)

Without the shuffling process, the same key might go to different reducers and the counts would be incorrect. For example, the result could be:

Reducer 1 - (cold, 2)

Reducer 2 - (hot, 1)

Reducer 3 - (hot, 1)

Reducer 4 - (rain, 1)

The shuffle step guarantees that all mapper results with the same key, whether or not they come from the same mapper, are sent to the same reducer, so that the reducer can combine them into a correct final result.
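
The word count example can be reproduced in a few lines. The sketch below is illustrative only: the function names are hypothetical, and the round-robin split used for the "no shuffle" case is an assumed stand-in for reducers reading mapper output directly without regard to keys.

```python
from collections import defaultdict
from itertools import cycle

pairs = [("hot", 1), ("cold", 1), ("hot", 1), ("rain", 1), ("cold", 1)]


def reduce_counts(chunk):
    """Sum the values per key within one reducer's input."""
    totals = defaultdict(int)
    for key, value in chunk:
        totals[key] += value
    return dict(totals)


def with_shuffle(pairs):
    """Group by key first, so every key is reduced exactly once."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append((key, value))
    return [reduce_counts(groups[key]) for key in sorted(groups)]


def without_shuffle(pairs, num_reducers=2):
    """Hand pairs to reducers round-robin, ignoring keys (assumed split)."""
    chunks = [[] for _ in range(num_reducers)]
    for chunk, pair in zip(cycle(chunks), pairs):
        chunk.append(pair)
    return [reduce_counts(chunk) for chunk in chunks]


print(with_shuffle(pairs))
print(without_shuffle(pairs))
```

With the shuffle, each key appears in exactly one reducer's output with its full count. Without it, `cold` ends up in both reducers with a partial count of 1 each, so a further merging pass would be needed to get the correct total.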





<br><br><br><br>
Good Luck :)