Skip to content

Commit

Permalink
apacheGH-35375: [C++][FlightRPC] Add `arrow::flight::ServerCallContex…
Browse files Browse the repository at this point in the history
…t::incoming_headers()`
  • Loading branch information
kou committed May 1, 2023
1 parent 0ea1a10 commit d907f5f
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 15 deletions.
8 changes: 1 addition & 7 deletions cpp/src/arrow/flight/middleware.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,17 @@

#pragma once

#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <utility>

#include "arrow/flight/visibility.h" // IWYU pragma: keep
#include "arrow/flight/types.h"
#include "arrow/status.h"

namespace arrow {
namespace flight {

/// \brief Headers sent from the client or server.
///
/// Header values are ordered.
using CallHeaders = std::multimap<std::string_view, std::string_view>;

/// \brief A write-only wrapper around headers for an RPC call.
class ARROW_FLIGHT_EXPORT AddCallHeaders {
public:
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class ARROW_FLIGHT_EXPORT ServerCallContext {
/// \brief Check if the current RPC has been cancelled (by the client, by
/// a network error, etc.).
virtual bool is_cancelled() const = 0;
/// \brief The headers sent by the client for this call.
virtual const CallHeaders& incoming_headers() const = 0;
};

class ARROW_FLIGHT_EXPORT FlightServerOptions {
Expand Down
19 changes: 11 additions & 8 deletions cpp/src/arrow/flight/transport/grpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,18 @@ class GrpcServerAuthSender : public ServerAuthSender {

class GrpcServerCallContext : public ServerCallContext {
explicit GrpcServerCallContext(::grpc::ServerContext* context)
: context_(context), peer_(context_->peer()) {}
: context_(context), peer_(context_->peer()) {
for (const auto& entry : context->client_metadata()) {
incoming_headers_.insert(
{std::string_view(entry.first.data(), entry.first.length()),
std::string_view(entry.second.data(), entry.second.length())});
}
}

const std::string& peer_identity() const override { return peer_identity_; }
const std::string& peer() const override { return peer_; }
bool is_cancelled() const override { return context_->IsCancelled(); }
const CallHeaders& incoming_headers() const override { return incoming_headers_; }

// Helper method that runs interceptors given the result of an RPC,
// then returns the final gRPC status to send to the client
Expand Down Expand Up @@ -156,6 +163,7 @@ class GrpcServerCallContext : public ServerCallContext {
std::string peer_identity_;
std::vector<std::shared_ptr<ServerMiddleware>> middleware_;
std::unordered_map<std::string, std::shared_ptr<ServerMiddleware>> middleware_map_;
CallHeaders incoming_headers_;
};

class GrpcAddServerHeaders : public AddCallHeaders {
Expand Down Expand Up @@ -310,17 +318,12 @@ class GrpcServiceHandler final : public FlightService::Service {
GrpcServerCallContext& flight_context) {
// Run server middleware
const CallInfo info{method};
CallHeaders incoming_headers;
for (const auto& entry : context->client_metadata()) {
incoming_headers.insert(
{std::string_view(entry.first.data(), entry.first.length()),
std::string_view(entry.second.data(), entry.second.length())});
}

GrpcAddServerHeaders outgoing_headers(context);
for (const auto& factory : middleware_) {
std::shared_ptr<ServerMiddleware> instance;
Status result = factory.second->StartCall(info, incoming_headers, &instance);
Status result =
factory.second->StartCall(info, flight_context.incoming_headers(), &instance);
if (!result.ok()) {
// Interceptor rejected call, end the request on all existing
// interceptors
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/transport/ucx/ucx_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ class UcxServerCallContext : public flight::ServerCallContext {
return nullptr;
}
bool is_cancelled() const override { return false; }
const CallHeaders& incoming_headers() const override { return incoming_headers_; }

private:
std::string peer_;
CallHeaders incoming_headers_;
};

class UcxServerStream : public internal::ServerDataStream {
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -123,6 +124,11 @@ ARROW_FLIGHT_EXPORT
Status MakeFlightError(FlightStatusCode code, std::string message,
std::string extra_info = {});

/// \brief Headers sent from the client or server.
///
/// Header values are ordered.
using CallHeaders = std::multimap<std::string_view, std::string_view>;

/// \brief A TLS certificate plus key.
struct ARROW_FLIGHT_EXPORT CertKeyPair {
/// \brief The certificate in PEM format.
Expand Down

0 comments on commit d907f5f

Please sign in to comment.