feat[python]: Add support for reading non-utf8 encoded CSV files (#4464)

Add support for reading non-utf8 encoded CSV files in read_csv.
  - Decode CSV files when encoding is not set to "utf8" or
    "utf8-lossy" in _prepare_file_arg if use_pyarrow=False.
    In that case, decoding is done in Python, so Polars' fast-path
    readers are not used.
    If use_pyarrow=True, pass the encoding parameter directly to
    pa.csv.read_csv.
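
  A usage sketch of the new parameter (file name illustrative, not
  part of this commit):

      import polars as pl

      # Decoded in Python before parsing; Polars' fast-path readers
      # are bypassed:
      df = pl.read_csv("data_big5.csv", encoding="big5", use_pyarrow=False)

      # Forwarded directly to pa.csv.read_csv instead:
      df = pl.read_csv("data_big5.csv", encoding="big5", use_pyarrow=True)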

Other fixes/features:
  - Return BytesIO object for bytes input in _prepare_file_arg
    when using pyarrow (read_csv, read_ipc, read_parquet) as
    pyarrow only works with file like objects.
  - Check if eol_char argument value for read_csv is 1 byte.
  - Add quote_char support for read_csv:
    - Expand test_csv_quote_char: Check if fields surrounded
      by quotes keep the quotes with quote_char=None, both when
      reading with polars and with pyarrow.
  - Do not check the value of parse_dates when use_pyarrow=True,
    as date parsing can't be disabled.
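
  The quote_char and eol_char changes can be exercised directly; a
  minimal sketch (data illustrative):

      import polars as pl

      # quote_char=None treats quotes as ordinary characters, so
      # fields keep their surrounding quotes:
      df = pl.read_csv(b'a,b\n1,"x"\n', quote_char=None)
      assert df.rows() == [(1, '"x"')]

      # eol_char must be a single byte; multi-byte values such as
      # "\r\n" are now rejected by the 1-byte argument check:
      # pl.read_csv(b"a,b\n1,2\n", eol_char="\r\n")  # raises
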
ghuls committed Aug 17, 2022
1 parent b7e5d64 commit 90f2f51
Showing 3 changed files with 148 additions and 29 deletions.
53 changes: 44 additions & 9 deletions py-polars/polars/internals/io.py
@@ -25,9 +25,12 @@
    pass


def _process_http_file(path: str) -> BytesIO:
def _process_http_file(path: str, encoding: str | None = None) -> BytesIO:
    with urlopen(path) as f:
        return BytesIO(f.read())
        if not encoding or encoding in {"utf8", "utf8-lossy"}:
            return BytesIO(f.read())
        else:
            return BytesIO(f.read().decode(encoding).encode("utf8"))
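
For reference, the decode/re-encode round-trip used above turns any
supported encoding into utf8 bytes that the native readers accept; a
standalone illustration using the big5 sample bytes from the tests
below:

    from io import BytesIO

    raw = b"\xa5x\xa5_"  # "台北" encoded as big5
    buf = BytesIO(raw.decode("big5").encode("utf8"))
    assert buf.read().decode("utf8") == "台北"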


@overload
@@ -52,7 +55,10 @@ def _prepare_file_arg(


def _prepare_file_arg(
    file: str | list[str] | TextIO | Path | BinaryIO | bytes, **kwargs: Any
    file: str | list[str] | TextIO | Path | BinaryIO | bytes,
    encoding: str | None = None,
    use_pyarrow: bool | None = None,
    **kwargs: Any,
) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]:
"""
Prepare file argument.
@@ -64,8 +70,16 @@
    A local path is returned as a string.
    An http URL is read into a buffer and returned as a `BytesIO`.
    When ``encoding`` is not ``utf8`` or ``utf8-lossy``, the whole file is
    first read in Python, decoded using the specified encoding, and
    returned as a `BytesIO` (for usage with ``read_csv``).
    A `bytes` file is returned as a `BytesIO` if ``use_pyarrow=True``.
    When fsspec is installed, remote files are opened with
    `fsspec.open(file, **kwargs)` or `fsspec.open_files(file, **kwargs)`.
    If encoding is not ``utf8`` or ``utf8-lossy``, decoding is handled by
    fsspec too.
    """
    # Small helper to use a variable as context
@@ -76,29 +90,50 @@ def managed_file(file: Any) -> Iterator[Any]:
        finally:
            pass

    has_non_utf8_non_utf8_lossy_encoding = (
        encoding not in {"utf8", "utf8-lossy"} if encoding else False
    )
    encoding_str = encoding if encoding else "utf8"

    if isinstance(file, bytes):
        if has_non_utf8_non_utf8_lossy_encoding:
            return BytesIO(file.decode(encoding_str).encode("utf8"))
        if use_pyarrow:
            return BytesIO(file)
    if isinstance(file, StringIO):
        return BytesIO(file.read().encode("utf8"))
        return BytesIO(file.getvalue().encode("utf8"))
    if isinstance(file, BytesIO):
        if has_non_utf8_non_utf8_lossy_encoding:
            return BytesIO(file.getvalue().decode(encoding_str).encode("utf8"))
        return managed_file(file)
    if isinstance(file, Path):
        if has_non_utf8_non_utf8_lossy_encoding:
            return BytesIO(file.read_bytes().decode(encoding_str).encode("utf8"))
        return managed_file(format_path(file))
    if isinstance(file, str):
        # make sure that this is before fsspec
        # as fsspec needs requests to be installed
        # to read from http
        if file.startswith("http"):
            return _process_http_file(file)
            return _process_http_file(file, encoding_str)
        if _WITH_FSSPEC:
            if infer_storage_options(file)["protocol"] == "file":
                return managed_file(format_path(file))
            if not has_non_utf8_non_utf8_lossy_encoding:
                if infer_storage_options(file)["protocol"] == "file":
                    return managed_file(format_path(file))
            kwargs["encoding"] = encoding
            return fsspec.open(file, **kwargs)
    if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file):
        if _WITH_FSSPEC:
            if all(infer_storage_options(f)["protocol"] == "file" for f in file):
                return managed_file([format_path(f) for f in file])
            if not has_non_utf8_non_utf8_lossy_encoding:
                if all(infer_storage_options(f)["protocol"] == "file" for f in file):
                    return managed_file([format_path(f) for f in file])
            kwargs["encoding"] = encoding
            return fsspec.open_files(file, **kwargs)
    if isinstance(file, str):
        file = format_path(file)
        if has_non_utf8_non_utf8_lossy_encoding:
            with open(file, encoding=encoding_str) as f:
                return BytesIO(f.read().encode("utf8"))
    return managed_file(file)
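
Every branch above returns either a BytesIO or a managed file, both
usable as context managers over utf8 data, so a caller can do the
following (a sketch, reusing the big5 sample bytes):

    with _prepare_file_arg(b"\xa5x\xa5_", encoding="big5") as data:
        assert data.read() == "台北".encode("utf8")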


43 changes: 29 additions & 14 deletions py-polars/polars/io.py
@@ -80,7 +80,7 @@ def read_csv(
    infer_schema_length: int | None = 100,
    batch_size: int = 8192,
    n_rows: int | None = None,
    encoding: CsvEncoding = "utf8",
    encoding: CsvEncoding | str = "utf8",
    low_memory: bool = False,
    rechunk: bool = True,
    use_pyarrow: bool = False,
@@ -142,6 +142,7 @@ def read_csv(
    parse_dates
        Try to automatically parse dates. If this does not succeed,
        the column remains of data type ``pl.Utf8``.
        If ``use_pyarrow=True``, dates will always be parsed.
    n_threads
        Number of threads to use in csv parsing.
        Defaults to the number of physical cpu's of your system.
@@ -156,16 +157,19 @@
        Stop reading from CSV file after reading ``n_rows``.
        During multi-threaded parsing, an upper bound of ``n_rows``
        rows cannot be guaranteed.
    encoding : {'utf8', 'utf8-lossy'}
    encoding : {'utf8', 'utf8-lossy', ...}
        Lossy means that invalid utf8 values are replaced with ``�``
        characters. Defaults to "utf8".
        characters. When using an encoding other than ``utf8`` or
        ``utf8-lossy``, the input is first decoded in memory with
        Python. Defaults to ``utf8``.
    low_memory
        Reduce memory usage at expense of performance.
    rechunk
        Make sure that all columns are contiguous in memory by
        aggregating the chunks into a single array.
    use_pyarrow
        Try to use pyarrow's native CSV parser.
        Try to use pyarrow's native CSV parser. This will always
        parse dates, even if ``parse_dates=False``.
        This is not always possible. The set of arguments given to
        this function determines if it is possible to use pyarrow's
        native parser. Note that pyarrow and polars may have a
@@ -185,7 +189,7 @@
        Set the sample size. This is used to sample statistics to estimate the
        allocation needed.
    eol_char
        Single byte end of line character
        Single byte end of line character.

    Returns
    -------
@@ -207,6 +211,7 @@
_check_arg_is_1byte("sep", sep, False)
_check_arg_is_1byte("comment_char", comment_char, False)
_check_arg_is_1byte("quote_char", quote_char, True)
_check_arg_is_1byte("eol_char", eol_char, False)

projection, columns = handle_projection_columns(columns)
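
The new eol_char check reuses the existing _check_arg_is_1byte helper,
whose body is not shown in this diff. A rough sketch of the contract it
enforces (an assumption about the helper, not its actual code):

    def _check_arg_is_1byte(arg_name: str, arg: str | None, can_be_empty: bool) -> None:
        # Assumed behavior: reject a value whose utf8 encoding is not
        # exactly one byte (or empty, when can_be_empty is True).
        if isinstance(arg, str):
            n = len(arg.encode("utf8"))
            if (n > 1) if can_be_empty else (n != 1):
                raise ValueError(f"{arg_name} should be a single byte character")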

@@ -233,10 +238,8 @@
        and dtypes is None
        and n_rows is None
        and n_threads is None
        and encoding == "utf8"
        and not low_memory
        and null_values is None
        and parse_dates
    ):
        include_columns = None

@@ -253,13 +256,21 @@
            # for pyarrow.
            include_columns = [f"f{column_idx}" for column_idx in projection]

        with _prepare_file_arg(file, **storage_options) as data:
        with _prepare_file_arg(
            file, encoding=None, use_pyarrow=True, **storage_options
        ) as data:
            tbl = pa.csv.read_csv(
                data,
                pa.csv.ReadOptions(
                    skip_rows=skip_rows, autogenerate_column_names=not has_header
                    skip_rows=skip_rows,
                    autogenerate_column_names=not has_header,
                    encoding=encoding,
                ),
                pa.csv.ParseOptions(
                    delimiter=sep,
                    quote_char=quote_char if quote_char else False,
                    double_quote=quote_char is not None and quote_char == '"',
                ),
                pa.csv.ParseOptions(delimiter=sep),
                pa.csv.ConvertOptions(
                    column_types=None,
                    include_columns=include_columns,
@@ -370,7 +381,9 @@ def read_csv(
            for column_name, column_dtype in dtypes.items()
        }

    with _prepare_file_arg(file, **storage_options) as data:
    with _prepare_file_arg(
        file, encoding=encoding, use_pyarrow=False, **storage_options
    ) as data:
        df = DataFrame._read_csv(
            file=data,
            has_header=has_header,
@@ -387,7 +400,7 @@ def read_csv(
            infer_schema_length=infer_schema_length,
            batch_size=batch_size,
            n_rows=n_rows,
            encoding=encoding,
            encoding=encoding if encoding == "utf8-lossy" else "utf8",
            low_memory=low_memory,
            rechunk=rechunk,
            skip_rows_after_header=skip_rows_after_header,
@@ -799,7 +812,7 @@ def read_ipc(
        )

    storage_options = storage_options or {}
    with _prepare_file_arg(file, **storage_options) as data:
    with _prepare_file_arg(file, use_pyarrow=use_pyarrow, **storage_options) as data:
        if use_pyarrow:
            if not _PYARROW_AVAILABLE:
                raise ImportError(
@@ -893,7 +906,9 @@ def read_parquet(
        raise ValueError("``n_rows`` cannot be used with ``use_pyarrow=True``.")

    storage_options = storage_options or {}
    with _prepare_file_arg(source, **storage_options) as source_prep:
    with _prepare_file_arg(
        source, use_pyarrow=use_pyarrow, **storage_options
    ) as source_prep:
        if use_pyarrow:
            if not _PYARROW_AVAILABLE:
                raise ImportError(
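
The read_ipc and read_parquet changes above only thread use_pyarrow
through to _prepare_file_arg, so that bytes input gets wrapped for
pyarrow, which needs file-like objects; a sketch of the effect:

    import io

    # With use_pyarrow=True, raw bytes come back wrapped in a
    # file-like object rather than passed through unchanged:
    with _prepare_file_arg(b"...", use_pyarrow=True) as data:
        assert isinstance(data, io.BytesIO)
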
81 changes: 75 additions & 6 deletions py-polars/tests/io/test_csv.py
@@ -196,7 +196,8 @@ def test_read_csv_columns_argument(


def test_read_csv_buffer_ownership() -> None:
    buf = io.BytesIO(b"\xf0\x9f\x98\x80,5.55,333\n\xf0\x9f\x98\x86,-5.0,666")
    bts = b"\xf0\x9f\x98\x80,5.55,333\n\xf0\x9f\x98\x86,-5.0,666"
    buf = io.BytesIO(bts)
    df = pl.read_csv(
        buf,
        has_header=False,
@@ -206,6 +207,35 @@ def test_read_csv_buffer_ownership() -> None:
    assert df.shape == (2, 3)
    assert df.rows() == [("😀", 5.55, 333), ("😆", -5.0, 666)]
    assert not buf.closed
    assert buf.read() == bts


def test_read_csv_encoding() -> None:
    bts = (
        b"Value1,Value2,Value3,Value4,Region\n"
        b"-30,7.5,2578,1,\xa5x\xa5_\n-32,7.97,3006,1,\xa5x\xa4\xa4\n"
        b"-31,8,3242,2,\xb7s\xa6\xcb\n-33,7.97,3300,3,\xb0\xaa\xb6\xaf\n"
        b"-20,7.91,3384,4,\xac\xfc\xb0\xea\n"
    )

    file_path = os.path.join(os.path.dirname(__file__), "encoding.csv")
    file_str = str(file_path)

    with open(file_path, "wb") as f:
        f.write(bts)

    bytesio = io.BytesIO(bts)

    for use_pyarrow in (False, True):
        for file in (file_path, file_str, bts, bytesio):
            print(type(file))
            assert pl.read_csv(
                file,  # type: ignore[arg-type]
                encoding="big5",
                use_pyarrow=use_pyarrow,
            ).get_column("Region") == pl.Series(
                "Region", ["台北", "台中", "新竹", "高雄", "美國"]
            )


def test_column_rename_and_dtype_overwrite() -> None:
@@ -313,23 +343,62 @@ def test_empty_bytes() -> None:
        pl.read_csv(b)


def test_csq_quote_char() -> None:
def test_csv_quote_char() -> None:
    expected = pl.DataFrame(
        [
            pl.Series("linenum", [1, 2, 3, 4, 5, 6, 7, 8, 9]),
            pl.Series(
                "last_name",
                [
                    "Jagger",
                    'O"Brian',
                    "Richards",
                    'L"Etoile',
                    "Watts",
                    "Smith",
                    '"Wyman"',
                    "Woods",
                    'J"o"ne"s',
                ],
            ),
            pl.Series(
                "first_name",
                [
                    "Mick",
                    '"Mary"',
                    "Keith",
                    "Bennet",
                    "Charlie",
                    'D"Shawn',
                    "Bill",
                    "Ron",
                    "Brian",
                ],
            ),
        ]
    )

    rolling_stones = textwrap.dedent(
        """\
        linenum,last_name,first_name
        1,Jagger,Mick
        2,O"Brian,Mary
        2,O"Brian,"Mary"
        3,Richards,Keith
        4,L"Etoile,Bennet
        5,Watts,Charlie
        6,Smith,D"Shawn
        7,Wyman,Bill
        7,"Wyman",Bill
        8,Woods,Ron
        9,Jones,Brian
        9,J"o"ne"s,Brian
        """
    )

    assert pl.read_csv(rolling_stones.encode(), quote_char=None).shape == (9, 3)
    for use_pyarrow in (False, True):
        out = pl.read_csv(
            rolling_stones.encode(), quote_char=None, use_pyarrow=use_pyarrow
        )
        assert out.shape == (9, 3)
        assert out.frame_equal(expected)


def test_csv_empty_quotes_char() -> None:
