Skip to content

Commit

Permalink
Include partitions with no width in repartition
Browse files Browse the repository at this point in the history
Previously when repartitioning a dataframe that had partitions of zero
width like the following:

    >>> df.divisions
    (1, 1, 1)

We would lose information about the earlier partitions.

Fixes dask#3911
  • Loading branch information
mrocklin committed Sep 4, 2018
1 parent 09100d0 commit a72b0c3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
5 changes: 3 additions & 2 deletions dask/dataframe/core.py
Expand Up @@ -4081,8 +4081,9 @@ def _is_single_last_div(x):
else:
d[(out1, k)] = (methods.boundary_slice, (name, i - 1), low, b[j], False)
low = b[j]
if len(a) == i + 1 or a[i] < a[i + 1]:
j += 1
i += 1
j += 1
c.append(low)
k += 1

Expand Down Expand Up @@ -4116,7 +4117,7 @@ def _is_single_last_div(x):
while c[i] < b[j]:
tmp.append((out1, i))
i += 1
if last_elem and c[i] == b[-1] and (b[-1] != b[-2] or j == len(b) - 1) and i < k:
while last_elem and c[i] == b[-1] and (b[-1] != b[-2] or j == len(b) - 1) and i < k:
# append if last split is not included
tmp.append((out1, i))
i += 1
Expand Down
26 changes: 26 additions & 0 deletions dask/dataframe/tests/test_multi.py
Expand Up @@ -1308,3 +1308,29 @@ def test_singleton_divisions():
joined = ddf2.join(ddf2, rsuffix='r')
assert joined.divisions == (1, 1)
joined.compute()


def test_repartition_repeated_divisions():
df = pd.DataFrame({'x': [0, 0, 0, 0]})
ddf = dd.from_pandas(df, npartitions=2).set_index('x')

ddf2 = ddf.repartition(divisions=(0, 0), force=True)
assert_eq(ddf2, df.set_index('x'))


def test_multi_duplicate_divisions():
df1 = pd.DataFrame({'x': [0, 0, 0, 0]})
df2 = pd.DataFrame({'x': [0]})

ddf1 = dd.from_pandas(df1, npartitions=2).set_index('x')
ddf2 = dd.from_pandas(df2, npartitions=1).set_index('x')
assert ddf1.npartitions == 2
assert len(ddf1) == len(df1)

r1 = ddf1.merge(ddf2, how='left', left_index=True, right_index=True)

sf1 = df1.set_index('x')
sf2 = df2.set_index('x')
r2 = sf1.merge(sf2, how='left', left_index=True, right_index=True)

assert_eq(r1, r2)

0 comments on commit a72b0c3

Please sign in to comment.