Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
c-peters committed Apr 14, 2024
1 parent 791d9b3 commit 4655a3c
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 8 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ io_ipc = ["arrow-format", "polars-error/arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd", "io_ipc"]
io_flight = ["io_ipc", "arrow-format/flight-data","io_ipc_read_async"]
io_flight = ["io_ipc", "arrow-format/flight-data", "io_ipc_read_async"]

io_avro = ["avro-schema", "polars-error/avro-schema"]
io_avro_compression = [
Expand Down
3 changes: 0 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ fn read_dictionary_block<R: Read + Seek>(
message_scratch: &mut Vec<u8>,
dictionary_scratch: &mut Vec<u8>,
) -> PolarsResult<()> {
let message = read_ipc_message_from_block(reader, block, message_scratch)?;
let batch = get_dictionary_batch(&message)?;

let offset: u64 = block
.offset
.try_into()
Expand Down
10 changes: 6 additions & 4 deletions crates/polars-arrow/src/io/ipc/read/flight_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use polars_error::{polars_bail, polars_err, PolarsResult};

use super::OutOfSpecKind;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::ArrowSchema;
use crate::io::ipc::read::schema::deserialize_stream_metadata;
use crate::io::ipc::read::{read_dictionary, read_record_batch, Dictionaries, StreamMetadata};
use crate::io::ipc::write::common::EncodedData;

use crate::record_batch::RecordBatch;
pub struct FlightStreamReader {
metadata: Option<StreamMetadata>,
dictionaries: Dictionaries,
Expand All @@ -33,7 +32,10 @@ impl FlightStreamReader {
.map(|metadata| &metadata.schema)
}

pub fn parse(&mut self, data: EncodedData) -> PolarsResult<Option<Chunk<Box<dyn Array>>>> {
pub fn parse(
&mut self,
data: EncodedData,
) -> PolarsResult<Option<RecordBatch<Box<dyn Array>>>> {
// First message should be the schema
if self.metadata.is_none() {
let message = arrow_format::ipc::MessageRef::read_as_root(data.ipc_message.as_ref())
Expand Down Expand Up @@ -63,7 +65,7 @@ fn convert_to_arrow_chunk(
metadata: &StreamMetadata,
dictionaries: &mut Dictionaries,
flight_data: EncodedData,
) -> PolarsResult<Option<Chunk<Box<dyn Array>>>> {
) -> PolarsResult<Option<RecordBatch<Box<dyn Array>>>> {
let EncodedData {
ipc_message,
arrow_data,
Expand Down

0 comments on commit 4655a3c

Please sign in to comment.