In [1]:
# ## Set up Jupyter Kernel, select Python 3 (ipykernel).
# ## Only need to run once.
# ## restart the kernel after finishing installation

# !pip install "ray==2.10.0"
# !pip install "pyarrow==18.1.0"
# !pip install "modin==0.37.0"

In [2]:
## Verify installations:
# Each `!python -c ...` line is run as a shell command in a separate interpreter,
# so the versions printed are the ones that `python` executable can import.
# NOTE(review): presumably that is the same environment as this kernel — confirm.
!python -c "import ray; print('ray:', ray.__version__)"
!python -c "import modin; print('modin:', modin.__version__)"

ray: 2.10.0
modin: 0.37.0


## Task 2
#### Task 2.1

In [3]:
# Note: We include only 10M data in the csv file to fit the memory of Datahub.

import ray, json, os
import time
import numpy as np
import modin.pandas as pd
from modin.utils import reload_modin
# Stop any Ray instance that is already running, reload Modin, and then start a
# fresh local Ray instance limited to 4 CPUs.
ray.shutdown()
reload_modin()
ray.init(num_cpus=4)

def run_task2(path):
    """Load the review CSV at ``path`` and summarise it per rating bucket.

    For every rounded ``overall`` rating the pipeline computes the number of
    reviews, the number of distinct ``reviewerID`` values, the earliest review
    year and the average number of helpful votes per review (missing votes
    count as 0). The ``describe()`` summary of that table, rounded to two
    decimals, is returned as a plain dict parsed from JSON.

    :param path: Location of the CSV file handed to ``pd.read_csv``.
    :return: Nested dict ``{column: {statistic: value}}``.
    """
    ## DO NOT MODIFY: START 
    start_time = time.perf_counter()
    raw_df = pd.read_csv(path)
    ## DO NOT MODIFY: End 

    # YOUR CODE HERE
    # Parse the free-form `vote` strings: '-' marks a missing value; after casting
    # to str and stripping, a comma followed by one or two trailing digits is read
    # as a decimal point, and every remaining comma is dropped.
    vote_text = raw_df['vote'].replace('-', pd.NA).astype(str).str.strip()
    vote_text = vote_text.str.replace(r',(?=\d{1,2}$)', '.', regex=True)
    vote_text = vote_text.str.replace(',', '', regex=False)
    vote_text = vote_text.replace({'nan': None, '': None})
    # Anything that still cannot be parsed becomes NaN here.
    raw_df['vote'] = pd.to_numeric(vote_text, errors='coerce')

    # Bucket key: the rating rounded to a nullable integer.
    raw_df['rating_bucket'] = raw_df['overall'].round().astype('Int64')
    # Calendar year taken from the epoch-seconds review timestamp.
    raw_df['reviewYear'] = pd.to_datetime(raw_df['unixReviewTime'], unit='s').dt.year
    # Missing votes count as 0 in the per-review average computed below.
    raw_df['vote_filled'] = raw_df['vote'].fillna(0.0)

    per_bucket = raw_df.groupby('rating_bucket')
    output = per_bucket.agg(
        n_reviews=('reviewerID', 'size'),
        n_unique_reviewers=('reviewerID', 'nunique'),
        sum_helpful_votes=('vote_filled', 'sum'),
        earliest_review_year=('reviewYear', 'min'),
    ).reset_index()
    # Average = total votes / number of reviews; the helper sum column is then dropped.
    output['avg_helpful_votes_per_review'] = output['sum_helpful_votes'] / output['n_reviews']
    output = output.drop(columns=['sum_helpful_votes'])
    
    ## DO NOT MODIFY: START 
    submit = output.describe().round(2)
    duration_s = time.perf_counter() - start_time
    print(f"Processing time (excluding file write): {duration_s:.3f} s")
    current_data = json.loads(submit._to_pandas().to_json())
    return current_data
    ## DO NOT MODIFY: END 

2025-10-12 20:37:19,212	INFO worker.py:1752 -- Started a local Ray instance.


In [4]:
def compare_json_files(current_data,
                       origin_path="~/public/origin_results_PA1_task2_1.json",
                       tolerance_percent=1.0):
    """Check freshly computed statistics against a reference JSON file.

    Every ``metric -> statistic -> value`` entry of the reference file is looked
    up in ``current_data`` and the relative error is computed. Entries whose
    error exceeds ``tolerance_percent`` are printed; so are entries that are
    missing from ``current_data`` or where exactly one side is ``None``.

    :param current_data: Nested dict ``{metric: {statistic: value}}`` to verify.
    :param origin_path: Path of the reference JSON file (``~`` is expanded).
    :param tolerance_percent: Largest accepted relative error, in percent.
    :raises AssertionError: if any entry is missing or outside the tolerance.
    """
    with open(os.path.expanduser(origin_path), 'r', encoding='utf-8') as f:
        origin_data = json.load(f)

    has_error = False

    for metric, origin_stats in origin_data.items():
        print(f"\n=== Comparing {metric} ===")

        current_stats = current_data.get(metric)
        if current_stats is None:
            # Report the gap instead of failing with a KeyError half-way through.
            print(f"  {metric}: missing from current results")
            has_error = True
            continue

        for stat, origin_val in origin_stats.items():
            if stat not in current_stats:
                print(f"  {stat}: missing from current results (expected {origin_val})")
                has_error = True
                continue
            current_val = current_stats[stat]

            if origin_val is None or current_val is None:
                # A JSON null cannot be subtracted: two nulls match, one null does not.
                error_percent = 0 if origin_val == current_val else float('inf')
            elif origin_val != 0:
                error_percent = abs(current_val - origin_val) / abs(origin_val) * 100
            else:
                # Relative error is undefined for a zero reference; require an exact zero.
                error_percent = 0 if current_val == 0 else float('inf')

            if error_percent > tolerance_percent:
                print(f"  {stat}: {current_val} vs {origin_val} (error: {error_percent:.2f}%)")
                has_error = True

    if has_error:
        # Raised explicitly so the check still runs when Python is started with -O.
        raise AssertionError(
            f"Found errors greater than {tolerance_percent:g}% between the JSON files"
        )

    print(f"\n✓ All comparisons within {tolerance_percent:g}% tolerance")

In [5]:
# Run the Task 2.1 pipeline on the development CSV, then compare its summary
# statistics with the reference JSON (compare_json_files asserts on errors > 1%).
raw_dataset_path = "~/public/modin_dev_dataset_10M_rows.csv"
current_data = run_task2(raw_dataset_path)
compare_json_files(current_data)

Processing time (excluding file write): 25.945 s

=== Comparing rating_bucket ===

=== Comparing n_reviews ===

=== Comparing n_unique_reviewers ===

=== Comparing avg_helpful_votes_per_review ===

=== Comparing earliest_review_year ===

✓ All comparisons within 1% tolerance


#### Task 2.2
Change the number of CPUs used by Modin or the Ray backend ([documentation here](https://modin.readthedocs.io/en/stable/getting_started/using_modin/using_modin_locally.html#advanced-configuring-the-resources-modin-uses)) on your instance and run your data manipulation code. Document the execution times you see with 1, 2, 3, and 4 CPUs. Is it a linear speedup? If not, why?

Note:
* We won't check the processing time for Task2. So in your submission, it doesn't matter how many CPUs you set in ray.init(num_cpus=X).

In [6]:
import contextlib
from modin.utils import reload_modin

@contextlib.contextmanager
def _reconfigure_ray(num_cpus: int):
    """Context manager that starts a new Ray instance with ``num_cpus`` CPUs.

    On entry a running Ray instance (if any) is shut down, Modin is reloaded and
    Ray is initialised again with the requested CPU count. Nothing is torn down
    on exit, so the cluster stays available for interactive use afterwards.

    :param num_cpus: Value passed to ``ray.init(num_cpus=...)``.
    """
    ray_is_up = ray.is_initialized()
    if ray_is_up:
        ray.shutdown()
    reload_modin()
    ray.init(num_cpus=num_cpus)
    # No cleanup after the body runs: the cluster is intentionally kept alive.
    yield


def measure_task2_runtimes(cpus=(1, 2, 3, 4)):
    """Time the Task 2.1 pipeline once for each CPU count in ``cpus``.

    Ray is reconfigured before the clock starts, so cluster start-up is not
    part of the measured time.

    :param cpus: Iterable of CPU counts to try, in order.
    :return: Dict mapping each CPU count to the wall-clock seconds of one run.
    """
    elapsed_by_cpus = {}
    for n_cpus in cpus:
        with _reconfigure_ray(n_cpus):
            began = time.perf_counter()
            run_task2(raw_dataset_path)  # result is discarded; only the duration matters
            elapsed = time.perf_counter() - began
            elapsed_by_cpus[n_cpus] = elapsed
            print(f"CPUs={n_cpus}: {elapsed:.2f}s")
    return elapsed_by_cpus

# Run and print speedups/efficiency
# The 1-CPU timing is the baseline, so the measured CPU counts must include 1
# (the default `cpus` tuple does). Speedup = baseline / time, efficiency = speedup / CPUs.
_timings_2_2 = measure_task2_runtimes()
print("\nTimings:", _timings_2_2)
_base = _timings_2_2[1]
for c in sorted(_timings_2_2):
    _s = _base / _timings_2_2[c]
    print(f"CPUs={c}: speedup={_s:.2f} (ideal={c}, efficiency={_s/c:.1%})")

2025-10-12 20:37:50,699	INFO worker.py:1752 -- Started a local Ray instance.


Processing time (excluding file write): 71.143 s
CPUs=1: 71.15s


2025-10-12 20:39:07,385	INFO worker.py:1752 -- Started a local Ray instance.


Processing time (excluding file write): 39.509 s
CPUs=2: 39.52s


2025-10-12 20:39:52,348	INFO worker.py:1752 -- Started a local Ray instance.


Processing time (excluding file write): 29.599 s
CPUs=3: 29.61s


2025-10-12 20:40:27,446	INFO worker.py:1752 -- Started a local Ray instance.


Processing time (excluding file write): 25.067 s
CPUs=4: 25.07s

Timings: {1: 71.15424335561693, 2: 39.51682998565957, 3: 29.607038429006934, 4: 25.074957669246942}
CPUs=1: speedup=1.00 (ideal=1, efficiency=100.0%)
CPUs=2: speedup=1.80 (ideal=2, efficiency=90.0%)
CPUs=3: speedup=2.40 (ideal=3, efficiency=80.1%)
CPUs=4: speedup=2.84 (ideal=4, efficiency=70.9%)


The speedup is not linear (ideal would be 4× at 4 CPUs). Using Amdahl’s-law style reasoning, the observed speedups imply ≈86–90% of the workload is parallelizable; the remaining serial fraction, plus scheduling/serialization and I/O overheads, limit scaling. 

In practice the following factors cause the sub-linear speedup:
1. Non-parallel work (I/O, CSV parsing, small serial steps) — Amdahl’s law limits maximum speedup.
2. Ray/Modin overheads (task scheduling, serialization/deserialization, actor startup).
3. Data skew and partition imbalance (some partitions take longer → stragglers).
4. Resource contention (disk bandwidth, memory pressure, GC) when adding workers.

Practical takeaway: expect sub-linear but meaningful speedups; to improve scaling, profile the pipeline, minimize serialization and small tasks, tune partitioning, and avoid re-initializing Ray in the timed loop.

## Task 3
#### Task 3.1

In [7]:
"""
Plain merge sort algorithm 
"""
import heapq
from typing import List
import time
import ray
import numpy as np
import json

# Number of Ray CPUs for Task 3; the timing cell also uses it as the partition count.
num_workers = 4
# Restart Ray so the instance has exactly `num_workers` CPUs.
ray.shutdown()
ray.init(num_cpus=num_workers)

def merge(sublists: List[list]) -> list:
    """
    Merge sorted sublists into a single sorted list.

    :param sublists: List of sorted lists. Empty sublists are allowed, and any
        sorted iterable (range, generator, ...) is accepted in place of a list.
    :return: Merged result as a new list
    """
    # heapq.merge is the standard-library k-way heap merge; materialise its lazy
    # output so callers keep receiving a list.
    return list(heapq.merge(*sublists))

def plain_merge_sort(collection: list, npartitions: int = 4) -> list:
    """
    Sorts a list using the merge sort algorithm. Breaks the list into multiple partitions.

    :param collection: An ordered, sliceable collection with comparable items.
        It is not modified.
    :param npartitions: Number of slices made at the top level (must be >= 1).
        Recursive calls always split into 2.
    :return: A new list with the items in ascending order.
    :raises ValueError: If ``npartitions`` is smaller than 1.

    Time Complexity: O(n log n)

    Examples:
    >>> plain_merge_sort([0, 5, 3, 2, 2])
    [0, 2, 2, 3, 5]
    >>> plain_merge_sort([])
    []
    >>> plain_merge_sort([-2, -5, -45])
    [-45, -5, -2]

    Modified from: https://github.com/TheAlgorithms/Python/
    """
    if npartitions < 1:
        # With zero or negative partitions no slice would be produced and the
        # input would silently be replaced by an empty list.
        raise ValueError(f"npartitions must be >= 1, got {npartitions}")

    size = len(collection)
    if size < npartitions:
        # Too few items to fill every partition: sort directly (also ends the recursion).
        return sorted(collection)

    # Partition boundaries: npartitions + 1 cut points from 0 to size.
    cuts = [i * size // npartitions for i in range(npartitions + 1)]
    sublists = [collection[lo:hi] for lo, hi in zip(cuts, cuts[1:])]
    # just use 2 partitions in recursive calls
    sorted_sublists = [plain_merge_sort(sublist, npartitions=2) for sublist in sublists]
    return merge(sorted_sublists)

@ray.remote
def ray_merge_sort(collection, pair, npartitions):
    """Ray remote task: sort one slice of ``collection`` with ``plain_merge_sort``.

    :param collection: The full sequence to take the slice from.
    :param pair: ``(start, end)`` indices of the slice (end exclusive).
    :param npartitions: Partition count forwarded to ``plain_merge_sort``.
    :return: The sorted slice as a list.
    """
    start, end = pair[0], pair[1]
    chunk = collection[start:end]
    return plain_merge_sort(chunk, npartitions)

2025-10-12 20:40:57,724	INFO worker.py:1752 -- Started a local Ray instance.


In [8]:
def merge_sort_ray(collection_ref: ray.ObjectRef, length: int, npartitions: int = 4) -> list:
    """
    Merge sort with ray

    The index range ``[0, length)`` is cut into ``npartitions`` slices, each slice
    is sorted by one ``ray_merge_sort`` task, and the sorted slices are combined
    with ``merge`` in the calling process.

    :param collection_ref: Ray object reference to the list being sorted.
    :param length: Number of items in the referenced list.
    :param npartitions: Number of slices, i.e. number of remote sort tasks.
    :return: A new list with all items in ascending order.
    """
    ## DO NOT MODIFY: START    
    breaks = [i*length//npartitions for i in range(npartitions)]
    breaks.append(length)
    # Keep track of partition end points
    sublist_end_points = [(breaks[i], breaks[i+1]) for i in range(len(breaks)-1)]
    ## DO NOT MODIFY: END
    
    # YOUR CODE HERE
    # Submit one remote sort per (start, end) slice before waiting on any of them.
    pending = []
    for bounds in sublist_end_points:
        pending.append(ray_merge_sort.remote(collection_ref, bounds, npartitions))
    # Block until every slice is sorted, then merge the sorted slices.
    return merge(ray.get(pending))

In [9]:
# We will be testing your code for a list of size 10M. Feel free to edit this for debugging.
# The input is unseeded random data, so the timings vary from run to run.
list1 = list(np.random.randint(low=0, high=1000, size=2_000_000))
list2 = [c for c in list1] # make a copy
length = len(list2)
list2_ref = ray.put(list2) # insert into the driver's object store

# Baseline: single-process merge sort.
start1 = time.time()
list1 = plain_merge_sort(list1, npartitions=num_workers)
end1 = time.time()
time_baseline = end1 - start1
print("Plain sorting:", time_baseline)

# Ray version: the timed span covers task submission, remote sorting and the final merge.
start2 = time.time()
list2 = merge_sort_ray(collection_ref=list2_ref, length=length, npartitions=num_workers)
end2 = time.time()
time_ray = end2 - start2
print("Ray sorting:", time_ray)

speedup = time_baseline / time_ray
print("Speedup: ", speedup)
# Collect the timing metrics in a dict (nothing is written to disk in this cell)
results = {
    "time_baseline": time_baseline,
    "time_ray": time_ray,
    "speedup": speedup
}
# list1 already holds the plain_merge_sort output; sorted() re-checks it with the builtin sort.
assert sorted(list1) == list2, "Sorted lists are not equal"
# Required minimum speedup of the Ray version over the baseline.
assert speedup > 1.4

Plain sorting: 18.880656480789185
Ray sorting: 12.017321348190308
Speedup:  1.5711202133770374


#### Task 3.2
The ideal speedup you can get for a task with 4 workers is, of course, 4. Can you estimate the theoretical maximum speedup you can get for the above merge sort algorithm (in terms of the time for sorting sublists, and the time for merging)? How do you account for the difference between the theoretical result and the observed speedup with Ray?

In [10]:
import heapq


def instrumented_four_way_mergesort(lst, npartitions: int = 4):
    """Time the sublist sorts (parallelizable) and the final merge (mostly serial).

    The two phases call the same functions as the Ray version: every slice is
    sorted with ``plain_merge_sort(slice, npartitions)`` (as ``ray_merge_sort``
    does) and the sorted slices are combined with the notebook's ``merge``, so
    the measured durations describe the algorithm that is actually run.

    :param lst: Sliceable sequence of comparable items; it is not modified.
    :param npartitions: Number of slices to sort separately.
    :return: Dict with ``t_sort_sublists``, ``t_merge`` and ``t_total`` in seconds.
    """
    n = len(lst)
    breaks = [i * n // npartitions for i in range(npartitions)] + [n]
    parts = [lst[breaks[i]:breaks[i + 1]] for i in range(npartitions)]

    # Phase 1: sort each slice (the work that is spread over p workers).
    sort_start = time.perf_counter()
    sorted_parts = [plain_merge_sort(part, npartitions) for part in parts]
    sort_end = time.perf_counter()

    # Phase 2: serial merge of the sorted slices; the merged list itself is not needed.
    merge(sorted_parts)
    merge_end = time.perf_counter()

    t_sort_sublists = sort_end - sort_start
    t_merge = merge_end - sort_end
    return {
        "t_sort_sublists": t_sort_sublists,
        "t_merge": t_merge,
        "t_total": t_sort_sublists + t_merge,
    }


def theoretical_speedup_from_timings(tinfo: dict, p: int = 4):
    """S_max(p) = T_total / (T_sort/p + T_merge).

    :param tinfo: Dict with ``t_sort_sublists``, ``t_merge`` and ``t_total`` in seconds.
    :param p: Number of workers the sort phase is divided across.
    :return: Serial time divided by the estimated time with ``p`` workers.
    """
    # Only the sort phase is divided by p; the merge time is counted in full.
    time_with_p_workers = tinfo["t_sort_sublists"] / p + tinfo["t_merge"]
    return tinfo["t_total"] / time_with_p_workers

# Use the same list length as in the timing experiment above to keep apples-to-apples
# (falls back to 200_000 items when the Task 3.1 cell has not been run).
N_theory = length if 'length' in globals() else 200_000
# NOTE(review): values here are drawn from [0, 10_000_000), whereas the Task 3.1 cell
# draws from [0, 1000); only the list length matches — confirm this is intended.
_arr = list(np.random.randint(low=0, high=10_000_000, size=N_theory))
_tinfo = instrumented_four_way_mergesort(_arr, npartitions=4)
_Smax = theoretical_speedup_from_timings(_tinfo, p=4)

print("Instrumented timings:", _tinfo)
print(f"Theoretical S_max(p=4) ≈ {_Smax:.2f}x")
# `speedup` is the observed Ray speedup computed by the Task 3.1 timing cell.
if 'speedup' in globals():
    print(f"Observed Ray speedup S_obs = {speedup:.2f}x; gap = {_Smax - speedup:.2f}x")
else:
    print("Run the Task 3.1 timing cell above to compute the observed Ray speedup.")

Instrumented timings: {'t_sort_sublists': 17.238127594813704, 't_merge': 0.6717044711112976, 't_total': 17.909832065925002}
Theoretical S_max(p=4) ≈ 3.60x
Observed Ray speedup S_obs = 1.57x; gap = 2.02x


**Accounting for the difference to the observed speedup:**
- The merge phase T_merge is serial in our implementation (final heap-merge on the driver), so it limits speedup by Amdahl’s law. Even with perfect parallel sorting, we cannot exceed 1 + T_sort/T_merge.
- Practical overheads (scheduling, serialization, data movement, object copies, driver-side merge work, load imbalance, GC and memory pressure, I/O) further reduce observed speedup below the ideal S(p).