Merge branch 'master' into funcname-truncate
mrocklin committed Sep 12, 2019
2 parents 3d7b568 + 9ae60bd commit 30b80e2
Showing 11 changed files with 101 additions and 17 deletions.
4 changes: 2 additions & 2 deletions dask/array/core.py
@@ -828,7 +828,7 @@ def store(
>>> x = ... # doctest: +SKIP
>>> import h5py # doctest: +SKIP
- >>> f = h5py.File('myfile.hdf5') # doctest: +SKIP
+ >>> f = h5py.File('myfile.hdf5', mode='a') # doctest: +SKIP
>>> dset = f.create_dataset('/data', shape=x.shape,
... chunks=x.chunks,
... dtype='f8') # doctest: +SKIP
@@ -4356,7 +4356,7 @@ def to_hdf5(filename, *args, **kwargs):

import h5py

- with h5py.File(filename) as f:
+ with h5py.File(filename, mode="a") as f:
dsets = [
f.require_dataset(
dp,
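
For context on the mode arguments above: newer h5py releases warn when File() is opened without an explicit mode (the historical default of 'a' was slated to become read-only), so both the docstring example and to_hdf5 now pass one explicitly. A minimal sketch of the resulting round trip, using a made-up scratch filename:

import h5py
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2))
x.to_hdf5("scratch.hdf5", "/x")  # writes via h5py.File(..., mode="a") internally

with h5py.File("scratch.hdf5", mode="r") as f:  # explicit read-only mode, no warning
    assert f["/x"].shape == (4, 4)
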
20 changes: 10 additions & 10 deletions dask/array/tests/test_array_core.py
@@ -1854,23 +1854,23 @@ def test_to_hdf5():

with tmpfile(".hdf5") as fn:
x.to_hdf5(fn, "/x")
- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="r+") as f:
d = f["/x"]

assert_eq(d[:], x)
assert d.chunks == (2, 2)

with tmpfile(".hdf5") as fn:
x.to_hdf5(fn, "/x", chunks=None)
- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="r+") as f:
d = f["/x"]

assert_eq(d[:], x)
assert d.chunks is None

with tmpfile(".hdf5") as fn:
x.to_hdf5(fn, "/x", chunks=(1, 1))
- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="r+") as f:
d = f["/x"]

assert_eq(d[:], x)
@@ -1879,7 +1879,7 @@ def test_to_hdf5():
with tmpfile(".hdf5") as fn:
da.to_hdf5(fn, {"/x": x, "/y": y})

- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="r+") as f:
assert_eq(f["/x"][:], x)
assert f["/x"].chunks == (2, 2)
assert_eq(f["/y"][:], y)
@@ -2370,7 +2370,7 @@ def test_asarray_h5py():
h5py = pytest.importorskip("h5py")

with tmpfile(".hdf5") as fn:
- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="a") as f:
d = f.create_dataset("/x", shape=(2, 2), dtype=float)
x = da.asarray(d)
assert d in x.dask.values()
@@ -2593,7 +2593,7 @@ def test_h5py_newaxis():
h5py = pytest.importorskip("h5py")

with tmpfile("h5") as fn:
- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="a") as f:
x = f.create_dataset("/x", shape=(10, 10), dtype="f8")
d = da.from_array(x, chunks=(5, 5))
assert d[None, :, :].compute(scheduler="sync").shape == (1, 10, 10)
@@ -2829,8 +2829,8 @@ def test_h5py_tokenize():
h5py = pytest.importorskip("h5py")
with tmpfile("hdf5") as fn1:
with tmpfile("hdf5") as fn2:
- f = h5py.File(fn1)
- g = h5py.File(fn2)
+ f = h5py.File(fn1, mode="a")
+ g = h5py.File(fn2, mode="a")

f["x"] = np.arange(10).astype(float)
g["x"] = np.ones(10).astype(float)
@@ -4067,13 +4067,13 @@ def test_auto_chunks_h5py():
h5py = pytest.importorskip("h5py")

with tmpfile(".hdf5") as fn:
- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="a") as f:
d = f.create_dataset(
"/x", shape=(1000, 1000), chunks=(32, 64), dtype="float64"
)
d[:] = 1

- with h5py.File(fn) as f:
+ with h5py.File(fn, mode="a") as f:
d = f["x"]
with dask.config.set({"array.chunk-size": "1 MiB"}):
x = da.from_array(d)
6 changes: 4 additions & 2 deletions dask/bytes/tests/test_hdfs.py
@@ -235,7 +235,9 @@ def test_glob(hdfs):
basedir + p for p in ["/a", "/a1", "/a2", "/a3"]
}

- assert set(hdfs.glob(basedir + "/c/*")) == {basedir + p for p in ["/c/x1", "/c/x2"]}
+ assert set(hdfs.glob(basedir + "/c/*")) == {
+ basedir + p for p in ["/c/x1", "/c/x2", "/c/d"]
+ }

assert set(hdfs.glob(basedir + "/*/x*")) == {
basedir + p for p in ["/c/x1", "/c/x2", "/c2/x1", "/c2/x2"]
@@ -251,7 +253,7 @@ def test_glob(hdfs):
assert hdfs.glob(basedir + "/*/missing") == []

assert set(hdfs.glob(basedir + "/*")) == {
- basedir + p for p in ["/a", "/a1", "/a2", "/a3", "/b1"]
+ basedir + p for p in ["/a", "/a1", "/a2", "/a3", "/b1", "/c", "/c2"]
}


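The updated expectations above reflect that glob on the HDFS filesystem now matches directories as well as files, so trailing wildcards also return entries such as "/c", "/c2", and "/c/d". A rough sketch of the behaviour, assuming an hdfs filesystem object like the test fixture and a hypothetical layout in which /tmp/demo/c holds files x1 and x2 plus a subdirectory d:

# Directories are now included alongside files in wildcard matches.
matches = set(hdfs.glob("/tmp/demo/c/*"))
assert matches == {"/tmp/demo/c/x1", "/tmp/demo/c/x2", "/tmp/demo/c/d"}
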
10 changes: 10 additions & 0 deletions dask/dataframe/core.py
@@ -2713,6 +2713,11 @@ def groupby(self, by=None, **kwargs):
def count(self, split_every=False):
return super(Series, self).count(split_every=split_every)

@derived_from(pd.Series, version="0.25.0")
def explode(self):
meta = self._meta.explode()
return self.map_partitions(M.explode, meta=meta)

def unique(self, split_every=None, split_out=1):
"""
Return Series of unique values in the object. Includes NA values.
@@ -3609,6 +3614,11 @@ def to_timestamp(self, freq=None, how="start", axis=0):
df.divisions = tuple(pd.Index(self.divisions).to_timestamp())
return df

@derived_from(pd.DataFrame, version="0.25.0")
def explode(self, column):
meta = self._meta.explode(column)
return self.map_partitions(M.explode, column, meta=meta)

def to_bag(self, index=False):
"""Convert to a dask Bag of tuples of each row.
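
The explode wrappers added here simply forward to the pandas methods partition by partition (pandas >= 0.25.0, per the derived_from version), so divisions are preserved. A short usage sketch with illustrative data:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"A": [[1, 2, 3], [4, 5], [6]], "B": 1})
ddf = dd.from_pandas(df, npartitions=2)

# Each list element becomes its own row, mirroring pandas' explode.
print(ddf.explode("A").compute())
print(ddf["A"].explode().compute())
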
5 changes: 3 additions & 2 deletions dask/dataframe/reshape.py
@@ -238,8 +238,9 @@ def pivot_table(df, index=None, columns=None, values=None, aggfunc="mean"):
# _emulate can't work for empty data
# the result must have CategoricalIndex columns
new_columns = pd.CategoricalIndex(df[columns].cat.categories, name=columns)
- meta = pd.DataFrame(columns=new_columns, dtype=np.float64)
- meta.index.name = index
+ meta = pd.DataFrame(
+ columns=new_columns, dtype=np.float64, index=pd.Index(df._meta[index])
+ )

kwargs = {"index": index, "columns": columns, "values": values}

24 changes: 24 additions & 0 deletions dask/dataframe/tests/test_dataframe.py
@@ -4061,3 +4061,27 @@ def func():

assert "['x']" in str(info.value)
assert "['x', 'y']" in str(info.value)


@pytest.mark.skipif(
PANDAS_VERSION < "0.25.0", reason="Explode not implemented in pandas < 0.25.0"
)
def test_dataframe_explode():
df = pd.DataFrame({"A": [[1, 2, 3], "foo", [3, 4]], "B": 1})
exploded_df = df.explode("A")
ddf = dd.from_pandas(df, npartitions=2)
exploded_ddf = ddf.explode("A")
assert ddf.divisions == exploded_ddf.divisions
assert_eq(exploded_ddf.compute(), exploded_df)


@pytest.mark.skipif(
PANDAS_VERSION < "0.25.0", reason="Explode not implemented in pandas < 0.25.0"
)
def test_series_explode():
s = pd.Series([[1, 2, 3], "foo", [3, 4]])
exploded_s = s.explode()
ds = dd.from_pandas(s, npartitions=2)
exploded_ds = ds.explode()
assert_eq(exploded_ds, exploded_s)
assert ds.divisions == exploded_ds.divisions
14 changes: 14 additions & 0 deletions dask/dataframe/tests/test_reshape.py
@@ -246,6 +246,20 @@ def test_pivot_table_dtype():
assert_eq(res, exp)


def test_pivot_table_index_dtype():
df = pd.DataFrame(
{
"A": pd.date_range(start="2019-08-01", periods=3, freq="1D"),
"B": pd.Categorical(list("abc")),
"C": [1, 2, 3],
}
)
ddf = dd.from_pandas(df, 2)
res = dd.pivot_table(ddf, index="A", columns="B", values="C", aggfunc="count")

assert res.index.dtype == np.dtype("datetime64[ns]")


def test_pivot_table_errors():
df = pd.DataFrame(
{
13 changes: 12 additions & 1 deletion dask/tests/test_utils.py
@@ -357,6 +357,14 @@ def foo(a, b, c):
assert funcname(functools.partial(foo, a=1)) == "foo"


def test_funcname_numpy_vectorize():
np = pytest.importorskip("numpy")

func = np.vectorize(int)

assert funcname(func) == "vectorize_int"


def test_ndeepmap():
L = 1
assert ndeepmap(0, inc, L) == 2
@@ -492,6 +500,7 @@ def test_parse_bytes():
assert parse_bytes("1e6") == 1000000
assert parse_bytes("1e6 kB") == 1000000000
assert parse_bytes("MB") == 1000000
assert parse_bytes(123) == 123


def test_parse_timedelta():
@@ -508,11 +517,13 @@ def test_parse_timedelta():
("1 ns", 1e-9),
("2m", 120),
("2 minutes", 120),
(None, None),
(3, 3),
(datetime.timedelta(seconds=2), 2),
(datetime.timedelta(milliseconds=100), 0.1),
]:
result = parse_timedelta(text)
- assert abs(result - value) < 1e-14
+ assert result == value or abs(result - value) < 1e-14

assert parse_timedelta("1ms", default="seconds") == 0.001
assert parse_timedelta("1", default="seconds") == 1
9 changes: 9 additions & 0 deletions dask/utils.py
@@ -686,6 +686,9 @@ def funcname(func):
# multipledispatch objects
if "multipledispatch" in module_name and "Dispatcher" == type_name:
return func.name[:50]
# numpy.vectorize objects
if "numpy" in module_name and "vectorize" == type_name:
return ("vectorize_" + func.pyfunc.__name__)[:50]

# All other callables
try:
@@ -1158,9 +1161,13 @@ def parse_bytes(s):
1000000000
>>> parse_bytes('MB')
1000000
>>> parse_bytes(123)
123
>>> parse_bytes('5 foos') # doctest: +SKIP
ValueError: Could not interpret 'foos' as a byte unit
"""
if isinstance(s, (int, float)):
return int(s)
s = s.replace(" ", "")
if not s[0].isdigit():
s = "1" + s
@@ -1292,6 +1299,8 @@ def parse_timedelta(s, default="seconds"):
>>> parse_timedelta(timedelta(seconds=3)) # also supports timedeltas
3
"""
if s is None:
return None
if isinstance(s, timedelta):
s = s.total_seconds()
return int(s) if int(s) == s else s
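
Taken together, the utils changes make funcname, parse_bytes, and parse_timedelta a little more permissive. A quick sketch of the new behaviour, mirroring the tests above:

import datetime
import numpy as np

from dask.utils import funcname, parse_bytes, parse_timedelta

# numpy.vectorize objects now get a name derived from the wrapped function.
assert funcname(np.vectorize(int)) == "vectorize_int"

# Plain numbers pass straight through parse_bytes as a byte count.
assert parse_bytes(123) == 123
assert parse_bytes("1.5 GB") == 1500000000

# parse_timedelta now tolerates None, alongside numbers and timedeltas.
assert parse_timedelta(None) is None
assert parse_timedelta(3) == 3
assert parse_timedelta(datetime.timedelta(milliseconds=100)) == 0.1
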
5 changes: 5 additions & 0 deletions docs/source/api.rst
@@ -51,3 +51,8 @@ This more advanced API is available in the `Dask distributed documentation
.. autofunction:: optimize
.. autofunction:: persist
.. autofunction:: visualize

Finally, Dask has a few helpers for generating demo datasets

.. autofunction:: datasets.make_people
.. autofunction:: datasets.timeseries
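
As a usage note for the two demo-data helpers documented above (make_people needs the optional mimesis dependency; exact columns and defaults may vary by version):

import dask.datasets

# A small random time-series dask DataFrame, partitioned by day by default.
df = dask.datasets.timeseries()
print(df.head())

# A dask Bag of dictionaries describing fake people.
people = dask.datasets.make_people()
print(people.take(1))
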
8 changes: 8 additions & 0 deletions docs/source/dataframe-api.rst
@@ -30,6 +30,7 @@ Dataframe
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.dtypes
DataFrame.explode
DataFrame.fillna
DataFrame.floordiv
DataFrame.get_partition
@@ -132,6 +133,7 @@ Series
Series.dt
Series.dtype
Series.eq
Series.explode
Series.ffill
Series.fillna
Series.first
@@ -411,6 +413,12 @@ Rolling

.. autofunction:: map_overlap

Dask Metadata
~~~~~~~~~~~~~

.. currentmodule:: dask.dataframe

.. autofunction:: utils.make_meta

Other functions
~~~~~~~~~~~~~~~