
[FEA] dask-cudf doesn't support "corr"/correlation function like Pandas and cuDF #3363

Closed
jdye64 opened this issue Nov 12, 2019 · 28 comments · Fixed by #3393
Labels
dask (Dask issue) · feature request (New feature or request) · Python (Affects Python cuDF API)

Comments

@jdye64 (Contributor) commented Nov 12, 2019

When attempting to perform a correlation such as sales_corr = sales['pr_review_rating', 'count'].corr(sales['pr_review_rating', 'mean']), dask-cudf fails with the following error.

TypeError: cannot concatenate object of type "<class 'cudf.core.series.Series'>"; only pd.Series, pd.DataFrame, and pd.Panel (deprecated) objs are valid

It seems that we currently limit this in dask-cudf's concat dispatch (shown below). However, dask-cudf should behave exactly like cuDF and Pandas.

def concat_cudf(
    dfs, axis=0, join="outer", uniform=False, filter_warning=True, sort=None
):
    assert axis == 0
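For illustration only, here is a minimal sketch of a dispatch that forwards the axis argument instead of asserting it away. This is hypothetical, not the actual change from #3393, and it assumes cudf.concat accepts an axis keyword:

import cudf


def concat_cudf(
    dfs, axis=0, join="outer", uniform=False, filter_warning=True, sort=None
):
    # Hypothetical: forward axis so that dask.dataframe's axis=1 concat
    # (used by corr/cov) can reach cudf.concat instead of hitting an assert.
    assert join == "outer"
    return cudf.concat(dfs, axis=axis)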

jdye64 added the Needs Triage (Need team to review and classify) and feature request (New feature or request) labels on Nov 12, 2019
beckernick added the Python (Affects Python cuDF API) and dask (Dask issue) labels and removed the Needs Triage label on Nov 12, 2019
@beckernick (Member) commented Nov 12, 2019

Reproducer in a dev-nightly container as of 11/12/19:

import dask_cudf
import cudf

df = cudf.datasets.randomdata(10)
ddf = dask_cudf.from_cudf(df, 2)
ddf.x.corr(ddf.y)
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-3-4ce11b32272a> in <module>
      4 df = cudf.datasets.randomdata(10)
      5 ddf = dask_cudf.from_cudf(df, 2)
----> 6 ddf.x.corr(ddf.y)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in corr(self, other, method, min_periods, split_every)
   2978         if method != "pearson":
   2979             raise NotImplementedError("Only Pearson correlation has been implemented")
-> 2980         df = concat([self, other], axis=1)
   2981         return cov_corr(
   2982             df, min_periods, corr=True, scalar=True, split_every=split_every

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/multi.py in concat(dfs, axis, join, interleave_partitions)
   1046     if axis == 1:
   1047         if all(df.known_divisions for df in dasks):
-> 1048             return concat_indexed_dataframes(dfs, axis=axis, join=join)
   1049         elif (
   1050             len(dasks) == len(dfs)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/multi.py in concat_indexed_dataframes(dfs, axis, join)
    884     warn = axis != 0
    885     meta = methods.concat(
--> 886         [df._meta for df in dfs], axis=axis, join=join, filter_warning=warn
    887     )
    888     empties = [strip_unknown_categories(df._meta) for df in dfs]

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/methods.py in concat(dfs, axis, join, uniform, filter_warning)
    354         func = concat_dispatch.dispatch(type(dfs[0]))
    355         return func(
--> 356             dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
    357         )
    358 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf(dfs, axis, join, uniform, filter_warning, sort)
     31     dfs, axis=0, join="outer", uniform=False, filter_warning=True, sort=None
     32 ):
---> 33     assert axis == 0
     34     assert join == "outer"
     35     return cudf.concat(dfs)

AssertionError: 

@mrocklin (Collaborator) commented:

In the following code:

https://github.com/dask/dask/blob/9c72876af571ef90ecad5df7c8bd123b06220305/dask/dataframe/core.py#L5124-L5134

It looks like we need to change the np.zeros calls to np.zeros_like so that CuPy array values are respected.

However, it looks like this function might also use record arrays, which may not be supported in CuPy. We can probably have this function return a tuple/list/dict instead.
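To illustrate the difference, a minimal sketch, assuming CuPy is installed and the __array_function__ protocol is active (NumPy >= 1.17, or NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 on 1.16):

import numpy as np
import cupy as cp

gpu = cp.arange(4, dtype="float64")

# np.zeros always allocates a NumPy array, regardless of what the caller holds.
print(type(np.zeros(4)))         # numpy.ndarray

# np.zeros_like dispatches on its argument's type, so a CuPy input stays a CuPy array.
print(type(np.zeros_like(gpu)))  # cupy ndarray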

@mrocklin (Collaborator) commented:

@rjzamora any interest in diving in here?

@rjzamora (Member) commented:

@rjzamora any interest in diving in here?

Sure - as long as you are okay with my suggestion here :)

@beckernick (Member) commented:

Re-opening this due to new information and cross-linking.

This is partially dependent on dask/dask#5643, but it also currently fails for me. In a small test, I end up with a different error from dask/dask#5643. This test can get past that issue because the dataframes have known divisions, which leads to the generalized concat_indexed_dataframes during the concat. Instead, here, we end up in a codepath that tries to execute lambda self, other: elemwise(op, self, other), where the operation is __ne__, self is the dask dataframe, and other is the string "pearson". This binary op ends up causing a cuDF error far down the call stack.

Example:

import cudf
import dask_cudf
d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
d2 = cudf.datasets.randomdata(100, dtypes={"c":float, "d":float})
dd1 = dask_cudf.from_cudf(d1, 5)
dd2 = dask_cudf.from_cudf(d2, 5)
dd1.corr(dd2)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    168     try:
--> 169         yield
    170     except Exception as e:
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
   4421         with raise_on_meta_error(funcname(op)):
-> 4422             meta = partial_by_order(*parts, function=op, other=other)
   4423 
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/utils.py in partial_by_order(*args, **kwargs)
   1075         args2.insert(i, arg)
-> 1076     return function(*args2, **kwargs)
   1077 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py in __ne__(self, other)
    893     def __ne__(self, other):
--> 894         return self._apply_op("__ne__", other)
    895 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py in _apply_op(self, fn, other, fill_value)
    754             for k, col in enumerate(self._cols):
--> 755                 result[col] = getattr(self._cols[col], fn)(other[k])
    756         elif isinstance(other, DataFrame):
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in __ne__(self, other)
   1000     def __ne__(self, other):
-> 1001         return self._unordered_compare(other, "ne")
   1002 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in _unordered_compare(self, other, cmpops)
    948         result_name = utils.get_result_name(self, other)
--> 949         other = self._normalize_binop_value(other)
    950         outcol = self._column.unordered_compare(cmpops, other)
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py in _normalize_binop_value(self, other)
    943         else:
--> 944             return self._column.normalize_binop_value(other)
    945 
/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/column/numerical.py in normalize_binop_value(self, other)
    139         else:
--> 140             raise TypeError("cannot broadcast {}".format(type(other)))
    141 
TypeError: cannot broadcast <class 'str'>
During handling of the above exception, another exception occurred:
ValueError                                Traceback (most recent call last)
<ipython-input-36-b0dbaa32cc44> in <module>
      6 dd1 = dask_cudf.from_cudf(d1, 5)
      7 dd2 = dask_cudf.from_cudf(d2, 5)
----> 8 dd1.corr(dd2)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in corr(self, method, min_periods, split_every)
   4013     @derived_from(pd.DataFrame)
   4014     def corr(self, method="pearson", min_periods=None, split_every=False):
-> 4015         if method != "pearson":
   4016             raise NotImplementedError("Only Pearson correlation has been implemented")
   4017         return cov_corr(self, min_periods, True, split_every=split_every)
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in <lambda>(self, other)
   1333             return lambda self, other: elemwise(op, other, self)
   1334         else:
-> 1335             return lambda self, other: elemwise(op, self, other)
   1336 
   1337     def rolling(self, window, min_periods=None, center=False, win_type=None, axis=0):
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
   4420         ]
   4421         with raise_on_meta_error(funcname(op)):
-> 4422             meta = partial_by_order(*parts, function=op, other=other)
   4423 
   4424     result = new_dd_object(graph, _name, meta, divisions)
/opt/conda/envs/rapids/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
     97                 value = type()
     98             try:
---> 99                 self.gen.throw(type, value, traceback)
    100             except StopIteration as exc:
    101                 # Suppress StopIteration *unless* it's the same exception that
/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    188         )
    189         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 190         raise ValueError(msg)
    191 
    192 
ValueError: Metadata inference failed in `ne`.
Original error is below:
------------------------
TypeError("cannot broadcast <class 'str'>",)
Traceback:
---------
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/utils.py", line 169, in raise_on_meta_error
    yield
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py", line 4422, in elemwise
    meta = partial_by_order(*parts, function=op, other=other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/utils.py", line 1076, in partial_by_order
    return function(*args2, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py", line 894, in __ne__
    return self._apply_op("__ne__", other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/dataframe.py", line 755, in _apply_op
    result[col] = getattr(self._cols[col], fn)(other[k])
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 1001, in __ne__
    return self._unordered_compare(other, "ne")
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 949, in _unordered_compare
    other = self._normalize_binop_value(other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/series.py", line 944, in _normalize_binop_value
    return self._column.normalize_binop_value(other)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/cudf/core/column/numerical.py", line 140, in normalize_binop_value
    raise TypeError("cannot broadcast {}".format(type(other)))

@beckernick (Member) commented:

@rjzamora, thanks for pointing out that the above example shouldn't work. It's incorrectly using dataframes, which likely explains the binary op issue. This is representative of the workflow (correlation of two columns):

import cudf
import dask_cudf

d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
dd1 = dask_cudf.from_cudf(d1, 5)
dd1.a.corr(dd1.b).compute()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-2-7816cf3671a0> in <module>
      4 d1 = cudf.datasets.randomdata(100, dtypes={"a":float, "b":float})
      5 dd1 = dask_cudf.from_cudf(d1, 5)
----> 6 dd1.a.corr(dd1.b).compute()

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     79         get_id=_thread_get_id,
     80         pack_exception=pack_exception,
---> 81         **kwargs
     82     )
     83 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/opt/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in cov_corr_chunk(df, corr)
   5147             )
   5148             mu_discrepancy[mask] = np.nan
-> 5149             m[idx] = np.nansum(mu_discrepancy, axis=0)
   5150         m = m.T
   5151         dtype.append(("m", m.dtype))

<__array_function__ internals> in nansum(*args, **kwargs)

TypeError: no implementation found for 'numpy.nansum' on types that implement __array_function__: [<class 'cupy.core.core.ndarray'>]

CuPy just recently implemented nansum in v7.0, which is ahead of the version we currently use. I'll test this with the master branch of CuPy and report back. Thanks for looking into it.

@rjzamora (Member) commented:

CuPy just recently implemented nansum in v7.0, which is ahead of the version we currently use. I'll test this with the master branch of CuPy and report back. Thanks for looking into it.

Right, I'm using '7.0.0rc1', and this seems to run fine.
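For reference, a quick check of the dispatch described above (a sketch assuming CuPy >= 7.0 and the __array_function__ protocol enabled):

import numpy as np
import cupy as cp

arr = cp.array([1.0, np.nan, 2.0])

# With CuPy >= 7.0, np.nansum dispatches to cupy.nansum via __array_function__,
# so the reduction runs on the GPU and returns a CuPy (0-d) array.
print(np.nansum(arr))  # 3.0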

@beckernick (Member) commented:

Love it, @rjzamora. That solves the problem. Let's leave this issue open for tracking purposes, keep an eye on when CuPy v7.0 is considered the "stable" release, and then evaluate updating our CuPy dependency accordingly.

@beckernick (Member) commented Dec 5, 2019

Brief summary of the current status below:

In a vanilla cuDF / Dask environment, we cannot use Dask-cuDF's correlation. To use correlation on indexed dataframes, we need the following:

  • NumPy > 1.17
  • CuPy > 7.0

The NumPy upgrade can be done via conda, and the CuPy upgrade is best done by cloning the repository and then running pip install . from the cupy directory.

To use correlation on unindexed dataframes, we also need:

  • Dask master at least as recent as 2019-12-03

Since general usage can include both, the functional requirements are:

  • NumPy > 1.17
  • CuPy > 7.0
  • Dask master at least as recent as 2019-12-03

@kkraus14 @shwina , do you have any strong feelings on updating our dependency to NumPy >= 1.17? I suspect Dask will take care of itself and CuPy v7.0 is not yet considered the stable release.

@kkraus14 (Collaborator) commented Dec 5, 2019

We haven't pinned a numpy version in general for cuDF so I'm somewhat hesitant to require such a new version. What chunk of code requires numpy > 1.17? I'm fine with upgrading to Cupy >= 7.0 once there's a conda-forge release.

@beckernick (Member) commented:

Ah, I guess I always just get 1.16 from another dependency then. Good to know.

This is the chunk of code that necessitates it, currently:

https://github.com/dask/dask/blob/efadf5a60b6f469099b2f63f704b80448f21c872/dask/array/utils.py#L325-L334

Without NumPy > 1.17, we can't currently leverage duck typing to create a CuPy array internal to Dask's cov_corr_chunk function. As a result, a slice of a NumPy array ends up being assigned a CuPy array coming from a .values call on a cuDF object.
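A stripped-down sketch of that failure mode (illustrative only; the two arrays stand in for Dask's internal buffer and the result of a cuDF .values call):

import numpy as np
import cupy as cp

host = np.zeros((2, 3))  # allocated by np.zeros, so it is a NumPy array
device = cp.ones(3)      # stands in for the CuPy array returned by .values

# Assigning a CuPy array into a NumPy slice requires an implicit
# device-to-host conversion, which CuPy refuses, so this raises a TypeError.
host[0] = device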

@kkraus14 (Collaborator) commented Dec 5, 2019

https://github.com/dask/dask/blob/efadf5a60b6f469099b2f63f704b80448f21c872/dask/array/utils.py#L325-L334

From my perspective this should be a dispatched function to allow for CuPy/Numpy agnostic behavior. @mrocklin @pentschev do you agree?

@mrocklin (Collaborator) commented Dec 5, 2019

We've been relying on the __array_function__ protocol for NumPy/CuPy agnostic behavior. I'm somewhat against creating an internal dispatching layer for NumPy within Dask Array if we can avoid it.

I suspect that the 1.17 requirement that @beckernick is referring to is just that 1.17 is when __array_function__ becomes enabled by default.

@pentschev (Member) commented:

To complement @mrocklin 's comment, for that functionality we need CuPy >= 6.4.0 only, and we can still use NumPy 1.16 if people are fine with setting the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 environment variable (which I assume users will not be).

@kkraus14 (Collaborator) commented Dec 5, 2019

I think this is something different from the __array_function__ protocol, because this is a function that allocates a NumPy array. As far as I'm aware there's no way to override this yet, though the community is talking about what that would look like.

@kkraus14 (Collaborator) commented Dec 5, 2019

To complement @mrocklin 's comment, for that functionality we need CuPy >= 6.4.0 only, and we can still use NumPy 1.16 if people are fine with setting the NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 environment variable (which I assume users will not be).

I'm happy to enforce numpy >= 1.17 for cudf then, but does the zeros_like function allow for allocating a Cupy array if a cupy array is passed in?

@beckernick (Member) commented:

cc @rjzamora, too.

My understanding is consistent with Keith's. The NEP-18 flag doesn't solve the problem because if zeros_like in the NumPy namespace does not have the shape argument, we default to a pure np.zeros(shape), which will allocate a NumPy array. The shape argument was not added until NumPy 1.17.

@beckernick (Member) commented:

@pentschev I believe we also need CuPy >= 7.0 in order to dispatch to cupy.nansum internal to the cov_corr_chunk function, which doesn't exist until v7.0.

@kkraus14 (Collaborator) commented Dec 5, 2019

Confirmed this will work as expected, so no need for a Dask dispatch. Sorry for the false alarm 😅

In [1]: import numpy as np

In [2]: import cupy

In [3]: test = cupy.zeros(10)

In [4]: type(np.zeros_like(test, shape=5))
Out[4]: cupy.core.core.ndarray

@beckernick (Member) commented:

zeros_like allows for CuPy array allocation:

import numpy as np
import cupy as cp

garr = cp.array(range(5))
print(type(np.zeros_like(garr)))
<class 'cupy.core.core.ndarray'>

But the issue is that without the shape parameter we currently switch to a different codepath (a plain np.zeros call).

@pentschev (Member) commented:

The shape= argument was introduced by me in NumPy and CuPy to address exactly that shortcoming. It's the only special case for array creation with __array_function__. The *_like functions will allow dispatching via __array_function__ according to the first argument (if NumPy, dispatch to NumPy itself, if CuPy dispatch to CuPy, etc.), and the new shape= argument allows us to create an arbitrarily-shaped array with the correct array type, which wasn't possible before.
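A quick illustration of that behavior (assuming NumPy >= 1.17 and a CuPy release with the shape= support described above):

import numpy as np
import cupy as cp

gpu = cp.ones(2)

# The *_like creation function dispatches on its first argument, and shape=
# lets us request an arbitrary new shape while keeping the array type.
print(type(np.zeros_like(gpu, shape=(3, 4))))         # CuPy ndarray
print(type(np.zeros_like(np.ones(2), shape=(3, 4))))  # NumPy ndarray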

@kkraus14 (Collaborator) commented Dec 5, 2019

As long as we enforce a sufficiently new NumPy and CuPy version, all should be well, and I'm perfectly happy to do that for cuDF.

kkraus14 added this to Issue-Needs prioritizing in v0.12 Release via automation on Dec 6, 2019
@beckernick (Member) commented Dec 12, 2019

Now that CuPy v7.0 is officially released, are we ready to undo the CuPy version restriction from #3539 and also enforce NumPy > 1.17?

I believe that would officially close this issue. cc @randerzander

CuPy 7.0 is available from conda-forge.

@jakirkham (Member) commented:

We now support CuPy 7.0.0 (#3619) 🙂

@kkraus14 (Collaborator) commented:

This should be resolved as of now with CuPy >= 7.

v0.12 Release automation moved this from Issue-Needs prioritizing to Done Jan 13, 2020
@jangorecki commented:

@kkraus14 are there unit tests covering that? If not, maybe it is worth keeping this issue open just for adding unit tests, so that an eventual regression can be spotted in the future. Any idea if it works in groupby already? Or should that be a separate issue?

@jakirkham (Member) commented:

It's important to note that this failure was the product of an issue with cuDF and upstream libraries. That said, IIRC @rjzamora included fixes to both cuDF and Dask, each of which includes tests. @pentschev also implemented nansum in CuPy, which has its own test. So I think this is covered pretty well. Still, if there is another test you would like to add, I think that would be happily accepted 🙂

@jangorecki commented:

@jakirkham thanks for clarifying. It was not obvious from reading this thread. It is always useful to reference the issue from the commit or PR so that it is clear.
