# Dask

<img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

## Parallelize code with `dask.delayed`

In this notebook we parallelize simple for-loop style code with Dask and `dask.delayed`. Often, this is the only function you need in order to convert existing functions for use with Dask.

We will then look at Dask DataFrames and how they compute, and finally at Dask Arrays.

**Related Documentation**

* [Delayed documentation](https://docs.dask.org/en/latest/delayed.html)
* [Delayed screencast](https://www.youtube.com/watch?v=SHqFmynRxVU)
* [Delayed API](https://docs.dask.org/en/latest/delayed-api.html)
* [Delayed examples](https://examples.dask.org/delayed.html)
* [Delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html)

As we'll see in the [distributed scheduler notebook](05_distributed.ipynb), Dask has several ways of executing code in parallel. For now we will just use the default scheduler, which runs tasks locally in a thread pool.
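As a small sketch of what "default" means here: the scheduler can also be chosen explicitly, either per call with the `scheduler=` keyword of `compute`, or for a block of code with `dask.config.set`. The function `toy_add` is a hypothetical stand-in; `"threads"` and `"synchronous"` are two of Dask's built-in local schedulers, and the synchronous one runs everything in a single thread, which can make debugging easier.

```python
import dask
from dask import delayed

def toy_add(x, y):
    return x + y

lazy_total = delayed(toy_add)(1, 2)

# Pick a local scheduler for a single call...
threaded = lazy_total.compute(scheduler="threads")
sequential = lazy_total.compute(scheduler="synchronous")  # one thread, easy to debug

# ...or for every compute inside a block of code
with dask.config.set(scheduler="synchronous"):
    in_context = lazy_total.compute()

print(threaded, sequential, in_context)  # 3 3 3
```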

First let's make some toy functions, `inc` and `add`, that sleep for a while to simulate work. We'll then time running these functions normally.

In the next section we'll parallelize this code.

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

We time the execution of this normal code using the `%%time` cell magic, which IPython provides in Jupyter notebooks.

In [None]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)
z

Those two increment calls *could* be called in parallel, because they are totally independent of one another.

We'll transform the `inc` and `add` functions using the `dask.delayed` function. When we call the delayed version by passing the arguments, exactly as before, the original function isn't actually called yet, which is why the cell execution finishes very quickly.
Instead, a *delayed object* is made, which keeps track of the function to call and the arguments to pass to it.
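A minimal sketch of this laziness, using a hypothetical `slow_square` function that records every real call in a list: building the delayed call leaves the list empty, and only `compute()` actually runs the function.

```python
from dask import delayed
from dask.delayed import Delayed

calls = []

def slow_square(x):
    # Record every real invocation so we can see when the function runs
    calls.append(x)
    return x * x

lazy = delayed(slow_square)(4)    # builds a Delayed object; slow_square has not run
print(isinstance(lazy, Delayed))  # True
print(calls)                      # []
print(lazy.compute())             # 16
print(calls)                      # [4]
```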


### Delay, visualize, compute

In [None]:
from dask import delayed
import dask

In [None]:
%%time
# This runs immediately, as it does not compute anything.
# All it does is build a graph.

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

This ran immediately, since nothing has really happened yet.

To get the result, call `compute`. Notice that this runs faster than the original code.

In [None]:
%%time
# This actually runs our computation using a local thread pool

z.compute()

The `z` object is a lazy `Delayed` object. This object holds everything we need to compute the final result, including references to all of the functions that are required, their inputs, and their relationship to one another. We can evaluate the result with `.compute()` as above, or we can visualize the task graph for this value with `.visualize()`.

In [None]:
z

In [None]:
# Look at the task graph for `z`
z.visualize()

Notice that this includes the names of the functions from before, and the logical flow of the outputs of the `inc` functions to the inputs of `add`.
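To make the idea of a task graph concrete, here is a deliberately simplified, sequential sketch; it is not how Dask's schedulers are implemented. It follows the shape of Dask's documented graph specification: a dict mapping keys to either literal values or tasks, where a task is a tuple whose first element is a callable. In this sketch the remaining tuple elements are always keys of other entries. The names `toy_inc`, `graph`, and `evaluate` are hypothetical.

```python
from operator import add

def toy_inc(x):
    return x + 1

# z = add(inc(1), inc(2)) written out as an explicit graph
graph = {
    "one": 1,
    "two": 2,
    "x": (toy_inc, "one"),
    "y": (toy_inc, "two"),
    "z": (add, "x", "y"),
}

def evaluate(graph, key, cache=None):
    """Compute `key` by first computing the keys it depends on (depth-first)."""
    if cache is None:
        cache = {}
    if key not in cache:
        task = graph[key]
        if isinstance(task, tuple):
            func, *dep_keys = task
            cache[key] = func(*(evaluate(graph, k, cache) for k in dep_keys))
        else:
            cache[key] = task  # a literal value
    return cache[key]

print(evaluate(graph, "z"))  # 5
```

A parallel scheduler can run `x` and `y` at the same time because neither depends on the other.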

### Some questions to consider:

-  Why did we go from 3s to 2s?  Why weren't we able to parallelize down to 1s?
-  What would have happened if the `inc` and `add` functions didn't include the `sleep(1)`?  Would Dask still be able to speed up this code?
-  What if we have multiple outputs or also want to get access to x or y?

In [None]:
def inc(x):
    #sleep(1)
    return x + 1

def add(x, y):
    #sleep(1)
    return x + y

In [None]:
%%time
x = inc(1)
y = inc(2)
z = add(x, y)
x,y,z

In [None]:
%%time
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
dask.compute(x,y,z)

### Exercise: Parallelize a for loop

`for` loops are one of the most common things that we want to parallelize.  Use `dask.delayed` on `inc` and `sum` to parallelize the computation below:

In [None]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

In [None]:
%%time
# Sequential code

results = []
for x in data:
    y = inc(x)
    results.append(y)

total = sum(results)

In [None]:
total

In [None]:
%%time
# Your parallel code here...

Solution:

In [None]:
%%time
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what total is
result = total.compute()
print("After computing :", result)  # After it's computed

How does the graph visualization of the given solution compare with that of a version that calls `sum` directly rather than wrapping it with `delayed`?

In [None]:
total.visualize() # sum function wrapped with delayed

In [None]:
%%time
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)

total = sum(results)

total.compute()

In [None]:
total.visualize()
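One way to put a number on the difference between the two graphs is to count their tasks. This sketch uses a hypothetical fast `toy_inc` and the collection protocol method `__dask_graph__()`. The exact counts may differ between Dask versions, but the built-in `sum` version adds one `+` task per element, while `delayed(sum)` adds a single task.

```python
from dask import delayed

def toy_inc(x):
    return x + 1

parts = [delayed(toy_inc)(v) for v in range(8)]

wrapped = delayed(sum)(parts)  # one `sum` task that receives all eight results
chained = sum(parts)           # built-in sum: 0 + p0, then + p1, ... one task per `+`

n_wrapped = len(dict(wrapped.__dask_graph__()))
n_chained = len(dict(chained.__dask_graph__()))
print(n_wrapped, n_chained)
print(wrapped.compute(), chained.compute())  # 36 36
```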

### Parallelizing for-loop code with control flow

Often we want to delay only *some* functions, running a few of them immediately.  This is especially helpful when those functions are fast and help us to determine what other slower functions we should call.  This decision, to delay or not to delay, is usually where we need to be thoughtful when using `dask.delayed`.

In the example below we iterate through a list of inputs.  If that input is even then we want to call `inc`.  If the input is odd then we want to call `double`.  This `is_even` decision to call `inc` or `double` has to be made immediately (not lazily) in order for our graph-building Python code to proceed.
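The decision cannot be delayed because Python's `if` needs a real `True` or `False` at the moment the line runs, and a `Delayed` object refuses to act as one. A small sketch with a hypothetical `toy_is_even`:

```python
from dask import delayed

def toy_is_even(x):
    return not x % 2

lazy_flag = delayed(toy_is_even)(4)

try:
    if lazy_flag:  # Python asks for a bool right now, but the value is not computed yet
        branch = "even"
    else:
        branch = "odd"
except TypeError as err:
    branch = None
    print("TypeError:", err)

# Calling the cheap function eagerly gives a plain bool that `if` can use
eager_branch = "even" if toy_is_even(4) else "odd"
print(eager_branch)  # even
```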

In [None]:
def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
%%time
# Sequential code

results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)

total = sum(results)
print(total)

In [None]:
%%time
# TODO: parallelize the sequential code above using dask.delayed
# You will need to delay some functions, but not all

In [None]:
%%time
results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:          # odd
        y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
total.compute()

In [None]:
total.visualize()

In [None]:
%%time
results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:          # odd
        y = delayed(inc)(x)
    results.append(y)

total = sum(results)
total.compute()

In [None]:
total.visualize()

### Some questions to consider:

-  What would have happened if we had delayed the evaluation of `is_even(x)` in the example above?
-  What are your thoughts on delaying `sum`?  This function does real computation, but it is also fast to run.

### Parallelizing a Pandas Groupby Reduction

In this exercise we read several CSV files and perform a groupby operation in parallel. We start from sequential code that does this and parallelize it with `dask.delayed`.

The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data.  We will do this by using `dask.delayed` together with `pandas`.  In a future section we will do this same exercise with `dask.dataframe`.

#### Sequential code: Mean Departure Delay Per Airport

In the previous notebook on pandas, we used `pd.concat()` and a for-loop to read multiple files into a DataFrame. Here we also perform a groupby operation, as well as a sum and a count, within the for-loop, and finally compute the mean manually from those intermediates. This should help visualize the steps that can be parallelized later.
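Why combining per-file sums and counts gives the right mean: the mean for an airport is its total delay divided by its number of non-missing delays, and both totals are simply sums of the per-file values. The sketch below checks this on two hypothetical in-memory "files". It also shows one pitfall: adding Series with `+` (which is what the built-in `sum` does) yields `NaN` for an airport that is missing from one of the files, whereas `Series.add(..., fill_value=0)` treats the missing entry as zero.

```python
from functools import reduce

import pandas as pd

# Two hypothetical files; LGA appears only in the first, JFK only in the second
file_a = pd.DataFrame({"Origin": ["EWR", "LGA", "EWR"], "DepDelay": [10.0, 4.0, None]})
file_b = pd.DataFrame({"Origin": ["EWR", "JFK"], "DepDelay": [2.0, 6.0]})

part_sums, part_counts = [], []
for pdf_part in (file_a, file_b):
    by_origin = pdf_part.groupby("Origin").DepDelay
    part_sums.append(by_origin.sum())      # NaN delays are skipped
    part_counts.append(by_origin.count())  # ...and not counted

def add_aligned(a, b):
    # Align on the airport index, treating an airport missing from one file as 0
    return a.add(b, fill_value=0)

combined_mean = reduce(add_aligned, part_sums) / reduce(add_aligned, part_counts)
naive_mean = sum(part_sums) / sum(part_counts)  # `+` alignment leaves NaN for JFK and LGA
direct_mean = pd.concat([file_a, file_b]).groupby("Origin").DepDelay.mean()

print(combined_mean.to_dict())  # {'EWR': 6.0, 'JFK': 6.0, 'LGA': 4.0}
print(direct_mean.to_dict())    # {'EWR': 6.0, 'JFK': 6.0, 'LGA': 4.0}
print(naive_mean.isna().sum())  # 2
```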

In [None]:
import pandas as pd
import os
from glob import glob

In [None]:
filenames = sorted(glob(os.path.join('data', 'nycflights', '*.csv')))

In [None]:
filenames

In [None]:
%%time

sums = []
counts = []
for fn in filenames:
    # Read in file
    pdf = pd.read_csv(fn)

    # Groupby origin airport
    by_origin = pdf.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
mean

#### Parallel code: Mean Departure Delay Per Airport

Use `dask.delayed` to parallelize the code above. Here are some extra things you will need to know:

1.  Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.

    ```python
    import numpy as np
    from dask import delayed

    x = delayed(np.arange)(10)
    y = (x + 1)[::2].sum()  # everything here was delayed
    ```
2.  Calling the `.compute()` method works well when you have a single output.  When you have multiple outputs you might want to use the `dask.compute` function:

    ```python
    >>> import numpy as np
    >>> from dask import compute, delayed
    >>> x = delayed(np.arange)(10)
    >>> y = x ** 2
    >>> min_, max_ = compute(y.min(), y.max())
    >>> int(min_), int(max_)
    (0, 81)
    ```
    
    This way Dask can share the intermediate values (like `y = x**2`).
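To see this sharing in action, the sketch below swaps `np.arange` for a hypothetical `make_range` wrapper that logs every real call. With one `compute(...)` call on both results, the shared input is built once; with two separate `.compute()` calls it is built once per call. The single-threaded `"synchronous"` scheduler is used only to keep the call log simple.

```python
import numpy as np
import dask
from dask import compute, delayed

range_calls = []

def make_range(n):
    range_calls.append(n)  # log every real call
    return np.arange(n)

lazy_range = delayed(make_range)(10)
lazy_sq = lazy_range ** 2

with dask.config.set(scheduler="synchronous"):
    lo, hi = compute(lazy_sq.min(), lazy_sq.max())  # one merged graph
    calls_shared = len(range_calls)                 # make_range ran once
    lazy_sq.min().compute()                         # separate graph: runs again
    lazy_sq.max().compute()                         # ...and again
    calls_total = len(range_calls)

print(int(lo), int(hi), calls_shared, calls_total)  # 0 81 1 3
```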
    
So your goal is to parallelize the sequential code above using `dask.delayed`. You may also want to visualize a bit of the computation to see if you're doing it correctly.

In [None]:
from dask import compute

In [None]:
%%time
# your code here

Solution:

In [None]:
%%time

# This is just one possible solution, there are
# several ways to do this using `delayed`

sums = []
counts = []
for fn in filenames:
    # Read in file
    ddf = delayed(pd.read_csv)(fn)

    # Groupby origin airport
    by_origin = ddf.groupby('Origin')

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Compute the intermediates
#sums, counts = compute(sums, counts)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
#total_delays = delayed(sum)(sums)
#n_flights = delayed(sum)(counts)
#total_delays, n_flights = compute(total_delays, n_flights)
mean = total_delays / n_flights
mean.compute()
#mean

In [None]:
mean.visualize()

### Some questions to consider:

- How much speedup did you get? Is this how much speedup you'd expect?
- Experiment with where to call `compute`. What happens when you call it on `sums` and `counts`? What happens if you wait and call it on `mean`?
- Experiment with delaying the call to `sum`. What does the graph look like if `sum` is delayed? What does the graph look like if it isn't?
- Can you think of any reason why you'd want to do the reduction one way over the other?

### Learn More

Visit the [Delayed documentation](https://docs.dask.org/en/latest/delayed.html). In particular, this [delayed screencast](https://www.youtube.com/watch?v=SHqFmynRxVU) will reinforce the concepts you learned here and the [delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html) document collects advice on using `dask.delayed` well.

## Dask DataFrames

In this section we use `dask.dataframe` to automatically build computations similar to the ones we just wrote by hand, for the common case of tabular data. Dask DataFrames look and feel like pandas DataFrames, but they run on the same infrastructure that powers `dask.delayed`.

In this chapter we use the same airline data as before, but now, rather than writing for-loops, we let `dask.dataframe` construct our computations for us. The `dask.dataframe.read_csv` function can take a glob string like `"data/nycflights/*.csv"` and build parallel computations on all of our data at once.

Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyse is larger than your machine's RAM. Our dataset fits comfortably in memory, but we can still use it to demonstrate how to work with `dask.dataframe`.

The `dask.dataframe` module implements a blocked parallel `DataFrame` object that mimics a large subset of the pandas `DataFrame` API. One Dask `DataFrame` is composed of many in-memory pandas `DataFrame`s separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.

For the most part, a Dask DataFrame feels like a pandas DataFrame.
So far, the biggest difference we've seen is that Dask operations are lazy; they build up a task graph instead of executing immediately.
This lets Dask do operations in parallel and out of core.

A Dask DataFrame is composed of many pandas DataFrames. For `dask.dataframe` the chunking happens along the index.

<img src="http://docs.dask.org/en/latest/_images/dask-dataframe.svg" width="30%">

We call each chunk a *partition*, and the upper / lower bounds are *divisions*.
Dask *can* store information about the divisions. For now, partitions come up when you write custom functions to apply to Dask DataFrames.

**Main Take-aways**

1.  Dask DataFrame should be familiar to Pandas users
2.  The partitioning of dataframes is important for efficient execution

### Data Loading

#### Single Files

In [None]:
import dask.dataframe as dd
import dask

In [None]:
ddf = dd.read_csv("./data/nycflights/1999.csv") # With Dask, you can also pass a URL instead of a local file path

In [None]:
ddf.head(3) # This works just as in pandas

In [None]:
ddf.tail(3) # And so does this

Watch out for incorrectly inferred data types in Dask! Because Dask reads lazily, it infers dtypes from only a sample of the data, so you may need to specify some dtypes manually.

In [None]:
ddf.dtypes

#### Multiple Files

Dask can intelligently read multiple files into one dataframe with a glob (asterisk):

In [None]:
filepath = glob("./data/nycflights/*.csv")

In [None]:
ddf = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'))

In [None]:
ddf.head(3) # this works.

In [None]:
#ddf.tail(3) # This fails. Why?

In [None]:
ddf.dtypes # Let's check the datatypes

Unlike `pandas.read_csv` which reads in the entire file before inferring datatypes, `dask.dataframe.read_csv` only reads in a sample from the beginning of the file (or first file if using a glob). These inferred datatypes are then enforced when reading all partitions.

In this case, the datatypes inferred in the sample are incorrect. The first `n` rows have no value for `CRSElapsedTime` (which pandas infers as a `float`), and later on turn out to be strings (`object` dtype). Note that Dask gives an informative error message about the mismatch. When this happens you have a few options:

- Specify dtypes directly using the `dtype` keyword. This is the recommended solution, as it's the least error prone (better to be explicit than implicit) and also the most performant.
- Increase the size of the `sample` keyword (in bytes)
- Use `assume_missing` to make `dask` assume that columns inferred to be `int` (which don't allow missing values) are actually floats (which do allow missing values). In our particular case this doesn't apply.

In our case we'll use the first option and directly specify the dtypes of (just) the offending columns with the `dtype` keyword.

In [None]:
ddf = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]}, # Here we parse the year, month and day into date
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

Notice that the representation of the dataframe object contains no data: Dask has only done enough work to read the start of the first file and infer the column names and dtypes.

In [None]:
ddf

In [None]:
ddf.tail(3) # Now it works

Let's also read the holidays data, which we will use in the exercises.

In [None]:
holidays = dd.read_parquet(os.path.join('data', "holidays"))

In [None]:
holidays.head()

### Computations with `dask.dataframe`

Let's compute the maximum of the `DepDelay` column. With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums:

```python
maxes = []
for fn in filenames:
    pdf = pd.read_csv(fn)
    maxes.append(pdf["DepDelay"].max())

final_max = max(maxes)
```

We could wrap that `pd.read_csv` with `dask.delayed` so that it runs in parallel. Even so, we would still have to think about loops, intermediate results (one per file) and the final reduction (`max` of the intermediate maxes). This is just noise around the real task, which pandas solves with

```python
pdf = pd.read_csv(filename, dtype=dtype)
pdf["DepDelay"].max()
```

`dask.dataframe` lets us write pandas-like code, that operates on larger than memory datasets in parallel.

Let's compare the performance of data loading and computation to pandas.
First we are going to time this in Dask:

In [None]:
%%time
ddf = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]}, # Here we parse the year, month and day into date
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
ddf["DepDelay"].max().compute()

Let's compare this to pandas:

In [None]:
%%time
filepath = glob("./data/nycflights/*.csv")
pdf = pd.concat(pd.read_csv(f) for f in filepath)
pdf["DepDelay"].max()

In [None]:
pdf = pd.concat(pd.read_csv(f) for f in filepath)

In [None]:
%time pdf["DepDelay"].max()

In the Dask version above, `dask.dataframe` wrote the delayed computation for us and then ran it.

Some things to note:

1.  As with `dask.delayed`, we need to call `.compute()` when we're done.  Up until this point everything is lazy.
2.  Dask will delete intermediate results (like the full pandas dataframe for each file) as soon as possible.
    -  This lets us handle datasets that are larger than memory
    -  This means that repeated computations will have to load all of the data in each time (run the code above again: is it faster or slower than you would expect?)
    
As with `Delayed` objects, you can view the underlying task graph using the `.visualize` method (notice the parallelism):

In [None]:
ddf.DepDelay.max().visualize()

### Exercises

In this section we do a few `dask.dataframe` computations. If you are comfortable with pandas then these should be familiar. You will have to think about when to call `compute`.

#### 1.) How many rows are in our dataset?

In [None]:
# Your code here

Solution:

In [None]:
len(ddf)

#### 2.) In total, how many non-cancelled flights were taken?

With pandas, you would use [boolean indexing](https://pandas.pydata.org/pandas-docs/stable/indexing.html#boolean-indexing).

In [None]:
# Your code here

Solution:

In [None]:
len(ddf[~ddf.Cancelled])

#### 3.) What was the average departure delay from each airport?
*Hint*: use [`ddf.groupby`](https://pandas.pydata.org/pandas-docs/stable/groupby.html).

Note that this is the same computation you did with `dask.delayed` earlier in this notebook (is this approach faster or slower?)

In [None]:
# Your code here

Solution:

In [None]:
%time ddf["DepDelay"].groupby(ddf["Origin"]).mean().compute()
# Alternative solution:
# ddf.groupby("Origin").DepDelay.mean().compute()

#### 4.) In total, how many non-cancelled flights were taken from each airport?

*Hint*: use [`ddf.groupby`](https://pandas.pydata.org/pandas-docs/stable/groupby.html).

In [None]:
# Your code here

Solution:

In [None]:
ddf[~ddf.Cancelled].groupby("Origin").Origin.count().compute()

#### 5.) What day of the week has the worst average departure delay?

In [None]:
# Your code here

Solution:

In [None]:
ddf["DepDelay"].groupby(ddf["DayOfWeek"]).mean().compute()

#### 6.) What holiday has the worst average departure delay?

*Hint*: use [`ddf.merge`](https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html) to bring holiday information.

In [None]:
# Your code here

Solution:

In [None]:
%time ddf.merge(holidays, on=["Date"], how="left").groupby("holiday").DepDelay.mean().compute()

# Alternative Solution:
#%%time
#ddf_merged = ddf.merge(holidays, on=["Date"], how="left")
#ddf_merged["DepDelay"].groupby(ddf_merged["holiday"]).mean().compute()

### Sharing Intermediate Results

When computing all of the above, we sometimes did the same operation more than once. For most operations, `dask.dataframe` hashes the arguments, allowing duplicate computations to be shared and only computed once.

For example, let's compute the mean and standard deviation for departure delay of all non-cancelled flights. Since Dask operations are lazy, those values aren't the final results yet. They're just the recipe required to get the result.

If we compute them with two calls to compute, there is no sharing of intermediate computations.

In [None]:
non_cancelled = ddf[~ddf.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

In [None]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

But let's try by passing both to a single `compute` call.

In [None]:
%%time

mean_delay_res, std_delay_res = compute(mean_delay, std_delay)

Using `dask.compute` takes roughly half the time. This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations to only be done once instead of twice. In particular, using `dask.compute` only does the following once:

- the calls to `read_csv`
- the filter (`ddf[~ddf.Cancelled]`)
- some of the necessary reductions (`sum`, `count`)

To see what the merged task graphs between multiple results look like (and what's shared), you can use the `dask.visualize` function (we might want to use `filename='graph.pdf'` to save the graph to disk so that we can zoom in more easily):

In [None]:
dask.visualize(mean_delay, std_delay, filename="graph.pdf")

### Conversion to a timestamp

This dataset stores timestamps as `HHMM`, which are read in as integers in `read_csv`:

In [None]:
crs_dep_time = pdf["CRSDepTime"].head(10)
crs_dep_time

To convert these to timestamps of scheduled departure time, we need to convert these integers into `pd.Timedelta` objects, and then combine them with the `Date` column.

In the pandas notebook, we rounded and converted the data to integers, then to strings, and then to the datetime dtype. However, we could also do this using the `pd.to_timedelta` function and a bit of arithmetic:

In [None]:
pdf.head(3)

In [None]:
import pandas as pd
# Rename column "DayofMonth"
pdf.rename(columns = {"DayofMonth":"Day"}, inplace=True)

# Parse to Date:
pdf["Date"] = pd.to_datetime(pdf[["Year","Month","Day"]])

# Get the first 10 dates to complement our `crs_dep_time`
date = pdf.Date.head(10)
date

In [None]:
# Get hours as an integer, convert to a timedelta
hours = crs_dep_time // 100
hours_timedelta = pd.to_timedelta(hours, unit='h')

# Get minutes as an integer, convert to a timedelta
minutes = crs_dep_time % 100
minutes_timedelta = pd.to_timedelta(minutes, unit='m')

# Apply the timedeltas to offset the dates by the departure time
departure_timestamp = date + hours_timedelta + minutes_timedelta
departure_timestamp

#### Custom code and Dask Dataframe

We could swap out `pd.to_timedelta` for `dd.to_timedelta` and do the same operations on the entire dask DataFrame. But let's say that Dask hadn't implemented a `dd.to_timedelta` that works on Dask DataFrames. What would you do then?

`dask.dataframe` provides a few methods to make applying custom functions to Dask DataFrames easier:

- [`map_partitions`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions)
- [`map_overlap`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_overlap)
- [`reduction`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.reduction)

Here we'll just be discussing `map_partitions`, which we can use to implement `to_timedelta` on our own:

The basic idea is to apply a function that operates on a DataFrame to each partition.
In this case, we'll apply `pd.to_timedelta`.

In [None]:
hours = ddf.CRSDepTime // 100
# hours_timedelta = pd.to_timedelta(hours, unit='h')
hours_timedelta = hours.map_partitions(pd.to_timedelta, unit='h')

minutes = ddf.CRSDepTime % 100
# minutes_timedelta = pd.to_timedelta(minutes, unit='m')
minutes_timedelta = minutes.map_partitions(pd.to_timedelta, unit='m')

departure_timestamp = ddf.Date + hours_timedelta + minutes_timedelta

In [None]:
departure_timestamp

In [None]:
departure_timestamp.head()

#### Rewrite above to use a single call to `map_partitions`

This will be slightly more efficient than two separate calls, as it reduces the number of tasks in the graph.

In [None]:
def compute_departure_timestamp(df):
    # `df` is a single pandas partition, not the whole Dask DataFrame
    hours = df.CRSDepTime // 100
    hours_timedelta = pd.to_timedelta(hours, unit='h')

    minutes = df.CRSDepTime % 100
    minutes_timedelta = pd.to_timedelta(minutes, unit='m')

    return df.Date + hours_timedelta + minutes_timedelta

In [None]:
departure_timestamp = ddf.map_partitions(compute_departure_timestamp)
departure_timestamp.head()

### Limitations

`dask.dataframe` only covers a small but well-used portion of the pandas API.
This limitation is for two reasons:

1.  The Pandas API is *huge*
2.  Some operations are genuinely hard to do in parallel (e.g. sort)

Additionally, some important operations like ``set_index`` work, but are slower
than in Pandas because they include substantial shuffling of data, and may write out to disk.

### Learn More


* [DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
* [DataFrame screencast](https://youtu.be/AT2XtFehFSQ)
* [DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)
* [DataFrame examples](https://examples.dask.org/dataframe.html)
* [Pandas documentation](https://pandas.pydata.org/pandas-docs/stable/)

## Dask Array

Parallel, larger-than-memory, n-dimensional array using blocked algorithms.

* Parallel: Uses all of the cores on your computer
* Larger-than-memory: Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
* Blocked Algorithms: Perform large computations by performing many smaller computations.
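The blocked-algorithm idea can be sketched by hand in NumPy: split an array into row blocks, reduce each block to a sum and a size, and combine those partial results. Dask Array applies the same kind of strategy per chunk, although its internal reduction tree is more elaborate than this hypothetical three-block version.

```python
import numpy as np
import dask.array as da

rng = np.random.default_rng(0)
arr = rng.random((6, 4))

# Three row blocks of shape (2, 4), each reduced independently
blocks = [arr[i:i + 2] for i in range(0, arr.shape[0], 2)]
block_sums = [b.sum() for b in blocks]
block_sizes = [b.size for b in blocks]
blocked_mean = sum(block_sums) / sum(block_sizes)

# The same array as a Dask array with matching (2, 4) chunks
dask_mean = da.from_array(arr, chunks=(2, 4)).mean().compute()

print(np.isclose(blocked_mean, arr.mean()), np.isclose(dask_mean, arr.mean()))  # True True
```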

In [None]:
import dask.array as da
import numpy as np
import dask

In [None]:
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x

In [None]:
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z

In [None]:
z.compute()

Let's compare NumPy to Dask. First we time the NumPy version of a computation:

In [None]:
%%time
xn = np.random.normal(10, 0.1, size=(30_000, 30_000))
yn = xn.mean(axis=0)
yn

In [None]:
del xn, yn # Let's release some memory

And now the Dask version:

In [None]:
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
xd

In [None]:
xd.nbytes / 1e9  # Gigabytes of the input processed lazily

In [None]:
yd = xd.mean(axis=0)
yd

In [None]:
%%time
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
yd = xd.mean(axis=0)
yd.compute()

**Questions to think about:**

* What happens if you use `chunks=(10000, 10000)` for the Dask array?
* What happens if you use `chunks=(30, 30)`?
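As a hint for these questions: the chunk shape determines how many blocks (and therefore tasks) Dask has to schedule, and how much memory each block needs. This sketch only inspects the metadata of lazy `da.ones` arrays, so no data is created; the MB figures assume float64, that is, 8 bytes per element.

```python
import dask.array as da

shape = (30_000, 30_000)
summary = {}
for chunks in [(3_000, 3_000), (10_000, 10_000), (30, 30)]:
    arr = da.ones(shape, chunks=chunks)  # lazy: only metadata is built here
    mb_per_chunk = arr.dtype.itemsize * chunks[0] * chunks[1] / 1e6
    summary[chunks] = (arr.numblocks, arr.npartitions, mb_per_chunk)
    print(chunks, arr.numblocks, arr.npartitions, f"{mb_per_chunk:g} MB per chunk")
```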

### Exercise:

For Dask arrays, compute the mean along `axis=1` of the sum of the `xd` array and its transpose.

In [None]:
# Your code here


**Solution:**

In [None]:
x_sum = xd + xd.T
res = x_sum.mean(axis=1)
res.compute()

In [None]:
del xd, yd # Another memory release

### Persist Data in Memory

If you have the available RAM for your dataset then you can persist data in memory.
This allows future computations to be much faster.

In [None]:
y = y.persist()

In [None]:
%time y[0, 0].compute()

In [None]:
%time y.sum().compute()
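To see what persisting buys, this sketch builds a one-chunk array from a hypothetical `load_block` function that logs each call, standing in for an expensive read. As described in the DataFrame section, Dask discards intermediate results, so each `compute` loads the data again; after `persist()`, later computations reuse the in-memory chunk.

```python
import numpy as np
import dask
import dask.array as da

load_calls = []

def load_block():
    # Stands in for an expensive read from disk; logs every real call
    load_calls.append(1)
    return np.arange(12.0).reshape(3, 4)

arr = da.from_delayed(dask.delayed(load_block)(), shape=(3, 4), dtype=float)

arr.sum().compute()
arr.mean().compute()
before_persist = len(load_calls)  # each compute loaded the block again

arr = arr.persist()               # load one more time and keep the chunk in memory
arr.sum().compute()
arr.mean().compute()
after_persist = len(load_calls)   # no further loads

print(before_persist, after_persist)  # 2 3
```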