fix: Respect user passed 'reader_schema' in 'scan_csv' (#16080)
ritchie46 committed May 6, 2024
1 parent 575e917 commit dd6e2ee
Showing 10 changed files with 79 additions and 18 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -38,7 +38,7 @@ chrono-tz = "0.8.1"
 ciborium = "0.2"
 crossbeam-channel = "0.5.8"
 crossbeam-queue = "0.3"
-either = "1.9"
+either = "1.11"
 ethnum = "1.3.2"
 fallible-streaming-iterator = "0.1.9"
 futures = "0.3.25"
6 changes: 4 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
@@ -4,7 +4,7 @@ use super::*;
 
 pub struct CsvExec {
     pub path: PathBuf,
-    pub schema: SchemaRef,
+    pub file_info: FileInfo,
     pub options: CsvReaderOptions,
     pub file_options: FileScanOptions,
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
@@ -26,7 +26,9 @@ impl CsvExec {
         CsvReader::from_path(&self.path)
            .unwrap()
            .has_header(self.options.has_header)
-           .with_dtypes(Some(self.schema.clone()))
+           .with_schema(Some(
+               self.file_info.reader_schema.clone().unwrap().unwrap_right(),
+           ))
            .with_separator(self.options.separator)
            .with_ignore_errors(self.options.ignore_errors)
            .with_skip_rows(self.options.skip_rows)
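
This executor change is the heart of the fix: CsvExec now hands the scan's reader schema to CsvReader::with_schema instead of passing the output schema as dtype overrides via with_dtypes, so a user-passed schema is respected as-is (the new test at the bottom exercises this with truncate_ragged_lines). The schema travels in an either::Either: CSV scans use the Right side (a Polars SchemaRef), Parquet/IPC scans the Left side (an Arrow schema). A minimal sketch of that convention, using Arc<str> stand-ins for the real polars schema types so it runs with only the either crate:

    use either::Either;
    use std::sync::Arc;

    // Stand-ins for polars' ArrowSchemaRef and SchemaRef, purely for illustration.
    type ArrowSchemaRef = Arc<str>;
    type SchemaRef = Arc<str>;

    fn main() {
        // CSV scans store the user-passed (or inferred) polars schema on the Right.
        let csv: Option<Either<ArrowSchemaRef, SchemaRef>> =
            Some(Either::Right(Arc::from("A: str, B: str")));
        // Mirrors `self.file_info.reader_schema.clone().unwrap().unwrap_right()`.
        let schema: SchemaRef = csv.clone().unwrap().unwrap_right();
        assert_eq!(&*schema, "A: str, B: str");

        // Parquet/IPC scans store an Arrow schema on the Left and adapt
        // Option<Either<L, R>> to Option<L>, as ParquetSource now does.
        let parquet: Option<Either<ArrowSchemaRef, SchemaRef>> =
            Some(Either::Left(Arc::from("arrow fields")));
        let arrow: Option<ArrowSchemaRef> = parquet.map(|either| either.unwrap_left());
        assert!(arrow.is_some());
    }

unwrap_right() panics if a Left value shows up, which the planner is expected to rule out by construction: each scan type stores the variant it later reads back.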
18 changes: 15 additions & 3 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -91,7 +91,12 @@ impl ParquetExec {
         );
 
         let mut reader = ParquetReader::new(file)
-            .with_schema(self.file_info.reader_schema.clone())
+            .with_schema(
+                self.file_info
+                    .reader_schema
+                    .clone()
+                    .map(|either| either.unwrap_left()),
+            )
             .read_parallel(parallel)
             .set_low_memory(self.options.low_memory)
             .use_statistics(self.options.use_statistics)
@@ -163,7 +168,9 @@
             .file_info
             .reader_schema
             .as_ref()
-            .expect("should be set");
+            .expect("should be set")
+            .as_ref()
+            .unwrap_left();
         let first_metadata = &self.metadata;
         let cloud_options = self.cloud_options.as_ref();
         let with_columns = self
@@ -343,7 +350,12 @@
         );
         return Ok(materialize_empty_df(
             projection.as_deref(),
-            self.file_info.reader_schema.as_ref().unwrap(),
+            self.file_info
+                .reader_schema
+                .as_ref()
+                .unwrap()
+                .as_ref()
+                .unwrap_left(),
             hive_partitions.as_deref(),
             self.file_options.row_index.as_ref(),
         ));
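
The doubled .as_ref() in these chains is deliberate: the first borrows out of the Option, then Either::as_ref converts &Either<L, R> into Either<&L, &R>, so unwrap_left() yields a borrowed Arrow schema without cloning it. A tiny self-contained sketch of that borrow pattern:

    use either::Either;

    fn main() {
        let e: Either<String, Vec<u8>> = Either::Left("arrow schema".to_string());
        // &Either<L, R> -> Either<&L, &R> -> &L, with no clone of the schema.
        let left: &String = e.as_ref().unwrap_left();
        assert_eq!(left, "arrow schema");
    }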
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -239,7 +239,7 @@ pub fn create_physical_plan(
             let path = paths[0].clone();
             Ok(Box::new(executors::CsvExec {
                 path,
-                schema: file_info.schema,
+                file_info,
                 options: csv_options,
                 predicate,
                 file_options,
9 changes: 7 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -113,7 +113,7 @@ impl ParquetSource {
             file_options,
             projection,
             chunk_size,
-            reader_schema,
+            reader_schema.map(|either| either.unwrap_left()),
             hive_partitions,
         ))
     }
@@ -151,7 +151,12 @@ impl ParquetSource {
             .map(|v| v.as_slice());
         check_projected_arrow_schema(
             batched_reader.schema().as_ref(),
-            self.file_info.reader_schema.as_ref().unwrap(),
+            self.file_info
+                .reader_schema
+                .as_ref()
+                .unwrap()
+                .as_ref()
+                .unwrap_left(),
             with_columns,
             "schema of all files in a single scan_parquet must be equal",
         )?;
2 changes: 2 additions & 0 deletions crates/polars-plan/Cargo.toml
@@ -28,6 +28,7 @@ bytemuck = { workspace = true }
 chrono = { workspace = true, optional = true }
 chrono-tz = { workspace = true, optional = true }
 ciborium = { workspace = true, optional = true }
+either = { workspace = true }
 futures = { workspace = true, optional = true }
 hashbrown = { workspace = true }
 once_cell = { workspace = true }
@@ -53,6 +54,7 @@ serde = [
   "polars-time/serde",
   "polars-io/serde",
   "polars-ops/serde",
+  "either/serde",
 ]
 streaming = []
 parquet = ["polars-io/parquet", "polars-parquet"]
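
either becomes a direct dependency here, and its serde feature is wired into polars-plan's own serde feature: a serde-derived struct that embeds an Either field, as FileInfo now does, needs that feature to compile. A minimal illustration, assuming serde with derive and serde_json purely for display (neither is part of this commit):

    use either::Either;
    use serde::Serialize;

    // A serde-derived struct embedding an Either field, as FileInfo now does,
    // only compiles with the `either/serde` feature enabled.
    #[derive(Serialize)]
    struct FileInfoLike {
        reader_schema: Option<Either<String, String>>,
    }

    fn main() {
        let info = FileInfoLike {
            reader_schema: Some(Either::Right("csv schema".to_string())),
        };
        // Prints {"reader_schema":{"Right":"csv schema"}}.
        println!("{}", serde_json::to_string(&info).unwrap());
    }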
24 changes: 17 additions & 7 deletions crates/polars-plan/src/logical_plan/conversion/scans.rs
@@ -1,6 +1,7 @@
 use std::io::Read;
 use std::path::PathBuf;
 
+use either::Either;
 #[cfg(feature = "cloud")]
 use polars_io::pl_async::get_runtime;
 use polars_io::prelude::*;
@@ -66,7 +67,7 @@ pub(super) fn parquet_file_info(
 
     let mut file_info = FileInfo::new(
         schema,
-        Some(reader_schema),
+        Some(Either::Left(reader_schema)),
         (num_rows, num_rows.unwrap_or(0)),
     );
 
@@ -110,7 +111,7 @@ pub(super) fn ipc_file_info(
             metadata.schema.as_ref().into(),
             file_options.row_index.as_ref(),
         ),
-        Some(Arc::clone(&metadata.schema)),
+        Some(Either::Left(Arc::clone(&metadata.schema))),
         (None, 0),
     );
 
@@ -171,14 +172,23 @@ pub(super) fn csv_file_info(
         .clone()
         .unwrap_or_else(|| Arc::new(inferred_schema));
 
-    if let Some(rc) = &file_options.row_index {
-        let schema = Arc::make_mut(&mut schema);
-        schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?;
-    }
+    let reader_schema = if let Some(rc) = &file_options.row_index {
+        let reader_schema = schema.clone();
+        let mut output_schema = (*reader_schema).clone();
+        output_schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?;
+        schema = Arc::new(output_schema);
+        reader_schema
+    } else {
+        schema.clone()
+    };
 
     let n_bytes = reader_bytes.len();
     let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize;
 
     csv_options.skip_rows += csv_options.skip_rows_after_header;
-    Ok(FileInfo::new(schema, None, (None, estimated_n_rows)))
+    Ok(FileInfo::new(
+        schema,
+        Some(Either::Right(reader_schema)),
+        (None, estimated_n_rows),
+    ))
 }
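
The reworked csv_file_info also separates two schemas that used to be conflated when a row index was requested: the reader schema stays exactly what the user passed (or inference produced), while a copy with the index column prepended becomes the output schema, so the CSV parser is never asked for a column that does not exist in the file. A runnable sketch of that split, with a toy Vec-based schema standing in for polars' Schema:

    use std::sync::Arc;

    // Toy stand-in for a polars Schema: ordered (name, dtype) pairs.
    type Schema = Vec<(String, String)>;

    fn main() {
        let mut schema: Arc<Schema> = Arc::new(vec![
            ("A".to_string(), "str".to_string()),
            ("B".to_string(), "str".to_string()),
        ]);
        let row_index: Option<&str> = Some("row_nr");

        // Mirrors the new logic: the reader schema stays untouched; only the
        // output schema gains the row-index column at position 0.
        let reader_schema = if let Some(name) = row_index {
            let reader_schema = schema.clone();
            let mut output_schema = (*reader_schema).clone();
            output_schema.insert(0, (name.to_string(), "idx".to_string()));
            schema = Arc::new(output_schema);
            reader_schema
        } else {
            schema.clone()
        };

        assert_eq!(reader_schema.len(), 2); // what the CSV parser sees
        assert_eq!(schema.len(), 3); // what the query's output exposes
        assert_eq!(schema[0].0, "row_nr");
    }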
5 changes: 3 additions & 2 deletions crates/polars-plan/src/logical_plan/schema.rs
@@ -3,6 +3,7 @@ use std::path::Path;
 use std::sync::Mutex;
 
 use arrow::datatypes::ArrowSchemaRef;
+use either::Either;
 use polars_core::prelude::*;
 use polars_utils::format_smartstring;
 #[cfg(feature = "serde")]
@@ -43,7 +44,7 @@ pub struct FileInfo {
     pub schema: SchemaRef,
     /// Stores the schema used for the reader, as the main schema can contain
     /// extra hive columns.
-    pub reader_schema: Option<ArrowSchemaRef>,
+    pub reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
     /// - known size
     /// - estimated size
     pub row_estimation: (Option<usize>, usize),
@@ -54,7 +55,7 @@ impl FileInfo {
     /// Constructs a new [`FileInfo`].
     pub fn new(
         schema: SchemaRef,
-        reader_schema: Option<ArrowSchemaRef>,
+        reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
         row_estimation: (Option<usize>, usize),
     ) -> Self {
         Self {
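
This type change is the backbone of the commit: FileInfo::reader_schema can now carry either an Arrow schema (Left, populated by Parquet and IPC scans) or a native Polars schema (Right, populated by CSV scans), where previously only an Arrow schema fit and CSV scans passed None, falling back to dtype overrides. The call sites assert the variant with unwrap_left()/unwrap_right() because each scan type knows which side it stored; a defensive consumer could match exhaustively instead, as in this sketch with string stand-ins for the two schema types:

    use either::Either;

    // Stand-ins for ArrowSchemaRef / SchemaRef, for illustration only.
    type ArrowSchemaRef = String;
    type SchemaRef = String;

    // A defensive alternative to unwrap_left()/unwrap_right(): make the
    // variant explicit at each use site.
    fn describe(reader_schema: Option<&Either<ArrowSchemaRef, SchemaRef>>) -> &'static str {
        match reader_schema {
            Some(Either::Left(_)) => "arrow schema (parquet/ipc scan)",
            Some(Either::Right(_)) => "polars schema (csv scan)",
            None => "no reader schema set",
        }
    }

    fn main() {
        let csv = Either::Right("A: str, B: str".to_string());
        assert_eq!(describe(Some(&csv)), "polars schema (csv scan)");
        assert_eq!(describe(None), "no reader schema set");
    }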
25 changes: 25 additions & 0 deletions py-polars/tests/unit/io/test_lazy_csv.py
@@ -308,3 +308,28 @@ def test_csv_null_values_with_projection_15515() -> None:
         "SireKey": [None],
         "BirthDate": [19940315],
     }
+
+
+@pytest.mark.write_disk()
+def test_csv_respect_user_schema_ragged_lines_15254() -> None:
+    with tempfile.NamedTemporaryFile() as f:
+        f.write(
+            b"""
+A,B,C
+1,2,3
+4,5,6,7,8
+9,10,11
+""".strip()
+        )
+        f.seek(0)
+
+        df = pl.scan_csv(
+            f.name, schema=dict.fromkeys("ABCDE", pl.String), truncate_ragged_lines=True
+        ).collect()
+        assert df.to_dict(as_series=False) == {
+            "A": ["1", "4", "9"],
+            "B": ["2", "5", "10"],
+            "C": ["3", "6", "11"],
+            "D": [None, "7", None],
+            "E": [None, "8", None],
+        }
