Skip to content

Commit

Permalink
fix[rust]: process columns arg in memmapped ipc (#4713)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 3, 2022
1 parent 6576807 commit 583b112
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 8 deletions.
2 changes: 1 addition & 1 deletion polars/polars-io/src/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
avro::avro_schema::read::read_metadata(&mut self.reader).map_err(convert_err)?;
let schema = read::infer_schema(&metadata.record)?;

if let Some(columns) = self.columns {
if let Some(columns) = &self.columns {
self.projection = Some(columns_to_projection(columns, &schema)?);
}

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct IpcReader<R: MmapBytesReader> {
rechunk: bool,
pub(super) n_rows: Option<usize>,
pub(super) projection: Option<Vec<usize>>,
columns: Option<Vec<String>>,
pub(crate) columns: Option<Vec<String>>,
pub(super) row_count: Option<RowCount>,
memmap: bool,
}
Expand Down Expand Up @@ -216,7 +216,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
let metadata = read::read_file_metadata(&mut self.reader)?;
let schema = &metadata.schema;

if let Some(columns) = self.columns {
if let Some(columns) = &self.columns {
let prj = columns_to_projection(columns, schema)?;
self.projection = Some(prj);
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/ipc/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ where
let schema = &metadata.schema;

if let Some(columns) = self.columns {
let prj = columns_to_projection(columns, schema)?;
let prj = columns_to_projection(&columns, schema)?;
self.projection = Some(prj);
}

Expand Down
10 changes: 8 additions & 2 deletions polars/polars-io/src/ipc/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use memmap::Mmap;

use super::*;
use crate::mmap::MmapBytesReader;
use crate::utils::apply_projection;
use crate::utils::{apply_projection, columns_to_projection};

struct MMapChunkIter<'a> {
dictionaries: Dictionaries,
Expand Down Expand Up @@ -65,7 +65,7 @@ impl ArrowReader for MMapChunkIter<'_> {

impl<R: MmapBytesReader> IpcReader<R> {
pub(super) fn finish_memmapped(
&self,
&mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
) -> Result<DataFrame> {
Expand All @@ -74,6 +74,12 @@ impl<R: MmapBytesReader> IpcReader<R> {
let mmap = unsafe { memmap::Mmap::map(file).unwrap() };
let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;

if let Some(columns) = &self.columns {
let schema = &metadata.schema;
let prj = columns_to_projection(columns, schema)?;
self.projection = Some(prj);
}

let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::infer_schema(&metadata)?;

if let Some(cols) = self.columns {
if let Some(cols) = &self.columns {
self.projection = Some(columns_to_projection(cols, &schema)?);
}

Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> Ar
feature = "parquet"
))]
pub(crate) fn columns_to_projection(
columns: Vec<String>,
columns: &[String],
schema: &ArrowSchema,
) -> Result<Vec<usize>> {
use ahash::AHashMap;
Expand Down
6 changes: 6 additions & 0 deletions py-polars/tests/unit/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def test_from_to_file(
assert_frame_equal_local_categoricals(df, df_read)


def test_columns_arg(io_test_dir: str) -> None:
    """Check that ``pl.read_ipc`` honours the ``columns`` argument.

    Reads ``small.ipc`` from ``io_test_dir`` requesting only the ``bools``
    column and asserts that the resulting frame contains exactly that column.
    """
    # The check only runs when os.name is not "nt"; on "nt" the test is a
    # no-op. NOTE(review): the reason for the exclusion is not stated here.
    if os.name == "nt":
        return

    ipc_path = os.path.join(io_test_dir, "small.ipc")
    frame = pl.read_ipc(ipc_path, columns=["bools"])
    assert frame.columns == ["bools"]


def test_select_columns() -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
Expand Down

0 comments on commit 583b112

Please sign in to comment.