Include filename in read_csv (dask#3908)
* Adding include_path_col to read_csv

* Adding tests for read_bytes and read_csv/table with include_path

* Allowing path converters and fixing up test for windows compat

* Fixing test for windows

* Checking whether include_path_col name is duplicate of cols in files

* include_path_col --> include_path_column

* Making path categorical

* Making categories known

* Adding a space

* Unnesting output from read_bytes

* Fixing up column error handling

* KeyError --> ValueError
jsignell authored and jcrist committed Aug 30, 2018
1 parent 83fdc8d commit 9d4b99d
Showing 4 changed files with 137 additions and 17 deletions.
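For orientation, a minimal usage sketch of the feature this commit adds (not part of the diff; it assumes a local glob of CSV files and uses the default column name ``path`` described in the docstring changes below):

    import dask.dataframe as dd

    # Read many CSVs; each row also records which file it came from.
    df = dd.read_csv('2014-01-*.csv', include_path_column=True)
    print(df.path.dtype)          # category, with one known category per file

    # Passing a string sets the column name instead of the default 'path'.
    df = dd.read_csv('2014-01-*.csv', include_path_column='source_file')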
20 changes: 15 additions & 5 deletions dask/bytes/core.py
@@ -19,7 +19,7 @@


def read_bytes(urlpath, delimiter=None, not_zero=False, blocksize=2**27,
sample=True, compression=None, **kwargs):
sample=True, compression=None, include_path=False, **kwargs):
"""Given a path or paths, return delayed objects that read from those paths.
The path may be a filename like ``'2015-01-01.csv'`` or a globstring
@@ -50,6 +50,9 @@ def read_bytes(urlpath, delimiter=None, not_zero=False, blocksize=2**27,
sample : bool or int
Whether or not to return a header sample. If an integer is given it is
used as sample size, otherwise the default sample size is 10kB.
include_path : bool
Whether or not to include the path with the bytes representing a particular file.
Default is False.
**kwargs : dict
Extra options that make sense to a particular storage connection, e.g.
host, port, username, password, etc.
@@ -58,6 +61,7 @@ def read_bytes(urlpath, delimiter=None, not_zero=False, blocksize=2**27,
--------
>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\\n') # doctest: +SKIP
>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\\n') # doctest: +SKIP
>>> sample, blocks, paths = read_bytes('2015-*-*.csv', include_path=True) # doctest: +SKIP
Returns
-------
@@ -66,6 +70,10 @@ def read_bytes(urlpath, delimiter=None, not_zero=False, blocksize=2**27,
blocks : list of lists of ``dask.Delayed``
Each list corresponds to a file, and each delayed object computes to a
block of bytes from that file.
paths : list of strings, only included if include_path is True
List of same length as blocks, where each item is the path to the file
represented in the corresponding block.
"""
fs, fs_token, paths = get_fs_token_paths(urlpath, mode='rb',
storage_options=kwargs)
@@ -106,15 +114,17 @@ def read_bytes(urlpath, delimiter=None, not_zero=False, blocksize=2**27,
token = tokenize(fs_token, delimiter, path, fs.ukey(path),
compression, offset)
keys = ['read-block-%s-%s' % (o, token) for o in offset]
out.append([delayed_read(OpenFile(fs, path, compression=compression),
o, l, delimiter, dask_key_name=key)
for o, key, l in zip(offset, keys, length)])
values = [delayed_read(OpenFile(fs, path, compression=compression),
o, l, delimiter, dask_key_name=key)
for o, key, l in zip(offset, keys, length)]
out.append(values)

if sample:
with OpenFile(fs, paths[0], compression=compression) as f:
nbytes = 10000 if sample is True else sample
sample = read_block(f, 0, nbytes, delimiter)

if include_path:
return sample, out, paths
return sample, out


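A sketch of the new return shape (illustrative only, assuming local files matching the glob): with ``include_path=True`` a third value is returned, a list of paths aligned with the outer list of per-file block lists.

    from dask.bytes.core import read_bytes

    # Three-tuple when include_path=True: sample bytes, blocks per file, file paths.
    sample, blocks, paths = read_bytes('2015-*-*.csv', delimiter=b'\n',
                                       include_path=True)
    assert len(blocks) == len(paths)   # one path per file's list of delayed blocks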
6 changes: 6 additions & 0 deletions dask/bytes/tests/test_local.py
@@ -159,6 +159,12 @@ def test_read_bytes_blocksize_float():
read_bytes('.test.account*', blocksize=5.5)


def test_read_bytes_include_path():
with filetexts(files, mode='b'):
_, _, paths = read_bytes('.test.accounts.*', include_path=True)
assert {os.path.split(path)[1] for path in paths} == set(files.keys())


def test_with_urls():
with filetexts(files, mode='b'):
# OS-independent file:// URI with glob *
74 changes: 62 additions & 12 deletions dask/dataframe/io/csv.py
@@ -8,6 +8,7 @@
except ImportError:
psutil = None

import numpy as np
import pandas as pd
try:
from pandas.api.types import CategoricalDtype
@@ -35,7 +36,7 @@


def pandas_read_text(reader, b, header, kwargs, dtypes=None, columns=None,
write_header=True, enforce=False):
write_header=True, enforce=False, path=None):
""" Convert a block of bytes to a Pandas DataFrame
Parameters
@@ -50,6 +51,8 @@ def pandas_read_text(reader, b, header, kwargs, dtypes=None, columns=None,
A dictionary of keyword arguments to be passed to ``reader``
dtypes : dict
DTypes to assign to columns
path : tuple
A tuple containing path column name, path to file, and all paths.
See Also
--------
@@ -68,6 +71,11 @@ def pandas_read_text(reader, b, header, kwargs, dtypes=None, columns=None,
raise ValueError("Columns do not match", df.columns, columns)
elif columns:
df.columns = columns
if path:
colname, path, paths = path
code = paths.index(path)
df = df.assign(**{
colname: pd.Categorical.from_codes(np.full(len(df), code), paths)})
return df


@@ -157,7 +165,7 @@ def coerce_dtypes(df, dtypes):

def text_blocks_to_pandas(reader, block_lists, header, head, kwargs,
collection=True, enforce=False,
specified_dtypes=None):
specified_dtypes=None, path=None):
""" Convert blocks of bytes to a dask.dataframe or other high-level object
This accepts a list of lists of values of bytes where each list corresponds
@@ -179,6 +187,8 @@ def text_blocks_to_pandas(reader, block_lists, header, head, kwargs,
kwargs : dict
Keyword arguments to pass down to ``reader``
collection: boolean, optional (defaults to True)
path : tuple, optional
A tuple containing column name for path and list of all paths
Returns
-------
@@ -215,21 +225,32 @@ def text_blocks_to_pandas(reader, block_lists, header, head, kwargs,
columns = list(head.columns)
delayed_pandas_read_text = delayed(pandas_read_text, pure=True)
dfs = []
for blocks in block_lists:
colname, paths = path or (None, None)

for i, blocks in enumerate(block_lists):
if not blocks:
continue
if path:
path_info = (colname, paths[i], paths)
else:
path_info = None
df = delayed_pandas_read_text(reader, blocks[0], header, kwargs,
dtypes, columns, write_header=False,
enforce=enforce)
enforce=enforce, path=path_info)

dfs.append(df)
rest_kwargs = kwargs.copy()
rest_kwargs.pop('skiprows', None)
for b in blocks[1:]:
dfs.append(delayed_pandas_read_text(reader, b, header, rest_kwargs,
dtypes, columns,
enforce=enforce))
enforce=enforce, path=path_info))

if collection:
if path:
head = head.assign(**{
colname: pd.Categorical.from_codes(np.zeros(len(head)), paths)
})
if len(unknown_categoricals):
head = clear_known_categories(head, cols=unknown_categoricals)
return from_delayed(dfs, head)
@@ -257,12 +278,15 @@ def auto_blocksize(total_memory, cpu_count):
def read_pandas(reader, urlpath, blocksize=AUTO_BLOCKSIZE, collection=True,
lineterminator=None, compression=None, sample=256000,
enforce=False, assume_missing=False, storage_options=None,
include_path_column=False,
**kwargs):
reader_name = reader.__name__
if lineterminator is not None and len(lineterminator) == 1:
kwargs['lineterminator'] = lineterminator
else:
lineterminator = '\n'
if include_path_column and isinstance(include_path_column, bool):
include_path_column = 'path'
if 'index' in kwargs or 'index_col' in kwargs:
raise ValueError("Keyword 'index' not supported "
"dd.{0}(...).set_index('my-index') "
@@ -282,6 +306,10 @@ def read_pandas(reader, urlpath, blocksize=AUTO_BLOCKSIZE, collection=True,
if isinstance(kwargs.get('header'), list):
raise TypeError("List of header rows not supported for "
"dd.{0}".format(reader_name))
if isinstance(kwargs.get('converters'), dict) and include_path_column:
path_converter = kwargs.get('converters').get(include_path_column, None)
else:
path_converter = None

if blocksize and compression not in seekable_files:
warn("Warning %s compression does not support breaking apart files\n"
@@ -294,11 +322,21 @@ def read_pandas(reader, urlpath, blocksize=AUTO_BLOCKSIZE, collection=True,
compression)

b_lineterminator = lineterminator.encode()
b_sample, values = read_bytes(urlpath, delimiter=b_lineterminator,
blocksize=blocksize,
sample=sample,
compression=compression,
**(storage_options or {}))
b_out = read_bytes(urlpath, delimiter=b_lineterminator,
blocksize=blocksize,
sample=sample,
compression=compression,
include_path=include_path_column,
**(storage_options or {}))

if include_path_column:
b_sample, values, paths = b_out
if path_converter:
paths = [path_converter(path) for path in paths]
path = (include_path_column, paths)
else:
b_sample, values = b_out
path = None

if not isinstance(values[0], (tuple, list)):
values = [values]
@@ -321,8 +359,13 @@ def read_pandas(reader, urlpath, blocksize=AUTO_BLOCKSIZE, collection=True,

header = b'' if header is None else parts[skiprows] + b_lineterminator

# Use sample to infer dtypes
# Use sample to infer dtypes and check for presence of include_path_column
head = reader(BytesIO(b_sample), **kwargs)
if include_path_column and (include_path_column in head.columns):
raise ValueError("Files already contain the column name: %s, so the "
"path column cannot use this name. Please set "
"`include_path_column` to a unique name."
% include_path_column)

specified_dtypes = kwargs.get('dtype', {})
if specified_dtypes is None:
@@ -336,7 +379,8 @@ def read_pandas(reader, urlpath, blocksize=AUTO_BLOCKSIZE, collection=True,

return text_blocks_to_pandas(reader, values, header, head, kwargs,
collection=collection, enforce=enforce,
specified_dtypes=specified_dtypes)
specified_dtypes=specified_dtypes,
path=path)


READ_DOC_TEMPLATE = """
@@ -384,6 +428,10 @@ def read_pandas(reader, urlpath, blocksize=AUTO_BLOCKSIZE, collection=True,
storage_options : dict, optional
Extra options that make sense for a particular storage connection, e.g.
host, port, username, password, etc.
include_path_column : bool or str, optional
Whether or not to include the path to each particular file. If True a new
column is added to the dataframe called ``path``. If str, sets new column
name. Default is False.
**kwargs
Extra keyword arguments to forward to :func:`pandas.{reader}`.
@@ -415,13 +463,15 @@ def make_reader(reader, reader_name, file_type):
def read(urlpath, blocksize=AUTO_BLOCKSIZE, collection=True,
lineterminator=None, compression=None, sample=256000,
enforce=False, assume_missing=False, storage_options=None,
include_path_column=False,
**kwargs):
return read_pandas(reader, urlpath, blocksize=blocksize,
collection=collection,
lineterminator=lineterminator,
compression=compression, sample=sample,
enforce=enforce, assume_missing=assume_missing,
storage_options=storage_options,
include_path_column=include_path_column,
**kwargs)
read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name,
file_type=file_type)
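The categorical construction used in ``pandas_read_text`` above can be shown in isolation (a standalone sketch with made-up paths, not taken from the diff): each block gets a single repeated code pointing at its source file, while the full list of paths is used as the categories so every partition ends up with the same known categorical dtype.

    import numpy as np
    import pandas as pd

    paths = ['2014-01-01.csv', '2014-01-02.csv', '2014-01-03.csv']
    block_path = '2014-01-02.csv'              # file this particular block came from
    df = pd.DataFrame({'amount': [100, 200]})  # stand-in for a parsed block

    code = paths.index(block_path)             # integer code for this file
    df = df.assign(path=pd.Categorical.from_codes(np.full(len(df), code), paths))

    print(df.path.cat.categories)              # all three paths are known categories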
54 changes: 54 additions & 0 deletions dask/dataframe/io/tests/test_csv.py
@@ -29,6 +29,10 @@ def normalize_text(s):
return '\n'.join(map(str.strip, s.strip().split('\n')))


def parse_filename(path):
return os.path.split(path)[1]


csv_text = """
name,amount
Alice,100
@@ -284,6 +288,56 @@ def test_read_csv_files_list(dd_read, pd_read, files):
dd_read([])


@pytest.mark.parametrize('dd_read,files',
[(dd.read_csv, csv_files),
(dd.read_table, tsv_files)])
def test_read_csv_include_path_column(dd_read, files):
with filetexts(files, mode='b'):
df = dd_read('2014-01-*.csv', include_path_column=True,
converters={'path': parse_filename})
filenames = df.path.compute().unique()
assert '2014-01-01.csv' in filenames
assert '2014-01-02.csv' not in filenames
assert '2014-01-03.csv' in filenames


@pytest.mark.parametrize('dd_read,files',
[(dd.read_csv, csv_files),
(dd.read_table, tsv_files)])
def test_read_csv_include_path_column_as_str(dd_read, files):
with filetexts(files, mode='b'):
df = dd_read('2014-01-*.csv', include_path_column='filename',
converters={'filename': parse_filename})
filenames = df.filename.compute().unique()
assert '2014-01-01.csv' in filenames
assert '2014-01-02.csv' not in filenames
assert '2014-01-03.csv' in filenames


@pytest.mark.parametrize('dd_read,files',
[(dd.read_csv, csv_files),
(dd.read_table, tsv_files)])
def test_read_csv_include_path_column_with_duplicate_name(dd_read, files):
with filetexts(files, mode='b'):
with pytest.raises(ValueError):
dd_read('2014-01-*.csv', include_path_column='name')


@pytest.mark.parametrize('dd_read,files',
[(dd.read_csv, csv_files),
(dd.read_table, tsv_files)])
def test_read_csv_include_path_column_is_dtype_category(dd_read, files):
with filetexts(files, mode='b'):
df = dd_read('2014-01-*.csv', include_path_column=True)
assert df.path.dtype == 'category'
assert has_known_categories(df.path)

dfs = dd_read('2014-01-*.csv', include_path_column=True, collection=False)
result = dfs[0].compute()
assert result.path.dtype == 'category'
assert has_known_categories(result.path)


# After this point, we test just using read_csv, as all functionality
# for both is implemented using the same code.

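As a closing illustration (hypothetical, reusing the ``parse_filename`` helper and an ``amount`` column like the one in the sample data above), the path column behaves like any other column once attached, so per-file aggregations are straightforward:

    import os
    import dask.dataframe as dd

    def parse_filename(path):
        # same helper as in the tests above: keep only the file name
        return os.path.split(path)[1]

    df = dd.read_csv('2014-01-*.csv', include_path_column=True,
                     converters={'path': parse_filename})
    per_file_totals = df.groupby('path').amount.sum().compute()
    print(per_file_totals)   # one total per source file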
