# Discussion 11: How to Implement Parallel Sort?

As you may know, many algorithms for sorting a list of length $N$ have running time $O(N \log N)$.

Most sort implementations that we've learned (e.g., quicksort, merge sort, bubble sort, insertion sort, etc) are fundamentally serial algorithms. This exercise explores the question: **can we use multiple cores to speed up sorting.**

Merge sort recursively splits a list into smaller pieces and sorts those pieces. This recursive sorting step could be parallelized. However, merge sort then has to then merge the results. _The running time of the very last merge step will be $O(N)$_, so it seems unlikely that a parallel merge sort implementation will be faster than $O(N)$.

In this exercise, we present a randomized sorting algorithm that is more amenable to parallelization.

## Strategy

The strategy will be the following.

1. (Optional) Our list will start as a numpy array.
2. We will split the array into separate contiguous (input) blocks.
3. We will then rearrange the values into separate continguous (output) blocks.
4. (Optional) We will then concatenate the contiguous blocks.

##### Example Illustration


<img src="https://i.imgur.com/qWvOCuu.png"  width="800"/>

Note that in many cases of interest (for example, if the list is so large that it can't fit on a single machine), then it would make sense for the array to start out as **a collection of blocks distributed around the cluster and to end as a collection of blocks distributed around the cluster**. Steps 1 and 4 would be unnecessary.

## Implementation
To implement step 3, we will choose a number of output blocks $K$ that our algorithm will output in step 3. We will choose cutoff values $c_1, \ldots, c_{K-1}$. Then, the first output block will be a sorted block consisting of all values in the original array that are less than $c_1$. The second output block will be a sorted block consisting of all values in the original array that are greater than or equal to $c_1$ and less than $c_2$, and so on. Finally, the $K$th block will be a sorted block consisting of all values in the original array that are greater than or equal to $c_{K-1}$.

We will choose the cutoff values $c_1, \ldots, c_{K-1}$ by randomly sampling some values from the original array, sorting those sampled values, dividing the sorted values into $K$ contiguous blocks, and choosing cutoffs that separate the blocks from each other.


<img src="https://i.imgur.com/iHVB4Lx.png"  width="800"/>



<img src="https://i.imgur.com/rQfG1Mn.png"  width="800"/>


<img src="https://i.imgur.com/QzbWCMF.png"  width="800"/>


**EXERCISE:** Code to implement the serial version of this is given below. First try it out and make sure that it runs. The goal of this exercise is to parallelize the implementation and make sure that the parallel implementation is faster than numpy's builtin sorting algorithm.

In [1]:
import numpy as np
import ray
import time

In [2]:
ray.init(
    num_cpus=8, # We will be using 8 workers
    include_webui=False,  
    plasma_directory='/tmp', # The object store is mounted to local file system
    ignore_reinit_error=True,
    object_store_memory=int(2*1e9), 
)

2019-04-25 21:52:03,193	INFO node.py:423 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-04-25_21-52-03_1192/logs.
2019-04-25 21:52:03,301	INFO services.py:363 -- Waiting for redis server at 127.0.0.1:13004 to respond...
2019-04-25 21:52:03,425	INFO services.py:363 -- Waiting for redis server at 127.0.0.1:13294 to respond...
2019-04-25 21:52:03,429	INFO services.py:760 -- Starting Redis shard with 0.43 GB max memory.
2019-04-25 21:52:03,460	INFO services.py:1384 -- Starting the Plasma object store with 2.0 GB memory using /tmp.


{'node_ip_address': None,
 'redis_address': '10.244.5.91:13004',
 'object_store_address': '/tmp/ray/session_2019-04-25_21-52-03_1192/sockets/plasma_store',
 'webui_url': None,
 'raylet_socket_name': '/tmp/ray/session_2019-04-25_21-52-03_1192/sockets/raylet'}

We will be working with 4 input blocks, 4 output blocks, and trying to sort 100,000,000 integers

In [3]:
num_input_blocks = 4
num_output_blocks = 4
num_samples_for_cutoffs = 100
array = np.random.randint(0, 256, size =10**8, dtype=np.uint8)

In [4]:
def compute_cutoffs(array, num_samples):
    samples = array[np.random.randint(0, len(array), size=num_samples)]
    samples.sort()
    boundary_indices = np.arange(1, num_output_blocks) * (len(samples) // num_output_blocks)

    # These are the boundaries between the output blocks. We will assume that each
    # boundary value is contained in the upper block.
    cutoffs = samples[boundary_indices]
    return cutoffs

Below we define two functions `partition_input_block` and `compute_output_block`.

1. The function `partition_input_block` will need to be called once per input block. We use the argument `num_return_vals` for the `@ray.remote` decorator in order to return one value per output block.

2. The function `compute_output_block` will need to be called once per output block.

In [5]:
@ray.remote(num_return_vals=num_output_blocks)
def partition_input_block(input_block, cutoffs):
    # By default, numpy arrays passed to remote functions are read-only so that
    # they can be accessed through shared memory without creating a copy.
    # However, we need to mutate this array, so we will create a copy.
    input_block = input_block.copy()
    input_block.sort()
    partition_indices = input_block.searchsorted(cutoffs)
    return np.split(input_block, partition_indices)

In [6]:
@ray.remote
def compute_output_block(*partitions):
    # There are probably faster ways to merge these sorted partitions together 
    # than to concatenate them and sort the result, but this seems to work.
    result = np.concatenate(partitions)
    result.sort()
    return result

Below we actually run the parallel sort.

#### Step 1: Split input array into blocks

In [7]:
blocks = np.split(array, num_input_blocks)

In [8]:
blocks

[array([ 77, 211, 154, ..., 124, 210,  59], dtype=uint8),
 array([ 32,  13, 252, ...,  43, 114,  36], dtype=uint8),
 array([104,  93,  32, ...,  93,  41,  71], dtype=uint8),
 array([117, 185,   9, ..., 124, 185, 138], dtype=uint8)]

In [9]:
[b.shape for b in blocks]

[(25000000,), (25000000,), (25000000,), (25000000,)]

#### Step 2: Compute cutoffs/pivots

In [10]:
cutoffs = compute_cutoffs(array, num_samples_for_cutoffs)
cutoffs

array([ 74, 117, 174], dtype=uint8)

#### Step 3: Use ray to paralleize sorting

We first put each input blocks into ray's shared object store, so each worker can access it without making expensive copies.

In [11]:
block_ids = [ray.put(block) for block in blocks]

Then we are going parittion each blocks into their corresponding output block

In [12]:
partition_ids = np.empty(shape=(num_input_blocks, num_output_blocks), dtype=object)
for i in range(num_input_blocks):
    partition_ids[i] = np.array(partition_input_block.remote(block_ids[i], cutoffs))

In [13]:
partition_ids.shape

(4, 4)

In [14]:
# This array stores object ids
partition_ids[0,0]

ObjectID(010000000d99e9de1f20d47e4b398f6d4b6654af)

We are now sorting each output blocks. Notice how we are just passing a list of object ids into the remote function

In [15]:
output_block_ids = []
for j in range(num_output_blocks):
    output_block_ids.append(compute_output_block.remote(*partition_ids[:, j]))

Lastly, we can get the result from each worker and concatenate it

In [16]:
sorted_result = np.concatenate(ray.get(output_block_ids))

In [17]:
sorted_result

array([  0,   0,   0, ..., 255, 255, 255], dtype=uint8)

#### Speed comparison!

In [19]:
parallel_start_time = time.time()

cutoffs = compute_cutoffs(array, num_samples_for_cutoffs)
blocks = np.split(array, num_input_blocks)
block_ids = [ray.put(block) for block in blocks]
partition_ids = np.empty(shape=(num_input_blocks, num_output_blocks), dtype=object)
for i in range(num_input_blocks):
    partition_ids[i] = np.array(partition_input_block.remote(block_ids[i], cutoffs))
output_block_ids = []
for j in range(num_output_blocks):
    output_block_ids.append(compute_output_block.remote(*partition_ids[:, j]))
sorted_result = np.concatenate(ray.get(output_block_ids))

parallel_duration = time.time() - parallel_start_time

print("Parallel sort duration: {}.".format(parallel_duration))

Parallel sort duration: 2.8410232067108154.


The box below times numpy's built in sorting algorithm. Our parallel sort should be faster than numpy's built in sort. You do not need to modify anything in the box below.

In [20]:
array_copy = array.copy()
serial_start_time = time.time()
array_copy.sort()
serial_duration = time.time() - serial_start_time
print("Serial sort duration: {}.".format(serial_duration))

Serial sort duration: 5.1032140254974365.


This box checks that the sort was implemented correctly and that the parallel version gave a speedup.

In [21]:
# Check work.
assert parallel_duration < 0.75 * serial_duration
assert np.all(sorted_result == array_copy)