# Task 2: MapReduce with Ray Actors

In this task you run MapReduce with Ray actors. For reference, you can use the implementation of MapReduce with Ray tasks [here](https://github.com/maxpumperla/learning_ray/blob/main/notebooks/ch_02_ray_core.ipynb) (scroll to the end of the notebook).

The goal is to use MapReduce to count the number of occurrences of each word in a block of text, using `Mapper` and `Reducer` actors. Implement the MapReduce algorithm as follows:
  - Create one Mapper per CPU worker in your Ray cluster (`NUM_CPUS` Mappers in total), and split the text into partitions so the workload is distributed evenly among them.
  - Assign one partition to each Mapper, and have each Mapper compute the word counts for its partition using `.map`.
  - Once the Mappers have computed their counts, use `.reduce` on your `Reducer` to combine the results from all partitions.
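The three steps above can be prototyped in a single process before bringing in actors, which also gives a local reference to compare the Ray output against. This is a minimal sketch under stated assumptions, not the assignment's required solution: `map_partition`, `reduce_counts`, and `serial_word_count` are hypothetical names, `collections.Counter` stands in for the actors, and lines are assigned to partitions round-robin rather than in contiguous chunks. It reuses the notebook's tokenizer (lowercase, then runs of ASCII letters and digits).

```python
import re
from collections import Counter


def map_partition(lines):
    """Map step (hypothetical helper): count words in one partition of strings."""
    counts = Counter()
    for line in lines:
        # Same tokenization as the notebook: lowercase, keep runs of ASCII letters/digits.
        counts.update(re.findall(r"[a-zA-Z0-9]+", line.lower()))
    return counts


def reduce_counts(partial_counts):
    """Reduce step (hypothetical helper): sum per-partition Counters into one total."""
    total = Counter()
    for partial in partial_counts:
        total.update(partial)  # adds counts key by key
    return total


def serial_word_count(text, num_partitions=8):
    """Run map and reduce in one process as a reference for the Ray version."""
    lines = [line.strip() for line in text.split(". ")]
    # Round-robin assignment of lines to partitions (an assumption; any split gives the same totals).
    partitions = [lines[i::num_partitions] for i in range(num_partitions)]
    return dict(reduce_counts(map_partition(p) for p in partitions))


print(serial_word_count("The cat sat. The dog sat too.", num_partitions=2))
```

Because word counts are additive, the final totals do not depend on how the text is partitioned, which is what lets the map step run in parallel.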


In [1]:
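import ray
import re
import time
import string
import json
import os

# Number of Mapper actors (and text partitions) to use.
NUM_CPUS = 8

ray.shutdown()  # stop any Ray instance left over from a previous run
# Start Ray with exactly NUM_CPUS CPUs so the Mapper count matches the cluster size.
ray.init(num_cpus=NUM_CPUS, include_dashboard=True)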

2025-11-02 07:13:10,391	INFO worker.py:2003 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8266


Python version: 3.11.9
Ray version: 2.51.1
Dashboard: http://127.0.0.1:8266


In [5]:

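@ray.remote
class Mapper:
    def __init__(self):
        self.word_counts = {}

    def map(self, lines):
        """Count word occurrences in a list of strings (lines or single tokens)."""
        counts = {}
        for line in lines:
            # Lowercase, then keep maximal runs of ASCII letters and digits as words.
            words = re.findall(r"[a-zA-Z0-9]+", line.lower())
            for word in words:
                counts[word] = counts.get(word, 0) + 1
        # Remember the most recent partition's counts so get_counts() can inspect them.
        self.word_counts = counts
        return counts

    def get_counts(self):
        return self.word_counts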

@ray.remote
class Reducer:
    def __init__(self):
        self.word_counts = {}

    def reduce(self, counts):
        """Merge a list of per-partition count dicts into the running total."""
        for count_dict in counts:
            for word, cnt in count_dict.items():
                self.word_counts[word] = self.word_counts.get(word, 0) + cnt
        return self.word_counts

    def get_counts(self):
        return self.word_counts


def main(text):
    # Split the text into sentence-like lines and trim surrounding whitespace.
    lines = text.split(". ")
    cl = [line.strip() for line in lines]

    # Rejoin and tokenize: lowercase, keep runs of ASCII letters and digits.
    cleaned_text = " ".join(cl).strip()
    if not cleaned_text:
        return {}
    tokens = re.findall(r"[a-zA-Z0-9]+", cleaned_text.lower())
    if not tokens:
        return {}

    def partition_data(data, num_partitions):
        """Split data into num_partitions contiguous chunks whose sizes differ by at most 1."""
        partitions = []
        base, remainder = divmod(len(data), num_partitions)
        start = 0
        for idx in range(num_partitions):
            # The first `remainder` chunks each take one extra element.
            extra = 1 if idx < remainder else 0
            end = start + base + extra
            partitions.append(data[start:end])
            start = end
        return partitions

    # Map phase: one Mapper actor per CPU, each counting its own partition in parallel.
    partitions = partition_data(tokens, NUM_CPUS)
    mappers = [Mapper.remote() for _ in partitions]
    map_refs = [mapper.map.remote(part) for mapper, part in zip(mappers, partitions)]
    mapper_outputs = ray.get(map_refs)

    # Reduce phase: a single Reducer merges the per-partition counts.
    reducer = Reducer.remote()
    final_counts = ray.get(reducer.reduce.remote(mapper_outputs))

    # Release the actors' worker processes now that the result is on the driver.
    for actor in mappers + [reducer]:
        ray.kill(actor)

    return final_counts
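To see how `main` spreads the work, the `partition_data` helper nested inside it can be copied out and run on its own. The copy below is unchanged apart from comments; the sample data is made up for illustration.

```python
def partition_data(data, num_partitions):
    """Split data into num_partitions contiguous chunks whose sizes differ by at most 1."""
    partitions = []
    base, remainder = divmod(len(data), num_partitions)
    start = 0
    for idx in range(num_partitions):
        # The first `remainder` chunks each take one extra element.
        extra = 1 if idx < remainder else 0
        end = start + base + extra
        partitions.append(data[start:end])
        start = end
    return partitions


tokens = list("abcdefghij")  # 10 illustrative items split across 4 partitions
parts = partition_data(tokens, 4)
print([len(p) for p in parts])  # [3, 3, 2, 2]
```

With fewer tokens than partitions (for example, 2 tokens and `NUM_CPUS = 8`), the trailing partitions are empty lists. Those Mappers simply return `{}`, which the Reducer merges without effect.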

# Wait, why am I using Ray for this again?

We're dealing with a toy example on a small text file here. Even so, note that Ray can place these actors as worker processes on any node of a Ray cluster, so the same code written for a single machine scales to a cluster without changes.

In [6]:
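# launch a test mapper
mapper = Mapper.remote()
# launch a test reducer
reducer = Reducer.remote()

# Block until both actors are up by calling a cheap method on each
# (more reliable than sleeping for a fixed time).
ray.get([mapper.get_counts.remote(), reducer.get_counts.remote()])

# Check their status. ray._private is an internal API that may change between
# Ray versions; the public state API (ray.util.state.list_actors) is the supported route.
for actor_info in ray._private.state.actors().values():
    if actor_info["State"] == "ALIVE":
        print(actor_info)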

{'ActorID': 'c84b4e7adfca92f25b70f62801000000', 'ActorClassName': 'Mapper', 'IsDetached': False, 'Name': '', 'JobID': '01000000', 'Address': {'IPAddress': '10.32.73.157', 'Port': 44793, 'NodeID': '2dda4f324129112302c3d72004ced534b75e45509fd0df12c8028a39'}, 'OwnerAddress': {'IPAddress': '10.32.73.157', 'Port': 39981, 'NodeID': '2dda4f324129112302c3d72004ced534b75e45509fd0df12c8028a39'}, 'State': 'ALIVE', 'NumRestarts': 0, 'Timestamp': 1762067994324.0, 'StartTime': 1762067994324, 'EndTime': 0, 'DeathCause': , 'Pid': 19191}
{'ActorID': 'a40dea83c7a9d261a7c6fec901000000', 'ActorClassName': 'Mapper', 'IsDetached': False, 'Name': '', 'JobID': '01000000', 'Address': {'IPAddress': '10.32.73.157', 'Port': 43215, 'NodeID': '2dda4f324129112302c3d72004ced534b75e45509fd0df12c8028a39'}, 'OwnerAddress': {'IPAddress': '10.32.73.157', 'Port': 39981, 'NodeID': '2dda4f324129112302c3d72004ced534b75e45509fd0df12c8028a39'}, 'State': 'ALIVE', 'NumRestarts': 0, 'Timestamp': 1762067993848.0, 'StartTime': 17620

In [7]:
# get word counts for the example text file
essay_path = os.path.expanduser("~/public/pa2/Essay.txt")

with open(essay_path, 'r') as file:
    text = file.read()
final_counts = main(text)

In [8]:
task2_output_path = os.path.expanduser("~/public/pa2/task2_expected_output.txt")

with open(task2_output_path, "r") as f:
    expected_out = json.load(f)
assert expected_out == final_counts


print("Verified: Task 2 output.")

AssertionError: 
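The `AssertionError` above only says that the two dicts differ, not where. As a debugging aid (an assumption about what might help, not part of the assignment), the hypothetical helper `diff_counts` lists the words that are missing, extra, or counted differently. The sketch also contrasts the notebook's regex tokenizer with a hypothetical punctuation-stripping tokenizer built on `string.punctuation`. A tokenization difference of this kind is one plausible cause of the mismatch, but the cause is not confirmed here.

```python
import re
import string


def diff_counts(expected, actual):
    """Return (missing, extra, mismatched) between two word-count dicts."""
    missing = sorted(set(expected) - set(actual))  # words only in expected
    extra = sorted(set(actual) - set(expected))    # words only in actual
    mismatched = {
        w: (expected[w], actual[w])
        for w in sorted(set(expected) & set(actual))
        if expected[w] != actual[w]
    }
    return missing, extra, mismatched


def regex_tokens(line):
    # The notebook's tokenizer: runs of ASCII letters/digits after lowercasing.
    return re.findall(r"[a-zA-Z0-9]+", line.lower())


def strip_punct_tokens(line):
    # Hypothetical alternative: delete ASCII punctuation, then split on whitespace.
    return line.lower().translate(str.maketrans("", "", string.punctuation)).split()


def count_tokens(tokens):
    counts = {}
    for t in tokens:
        counts[t] = counts.get(t, 0) + 1
    return counts


sample = "Don't stop: a well-known rule."
regex_counts = count_tokens(regex_tokens(sample))
strip_counts = count_tokens(strip_punct_tokens(sample))
missing, extra, mismatched = diff_counts(strip_counts, regex_counts)
print("only with punctuation stripping:", missing)  # ['dont', 'wellknown']
print("only with the regex:", extra)                # ['don', 'known', 't', 'well']
```

In the notebook itself, `diff_counts(expected_out, final_counts)` would show which words account for the failure, which narrows down whether the tokenizer, the line splitting, or something else differs from the expected output.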

In [9]:
# shutdown!
ray.shutdown()