Skip to content

Commit

Permalink
fix ipc column order (#3706)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 15, 2022
1 parent e177a75 commit 55acdb6
Showing 1 changed file with 73 additions and 6 deletions.
79 changes: 73 additions & 6 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ impl<R: Read + Seek> IpcReader<R> {
self
}

#[cfg(feature = "lazy")]
// todo! hoist to lazy crate
pub fn finish_with_scan_ops(
mut self,
Expand All @@ -120,14 +119,20 @@ impl<R: Read + Seek> IpcReader<R> {
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;

let schema = if let Some(projection) = &projection {
let sorted_projection = projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let reader = read::FileReader::new(&mut self.reader, metadata, projection);
let reader = read::FileReader::new(&mut self.reader, metadata, sorted_projection);

let include_row_count = self.row_count.is_some();
finish_reader(
reader,
rechunk,
Expand All @@ -137,6 +142,7 @@ impl<R: Read + Seek> IpcReader<R> {
&schema,
self.row_count,
)
.map(|df| fix_column_order(df, projection, include_row_count))
}
}

Expand Down Expand Up @@ -179,13 +185,20 @@ where
self.projection = Some(prj);
}

let schema = if let Some(projection) = &self.projection {
let sorted_projection = self.projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let ipc_reader = read::FileReader::new(&mut self.reader, metadata, self.projection);
let include_row_count = self.row_count.is_some();
let ipc_reader =
read::FileReader::new(&mut self.reader, metadata.clone(), sorted_projection);
finish_reader(
ipc_reader,
rechunk,
Expand All @@ -195,6 +208,31 @@ where
&schema,
self.row_count,
)
.map(|df| fix_column_order(df, self.projection, include_row_count))
}
}

/// Restores the column order the caller asked for.
///
/// The readers in this file hand a *sorted* copy of the projection to the IPC
/// file reader, so `df` holds the projected columns in ascending order of
/// their file index. This function permutes them back so that output column
/// `i` is the column with file index `projection[i]`.
///
/// * `df` - frame whose projected columns are in ascending file-index order,
///   preceded by one extra leading column when `row_count` is `true`.
/// * `projection` - the projection in the order requested by the caller;
///   `None` returns `df` unchanged.
/// * `row_count` - whether `df` starts with a row-count column. That column
///   is kept at position 0 and is not part of the permutation.
///
/// # Panics
///
/// Panics if `df` has fewer than `projection.len()` columns (plus one when
/// `row_count` is set).
fn fix_column_order(df: DataFrame, projection: Option<Vec<usize>>, row_count: bool) -> DataFrame {
    let proj = match projection {
        Some(proj) => proj,
        None => return df,
    };

    let offset = if row_count { 1 } else { 0 };
    let cols = df.get_columns();

    // `sorted_to_requested[k]` is the position in the requested projection of
    // the k-th column that was read (the k-th smallest projection index).
    let mut sorted_to_requested: Vec<usize> = (0..proj.len()).collect();
    sorted_to_requested.sort_unstable_by_key(|&requested_pos| proj[requested_pos]);

    // Invert that mapping: for every requested position we need to know where
    // its column sits in `df`. Indexing `cols` with the un-inverted mapping is
    // only correct for permutations that are their own inverse (e.g. swapping
    // two columns) and scrambles any projection containing a longer cycle.
    let mut requested_to_sorted = vec![0usize; proj.len()];
    for (read_pos, &requested_pos) in sorted_to_requested.iter().enumerate() {
        requested_to_sorted[requested_pos] = read_pos;
    }

    let mut new_cols = Vec::with_capacity(proj.len() + offset);
    if row_count {
        // the row-count column always stays in front
        new_cols.push(cols[0].clone());
    }
    new_cols.extend(
        requested_to_sorted
            .iter()
            .map(|&read_pos| cols[read_pos + offset].clone()),
    );

    DataFrame::new_no_checks(new_cols)
}

Expand Down Expand Up @@ -367,7 +405,36 @@ mod test {
.with_columns(Some(vec!["c".to_string(), "b".to_string()]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
df_read.frame_equal(&expected);

let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df![
"a" => ["x", "y", "z"],
"b" => [123, 456, 789],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
]
.unwrap();
IpcWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let expected = df![
"a" => ["x", "y", "z"],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
"b" => [123, 456, 789],
]
.unwrap();
let df_read = IpcReader::new(buf)
.with_columns(Some(vec![
"a".to_string(),
"c".to_string(),
"d".to_string(),
"b".to_string(),
]))
.finish()
.unwrap();
df_read.frame_equal(&expected);
}

Expand Down

0 comments on commit 55acdb6

Please sign in to comment.