dtypes consistent across multiple csv files
mrocklin committed May 10, 2015
1 parent 265d485 commit 6afbd2b
Showing 1 changed file with 22 additions and 16 deletions:

dask/dataframe/io.py
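
The motivation is easiest to see with two small files: pandas infers dtypes independently for each file it reads, so the same column can come back as int64 from one file in a glob and float64 from another, and the resulting partitions disagree. A minimal sketch of that failure mode (the file names and contents are illustrative, not from the commit):

    import pandas as pd

    # Two small CSV files standing in for a globbed dataset: 'amount'
    # happens to look integral in the first file but not in the second.
    with open('part-0.csv', 'w') as f:
        f.write('id,amount\n1,10\n2,20\n')
    with open('part-1.csv', 'w') as f:
        f.write('id,amount\n3,30.5\n4,40\n')

    # Per-file inference gives the same column two different dtypes.
    a = pd.read_csv('part-0.csv')
    b = pd.read_csv('part-1.csv')
    print(a['amount'].dtype, b['amount'].dtype)  # int64 float64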
@@ -40,34 +40,31 @@ def file_size(fn, compression=None):
 
 @wraps(pd.read_csv)
 def read_csv(fn, *args, **kwargs):
-    if '*' in fn:
-        return concat([read_csv(f, *args, **kwargs) for f in sorted(glob(fn))])
-
+    chunkbytes = kwargs.pop('chunkbytes', 2**26)  # 64 MB
+    compression = kwargs.pop('compression', None)
     categorize = kwargs.pop('categorize', None)
     index = kwargs.pop('index', None)
     if index and categorize == None:
         categorize = True
 
-    compression = kwargs.pop('compression', None)
-
-    # Chunk sizes and numbers
-    chunkbytes = kwargs.pop('chunkbytes', 2**28)  # 256 MB
-    total_bytes = file_size(fn, compression)
-    nchunks = int(ceil(float(total_bytes) / chunkbytes))
-    divisions = [None] * (nchunks - 1)
-
-    # Let pandas infer on the first 100 rows
-    head = pd.read_csv(fn, *args, nrows=100, compression=compression, **kwargs)
+    if '*' in fn:
+        first_fn = sorted(glob(fn))[0]
+    else:
+        first_fn = fn
 
     if 'names' not in kwargs:
-        kwargs['names'] = csv_names(fn, compression=compression, **kwargs)
+        kwargs['names'] = csv_names(first_fn, compression=compression, **kwargs)
     if 'header' not in kwargs:
-        header = infer_header(fn, compression=compression, **kwargs)
+        header = infer_header(first_fn, compression=compression, **kwargs)
         if header is True:
             header = 0
     else:
         header = kwargs.pop('header')
 
+    head = pd.read_csv(first_fn, *args, nrows=1000, compression=compression,
+                       header=header, **kwargs)
+
     if 'parse_dates' not in kwargs:
         parse_dates = [col for col in head.dtypes.index
                        if np.issubdtype(head.dtypes[col], np.datetime64)]
@@ -85,6 +82,15 @@ def read_csv(fn, *args, **kwargs):
 
     kwargs['dtype'] = dtype
 
+    # Handle glob strings
+    if '*' in fn:
+        return concat([read_csv(f, *args, **kwargs) for f in sorted(glob(fn))])
+
+    # Chunk sizes and numbers
+    total_bytes = file_size(fn, compression)
+    nchunks = int(ceil(float(total_bytes) / chunkbytes))
+    divisions = [None] * (nchunks - 1)
+
     first_read_csv = curry(pd.read_csv, *args, header=header, **kwargs)
     rest_read_csv = curry(pd.read_csv, *args, header=None, **kwargs)
@@ -134,7 +140,7 @@ def infer_header(fn, encoding='utf-8', compression=None, **kwargs):
 
 
 def csv_names(fn, encoding='utf-8', compression=None, names=None,
-              parse_dates=None, usecols=None, **kwargs):
+              parse_dates=None, usecols=None, dtype=None, **kwargs):
     try:
         df = pd.read_csv(fn, encoding=encoding, compression=compression,
                          names=names, parse_dates=parse_dates, nrows=5, **kwargs)
@@ -145,7 +151,7 @@ def csv_names(fn, encoding='utf-8', compression=None, names=None,
 
 
 def categories_and_quantiles(fn, args, kwargs, index=None, categorize=None,
-                             chunkbytes=2**28):
+                             chunkbytes=2**26):
     """
     Categories of Object columns and quantiles of index column for CSV
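
With this change, read_csv samples up to 1000 rows from the first file in the glob, fixes names, header, and dtypes from that sample, and only then recurses over the remaining files with those dtypes pinned in kwargs. A simplified sketch of the strategy (names, headers, compression, and chunking omitted; file names illustrative):

    from glob import glob
    import pandas as pd

    # Files as in the earlier sketch, except the first file is now
    # representative of the column's real type.
    with open('part-0.csv', 'w') as f:
        f.write('id,amount\n1,10.0\n2,20\n')
    with open('part-1.csv', 'w') as f:
        f.write('id,amount\n3,30.5\n4,40\n')

    filenames = sorted(glob('part-*.csv'))

    # Infer dtypes once, from a sample of the first file only.
    head = pd.read_csv(filenames[0], nrows=1000)
    dtype = head.dtypes.to_dict()

    # Read every file with the shared dtypes, so all partitions agree.
    df = pd.concat(pd.read_csv(fn, dtype=dtype) for fn in filenames)
    print(df.dtypes)  # id: int64, amount: float64 throughout

This leans on the first file's sample being representative: a column that looks integral there but holds decimals in a later file would fail the explicit cast, and datetime columns have to go through parse_dates rather than the dtype dict, which is what the parse_dates detection in the first hunk above appears to handle.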
