
# **MiniMapReduce**
A non-parallel, non-scalable Map-Reduce implementation

In [None]:
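from functools import reduce  # reduce is not a builtin in Python 3

def groupByKey(data):
    """Group (key, value) pairs into a dict mapping each key to a list of its values."""
    result = dict()
    for key, value in data:
        if key in result:
            result[key].append(value)
        else:
            result[key] = [value]
    return result

def reduceByKey(f, data):
    """Fold the values of each key with the binary function f.

    In Python 3 this returns a lazy map object; wrap it in list() to see the pairs.
    """
    key_values = groupByKey(data)
    return map(lambda key: (key, reduce(f, key_values[key])),
               key_values)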

## **Word count using MiniMapReduce**

In [None]:
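# Materialise the pairs as a list: in Python 3, map returns a one-shot
# iterator that would be empty after groupByKey has consumed it.
data = list(map(lambda x: (x, 1), "to be or not to be".split()))
groupByKey(data)

{'to': [1, 1], 'be': [1, 1], 'or': [1], 'not': [1]}

In [None]:
# reduceByKey returns a lazy map object; list() forces it so the counts are shown.
list(reduceByKey(lambda x, y: x + y, data))

[('to', 2), ('be', 2), ('or', 1), ('not', 1)]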

# **Parallelising MiniMapReduce**

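*   We can turn our Map-Reduce implementation into a multi-threaded framework by using the my_map_multithreaded function, which is defined in the cells that follow.

*   This allows several map-reduce jobs to be in flight at once on a single computer. In standard CPython the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads give a speed-up mainly when jobs spend their time waiting (for example on I/O or sleep) rather than computing.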
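
For comparison, the same per-key reduction can be sketched with the standard library's concurrent.futures.ThreadPoolExecutor instead of hand-managed Thread objects. This is only a minimal illustration and not the implementation used in this notebook: the names group_by_key and reduce_by_key_pooled, and the max_workers value, are assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def group_by_key(pairs):
    """Collect the values of each key into a list (hypothetical helper)."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return grouped


def reduce_by_key_pooled(f, pairs, max_workers=4):
    """Reduce each key's values in a worker thread (hypothetical name).

    max_workers=4 is an arbitrary choice for this sketch.
    """
    grouped = group_by_key(pairs)
    keys = list(grouped)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the same order as its inputs.
        reduced = list(pool.map(lambda k: reduce(f, grouped[k]), keys))
    return list(zip(keys, reduced))


pairs = [(word, 1) for word in "to be or not to be".split()]
word_counts = reduce_by_key_pooled(lambda x, y: x + y, pairs)
print(word_counts)
```

The pool reuses a fixed number of threads rather than creating one thread per key, which may matter once the number of distinct keys grows large.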


In [None]:
def reduceByKey_multithreaded(f, data):
    key_values = groupByKey(data)
    return my_map_multithreaded(
        lambda key: (key, reduce(f, key_values[key])), key_values.keys())

In [None]:
from threading import Thread

def schedule_computation_threaded(f, result, data, threads, i):
    # Each function evaluation gets its own thread (created here, started later).
    def my_job():
        print("Processing data:", data[i], "... ")
        result[i] = f(data[i])
        print("Finished job #", i)
        print("Result was", result[i])
    threads[i] = Thread(target=my_job)

def my_map_multithreaded(f, data):
    data = list(data)  # accept any iterable (e.g. dict keys) and allow indexing
    n = len(data)
    result = [None] * n
    threads = [None] * n
    print("Scheduling jobs.. ")
    for i in range(n):
        schedule_computation_threaded(f, result, data, threads, i)
    print("Starting jobs.. ")
    for i in range(n):
        threads[i].start()
    print("Waiting for jobs to finish.. ")
    for i in range(n):
        threads[i].join()
    print("All done.")
    return result

In [None]:
# Use a fresh list of (word, 1) pairs so this cell can be run on its own.
data = [(word, 1) for word in "to be or not to be".split()]
# Returns [('to', 2), ('be', 2), ('or', 1), ('not', 1)]; the progress messages
# printed by the four jobs can interleave differently on each run.
reduceByKey_multithreaded(lambda x, y: x + y, data)

In [None]:
my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])

Scheduling jobs.. 
Starting jobs.. 
Processing data: 1 ... 
Finished job # 0
Result was 1
Processing data: 2 ... 
Finished job # 1
Result was 4
Processing data: 3 ... 
Finished job # 2
Result was 9
Processing data:Processing data: 5 ... 
Finished job # 4
Result was 25
Waiting for jobs to finish.. 
 4 ... 
Finished job # 3
Result was 16
All done.


[1, 4, 9, 16, 25]

In [None]:
from numpy.random import uniform
from time import sleep

def a_function_which_takes_a_long_time(x):
    sleep(uniform(2, 10))  # Simulate some long computation
    return x*x

my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5])

Scheduling jobs.. 
Starting jobs.. 
Processing data: 1 ... 
Processing data: 2 ... 
Processing data: 3 ... 
Processing data: Processing data: 5 ... 
Waiting for jobs to finish.. 
4 ... 
Finished job # 4
Result was 25
Finished job # 3
Result was 16
Finished job # 2
Result was 9
Finished job # 0
Result was 1
Finished job # 1
Result was 4
All done.


[1, 4, 9, 16, 25]