Skip to content

Commit

Permalink
apacheGH-34852: [C++][FlightRPC] Add support for ordered data
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Apr 20, 2023
1 parent 0bf777a commit 93cd973
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 42 deletions.
21 changes: 11 additions & 10 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,26 +213,27 @@ TEST(FlightTypes, FlightInfo) {
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}};
std::vector<FlightInfo> values = {
MakeFlightInfo(schema1, desc1, {}, -1, -1),
MakeFlightInfo(schema1, desc2, {}, -1, -1),
MakeFlightInfo(schema2, desc1, {}, -1, -1),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1),
MakeFlightInfo(schema1, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc2, {}, -1, -1, true),
MakeFlightInfo(schema2, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false),
};
std::vector<std::string> reprs = {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>] "
"total_records=-1 total_bytes=42>",
"total_records=-1 total_bytes=42 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>, "
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1>",
"[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1 "
"ordered=false>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ class MiddlewareServer : public FlightServerBase {
// Return a fake location - the test doesn't read it
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 10010));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"}, {location}}};
ARROW_ASSIGN_OR_RAISE(auto info,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
*result = std::make_unique<FlightInfo>(info);
return Status::OK();
}
Expand Down Expand Up @@ -382,8 +382,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
return std::make_unique<FlightInfo>(result);
}

Expand All @@ -407,8 +407,8 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle, sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
return std::make_unique<FlightInfo>(result);
}

Expand Down Expand Up @@ -851,7 +851,7 @@ class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/perf_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class FlightPerfServer : public FlightServerBase {
perf_request.stream_count() * perf_request.records_per_stream();

*info = std::make_unique<FlightInfo>(
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1));
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, false));
return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) {

info->total_records = pb_info.total_records();
info->total_bytes = pb_info.total_bytes();
info->ordered = pb_info.ordered();
return Status::OK();
}

Expand Down Expand Up @@ -236,6 +237,7 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {

pb_info->set_total_records(info.total_records());
pb_info->set_total_bytes(info.total_bytes());
pb_info->set_ordered(info.ordered());
return Status::OK();
}

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/flight/sql/example/acero_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ class AceroFlightSqlServer : public FlightSqlServerBase {
ARROW_ASSIGN_OR_RAISE(auto ticket, CreateStatementQueryTicket(plan));
std::vector<FlightEndpoint> endpoints{
FlightEndpoint{Ticket{std::move(ticket)}, /*locations=*/{}}};
ARROW_ASSIGN_OR_RAISE(auto info,
FlightInfo::Make(schema, descriptor, std::move(endpoints),
/*total_records=*/-1, /*total_bytes=*/-1));
ARROW_ASSIGN_OR_RAISE(
auto info,
FlightInfo::Make(schema, descriptor, std::move(endpoints),
/*total_records=*/-1, /*total_bytes=*/-1, /*ordered=*/false));
return std::make_unique<FlightInfo>(std::move(info));
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/sql/example/sqlite_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -305,7 +305,7 @@ class SQLiteFlightSqlServer::Impl {
EncodeTransactionQuery(query, command.transaction_id));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{std::move(ticket), {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down Expand Up @@ -392,7 +392,7 @@ class SQLiteFlightSqlServer::Impl {
auto result,
FlightInfo::Make(include_schema ? *SqlSchema::GetTablesSchemaWithIncludedSchema()
: *SqlSchema::GetTablesSchema(),
descriptor, endpoints, -1, -1))
descriptor, endpoints, -1, -1, false))

return std::make_unique<FlightInfo>(std::move(result));
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,9 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlServerBase::GetFlightInfoSql
}

std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(),
descriptor, endpoints, -1, -1))
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(), descriptor, endpoints,
-1, -1, false))

return std::make_unique<FlightInfo>(result);
}
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,9 @@ std::unique_ptr<FlightServerBase> ExampleTestServer() {

FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes) {
int64_t total_records, int64_t total_bytes, bool ordered) {
EXPECT_OK_AND_ASSIGN(auto info, FlightInfo::Make(schema, descriptor, endpoints,
total_records, total_bytes));
total_records, total_bytes, ordered));
return info;
}

Expand Down Expand Up @@ -600,10 +600,10 @@ std::vector<FlightInfo> ExampleFlightInfo() {
auto schema4 = ExampleFloatSchema();

return {
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000),
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, false),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, false),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1, false),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000, false),
};
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ std::vector<ActionType> ExampleActionTypes();
ARROW_FLIGHT_EXPORT
FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes);
int64_t total_records, int64_t total_bytes, bool ordered);

// ----------------------------------------------------------------------
// A pair of authentication handlers that check for a predefined password
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,14 @@ Status Ticket::Deserialize(const std::string& serialized, Ticket* out) {
arrow::Result<FlightInfo> FlightInfo::Make(const Schema& schema,
const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes) {
int64_t total_records, int64_t total_bytes,
bool ordered) {
FlightInfo::Data data;
data.descriptor = descriptor;
data.endpoints = endpoints;
data.total_records = total_records;
data.total_bytes = total_bytes;
data.ordered = ordered;
RETURN_NOT_OK(internal::SchemaToString(schema, &data.schema));
return FlightInfo(data);
}
Expand Down Expand Up @@ -355,6 +357,7 @@ std::string FlightInfo::ToString() const {
}
ss << "] total_records=" << data_.total_records;
ss << " total_bytes=" << data_.total_bytes;
ss << " ordered=" << (data_.ordered ? "true" : "false");
ss << '>';
return ss.str();
}
Expand All @@ -364,7 +367,8 @@ bool FlightInfo::Equals(const FlightInfo& other) const {
data_.descriptor == other.data_.descriptor &&
data_.endpoints == other.data_.endpoints &&
data_.total_records == other.data_.total_records &&
data_.total_bytes == other.data_.total_bytes;
data_.total_bytes == other.data_.total_bytes &&
data_.ordered == other.data_.ordered;
}

Location::Location() { uri_ = std::make_shared<arrow::internal::Uri>(); }
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
std::vector<FlightEndpoint> endpoints;
int64_t total_records;
int64_t total_bytes;
bool ordered;
};

explicit FlightInfo(Data data) : data_(std::move(data)), reconstructed_schema_(false) {}
Expand All @@ -524,7 +525,8 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
static arrow::Result<FlightInfo> Make(const Schema& schema,
const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes);
int64_t total_records, int64_t total_bytes,
bool ordered = false);

/// \brief Deserialize the Arrow schema of the dataset. Populate any
/// dictionary encoded fields into a DictionaryMemo for
Expand Down Expand Up @@ -554,6 +556,9 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
/// The total number of bytes in the dataset. If unknown, set to -1
int64_t total_bytes() const { return data_.total_bytes; }

/// Whether endpoints are in the same order as the data.
bool ordered() const { return data_.ordered; }

/// \brief Get the wire-format representation of this type.
///
/// Useful when interoperating with non-Flight systems (e.g. REST
Expand Down
12 changes: 8 additions & 4 deletions docs/source/format/Flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ A client that wishes to download the data would:
An endpoint contains a list of locations (server addresses) where
this data can be retrieved from, and a ``Ticket``, an opaque binary
token that the server will use to identify the data being
requested. There is no ordering defined on endpoints or the data
within, so if the dataset is sorted, applications should return
data in a single endpoint.
requested. If ``FlightInfo.ordered`` is set, the returned endpoints are
in the same order as the data. The client can read ordered data by
reading from the returned endpoints in order, from first to
last. Note that a client may ignore ``FlightInfo.ordered``. If
ordering is important and the client may ignore
``FlightInfo.ordered``, applications should return data in a single
endpoint.

The response also contains other metadata, like the schema, and
optionally an estimate of the dataset size.
Expand Down Expand Up @@ -216,7 +220,7 @@ Flight is primarily defined in terms of its Protobuf and gRPC
specification below, but Arrow implementations may also support
alternative transports (see :ref:`status-flight-rpc`). In that case,
implementations should use the following URI schemes for the given
transport implemenatations:
transport implementations:

+----------------------------+----------------------------+
| Transport | URI Scheme |
Expand Down
18 changes: 16 additions & 2 deletions format/Flight.proto
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,28 @@ message FlightInfo {
* In other words, an application can use multiple endpoints to
* represent partitioned data.
*
* There is no ordering defined on endpoints. Hence, if the returned
* data has an ordering, it should be returned in a single endpoint.
* If the returned data has an ordering, an application can use
* "FlightInfo.ordered = true" or should return all the data in a
* single endpoint.
*
* Note that a client may ignore "FlightInfo.ordered = true". If an
* ordering is important for an application, the application must
* choose one of the following:
*
* * Require that all clients read data in the order of the
* returned endpoints.
* * Return all the data in a single endpoint.
*/
repeated FlightEndpoint endpoint = 3;

// Set these to -1 if unknown.
int64 total_records = 4;
int64 total_bytes = 5;

/*
* FlightEndpoints are in the same order as the data.
*/
bool ordered = 6;
}

/*
Expand Down

0 comments on commit 93cd973

Please sign in to comment.