# Parallelizing Your Own Python Algorithms

<img src='images/city.png' width=600>

Although we can do a lot with dataframes and arrays at scale, not every computation we need to do will match these patterns.

Dask offers two mechanism for scheduling our own work on the cluster and working with results of distributed and/or long-running computations.

* `delayed` which represents a function along with its dependencies, which we may want to run later
    * recall that we called `dask.delayed()` on our functions to make them lazy
* and `Future` which represents a future result (or failure) from a running, asynchronous function
    * we used `client.submit()` to start our example computation, and it returned a Future immediately

## Why two systems? Which should we use?

There is no single right anwser

__`delayed`__ is often more useful when we know the control flow and structure of our computation ahead of time and we want to run a function -- including computing any dependencies -- later.

In our toy dataframe example, we'll implement operations `sum` and `count`. Since we know what those computations will look like and require, we'll create `delayed` versions to make them lazy and composable.

__`Future`__ makes sense when we want to start a computation immediately, and have complete freedom regarding what will happen to the result: it might be output; it might be a parameter to another function; or it might be a condition that controls what happens next (e.g., if we're running an optimizer until some convergence criterion is reached).

*So we'll look at both APIs*

In [None]:
import coiled
from dask.distributed import Client

cluster = coiled.Cluster(name="training-cluster")
client = Client(cluster)
client

## Delayed functions and task graphs

We can create a delayed, or lazy, function by calling `dask.delayed` or by marking the function definition with the `@delayed` decorator. 

Most operations on a delayed function (like calls) or object (like property access) are proxied from the delayed to the actual object when needed, but not right away. Until then, if we compose function calls, we are effectively creating a chain (or graph) of delayed objects.

When we call `.compute()` or `.persist()`, Dask schedules the necessary functions from the graph and takes care of passing the results to the subsequent functions that rely on them.

Before running this code, open up your __Graph__ and __Task Stream__ dashboards -- we'll see some output there.

In [None]:
from dask import delayed

@delayed
def square(x):
    return x*x

my_delayed_square_of_3 = square(3)

my_delayed_square_of_3

We can visualize the dependencies involved

In [None]:
my_delayed_square_of_3.visualize()

In these diagrams, circles represent delayed functions (tasks), while rectangles represent delayed data items, including the output of any delayed functions.

We can compose delayed objects, and evaluate them...

In [None]:
my_delayed_square_of_the_square_of_3 = square(square(3))

my_delayed_square_of_the_square_of_3

In [None]:
my_delayed_square_of_the_square_of_3.visualize()

In [None]:
my_delayed_square_of_the_square_of_3.compute()

We can launch computation and persist results for future access, as we did with dataframes

In [None]:
persisted = my_delayed_square_of_3.persist()
persisted

In your __Graph__ and __Task Stream__ dashboards, notice
* Task Stream shows the `square` function we just ran
* Graph shows the `square` result resident in cluster memory

We can retrieve the value any time with `.compute()`

In [None]:
persisted.compute()

To get us a little closer to our real-world work challenges, let's look at a Monte Carlo estimation of pi. While this estimation is simple, many real-world problems rely on expensive and time-consuming simulation, from Bayesian model sampling to traffic, supply chain, and climate simulations.

## Monte Carlo Estimation of π

Here, the idea is to sample points from the unit square and count how many of them are also within the unit circle. The proportion of points in the circle to all of our points will approximate pi/4.

<img src='images/pi.png' width=300>

We'll start with a local version and then try to run it on the cluster with `delayed`

In [None]:
import random

def sample():
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    d = x*x + y*y
    return 1 if d < 1 else 0

sample()

In [None]:
def bunch_of_samples(n):
    total = 0
    for i in range(n):
        total += sample()
    return total/n

bunch_of_samples(1000)

In [None]:
print(f"pi is about {4* bunch_of_samples(1000)}")

Let's make a lazy version of sample

In [None]:
sample = delayed(sample)

output = bunch_of_samples(5)
output

Notice that our output is a delayed, because it depends on a bunch of other delayed outputs...

In [None]:
output.visualize()

We can run this task graph

In [None]:
output.compute()

But let's make a few improvements. First, take a look in the Task Stream and locate the `sample` tasks. Zoom until you can read how long they take. You should see they take a few microseconds. 

This is not an optimal task duration because we are paying a high price in scheduler overhead and network communication in order to distribute these tasks.

*Dask's scheduler requires several hundred microseconds (say 500-1000 μs) per task* so ideally our tasks should spend significantly longer than this doing their computation.

What should we do?

Usually:
* operate on more data within the task
* perform more steps or iterations of an algorithm within the task
* or both

The obvious choice for our simulation is to run more sampling within each task

In [None]:
@delayed
def multi_sample(n):
    total = 0
    for i in range(n):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        d = x*x + y*y
        if d < 1:
            total += 1
    return total

How many samples should we run? Let's see how long it takes to run 1000

In [None]:
multi_sample(1000).compute()

Your numbers may vary (depending on hardware, network, etc.) but I'm seeing a very small value. That's similar to scheduler overhead, and we really want our compute to but significantly larger than that. Let's try 100,000

In [None]:
hits = multi_sample(100_000).compute()
hits

Let's look in the task stream for timing ... this should be better (larger) than the previous version.

In [None]:
hits/1e5*4

But still not a great estimate of pi. Let's schedule 100 of these (a total of 10 million samples)

In [None]:
def run_batches(batches, per_batch):    
    return sum([multi_sample(per_batch) for i in range(batches)])

run_batches(100, 100000).compute() * 4 / 10e6

If you were looking closely, you might have noticed a feature that slipped by in there: `run_batches` returns a delayed because `sum` does ... and that's because the `+` operator is overloaded to work correctly with delayed. 

Most operations "just work" with delayed, but not everything. The breakdown is at https://docs.dask.org/en/latest/delayed-api.html

Most critically, we can't make control-flow decisions based on delayeds, because they don't have a truth value (after all, they haven't run at the time of graph/dependency construction).

Before moving on, let's look at the __Memory By Key__ dashboard, which will tell us what we are storing in our cluster and how much space it is taking up.

Open the Memory By Key dashboard: you should see the result of the `square` operation we `persist`ed earlier, taking up a few bytes.

Lets run our sampler and persist the results

In [None]:
persisted_sample_count = run_batches(100, 100000).persist()

While that was running, you should see `multi_sample` using the most memory, but when the computation is complete, those intermediate results have been released, and just the final sum (`add` task result) holding a few bytes.

## Dask Dataframe: No Magic When You Have `Delayed`

Dask Dataframe and other high-level collections can seem magical, but we'll take a look at how Delayed can do a lot of the heavy lifting in an API like that. 

To illustrate a little bit of how a parallel dataframe can work, as well as give insight into how Dask's low-level constructs can be assembled to create high-level ones, we'll build a toy example.

We'll use a small, simple CSV data file, which you can inspect: `df-demo.csv`

In [None]:
import urllib.request

file = 'http://coiled-training.s3.amazonaws.com/data/df-demo.csv'

with urllib.request.urlopen(file) as f:
    print(f.read().decode('utf-8'))

There are 14 rows there. Let's imagine that we can only fit 6 rows at a time in memory comfortably, so we want to partition with a partition or blocksize of 6 rows.

We can write a function to ingest the data like this:

In [None]:
import pandas as pd

In [None]:
def load_csv(filename, blocksize, partition, schema):
    return pd.read_csv(filename, skiprows=partition*blocksize, nrows=blocksize, names=schema, header=0)

and run it for each partition:

In [None]:
def load_big_df(file, blocksize, partition_count, schema):
    partitions = []
    for i in range(partition_count):
        partitions.append(load_csv(file, blocksize, i, schema))
    return partitions

blocksize = 6
partition_count = 3
schema = ['player', 'points']

myToyDF = load_big_df(file, blocksize, partition_count, schema)

for p in myToyDF:
    print(str(p) + '\n' + '-'*20)

The full dataframe will look something like this:

In [None]:
def whole_frame(df):
    return pd.concat([p for p in df])

whole_frame(myToyDF)

Notice the replicated index values in our "big" dataframe ... you'll see the same behavior with Dask: it's consistent with each partition being its own dataframe with its own index.

Notice also that we used a little bit of metadata (number of partitions, schema) that we didn't discover from the file. In a real implementation we'd need to do some inspection of the data to figure out this info, and we'd probably want to store it somewhere! Dask does exactly that when you create a Dask dataframe from a file.

For now, we'll "cheat" and keep out dataframe as a list of constituent dataframes (partitions)

Let's implement two toy operations: count and sum

In [None]:
def df_count(df):
    return sum(p.shape[0] for p in df)

def df_sum(df, col):
    return sum(p[col].sum() for p in df)

print('Count ', df_count(myToyDF))
print('Sum(points)', df_sum(myToyDF, 'points'))

Now let's add a tiny bit of Dask `delayed`

In [None]:
import dask.delayed

load_csv = dask.delayed(load_csv)

In [None]:
myToy_delayed_DF = load_big_df(file, blocksize, partition_count, schema)

myToy_delayed_DF

In [None]:
def whole_frame_from_delayed(df):
    return pd.concat([p.compute() for p in df])

whole_frame_from_delayed(myToy_delayed_DF)

It works ... and check out the Task Stream dashboard: we can see our `load_csv` tasks scheduled in the cluster!

Now what about lazy operations on a lazy dataframe ... What about count and sum?

In [None]:
df_count(myToy_delayed_DF)

This seems to run -- or at least it doesn't explode -- but it returns a Delayed ... which is actually what we want.

In [None]:
df_count(myToy_delayed_DF).compute()

But why does this work?

```python
def df_count(df):
    return sum(p.shape[0] for p in df)
```

* the call to `p.shape` accessor and then the subscript `[]` operator on the result are proxied to the underlying object when needed, but return a Delayed in the meantime
* the same thing works for the builtin `sum` function

In [None]:
myToy_delayed_DF[0].shape

In [None]:
myToy_delayed_DF[0].shape.compute()

In [None]:
myToy_delayed_DF[0].shape[0]

In [None]:
myToy_delayed_DF[0].shape[0].compute()

... and so on for most (though not all) operations we might want to do. We can even visualize the dependencies between these lazy objects.

In [None]:
df_count(myToy_delayed_DF).visualize()

For the same reason, our `df_sum` function will work unchanged:

In [None]:
df_sum(myToy_delayed_DF, 'points')

In [None]:
df_sum(myToy_delayed_DF, 'points').compute()

Ok, as fun as it would be to try and reconstruct more and more of Dask dataframe, that might be a bit much for today!

## Dynamic scheduling with Future

Dask's Future interface is like a cluster-aware version of Python's `concurrent.futures` asynchronous programming pattern. If you're not familiar with `concurrent.futures`, that's ok. But for those who are, we'll start with that API and before moving to Dask, where we'll see similar behavior.

Let's redefine `multi_sample` as a regular Python function (no `@delayed`)

In [None]:
def multi_sample(n):
    total = 0
    for i in range(n):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        d = x*x + y*y
        if d < 1:
            total += 1
    return total

multi_sample(100_000)

__Example with Python's `concurrent.futures`__

In [None]:
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor()
out = executor.submit(multi_sample, 100000)
out

In [None]:
out.running()

In [None]:
out.done()

In [None]:
out.result()

In [None]:
executor.shutdown(wait=True)

__Let's do that distributed, with Dask__

By calling `client.submit()`, we can run this on the cluster and get a handle to the pending result, called a `Future`

In [None]:
sample_future = client.submit(multi_sample, 100000)
sample_future

In the Task Graph dashboard, you'll notice that the task appears red (since it finishes nearly instantly)

If we look at the Future again, it should show *status: finished*

In [None]:
sample_future

In [None]:
sample_future.result()

Let's try this again more times

In [None]:
sample2 = client.submit(multi_sample, 100000)
sample2

In [None]:
sample2.result()

Ok, that's a little weird: this time, the Future shows *status: finished* right away, and the result is identical to the earlier sample batch, which seems unlikely.

__What's going on?__

In order to optimize, Dask has encoded our function arguments into the task data and has assumed our function is __pure__. A pure function is one whose output depends only on the input: no randomness, no side effects.

Usually, that's helpful, but in our case we need new random samples every time. We tell Dask that our function is not pure by passing `pure=False`

In [None]:
sample3 = client.submit(multi_sample, 100000, pure=False)
sample3

In [None]:
sample3.result()

If we want to do alot of sampling, we can use a regular Python loop to submit our tasks

In [None]:
results = []
for i in range(5):
    results.append(client.submit(multi_sample, 100000, pure=False))
results

or, if we're using the same function repeatedly, we can use `client.map`

In [None]:
results = results + client.map(multi_sample, [100000, 110000, 90000], pure=False)
results

To block until Futures are finished and collect the output, we call

In [None]:
client.gather(results)

__Managing tasks dynamically: sampling until convergence__

We can use futures to schedule sampling tasks continuously until we hit a convergence criterion.

Let's start the cluster off with 10 sampling tasks to get started.

We can use `as_completed` to access an interator from which can collect results.

We'll estimate pi using what we get back, and if our successive estimates aren't close enough to each other, we'll schedule more tasks.

> Note: This is just an illustration of dynamic scheduling using Dask `Future` ... in a similar real experiment, since this is a Bernoulli process, the standard error shrinks with ${1/\sqrt{n}}$ (https://en.wikipedia.org/wiki/Binomial_proportion_confidence_interval) ... so if we wanted to quantify the likelihood of our estimate, we could just choose a suitable *n* ahead of time

In [None]:
from dask.distributed import as_completed

batch_size = 100000

initial_future = client.submit(multi_sample, batch_size, pure=False)

completion_iterator = as_completed([initial_future])

In [None]:
old_estimate = 0.0
new_estimate = 1.0
estimate_count = 0
epsilon = 0.0001

for future in completion_iterator:
    result = future.result()
    
    # update estimate: arithmetic moving average
    new_estimate = old_estimate * estimate_count + result * 4.0 / batch_size
    estimate_count += 1
    new_estimate /= estimate_count
    print(new_estimate)
    
    if not abs(new_estimate - old_estimate) < epsilon:
        completion_iterator.add(client.submit(multi_sample, batch_size, pure=False))
        
    old_estimate = new_estimate

__Composing Results of Operations with Future__

This is handy, so long as we're launching monolithic, embarrassingly parallel tasks on the cluster. 

But we often need to break our logic up into multiple functions, user other people's code, and combine results in distributed code.

`Future` is designed specifically to accommodate composition -- meaning passing the results from one or more function calls into another function. The only catch is ... you can't mix local and remote code transparently, because a lot of local code won't know how to handle Futures.

In [None]:
def add2(x):
    return 2 + x

def make_list(n):
    return [None] * (2*n)

def write(a_list, index, val):
    a_list[index] = val
    return a_list

def combine_lists(list1, list2):
    return list1 + list2

# local...

combine_lists(write(make_list(add2(1)), add2(0), 7), ['foo', 'bar'])

Let's start with just the first two operations (in evaluation order, from inside out)

In [None]:
try:
    make_list(client.submit(add2, 1))
except TypeError as e:
    print(e)

Why did this fail? We tried to mix a Future result with a regular local function invocation.

We can do this composition if we submit both tasks for async execution

In [None]:
r1 = client.submit(add2, 1)
r2 = client.submit(make_list, r1)

In [None]:
r2.result()

And we can keep going...

In [None]:
r3 = client.submit(write, r2, r1, 7)

In [None]:
r3.result()

In [None]:
r4 = client.submit(combine_lists, r3, [-1, -2])

In [None]:
r4.result()

*Summing up...*

We can't supply Futures to arbitrary local functions

But we can supply...
* regular (non-Future) values to async calls
* Futures to async calls
* combinations of Futures and regular values when submitting async calls

## Lab - Emergency Services Modeling

We'll work on a more complicated simulation-based model to evaluate time-to-response for emergency vehicles in different schemes for Cascadia City.

Part of the city is planned as a street grid, and we'd like to look at a few different models where we divide this region into equal-sized zones, and each zone has its own emergency vehicle (which must remain inside that zone).

The purpose of our lab is to use Dask to distribute the work, so we'll start with some functions that do most of the calculation work, and focus on running those in the Dask cluster using `Future`.

In [None]:
import numpy as np
from matplotlib import pyplot as plt

traffic = np.load('data/traffic.npy') #note this data is local

plt.imshow(traffic)
plt.colorbar()

This array represents transit time costs (in minutes, under congested conditions) to reach each of the intersections in this 16x16 intersection grid from adjacent intersections.

To find travel time between points for the whole grid -- or for a section -- we'll build an *adjacency matrix* and then use a shortest-path algorithm.

In [None]:
city_chunk_width = 4 # we'll work with square chunks, so N-S and E-W are both 4

def build_adj_matrix(costs):
    adj_dim = costs.shape[0] ** 2
    adj_matix = np.zeros((adj_dim, adj_dim)) # since every pair of locations gets a cost in the adj matrix
    
    def linear_loc_for_row_col(r, c):
        return r + c*costs.shape[0]
    
    for i in range(costs.shape[0]):
        for j in range(costs.shape[1]):
            cost_to_ij = costs[i, j]
            dest_loc = linear_loc_for_row_col(i, j)
            if i > 0:
                adj_matix[linear_loc_for_row_col(i-1, j), dest_loc] = cost_to_ij                
            if i < costs.shape[0] - 1:
                adj_matix[linear_loc_for_row_col(i+1, j), dest_loc] = cost_to_ij                
            if j > 0:
                adj_matix[linear_loc_for_row_col(i, j-1), dest_loc] = cost_to_ij                
            if j < costs.shape[1] - 1:
                adj_matix[linear_loc_for_row_col(i, j+1), dest_loc] = cost_to_ij
    return adj_matix

demo_adj = build_adj_matrix(traffic[0:city_chunk_width, 0:city_chunk_width])
plt.imshow(demo_adj)

We can use a helper from `scipy` to find the shortest path (expressed here as travel time)

In [None]:
from scipy.sparse.csgraph import shortest_path

In [None]:
total_travel_time_all = shortest_path(demo_adj)
plt.imshow(total_travel_time_all)
plt.colorbar()

Now, suppose there are a fire and a fire truck at particular locations

In [None]:
def response_to_random_fire(travel_time_matrix, zone_rows, zone_cols):
    fire_x = random.randint(0, zone_cols-1)
    fire_y = random.randint(0, zone_rows-1)

    firetruck_x = random.randint(0, zone_cols-1)
    firetruck_y = random.randint(0, zone_rows-1)

    travel_from = firetruck_y + zone_rows*firetruck_x
    travel_to = fire_y + zone_rows*fire_x
    
    return travel_time_matrix[travel_from, travel_to]

response_sample = response_to_random_fire(total_travel_time_all, city_chunk_width, city_chunk_width)

print("Travel time", response_sample)

We'd like to measure response time under various scenarios, including ones where more trucks are available.

#### Activity 1: Travel time matrices for all zones

Divide the full traffic map (matrix) into 16 subsections similar to the one above, and generate travel time matrices for all of them using Dask.

Note: in some scenarios we might use Dask array, but for today's exercise, let's use regular NumPy and focus on parallelizing our work with `Future`.

Hint: For dividing the matrix into subsections, adapt this sample code using:

In [None]:
example = np.array([[1,2,3,4],[5,6,7,8],[9,10,11,12], [13,14,15,16]])
example

In [None]:
arrays = []

for outer in map(lambda m : np.vsplit(m, 2), np.hsplit(example, 2)):
    for inner in outer:
        arrays.append(inner)
    
arrays

#### Activity 2: Emergency response times for all zones

Simulate emergency response times for each zone, using Dask

#### Activity 3: Collect and plot samples for all zones

Gather 100 samples for each zone, combine the results, and plot a histogram

#### Activity 4: Compare zone schemes

*Bonus*

Simulate
* the single-zone model with 16 firetrucks uniformly distributed
  * this means 1 zone and `city_chunk_width` of 16
  * 16 random firetruck locations, so 16 travel times (choose shortest or mean)
* 4-zone model (each zone `city_chunk_width` of 8)

Compare the response time distributions to the 16-zone model we've done so far

Note: In these activities we didn't focus on making optimal use of the cluster. Take a look at the task stream and see when you were (and weren't) utilizing all of those expensive cores.

## Best Practices

Take a look at the Dask delayed best practices https://docs.dask.org/en/latest/delayed-best-practices.html

There also a set of advanced features under Futures which you are unlikely to often need, but may be useful in solving particularly complex challenges.