
lazily load dask arrays to dask data frames by calling to_dask_dataframe #1489

Merged: 33 commits merged into pydata:master on Oct 28, 2017

Conversation

@jmunroe (Contributor) commented on Jul 26, 2017

Working towards a solution for #1462.

Just some stub code for the moment.

Dask dataframes don't appear to support MultiIndex, so I'm not sure what to do about that.

@shoyer (Member) commented on Jul 27, 2017

Given that dask dataframes don't support MultiIndexes (among many other features), I have a hard time seeing them as a drop-in replacement for pandas.DataFrame. So maybe it would make sense to make this a separate method, e.g., to_dask_dataframe()?

We could also use a new method as an opportunity to slightly change the API, by not setting an index automatically. This lets us handle N-dimensional data while side-stepping the issue of MultiIndex support -- I don't think this would be very useful when limited to 1D arrays, and dask MultiIndex support seems to be a ways away (dask/dask#1493). Also, set_index() in dask shuffles data, so it can be somewhat expensive.
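A rough usage sketch of what I have in mind (assuming the method lands on Dataset with a set_index keyword that defaults to False; the exact signature is still up for discussion):

import numpy as np
import xarray as xr

ds = xr.Dataset({'a': ('x', np.arange(4))},
                coords={'x': [10, 20, 30, 40]}).chunk(2)

df = ds.to_dask_dataframe()                  # default: plain integer index, no shuffle
df_x = ds.to_dask_dataframe(set_index=True)  # opt in: index by the 'x' coordinate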

@jmunroe (Contributor, Author) commented on Jul 27, 2017

After working on this for a little while, I agree that this really should be a to_dask_dataframe() method. I'll make that change.

@jmunroe changed the title from "WIP: lazily load dask arrays when calling to_dataframe" to "lazily load dask arrays to dask data frames by calling to_dask_dataframe" on Jul 28, 2017

columns = [k for k in self if k not in self.dims]

data = [self._variables[k].data.reshape(-1) for k in columns]
Member commented:

You need to use Variable.set_dims() like in the to_dataframe() example. Otherwise this breaks when different variables have different shapes.
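Something along these lines, as a sketch (with ordered_dims standing in for the dataset's dimensions; this is illustrative, not the final implementation):

# broadcast each variable to the full set of dataset dimensions before
# flattening, so variables with different shapes line up row-for-row
ordered_dims = self.dims
columns = [k for k in self if k not in self.dims]
data = [self._variables[k].set_dims(ordered_dims).data.reshape(-1)
        for k in columns]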

@jmunroe (Contributor, Author) replied:

I'd added that, but I am not entirely clear whether what I did was sufficient.

columns = [k for k in self if k not in self.dims]

data = [self._variables[k].data.reshape(-1) for k in columns]
df = dd.from_dask_array(da.stack(data, axis=1), columns=columns)
Member commented:

Stacking together variables into a single dask array loses dtype information, which is not ideal. Can you look into constructing a dask dataframe from a list of dask arrays?
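A small illustration of the dtype problem (a hypothetical example, not code from this PR):

import numpy as np
import dask.array as da

x = da.from_array(np.arange(5), chunks=2)               # int64
y = da.from_array(np.linspace(0.0, 1.0, 5), chunks=2)   # float64

stacked = da.stack([x, y], axis=1)
print(stacked.dtype)  # float64 -- the integer column has been upcast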

@jmunroe (Contributor, Author) replied:

I couldn't find a simple way of doing this by wrapping something already in the dask API. So, instead, I am constructing the dask graph directly, following the pattern of the other dask dataframe constructors (like dd.from_dask_array()).

Member commented:

Great, thanks!

So far, we have tried to avoid adding code that actually constructs dask graphs to xarray proper as it's easier to maintain in dask. Can you check with the dask developers to see if they would be interested in adding this functionality (i.e., DataFrame from list of dask arrays) upstream in some form? I suspect they would be!

@@ -2384,6 +2384,34 @@ def from_dataframe(cls, dataframe):
obj[name] = (dims, data)
return obj

def to_dask_dataframe(self, set_index=True):
Member commented:

I don't think we want to default to set_index=True, since xarray focuses on handling N-dimensional data, so data known ahead of time to be 1D is somewhat rare.

@jmunroe (Contributor, Author) replied:

Of course, that makes sense. set_index=False is now the default (the index is simply an integer range, as it would be after calling df.reset_index()).

index = self.coords.to_index(self.dims)

index = dd.from_array(index.values).repartition(divisions=df.divisions)
df = df.set_index(index, sort=False)
Member commented:

The index variable should already have been added as a column, so rather than converting it into a dask array again, I would simply handle it by name.

@jmunroe (Contributor, Author) replied:

For the 1-d case, if the index/coordinate variable is a normal array, it needs to be partitioned into divisions consistent with the chunking pattern for the underlying dask array.

I still need to handle the case where the coordinate variable itself is already a dask array.

@jmunroe (Contributor, Author) replied:

The coordinate variable can now be a dask array.

x = da.from_array(np.random.randn(10), chunks=4)
y = np.random.randn(10)
t = list('abcdefghij')
ds = Dataset(OrderedDict([('a', ('t', x)),
Member commented:

We need some test cases with non-1D data.

@jmunroe (Contributor, Author) replied:

Added a 2-d test case.
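Roughly along these lines (an illustrative sketch, not the exact test as committed):

import numpy as np
import dask.array as da
import dask.dataframe as dd
import xarray as xr

w = da.from_array(np.random.randn(2, 3), chunks=(1, 2))
ds = xr.Dataset({'w': (('x', 'y'), w)},
                coords={'x': [1, 2], 'y': list('abc')})

df = ds.to_dask_dataframe()
assert isinstance(df, dd.DataFrame)
assert len(df.compute()) == 6  # one row per (x, y) pair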

@jhamman modified the milestone: 0.10 (Aug 4, 2017)
@shoyer (Member) commented on Aug 10, 2017

@jmunroe This is great functionality -- thanks for your work on this!

One concern: if possible, I would like to avoid adding explicit dask graph building code in xarray. It looks like the canonical way to transform from a list of dask/numpy arrays to a dask dataframe is to make use of dask.dataframe.from_array along with dask.dataframe.concat:

In [34]: import numpy as np

In [35]: import dask.dataframe as dd

In [36]: import dask.array as da

In [37]: x = da.from_array(np.arange(5), 2)

In [38]: y = da.from_array(np.linspace(-np.pi, np.pi, 5), 2)

# notice that dtype is preserved properly
In [39]: dd.concat([dd.from_array(x), dd.from_array(y)], axis=1)
Out[39]:
Dask DataFrame Structure:
                   0        1
npartitions=2
0              int64  float64
2                ...      ...
4                ...      ...
Dask Name: concat-indexed, 26 tasks

Can you look into refactoring your code to make use of these?

@jmunroe (Contributor, Author) commented on Aug 13, 2017

I agree that using dask.dataframe.from_array and dask.dataframe.concat should work. Sorry I haven't had a chance to get back to this recently. I'll try to make the change early next week.

@jmunroe (Contributor, Author) commented on Sep 5, 2017

Sorry for the delay. I think this task is now complete.

@jmunroe (Contributor, Author) commented on Oct 11, 2017

Hi @shoyer and @jhamman. Thanks for your patience. Please let me know if anything still needs to be done on this PR.

# order columns so that coordinates appear before data
columns = list(chunked)
num_coords = len(coord_columns)
columns = columns[-num_coords:] + columns[:-num_coords]
Member commented:

This logic is a little fragile -- it relies on the iteration order of xarray datasets, which although unlikely to change is not really guaranteed to have coordinates last.

It would be better to build this up explicitly from the dataset, e.g.,

columns = list(ds.dims)
columns.extend(k for k in ds.coords if k not in ds.dims)
columns.extend(ds.data_vars)

@jmunroe (Contributor, Author) replied:

Ah! That is much better. I'll do what I think is the simplest thing:

columns = list(self.coords) + list(self.data_vars)

actual = ds.to_dask_dataframe(set_index=True)
self.assertIsInstance(actual, dd.DataFrame)
assert expected_pd.equals(actual.compute())

Member commented:

One more test I would like to see is an xarray Dataset without coordinate labels along a dimension, e.g., xarray.Dataset({'foo': ('x', [1, 2, 3])}). x should appear as a column in the dask DataFrame as it does when converting into pandas, with default integer values [0, ..., n-1].
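Something like this sketch (illustrative; the exact test may differ):

import numpy as np
import xarray as xr

ds = xr.Dataset({'foo': ('x', [1, 2, 3])}).chunk(2)
df = ds.to_dask_dataframe().compute()

assert list(df.columns) == ['x', 'foo']
assert (df['x'].values == np.arange(3)).all()  # default integer labels for x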

@jmunroe (Contributor, Author) replied:

Added.

coord_columns = [c for c in self if c in ordered_dims]

# convert all coordinates into data variables
ds = self.reset_index(coord_columns).reset_coords()
Member commented:

Is this actually necessary? Everything is already in ._variables.

@jmunroe (Contributor, Author) replied:

My intent was to be able to call Dataset.chunk() so that all of the variables (both coordinates and data) were first converted to dask arrays. But IndexVariables only have a dummy .chunk() method.

I'll change it so coordinate variables are converted with .to_base_variable().
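The difference can be seen directly (a small illustrative example):

import numpy as np
import xarray as xr

ds = xr.Dataset({'a': ('x', np.arange(4))}, coords={'x': [10, 20, 30, 40]})

idx = ds.variables['x']            # an IndexVariable
print(idx.chunk(2).chunks)         # None -- chunk() is a no-op here

base = idx.to_base_variable()      # demote to a plain Variable
print(base.chunk(2).chunks)        # ((2, 2),) -- now backed by a dask array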


# convert all coordinates into data variables
ds = self.reset_index(coord_columns).reset_coords()

Member commented:

You might need to assign default coordinates here for dimensions without coordinates. This might be easiest to do by pulling out default variables from this dataset, e.g.,

ds = self.assign_coords(**{k: ds[k] for k in ds.dims})

@jmunroe (Contributor, Author) replied:

I can't see the motivation for including the default coordinates in the dask dataframe for dimensions without coordinates. But I have ensured that dimensions without coordinates do not cause a problem.

Member commented:

My reasoning here would be that the default coordinates might not be necessary in array form, but in flattened DataFrame form they're probably necessary to work with the data. For example, consider an image with dimensions ('x', 'y', 'channel') without any labels along x and y. If you don't add the default dimensions, when you flatten the image you can no longer distinguish between the two original orderings (x, y) and (y, x).
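A tiny illustration of that ambiguity (a hypothetical example, not code from this PR):

import numpy as np

img = np.arange(6).reshape(2, 3)     # dims ('x', 'y'), no coordinate labels

flat_xy = img.reshape(-1)            # flattened with y varying fastest
flat_yx = img.T.reshape(-1)          # flattened with x varying fastest

print(flat_xy)  # [0 1 2 3 4 5]
print(flat_yx)  # [0 3 1 4 2 5]
# Without explicit x/y columns in the DataFrame, the two orderings are
# indistinguishable after flattening.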

for k in columns]

# restore names of columns that were also index names (e.g. x_ -> x)
columns = [c[:-1] if c[:-1] in ds.dims else c for c in columns]
Member commented:

This feels a little fragile -- it would be nice to avoid if possible. (See my note above on avoiding reset_index()/reset_coords())

@jmunroe (Contributor, Author) replied:

I've restructured the code so that this is no longer needed.

df = df.set_index(coord_dim)

if isinstance(df, dd.Series):
df = df.to_frame()
Member commented:

Does dask.dataframe.concat really create a Series if asked to concat along axis=1 with only one input? If so, it would be appreciated if you could file this as a dask bug (which it almost certainly is) and reference it here.

@jmunroe (Contributor, Author) replied:

Yes, that appears to be the current behaviour of dask.dataframe.concat. And I agree that this is not the same as what pd.concat does (if s is a pandas Series, then pd.concat([s], axis=1) is indeed a DataFrame).
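For reference, a sketch of the behaviour as observed at the time (dask may well have changed this since):

import pandas as pd
import dask.dataframe as dd

s = dd.from_pandas(pd.Series([1, 2, 3], name='a'), npartitions=2)

print(type(pd.concat([s.compute()], axis=1)))  # pandas DataFrame
print(type(dd.concat([s], axis=1)))            # dask Series at the time (dask/dask#2798)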

@jmunroe (Contributor, Author) replied:

Issue filed as dask/dask#2798

@@ -546,6 +547,92 @@ def test_from_dask_variable(self):
coords={'x': range(4)}, name='foo')
self.assertLazyAndIdentical(self.lazy_array, a)

def test_to_dask_dataframe(self):
Member commented:

It would be appreciated if you could break this into a few more sub-methods. We don't always follow this well currently, but smaller tests that only test one thing are easier to work with.

There's no strict line limit, but aim for less than 10-20 lines if possible. Another good time to break a test into parts is when you have different input data.

@jmunroe (Contributor, Author) replied:

No problem. Done.

@shoyer (Member) left a review comment:

Thinking about this a little more, there are at least two obvious ways to order entries in the resulting DataFrame:

  1. Along each axis in order. This is what we have here. The advantage is that the order of the rows is independent of the chunksize, and matches the order of .to_dataframe(). The downside is that if there is a small chunksize along the last axis, the dask DataFrame will be divided into a very large number of divisions. This might make things very slow.
  2. Flattening each block of the arrays in order (e.g., by calling .map_blocks(np.ravel, ...)). This would be guaranteed to be very fast, since it directly translates each chunk of a dask array into a DataFrame division. The downside is that the order of rows would now depend on the chunks. Eventually, the data might need to be redivided for analysis, but that would only happen when explicitly requested.

I think it might make sense to switch to this second behavior, especially if we want this to work well for very large datasets. Anyone else have thoughts here?
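To make the difference concrete, here is a tiny, purely illustrative example of the two row orderings for a 2x4 array split into two (2, 2) chunks:

import numpy as np

arr = np.arange(8).reshape(2, 4)
left, right = arr[:, :2], arr[:, 2:]        # the two "chunks"

option_1 = arr.reshape(-1)                                # C order, independent of chunking
option_2 = np.concatenate([left.ravel(), right.ravel()])  # per-chunk order

print(option_1)  # [0 1 2 3 4 5 6 7]
print(option_2)  # [0 1 4 5 2 3 6 7]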

for k in columns:
v = self._variables[k]

# consider coordinate variables as well as data variables
Member commented:

This is a good place to mention in a comment your discovery that we need to convert to base variables in order for chunk() to work properly.

if v.chunks != chunks:
v = v.chunk(chunks)

# reshape variable contents as a 1d array
Member commented:

Nit: some of these comments are probably slightly overboard -- if they simply restate what's in the code it's better to omit them.

if len(ordered_dims) != 1:
    raise ValueError(
        'set_index=True is only valid for '
        'one-dimensional datasets')
Member commented:

Can you include the list of multiple dimensions in the error message?


@jmunroe (Contributor, Author) commented on Oct 20, 2017

I don't understand how this one test (TestDataArrayAndDataset::test_to_dask_dataframe_2D) can pass on TravisCI yet fail on AppVeyor.

@shoyer (Member) commented on Oct 21, 2017

@jcrist @mrocklin @jhamman do any of you have opinions on my latest design question above about the order of elements in dask dataframes? Is it as important as I suspect to keep chunking/divisions consistent when converting from arrays to dataframes?

@mrocklin (Contributor) commented:

I think that you would want to rechunk the dask.array so that its chunks align with the output divisions of the dask.dataframe. For example, if you have a 2d array and are partitioning along the x-axis, then you will want to align the array so that there is no chunking along the y-axis. In this case set_index will also be free, because your data is already aligned and you already know (I think) the division values.
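For example, something along these lines (a minimal sketch of the rechunking I mean; the array and chunk sizes are just for illustration):

import numpy as np
import dask.array as da

arr = da.from_array(np.arange(24).reshape(6, 4), chunks=(2, 2))

aligned = arr.rechunk((2, -1))   # partition only along the first axis
print(aligned.chunks)            # ((2, 2, 2), (4,))
# Each chunk now maps cleanly onto one DataFrame division, so a later
# set_index along that axis needs no shuffle.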

@shoyer (Member) commented on Oct 21, 2017

@mrocklin are you saying that it's easier to properly rechunk data on the xarray side (as arrays) before converting to dask dataframes? That does make sense -- we have some nice structure (as multi-dimensional arrays) that is lost once the data gets put in a DataFrame.

In this case, I suppose we really should add a keyword argument like dims_order to to_dask_dataframe() that lets the user choose how they want to order dimensions on the result.

Initially, I was concerned about the resulting dask graphs when flattening out arrays in the wrong order. Although that would have bad performance implications if you need to stream the data from disk, I see now the total number of chunks no longer blows up, thanks to @pitrou's impressive rewrite of dask.array.reshape().


import dask.dataframe as dd

ordered_dims = self.dims
Member commented:

We should probably add that dims_order keyword argument. Then this becomes something like:

if dims_order is None:
    dims_order = self.dims
ordered_dims = OrderedDict((k, self.dims[k]) for k in dims_order)

actual = ds.to_dask_dataframe(set_index=False)

self.assertIsInstance(actual, dd.DataFrame)
assert expected.equals(actual.compute()), (expected, actual.compute())
Member commented:

Try using pandas.testing.assert_frame_equal to get a better error message (for easier debugging on Appveyor):

# at the top
try:
    from pandas.testing import assert_frame_equal
except ImportError:
    # old location, for pandas < 0.20
    from pandas.util.testing import assert_frame_equal

# here
computed = actual.compute()
assert_frame_equal(computed, expected)

@shoyer (Member) commented on Oct 27, 2017

Just pushed a couple of commits, which should resolve the failures on Windows. It was typical int32 vs int64 NumPy on Windows nonsense.
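The kind of fix involved is pinning the integer dtype in the test data so Windows does not default to int32 (a generic sketch, not the exact commits):

import numpy as np

# On Windows the default NumPy integer is int32, so fixtures built with
# plain np.arange or Python ints compare unequal against int64 output
# unless the dtype is pinned explicitly.
x = np.arange(5, dtype=np.int64)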

@shoyer (Member) commented on Oct 28, 2017

@jmunroe Thanks for your help here! I'm going to merge this now and take care of my remaining clean-up requests in a follow-on PR.

@shoyer merged commit 1436509 into pydata:master on Oct 28, 2017
@jmunroe (Contributor, Author) commented on Oct 28, 2017

@shoyer Sounds good. Thanks.

@shoyer mentioned this pull request on Oct 28, 2017