
Added dask data interface #974

Merged
merged 8 commits into master from dask_interface Nov 16, 2016

philippjfr commented Nov 5, 2016

This PR adds an interface for Dask DataFrames, making it possible to work with very large out-of-core dataframes. The interface is almost complete, with some notable exceptions:

  1. Dask DataFrames do not support sorting, so the sort method simply warns and returns the data unsorted.
  2. Not all functions can easily be applied to a dask DataFrame, so some functions passed to aggregate and reduce will fail.
  3. Dask does not support setting sort=False on aggregations, meaning aggregated groups are always sorted and do not preserve the ordering other interfaces provide.
  4. Dask does not easily support adding a new column to an existing dataframe unless it is a scalar, so add_dimension raises an error when supplied a non-scalar value (see the sketch after this list).
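To make limitations 1 and 4 concrete, here is a minimal sketch of how they surface (my illustration, not code from this PR; the variable names are arbitrary):

import pandas as pd
import dask.dataframe as dd
import holoviews as hv

# A small dask-backed Dataset to poke at the interface
ddf = dd.from_pandas(pd.DataFrame({'x': [2, 0, 1], 'y': [1., 2., 3.]}), npartitions=2)
ds = hv.Dataset(ddf, kdims=['x'], vdims=['y'])

ds.add_dimension('weight', 2, 1.0)              # scalar value: supported
# ds.add_dimension('weight', 2, [1., 2., 3.])   # non-scalar: raises an error
ds.sort()                                       # warns and returns the data unsorted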

Otherwise, the full dataset test suite is run against the interface, so everything else appears to work.

Here is an example loading a 1.1GB CSV file and generating a DynamicMap of datashaded images grouped by flight origin. In this example only the flight origins have to be loaded to apply the groupby; the aggregated data is not loaded until the first datashaded plot is displayed:
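Both timing cells below assume imports along these lines (the datashade import path is my assumption; it has moved between HoloViews versions):

import numpy as np
import pandas as pd
import dask.dataframe as dd
import holoviews as hv
from holoviews.operation.datashader import datashade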

%%timeit -r 1 -n 1
df = pd.read_csv('../apps/opensky.csv')
dataset = hv.Dataset(df, vdims=['velocity'])
groups = dataset.to(hv.Points, ['longitude', 'latitude'], [], ['origin'], dynamic=True)
shaded_origins = datashade(groups)

1 loop, best of 1: 22 s per loop

%%timeit -r 1 -n 1
df = dd.read_csv('../apps/opensky.csv', blocksize=50000000)
dataset = hv.Dataset(df, vdims=['velocity'])
groups = dataset.to(hv.Points, ['longitude', 'latitude'], [], ['origin'], dynamic=True)
shaded_origins = datashade(groups)

1 loop, best of 1: 8.19 s per loop

And here is an example execution graph from a fairly complex expression, computing the mean velocity for every flight callsign originating in Algeria.

(hv.Dataset(dd.read_csv('../apps/opensky.csv', blocksize=50000000), vdims=['velocity'])
   .select(origin='Algeria')
   .aggregate(['icao24'], np.mean)
   .data.visualize())
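Note that .visualize() only renders the task graph; nothing is executed yet. As a sketch, execution could be forced like this (assuming .data is still a dask object at this point, as the call to .visualize() implies):

agg = (hv.Dataset(dd.read_csv('../apps/opensky.csv', blocksize=50000000), vdims=['velocity'])
         .select(origin='Algeria')
         .aggregate(['icao24'], np.mean))
result = agg.data.compute()  # walks the whole task graph and returns in-memory data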

[Figure: Dask task graph produced by .visualize() for the expression above]

And here's a task execution plot from a datashader aggregation executed on two remote workers:

[Screenshot: Dask task execution plot across the two remote workers]

Overall, this will do for columnar data what xarray/iris have done for gridded data, letting us lazily load columnar datasets and extending our reach to data larger than a few gigabytes.

philippjfr added the data label Nov 5, 2016

philippjfr commented Nov 5, 2016

Just have to add a docstring and update the tests:

======================================================================
FAIL: test_Columnar_Data_data_002 (__main__.NBTester)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/ioam/holoviews/doc/nbpublisher/nbtest.py", line 512, in data_comparison
    raise e
AssertionError: ['array', 'dataframe', 'dictionary', 'grid', 'ndelement', 'cube', 'xarray', 'dask'] != ['array', 'dataframe', 'dictionary', 'grid', 'ndelement', 'cube', 'xarray']
jbednar commented Nov 5, 2016

Cool!!!!!!

philippjfr added the feature label Nov 5, 2016

philippjfr force-pushed the dask_interface branch from dc0e11f to d0c84d6 Nov 6, 2016

philippjfr commented Nov 6, 2016

Buildbot still failing to update reference data for some reason:

You asked to amend the most recent commit, but doing so would make
it empty. You can repeat your command with --allow-empty, or you can
remove the commit entirely with "git reset HEAD^".

jbednar commented Nov 7, 2016

Not sure what that buildbot message could be about.

I don't quite understand the task execution plot; is there some reason the core numbers keep going up? Seems like it's only ever using 8 cores, but then for some reason which 8 it is changes over time? Confusing!

philippjfr commented Nov 8, 2016

> Not sure what that buildbot message could be about.

Hopefully @jlstevens can figure it out ;-)

> I don't quite understand the task execution plot; is there some reason the core numbers keep going up? Seems like it's only ever using 8 cores, but then for some reason which 8 it is changes over time? Confusing!

Tbh I don't quite understand that bit either; it's nice to watch while it's executing, though.

Philipp Rudiger added some commits Nov 8, 2016

philippjfr force-pushed the dask_interface branch from 8937d8b to 9c80879 Nov 8, 2016

The inline review below concerns this hunk from get_agg_data:
empty.loc[0, :] = (np.NaN,) * empty.shape[1]
paths = [elem for path in paths for elem in (path, empty)][:-1]
datasets = [Dataset(p) for p in paths]
if isinstance(paths[0], dd.DataFrame):

jlstevens commented on the diff Nov 15, 2016

isinstance checks over data formats are just the sort of thing interfaces are supposed to handle for you. I am hoping we can get rid of these isinstance checks, perhaps by using the appropriate utility to select the right interface based on the data type (i.e. whatever dataframe type it happens to be)?
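For illustration, a minimal sketch of the kind of centralized dispatch being suggested (hypothetical names, not HoloViews' actual interface machinery):

# Hypothetical registry mapping data types to interface classes, so the type
# decision is made once rather than at every call site.
INTERFACES = {}  # e.g. {dd.DataFrame: DaskInterface, pd.DataFrame: PandasInterface}

def interface_for(data):
    # Select the interface registered for this data type.
    for datatype, interface in INTERFACES.items():
        if isinstance(data, datatype):
            return interface
    raise TypeError('No interface registered for %s' % type(data).__name__)

def concat_datasets(datasets):
    # Call sites stay format-agnostic; the interface handles the specifics.
    return interface_for(datasets[0].data).concat(datasets)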

jlstevens commented Nov 15, 2016

I've reviewed this PR and, for the most part, I am happy with it as a step towards proper dask support.

For this PR, I feel the new code in get_agg_data can probably be improved to avoid the use of isinstance. Beyond that, we discussed two other changes (not to be implemented in this PR) that would complete the dask interface:

  1. Using __nonzero__ (Python 2) and __bool__ (Python 3) instead of __len__ to check truthiness (see the sketch after this list).
  2. Completing some of the missing methods (indicated wherever SkipTest is used in the unit tests), specifically those for adding dimension values and boolean indexing. If the sorting methods can't be supported, I feel that would be an acceptable limitation.
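On point 1, here is a minimal sketch (my illustration, not PR code) of why __bool__/__nonzero__ matters for lazy data: when __bool__ is undefined, Python falls back to __len__ for truth-testing, and len() on a dask DataFrame forces a pass over the data just to evaluate something like "if dataset:".

class LazyDataset(object):
    def __init__(self, data):
        self.data = data             # e.g. a dask DataFrame

    def __len__(self):
        return len(self.data)        # expensive: triggers out-of-core computation

    def __bool__(self):              # Python 3 truth-testing
        return True                  # cheap: a dataset object is always truthy

    __nonzero__ = __bool__           # Python 2 equivalent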
jlstevens commented Nov 16, 2016

Looks good! Merging.

jlstevens merged commit 3ba0b42 into master Nov 16, 2016

3 of 4 checks passed:

coverage/coveralls: Coverage decreased (-2.7%) to 72.975%
continuous-integration/travis-ci/pr: The Travis CI build passed
continuous-integration/travis-ci/push: The Travis CI build passed
s3-reference-data-cache: Test data is cached.

philippjfr deleted the dask_interface branch Dec 10, 2016

philippjfr added this to the v1.7.0 milestone Jan 14, 2017
