diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index bc9ba5e142e..c7ac85014a7 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -1410,10 +1410,34 @@ def dtypes(self):
             self._pd, self._known_dtype = self._build_pd(self.head())
         return self._pd.dtypes
 
-    @derived_from(pd.DataFrame)
-    def set_index(self, other, drop=True, **kwargs):
-        from .shuffle import set_index
-        return set_index(self, other, drop=drop, **kwargs)
+    def set_index(self, other, drop=True, sorted=False, **kwargs):
+        """ Set the DataFrame index (row labels) using an existing column
+
+        This operation in dask.dataframe is expensive. If the input column is
+        sorted then we accomplish the set_index in a single full read of that
+        column. However, if the input column is not sorted then this operation
+        triggers a full shuffle, which can take a while and only works on a
+        single machine (not distributed).
+
+        Parameters
+        ----------
+        other: Series or label
+        drop: boolean, default True
+            Delete columns to be used as the new index
+        sorted: boolean, default False
+            Set to True if the new index column is already sorted
+
+        Examples
+        --------
+        >>> df.set_index('x')  # doctest: +SKIP
+        >>> df.set_index(df.x)  # doctest: +SKIP
+        >>> df.set_index(df.timestamp, sorted=True)  # doctest: +SKIP
+        """
+        if sorted:
+            return set_sorted_index(self, other, drop=drop, **kwargs)
+        else:
+            from .shuffle import set_index
+            return set_index(self, other, drop=drop, **kwargs)
 
     def set_partition(self, column, divisions, **kwargs):
         """ Set explicit divisions for new column index
@@ -2061,9 +2085,9 @@ def map_partitions(func, metadata, *args, **kwargs):
     metadata = _extract_pd(metadata)
 
     assert callable(func)
-    token = kwargs.pop('token', 'map-partitions')
+    token = kwargs.pop('token', None)
     token_key = tokenize(token or func, metadata, kwargs, *args)
-    name = '{0}-{1}'.format(token, token_key)
+    name = '{0}-{1}'.format(token or 'map-partitions', token_key)
 
     if all(isinstance(arg, Scalar) for arg in args):
         dask = {(name, 0):
@@ -2646,3 +2670,32 @@ def try_loc(df, ind):
         return df.loc[ind]
     except KeyError:
         return df.head(0)
+
+
+def set_sorted_index(df, index, drop=True, **kwargs):
+    if not isinstance(index, Series):
+        index2 = df[index]
+        meta = df._pd.set_index(index, drop=drop)
+    else:
+        index2 = index
+        meta = df._pd.set_index(index._pd, drop=drop)
+
+    mins = index2.map_partitions(pd.Series.min)
+    maxes = index2.map_partitions(pd.Series.max)
+    mins, maxes = compute(mins, maxes, **kwargs)
+
+    if (sorted(mins) != list(mins) or
+        sorted(maxes) != list(maxes) or
+        any(a >= b for a, b in zip(mins, maxes))):
+        raise ValueError("Column not properly sorted", mins, maxes)
+
+    divisions = tuple(mins) + (list(maxes)[-1],)
+
+    result = map_partitions(_set_sorted_index, meta, df, index, drop=drop)
+    result.divisions = divisions
+
+    return result
+
+
+def _set_sorted_index(df, idx, drop):
+    return df.set_index(idx, drop=drop)
diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py
index 8d3498a2a61..ce00b6f4c76 100644
--- a/dask/dataframe/tests/test_dataframe.py
+++ b/dask/dataframe/tests/test_dataframe.py
@@ -1685,3 +1685,33 @@ def iseven(x):
                   b.groupby(iseven).y.sum())
         assert eq(a.y.groupby(iseven).sum(),
                   b.y.groupby(iseven).sum())
+
+
+def test_set_index_sorted_true():
+    df = pd.DataFrame({'x': [1, 2, 3, 4],
+                       'y': [10, 20, 30, 40],
+                       'z': [4, 3, 2, 1]})
+    a = dd.from_pandas(df, 2, sort=False)
+    assert not a.known_divisions
+
+    b = a.set_index('x', sorted=True)
+    assert b.known_divisions
+    assert set(a.dask).issubset(set(b.dask))
+
+    for drop in [True, False]:
+        eq(a.set_index('x', drop=drop),
+           df.set_index('x', drop=drop))
+        eq(a.set_index(a.x, sorted=True, drop=drop),
+           df.set_index(df.x, drop=drop))
+        eq(a.set_index(a.x + 1, sorted=True, drop=drop),
+           df.set_index(df.x + 1, drop=drop))
+
+    with pytest.raises(ValueError):
+        a.set_index(a.z, sorted=True)
+
+
+def test_methods_tokenize_differently():
+    df = pd.DataFrame({'x': [1, 2, 3, 4]})
+    df = dd.from_pandas(df, npartitions=1)
+    assert (df.x.map_partitions(pd.Series.min)._name !=
+            df.x.map_partitions(pd.Series.max)._name)
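
Usage sketch, not part of the patch: with this change applied, a caller whose
index column is already sorted can skip the shuffle entirely. The
'timestamp'/'value' frame below is hypothetical example data.

    import pandas as pd
    import dask.dataframe as dd

    # 'timestamp' is monotonically increasing across the whole frame, so it
    # remains sorted across partition boundaries after from_pandas.
    pdf = pd.DataFrame({'timestamp': pd.date_range('2016-01-01',
                                                   periods=120, freq='H'),
                        'value': range(120)})
    ddf = dd.from_pandas(pdf, npartitions=4)

    # sorted=True computes the per-partition min/max of the column to derive
    # divisions, then sets the index on each partition; no shuffle is needed.
    ddf2 = ddf.set_index('timestamp', sorted=True)
    assert ddf2.known_divisions

    # Without sorted=True the same call falls back to the shuffle-based
    # dask.dataframe.shuffle.set_index code path.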