# Big Data Platform
## Assignment 3: Serverless

**By:**

### ID#1: 011996279
### ID#2: 300123123

<br><br>

**The goal of this assignment is to:**
- Understand and practice the details of Serverless

**Instructions:**
- Students will form teams of two people each, and submit a single homework for each team.
- The same score for the homework will be given to each member of your team.
- Your solution is in the form of a Jupyter notebook file (with extension ipynb).
- Images/Graphs/Tables should be submitted inside the notebook.
- The notebook should be runnable and properly documented. 
- Please answer all the questions and include all your code.
- You are expected to submit clear, Pythonic code.
- You may change function signatures/definitions.

**Submission:**
- Submission of the homework will be done via Moodle by uploading (not Zip):
    - Jupyter Notebook
    - 2 Log files
    - Additional local scripts
- The homework needs to be entirely in English.
- The deadline for submission is on Moodle.
- Late submission won't be allowed.

  
- In case of identical code submissions - both groups will get a Zero. 
- Some groups might be selected randomly to present their code.

**Requirements:**  
- Python 3.6 should be used.  
- You should implement the algorithms yourself using only basic Python libraries (such as numpy, pandas, etc.).

<br><br><br><br>

**Grading:**
- Q0 - 10 points - Setup
- Q1 - 40 points - Serverless MapReduceEngine
- Q2 - 20 points - MapReduce job to calculate inverted index
- Q3 - 30 points - Shuffle

`Total: 100`

<br><br>

In [2]:
!pip install --quiet zipfile36
!pip install names
!pip install numpy
!pip install scipy
!pip install pandas
!pip install ibm-cos-sdk
!pip install lithops



In [3]:
import ibm_boto3
from ibm_botocore.client import Config, ClientError

from lithops import FunctionExecutor
from lithops import Storage

# general
import os
import time
import logging
import threading  # you can use easier threading packages
from threading import Thread
import random
import warnings

# ml
import numpy as np
import scipy as sp
import pandas as pd

# visual
# import seaborn as sns
# import matplotlib.pyplot as plt

# notebook
from IPython.display import display

#random last names
import names

#SQL
import sqlite3
from sqlite3 import Error

In [4]:
random.seed(123)

# Question 0
## Setup

1. Navigate to IBM Cloud and open a trial account. No need to provide a credit card.
2. Choose IBM Cloud Object Storage service from the catalog
3. Create a new bucket in IBM Cloud Object Storage
4. Create credentials for the bucket with HMAC (access key and secret key)
5. Choose IBM Cloud Functions service from the catalog and create a service


#### Lithops setup
1. Using `git`, install the master branch of the Lithops project from https://github.com/lithops-cloud/lithops
2. Follow the Lithops documentation and configure Lithops against IBM Cloud Functions and IBM Cloud Object Storage.
3. Set the Lithops log level to DEBUG.
4. Run the Hello World example using the Futures API and verify that everything works properly.


#### IBM Cloud Object Storage setup
1. Upload all the input CSV files that you used in homework 2 into the bucket you created in IBM Cloud Object Storage


<br><br><br>

In [5]:
def hello(name, number):
    return f'hello {name} {number}'


def test():
    with FunctionExecutor() as fexec:
        fut = fexec.call_async(hello, ('World', 1))
        print(fut.result())

In [6]:
test()

2022-01-11 16:30:54,108 [INFO] lithops.config -- Lithops v2.5.8
2022-01-11 16:30:54,109 [DEBUG] lithops.config -- Loading configuration from /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/.lithops_config
2022-01-11 16:30:54,112 [DEBUG] lithops.config -- Loading Serverless backend module: ibm_cf
2022-01-11 16:30:54,117 [DEBUG] lithops.config -- Loading Storage backend module: ibm_cos
2022-01-11 16:30:54,119 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Creating IBM COS client
2022-01-11 16:30:54,120 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Set IBM COS Endpoint to https://s3.eu-de.cloud-object-storage.appdomain.cloud
2022-01-11 16:30:54,120 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Using access_key and secret_key
2022-01-11 16:30:54,171 [INFO] lithops.storage.backends.ibm_cos.ibm_cos -- IBM COS client created - Region: eu-de
2022-01-11 16:30:54,171 [DEBUG] lithops.serverless.backends.ibm_cf.ibm_cf -- Creating IBM Cloud Functions client
2022-01-11 16:3

ClientError: An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key ID you provided does not exist in our records.

In [7]:
DB_FILE_NAME='mydb.db'
TEMP_FOLDER='./mapreducetemp'
FINAL_FOLDER='./mapreducefinal'
NUM_OF_RECORDS = 10
TEMP_RESULTS_TBL='temp_results'
BUCKET_NAME = "cloud-object-storage-eg-cos-standard-eu0"

# Question 1
## Serverless MapReduceEngine

Modify MapReduceEngine from homework 2 into MapReduceServerlessEngine, where map and reduce tasks are executed as serverless actions instead of local threads. In particular:
1. Deploy all map tasks as serverless actions by using Lithops against IBM Cloud Functions.
2. Collect the results from all map tasks, store them in the same SQLite database you used in MapReduceEngine, and reuse the same code for the sort and shuffle phase.
3. Deploy reduce tasks by using Lithops against IBM Cloud Functions. Instead of persisting results from reduce tasks, return them to MapReduceServerlessEngine and proceed with the same workflow as in MapReduceEngine.
4. Return the results of the reduce tasks to the user.

**Please attach:**  
Text file with all log messages Lithops printed to the console during the execution. Make sure the log level is set to DEBUG.
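
A minimal local sketch of the fan-out and gather pattern that steps 1 and 2 describe, assuming Python's `concurrent.futures.ThreadPoolExecutor` as a stand-in for the serverless executor. This is not Lithops: `run_tasks`, `toy_map`, and the sample arguments are hypothetical names used only for illustration. The point is that every task is submitted before any result is awaited, so the tasks can run concurrently.

```python
from concurrent.futures import ThreadPoolExecutor


def run_tasks(function, args_list, max_workers=4):
    """Submit every task first, then gather results in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan-out: submit() returns a future immediately, nothing blocks here.
        futures = [pool.submit(function, *args) for args in args_list]
        # Gather: wait for each future; the order matches args_list.
        return [future.result() for future in futures]


def toy_map(document_name, words):
    """Hypothetical map task: emit (key, value) pairs for one document."""
    return [(word, document_name) for word in words]


map_results = run_tasks(toy_map, [("a.csv", ["John", "Dana"]),
                                  ("b.csv", ["John"])])
print(map_results)
```

Calling `result()` right after each submission would instead serialize the tasks, which is worth keeping in mind when the executor is a remote one and each call carries invocation latency.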

#### Code:

In [8]:
input_data = []


def seeder(number):
    """Write one CSV of random people to disk and record its path in input_data."""
    firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
    city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto',  'TelAviv', 'Kiev', 'Hamburg']
    # Ten random surnames to sample from
    secondname = [names.get_last_name() for _ in range(10)]
    df = pd.DataFrame({
        "firstname": np.random.choice(firstname, NUM_OF_RECORDS),
        "secondname": np.random.choice(secondname, NUM_OF_RECORDS),
        "city": np.random.choice(city, NUM_OF_RECORDS),
    })
    curr_file_name = 'MyCSV%s.csv' % number
    df.to_csv(curr_file_name, index=False)
    print("finished creating %s" % curr_file_name)
    input_data.append('./' + curr_file_name)


for i in range(20):
    seeder(i)

finished creating MyCSV0.csv
finished creating MyCSV1.csv
finished creating MyCSV2.csv
finished creating MyCSV3.csv
finished creating MyCSV4.csv
finished creating MyCSV5.csv
finished creating MyCSV6.csv
finished creating MyCSV7.csv
finished creating MyCSV8.csv
finished creating MyCSV9.csv
finished creating MyCSV10.csv
finished creating MyCSV11.csv
finished creating MyCSV12.csv
finished creating MyCSV13.csv
finished creating MyCSV14.csv
finished creating MyCSV15.csv
finished creating MyCSV16.csv
finished creating MyCSV17.csv
finished creating MyCSV18.csv
finished creating MyCSV19.csv


In [9]:
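# Create each folder independently so that an existing temp folder
# does not prevent the final folder from being created.
for folder in (TEMP_FOLDER, FINAL_FOLDER):
    try:
        os.mkdir(folder)
    except FileExistsError as e:
        print(f"folder(s) already exist(s): {e}")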

folder(s) already exist(s): [Errno 17] File exists: './mapreducetemp'


In [10]:
sql_create_temp_results_table = """CREATE TABLE IF NOT EXISTS temp_results (
                                    key text,
                                    value text
                                    ); """

In [11]:
sql_group_by_key = """SELECT key, GROUP_CONCAT(value)
                      FROM temp_results GROUP BY key ORDER BY (key);"""

In [12]:
sql_drop_all_tables = """DROP TABLE temp_results;"""

In [13]:
def drop_temp_tables(conn):
    try:
        c = conn.cursor()
        c.execute(sql_drop_all_tables)
    except Error as e:
        print(e)

In [14]:
def create_connection(db_file):
    """ create a database connection to a SQLite database """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        print(sqlite3.version)
    except Error as e:
        print(e)
    return conn

In [15]:
def create_table(conn, create_table_sql):
    """ create a table from the create_table_sql statement
    :param conn: Connection object
    :param create_table_sql: a CREATE TABLE statement
    :return:
    """
    try:
        c = conn.cursor()
        c.execute(create_table_sql)
    except Error as e:
        print(e)

In [16]:
def get_grouped_values(conn):
    cur = conn.cursor()
    cur.execute(sql_group_by_key)

    rows = cur.fetchall()

    return rows
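
To illustrate what the shuffle query returns, here is a small sketch against an in-memory SQLite database. It assumes the same `temp_results` schema as above; the sample rows are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temp_results (key text, value text)")
conn.executemany(
    "INSERT INTO temp_results (key, value) VALUES (?, ?)",
    [("John", "a.csv"), ("Dana", "a.csv"), ("John", "b.csv")],
)

rows = conn.execute(
    "SELECT key, GROUP_CONCAT(value) FROM temp_results GROUP BY key ORDER BY key"
).fetchall()
conn.close()

# One row per key; the second field is a comma-joined string of its values.
for key, joined_values in rows:
    print(key, joined_values.split(","))
```

Two details matter for the reduce step: SQLite does not guarantee the order of the concatenated values within a group, and because the values are joined with commas, a value that itself contains a comma would be split incorrectly by `documents.split(',')`.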

In [17]:
import io


class MapReduceServerlessEngine:
    def execute(self, input_data, map_function, reduce_function, params):
        # Map phase: one serverless invocation per input document
        for curr_map, map_document_path in enumerate(input_data):
            # An open file handle cannot be pickled and shipped to a remote
            # function, so send the file contents in an in-memory buffer instead.
            with open(map_document_path, 'r') as curr_file:
                document_buffer = io.StringIO(curr_file.read())

            with FunctionExecutor() as fexec:
                fut = fexec.call_async(func=map_function, data=(document_buffer, params['column'], map_document_path))
                map_result = fut.result()
                print(map_result)

            if map_result is not None:
                map_result_df = pd.DataFrame(map_result, columns=["key", "value"])
                map_result_df.to_csv(TEMP_FOLDER + '/part-tmp-%s.csv' % curr_map, index=False, header=True)

        # Shuffle phase: load every temp file into SQLite (uses the global `connection`)
        for temp_file_name in os.scandir(TEMP_FOLDER):
            csv_df = pd.read_csv(temp_file_name.path)
            csv_df.to_sql(TEMP_RESULTS_TBL, connection, if_exists='append', index=False)

        grouped_values = get_grouped_values(connection)

        # Reduce phase: one serverless invocation per key
        for curr_reduce, reduce_value in enumerate(grouped_values):
            with FunctionExecutor() as fexec:
                fut = fexec.call_async(reduce_function, (reduce_value[0], reduce_value[1]))
                reduce_result = fut.result()
                print(reduce_result)

            if reduce_result is not None:
                result_df = pd.DataFrame(reduce_result, columns=["values"])
                result_df.to_csv(FINAL_FOLDER + '/part-%s-final.csv' % curr_reduce, index=False, header=True)

        return "MapReduce Completed"


In [18]:
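def inverted_map(document_buffer, column_index, document_name):
    # Skip the header line and treat every remaining line as data; with skiprows=1
    # alone, pandas would consume the first data row as the header.
    values = pd.read_csv(filepath_or_buffer=document_buffer, usecols=[column_index], skiprows=1, header=None)

    return [(x[0], document_name) for x in values.to_records(index=False)]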

In [19]:
def inverted_reduce(value, documents):
    ret_val = [value]
    temp_set = set(documents.split(','))
    ret_val.extend(temp_set)

    return ret_val
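
As a worked example of the map and reduce contract, the sketch below builds an inverted index for two tiny in-memory CSV documents using only the standard library. `toy_inverted_map` and `toy_inverted_reduce` are hypothetical stand-ins that mirror `inverted_map` and `inverted_reduce`; unlike the set-based reducer above, this one sorts the document names so the output is deterministic. A dict plays the role of the SQLite shuffle.

```python
import csv
import io
from collections import defaultdict


def toy_inverted_map(document_text, column_index, document_name):
    """Emit (value, document_name) for every data row, skipping the header."""
    reader = csv.reader(io.StringIO(document_text))
    next(reader)  # skip the header row
    return [(row[column_index], document_name) for row in reader]


def toy_inverted_reduce(value, documents):
    """Turn 'a.csv,b.csv,a.csv' into [value, 'a.csv', 'b.csv']."""
    return [value] + sorted(set(documents.split(',')))


docs = {
    "a.csv": "firstname,city\nJohn,Haifa\nDana,London\n",
    "b.csv": "firstname,city\nJohn,Kiev\n",
}

# Map, then group values by key (a dict stands in for the SQLite shuffle).
grouped = defaultdict(list)
for name, text in docs.items():
    for key, value in toy_inverted_map(text, 0, name):
        grouped[key].append(value)

index = [toy_inverted_reduce(key, ",".join(values))
         for key, values in sorted(grouped.items())]
print(index)  # [['Dana', 'a.csv'], ['John', 'a.csv', 'b.csv']]
```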

In [20]:
connection = create_connection(DB_FILE_NAME)
if connection is not None:
    # create temp_results table
    create_table(connection, sql_create_temp_results_table)
else:
    print("Error! cannot create the database connection.")

2.6.0


In [21]:
mapreduce = MapReduceServerlessEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params={'column': 0})
print(status)

2022-01-11 16:31:47,658 [INFO] lithops.config -- Lithops v2.5.8
2022-01-11 16:31:47,660 [DEBUG] lithops.config -- Loading configuration from /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/.lithops_config
2022-01-11 16:31:47,662 [DEBUG] lithops.config -- Loading Serverless backend module: ibm_cf
2022-01-11 16:31:47,663 [DEBUG] lithops.config -- Loading Storage backend module: ibm_cos
2022-01-11 16:31:47,664 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Creating IBM COS client
2022-01-11 16:31:47,664 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Set IBM COS Endpoint to https://s3.eu-de.cloud-object-storage.appdomain.cloud
2022-01-11 16:31:47,665 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Using access_key and secret_key
2022-01-11 16:31:47,669 [INFO] lithops.storage.backends.ibm_cos.ibm_cos -- IBM COS client created - Region: eu-de
2022-01-11 16:31:47,670 [DEBUG] lithops.serverless.backends.ibm_cf.ibm_cf -- Creating IBM Cloud Functions client
2022-01-11 16:3

ClientError: An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key ID you provided does not exist in our records.

In [22]:
for file_name in os.scandir(TEMP_FOLDER):
    os.remove(file_name.path)
drop_temp_tables(connection)

# Question 2
## Submit MapReduce job to calculate inverted index
1. Use input_data: `cos://bucket/<path to CSV data>`
2. Submit a MapReduce job with the same map and reduce functions you used in homework 2, as follows:

    `mapreduce = MapReduceServerlessEngine()`  
    `results = mapreduce.execute(input_data, inverted_map, inverted_index)`   
    `print(results)`

**Please attach:**  
Text file with all log messages Lithops printed to the console during the execution. Make sure the log level is set to DEBUG.
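
The `cos://bucket/<path>` form can be split into a bucket name and an object key before it is handed to a storage client. A minimal sketch using only the standard library; `split_cos_uri` and the bucket name `my-bucket` are hypothetical and not part of Lithops or the IBM SDK.

```python
from urllib.parse import urlparse


def split_cos_uri(uri):
    """Split 'cos://<bucket>/<key>' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "cos" or not parsed.netloc:
        raise ValueError(f"not a cos:// URI: {uri!r}")
    # The path keeps its leading slash, which is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_cos_uri("cos://my-bucket/input/MyCsv0.csv")
print(bucket, key)  # my-bucket input/MyCsv0.csv
```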

#### Code:

In [23]:
# Delete all result files in the final folder
for file_name in os.scandir(FINAL_FOLDER):
    os.remove(file_name.path)

# Delete all .csv or .db files in the current directory
for file_name in os.scandir('.'):
    extension = os.path.splitext(file_name.name)[1]
    if extension in ('.csv', '.db'):
        os.remove(file_name.path)

In [27]:
def seeder(number):
    firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
    city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto',  'TelAviv', 'Kiev', 'Hamburg']
    secondname = []
    for i in range(10):
        rand_name = names.get_last_name()
        secondname.append(rand_name)
    df = pd.DataFrame()
    df["firstname"] = np.random.choice(firstname, NUM_OF_RECORDS)
    df["secondname"] = np.random.choice(secondname, NUM_OF_RECORDS)
    df["city"] = np.random.choice(city, NUM_OF_RECORDS)

    # Writing the generated DataFrame to a csv file
    df.to_csv('./MyCsv%s.csv' % number, index=False)

    abspath = os.path.abspath('./MyCsv%s.csv' % number)
    key_name = os.path.basename(abspath)

    # Uploading the csv file to the bucket
    upload_csv_to_bucket(abspath, key_name)


def create_cos_client():
    global cos
    # Constants for IBM COS values
    COS_ENDPOINT = "https://s3.eu-de.cloud-object-storage.appdomain.cloud"
    COS_API_KEY_ID = ""
    COS_INSTANCE_CRN = "crn:v1:bluemix:public:cloud-object-storage:global:a/f59790e39df946bfb79f67a0d7ba6626:77c77432-e666-476b-add9-c579609bd099:bucket:cloud-object-storage-eg-cos-standard-eu0"
    # Create client
    cos = ibm_boto3.client("s3",
                           ibm_api_key_id=COS_API_KEY_ID,
                           ibm_service_instance_id=COS_INSTANCE_CRN,
                           endpoint_url=COS_ENDPOINT,
                           config=Config(signature_version="oauth")
                           )

create_cos_client()

for i in range (20):
    seeder(i)

Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv0.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv1.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv2.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv3.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv4.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv5.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv6.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv7.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv8.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv9.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv10.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/MyCsv11.csv
Uploaded: /Users/phogelbookpro2019/PycharmProjects/BDP_Assignm

In [25]:
def upload_csv_to_bucket(path, key_name):
    try:
        cos.upload_file(Filename=path, Bucket=BUCKET_NAME, Key='input/' + str(key_name))
        print(f"Uploaded: {path}")
    except Exception as e:
        print(f"Unable to upload file: {e}")

In [26]:
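# Create each folder independently so that an existing temp folder
# does not prevent the final folder from being created.
for folder in (TEMP_FOLDER, FINAL_FOLDER):
    try:
        os.mkdir(folder)
    except FileExistsError as e:
        print(f"folder(s) already exist(s): {e}")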

folder(s) already exist(s): [Errno 17] File exists: './mapreducetemp'


In [28]:
sql_create_temp_results_table = """CREATE TABLE IF NOT EXISTS temp_results (
                                    key text,
                                    value text
                                    ); """

In [29]:
sql_group_by_key = """SELECT key, GROUP_CONCAT(value)
                      FROM temp_results GROUP BY key ORDER BY (key);"""

In [30]:
sql_drop_all_tables = """DROP TABLE temp_results;"""

In [31]:
def drop_temp_tables(conn):
    try:
        c = conn.cursor()
        c.execute(sql_drop_all_tables)
    except Error as e:
        print(e)

In [32]:
def create_connection(db_file):
    """ create a database connection to a SQLite database """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        print(sqlite3.version)
    except Error as e:
        print(e)
    return conn

In [33]:
def create_table(conn, create_table_sql):
    """ create a table from the create_table_sql statement
    :param conn: Connection object
    :param create_table_sql: a CREATE TABLE statement
    :return:
    """
    try:
        c = conn.cursor()
        c.execute(create_table_sql)
    except Error as e:
        print(e)

In [34]:
def get_grouped_values(conn):
    cur = conn.cursor()
    cur.execute(sql_group_by_key)

    rows = cur.fetchall()

    return rows

In [35]:
class MapReduceServerlessEngine():
    def execute(self, input_data, map_function, reduce_function, params):
        curr_map = 0

        for input_key in input_data:
            with FunctionExecutor() as fexec:
                fut = fexec.call_async(func=map_function, data=(input_key, params['column'])) # {"key":key, "col":0}
                map_result = fut.result()
                print(map_result)

                if map_result is not None:
                    map_result_df = pd.DataFrame(map_result, columns=["key", "value"])
                    map_result_df.to_csv(TEMP_FOLDER + '/part-tmp-%s.csv' % str(curr_map), index=False, header=True)
            curr_map += 1

        for temp_file_name in os.scandir(TEMP_FOLDER):
            csv_df = pd.read_csv(temp_file_name.path)
            csv_df.to_sql(TEMP_RESULTS_TBL, connection, if_exists='append', index=False)

        grouped_values = get_grouped_values(connection)

        curr_reduce = 0
        for reduce_value in grouped_values:
            with FunctionExecutor() as fexec:
                fut = fexec.call_async(reduce_function, (reduce_value[0], reduce_value[1]))
                reduce_result = fut.result()
                print(reduce_result)

                if reduce_result is not None:
                    result_df = pd.DataFrame(reduce_result, columns=["values"])
                    result_df.to_csv(FINAL_FOLDER + '/part-%s-final.csv' % curr_reduce, index=False, header=True)
            curr_reduce += 1

        return "MapReduce Completed"

In [36]:
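def inverted_map(key, col):
    # Stream the object straight from the bucket inside the serverless function
    storage = Storage()
    buffer = storage.get_object(BUCKET_NAME, key, stream=True)

    # Skip the header line and treat every remaining line as data; with skiprows=1
    # alone, pandas would consume the first data row as the header.
    values = pd.read_csv(filepath_or_buffer=buffer, usecols=[col], skiprows=1, header=None)

    return [(x[0], key) for x in values.to_records(index=False)]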

In [37]:
def inverted_reduce(value, documents):
    ret_val = [value]
    temp_set = set(documents.split(','))
    ret_val.extend(temp_set)

    return ret_val

In [38]:
storage = Storage()
input_data =  storage.list_keys(BUCKET_NAME, prefix='input/')

print(input_data)

2022-01-11 16:34:42,577 [DEBUG] lithops.config -- Loading configuration from /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/.lithops_config
2022-01-11 16:34:42,580 [DEBUG] lithops.config -- Loading Storage backend module: ibm_cos
2022-01-11 16:34:42,580 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Creating IBM COS client
2022-01-11 16:34:42,581 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Set IBM COS Endpoint to https://s3.eu-de.cloud-object-storage.appdomain.cloud
2022-01-11 16:34:42,581 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Using access_key and secret_key
2022-01-11 16:34:42,587 [INFO] lithops.storage.backends.ibm_cos.ibm_cos -- IBM COS client created - Region: eu-de


ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjectsV2 operation: The AWS Access Key ID you provided does not exist in our records.

In [39]:
connection = create_connection(DB_FILE_NAME)
create_table(connection, sql_create_temp_results_table)

2.6.0


In [40]:
if connection is not None:
    # create temp_results table
    create_table(connection, sql_create_temp_results_table)
else:
    print("Error! cannot create the database connection.")

In [41]:
mapreduce = MapReduceServerlessEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params={'column':0})
print(status)

2022-01-11 16:35:08,543 [INFO] lithops.config -- Lithops v2.5.8
2022-01-11 16:35:08,545 [DEBUG] lithops.config -- Loading configuration from /Users/phogelbookpro2019/PycharmProjects/BDP_Assignment3/.lithops_config
2022-01-11 16:35:08,548 [DEBUG] lithops.config -- Loading Serverless backend module: ibm_cf
2022-01-11 16:35:08,549 [DEBUG] lithops.config -- Loading Storage backend module: ibm_cos
2022-01-11 16:35:08,549 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Creating IBM COS client
2022-01-11 16:35:08,549 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Set IBM COS Endpoint to https://s3.eu-de.cloud-object-storage.appdomain.cloud
2022-01-11 16:35:08,550 [DEBUG] lithops.storage.backends.ibm_cos.ibm_cos -- Using access_key and secret_key
2022-01-11 16:35:08,554 [INFO] lithops.storage.backends.ibm_cos.ibm_cos -- IBM COS client created - Region: eu-de
2022-01-11 16:35:08,555 [DEBUG] lithops.serverless.backends.ibm_cf.ibm_cf -- Creating IBM Cloud Functions client
2022-01-11 16:3

ClientError: An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key ID you provided does not exist in our records.

In [42]:
for file_name in os.scandir(TEMP_FOLDER):
    os.remove(file_name.path)
drop_temp_tables(connection)

# Question 3
## Shuffle

MapReduceServerlessEngine deploys both map and reduce tasks as serverless invocations.   
However, once the map stage completes, the results are transferred from the map tasks to the SQLite database located on the client machine (a laptop in your case); the shuffle is then performed locally, and the reduce tasks are invoked with the relevant parameters.

(To support your answers, feel free to use examples, Images, etc.)
<br><br>

**1. Explain why this approach is not efficient and what the pros and cons of such an architecture are in general. In a broader scope, you may assume that MapReduceServerlessEngine is executed on a powerful machine and not just a laptop.**

### Summary:
In general, moving from local hardware to the cloud, and in particular to serverless, offers advantages in speed, elasticity, fault tolerance, and reliability, which is why more and more work has migrated in this direction over the last 5-10 years. Running both the Map and Reduce tasks as serverless invocations lets us use cloud computing infrastructure, improving speed and efficiency and reducing the need for local storage.

However, in our implementation the shuffle step still runs locally, on the SQLite database located on our machine. Shuffle is a crucial step in MapReduce: downloading all the intermediate data from the cloud to the local machine, performing the shuffle, and then invoking the Reduce step and sending the data back to the cloud gives up many of the benefits of running Map and Reduce in the cloud. It can exhaust local memory and slow down the entire engine. From a workflow standpoint, it also makes little sense to upload all the data to the cloud and run the operations there, only to download everything again for the shuffle.

### Pros:
- Potentially lower cloud cost, since the resource-intensive shuffle step runs locally. This holds only in theory: downloading the data and shuffling locally could just as well cost more, either directly (energy, bandwidth, compute) or indirectly (degraded performance of the local machine and its other tasks).
- Keeping the shuffle local may give more control and freedom to customize that phase of the engine, for example if the cloud provider does not allow certain actions or parameters that the desired shuffle design depends on.


### Cons:
- Big data: the more data the MapReduceEngine handles, the more strain the local shuffle puts on the local machine, especially a laptop as in our case. Even a very powerful local machine has finite memory, storage, and bandwidth for downloading the data and sending it back, and these limits are small compared with the scalability of cloud infrastructure.
- Failure and fault tolerance: regardless of the type of local machine, shuffling locally adds communication between the client and the cloud, and every additional transfer and coordination step is another opportunity for failure. This also undercuts the theoretical cost saving listed under Pros, which could be trivial compared with the cost of failures.
- Losing the benefits of serverless Map and Reduce: the end-to-end runtime is bounded by the slowest stage, so no matter how much efficiency and fault tolerance we gain by running Map and Reduce in the cloud, the local shuffle (and the data transfer to and from the client) can, and likely will, outweigh those gains. As the diagram below shows, there is no way around the shuffle step, so it becomes the rate-limiting step.
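
A rough way to see why the local shuffle dominates, written as a back-of-the-envelope model rather than a measurement. All symbols are assumptions introduced here for illustration: $D$ is the size of the intermediate map output (the grouped data sent back is taken to be roughly the same size), $B_{\text{down}}$ and $B_{\text{up}}$ are the client's download and upload bandwidths, $n$ is the number of parallel serverless workers, $W_{\text{map}}$ and $W_{\text{reduce}}$ are the total map and reduce work, and the work is assumed to parallelize perfectly.

$$
T_{\text{total}} \approx \frac{W_{\text{map}}}{n} + \frac{D}{B_{\text{down}}} + T_{\text{shuffle}}^{\text{local}} + \frac{D}{B_{\text{up}}} + \frac{W_{\text{reduce}}}{n}
$$

Only the first and last terms shrink as $n$ grows. The three middle terms all pass through the single client machine and grow with $D$, so beyond some number of workers they determine the end-to-end time.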




### Images:

![image-4.png](attachment:image-4.png)

![image-5.png](attachment:image-5.png)

![image-2.png](attachment:image-2.png)


<br><br>
**2. Suggest how you can improve the shuffle so that intermediate data is not downloaded to the client at all and the shuffle is performed in the cloud as well. Explain the pros and cons of the approaches you suggest.**


### Summary:
The best way to improve the shuffle step and keep the entire engine in the cloud is to move the intermediate (mapped) data store, SQL or otherwise, into cloud object storage (including a choice about whether it lives in the same bucket as the input or a separate one), and to rewrite the shuffle itself as a serverless action that conforms to the cloud environment and the provider's specifications.

### Pros:
- Enables much greater scalability for larger projects and big data.
    - Also enables a hybrid shuffle, where slow and fast storage are each used when appropriate and big-data shuffles are partitioned into smaller chunks or rounds (see image below).
- No need to manage local hardware, infrastructure, processing power and, most importantly, storage.
- The entire engine is "under one roof": the data stays in one place, and debugging and troubleshooting can be more efficient because everything runs on the same infrastructure.
    - Fault tolerance is likely to be more reliable, since cloud providers typically replicate stored data.
- As a result, there is potential for a better cost-performance trade-off than with the local shuffle.

### Cons:
- Potentially higher absolute cost. The cost relative to the benefit could still be worthwhile, but depending on the project and the data, a cloud shuffle may be overkill and not the best use of resources.
- Potentially less customizability.
- For very simple tasks on small data, latency is higher in the cloud, so that specific task could take longer than it would locally.
- Moving more functions into the serverless engine carries its own risks in communication and coordination between functions; depending on the number and type of functions and on the maturity of the serverless engine, this could add unwanted complexity.
- Fault tolerance, error rate, latency, and run time all depend on the cloud infrastructure. This can lead to situations outside the user's control, whereas a local server or machine offers full control and transparency.

### Images:

![image.png](attachment:image.png)

### Sources:

- https://www.researchgate.net/publication/338040188_REDUCING_DATA_SHUFFLING_AND_IMPROVING_MAP_REDUCE_PERFORMANCE_USING_ENHANCED_DATA_LOCALITY

- http://norma.ncirl.ie/4147/1/achyutanantakumarvadavadagi.pdf

- https://www.usenix.org/system/files/nsdi19-pu.pdf

<br><br>
**3. Can you make serverless shuffle?**


Yes, a serverless shuffle is possible. One option is to create an SQL database in object storage and perform the shuffle in a similar function adapted to run as a serverless action. Alternatively, we could bypass SQL and write code that handles the mapped data and performs the same shuffle and sort functionality. This can be done with the Lithops `call_async` method (see references below), which simplifies the workflow and reduces the number of steps required. However, it is not a given that this would be worthwhile: it could consume more resources than necessary, and because the shuffle is so critical to MapReduce and tied to both the Map and Reduce sides, it could complicate the workflow and reduce the efficiency of the overall engine, depending on many other factors.
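
One way to picture a shuffle without SQL, sketched under stated assumptions: each map task hash-partitions its own output and writes one object per reducer to shared storage, and each reduce task reads only the objects for its partition. A plain dict stands in for object storage here, and `partition_of`, `map_and_partition`, `reduce_partition`, and the `shuffle/<reducer>/<mapper>.json` key layout are hypothetical names for illustration, not Lithops APIs.

```python
import json
import zlib
from collections import defaultdict

object_store = {}  # stand-in for a cloud bucket: object key -> bytes


def partition_of(key, num_reducers):
    """Stable hash so every mapper sends a given key to the same reducer."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def map_and_partition(mapper_id, pairs, num_reducers):
    """Map side: bucket (key, value) pairs by reducer, write one object each."""
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[partition_of(key, num_reducers)].append((key, value))
    for reducer_id, items in buckets.items():
        name = f"shuffle/{reducer_id}/{mapper_id}.json"
        object_store[name] = json.dumps(items).encode("utf-8")


def reduce_partition(reducer_id):
    """Reduce side: read this partition's objects, group by key, sort keys."""
    grouped = defaultdict(set)
    prefix = f"shuffle/{reducer_id}/"
    for name, blob in object_store.items():
        if name.startswith(prefix):
            for key, value in json.loads(blob):
                grouped[key].add(value)
    return [[key] + sorted(values) for key, values in sorted(grouped.items())]


NUM_REDUCERS = 2
map_and_partition(0, [("John", "a.csv"), ("Dana", "a.csv")], NUM_REDUCERS)
map_and_partition(1, [("John", "b.csv")], NUM_REDUCERS)

results = [row for r in range(NUM_REDUCERS) for row in reduce_partition(r)]
print(sorted(results))  # [['Dana', 'a.csv'], ['John', 'a.csv', 'b.csv']]
```

The sketch uses `zlib.crc32` rather than Python's built-in `hash()` because string hashing is randomized per process by default, so independent workers would disagree on which partition a key belongs to.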

#### Lithops call_async references:

- https://github.com/lithops-cloud/lithops/blob/master/examples/call_async.py
- https://www.ibm.com/cloud/blog/serverless-without-constraints

<br><br><br><br>
Good Luck :) 