WIP: Handling csv reader options
Mostly there, just need to flesh out tests for coverage.
wence- committed Jul 5, 2024
1 parent 964ff72 commit 79a2605
Showing 3 changed files with 112 additions and 13 deletions.
99 changes: 87 additions & 12 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -15,9 +15,9 @@

import dataclasses
import itertools
-import json
import types
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, ClassVar

import pyarrow as pa
@@ -185,8 +185,10 @@ class Scan(IR):

typ: str
"""What type of file are we reading? Parquet, CSV, etc..."""
-    options: tuple[Any, ...]
-    """Type specific options, as json-encoded strings."""
reader_options: dict[str, Any]
"""Reader-specific options, as dictionary."""
cloud_options: dict[str, Any] | None
"""Cloud-related authentication options, currently ignored."""
paths: list[str]
"""List of paths to read from."""
file_options: Any
@@ -206,24 +208,97 @@ def __post_init__(self) -> None:
if self.file_options.n_rows is not None:
raise NotImplementedError("row limit in scan")
if self.typ not in ("csv", "parquet"):
-            raise NotImplementedError(
-                f"Unhandled scan type: {self.typ}"
-            )  # pragma: no cover; polars raises on the rust side for now
raise NotImplementedError(f"Unhandled scan type: {self.typ}")
if self.cloud_options is not None and any(
self.cloud_options[k] is not None for k in ("aws", "azure", "gcp")
):
raise NotImplementedError("Read from cloud storage")
if self.typ == "csv":
if self.reader_options["skip_rows_after_header"] != 0:
raise NotImplementedError("Skipping rows after header in CSV reader")
parse_options = self.reader_options["parse_options"]
if (
null_values := parse_options["null_values"]
) is not None and "Named" in null_values:
raise NotImplementedError(
"Per column null value specification not supported for CSV reader"
)
if (
comment := parse_options["comment_prefix"]
) is not None and "Multi" in comment:
raise NotImplementedError(
"Multi-character comment prefix not supported for CSV reader"
)

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
"""Evaluate and return a dataframe."""
options = self.file_options
with_columns = options.with_columns
row_index = options.row_index
if self.typ == "csv":
-            opts, cloud_opts = map(json.loads, self.options)
-            df = DataFrame.from_cudf(
-                cudf.concat(
-                    [cudf.read_csv(p, usecols=with_columns) for p in self.paths]
dtype_map = {
name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()]
for name, typ in self.schema.items()
}
parse_options = self.reader_options["parse_options"]
sep = chr(parse_options["separator"])
quote = chr(parse_options["quote_char"])
eol = chr(parse_options["eol_char"])
if self.reader_options["schema"] is not None:
# Reader schema provides names
column_names = list(self.reader_options["schema"]["inner"].keys())
else:
# file provides column names
column_names = None
usecols = with_columns
header = 0 if self.reader_options["has_header"] else None

# polars defaults to no null recognition
null_values = [""]
if parse_options["null_values"] is not None:
((typ, nulls),) = parse_options["null_values"].items()
if typ == "AllColumnsSingle":
# Single value
null_values.append(nulls)
else:
# List of values
null_values.extend(nulls)
if parse_options["comment_prefix"] is not None:
comment = parse_options["comment_prefix"]["Single"]
else:
comment = None
decimal = "," if parse_options["decimal_comma"] else "."

# polars skips blank lines at the beginning of the file
pieces = []
for p in self.paths:
skiprows = self.reader_options["skip_rows"]
# TODO: read_csv expands globs which we should not do,
# because polars will already have handled them.
path = Path(p)
with path.open() as f:
while f.readline() == "\n":
skiprows += 1
pieces.append(
cudf.read_csv(
path,
sep=sep,
quotechar=quote,
lineterminator=eol,
names=column_names,
header=header,
usecols=usecols,
na_filter=True,
na_values=null_values,
keep_default_na=False,
skiprows=skiprows,
comment=comment,
decimal=decimal,
dtype=dtype_map,
)
)
-            )
df = DataFrame.from_cudf(cudf.concat(pieces))
elif self.typ == "parquet":
-            opts, cloud_opts = map(json.loads, self.options)
cdf = cudf.read_parquet(self.paths, columns=with_columns)
assert isinstance(cdf, cudf.DataFrame)
df = DataFrame.from_cudf(cdf)
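The new CSV branch above translates polars' decoded parse_options payload into cudf.read_csv keyword arguments. As a minimal sketch (not part of the commit), the mapping in isolation might look as follows; the opts payload is hypothetical, with the variant names taken from the checks in __post_init__ ("AllColumnsSingle" and "Single"; the "AllColumns" name for the list-valued case is an assumption):

# Hypothetical parse_options payload for illustration only.
opts = {
    "separator": ord(","),
    "quote_char": ord('"'),
    "eol_char": ord("\n"),
    "null_values": {"AllColumns": ["NA", "null"]},
    "comment_prefix": {"Single": "#"},
    "decimal_comma": False,
}

kwargs = {
    "sep": chr(opts["separator"]),
    "quotechar": chr(opts["quote_char"]),
    "lineterminator": chr(opts["eol_char"]),
    "decimal": "," if opts["decimal_comma"] else ".",
}
# polars recognises no nulls by default, so the empty string is the only
# baseline sentinel; user-specified values are appended to it.
null_values = [""]
if opts["null_values"] is not None:
    ((typ, nulls),) = opts["null_values"].items()
    if typ == "AllColumnsSingle":
        null_values.append(nulls)  # one sentinel shared by every column
    else:
        null_values.extend(nulls)  # a list of sentinels for every column
kwargs["na_values"] = null_values
if opts["comment_prefix"] is not None:
    kwargs["comment"] = opts["comment_prefix"]["Single"]

The same decoded options also drive the blank-line handling: because polars skips blank lines at the start of a file, skip_rows is bumped once per leading newline before the path is handed to cudf.read_csv.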
9 changes: 8 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/translate.py
@@ -5,6 +5,7 @@

from __future__ import annotations

import json
from contextlib import AbstractContextManager, nullcontext
from functools import singledispatch
from typing import Any
@@ -88,10 +89,16 @@ def _(
node: pl_ir.Scan, visitor: NodeTraverser, schema: dict[str, plc.DataType]
) -> ir.IR:
typ, *options = node.scan_type
if typ == "ndjson":
(reader_options,) = map(json.loads, options)
cloud_options = None
else:
reader_options, cloud_options = map(json.loads, options)
return ir.Scan(
schema,
typ,
-        tuple(options),
reader_options,
cloud_options,
node.paths,
node.file_options,
translate_named_expr(visitor, n=node.predicate)
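On the translation side, node.scan_type arrives as a type tag followed by JSON-encoded option strings, and ndjson scans carry no cloud options. A sketch of the decoding under that assumption, with an invented payload for a CSV scan (real payloads come from polars' Rust side; only keys used in ir.py are shown):

import json

scan_type = (
    "csv",
    '{"has_header": true, "skip_rows": 0, "skip_rows_after_header": 0,'
    ' "schema": null, "parse_options": {"separator": 44, "quote_char": 34,'
    ' "eol_char": 10, "null_values": null, "comment_prefix": null,'
    ' "decimal_comma": false}}',
    '{"aws": null, "azure": null, "gcp": null}',
)
typ, *options = scan_type
if typ == "ndjson":
    (reader_options,) = map(json.loads, options)
    cloud_options = None
else:
    reader_options, cloud_options = map(json.loads, options)
assert chr(reader_options["parse_options"]["separator"]) == ","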
17 changes: 17 additions & 0 deletions python/cudf_polars/tests/test_scan.py
@@ -108,3 +108,20 @@ def test_scan_row_index_projected_out(tmp_path):
q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a"))

assert_gpu_result_equal(q)


def test_scan_csv_column_renames_projection_schema(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,\n3,4,5""")

q = pl.scan_csv(
tmp_path / "test.csv",
with_column_names=lambda names: [f"{n}_suffix" for n in names],
schema_overrides={
"foo_suffix": pl.String(),
"bar_suffix": pl.Int8(),
"baz_suffix": pl.UInt16(),
},
)

assert_gpu_result_equal(q)
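Since the commit message flags test coverage as unfinished, a sketch of one additional test that would exercise the new null-value handling (assuming the module's existing pl and assert_gpu_result_equal imports, and polars' null_values option):

def test_scan_csv_null_values(tmp_path):
    # Sketch only, not part of this commit: "NA" and "null" should map to
    # the list-valued null_values variant handled in Scan.evaluate.
    with (tmp_path / "test.csv").open("w") as f:
        f.write("a,b\n1,NA\nnull,2\n")

    q = pl.scan_csv(tmp_path / "test.csv", null_values=["NA", "null"])

    assert_gpu_result_equal(q)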
