In [44]:
# worker.py
import http.client
import json
import os
from http.server import BaseHTTPRequestHandler, HTTPServer

# Driver

In [64]:
class Driver(BaseHTTPRequestHandler):
    """
    HTTP request handler that hands out map/reduce tasks and records completed ones.

    It reads and mutates the module-level dictionaries ``tasks`` (pending task IDs per task
    type) and ``completed_tasks`` (finished task IDs per task type).
    """

    def do_GET(self):
        """
        Returns a task type and task ID in JSON format to a request from the worker.

        The worker asks for a task with a path such as '/map' or '/reduce'. The reply is
        {'task_type': <type>, 'task_id': <id>} when a task of that type is still pending,
        and {'task_type': 'no_tasks'} otherwise (also for unknown task types).
        """
        # The last path component names the kind of task requested ('/map' -> 'map').
        task_type = self.path.rstrip('/').split('/')[-1]

        # NOTE(review): reduce tasks are handed out as soon as every map task has been
        # *assigned*, not *completed*; with several workers a reduce could start before all
        # intermediate files exist. Consider checking completed_tasks['map'] first.
        pending = tasks.get(task_type)
        if pending:  # A missing or empty list means nothing of this type is left.
            reply = {'task_type': task_type, 'task_id': pending.pop(0)}
        else:
            reply = {'task_type': 'no_tasks'}

        body = json.dumps(reply).encode('utf-8')
        self.send_response(200)  # We signal a successful request
        # We send the reply in JSON format.
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        """
        Processes a notification of completed task from a worker.

        Expects a JSON body {'task': <'map' or 'reduce'>, 'id': <task id>}, which is what
        the worker's ``_done`` sends; the key 'type' is accepted in place of 'task'.
        Replies 200 after appending the ID to ``completed_tasks[<task type>]``, or 400 if
        the body cannot be parsed or names an unknown task type.
        """
        try:
            size = int(self.headers.get('Content-Length', 0))
            message = json.loads(self.rfile.read(size).decode('utf-8'))
            # `_done` sends the task type under 'task'; fall back to 'type'.
            task_type = message.get('task', message.get('type'))
            task_id = message['id']
        except (ValueError, AttributeError, KeyError):
            # Bad length, invalid UTF-8/JSON, non-object JSON, or missing 'id'.
            self.send_error(400, 'Malformed completion notice')
            return

        if not isinstance(task_type, str) or task_type not in completed_tasks:
            self.send_error(400, 'Unknown task type')
            return

        completed_tasks[task_type].append(task_id)  # Record the finished task
        self.send_response(200)  # We signal a successful request
        self.end_headers()

# Worker

In [59]:
def _map(task_id, num_reduces):
    """
    Performs the map task. Saves the intermediate file to disk with a filename mr-{task_id}-{bucket_id}.txt.

    Parameters:
        task_id (int): ID of the task to be performed.
        num_reduces (int): Number of total reduce tasks.        
    """
    input = f'temp/tasks/{task_id}.txt'
    with open(input, 'r') as f:
        text = f.read().split()
    
    # Bucket words by the first letter modulo M
    for word in text:
        
        #We do not distinguish uppercas or lowercase
        word = word.lower()
        
        # We set the buket ID using the unicode code for the first character. 
        bucket_id = ord(word[0].lower()) % num_reduces 

        # We append the words to the intermediate file 
        intermediate = f'temp/intermediate/mr-{task_id}-{bucket_id}.txt'
        with open(intermediate, 'a') as bucket:
            bucket.write(f"{word}\n")

In [60]:
def _reduce(bucket_id, num_maps):

    """
    Performs the reduce task for the given bucket_id and all the maps.

    Parameters:
        bucket_id (int): ID of the bucket on which the function performs the reduce task.
        num_maps (int): Number of map tasks that have produced buckets.
    """

    counts = {} # Empty dictionary to store the count

    # We run over the map tasks to collect all bucket files
    for map_id in range(num_maps): 
        intermediate = f'temp/intermediate/mr-{map_id}-{bucket_id}.txt'
        if not os.path.exists(intermediate):
            continue
        with open(intermediate, 'r') as f:
            for word in f:
                word = word.strip() # We remove spaces in and after the word, just in case
                counts[word] = counts.get(word, 0) + 1 # We add one to the word count in the dictionary
    
    # Write the output to a file
    output = f'temp/output/out-{bucket_id}.txt'
    with open(output, 'w') as f:
        for word, count in counts.items():
            f.write(f"{word} {count}\n")

In [61]:
def _request(driver_IP):
    """
    Requests a task to the driver.

    A map task is requested first ('GET /map'). If the driver answers 'no_tasks' for maps,
    a reduce task is requested instead ('GET /reduce').

    Parameters:
        driver_IP (string): IP address of the driver, eg. 'localhost:8000'.

    Returns:
        reply_json (dict): Decoded JSON reply from the driver, e.g.
            {'task_type': 'map', 'task_id': 0} or {'task_type': 'no_tasks'}.
    """
    driver = http.client.HTTPConnection(driver_IP)  # We connect to the driver
    try:
        driver.request('GET', '/map')  # We first ask the driver for a map task
        reply_json = json.loads(driver.getresponse().read())

        # The driver answers 'no_tasks' once every map task has been handed out;
        # in that case we ask for a reduce task instead.
        if reply_json['task_type'] == 'no_tasks':
            driver.request('GET', '/reduce')
            reply_json = json.loads(driver.getresponse().read())
    finally:
        driver.close()  # Release the socket even if a request fails

    return reply_json

In [62]:
def _done(driver_IP, task, id):
    """
    Notifies the driver that the task is done.

    Sends 'POST /' with the JSON body {'task': task, 'id': id}.

    Parameters:
        driver_IP (string): IP address of the driver, eg. 'localhost:8000'.
        task (string): Task that has been performed (map or reduce).
        id: ID of the task done.
    """
    connection = http.client.HTTPConnection(driver_IP)  # We connect to the driver
    try:
        # The completion notice is sent to the driver as JSON through the POST method.
        headers = {'Content-type': 'application/json'}
        payload = json.dumps({'task': task, 'id': id})
        connection.request('POST', '/', payload, headers)
        connection.getresponse().read()  # Drain the reply before closing
    finally:
        connection.close()  # Close the socket even if the request fails

In [63]:
def Worker(driver_IP, num_maps, num_reduces):
    """
    Function that runs the worker in a loop.

    The worker repeatedly asks the driver for a task (via ``_request``). It runs the task
    with ``_map`` or ``_reduce`` and then reports it with ``_done``. It stops when the
    reply's task_type is 'no_tasks'.

    Parameters:
        driver_IP (string): IP address of the driver, eg. 'localhost:8000'.
        num_maps (int): Number of map tasks.
        num_reduces (int): Number of reduce tasks.

    Raises:
        ValueError: if the driver replies with a task type other than 'map', 'reduce'
            or 'no_tasks'.
    """
    while True:  # Keep asking the driver for work until there is none left.
        # The driver replies {'task_type': ..., 'task_id': ...} or {'task_type': 'no_tasks'}.
        reply = _request(driver_IP)
        task_type = reply['task_type']

        if task_type == 'no_tasks':
            # No tasks available: we turn off the worker by breaking the loop.
            print("All tasks are done. Shutting down the worker.")
            break

        # Otherwise we retrieve the ID and perform the adequate task.
        task_id = reply['task_id']
        if task_type == 'map':
            print(f"Map {task_id}")
            _map(task_id, num_reduces)
        elif task_type == 'reduce':
            print(f"Reduce {task_id}")
            _reduce(task_id, num_maps)
        else:
            raise ValueError(f"Unknown task type from driver: {task_type!r}")

        # We end by letting the driver know that the task is finished.
        _done(driver_IP, task_type, task_id)


# Test

In [65]:
N = 5  # Number of map tasks
M = 3  # Number of reduce tasks

# Pending task IDs per task type; the driver pops IDs from these lists as it assigns work.
tasks = {task_type: list(range(count)) for task_type, count in (('map', N), ('reduce', M))}
# Finished task IDs per task type (one independent list per type).
completed_tasks = {task_type: [] for task_type in tasks}

In [68]:
def run_driver(port):
    """
    Starts the driver HTTP server on all interfaces at ``port`` and serves until interrupted.

    The ``with`` block closes the listening socket when ``serve_forever`` exits, including
    on KeyboardInterrupt. A later call can then bind the same port again without waiting
    for garbage collection.

    Parameters:
        port (int): TCP port to listen on.
    """
    server_address = ('', port)
    with HTTPServer(server_address, Driver) as httpd:
        print(f"Driver Server running on port {port}")
        httpd.serve_forever()

In [69]:
# Start the driver on port 8080; this call blocks in serve_forever until interrupted.
run_driver(port=8080)

Driver Server running on port 8080


KeyboardInterrupt: 

In [71]:
# Print 0, 1 and 2, one per line.
print(*range(3), sep='\n')

0
1
2


In [72]:
ts = list(range(3))  # Scratch list [0, 1, 2] for experimenting with pop(0)

In [77]:
# Remove and return the first element of ts; list.pop(0) shifts every remaining item.
ts.pop(0)

2

In [78]:
# Display the current contents of ts.
ts

[]

In [79]:
# Truthiness check: an empty list is falsy, a non-empty one truthy. Comparing a list with
# `== True` is False for every list, so it cannot tell empty from non-empty.
bool(ts)

False