# Bags, Futures, and Some Bonus Features

In this module, we'll take a *really* quick look at some additional aspects of Dask. The goal is not to make you proficient, or even talk about the full scope of what's possible, but to give you some orientation and awareness.

That way, if you're working on a project where these features might come in handy, you'll at least know where to look (or what to Google) to learn more.

In [None]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=1, memory_limit='256MB')

client

## Dask Bag

Dask's __Bag__ interface is the last of the high-level interfaces for manipulating data with Dask.

A __Bag__ is an unordered collection that *can* contain the same item multiple times (unlike a Python `set`, which keeps at most one copy of each item). In that sense, a __Bag__ is a multiset, a little bit like a Python `Counter`.

But its main feature is that it can be partitioned and processed in parallel.

Bags are documented at https://docs.dask.org/en/latest/bag.html

In [None]:
import dask.bag as db

b = db.from_sequence(range(30), npartitions=4)

In [None]:
b

In [None]:
b.count().compute()

In [None]:
b.npartitions

We program bags using functional-programming abstractions:

In [None]:
b_squared = b.map(lambda x:x**2)

In [None]:
b_squared.take(5)

In [None]:
even_odd_groups = b_squared.groupby(lambda x: x%2)

In [None]:
even_odd_groups.compute()

In [None]:
even_odd_groups.take(1)[0][1]

Remember, bags aren't ordered -- so you may or may not see the output in ascending sequence. 

Also, as with many data-parallel collection frameworks, we need to be careful about "materializing groups" or "touching the data". In this case, we've asked Dask to fully compute the two groups (odd and even squares).

It's a silly example, but we definitely want to be careful about this, since on real large datasets, it's common for individual groups to be too large to fit in memory (imagine taking your company's transactions and grouping by country!)

Whenever you think "group-by" (and if you are thinking group-by, a DataFrame may be a better fit), it's good to ask, "... and then what?"

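Typically, group-by goes with an aggregation like "group-by and count" or "group-by and add". In those cases, use the alternative operation `foldby`. It takes a key function, a binary operator (with its initial value) for folding items within a partition, and a combine operator (with its initial value) for merging the per-partition results.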

In [None]:
def add(a, b):
    return a + b

# Sum the squares per key (0 = even, 1 = odd): fold within partitions, then combine
b_squared.foldby(lambda n: n % 2, add, 0, add, 0).compute()

In [None]:
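def count(total, item):
    # Ignore the item itself; just bump the running total
    return total + 1

# Count the squares per key: count within partitions, then add the partial counts
b_squared.foldby(lambda n: n % 2, count, 0, add, 0).compute()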
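To see why `foldby` takes two operators, here is a minimal plain-Python sketch of a two-stage fold. It is not Dask's implementation: `foldby_sketch` is a hypothetical helper, and the four-way split of the data is an assumption standing in for the bag's partitions.

```python
def foldby_sketch(partitions, key, binop, initial, combine, combine_initial):
    """Plain-Python sketch of a two-stage fold (hypothetical helper, not Dask code)."""
    # Stage 1: fold each partition on its own, keeping one running total per key.
    partials = []
    for part in partitions:
        totals = {}
        for item in part:
            k = key(item)
            totals[k] = binop(totals.get(k, initial), item)
        partials.append(totals)

    # Stage 2: merge the small per-partition totals, key by key.
    merged = {}
    for totals in partials:
        for k, subtotal in totals.items():
            merged[k] = combine(merged.get(k, combine_initial), subtotal)
    return sorted(merged.items())


def add(a, b):
    return a + b


def count(total, item):
    return total + 1


squares = [x ** 2 for x in range(30)]
# Assumed partitioning: chunks of 8 items (the last chunk is shorter).
partitions = [squares[i:i + 8] for i in range(0, len(squares), 8)]

sums = foldby_sketch(partitions, lambda n: n % 2, add, 0, add, 0)
counts = foldby_sketch(partitions, lambda n: n % 2, count, 0, add, 0)
print(sums)    # [(0, 4060), (1, 4495)]
print(counts)  # [(0, 15), (1, 15)]
```

Only the per-key totals cross partition boundaries, never the full groups, which is what makes this pattern safer than `groupby` on large data. Note that counting needs `add` as the combine step: the partial results are already counts, so they must be summed rather than counted again.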

Again, this is just to demonstrate the similarity between bags and other unstructured data representations in data-parallel tools, not to encourage using bags when other approaches make more sense.

## Exercise: Dask Delayed and `concurrent.futures`

*Work through these excellent examples courtesy of Dask contributor James Bourbeau*

*More detailed docs are online at:*
* https://docs.dask.org/en/latest/delayed.html
* https://docs.dask.org/en/latest/futures.html

Sometimes problems don’t fit nicely into one of the high-level collections like Dask arrays or Dask DataFrames. In these cases, you can parallelize custom algorithms using the Dask `delayed` interface. This allows one to manually create task graphs with a light annotation of normal Python code.

In [None]:
import time

def inc(x):
    time.sleep(0.5)
    return x + 1

def double(x):
    time.sleep(0.5)
    return 2 * x

def add(x, y):
    time.sleep(0.5)
    return x + y

In [None]:
%%time

data = [1, 2, 3, 4]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)

Dask `delayed` wraps function calls and delays their execution. Rather than computing results immediately, `delayed` functions record what we want to compute as a task into a graph that we’ll run later on parallel hardware by calling `compute`.
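The idea of recording calls instead of running them can be illustrated with a toy in plain Python. This is only a sketch of deferred evaluation: `Lazy` and `lazy` are hypothetical names, the toy runs everything sequentially, and it is not how Dask builds or schedules its graphs.

```python
class Lazy:
    """Toy deferred call (hypothetical): remembers a function and its arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve any Lazy arguments first, then run this call.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)


def lazy(func):
    """Wrap func so that calling it records the call instead of running it."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper


calls = []  # records every real execution of inc


def inc(x):
    calls.append(x)
    return x + 1


lazy_inc = lazy(inc)
node = lazy_inc(lazy_inc(3))  # builds a two-step chain; nothing has run yet
ran_before = list(calls)      # snapshot taken before compute()
result = node.compute()       # now both calls execute, inner one first

print(ran_before)  # []
print(result)      # 5
print(calls)       # [3, 4]
```

Because each node holds references to the nodes it depends on, the chain of objects is itself a small task graph. A real scheduler can inspect such a graph and run independent branches in parallel, which this toy does not attempt.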

In [None]:
from dask import delayed

In [None]:
lazy_inc = delayed(inc)

In [None]:
inc_output = lazy_inc(3)  # builds a task for inc(3); nothing runs yet
inc_output

In [None]:
inc_output.visualize()

In [None]:
inc_output.compute()

Using delayed functions, we can build up a task graph for the particular computation we want to perform.

In [None]:
double_inc_output = lazy_inc(inc_output)
double_inc_output

In [None]:
double_inc_output.visualize()

In [None]:
double_inc_output.compute()

We can use `delayed` to make our previous example computation lazy by wrapping all the function calls with `delayed`.

In [None]:
@delayed
def inc(x):
    time.sleep(0.5)
    return x + 1

@delayed
def double(x):
    time.sleep(0.5)
    return 2 * x

@delayed
def add(x, y):
    time.sleep(0.5)
    return x + y

Now calling `add` returns a `Delayed` object, on which you can call `compute()` at a later time.

In [None]:
%%time

data = [1, 2, 3, 4]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = delayed(sum)(output)
total

In [None]:
total.visualize()

In [None]:
%%time

total.compute()

Check out the [Dask delayed best practices](http://docs.dask.org/en/latest/delayed-best-practices.html) page to avoid some common problems when using `delayed`. 

## `concurrent.futures` interface

The Dask distributed scheduler implements a superset of Python's [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html) interface that allows for finer control and asynchronous computation.

The `submit` function sends a function and arguments to the distributed scheduler for processing. It returns a `Future` object that refers to remote data on the cluster. The call to `submit` returns immediately while the computation runs remotely in the background, so the local Python session is not blocked.
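Since the Dask interface is a superset of `concurrent.futures`, the same submit-then-collect pattern can be tried with the standard library alone. The sketch below uses a local `ThreadPoolExecutor` as a stand-in for a cluster; `slow_inc` and the 0.2 second delay are illustrative assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_inc(x):
    time.sleep(0.2)  # pretend this is expensive work
    return x + 1


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(slow_inc, 41)  # returns at once; work runs in the background
    print(future.done())                # most likely False: the task is still sleeping
    value = future.result()             # blocks until the task has finished
    print(value)                        # 42
    print(future.done())                # True
```

The local session is free to do other work between `submit` and `result`; only the call to `result` waits.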

In [None]:
import random

def inc(x):
    time.sleep(random.uniform(0, 2))
    return x + 1

In [None]:
f = client.submit(inc, 7.2)  # Submits inc(7.2) to the distributed scheduler
print(f)
print(type(f))

Once the computation for the `Future` is complete, you can retrieve the result using the `.result()` method. If the computation is still running, `.result()` blocks until it finishes.

In [None]:
print(f)

In [None]:
f.result()

To release a `Future`'s result from distributed memory, drop your reference to it with the `del` keyword. Once no futures refer to a result, the scheduler is free to discard it.

In [None]:
del f

The `map` function applies a function to a sequence of arguments (similar to the built-in Python `map` function).

In [None]:
data = range(10)
futures = client.map(inc, data)
futures

Here a list of `Futures` is returned, one for each item in the sequence of arguments.

In [None]:
futures

In [None]:
results = client.gather(futures)
# Same as results = [future.result() for future in futures]

In [None]:
results

Notice what happens if we run the same calculation:

In [None]:
data = range(10)
futures = client.map(inc, data)
futures

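The results are ready right away, and the keys are the same. By default, Dask treats submitted functions as pure and derives each task's key from the function and its arguments, so resubmitting identical work maps onto tasks the scheduler already knows about. Because the earlier futures still referenced those results, they were still in cluster memory.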
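The repeated keys can be explored with `tokenize`, Dask's deterministic hashing helper, importable from `dask.base`. The sketch below only shows that equal inputs give equal tokens; how exactly the client assembles a key from such a token is an assumption here, not something this example demonstrates.

```python
from dask.base import tokenize


def inc(x):
    return x + 1


# Same function and argument: the token is identical every time.
token_a = tokenize(inc, 1)
token_b = tokenize(inc, 1)

# A different argument produces a different token.
token_c = tokenize(inc, 2)

print(token_a == token_b)
print(token_a == token_c)
```

For functions that are not pure, such as ones that return random numbers, `submit` and `map` accept `pure=False` so that each call gets a fresh key instead of reusing an earlier result.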

The `concurrent.futures` API even allows you to submit tasks based on the output of other tasks. This gives more flexibility in situations where the computations may evolve over time.

```python
from dask.distributed import as_completed

seq = as_completed(futures)

for future in seq:
    y = future.result()
    if condition(y):
        new_future = client.submit(...)
        seq.add(new_future)  # add back into the loop
```
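The loop above relies on the `add` method of Dask's `as_completed`, which the standard library version does not offer. As a rough, runnable approximation of the same feed-results-back pattern, the sketch below swaps in `concurrent.futures.wait` with `FIRST_COMPLETED` on a local thread pool. The function `needs_more_work` and its threshold are hypothetical stand-ins for `condition`.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def inc(x):
    return x + 1


def needs_more_work(y):
    # Hypothetical condition: keep incrementing until the value reaches 5.
    return y < 5


finished = []
with ThreadPoolExecutor(max_workers=2) as pool:
    pending = {pool.submit(inc, x) for x in range(3)}
    while pending:
        # Block until at least one task is done; keep the rest pending.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            y = future.result()
            if needs_more_work(y):
                pending.add(pool.submit(inc, y))  # feed a follow-up task back in
            else:
                finished.append(y)

print(sorted(finished))  # [5, 5, 5]
```

Each starting value is resubmitted until it reaches the threshold, so the total number of tasks is not known when the loop begins. That is the situation the evolving-computation pattern is meant for.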

In [None]:
client.close()