Skip to content

Commit

Permalink
fix ipc ordering (#3947)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 8, 2022
1 parent 6e325e7 commit 3542369
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 62 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description = "Arrow interfaces for Polars DataFrame library"
[dependencies]
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "fix_cast_dict", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
num = "^0.4"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ package = "arrow2"
git = "https://github.com/ritchie46/arrow2"
# rev = "98e49133b2e56e51e30335830485b3cf768eb5a2"
# path = "../../../arrow2"
branch = "fix_cast_dict"
branch = "polars"
# version = "0.12"
default-features = false
features = [
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private = ["polars-time/private"]
ahash = "0.7"
anyhow = "1.0"
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "fix_cast_dict", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
csv-core = { version = "0.1.10", optional = true }
Expand Down
75 changes: 18 additions & 57 deletions polars/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,26 +115,19 @@ impl<R: Read + Seek> IpcReader<R> {
mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
projection: Option<Vec<usize>>,
) -> Result<DataFrame> {
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;

let sorted_projection = projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let reader =
read::FileReader::new(&mut self.reader, metadata, sorted_projection, self.n_rows);
read::FileReader::new(&mut self.reader, metadata, self.projection, self.n_rows);

let include_row_count = self.row_count.is_some();
finish_reader(
reader,
rechunk,
Expand All @@ -144,7 +137,6 @@ impl<R: Read + Seek> IpcReader<R> {
&schema,
self.row_count,
)
.map(|df| fix_column_order(df, projection, include_row_count))
}
}

Expand Down Expand Up @@ -187,22 +179,16 @@ where
self.projection = Some(prj);
}

let sorted_projection = self.projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let include_row_count = self.row_count.is_some();
let ipc_reader = read::FileReader::new(
&mut self.reader,
metadata.clone(),
sorted_projection,
self.projection,
self.n_rows,
);
finish_reader(
Expand All @@ -214,31 +200,6 @@ where
&schema,
self.row_count,
)
.map(|df| fix_column_order(df, self.projection, include_row_count))
}
}

/// Rearrange the projected columns of `df` into the order the caller requested.
///
/// `projection` holds the column indices in the order the caller asked for. `df` is
/// expected to contain those same columns in ascending projection-index order (the
/// callers in this file read with a sorted copy of the projection), optionally
/// preceded by a single row-count column when `row_count` is `true`.
///
/// The returned frame has the row-count column (if any) first, followed by the
/// projected columns such that output position `i` holds the column that was
/// requested as `projection[i]`. When `projection` is `None`, `df` is returned as is.
///
/// # Panics
///
/// Panics if `df` has fewer than `projection.len()` columns (plus one when
/// `row_count` is set).
fn fix_column_order(df: DataFrame, projection: Option<Vec<usize>>, row_count: bool) -> DataFrame {
    let proj = match projection {
        Some(proj) => proj,
        None => return df,
    };
    // The row-count column is not part of the projection and always stays in front.
    let offset = usize::from(row_count);

    // `sorted_to_requested[k]` is the requested position of the k-th smallest
    // projection index, i.e. of the column stored at `k + offset` in `df`.
    let mut sorted_to_requested = (0..proj.len()).collect::<Vec<_>>();
    sorted_to_requested.sort_unstable_by_key(|&requested| proj[requested]);

    // Invert that mapping: for every requested position, where does its column
    // currently live? Using `sorted_to_requested` directly as a gather index would
    // apply the inverse permutation and only be correct for self-inverse orders.
    let mut requested_to_sorted = vec![0; proj.len()];
    for (sorted_pos, &requested) in sorted_to_requested.iter().enumerate() {
        requested_to_sorted[requested] = sorted_pos;
    }

    let cols = df.get_columns();
    let mut new_cols = Vec::with_capacity(proj.len() + offset);
    new_cols.extend(cols.iter().take(offset).cloned());
    new_cols.extend(
        requested_to_sorted
            .iter()
            .map(|&sorted_pos| cols[sorted_pos + offset].clone()),
    );

    DataFrame::new_no_checks(new_cols)
}

Expand Down Expand Up @@ -415,33 +376,33 @@ mod test {

let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df![
"a" => ["x", "y", "z"],
"b" => [123, 456, 789],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
"letters" => ["x", "y", "z"],
"ints" => [123, 456, 789],
"floats" => [4.5, 10.0, 10.0],
"other" => ["misc", "other", "value"],
]
.unwrap();
IpcWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let expected = df![
"a" => ["x", "y", "z"],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
"b" => [123, 456, 789],
"letters" => ["x", "y", "z"],
"floats" => [4.5, 10.0, 10.0],
"other" => ["misc", "other", "value"],
"ints" => [123, 456, 789],
]
.unwrap();
let df_read = IpcReader::new(buf)
let df_read = IpcReader::new(&mut buf)
.with_columns(Some(vec![
"a".to_string(),
"c".to_string(),
"d".to_string(),
"b".to_string(),
"letters".to_string(),
"floats".to_string(),
"other".to_string(),
"ints".to_string(),
]))
.finish()
.unwrap();
df_read.frame_equal(&expected);
assert!(df_read.frame_equal(&expected));
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ impl IpcExec {
.with_n_rows(n_rows)
.with_row_count(std::mem::take(&mut self.options.row_count))
.set_rechunk(self.options.rechunk)
.finish_with_scan_ops(predicate, aggregate, projection)
.with_projection(projection)
.finish_with_scan_ops(predicate, aggregate)
}
}

Expand Down
3 changes: 2 additions & 1 deletion py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3542369

Please sign in to comment.