diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 2982961088990..57fc261985c51 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -50,10 +50,12 @@ packed_simd = { version = "0.3.1", optional = true } chrono = "0.4" flatbuffers = "0.6.0" hex = "0.4" +arrow-flight = { path = "../arrow-flight", optional = true } [features] simd = ["packed_simd"] -default = ["simd"] +flight = ["arrow-flight"] +default = ["simd", "flight"] [dev-dependencies] criterion = "0.2" diff --git a/rust/arrow/src/flight/mod.rs b/rust/arrow/src/flight/mod.rs new file mode 100644 index 0000000000000..a421e6edef556 --- /dev/null +++ b/rust/arrow/src/flight/mod.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Utilities to assist with reading and writing Arrow data as Flight messages + +use flight::FlightData; + +use crate::ipc::writer; +use crate::record_batch::RecordBatch; + +/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes +impl From<&RecordBatch> for FlightData { + fn from(batch: &RecordBatch) -> Self { + let (header, body) = writer::record_batch_to_bytes(batch); + Self { + flight_descriptor: None, + app_metadata: vec![], + data_header: header, + data_body: body, + } + } +} + +// TODO: add more explicit conversion that exposes flight descriptor and metadata options diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 36c89c7a66624..e79a344bbcf78 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -266,13 +266,8 @@ fn write_padded_data( Ok(total_len as usize) } -/// Write a record batch to the writer, writing the message size before the message -/// if the record batch is being written to a stream -fn write_record_batch( - writer: &mut BufWriter, - batch: &RecordBatch, - is_stream: bool, -) -> Result<(usize, usize)> { +/// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data +pub(crate) fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec, Vec) { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; @@ -313,13 +308,24 @@ fn write_record_batch( let root = message.finish(); fbb.finish(root, None); let finished_data = fbb.finished_data(); + + (finished_data.to_vec(), arrow_data) +} + +/// Write a record batch to the writer, writing the message size before the message +/// if the record batch is being written to a stream +fn write_record_batch( + writer: &mut BufWriter, + batch: &RecordBatch, + is_stream: bool, +) -> Result<(usize, usize)> { + let (meta_data, arrow_data) = record_batch_to_bytes(batch); // write the length of data if writing to stream if is_stream { - let total_len: u32 = finished_data.len() as 
u32; + let total_len: u32 = meta_data.len() as u32; writer.write(&total_len.to_le_bytes()[..])?; } - let meta_written = - write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?; + let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?; let arrow_data_written = write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?; Ok((meta_written, arrow_data_written)) diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index 899bb62f08aa9..4383922be096e 100644 --- a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -33,6 +33,8 @@ pub mod compute; pub mod csv; pub mod datatypes; pub mod error; +#[cfg(feature = "flight")] +pub mod flight; pub mod ipc; pub mod json; pub mod memory; diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index f9f9f5d717ff5..3a81877e1ba96 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -57,9 +57,9 @@ crossbeam = "0.7.1" criterion = "0.2.0" tempdir = "0.3.7" futures = "0.3" -prost = "0.5" +prost = "0.6" tokio = { version = "0.2", features = ["full"] } -tonic = "0.1.1" +tonic = "0.1" flatbuffers = "0.6.0" arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" } diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index f68a8311eb8b5..5431ff09e7cc6 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -6,7 +6,6 @@ use tonic::{Request, Response, Status, Streaming}; use datafusion::execution::context::ExecutionContext; -use arrow::record_batch::RecordBatch; use flight::{ flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, @@ -62,14 +61,16 @@ impl FlightService for FlightServiceImpl { // execute the query let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?; - let flights: Vec> = - results.iter().map(|batch| 
to_flight_data(batch)).collect(); + let flights: Vec> = results + .iter() + .map(|batch| Ok(FlightData::from(batch))) + .collect(); let output = futures::stream::iter(flights); Ok(Response::new(Box::pin(output) as Self::DoGetStream)) } - Err(e) => Err(Status::unimplemented(format!("Invalid ticket: {:?}", e))), + Err(e) => Err(Status::invalid_argument(format!("Invalid ticket: {:?}", e))), } } @@ -123,25 +124,8 @@ impl FlightService for FlightServiceImpl { } } -fn to_flight_data(_batch: &RecordBatch) -> Result { - //TODO implement .. need help on how to encode the batches using IPC here - - Ok(FlightData { - flight_descriptor: None, - app_metadata: vec![], - /// Header for message data as described in Message.fbs::Message - data_header: vec![], - /// The actual batch of Arrow data. Preferably handled with minimal-copies - /// coming last in the definition to help with sidecar patterns (it is - /// expected that some implementations will fetch this field off the wire - /// with specialized code to avoid extra memory copies). - /// - data_body: vec![], - }) -} - fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status { - Status::unimplemented(format!("{:?}", e)) + Status::internal(format!("{:?}", e)) } #[tokio::main]