Skip to content

Commit

Permalink
respect ipc column ordering (#3591)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 6, 2022
1 parent 61e7627 commit 585e2ad
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 18 deletions.
12 changes: 2 additions & 10 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,10 @@ impl<R: Read + Seek> IpcReader<R> {
mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
projection: Option<&[usize]>,
projection: Option<Vec<usize>>,
) -> Result<DataFrame> {
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;
let projection = projection.map(|x| {
let mut x = x.to_vec();
x.sort_unstable();
x
});

let schema = if let Some(projection) = &projection {
apply_projection(&metadata.schema, projection)
Expand Down Expand Up @@ -180,10 +175,7 @@ where
let schema = &metadata.schema;

if let Some(columns) = self.columns {
let mut prj = columns_to_projection(columns, schema)?;

// Ipc reader panics if the projection is not in increasing order, so sorting is the safer way.
prj.sort_unstable();
let prj = columns_to_projection(columns, schema)?;
self.projection = Some(prj);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ fn update_scan_schema(
schema: &Schema,
// this is only needed for parsers that sort the projections
// currently these are:
// sorting parsers: csv and ipc,
// non-sorting: parquet
// sorting parsers: csv,
// non-sorting: parquet, ipc
sort_projections: bool,
) -> Schema {
let mut new_schema = Schema::with_capacity(acc_projections.len());
Expand Down Expand Up @@ -379,7 +379,7 @@ impl ProjectionPushDown {
&acc_projections,
expr_arena,
&*schema,
true,
false,
)))
};
options.with_columns = with_columns;
Expand Down
6 changes: 1 addition & 5 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,7 @@ impl Executor for IpcExec {
.with_n_rows(n_rows)
.with_row_count(std::mem::take(&mut self.options.row_count))
.set_rechunk(self.options.rechunk)
.finish_with_scan_ops(
predicate,
aggregate,
projection.as_ref().map(|v| v.as_ref()),
)?;
.finish_with_scan_ops(predicate, aggregate, projection)?;

if self.options.cache {
state.store_cache(cache_key, df.clone())
Expand Down
18 changes: 18 additions & 0 deletions py-polars/tests/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import List, Union

import pyarrow as pa
import pytest

import polars as pl
Expand Down Expand Up @@ -109,3 +110,20 @@ def test_ipc_schema_from_file(
"time": pl.Time,
"cat": pl.Categorical,
}


def test_ipc_column_order() -> None:
df = pl.DataFrame(
{
"cola": ["x", "y", "z"],
"colb": [1, 2, 3],
"colc": [4.5, 5.6, 6.7],
}
)
f = io.BytesIO()
df.write_ipc(f)
f.seek(0)

columns = ["colc", "colb", "cola"]
# read file into polars; the specified column order is no longer respected
assert pl.read_ipc(f, columns=columns).columns == columns

0 comments on commit 585e2ad

Please sign in to comment.