use groupsort_indexer in shuffle_group_2
mrocklin committed Mar 4, 2017
1 parent df70050 commit 093a601
Showing 2 changed files with 11 additions and 3 deletions.
12 changes: 10 additions & 2 deletions dask/dataframe/shuffle.py
@@ -337,8 +337,16 @@ def set_partitions_pre(s, divisions):


 def shuffle_group_2(df, col):
-    g = df.groupby(col)
-    return {i: g.get_group(i) for i in g.groups}, df.head(0)
+    if not len(df):
+        return {}, df
+    ind = df[col]._values.astype(np.int64)
+    n = ind.max() + 1
+    indexer, locations = pd.algos.groupsort_indexer(ind.view(np.int64), n)
+    df2 = df.take(indexer)
+    locations = locations.cumsum()
+    parts = [df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:])]
+    result2 = dict(zip(range(n), parts))
+    return result2, df.iloc[:0]


 def shuffle_group_get(g_head, i):
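For context, a minimal NumPy-only sketch of the group-sort-and-slice idea the new shuffle_group_2 relies on. The toy frame and its 'part' column below are made up for illustration, and np.argsort/np.bincount stand in for pd.algos.groupsort_indexer, which computes the sorted indexer and the per-group counts in a single Cython pass:

import numpy as np
import pandas as pd

# Toy frame: 'part' holds the target output partition of each row,
# i.e. the labels computed upstream (e.g. by set_partitions_pre).
df = pd.DataFrame({'part': [2, 0, 1, 0, 2, 1], 'x': range(6)})

labels = df['part'].values.astype(np.int64)
n = labels.max() + 1                       # number of output groups

# Stable sort of row positions by group label; mergesort keeps the
# original within-group row order, like groupsort_indexer does.
indexer = np.argsort(labels, kind='mergesort')
df2 = df.take(indexer)

# Group sizes -> cumulative boundaries, so every output group is one
# contiguous .iloc slice of the sorted frame rather than a hash-based
# groupby/get_group lookup.
counts = np.bincount(labels, minlength=n)
bounds = np.concatenate(([0], counts.cumsum()))
parts = {i: df2.iloc[a:b]
         for i, (a, b) in enumerate(zip(bounds[:-1], bounds[1:]))}

assert all((g['part'] == i).all() for i, g in parts.items())

Compared with the previous df.groupby(col).get_group approach, this performs one stable sort plus n slices, presumably avoiding the per-group overhead of get_group.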
2 changes: 1 addition & 1 deletion dask/dataframe/tests/test_shuffle.py
@@ -50,7 +50,7 @@ def test_default_partitions():
     assert shuffle(d, d.b).npartitions == d.npartitions


-def test_shuffle_npatitions_task():
+def test_shuffle_npartitions_task():
     df = pd.DataFrame({'x': np.random.random(100)})
     ddf = dd.from_pandas(df, npartitions=10)
     s = shuffle(ddf, ddf.x, shuffle='tasks', npartitions=17, max_branch=4)
