Skip to content

Commit

Permalink
check read_csv dtypes at runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Apr 26, 2016
1 parent e7e3035 commit 1957894
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
27 changes: 23 additions & 4 deletions dask/dataframe/csv.py
Expand Up @@ -3,6 +3,7 @@
from io import BytesIO
from warnings import warn

import numpy as np
import pandas as pd

from ..delayed import delayed
Expand Down Expand Up @@ -39,13 +40,31 @@ def bytes_read_csv(b, header, kwargs, dtypes=None):
bio.write(b)
bio.seek(0)
df = pd.read_csv(bio, **kwargs)
if dtypes is not None:
d = dict((c, df[c].astype(dt)) for c, dt in dtypes.items()
if df[c].dtype != dt)
df = df.assign(**d)
if dtypes:
coerce_dtypes(df, dtypes)
return df


def coerce_dtypes(df, dtypes):
    """ Coerce dataframe to dtypes safely

    Operates in place

    Columns of ``df`` that are not named in ``dtypes``, or that already have
    the requested dtype, are left untouched.

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}

    Raises
    ------
    TypeError
        If a floating point column is requested as an integer dtype but holds
        values that an integer cannot represent (fractional parts, NaN or
        infinity).
    """
    for c in df.columns:
        if c not in dtypes:
            continue
        actual, desired = df.dtypes[c], dtypes[c]
        if actual != desired:
            if (np.issubdtype(actual, np.floating) and
                    np.issubdtype(desired, np.integer)):
                values = df[c]
                # ``values % 1`` is NaN for NaN/inf entries and ``.any()``
                # skips NaN, so non-finite values need an explicit check;
                # otherwise they would only fail later inside ``astype``
                # with an unrelated error message.
                if (~np.isfinite(values)).any() or (values % 1).any():
                    raise TypeError("Runtime type mismatch. "
                    "Add {'%s': float} to dtype= keyword in read_csv" % c)
            df[c] = df[c].astype(desired)


def read_csv_from_bytes(block_lists, header, head, kwargs, collection=True,
enforce_dtypes=True):
""" Convert blocks of bytes to a dask.dataframe or other high-level object
Expand Down
18 changes: 18 additions & 0 deletions dask/dataframe/tests/test_csv.py
Expand Up @@ -176,3 +176,21 @@ def test_windows_line_terminator():
df = read_csv(fn, blocksize=5, lineterminator='\r\n')
assert df.b.sum().compute() == 2 + 3 + 4 + 5 + 6 + 7
assert df.a.sum().compute() == 1 + 2 + 3 + 4 + 5 + 6


def test_late_dtypes():
    """ Dtype mismatches found at compute time give an actionable error

    Both columns start with integer-looking values and only contain a
    fractional value near the end of the text.  Computing on the frame read
    without explicit dtypes must raise a TypeError whose message suggests
    adding a float entry for 'a' or 'b' to ``dtype=``; reading again with
    both columns declared as float must produce the expected sums.
    """
    csv_text = 'a,b\n1,2\n2,3\n3,4\n4,5\n5.5,6\n6,7.5'
    with filetext(csv_text) as fn:
        inferred = read_csv(fn, blocksize=5, sample=10)
        try:
            inferred.b.sum().compute()
        except TypeError as err:
            message = str(err)
            hints = ("'b': float", "'a': float")
            assert any(hint in message for hint in hints)
        else:
            # Computing without a declared float dtype must not succeed
            assert False

        explicit = read_csv(fn, blocksize=5, sample=10,
                            dtype={'a': float, 'b': float})

        total_a = explicit.a.sum().compute()
        assert total_a == 1 + 2 + 3 + 4 + 5.5 + 6
        total_b = explicit.b.sum().compute()
        assert total_b == 2 + 3 + 4 + 5 + 6 + 7.5

0 comments on commit 1957894

Please sign in to comment.