Skip to content

Commit

Permalink
Fix reading a df with an empty series (#2685)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxyme committed Feb 24, 2022
1 parent 223c9b9 commit bd424be
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 10 deletions.
4 changes: 2 additions & 2 deletions polars/polars-core/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ mod test {

#[test]
fn test_series_equal() {
let a = Series::new("a", &[1, 2, 3]);
let b = Series::new("a", &[1, 2, 3]);
let a = Series::new("a", &[1_u32, 2, 3]);
let b = Series::new("a", &[1_u32, 2, 3]);
assert!(a.series_equal(&b));

let s = Series::new("foo", &[None, Some(1i64)]);
Expand Down
15 changes: 15 additions & 0 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,4 +389,19 @@ mod test {
assert!(df.frame_equal(&df_read));
}
}

#[test]
fn write_and_read_ipc_empty_series() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let chunked_array = Float64Chunked::new("empty", &[0_f64; 0]);
let mut df = DataFrame::new(vec![chunked_array.into_series()]).unwrap();
IpcWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");

buf.set_position(0);

let df_read = IpcReader::new(buf).finish().unwrap();
assert!(df.frame_equal(&df_read));
}
}
30 changes: 25 additions & 5 deletions polars/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub(crate) mod utils;

pub use options::*;

use arrow::error::Result as ArrowResult;

#[cfg(any(
feature = "ipc",
feature = "parquet",
Expand All @@ -54,6 +52,9 @@ use crate::aggregations::{apply_aggregations, ScanAggregation};
feature = "avro"
))]
use crate::predicates::PhysicalIoExpr;
#[allow(unused)] // remove when updating to rust nightly >= 1.61
use arrow::array::new_empty_array;
use arrow::error::Result as ArrowResult;
use polars_core::frame::ArrowChunk;
use polars_core::prelude::*;
use std::io::{Read, Seek, Write};
Expand Down Expand Up @@ -134,10 +135,29 @@ pub(crate) fn finish_reader<R: ArrowReader>(
}
}
}
let mut df = accumulate_dataframes_vertical(parsed_dfs)?;

// Aggregations must be applied a final time to aggregate the partitions
apply_aggregations(&mut df, aggregate)?;
let df = {
if parsed_dfs.is_empty() {
// Create an empty dataframe with the correct data types
let empty_cols = arrow_schema
.fields
.iter()
.map(|fld| {
Series::try_from((
fld.name.as_str(),
Arc::from(new_empty_array(fld.data_type.clone())),
))
})
.collect::<Result<_>>()?;
DataFrame::new(empty_cols)?
} else {
// If there are any rows, accumulate them into a df
let mut df = accumulate_dataframes_vertical(parsed_dfs)?;
// Aggregations must be applied a final time to aggregate the partitions
apply_aggregations(&mut df, aggregate)?;
df
}
};

match rechunk {
true => Ok(df.agg_chunks()),
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,10 @@ fn test_with_row_count_opts() -> Result<()> {
.tail(5)
.collect()?;
let expected = df![
"row_nr" => [5, 6, 7, 8, 9],
"row_nr" => [5_u32, 6, 7, 8, 9],
"a" => [5, 6, 7, 8, 9],
]?;

assert!(out.frame_equal(&expected));
let out = df
.clone()
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1135,9 +1135,10 @@ fn test_filter_and_alias() -> Result<()> {

let expected = df![
"a" => [2, 2],
"a_squared" => [4, 4]
"a_squared" => [4.0, 4.0]
]?;

println!("{:?}", out);
println!("{:?}", expected);
assert!(out.frame_equal(&expected));
Ok(())
}
Expand Down

0 comments on commit bd424be

Please sign in to comment.