Fix read_csv execution on GPU #859

Merged · 3 commits · Dec 12, 2019
Changes from 2 commits
51 changes: 37 additions & 14 deletions mars/dataframe/datasource/read_csv.py
@@ -148,6 +148,14 @@ def _tile_compressed(cls, op):
columns_value=df.columns_value,
chunks=[new_chunk], nsplits=nsplits)

@classmethod
def _validate_dtypes(cls, dtypes, is_gpu):
dtypes = dtypes.to_dict()
# cuDF doesn't support the object dtype; convert it to 'str'.
if is_gpu:
dtypes = dict((n, dt.name if dt != np.dtype('object') else 'str') for n, dt in dtypes.items())
return dtypes
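
In effect, _validate_dtypes rewrites pandas' object dtype as cuDF's 'str' and leaves every other dtype name untouched. A minimal standalone sketch of the same mapping (the sample columns are made up for illustration):

import numpy as np
import pandas as pd

dtypes = pd.Series({'a': np.dtype('int64'), 'c': np.dtype('object')})
# Same conversion as _validate_dtypes with is_gpu=True:
converted = {n: dt.name if dt != np.dtype('object') else 'str'
             for n, dt in dtypes.to_dict().items()}
assert converted == {'a': 'int64', 'c': 'str'}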

@classmethod
def tile(cls, op):
if op.compression:
@@ -183,6 +191,32 @@ def tile(cls, op):
columns_value=df.columns_value,
chunks=out_chunks, nsplits=nsplits)

@classmethod
def _pandas_read_csv(cls, f, op):
csv_kwargs = op.extra_params.copy()
out_df = op.outputs[0]
start, end = _find_chunk_start_end(f, op.offset, op.size)
f.seek(start)
b = BytesIO(f.read(end - start))
if end == start:
# the last chunk may be empty
df = build_empty_df(out_df.dtypes)
else:
if start == 0:
# The first chunk contains the header
# As we specify names and dtype, we need to skip header rows
csv_kwargs['skiprows'] = 1 if op.header == 'infer' else op.header
df = pd.read_csv(b, sep=op.sep, names=op.names, index_col=op.index_col,
dtype=out_df.dtypes.to_dict(), **csv_kwargs)
return df
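
_find_chunk_start_end, defined earlier in this module and outside this diff, is what aligns the raw (offset, size) byte range to row boundaries: start and end always land just after a newline, and end == start when the range owns no complete row, which is why the empty-chunk branch above exists. A rough sketch of that kind of alignment, assuming '\n'-terminated rows — an illustration of the idea, not the module's actual implementation:

def _align_chunk_to_rows(f, offset, size):
    # Every chunk except the first skips the partial row that the
    # previous chunk's end computation already consumed.
    if offset == 0:
        start = 0
    else:
        f.seek(offset)
        f.readline()
        start = f.tell()
    # The chunk ends just past the first newline at or beyond
    # offset + size; seeking past EOF is harmless, read() simply
    # stops at the end of the file.
    f.seek(offset + size)
    f.readline()
    end = f.tell()
    return start, end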

@classmethod
def _cudf_read_csv(cls, op):
csv_kwargs = op.extra_params
df = cudf.read_csv(op.path, byte_range=(op.offset, op.size), sep=op.sep, names=op.names,
dtype=cls._validate_dtypes(op.outputs[0].dtypes, op.gpu), **csv_kwargs)
return df
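
Unlike the pandas path, the cuDF path needs no manual seek or BytesIO staging: byte_range=(offset, size) tells cudf.read_csv to parse only the rows that begin inside that window, and cuDF aligns the window to complete rows on its own. A minimal usage sketch (file name and columns are made up):

import cudf

# Parse only the rows starting within the first megabyte of the file.
df = cudf.read_csv('data.csv',
                   byte_range=(0, 1 << 20),
                   names=['a', 'b', 'c'],
                   dtype={'a': 'int64', 'b': 'float64', 'c': 'str'})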

@classmethod
def execute(cls, ctx, op):
xdf = cudf if op.gpu else pd
@@ -194,21 +228,10 @@ def execute(cls, ctx, op):
# As we specify names and dtype, we need to skip header rows
csv_kwargs['skiprows'] = 1 if op.header == 'infer' else op.header
df = xdf.read_csv(BytesIO(f.read()), sep=op.sep, names=op.names, index_col=op.index_col,
dtype=out_df.dtypes.to_dict(), **csv_kwargs)
dtype=cls._validate_dtypes(op.outputs[0].dtypes, op.gpu), **csv_kwargs)
else:
start, end = _find_chunk_start_end(f, op.offset, op.size)
f.seek(start)
b = BytesIO(f.read(end - start))
if end == start:
# the last chunk may be empty
df = build_empty_df(out_df.dtypes)
else:
if start == 0:
# The first chunk contains header
# As we specify names and dtype, we need to skip header rows
csv_kwargs['skiprows'] = 1 if op.header == 'infer' else op.header
df = xdf.read_csv(b, sep=op.sep, names=op.names, index_col=op.index_col,
dtype=out_df.dtypes.to_dict(), **csv_kwargs)
df = cls._cudf_read_csv(op) if op.gpu else cls._pandas_read_csv(f, op)

ctx[out_df.key] = df

def __call__(self, index_value=None, columns_value=None, dtypes=None, chunk_bytes=None):
21 changes: 20 additions & 1 deletion mars/dataframe/datasource/tests/test_datasource_execution.py
@@ -22,7 +22,7 @@
import mars.tensor as mt
import mars.dataframe as md
from mars.executor import Executor
from mars.tests.core import TestBase
from mars.tests.core import TestBase, require_cudf
from mars.dataframe.datasource.dataframe import from_pandas as from_pandas_df
from mars.dataframe.datasource.series import from_pandas as from_pandas_series
from mars.dataframe.datasource.from_tensor import dataframe_from_tensor
@@ -277,3 +277,22 @@ def testReadCSVExecution(self):

finally:
shutil.rmtree(tempdir)

@require_cudf
def testReadCSVGPUExecution(self):
tempdir = tempfile.mkdtemp()
file_path = os.path.join(tempdir, 'test.csv')
try:
df = pd.DataFrame([[1, 2.0, 'v1'], [4, 5.0, 'v2'], [7, 8.0, 'v3']], columns=['a', 'b', 'c'])
df.to_csv(file_path, index=False)

pdf = pd.read_csv(file_path)
mdf = self.executor.execute_dataframe(md.read_csv(file_path, gpu=True), concat=True)[0]
pd.testing.assert_frame_equal(pdf.reset_index(drop=True), mdf.to_pandas().reset_index(drop=True))

mdf2 = self.executor.execute_dataframe(md.read_csv(file_path, gpu=True, chunk_bytes=10),
concat=True)[0]
pd.testing.assert_frame_equal(pdf.reset_index(drop=True), mdf2.to_pandas().reset_index(drop=True))

finally:
shutil.rmtree(tempdir)
16 changes: 11 additions & 5 deletions mars/dataframe/groupby/aggregation.py
@@ -38,12 +38,13 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin):
_func = AnyField('func')
_by = AnyField('by')
_as_index = BoolField('as_index')
_sort = BoolField('sort')
_method = StringField('method')
_stage = StringField('stage', on_serialize=lambda x: getattr(x, 'value', None),
on_deserialize=Stage)

def __init__(self, func=None, by=None, as_index=None, method=None, stage=None, **kw):
super(DataFrameGroupByAgg, self).__init__(_func=func, _by=by, _as_index=as_index, _method=method,
def __init__(self, func=None, by=None, as_index=None, sort=None, method=None, stage=None, **kw):
super(DataFrameGroupByAgg, self).__init__(_func=func, _by=by, _as_index=as_index, _sort=sort, _method=method,
_stage=stage, _object_type=ObjectType.dataframe, **kw)

@property
Expand All @@ -58,6 +59,10 @@ def by(self):
def as_index(self):
return self._as_index

@property
def sort(self):
return self._sort

@property
def method(self):
return self._method
@@ -200,14 +205,14 @@ def execute(cls, ctx, op):
@classmethod
def _execute_map(cls, df, op):
if isinstance(op.func, (six.string_types, dict)):
return df.groupby(op.by, as_index=op.as_index).agg(op.func)
return df.groupby(op.by, as_index=op.as_index, sort=False).agg(op.func)
else:
raise NotImplementedError

@classmethod
def _execute_combine(cls, df, op):
if isinstance(op.func, (six.string_types, dict)):
return df.groupby(op.by, as_index=op.as_index).agg(op.func)
return df.groupby(level=0, as_index=op.as_index, sort=op.sort).agg(op.func)
else:
raise NotImplementedError

@@ -243,5 +248,6 @@ def agg(groupby, func, method='tree'):
raise NotImplementedError('Aggregation function %s has not been supported' % f)

in_df = groupby.inputs[0]
agg_op = DataFrameGroupByAgg(func=func, by=groupby.op.by, method=method, as_index=groupby.op.as_index)
agg_op = DataFrameGroupByAgg(func=func, by=groupby.op.by, method=method,
as_index=groupby.op.as_index, sort=groupby.op.sort)
return agg_op(in_df)
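
The sort plumbing mirrors how the tree method splits the work: the map phase pre-aggregates each chunk and can always pass sort=False, since any ordering established there would be discarded, while the combine phase groups the concatenated partials by index level 0 and applies the user's sort preference exactly once. A toy pandas sketch of that split, using a sum aggregation (for which re-aggregating partial results is valid):

import pandas as pd

chunk1 = pd.DataFrame({'k': ['b', 'a', 'b'], 'v': [1, 2, 3]})
chunk2 = pd.DataFrame({'k': ['a', 'b', 'a'], 'v': [4, 5, 6]})

# Map phase: local partial aggregation; sorting here is wasted work.
part1 = chunk1.groupby('k', sort=False).agg('sum')
part2 = chunk2.groupby('k', sort=False).agg('sum')

# Combine phase: group by the index (level=0) and honor the user's
# sort flag once, on the final result.
result = pd.concat([part1, part2]).groupby(level=0, sort=True).agg('sum')
assert result.loc['a', 'v'] == 12 and result.loc['b', 'v'] == 9
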
13 changes: 9 additions & 4 deletions mars/dataframe/groupby/core.py
@@ -94,9 +94,10 @@ class DataFrameGroupByOperand(DataFrameOperand, DataFrameOperandMixin):

_by = AnyField('by')
_as_index = BoolField('as_index')
_sort = BoolField('sort')

def __init__(self, by=None, as_index=None, object_type=ObjectType.groupby, **kw):
super(DataFrameGroupByOperand, self).__init__(_by=by, _as_index=as_index,
def __init__(self, by=None, as_index=None, sort=None, object_type=ObjectType.groupby, **kw):
super(DataFrameGroupByOperand, self).__init__(_by=by, _as_index=as_index, _sort=sort,
_object_type=object_type, **kw)

@property
Expand All @@ -107,6 +108,10 @@ def by(self):
def as_index(self):
return self._as_index

@property
def sort(self):
return self._sort

def __call__(self, df):
return self.new_tileable([df])

@@ -147,8 +152,8 @@ def execute(cls, ctx, op):
ctx[op.outputs[0].key] = list(df.groupby(op.by))


def dataframe_groupby(df, by, as_index=True):
def dataframe_groupby(df, by, as_index=True, sort=True):
if isinstance(by, six.string_types):
by = [by]
op = DataFrameGroupByOperand(by=by, as_index=as_index)
op = DataFrameGroupByOperand(by=by, as_index=as_index, sort=sort)
return op(df)
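
With the parameter threaded through, the user-facing groupby accepts sort just as pandas does. A small usage sketch, built with the same from_pandas datasource the tests use (execution details elided):

import pandas as pd
from mars.dataframe.datasource.dataframe import from_pandas

raw = pd.DataFrame({'k': ['b', 'a', 'b'], 'v': [1, 2, 3]})
df = from_pandas(raw, chunk_size=2)

# sort=True (the default) matches pandas' sorted group keys;
# sort=False skips that final sort when ordering does not matter.
agg = df.groupby('k', sort=False).agg('sum')
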
2 changes: 2 additions & 0 deletions mars/dataframe/merge/concat.py
@@ -121,6 +121,8 @@ def _auto_concat_dataframe_chunks(chunk, inputs):
ret = xdf.concat(concats, sort=False)
else:
ret = xdf.concat(concats)
# cuDF loses the index name when concatenating two series; restore it explicitly.
ret.index.name = concats[0].index.name
if getattr(chunk.index_value, 'should_be_monotonic', False):
ret.sort_index(inplace=True)
if getattr(chunk.columns_value, 'should_be_monotonic', False):
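
The two added lines are a workaround: at the time of this PR, concatenating two cuDF series could drop the index name, so it is copied back from the first input. Under pandas the reassignment is a no-op, as this sketch shows:

import pandas as pd

s1 = pd.Series([1, 2], index=pd.Index(['a', 'b'], name='idx'))
s2 = pd.Series([3], index=pd.Index(['c'], name='idx'))

ret = pd.concat([s1, s2])
# pandas already preserves the name; the cuDF of this era could lose
# it, so the operand reassigns it from the first input unconditionally.
ret.index.name = s1.index.name
assert ret.index.name == 'idx'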