add helpers to convert record batch to flight data proto message
nevi-me committed Jan 29, 2020
1 parent 6beb4ea commit 516b66d
Showing 6 changed files with 67 additions and 35 deletions.
4 changes: 3 additions & 1 deletion rust/arrow/Cargo.toml
@@ -50,10 +50,12 @@ packed_simd = { version = "0.3.1", optional = true }
 chrono = "0.4"
 flatbuffers = "0.6.0"
 hex = "0.4"
+arrow-flight = { path = "../arrow-flight", optional = true }
 
 [features]
 simd = ["packed_simd"]
-default = ["simd"]
+flight = ["arrow-flight"]
+default = ["simd", "flight"]
 
 [dev-dependencies]
 criterion = "0.2"
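Since `flight` now sits in the default feature set, a downstream crate that does not want the extra `arrow-flight` dependency has to opt out explicitly. A hedged sketch of such a dependency entry follows; the `0.16.0-SNAPSHOT` version string is taken from the datafusion manifest below, and the exact spec a consumer would use is an assumption:

[dependencies]
# Hypothetical downstream manifest: disable the new default `flight` feature,
# keeping only `simd`.
arrow = { version = "0.16.0-SNAPSHOT", default-features = false, features = ["simd"] }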
38 changes: 38 additions & 0 deletions rust/arrow/src/flight/mod.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities to assist with reading and writing Arrow data as Flight messages
+
+use flight::FlightData;
+
+use crate::ipc::writer;
+use crate::record_batch::RecordBatch;
+
+/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
+impl From<&RecordBatch> for FlightData {
+    fn from(batch: &RecordBatch) -> Self {
+        let (header, body) = writer::record_batch_to_bytes(batch);
+        Self {
+            flight_descriptor: None,
+            app_metadata: vec![],
+            data_header: header,
+            data_body: body,
+        }
+    }
+}
+
+// TODO: add a more explicit conversion that exposes flight descriptor and metadata options
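Not part of the commit: a minimal usage sketch of the new conversion, written from a downstream crate that depends on `arrow` (with the `flight` feature) and on `arrow-flight` (linked as `flight`, as in the example server below). The single-column batch construction is assumed from the crate's public API and may need adjusting:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use flight::FlightData;

fn batch_to_flight_data() -> FlightData {
    // Build a trivial one-column batch: a: Int32 = [1, 2, 3]
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .expect("valid batch");

    // The From impl IPC-encodes the batch: the flatbuffer message header goes into
    // data_header, the buffers into data_body; descriptor and metadata stay empty.
    FlightData::from(&batch)
}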
26 changes: 16 additions & 10 deletions rust/arrow/src/ipc/writer.rs
@@ -266,13 +266,8 @@ fn write_padded_data<R: Write>(
     Ok(total_len as usize)
 }
 
-/// Write a record batch to the writer, writing the message size before the message
-/// if the record batch is being written to a stream
-fn write_record_batch<R: Write>(
-    writer: &mut BufWriter<R>,
-    batch: &RecordBatch,
-    is_stream: bool,
-) -> Result<(usize, usize)> {
+/// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data
+pub(crate) fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec<u8>, Vec<u8>) {
     let mut fbb = FlatBufferBuilder::new();
 
     let mut nodes: Vec<ipc::FieldNode> = vec![];
@@ -313,13 +308,24 @@ fn write_record_batch<R: Write>(
     let root = message.finish();
     fbb.finish(root, None);
     let finished_data = fbb.finished_data();
+
+    (finished_data.to_vec(), arrow_data)
+}
+
+/// Write a record batch to the writer, writing the message size before the message
+/// if the record batch is being written to a stream
+fn write_record_batch<R: Write>(
+    writer: &mut BufWriter<R>,
+    batch: &RecordBatch,
+    is_stream: bool,
+) -> Result<(usize, usize)> {
+    let (meta_data, arrow_data) = record_batch_to_bytes(batch);
     // write the length of data if writing to stream
     if is_stream {
-        let total_len: u32 = finished_data.len() as u32;
+        let total_len: u32 = meta_data.len() as u32;
         writer.write(&total_len.to_le_bytes()[..])?;
     }
-    let meta_written =
-        write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?;
+    let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?;
     let arrow_data_written =
         write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
     Ok((meta_written, arrow_data_written))
2 changes: 2 additions & 0 deletions rust/arrow/src/lib.rs
@@ -33,6 +33,8 @@ pub mod compute;
 pub mod csv;
 pub mod datatypes;
 pub mod error;
+#[cfg(feature = "flight")]
+pub mod flight;
 pub mod ipc;
 pub mod json;
 pub mod memory;
4 changes: 2 additions & 2 deletions rust/datafusion/Cargo.toml
@@ -57,9 +57,9 @@ crossbeam = "0.7.1"
 criterion = "0.2.0"
 tempdir = "0.3.7"
 futures = "0.3"
-prost = "0.5"
+prost = "0.6"
 tokio = { version = "0.2", features = ["full"] }
-tonic = "0.1.1"
+tonic = "0.1"
 flatbuffers = "0.6.0"
 arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" }
 
28 changes: 6 additions & 22 deletions rust/datafusion/examples/flight-server.rs
@@ -6,7 +6,6 @@ use tonic::{Request, Response, Status, Streaming};
 
 use datafusion::execution::context::ExecutionContext;
 
-use arrow::record_batch::RecordBatch;
 use flight::{
     flight_service_server::FlightService, flight_service_server::FlightServiceServer,
     Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
@@ -62,14 +61,16 @@ impl FlightService for FlightServiceImpl {
                 // execute the query
                 let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?;
 
-                let flights: Vec<Result<FlightData, Status>> =
-                    results.iter().map(|batch| to_flight_data(batch)).collect();
+                let flights: Vec<Result<FlightData, Status>> = results
+                    .iter()
+                    .map(|batch| Ok(FlightData::from(batch)))
+                    .collect();
 
                 let output = futures::stream::iter(flights);
 
                 Ok(Response::new(Box::pin(output) as Self::DoGetStream))
             }
-            Err(e) => Err(Status::unimplemented(format!("Invalid ticket: {:?}", e))),
+            Err(e) => Err(Status::invalid_argument(format!("Invalid ticket: {:?}", e))),
         }
     }

@@ -123,25 +124,8 @@ impl FlightService for FlightServiceImpl {
     }
 }
 
-fn to_flight_data(_batch: &RecordBatch) -> Result<FlightData, Status> {
-    //TODO implement .. need help on how to encode the batches using IPC here
-
-    Ok(FlightData {
-        flight_descriptor: None,
-        app_metadata: vec![],
-        /// Header for message data as described in Message.fbs::Message
-        data_header: vec![],
-        /// The actual batch of Arrow data. Preferably handled with minimal-copies
-        /// coming last in the definition to help with sidecar patterns (it is
-        /// expected that some implementations will fetch this field off the wire
-        /// with specialized code to avoid extra memory copies).
-        ///
-        data_body: vec![],
-    })
-}
-
 fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status {
-    Status::unimplemented(format!("{:?}", e))
+    Status::internal(format!("{:?}", e))
 }
 
 #[tokio::main]
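Not part of the commit: a hedged client-side sketch of exercising `do_get` against this example server, using the tonic-generated `FlightServiceClient` from the same `flight` crate. The bind address and the SQL ticket payload are assumptions (the server's `main` is not shown in this diff), and the generated client API may differ slightly:

use flight::flight_service_client::FlightServiceClient;
use flight::Ticket;
use tonic::Request;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical address; match whatever the example server binds to.
    let mut client = FlightServiceClient::connect("http://localhost:50051").await?;

    // The server above treats the ticket bytes as a query to execute via DataFusion.
    let request = Request::new(Ticket {
        ticket: "SELECT 1".as_bytes().to_vec(),
    });

    let mut stream = client.do_get(request).await?.into_inner();
    while let Some(flight_data) = stream.message().await? {
        // Each FlightData message carries one record batch's IPC header and body bytes.
        println!(
            "received {} header bytes, {} body bytes",
            flight_data.data_header.len(),
            flight_data.data_body.len()
        );
    }
    Ok(())
}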
