Skip to content

Commit

Permalink
improve csv row skipping and fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 1, 2022
1 parent be32b14 commit 621b5c7
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 64 deletions.
41 changes: 33 additions & 8 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ where
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&'a [ScanAggregation]>,
quote_char: Option<u8>,
offset_schema_inference: usize,
skip_rows_after_header: usize,
#[cfg(feature = "temporal")]
parse_dates: bool,
}
Expand All @@ -238,9 +238,9 @@ impl<'a, R> CsvReader<'a, R>
where
R: 'a + MmapBytesReader,
{
/// Start schema parsing of the header and dtypes at this offset.
pub fn with_offset_schema_inference(mut self, offset: usize) -> Self {
self.offset_schema_inference = offset;
/// Skip these rows after the header
pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
self.skip_rows_after_header = offset;
self
}

Expand Down Expand Up @@ -278,7 +278,7 @@ where
self
}

/// Skip the first `n` rows during parsing.
    /// Skip the first `n` rows during parsing. The header will be parsed at line `n`.
pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
self.skip_rows = skip_rows;
self
Expand Down Expand Up @@ -449,7 +449,7 @@ where
predicate: None,
aggregate: None,
quote_char: Some(b'"'),
offset_schema_inference: 0,
skip_rows_after_header: 0,
#[cfg(feature = "temporal")]
parse_dates: false,
}
Expand Down Expand Up @@ -527,7 +527,7 @@ where
self.predicate,
self.aggregate,
&to_cast,
self.offset_schema_inference,
self.skip_rows_after_header,
)?;
csv_reader.as_df()?
} else {
Expand Down Expand Up @@ -556,7 +556,7 @@ where
self.predicate,
self.aggregate,
&[],
self.offset_schema_inference,
self.skip_rows_after_header,
)?;
csv_reader.as_df()?
};
Expand Down Expand Up @@ -1476,4 +1476,29 @@ b5bbf310dffe3372fd5d37a18339fea5,e3fd7b95be3453a34361da84f815687d,-2,0.0335936,8

Ok(())
}

#[test]
fn test_skip_inference() -> Result<()> {
let csv = r#"metadata
line
foo,bar
1,2
3,4
5,6
"#;
let file = Cursor::new(csv);
let df = CsvReader::new(file.clone()).with_skip_rows(2).finish()?;
assert_eq!(df.get_column_names(), &["foo", "bar"]);
assert_eq!(df.shape(), (3, 2));
let df = CsvReader::new(file.clone())
.with_skip_rows(2)
.with_skip_rows_after_header(2)
.finish()?;
assert_eq!(df.get_column_names(), &["foo", "bar"]);
assert_eq!(df.shape(), (1, 2));
let df = CsvReader::new(file).finish()?;
assert_eq!(df.shape(), (5, 1));

Ok(())
}
}
11 changes: 4 additions & 7 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl<'a> CoreReader<'a> {
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&'a [ScanAggregation]>,
to_cast: &'a [&'a Field],
schema_inference_offset: usize,
skip_rows_after_header: usize,
) -> Result<CoreReader<'a>> {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
let mut reader_bytes = reader_bytes;
Expand All @@ -175,7 +175,6 @@ impl<'a> CoreReader<'a> {
// check if schema should be inferred
let delimiter = delimiter.unwrap_or(b',');

let mut add_skip_rows = 0;
let mut schema = match schema {
Some(schema) => Cow::Borrowed(schema),
None => {
Expand All @@ -194,11 +193,10 @@ impl<'a> CoreReader<'a> {
max_records,
has_header,
schema_overwrite,
&mut add_skip_rows,
&mut skip_rows,
comment_char,
quote_char,
null_values.as_ref(),
schema_inference_offset,
)?;
Cow::Owned(inferred_schema)
}
Expand All @@ -210,17 +208,16 @@ impl<'a> CoreReader<'a> {
max_records,
has_header,
schema_overwrite,
&mut add_skip_rows,
&mut skip_rows,
comment_char,
quote_char,
null_values.as_ref(),
schema_inference_offset,
)?;
Cow::Owned(inferred_schema)
}
}
};
skip_rows += add_skip_rows;
skip_rows += skip_rows_after_header;
if let Some(dtypes) = dtype_overwrite {
let mut s = schema.into_owned();
let fields = s.fields_mut();
Expand Down
5 changes: 1 addition & 4 deletions polars/polars-io/src/csv_core/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,13 @@ pub fn infer_file_schema(
comment_char: Option<u8>,
quote_char: Option<u8>,
null_values: Option<&NullValues>,
// start schema and header inference after `offset` lines
offset: usize,
) -> Result<(Schema, usize)> {
// We use lossy utf8 here because we don't want the schema inference to fail on utf8.
// It may later.
let encoding = CsvEncoding::LossyUtf8;

let bytes = skip_line_ending(skip_bom(reader_bytes)).0;
let mut lines = SplitLines::new(bytes, b'\n').skip(*skip_rows + offset);
let mut lines = SplitLines::new(bytes, b'\n').skip(*skip_rows);

// get or create header names
// when has_header is false, creates default column names with column_ prefix
Expand Down Expand Up @@ -325,7 +323,6 @@ pub fn infer_file_schema(
comment_char,
quote_char,
null_values,
offset,
);
}

Expand Down
20 changes: 10 additions & 10 deletions polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct LazyCsvReader<'a> {
null_values: Option<NullValues>,
infer_schema_length: Option<usize>,
rechunk: bool,
offset_schema_inference: usize,
skip_rows_after_header: usize,
}

#[cfg(feature = "csv-file")]
Expand All @@ -46,14 +46,14 @@ impl<'a> LazyCsvReader<'a> {
null_values: None,
infer_schema_length: Some(100),
rechunk: true,
offset_schema_inference: 0,
skip_rows_after_header: 0,
}
}

/// Start schema parsing of the header at this offset
/// Skip this number of rows after the header location.
#[must_use]
pub fn with_offset_schema_inference(mut self, offset: usize) -> Self {
self.offset_schema_inference = offset;
pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
self.skip_rows_after_header = offset;
self
}

Expand Down Expand Up @@ -88,7 +88,7 @@ impl<'a> LazyCsvReader<'a> {
self
}

/// Skip the first `n` rows during parsing.
/// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
#[must_use]
pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
self.skip_rows = skip_rows;
Expand Down Expand Up @@ -162,24 +162,24 @@ impl<'a> LazyCsvReader<'a> {
/// Modify a schema before we run the lazy scanning.
///
/// Important! Run this function latest in the builder!
pub fn with_schema_modify<F>(mut self, f: F) -> Result<Self>
pub fn with_schema_modify<F>(self, f: F) -> Result<Self>
where
F: Fn(Schema) -> Result<Schema>,
{
let mut file = std::fs::File::open(&self.path)?;
let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");
let mut skip_rows = self.skip_rows;

let (schema, _) = infer_file_schema(
&reader_bytes,
self.delimiter,
self.infer_schema_length,
self.has_header,
self.schema_overwrite,
&mut self.skip_rows,
&mut skip_rows,
self.comment_char,
self.quote_char,
None,
self.offset_schema_inference,
)?;
let schema = f(schema)?;
Ok(self.with_schema(Arc::new(schema)))
Expand All @@ -202,7 +202,7 @@ impl<'a> LazyCsvReader<'a> {
self.null_values,
self.infer_schema_length,
self.rechunk,
self.offset_schema_inference,
self.skip_rows_after_header,
)?
.build()
.into();
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl LogicalPlanBuilder {
null_values: Option<NullValues>,
infer_schema_length: Option<usize>,
rechunk: bool,
offset_schema_inference: usize,
skip_rows_after_header: usize,
) -> Result<Self> {
let path = path.into();
let mut file = std::fs::File::open(&path)?;
Expand All @@ -125,11 +125,11 @@ impl LogicalPlanBuilder {
comment_char,
quote_char,
null_values.as_ref(),
offset_schema_inference,
)
.expect("could not read schema");
Arc::new(schema)
});
skip_rows += skip_rows_after_header;
Ok(LogicalPlan::CsvScan {
path,
schema,
Expand Down
13 changes: 7 additions & 6 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def _read_csv(
encoding: str = "utf8",
low_memory: bool = False,
rechunk: bool = True,
offset_schema_inference: int = 0,
skip_rows_after_header: int = 0,
) -> "DataFrame":
"""
Read a CSV file into a Dataframe.
Expand All @@ -410,7 +410,8 @@ def _read_csv(
Single byte character used for csv quoting, default = ''.
Set to None to turn off special handling and escaping of quotes.
skip_rows
Start reading after ``skip_rows`` lines.
            Start reading after ``skip_rows`` lines. The header is also inferred at
            this offset.
dtypes
Overwrite dtypes during inference.
null_values
Expand Down Expand Up @@ -449,8 +450,8 @@ def _read_csv(
rechunk
Make sure that all columns are contiguous in memory by
aggregating the chunks into a single array.
offset_schema_inference
Start schema parsing of the header at this offset
skip_rows_after_header
            This number of rows will be skipped after the header is parsed.
Returns
-------
Expand Down Expand Up @@ -512,7 +513,7 @@ def _read_csv(
n_rows=n_rows,
low_memory=low_memory,
rechunk=rechunk,
offset_schema_inference=offset_schema_inference,
skip_rows_after_header=skip_rows_after_header,
)
if columns is None:
return scan.collect()
Expand Down Expand Up @@ -547,7 +548,7 @@ def _read_csv(
quote_char,
processed_null_values,
parse_dates,
offset_schema_inference,
skip_rows_after_header,
)
return self

Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def scan_csv(
n_rows: Optional[int] = None,
low_memory: bool = False,
rechunk: bool = True,
offset_schema_inference: int = 0,
skip_rows_after_header: int = 0,
) -> "LazyFrame":
"""
See Also: `pl.scan_csv`
Expand Down Expand Up @@ -101,7 +101,7 @@ def scan_csv(
infer_schema_length,
with_column_names,
rechunk,
offset_schema_inference,
skip_rows_after_header,
)
return self

Expand Down
18 changes: 9 additions & 9 deletions py-polars/polars/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def read_csv(
rechunk: bool = True,
use_pyarrow: bool = False,
storage_options: Optional[Dict] = None,
offset_schema_inference: int = 0,
skip_rows_after_header: int = 0,
**kwargs: Any,
) -> DataFrame:
"""
Expand Down Expand Up @@ -243,8 +243,8 @@ def read_csv(
Extra options that make sense for ``fsspec.open()`` or a
particular storage connection.
e.g. host, port, username, password, etc.
offset_schema_inference
Start schema parsing of the header at this offset
skip_rows_after_header
            Skip this number of rows after the header is parsed
Returns
-------
Expand Down Expand Up @@ -406,7 +406,7 @@ def read_csv(
encoding=encoding,
low_memory=low_memory,
rechunk=rechunk,
offset_schema_inference=offset_schema_inference,
skip_rows_after_header=skip_rows_after_header,
)

if new_columns:
Expand All @@ -430,7 +430,7 @@ def scan_csv(
n_rows: Optional[int] = None,
low_memory: bool = False,
rechunk: bool = True,
offset_schema_inference: int = 0,
skip_rows_after_header: int = 0,
**kwargs: Any,
) -> LazyFrame:
"""
Expand Down Expand Up @@ -458,7 +458,7 @@ def scan_csv(
Single byte character used for csv quoting, default = ''.
Set to None to turn off special handling and escaping of quotes.
skip_rows
Start reading after ``skip_rows`` lines.
Start reading after ``skip_rows`` lines. The header will be parsed at this offset.
dtypes
Overwrite dtypes during inference.
null_values
Expand Down Expand Up @@ -487,8 +487,8 @@ def scan_csv(
Reduce memory usage in expense of performance.
rechunk
Reallocate to contiguous memory when all chunks/ files are parsed.
offset_schema_inference
Start schema parsing of the header at this offset
skip_rows_after_header
            Skip this number of rows after the header is parsed
Examples
--------
Expand Down Expand Up @@ -554,7 +554,7 @@ def scan_csv(
n_rows=n_rows,
low_memory=low_memory,
rechunk=rechunk,
offset_schema_inference=offset_schema_inference,
skip_rows_after_header=skip_rows_after_header,
)


Expand Down

0 comments on commit 621b5c7

Please sign in to comment.