
Commit 510f468
add option to offset schema inference separate from skip_rows (#2378)
ritchie46 committed Jan 14, 2022
1 parent a3e0a8a commit 510f468
Showing 11 changed files with 62 additions and 1 deletion.
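For context, the new offset_schema_inference option lets schema and header inference start further into a file than skip_rows alone allows, which helps with files that carry a malformed preamble before the real header. A minimal usage sketch of the Python API added in this commit, mirroring the test at the bottom of the diff (the file path is hypothetical):

import polars as pl

# Four junk preamble lines precede the real header "a,b,c". skip_rows drops
# them for row parsing; offset_schema_inference makes header/schema inference
# start at the same point.
df = pl.read_csv(
    "with_preamble.csv",  # hypothetical path
    skip_rows=4,
    offset_schema_inference=4,
)
assert df.columns == ["a", "b", "c"]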
9 changes: 9 additions & 0 deletions polars/polars-io/src/csv.rs
@@ -228,6 +228,7 @@ where
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&'a [ScanAggregation]>,
quote_char: Option<u8>,
offset_schema_inference: usize,
#[cfg(feature = "temporal")]
parse_dates: bool,
}
@@ -236,6 +237,11 @@ impl<'a, R> CsvReader<'a, R>
where
R: 'a + MmapBytesReader,
{
pub fn with_offset_schema_inference(mut self, offset: usize) -> Self {
self.offset_schema_inference = offset;
self
}

/// Sets the chunk size used by the parser. This influences performance
pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
self.chunk_size = chunk_size;
@@ -441,6 +447,7 @@
predicate: None,
aggregate: None,
quote_char: Some(b'"'),
offset_schema_inference: 0,
#[cfg(feature = "temporal")]
parse_dates: false,
}
@@ -518,6 +525,7 @@
self.predicate,
self.aggregate,
&to_cast,
self.offset_schema_inference,
)?;
csv_reader.as_df()?
} else {
Expand Down Expand Up @@ -546,6 +554,7 @@ where
self.predicate,
self.aggregate,
&[],
self.offset_schema_inference,
)?;
csv_reader.as_df()?
};
3 changes: 3 additions & 0 deletions polars/polars-io/src/csv_core/csv.rs
@@ -162,6 +162,7 @@ impl<'a> CoreReader<'a> {
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&'a [ScanAggregation]>,
to_cast: &'a [&'a Field],
schema_inference_offset: usize,
) -> Result<CoreReader<'a>> {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
let mut reader_bytes = reader_bytes;
@@ -197,6 +198,7 @@ impl<'a> CoreReader<'a> {
comment_char,
quote_char,
null_values.as_ref(),
schema_inference_offset,
)?;
Cow::Owned(inferred_schema)
}
@@ -212,6 +214,7 @@ impl<'a> CoreReader<'a> {
comment_char,
quote_char,
null_values.as_ref(),
schema_inference_offset,
)?;
Cow::Owned(inferred_schema)
}
5 changes: 4 additions & 1 deletion polars/polars-io/src/csv_core/utils.rs
@@ -131,13 +131,15 @@ pub fn infer_file_schema(
comment_char: Option<u8>,
quote_char: Option<u8>,
null_values: Option<&NullValues>,
// start schema and header inference after `offset` lines
offset: usize,
) -> Result<(Schema, usize)> {
// We use lossy utf8 decoding here because we don't want schema inference to fail on invalid utf8.
// Actual parsing may still fail on it later.
let encoding = CsvEncoding::LossyUtf8;

let bytes = skip_line_ending(skip_bom(reader_bytes)).0;
let mut lines = SplitLines::new(bytes, b'\n').skip(*skip_rows);
let mut lines = SplitLines::new(bytes, b'\n').skip(*skip_rows + offset);

// get or create header names
// when has_header is false, creates default column names with column_ prefix
@@ -323,6 +325,7 @@ pub fn infer_file_schema(
comment_char,
quote_char,
null_values,
offset,
);
}

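The changed skip above is the heart of the commit: within infer_file_schema, the schema pass now skips skip_rows + offset lines before looking for a header. A rough Python analogue of that skip arithmetic, ignoring the BOM stripping, quoting, and line-ending handling the real SplitLines iterator performs (note also that the skip_rows argument of infer_file_schema need not equal the user-facing skip_rows option):

def lines_seen_by_inference(text: str, skip_rows: int, offset: int) -> list:
    # Mirrors SplitLines::new(bytes, b'\n').skip(skip_rows + offset): the first
    # remaining line becomes the header candidate, the rest feed dtype inference.
    return text.split("\n")[skip_rows + offset:]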
11 changes: 11 additions & 0 deletions polars/polars-lazy/src/frame/csv.rs
@@ -23,6 +23,7 @@ pub struct LazyCsvReader<'a> {
null_values: Option<NullValues>,
infer_schema_length: Option<usize>,
rechunk: bool,
offset_schema_inference: usize,
}

#[cfg(feature = "csv-file")]
@@ -45,9 +46,17 @@ impl<'a> LazyCsvReader<'a> {
null_values: None,
infer_schema_length: Some(100),
rechunk: true,
offset_schema_inference: 0,
}
}

/// Start schema and header inference at this line offset
#[must_use]
pub fn with_offset_schema_inference(mut self, offset: usize) -> Self {
self.offset_schema_inference = offset;
self
}

/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
/// be guaranteed.
#[must_use]
@@ -170,6 +179,7 @@ impl<'a> LazyCsvReader<'a> {
self.comment_char,
self.quote_char,
None,
self.offset_schema_inference,
)?;
let schema = f(schema)?;
Ok(self.with_schema(Arc::new(schema)))
@@ -192,6 +202,7 @@ impl<'a> LazyCsvReader<'a> {
self.null_values,
self.infer_schema_length,
self.rechunk,
self.offset_schema_inference,
)?
.build()
.into();
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
@@ -101,6 +101,7 @@ impl LogicalPlanBuilder {
null_values: Option<NullValues>,
infer_schema_length: Option<usize>,
rechunk: bool,
offset_schema_inference: usize,
) -> Result<Self> {
let path = path.into();
let mut file = std::fs::File::open(&path)?;
@@ -125,6 +126,7 @@
comment_char,
quote_char,
null_values.as_ref(),
offset_schema_inference,
)
.expect("could not read schema");
Arc::new(schema)
5 changes: 5 additions & 0 deletions py-polars/polars/internals/frame.py
@@ -385,6 +385,7 @@ def _read_csv(
encoding: str = "utf8",
low_memory: bool = False,
rechunk: bool = True,
offset_schema_inference: int = 0,
) -> "DataFrame":
"""
Read a CSV file into a DataFrame.
@@ -449,6 +450,8 @@
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.
offset_schema_inference
Start schema and header inference at this line offset.
Returns
-------
@@ -510,6 +513,7 @@
n_rows=n_rows,
low_memory=low_memory,
rechunk=rechunk,
offset_schema_inference=offset_schema_inference,
)
if columns is None:
return scan.collect()
@@ -544,6 +548,7 @@
quote_char,
processed_null_values,
parse_dates,
offset_schema_inference,
)
return self

2 changes: 2 additions & 0 deletions py-polars/polars/internals/lazy_frame.py
@@ -72,6 +72,7 @@ def scan_csv(
n_rows: Optional[int] = None,
low_memory: bool = False,
rechunk: bool = True,
offset_schema_inference: int = 0,
) -> "LazyFrame":
"""
See Also: `pl.scan_csv`
@@ -100,6 +101,7 @@
infer_schema_length,
with_column_names,
rechunk,
offset_schema_inference,
)
return self

8 changes: 8 additions & 0 deletions py-polars/polars/io.py
@@ -158,6 +158,7 @@ def read_csv(
rechunk: bool = True,
use_pyarrow: bool = False,
storage_options: Optional[Dict] = None,
offset_schema_inference: int = 0,
**kwargs: Any,
) -> DataFrame:
"""
@@ -242,6 +243,8 @@
Extra options that make sense for ``fsspec.open()`` or a
particular storage connection.
e.g. host, port, username, password, etc.
offset_schema_inference
Start schema and header inference at this line offset.
Returns
-------
@@ -403,6 +406,7 @@ def read_csv(
encoding=encoding,
low_memory=low_memory,
rechunk=rechunk,
offset_schema_inference=offset_schema_inference,
)

if new_columns:
@@ -426,6 +430,7 @@ def scan_csv(
n_rows: Optional[int] = None,
low_memory: bool = False,
rechunk: bool = True,
offset_schema_inference: int = 0,
**kwargs: Any,
) -> LazyFrame:
"""
@@ -482,6 +487,8 @@
Reduce memory usage at the expense of performance.
rechunk
Reallocate to contiguous memory when all chunks/ files are parsed.
offset_schema_inference
Start schema and header inference at this line offset.
Examples
--------
@@ -547,6 +554,7 @@
n_rows=n_rows,
low_memory=low_memory,
rechunk=rechunk,
offset_schema_inference=offset_schema_inference,
)


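The same keyword is threaded through the lazy path, so deferred scans can use it too. A hedged sketch (hypothetical path, same preamble layout as in the test at the end of this diff):

import polars as pl

lf = pl.scan_csv(
    "with_preamble.csv",  # hypothetical path
    skip_rows=4,
    offset_schema_inference=4,
)
df = lf.collect()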
2 changes: 2 additions & 0 deletions py-polars/src/dataframe.rs
@@ -96,6 +96,7 @@ impl PyDataFrame {
quote_char: Option<&str>,
null_values: Option<Wrap<NullValues>>,
parse_dates: bool,
offset_schema_inference: usize,
) -> PyResult<Self> {
let null_values = null_values.map(|w| w.0);
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
@@ -163,6 +164,7 @@ impl PyDataFrame {
.with_null_values(null_values)
.with_parse_dates(parse_dates)
.with_quote_char(quote_char)
.with_offset_schema_inference(offset_schema_inference)
.finish()
.map_err(PyPolarsEr::from)?;
Ok(df.into())
2 changes: 2 additions & 0 deletions py-polars/src/lazy/dataframe.rs
@@ -105,6 +105,7 @@ impl PyLazyFrame {
infer_schema_length: Option<usize>,
with_schema_modify: Option<PyObject>,
rechunk: bool,
offset_schema_inference: usize,
) -> PyResult<Self> {
let null_values = null_values.map(|w| w.0);
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
@@ -135,6 +136,7 @@ impl PyLazyFrame {
.with_comment_char(comment_char)
.with_quote_char(quote_char)
.with_rechunk(rechunk)
.with_offset_schema_inference(offset_schema_inference)
.with_null_values(null_values);

if let Some(lambda) = with_schema_modify {
14 changes: 14 additions & 0 deletions py-polars/tests/test_io.py
@@ -552,3 +552,17 @@ def test_csv_globbing() -> None:

df = pl.read_csv(path, dtypes=dtypes)
assert df.dtypes == list(dtypes.values())


def test_csv_schema_offset() -> None:
csv = r"""
c,d,3
a,b,c
--
-
a,b,c
1,2,3
1,2,3
""".encode()
df = pl.read_csv(csv, offset_schema_inference=4, skip_rows=4)
assert df.columns == ["a", "b", "c"]
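A hedged reading of this test: skip_rows=4 drops the four malformed preamble lines for row parsing, while offset_schema_inference=4 lets inference pick up the header a,b,c on the fifth line; the two 1,2,3 lines then parse as data, so one would additionally expect (not asserted in the original test):

assert df.shape == (2, 3)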
