In [18]:
import re
import threading
from queue import Queue

# read input file into memory
with open('input.txt', 'r') as f:
    text = f.read()

def map_fn(text, output_queue):
    words = re.findall(r'\b\w+\b', text.lower())
    mapped_values = [(w, 1) for w in words]
    output_queue.put(mapped_values)

def shuffle_fn(mapped_values):
    shuffle_dict = {}
    for key, value in mapped_values:
        if key in shuffle_dict:
            shuffle_dict[key].append(value)
        else:
            shuffle_dict[key] = [value]
    return shuffle_dict.items()

def reduce_fn(key, values, output_queue):
    reduce_output = (key, sum(values))
    output_queue.put(reduce_output)

# define number of threads
num_threads = 4

# create queue for map function output
map_output_queue = Queue()

# split text into chunks for each thread
chunk_size = len(text) // num_threads
chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]

# create threads and run map function on each chunk
threads = []
for i in range(num_threads):
    t = threading.Thread(target=map_fn, args=(chunks[i], map_output_queue))
    threads.append(t)
    t.start()

# collect output from map function
mapped_values = []
for i in range(num_threads):
    mapped_values.extend(map_output_queue.get())
print("Collecting output from map function")
print(mapped_values)
print("\n")

# run shuffle function to group mapped values by key
shuffle_output = shuffle_fn(mapped_values)
print("After running shuffle function to group mapped values by key")
print(shuffle_output)
print("\n")

# create queue for reduce function output
reduce_output_queue = Queue()

# create threads and run reduce function on each key
threads = []
for key, values in shuffle_output:
    t = threading.Thread(target=reduce_fn, args=(key, values, reduce_output_queue))
    threads.append(t)
    t.start()

# collect output from reduce function
reduce_output = []
for i in range(len(threads)):
    reduce_output.append(reduce_output_queue.get())
print("Collecting the output from the reduce function")
print(reduce_output)
print("\n")

# combine output from reduce function to get final word count
word_count = {key: value for key, value in reduce_output}
print("Combining output from reduce to get final word count")
print(word_count)


Collecting output from map function
[('the', 1), ('quick', 1), ('brown', 1), ('fo', 1), ('x', 1), ('jumps', 1), ('over', 1), ('the', 1), ('l', 1), ('azy', 1), ('dog', 1), ('the', 1), ('lazy', 1), ('dog', 1), ('was', 1), ('not', 1), ('amused', 1)]


After running shuffle function to group mapped values by key
dict_items([('the', [1, 1, 1]), ('quick', [1]), ('brown', [1]), ('fo', [1]), ('x', [1]), ('jumps', [1]), ('over', [1]), ('l', [1]), ('azy', [1]), ('dog', [1, 1]), ('lazy', [1]), ('was', [1]), ('not', [1]), ('amused', [1])])


Collecting the output from the reduce function
[('the', 3), ('quick', 1), ('brown', 1), ('fo', 1), ('x', 1), ('jumps', 1), ('over', 1), ('l', 1), ('azy', 1), ('dog', 2), ('lazy', 1), ('was', 1), ('not', 1), ('amused', 1)]


Combining output from reduce to get final word count
{'the': 3, 'quick': 1, 'brown': 1, 'fo': 1, 'x': 1, 'jumps': 1, 'over': 1, 'l': 1, 'azy': 1, 'dog': 2, 'lazy': 1, 'was': 1, 'not': 1, 'amused': 1}
