Skip to content

Commit

Permalink
expose and fix utf8-lossy encoding (#2570)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 7, 2022
1 parent 1ceffe9 commit 7330e3f
Show file tree
Hide file tree
Showing 14 changed files with 79 additions and 15 deletions.
6 changes: 4 additions & 2 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,16 @@ where
}

#[derive(Copy, Clone)]
#[cfg_attr(debug_assertions, derive(Debug))]
pub enum CsvEncoding {
/// Utf8 encoding
Utf8,
/// Utf8 encoding and unknown bytes are replaced with �
LossyUtf8,
}

#[derive(Clone, Debug)]
#[derive(Clone)]
#[cfg_attr(debug_assertions, derive(Debug))]
pub enum NullValues {
/// A single value that's used for all columns
AllColumns(String),
Expand Down Expand Up @@ -250,7 +252,7 @@ where
self
}

/// Sets the CsvEncoding
/// Set [`CsvEncoding`]
pub fn with_encoding(mut self, enc: CsvEncoding) -> Self {
self.encoding = enc;
self
Expand Down
10 changes: 9 additions & 1 deletion polars/polars-io/src/csv_core/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ impl Utf8Field {
}
}

/// We delay validation if we expect utf8 and no errors
/// In case of `ignore-error`
#[inline]
fn delay_utf8_validation(encoding: CsvEncoding, ignore_errors: bool) -> bool {
!matches!((encoding, ignore_errors), (CsvEncoding::LossyUtf8, true))
!(matches!(encoding, CsvEncoding::LossyUtf8) || ignore_errors)
}

impl ParsedBuffer<Utf8Type> for Utf8Field {
Expand All @@ -149,6 +152,8 @@ impl ParsedBuffer<Utf8Type> for Utf8Field {
self.data
.reserve(std::cmp::max(self.data.capacity(), bytes.len()))
}

// note that one branch writes without updating the length, so we must do that later.
let n_written = if needs_escaping {
// Safety:
// we just allocated enough capacity and data_len is correct.
Expand All @@ -173,6 +178,9 @@ impl ParsedBuffer<Utf8Type> for Utf8Field {
)
.into_owned();
let b = s.as_bytes();
// Make sure that we extend at the proper location,
// otherwise we append valid bytes to invalid utf8 bytes.
unsafe { self.data.set_len(data_len) }
self.data.extend_from_slice(b);
self.offsets.push(self.data.len() as i64);
self.validity.push(true);
Expand Down
12 changes: 11 additions & 1 deletion polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::functions::concat;
use crate::prelude::*;
use polars_core::prelude::*;
use polars_io::csv::NullValues;
use polars_io::csv::{CsvEncoding, NullValues};
use polars_io::csv_core::utils::get_reader_bytes;
use polars_io::csv_core::utils::infer_file_schema;

Expand All @@ -24,6 +24,7 @@ pub struct LazyCsvReader<'a> {
infer_schema_length: Option<usize>,
rechunk: bool,
skip_rows_after_header: usize,
encoding: CsvEncoding,
}

#[cfg(feature = "csv-file")]
Expand All @@ -47,6 +48,7 @@ impl<'a> LazyCsvReader<'a> {
infer_schema_length: Some(100),
rechunk: true,
skip_rows_after_header: 0,
encoding: CsvEncoding::Utf8,
}
}

Expand Down Expand Up @@ -159,6 +161,13 @@ impl<'a> LazyCsvReader<'a> {
self
}

/// Set [`CsvEncoding`]
#[must_use]
pub fn with_encoding(mut self, enc: CsvEncoding) -> Self {
self.encoding = enc;
self
}

/// Modify a schema before we run the lazy scanning.
///
/// Important! Run this function last in the builder!
Expand Down Expand Up @@ -203,6 +212,7 @@ impl<'a> LazyCsvReader<'a> {
self.infer_schema_length,
self.rechunk,
self.skip_rows_after_header,
self.encoding,
)?
.build()
.into();
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::utils;
use crate::utils::{combine_predicates_expr, has_expr};
use ahash::RandomState;
use polars_core::prelude::*;
use polars_io::csv::CsvEncoding;
#[cfg(feature = "csv-file")]
use polars_io::csv_core::utils::infer_file_schema;
#[cfg(feature = "ipc")]
Expand Down Expand Up @@ -101,6 +102,7 @@ impl LogicalPlanBuilder {
infer_schema_length: Option<usize>,
rechunk: bool,
skip_rows_after_header: usize,
encoding: CsvEncoding,
) -> Result<Self> {
let path = path.into();
let mut file = std::fs::File::open(&path)?;
Expand Down Expand Up @@ -146,6 +148,7 @@ impl LogicalPlanBuilder {
quote_char,
null_values,
rechunk,
encoding,
},
predicate: None,
aggregate: vec![],
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::prelude::*;
use polars_core::prelude::*;
use polars_io::csv::NullValues;
use polars_io::csv::{CsvEncoding, NullValues};

#[derive(Clone, Debug)]
pub struct CsvParserOptions {
Expand All @@ -16,6 +16,7 @@ pub struct CsvParserOptions {
pub(crate) cache: bool,
pub(crate) null_values: Option<NullValues>,
pub(crate) rechunk: bool,
pub(crate) encoding: CsvEncoding,
}
#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl Executor for CsvExec {
.with_encoding(CsvEncoding::LossyUtf8)
.with_comment_char(self.options.comment_char)
.with_quote_char(self.options.quote_char)
.with_encoding(self.options.encoding)
.with_rechunk(self.options.rechunk)
.finish()?;

Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4887,7 +4887,7 @@ def count(self) -> DataFrame:
"""
Count the number of values in each group.
"""
return self.agg(pli.all().count())
return self.agg(pli.lazy_functions.count())

def mean(self) -> DataFrame:
"""
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def scan_csv(
with_column_names: Optional[Callable[[List[str]], List[str]]] = None,
infer_schema_length: Optional[int] = 100,
n_rows: Optional[int] = None,
encoding: str = "utf8",
low_memory: bool = False,
rechunk: bool = True,
skip_rows_after_header: int = 0,
Expand Down Expand Up @@ -102,6 +103,7 @@ def scan_csv(
with_column_names,
rechunk,
skip_rows_after_header,
encoding,
)
return self

Expand Down
6 changes: 6 additions & 0 deletions py-polars/polars/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def scan_csv(
with_column_names: Optional[Callable[[List[str]], List[str]]] = None,
infer_schema_length: Optional[int] = 100,
n_rows: Optional[int] = None,
encoding: str = "utf8",
low_memory: bool = False,
rechunk: bool = True,
skip_rows_after_header: int = 0,
Expand Down Expand Up @@ -483,6 +484,10 @@ def scan_csv(
If set to ``None``, a full table scan will be done (slow).
n_rows
Stop reading from CSV file after reading ``n_rows``.
encoding
Allowed encodings: ``utf8`` or ``utf8-lossy``.
Lossy means that invalid utf8 values are replaced with ``�``
characters.
low_memory
Reduce memory usage in expense of performance.
rechunk
Expand Down Expand Up @@ -555,6 +560,7 @@ def scan_csv(
low_memory=low_memory,
rechunk=rechunk,
skip_rows_after_header=skip_rows_after_header,
encoding=encoding,
)


Expand Down
10 changes: 5 additions & 5 deletions py-polars/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,18 @@ pub fn get_mmap_bytes_reader<'a>(py_f: &'a PyAny) -> PyResult<Box<dyn MmapBytesR
let gil = Python::acquire_gil();
let py = gil.python();

// bytes object
if let Ok(bytes) = py_f.downcast::<PyBytes>() {
Ok(Box::new(Cursor::new(bytes.as_bytes())))
}
// string so read file
if let Ok(pstring) = py_f.downcast::<PyString>() {
else if let Ok(pstring) = py_f.downcast::<PyString>() {
let s = pstring.to_string();
let p = std::path::Path::new(&s);
let p = resolve_homedir(p);
let f = File::open(&p)?;
Ok(Box::new(f))
}
// bytes object
else if let Ok(bytes) = py_f.downcast::<PyBytes>() {
Ok(Box::new(Cursor::new(bytes.as_bytes())))
}
// a normal python file: with open(...) as f:.
else if py_f.getattr("read").is_ok() {
// we can still get a file name, so open the file instead of going through read
Expand Down
14 changes: 13 additions & 1 deletion py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::prelude::{NullValues, ScanArgsIpc, ScanArgsParquet};
use crate::utils::str_to_polarstype;
use polars::lazy::frame::{AllowedOptimizations, LazyCsvReader, LazyFrame, LazyGroupBy};
use polars::lazy::prelude::col;
use polars::prelude::{ClosedWindow, DataFrame, Field, JoinType, Schema};
use polars::prelude::{ClosedWindow, CsvEncoding, DataFrame, Field, JoinType, Schema};
use polars::time::*;
use polars_core::frame::DistinctKeepStrategy;
use polars_core::prelude::QuantileInterpolOptions;
Expand Down Expand Up @@ -107,12 +107,23 @@ impl PyLazyFrame {
with_schema_modify: Option<PyObject>,
rechunk: bool,
skip_rows_after_header: usize,
encoding: &str,
) -> PyResult<Self> {
let null_values = null_values.map(|w| w.0);
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
let quote_char = quote_char.map(|s| s.as_bytes()[0]);
let delimiter = sep.as_bytes()[0];

let encoding = match encoding {
"utf8" => CsvEncoding::Utf8,
"utf8-lossy" => CsvEncoding::LossyUtf8,
e => {
return Err(
PyPolarsEr::Other(format!("encoding {} not implemented.", e)).into(),
)
}
};

let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
let fields = overwrite_dtype
.iter()
Expand All @@ -138,6 +149,7 @@ impl PyLazyFrame {
.with_quote_char(quote_char)
.with_rechunk(rechunk)
.with_skip_rows_after_header(skip_rows_after_header)
.with_encoding(encoding)
.with_null_values(null_values);

if let Some(lambda) = with_schema_modify {
Expand Down
3 changes: 2 additions & 1 deletion py-polars/tests/lazy_io/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.parquet
*.parquet
*.csv
18 changes: 18 additions & 0 deletions py-polars/tests/lazy_io/test_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os
from os import path

import numpy as np

import polars as pl


def test_invalid_utf8() -> None:
    """Reading a CSV containing invalid utf8 bytes with ``encoding="utf8-lossy"``
    must succeed and give identical results through the eager (``read_csv``)
    and lazy (``scan_csv``) readers.
    """
    # Fixed seed so the byte pattern (and thus the frame contents) is reproducible.
    np.random.seed(1)
    # 200 random bytes are virtually guaranteed to contain invalid utf8 sequences.
    bts = bytes(np.random.randint(0, 255, 200))
    file = path.join(path.dirname(__file__), "nonutf8.csv")

    with open(file, "wb") as f:
        f.write(bts)

    try:
        a = pl.read_csv(file, has_headers=False, encoding="utf8-lossy")
        b = pl.scan_csv(file, has_headers=False, encoding="utf8-lossy").collect()
        assert a.frame_equal(b, null_equal=True)
    finally:
        # Don't leave the fixture file behind in the test directory.
        os.remove(file)
4 changes: 2 additions & 2 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ def test_groupby() -> None:
assert df.groupby("b").last().shape == (2, 3)
assert df.groupby("b").max().shape == (2, 3)
assert df.groupby("b").min().shape == (2, 3)
assert df.groupby("b").count().shape == (2, 3)
assert df.groupby("b").count().shape == (2, 2)
assert df.groupby("b").mean().shape == (2, 3)
assert df.groupby("b").n_unique().shape == (2, 3)
assert df.groupby("b").median().shape == (2, 3)
Expand Down Expand Up @@ -1581,7 +1581,7 @@ def __repr__(self): # type: ignore

def test_groupby_order_dispatch() -> None:
df = pl.DataFrame({"x": list("bab"), "y": range(3)})
expected = pl.DataFrame({"x": ["b", "a"], "y_count": [2, 1]})
expected = pl.DataFrame({"x": ["b", "a"], "count": [2, 1]})
assert df.groupby("x", maintain_order=True).count().frame_equal(expected)
expected = pl.DataFrame({"x": ["b", "a"], "y_agg_list": [[0, 2], [1]]})
assert df.groupby("x", maintain_order=True).agg_list().frame_equal(expected)
Expand Down

0 comments on commit 7330e3f

Please sign in to comment.