add sorted= keyword to DataFrame.set_index
This allows for faster set_index operations when given an already
sorted index.

Fixes dask#336
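
For context, a minimal usage sketch of the new keyword (hypothetical data; assumes the 'timestamp' column is already monotonically increasing across partitions). Because the column is already sorted, only the per-partition minima and maxima are computed to build the divisions, and no shuffle is triggered:

import pandas as pd
import dask.dataframe as dd

# A small frame whose 'timestamp' column is already sorted.
pdf = pd.DataFrame({'timestamp': pd.date_range('2016-01-01', periods=8),
                    'value': range(8)})
ddf = dd.from_pandas(pdf, npartitions=4)

# sorted=True avoids the shuffle: the column is read once to find the
# per-partition minima/maxima that become the new divisions.
ddf2 = ddf.set_index('timestamp', sorted=True)
assert ddf2.known_divisions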
mrocklin committed May 5, 2016
1 parent 0d59d3c commit 71fccc4
Showing 2 changed files with 89 additions and 6 deletions.
65 changes: 59 additions & 6 deletions dask/dataframe/core.py
@@ -1410,10 +1410,34 @@ def dtypes(self):
self._pd, self._known_dtype = self._build_pd(self.head())
return self._pd.dtypes

@derived_from(pd.DataFrame)
def set_index(self, other, drop=True, **kwargs):
from .shuffle import set_index
return set_index(self, other, drop=drop, **kwargs)
def set_index(self, other, drop=True, sorted=False, **kwargs):
""" Set the DataFrame index 9row labels) using an existing column
This operation in dask.dataframe is expensive. If the input column is
sorted then we accomplish the set_index in a single full read of that
column. However, if the input column is not sorted then this operation
triggers a full shuffle, which can take a while and only works on a
single machine (not distributed).

Parameters
----------
other: Series or label
drop: boolean, default True
Delete columns to be used as the new index
sorted: boolean, default False
Set to True if the new index column is already sorted

Examples
--------
>>> df.set_index('x') # doctest: +SKIP
>>> df.set_index(d.x) # doctest: +SKIP
>>> df.set_index(d.timestamp, sorted=True) # doctest: +SKIP
"""
if sorted:
return set_sorted_index(self, other, drop=drop, **kwargs)
else:
from .shuffle import set_index
return set_index(self, other, drop=drop, **kwargs)

def set_partition(self, column, divisions, **kwargs):
""" Set explicit divisions for new column index
@@ -2061,9 +2085,9 @@ def map_partitions(func, metadata, *args, **kwargs):
metadata = _extract_pd(metadata)

assert callable(func)
token = kwargs.pop('token', 'map-partitions')
token = kwargs.pop('token', None)
token_key = tokenize(token or func, metadata, kwargs, *args)
name = '{0}-{1}'.format(token, token_key)
name = '{0}-{1}'.format(token or 'map-partitions', token_key)

if all(isinstance(arg, Scalar) for arg in args):
dask = {(name, 0):
@@ -2646,3 +2670,32 @@ def try_loc(df, ind):
return df.loc[ind]
except KeyError:
return df.head(0)


def set_sorted_index(df, index, drop=True, **kwargs):
if not isinstance(index, Series):
index2 = df[index]
meta = df._pd.set_index(index, drop=drop)
else:
index2 = index
meta = df._pd.set_index(index._pd, drop=drop)

mins = index2.map_partitions(pd.Series.min)
maxes = index2.map_partitions(pd.Series.max)
mins, maxes = compute(mins, maxes, **kwargs)

if (sorted(mins) != list(mins) or
sorted(maxes) != list(maxes) or
any(a >= b for a, b in zip(mins, maxes))):
raise ValueError("Column not properly sorted", mins, maxes)

divisions = tuple(mins) + (list(maxes)[-1],)

result = map_partitions(_set_sorted_index, meta, df, index, drop=drop)
result.divisions = divisions

return result


def _set_sorted_index(df, idx, drop):
return df.set_index(idx, drop=drop)
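
The divisions logic in set_sorted_index above can be exercised in isolation. A minimal standalone sketch (plain pandas, toy data) of the same validation and divisions construction:

import pandas as pd

# Two partitions of an index column that is globally sorted.
parts = [pd.Series([1, 2, 3]), pd.Series([4, 5, 6])]

mins = [s.min() for s in parts]    # per-partition minima: [1, 4]
maxes = [s.max() for s in parts]   # per-partition maxima: [3, 6]

# The same validation set_sorted_index performs: minima and maxima must
# each be increasing, and every partition's min must be below its max.
if (sorted(mins) != list(mins) or
        sorted(maxes) != list(maxes) or
        any(a >= b for a, b in zip(mins, maxes))):
    raise ValueError("Column not properly sorted", mins, maxes)

# Divisions are the partition minima plus the overall maximum.
divisions = tuple(mins) + (maxes[-1],)
print(divisions)   # boundaries 1, 4, 6

The resulting boundaries are what set_sorted_index assigns to result.divisions, which is why known_divisions becomes True after the call.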
30 changes: 30 additions & 0 deletions dask/dataframe/tests/test_dataframe.py
@@ -1685,3 +1685,33 @@ def iseven(x):
b.groupby(iseven).y.sum())
assert eq(a.y.groupby(iseven).sum(),
b.y.groupby(iseven).sum())


def test_set_index_sorted_true():
df = pd.DataFrame({'x': [1, 2, 3, 4],
'y': [10, 20, 30, 40],
'z': [4, 3, 2, 1]})
a = dd.from_pandas(df, 2, sort=False)
assert not a.known_divisions

b = a.set_index('x', sorted=True)
assert b.known_divisions
assert set(a.dask).issubset(set(b.dask))

for drop in [True, False]:
eq(a.set_index('x', drop=drop),
df.set_index('x', drop=drop))
eq(a.set_index(a.x, sorted=True, drop=drop),
df.set_index(df.x, drop=drop))
eq(a.set_index(a.x + 1, sorted=True, drop=drop),
df.set_index(df.x + 1, drop=drop))

with pytest.raises(ValueError):
a.set_index(a.z, sorted=True)


def test_methods_tokenize_differently():
df = pd.DataFrame({'x': [1, 2, 3, 4]})
df = dd.from_pandas(df, npartitions=1)
assert (df.x.map_partitions(pd.Series.min)._name !=
df.x.map_partitions(pd.Series.max)._name)
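
The last test guards the map_partitions tokenization change in core.py above. With the old code, token defaulted to 'map-partitions', so tokenize(token or func, ...) hashed only that string and two different functions applied to the same series produced the same task name; with token defaulting to None, the function itself enters the hash. A rough sketch of the idea (illustrative only; the exact arguments passed to tokenize inside map_partitions differ):

import pandas as pd
from dask.base import tokenize

# Previously the string 'map-partitions' was hashed instead of the
# function, so min and max collapsed to one key. Hashing the function
# itself yields distinct keys, and hence distinct task names.
key_min = tokenize(pd.Series.min, None, {})
key_max = tokenize(pd.Series.max, None, {})
assert key_min != key_max

name_min = 'map-partitions-' + key_min
name_max = 'map-partitions-' + key_max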
