In [None]:
import dask.dataframe as dd
from dask.distributed import Client
from matplotlib import pyplot as plt
from matplotlib import rcParams
# Default size for every figure drawn in this notebook: 15 wide by 10 high
# (matplotlib measures figsize in inches).
rcParams['figure.figsize'] = [15, 10]

### Set up a local scheduler using the Client class
#### The web interface is accessible at http://localhost:8787

In [None]:
# Create the distributed client; called with no arguments, as here, it sets up
# a local scheduler.
client = Client()
# Bare last expression: show the client's summary as the cell output.
client

### Read in a data set, as per usual

In [None]:
# Build a lazy dask dataframe from the ign.csv data set and split it into
# 100 partitions so the work can be spread across the workers.
ddf = (
    dd.read_csv('../data/ign.csv')
    .repartition(npartitions=100)
)

In [None]:
# Lazily count how many rows fall under each genre (nothing is computed yet).
ddf_genre_counts = ddf['genre'].value_counts()
# Keep only the genres that occur more than 500 times.
popular_game_genres = ddf_genre_counts[ddf_genre_counts > 500]

### Rather than calling the .compute() method of the dask.dataframe object, we call the .compute() method of the client object, passing the dataframe to the scheduler

In [None]:
# Hand the lazy series to the distributed scheduler; the return value is a
# Future rather than the computed data.
popular_games = client.compute(popular_game_genres)
# Display the Future itself (its status), not its contents.
popular_games

### This method returns a Future object; when complete, its .result() method returns the collected output

In [None]:
# Fetch the collected output of the Future once it is complete.
popular_games.result()

### DataFrame task graphs are composed just as before; any valid dask.dataframe operation can be handled by the distributed task scheduler

In [None]:
# Names of the genres that passed the popularity threshold. The `popular_games`
# Future already holds this result, so reuse it instead of calling .compute()
# on the lazy series again, which would re-read the CSV and recount every genre.
popular_genre_names = popular_games.result().index

# Lazy filter: releases after the year 2000 that belong to a popular genre.
recent_popular_releases = ddf[
    (ddf.release_year > 2000) & ddf.genre.isin(popular_genre_names)
]

# Lazy aggregation: number of releases per (year, month, genre).
releases_by_genre = (
    recent_popular_releases
    .groupby(['release_year', 'release_month', 'genre'])
    .genre
    .count()
)

In [None]:
# Push task graph to distributed scheduler; the returned Future is kept so the
# aggregated result can be fetched later with .result()
out_releases_by_genre = client.compute(releases_by_genre)

### Note we are sorting the aggregated result outside the cluster context; this is much cheaper than shuffling the entire dataframe

In [None]:
# Pull the aggregated counts back from the cluster, then sort and pivot them
# locally: unstack() moves the innermost index level (genre) into columns, so
# plot() draws one line per genre.
(
    out_releases_by_genre
    .result()
    .sort_index()
    .unstack()
    .plot()
)
plt.show()

### We can submit custom workloads to the distributed scheduler as well

In [None]:
def factorial(x):
    """Return ``x!``, the product of all whole numbers from 1 up to ``x``.

    Parameters
    ----------
    x : int (or a float holding a whole number)
        Non-negative whole number whose factorial is wanted.

    Returns
    -------
    The factorial of ``x``; ``factorial(0)`` and ``factorial(1)`` are both 1.

    Raises
    ------
    ValueError
        If ``x`` is negative or not a whole number. The previous recursive
        version never reached its ``x == 1`` base case for such inputs (or
        for 0) and ended in a ``RecursionError``.
    """
    if x < 0 or x != int(x):
        raise ValueError("factorial() is only defined for non-negative whole numbers")

    # Iterate instead of recursing so large inputs cannot hit Python's
    # recursion limit.
    result = 1
    while x > 1:
        result *= x
        x -= 1
    return result

def div(x, y):
    """Return ``x`` divided by ``y`` as a float.

    Both arguments are passed through ``float()`` before dividing, so the
    result is always a floating-point quotient. A zero ``y`` raises
    ``ZeroDivisionError``.
    """
    numerator, denominator = float(x), float(y)
    return numerator / denominator

# Demo inputs: each value becomes one factorial task.
data = [20, 10, 5, 2, 1]

# One Future per input value.
A = client.map(factorial, data)

# Pair every factorial Future with the constant divisor 1000; the Futures in A
# are passed straight in as arguments to the div tasks.
B = client.map(div, A, [1000 for _ in A])

# Collect the results behind the Futures in B.
client.gather(B)

### Interestingly, client.map returns a list of Future objects; this means we can do things like slicing, item access, and attribute access asynchronously!

In [None]:
# Displaying A shows the list of Future objects returned by client.map,
# not the factorial values themselves.
A

### We can even access intermediate results; these are temporarily cached on the workers, but can be recomputed if the cache becomes stale

In [None]:
# Gather only a slice of the intermediate factorial Futures
# (positions 1 and 2, i.e. the inputs 10 and 5).
client.gather(A[1:3])

In [None]:
# Same slice taken from B: the corresponding factorials divided by 1000.
client.gather(B[1:3])

### If we want a true reduce operation, we can use the .submit() and .result() methods instead of the .gather() method

In [None]:
# Submit a single task that applies sum to the results of all Futures in B.
total = client.submit(sum, B)
# Retrieve the reduced value from the resulting Future.
total.result()