diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index a1dbc217380..97e916cda07 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -653,6 +653,11 @@ def to_castra(self, fn=None, categories=None): from .io import to_castra return to_castra(self, fn, categories) + @wraps(pd.DataFrame.to_csv) + def to_csv(self, filename, **kwargs): + from .io import to_csv + return to_csv(self, filename, **kwargs) + def _assign(df, *pairs): kwargs = dict(partition(2, pairs)) diff --git a/dask/dataframe/io.py b/dask/dataframe/io.py index 7ca5deebef4..03c25ea8ae9 100644 --- a/dask/dataframe/io.py +++ b/dask/dataframe/io.py @@ -593,3 +593,25 @@ def to_castra(df, fn=None, categories=None): c, _ = get(merge(dsk, df.dask), [(name, -1), list(dsk.keys())]) return c + + +def to_csv(df, filename, **kwargs): + myget = kwargs.pop('get') + name = 'to-csv' + next(tokens) + + dsk = dict() + dsk[(name, 0)] = (apply, pd.DataFrame.to_csv, + (tuple, [(df._name, 0), filename]), + kwargs) + + kwargs2 = kwargs.copy() + kwargs2['mode'] = 'a' + kwargs2['header'] = False + + for i in range(1, df.npartitions): + dsk[(name, i)] = (_link, (name, i - 1), + (apply, pd.DataFrame.to_csv, + (tuple, [(df._name, i), filename]), + kwargs2)) + + get(merge(dsk, df.dask), (name, i), get=myget) diff --git a/dask/dataframe/tests/test_io.py b/dask/dataframe/tests/test_io.py index efc0121f801..9aff6b61749 100644 --- a/dask/dataframe/tests/test_io.py +++ b/dask/dataframe/tests/test_io.py @@ -457,3 +457,16 @@ def test_read_hdf(): tm.assert_frame_equal( dd.read_hdf(fn, '/data', chunksize=2, start=1, stop=3).compute(), pd.read_hdf(fn, '/data', start=1, stop=3)) + + +def test_to_csv(): + df = pd.DataFrame({'x': ['a', 'b', 'c', 'd'], + 'y': [1, 2, 3, 4]}, index=[1., 2., 3., 4.]) + a = dd.from_pandas(df, 2) + + with tmpfile('csv') as fn: + a.to_csv(fn, get=get_sync) + + result = pd.read_csv(fn, index_col=0) + + tm.assert_frame_equal(result, df)