Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Bumped to new arrow-format.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Oct 17, 2021
1 parent 6cb32f6 commit f5de334
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 51 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -39,7 +39,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "4.0", optional = true, default-features = false }

arrow-format = { version = "0.1.3", optional = true, features = ["ipc"] }
arrow-format = { version = "*", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

Expand Down Expand Up @@ -111,7 +111,7 @@ io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "indexmap"]
io_ipc = ["arrow-format"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
io_parquet_compression = [
"parquet2/zstd",
"parquet2/snappy",
Expand Down
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
Expand Up @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "0.1.3", features = ["ipc", "flight"] }
arrow-format = { version = "*", features = ["ipc", "flight-service"] }
async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
Expand Down
Expand Up @@ -17,9 +17,9 @@

use crate::{AUTH_PASSWORD, AUTH_USERNAME};

use arrow_format::flight::{
Action,
flight_service_client::FlightServiceClient, BasicAuth, HandshakeRequest,
use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth};
use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use futures::{stream, StreamExt};
use prost::Message;
Expand Down
Expand Up @@ -20,10 +20,11 @@ use crate::{read_json_file, ArrowFile};
use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch};
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use arrow_format::flight::{
flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient,
use arrow_format::flight::data::{
flight_descriptor::DescriptorType,
FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
use tonic::{Request, Streaming};

Expand Down
6 changes: 3 additions & 3 deletions integration-testing/src/flight_client_scenarios/middleware.rs
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use arrow_format::flight::{
flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient,
FlightDescriptor,
use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor};
use tonic::{Request, Status};

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down
2 changes: 1 addition & 1 deletion integration-testing/src/flight_server_scenarios.rs
Expand Up @@ -17,7 +17,7 @@

use std::net::SocketAddr;

use arrow_format::flight::{FlightEndpoint, Location, Ticket};
use arrow_format::flight::data::{FlightEndpoint, Location, Ticket};
use tokio::net::TcpListener;

pub mod auth_basic_proto;
Expand Down
Expand Up @@ -18,16 +18,14 @@
use std::pin::Pin;
use std::sync::Arc;

use arrow_format::flight::{
flight_service_server::FlightService, flight_service_server::FlightServiceServer,
Action, ActionType, BasicAuth, Criteria, Empty, FlightData, FlightDescriptor,
FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
};
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
use tokio::sync::Mutex;
use tonic::{
metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming,
};

type TonicStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down Expand Up @@ -102,7 +100,7 @@ impl FlightService for AuthBasicProtoScenarioImpl {
type ListFlightsStream = TonicStream<Result<FlightInfo, Status>>;
type DoGetStream = TonicStream<Result<FlightData, Status>>;
type DoPutStream = TonicStream<Result<PutResult, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::Result, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::data::Result, Status>>;
type ListActionsStream = TonicStream<Result<ActionType, Status>>;
type DoExchangeStream = TonicStream<Result<FlightData, Status>>;

Expand Down Expand Up @@ -202,7 +200,7 @@ impl FlightService for AuthBasicProtoScenarioImpl {
let flight_context = self.check_auth(request.metadata()).await?;
// Respond with the authenticated username.
let buf = flight_context.peer_identity().as_bytes().to_vec();
let result = arrow_format::flight::Result { body: buf };
let result = arrow_format::flight::data::Result { body: buf };
let output = futures::stream::once(async { Ok(result) });
Ok(Response::new(Box::pin(output) as Self::DoActionStream))
}
Expand Down
Expand Up @@ -20,10 +20,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::convert::TryFrom;

use arrow2::io::flight::{serialize_schema_to_result, serialize_batch, serialize_schema};
use arrow_format::flight::flight_descriptor::*;
use arrow_format::flight::flight_service_server::*;
use arrow_format::flight::*;
use arrow2::io::flight::{serialize_batch, serialize_schema};
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::flight::data::*;
use arrow_format::ipc::Schema as ArrowSchema;
use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message};

Expand Down Expand Up @@ -85,7 +85,7 @@ impl FlightService for FlightServiceImpl {
type ListFlightsStream = TonicStream<Result<FlightInfo, Status>>;
type DoGetStream = TonicStream<Result<FlightData, Status>>;
type DoPutStream = TonicStream<Result<PutResult, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::Result, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::data::Result, Status>>;
type ListActionsStream = TonicStream<Result<ActionType, Status>>;
type DoExchangeStream = TonicStream<Result<FlightData, Status>>;

Expand Down
11 changes: 4 additions & 7 deletions integration-testing/src/flight_server_scenarios/middleware.rs
Expand Up @@ -17,12 +17,9 @@

use std::pin::Pin;

use arrow_format::flight::{
flight_descriptor::DescriptorType, flight_service_server::FlightService,
flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaResult, Ticket,
};
use arrow_format::flight::data::flight_descriptor::DescriptorType;
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use futures::Stream;
use tonic::{transport::Server, Request, Response, Status, Streaming};

Expand Down Expand Up @@ -53,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl {
type ListFlightsStream = TonicStream<Result<FlightInfo, Status>>;
type DoGetStream = TonicStream<Result<FlightData, Status>>;
type DoPutStream = TonicStream<Result<PutResult, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::Result, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::data::Result, Status>>;
type ListActionsStream = TonicStream<Result<ActionType, Status>>;
type DoExchangeStream = TonicStream<Result<FlightData, Status>>;

Expand Down
1 change: 1 addition & 0 deletions src/doc/lib.md
Expand Up @@ -77,6 +77,7 @@ functionality, such as:
* `io_ipc_compression`: to read and write compressed Arrow IPC (v2)
* `io_csv` to read and write CSV
* `io_json` to read and write JSON
* `io_flight` to read and write to Arrow's Flight protocol
* `io_parquet` to read and write parquet
* `io_parquet_compression` to read and write compressed parquet
* `io_print` to write batches to formatted ASCII tables
Expand Down
2 changes: 1 addition & 1 deletion src/io/flight/mod.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::convert::TryFrom;

use arrow_format::flight::{FlightData, SchemaResult};
use arrow_format::flight::data::{FlightData, SchemaResult};
use arrow_format::ipc;

use crate::{
Expand Down
8 changes: 2 additions & 6 deletions src/io/ipc/read/read_basic.rs
@@ -1,19 +1,15 @@
use std::io::{Read, Seek, SeekFrom};
use std::{collections::VecDeque, convert::TryInto};

use arrow_format::ipc::Message::{BodyCompression, CompressionType};
use arrow_format::ipc;
use arrow_format::ipc::Message::{BodyCompression, CompressionType};

use crate::buffer::Buffer;
use crate::error::{ArrowError, Result};
<<<<<<< HEAD
use crate::io::ipc::endianess::is_native_little_endian;
use crate::io::ipc::gen::Message::{BodyCompression, CompressionType};
=======
>>>>>>> Migrated to arrow_format crate.
use crate::{bitmap::Bitmap, buffer::MutableBuffer, types::NativeType};

use super::super::compression;
use super::super::endianess::is_native_little_endian;

fn read_swapped<T: NativeType, R: Read + Seek>(
reader: &mut R,
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/stream.rs
Expand Up @@ -18,16 +18,16 @@
use std::io::Read;
use std::sync::Arc;

use arrow_format::ipc::Schema::MetadataVersion;
use arrow_format::ipc;
use arrow_format::ipc::Schema::MetadataVersion;

use crate::array::*;
use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

use super::super::CONTINUATION_MARKER;
use super::super::convert;
use super::super::CONTINUATION_MARKER;
use super::common::*;

type ArrayRef = Arc<dyn Array>;
Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/write/common.rs
Expand Up @@ -19,8 +19,8 @@
use std::io::Write;
use std::{collections::HashMap, sync::Arc};

use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
use arrow_format::ipc;
use arrow_format::ipc::flatbuffers::FlatBufferBuilder;

use crate::array::Array;
use crate::error::{ArrowError, Result};
Expand Down
11 changes: 1 addition & 10 deletions src/io/ipc/write/serialize.rs
Expand Up @@ -15,26 +15,17 @@
// specific language governing permissions and limitations
// under the License.

<<<<<<< HEAD
use crate::io::ipc::endianess::is_native_little_endian;
use crate::io::ipc::gen::Schema;
=======
use arrow_format::ipc::{Message, Schema};

>>>>>>> Migrated to arrow_format crate.
use crate::{
array::*,
bitmap::Bitmap,
datatypes::{DataType, PhysicalType},
<<<<<<< HEAD
io::ipc::gen::Message,
=======
endianess::is_native_little_endian,
>>>>>>> Migrated to arrow_format crate.
trusted_len::TrustedLen,
types::NativeType,
};

use super::super::endianess::is_native_little_endian;
use super::common::pad_to_8;

fn _write_primitive<T: NativeType>(
Expand Down

0 comments on commit f5de334

Please sign in to comment.