
Word Count problem solved by using a custom implementation


zoraida/map-reduce-task


Map Reduce Task - Word Count

Requirements

To install and run these examples you need:

  • Python 2.7 or 3.3+
  • virtualenv (not required if you are using Python 3.4)
  • git (only to clone this repository)

Installation

The commands below set everything up to run the examples:

$ git clone https://github.com/zoraida/map-reduce-task.git
$ cd map-reduce-task
$ virtualenv venv
$ . venv/bin/activate
(venv) $ pip install -r requirements.txt
(venv) $ python run_driver.py --mappers 4 --reducers 2 --port 5000 --i_path /path/to/input/dir --m_path /path/to/intermediate/dir --o_path /path/to/output/dir
(venv) $ python run_worker.py --driver http://localhost:5000 # The driver URL must include the protocol scheme

How it works

The driver exposes an API:

$ PUT /task/ to request a task for a worker to run
$ POST /task/{type}/{id}/status to update a task's status
$ GET /task/{type}/{id}/status to get a task's status
  • For now I want to keep both the driver and the worker simple:
    • All task states are kept in memory.

    • The driver keeps READY and RUNNING queues. Initially only map tasks are in the READY queue, whereas reduce tasks start in BLOCKED status.

    • When a worker requests a task (PUT /task endpoint):

      • Reduce tasks start only when all map tasks have finished: if all map tasks have finished and the reduce tasks are still BLOCKED, they are added to the READY queue.
      • If there is a READY task, it is given to the requesting worker and moved to the RUNNING queue.
      • If there is no READY task but there are RUNNING or BLOCKED tasks, the worker is asked to try again later.
      • If all tasks are FINISHED, the worker is notified so that it exits successfully. Once all workers have been notified, the driver exits successfully too.
    • When a worker reports a finished task (POST /task/{type}/{id}/status endpoint):

      • The task is removed from the RUNNING queue and its status is changed to FINISHED.
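
The scheduling rules above can be sketched as a small in-memory state machine. This is only an illustration of the described behaviour, not the repository's code: the `Scheduler` class, its method names and the string statuses are assumptions made for the example.

```python
from collections import deque

# Hypothetical status names; the real driver encodes statuses as integers.
BLOCKED, READY, RUNNING, FINISHED = "BLOCKED", "READY", "RUNNING", "FINISHED"


class Scheduler:
    """In-memory task bookkeeping that follows the rules listed above."""

    def __init__(self, n_mappers, n_reducers):
        self.status = {}
        self.ready = deque()
        self.running = set()
        # Map tasks start READY; reduce tasks start BLOCKED.
        for i in range(n_mappers):
            self.status[("mapper", i)] = READY
            self.ready.append(("mapper", i))
        for i in range(n_reducers):
            self.status[("reducer", i)] = BLOCKED

    def _unblock_reducers_if_maps_done(self):
        maps_done = all(
            s == FINISHED for (kind, _), s in self.status.items() if kind == "mapper"
        )
        if not maps_done:
            return
        for key, s in list(self.status.items()):
            if key[0] == "reducer" and s == BLOCKED:
                self.status[key] = READY
                self.ready.append(key)

    def request_task(self):
        """Return ("task", key), ("retry", None) or ("done", None)."""
        self._unblock_reducers_if_maps_done()
        if self.ready:
            key = self.ready.popleft()
            self.status[key] = RUNNING
            self.running.add(key)
            return "task", key
        if all(s == FINISHED for s in self.status.values()):
            return "done", None
        # Something is still RUNNING or BLOCKED: ask the worker to come back.
        return "retry", None

    def finish_task(self, key):
        """Record that a worker reported the task as finished."""
        self.running.discard(key)
        self.status[key] = FINISHED
```

With two mappers and one reducer, a third `request_task()` call returns `("retry", None)` until both mappers have been passed to `finish_task`; only then is the reducer handed out.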

Some thoughts

  • I would have liked to implement a different approach for reduce tasks. For now, since a given map task may generate word counts for any word (and thus for any reducer) and data cannot be sent between processes, I found this the simplest approach, even if not the most efficient. I considered allowing the driver to assign a reduce task to a requesting worker before all map tasks complete, just letting it monitor its input files. However, if a running map task fails and no more workers are available, that would produce a deadlock :( Some preemption mechanism???
  • Thinking about how to let the driver know that workers are still alive:
    • Providing a heartbeat endpoint through which workers tell the driver they are still alive. I am still not sure about the timings or how to do it with a single-threaded worker process. Maybe using a multithreading library and switching the CPU between the main thread and a simple one that notifies the driver, so I would need some mechanism to keep the notifications "regular".
    • Having the driver, which knows the pids of the workers, poll the OS.
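
One possible shape for the heartbeat idea, as a rough sketch: a daemon thread that calls a notification function at a roughly regular interval while the main thread keeps working. The `start_heartbeat` helper and its `notify` callback are hypothetical; in a real worker `notify` would send a request to a heartbeat endpoint, which the driver does not currently expose.

```python
import threading


def start_heartbeat(notify, interval_seconds):
    """Call notify() about every interval_seconds on a daemon thread.

    Returns a threading.Event; set it to stop the heartbeat.
    """
    stop = threading.Event()

    def loop():
        # Event.wait returns False on timeout and True once stop is set,
        # so the loop sleeps between beats and exits promptly on stop.
        while not stop.wait(interval_seconds):
            notify()

    # A daemon thread does not keep the worker process alive on exit.
    threading.Thread(target=loop, daemon=True).start()
    return stop
```

The interval is only approximate: each beat is delayed by however long `notify()` itself takes, and the thread has to wait for its turn while the main thread is running Python code.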

Other considerations

  • For the last 6 years I have been programming in either Scala or Java. I took this opportunity to play around with Python, so please be nice to me :)
  • I guess gRPC would let me solve some of the questions raised above. However, I found Flask easier to use, so I chose it instead.

Examples of API usage

Requesting a task:

curl -H "Accept: application/json" -H "Content-Type: application/json" -H "worker-pid: 1234" -X PUT http://localhost:5000/task
{
  "i_path": [
    "file2"
  ],
  "id": 1,
  "job_status": 1,
  "job_uuid": "af2254b5-4e08-480f-8ae5-addba4014584",
  "n_buckets": 1,
  "o_path": "/path/intermediate",
  "status": 3,
  "type": "mapper"
}
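
The `n_buckets` field is not documented here. One plausible reading is that it tells a mapper how many reducer partitions to split its word counts into. Under that assumption, a mapper could route each word to a bucket with a stable hash, as in the sketch below. The function names are hypothetical, and the repository may partition differently.

```python
import zlib
from collections import Counter


def bucket_for(word, n_buckets):
    """Pick the bucket (reducer partition) for a word."""
    # zlib.crc32 is stable across processes, unlike the built-in hash() on
    # str, which is randomized per interpreter run by default.
    return zlib.crc32(word.encode("utf-8")) % n_buckets


def map_word_counts(text, n_buckets):
    """Return one Counter per bucket with the counts of the words routed to it."""
    buckets = [Counter() for _ in range(n_buckets)]
    for word in text.lower().split():
        buckets[bucket_for(word, n_buckets)][word] += 1
    return buckets
```

Because every mapper sends a given word to the same bucket, each reducer only has to sum the counts found in its own bucket.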

Getting the status of a task:

curl -H "Accept: application/json" -H "Content-Type: application/json" -H "worker-pid: 1234" -X GET http://localhost:5000/task/mapper/0/status
{
  "status": 2
}

Updating the status of a task:

curl -H "Accept: application/json" -H "Content-Type: application/json" -H "worker-pid: 1234" -X POST http://localhost:5000/task/mapper/1/status -d '{"status":4}'
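
The same calls can be made from Python with the standard library. The sketch below mirrors the headers and paths of the curl examples. The helper names `build_request` and `send` are made up for illustration and are not taken from `run_worker.py`.

```python
import json
import os
import urllib.request


def build_request(driver_url, path, method, body=None, pid=None):
    """Build a request carrying the same headers as the curl examples."""
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "worker-pid": str(os.getpid() if pid is None else pid),
    }
    data = None if body is None else json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        driver_url.rstrip("/") + path, data=data, headers=headers, method=method
    )


def send(request):
    """Send the request and decode a JSON body, if the driver returned one."""
    with urllib.request.urlopen(request) as response:
        raw = response.read()
    return json.loads(raw) if raw else None


# Equivalent of the three curl calls above (not sent here):
ask = build_request("http://localhost:5000", "/task", "PUT", pid=1234)
check = build_request("http://localhost:5000", "/task/mapper/0/status", "GET", pid=1234)
update = build_request(
    "http://localhost:5000", "/task/mapper/1/status", "POST", body={"status": 4}, pid=1234
)
```

Calling `send(ask)` against a running driver should return the task description as a dict. `urlopen` raises `urllib.error.HTTPError` for 4xx and 5xx responses, so a worker would need to handle that case.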
