<img src="images/dask_horizontal.svg"
     width="300px"
     alt="Dask logo">
     
# Distributed

So far we've been dealing with Dask's **high-level interface** and the **task graph**. Here we'll briefly talk about how those task graphs get executed. For a longer discussion of this topic, take a look at [this notebook](https://github.com/jrbourbeau/hacking-dask/blob/main/4-distributed-scheduler.ipynb).

<img src="images/dask-cluster.png" width="100%"> 

## Create a cluster

Let's create a local cluster. Called with no arguments, `Client()` starts a `LocalCluster` on this machine and connects to it.

In [None]:
from dask.distributed import Client

client = Client()
client
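If you want to control the cluster's size instead of taking the defaults, you can build the `LocalCluster` yourself and hand it to `Client`. This is a minimal sketch: the two-worker, one-thread sizing is an arbitrary assumption for illustration, and `processes=False` keeps the workers as threads inside the current Python process so it also runs as a plain script.

```python
from dask.distributed import Client, LocalCluster

# Assumed sizing for illustration: two workers with one thread each.
# processes=False runs the workers as threads in this process instead of subprocesses.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Block until both workers have registered with the scheduler.
client.wait_for_workers(2)

# Ask the scheduler which workers it knows about.
# The result maps each worker's address to its thread count.
nthreads = client.nthreads()
print(nthreads)

client.close()
cluster.close()
```

On a real machine you would usually keep the default process-based workers, which sidestep the GIL for pure-Python code.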

Let's reuse the example from the previous notebook to trigger a computation on the cluster.

In [None]:
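import dask.dataframe as dd

# Lazily read every CSV in data/, parse the timestamp column, and index by it.
ddf = dd.read_csv("data/*", parse_dates=["timestamp"]).set_index("timestamp")

# Monthly min/max of each reading's deviation from the overall mean temperature.
mean_temperature = ddf.temperature.mean()
output = (ddf.temperature - mean_temperature).resample("1M").agg(["min", "max"])

# Everything above is lazy; .compute() sends the task graph to the cluster.
output.compute()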

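## Run directly on workers

You can also run a function directly on every worker, bypassing the task scheduler entirely. `client.run` returns a dictionary mapping each worker's address to the function's return value.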

In [None]:
import os

client.run(os.listdir, "data")
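Two more `client.run` features are handy for debugging: if the function has a parameter named `dask_worker`, Dask passes in the `Worker` object itself, and the `workers=` argument limits the call to particular workers. A minimal sketch, assuming a fresh two-worker in-process client; `own_address` is a hypothetical helper for illustration.

```python
import os
from dask.distributed import Client

client = Client(processes=False, n_workers=2, threads_per_worker=1)
client.wait_for_workers(2)

# One call per worker; the result is keyed by worker address.
cwds = client.run(os.getcwd)

# Hypothetical helper: Dask fills in dask_worker with the Worker object.
def own_address(dask_worker):
    return dask_worker.address

addresses = client.run(own_address)

# Restrict the call to a single worker via workers=.
first = sorted(addresses)[0]
just_one = client.run(own_address, workers=[first])

client.close()
```

Since each key in `addresses` is the same worker address the function reported, this is also a quick way to see how results line up with workers.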

## Worker Plugins

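Worker plugins let you do _stuff_ on every worker at specific points in its life: `setup` runs when the plugin is registered (and on any worker that joins later), `teardown` runs when the worker shuts down, and `transition` runs whenever a task on that worker changes state.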

In [None]:
import os
from distributed import WorkerPlugin

class CopyFile(WorkerPlugin):
    """A WorkerPlugin to copy a local file to workers.

    Parameters
    ----------
    filepath : str
        Path to the file to copy to workers.

    Examples
    --------
    >>> client.register_worker_plugin(CopyFile(".env"))
    """

    def __init__(self, filepath):
        """
        Initialize the plugin by reading in the data from the given file.
        """
        # Runs on the client: the file's bytes are stored on the plugin
        # object and shipped to every worker when it is registered.
        self.filename = os.path.basename(filepath)
        with open(filepath, "rb") as f:
            self.data = f.read()

    async def setup(self, worker):
        # Runs on each worker: write the bytes into the worker's current
        # working directory. The return value of setup is not sent back.
        with open(self.filename, "wb") as f:
            f.write(self.data)

**NOTE:** This is slightly goofy: on a `LocalCluster` the workers run on the same machine and can already read the file. It will also only work on my machine, since it relies on a picture of a cat I put two directories up.

In [None]:
client.register_worker_plugin(CopyFile("../../cat.jpg"))
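Besides `setup`, a `WorkerPlugin` can implement `transition(key, start, finish, **kwargs)`, which each worker calls on every task state change. The sketch below is a hypothetical plugin, not part of Dask, that counts tasks finishing in memory. It assumes a fresh in-process client; since each worker holds its own deserialized copy of the plugin, the counters are read back with `client.run`. Recent `distributed` releases deprecate `register_worker_plugin` in favour of `Client.register_plugin`, so expect a warning there.

```python
from distributed import Client, WorkerPlugin

class CountMemoryTransitions(WorkerPlugin):
    """Hypothetical plugin: counts tasks that finish in memory on each worker."""

    name = "count-memory"  # fixed name so the plugin can be looked up later

    def __init__(self):
        self.n_in_memory = 0

    def setup(self, worker):
        # Called once per worker when the plugin is attached.
        self.n_in_memory = 0

    def transition(self, key, start, finish, **kwargs):
        # Called on every task state transition on this worker.
        if finish == "memory":
            self.n_in_memory += 1

client = Client(processes=False, n_workers=2, threads_per_worker=1)
client.register_worker_plugin(CountMemoryTransitions())

def inc(x):
    return x + 1

results = client.gather(client.map(inc, range(5)))

# Read each worker's counter through its copy of the plugin.
counts = client.run(lambda dask_worker: dask_worker.plugins["count-memory"].n_in_memory)
total = sum(counts.values())

client.close()
```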