From 711685f661e7894426203d53f2ea1c47438aa1d8 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 28 May 2021 15:03:06 +0200 Subject: [PATCH 1/4] k/response: introduced process_result_stages type Introduced a type aggregating futures representing two stages of kafka request processing. This way request handler will be able to decide which part of the processing should be executed in foreground (blocking other requests from being handled) and which can be executed in background asynchronous to other requests processing. Signed-off-by: Michal Maslanka --- src/v/kafka/server/response.h | 40 +++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/v/kafka/server/response.h b/src/v/kafka/server/response.h index 0691a50f2f0c..9b43a4721ef3 100644 --- a/src/v/kafka/server/response.h +++ b/src/v/kafka/server/response.h @@ -16,6 +16,7 @@ #include "kafka/protocol/types.h" #include "seastarx.h" +#include #include #include @@ -54,4 +55,43 @@ class response { using response_ptr = ss::foreign_ptr>; +struct process_result_stages { + process_result_stages( + ss::future<> dispatched_f, ss::future response_f) + : dispatched(std::move(dispatched_f)) + , response(std::move(response_f)) {} + + explicit process_result_stages(response_ptr response) + : dispatched(ss::now()) + , response(ss::make_ready_future(std::move(response))) {} + + /** + * Single stage method is a helper to execute whole request in foreground. + * The dispatch phase if finished after response phase this way when + * response is processed in background it is already resolved future. + */ + static process_result_stages single_stage(ss::future f) { + ss::promise response; + auto response_f = response.get_future(); + auto dispatch = f.then_wrapped( + [response = std::move(response)](ss::future f) mutable { + try { + auto r = f.get(); + response.set_value(std::move(r)); + } catch (...) { + response.set_exception(std::current_exception()); + } + }); + + return process_result_stages( + std::move(dispatch), std::move(response_f)); + } + + // after this future resolved request is dispatched for processing and + // processing order is set + ss::future<> dispatched; + // the response future is intended to be executed in background + ss::future response; +}; + } // namespace kafka From 74125369ccf0291a6e2f79690dbaa31044921252 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 28 May 2021 15:14:50 +0200 Subject: [PATCH 2/4] k/requests: split execution of request processing in two stages Split handling of Kafka request into two stages. Dispatch stage is executed in foreground while the second stage is executed in background. This way we can leverage the fact that request processing order is established before its processing completely finished and handle multiple requests at the time without compromising correct ordering. Signed-off-by: Michal Maslanka --- src/v/kafka/server/connection_context.cc | 88 +++++++++++++----------- src/v/kafka/server/request_context.h | 3 +- src/v/kafka/server/requests.cc | 57 +++++++-------- 3 files changed, 77 insertions(+), 71 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 8bce82c328bd..4f5be8130876 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -115,7 +115,8 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { std::move(request_buf), 0s); auto resp = co_await kafka::process_request( - std::move(ctx), _proto.smp_group()); + std::move(ctx), _proto.smp_group()) + .response; auto data = std::move(*resp).release(); response.decode(std::move(data), version); } @@ -224,16 +225,14 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { std::move(buf), sres.backpressure_delay); /* - * until authentication is complete, process requests in order - * since all subsequent requests are dependent on authentication - * having completed. + * we process requests in order since all subsequent requests + * are dependent on authentication having completed. * - * the other important reason for disabling pipeling before - * authentication completes is because when a sasl handshake - * with version=0 is processed, the next data on the wire is - * _not_ another request: it is a size-prefixed authentication - * payload without a request envelope, and requires special - * handling. + * the other important reason for disabling pipeling is because + * when a sasl handshake with version=0 is processed, the next + * data on the wire is _not_ another request: it is a + * size-prefixed authentication payload without a request + * envelope, and requires special handling. * * a well behaved client should implicitly provide a data stream * that invokes this behavior in the server: that is, it won't @@ -245,47 +244,52 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { * stream at best and at worst some odd behavior. */ - if (unlikely(!sasl().complete())) { - return do_process(std::move(rctx)) - .handle_exception([self](std::exception_ptr e) { - vlog( - klog.info, - "Detected error processing request: {}", - e); - self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(sres), self] {}); - } - - (void)ss::try_with_gate( - _rs.conn_gate(), - [this, rctx = std::move(rctx)]() mutable { - return do_process(std::move(rctx)); + const auto correlation = rctx.header().correlation; + const sequence_id seq = _seq_idx; + _seq_idx = _seq_idx + sequence_id(1); + auto res = kafka::process_request( + std::move(rctx), _proto.smp_group()); + /** + * first stage processed in a foreground. + */ + return res.dispatched + .then([this, + f = std::move(res.response), + seq, + correlation, + self, + s = std::move(sres)]() mutable { + /** + * second stage processed in background. + */ + (void)ss::try_with_gate( + _rs.conn_gate(), + [this, f = std::move(f), seq, correlation]() mutable { + return f.then( + [this, seq, correlation](response_ptr r) mutable { + r->set_correlation(correlation); + _responses.insert({seq, std::move(r)}); + return process_next_response(); + }); + }) + .handle_exception([self](std::exception_ptr e) { + vlog( + klog.info, + "Detected error processing request: {}", + e); + self->_rs.conn->shutdown_input(); + }) + .finally([s = std::move(s), self] {}); }) .handle_exception([self](std::exception_ptr e) { vlog( klog.info, "Detected error processing request: {}", e); self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(sres), self] {}); - - return ss::now(); + }); }); }); } -ss::future<> connection_context::do_process(request_context ctx) { - const auto correlation = ctx.header().correlation; - const sequence_id seq = _seq_idx; - _seq_idx = _seq_idx + sequence_id(1); - return kafka::process_request(std::move(ctx), _proto.smp_group()) - .then([this, seq, correlation](response_ptr r) mutable { - r->set_correlation(correlation); - _responses.insert({seq, std::move(r)}); - return process_next_response(); - }); -} - ss::future<> connection_context::process_next_response() { return ss::repeat([this]() mutable { auto it = _responses.find(_next_response); diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 176024940764..0a38f4c8bf5e 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -184,7 +184,6 @@ class request_context { }; // Executes the API call identified by the specified request_context. -ss::future -process_request(request_context&&, ss::smp_service_group); +process_result_stages process_request(request_context&&, ss::smp_service_group); } // namespace kafka diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index 22e56f3281e6..e49c243bd34f 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -24,18 +24,18 @@ namespace kafka { template CONCEPT(requires(KafkaApiHandler)) struct process_dispatch { - static ss::future + static process_result_stages process(request_context&& ctx, ss::smp_service_group g) { if ( ctx.header().version < Request::min_supported || ctx.header().version > Request::max_supported) { - return ss::make_exception_future( - std::runtime_error(fmt::format( - "Unsupported version {} for {} API", - ctx.header().version, - Request::api::name))); + throw std::runtime_error(fmt::format( + "Unsupported version {} for {} API", + ctx.header().version, + Request::api::name)); } - return Request::handle(std::move(ctx), g); + return process_result_stages::single_stage( + Request::handle(std::move(ctx), g)); } }; @@ -48,16 +48,17 @@ struct process_dispatch { */ template<> struct process_dispatch { - static ss::future + static process_result_stages process(request_context&& ctx, ss::smp_service_group g) { - return api_versions_handler::handle(std::move(ctx), g); + return process_result_stages::single_stage( + api_versions_handler::handle(std::move(ctx), g)); } }; template CONCEPT(requires(KafkaApiHandler)) -ss::future do_process( - request_context&& ctx, ss::smp_service_group g) { +process_result_stages + do_process(request_context&& ctx, ss::smp_service_group g) { vlog( klog.trace, "Processing name:{}, key:{}, version:{} for {}", @@ -85,7 +86,7 @@ handle_auth_handshake(request_context&& ctx, ss::smp_service_group g) { conn->sasl().set_handshake_v0(); } return do_process(std::move(ctx), g) - .then([conn = std::move(conn)](response_ptr r) { + .response.then([conn = std::move(conn)](response_ptr r) { if (conn->sasl().has_mechanism()) { conn->sasl().set_state( security::sasl_server::sasl_state::authenticate); @@ -146,7 +147,7 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) { } auto conn = ctx.connection(); return do_process(std::move(ctx), g) - .then([conn = std::move(conn)](response_ptr r) { + .response.then([conn = std::move(conn)](response_ptr r) { /* * there may be multiple authentication round-trips so it is fine * to return without entering an end state like complete/failed. @@ -179,7 +180,7 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) { } } -ss::future +process_result_stages process_request(request_context&& ctx, ss::smp_service_group g) { /* * requests are handled as normal when auth is disabled. otherwise no @@ -187,14 +188,15 @@ process_request(request_context&& ctx, ss::smp_service_group g) { */ if (unlikely(!ctx.sasl().complete())) { auto conn = ctx.connection(); - return handle_auth(std::move(ctx), g) - .then_wrapped([conn](ss::future f) { - if (f.failed()) { - conn->sasl().set_state( - security::sasl_server::sasl_state::failed); - } - return f; - }); + return process_result_stages::single_stage( + handle_auth(std::move(ctx), g) + .then_wrapped([conn](ss::future f) { + if (f.failed()) { + conn->sasl().set_state( + security::sasl_server::sasl_state::failed); + } + return f; + })); } switch (ctx.header().key) { @@ -235,14 +237,15 @@ process_request(request_context&& ctx, ss::smp_service_group g) { case describe_groups_handler::api::key: return do_process(std::move(ctx), g); case sasl_handshake_handler::api::key: - return ctx.respond( - sasl_handshake_response(error_code::illegal_sasl_state, {})); + return process_result_stages::single_stage(ctx.respond( + sasl_handshake_response(error_code::illegal_sasl_state, {}))); case sasl_authenticate_handler::api::key: { sasl_authenticate_response_data data{ .error_code = error_code::illegal_sasl_state, .error_message = "Authentication process already completed", }; - return ctx.respond(sasl_authenticate_response(std::move(data))); + return process_result_stages::single_stage( + ctx.respond(sasl_authenticate_response(std::move(data)))); } case init_producer_id_handler::api::key: return do_process(std::move(ctx), g); @@ -267,8 +270,8 @@ process_request(request_context&& ctx, ss::smp_service_group g) { case end_txn_handler::api::key: return do_process(std::move(ctx), g); }; - return ss::make_exception_future( - std::runtime_error(fmt::format("Unsupported API {}", ctx.header().key))); + throw std::runtime_error( + fmt::format("Unsupported API {}", ctx.header().key)); } std::ostream& operator<<(std::ostream& os, const request_header& header) { From 275091011e00dbaf3ffb128fb1a93e1149f8a521 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 28 May 2021 15:40:44 +0200 Subject: [PATCH 3/4] k/produce: handling produce request in two stages Implemented two stage handling of produce request. The two phases of produce request processing are reflected in two phases of `cluster::partition::replicate` this way redpanda can handle multiple requests per connection while still not changing the request processing ordering. Signed-off-by: Michal Maslanka --- src/v/kafka/server/handlers/handler.h | 9 + src/v/kafka/server/handlers/produce.cc | 408 +++++++++++++-------- src/v/kafka/server/handlers/produce.h | 9 +- src/v/kafka/server/replicated_partition.cc | 15 +- src/v/kafka/server/replicated_partition.h | 2 +- src/v/kafka/server/requests.cc | 15 +- 6 files changed, 302 insertions(+), 156 deletions(-) diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index 6a902b02b483..61fbd120dc82 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -41,6 +41,15 @@ concept KafkaApiHandler = requires (T h, request_context&& ctx, ss::smp_service_ { T::handle(std::move(ctx), g) } -> std::same_as>; }; ) +CONCEPT( +template +concept KafkaApiTwoPhaseHandler = requires (T h, request_context&& ctx, ss::smp_service_group g) { + KafkaApi; + { T::min_supported } -> std::convertible_to; + { T::max_supported } -> std::convertible_to; + { T::handle(std::move(ctx), g) } -> std::same_as; +}; +) // clang-format on } // namespace kafka diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index a75bdd4e14d3..296284e3d7dc 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -78,6 +78,24 @@ struct produce_ctx { , ssg(ssg) {} }; +struct partition_produce_stages { + ss::future<> dispatched; + ss::future produced; +}; + +struct topic_produce_stages { + ss::future<> dispatched; + ss::future produced; +}; + +partition_produce_stages make_ready_stage(produce_response::partition p) { + return partition_produce_stages{ + .dispatched = ss::now(), + .produced = ss::make_ready_future( + std::move(p)), + }; +} + static raft::replicate_options acks_to_replicate_options(int16_t acks) { switch (acks) { case -1: @@ -136,40 +154,45 @@ static error_code map_produce_error_code(std::error_code ec) { * Caller is expected to catch errors that may be thrown while the kafka * batch is being deserialized (see reader_from_kafka_batch). */ -static ss::future partition_append( +static partition_produce_stages partition_append( model::partition_id id, ss::lw_shared_ptr partition, model::batch_identity bid, model::record_batch_reader reader, int16_t acks, int32_t num_records) { - return partition - ->replicate(bid, std::move(reader), acks_to_replicate_options(acks)) - .then_wrapped([partition, id, num_records = num_records]( - ss::future> f) { - produce_response::partition p{.partition_index = id}; - try { - auto r = f.get0(); - if (r.has_value()) { - // have to subtract num_of_records - 1 as base_offset - // is inclusive - p.base_offset = model::offset(r.value() - (num_records - 1)); - p.error_code = error_code::none; - partition->probe().add_records_produced(num_records); - } else { - p.error_code = map_produce_error_code(r.error()); - } - } catch (...) { - p.error_code = error_code::unknown_server_error; - } - return p; - }); + auto stages = partition->replicate( + bid, std::move(reader), acks_to_replicate_options(acks)); + return partition_produce_stages{ + .dispatched = std::move(stages.request_enqueued), + .produced = stages.replicate_finished.then_wrapped( + [partition, id, num_records = num_records]( + ss::future> f) { + produce_response::partition p{.partition_index = id}; + try { + auto r = f.get0(); + if (r.has_value()) { + // have to subtract num_of_records - 1 as base_offset + // is inclusive + p.base_offset = model::offset( + r.value().last_offset - (num_records - 1)); + p.error_code = error_code::none; + partition->probe().add_records_produced(num_records); + } else { + p.error_code = map_produce_error_code(r.error()); + } + } catch (...) { + p.error_code = error_code::unknown_server_error; + } + return p; + }), + }; } /** * \brief handle writing to a single topic partition. */ -static ss::future produce_topic_partition( +static partition_produce_stages produce_topic_partition( produce_ctx& octx, produce_request::topic& topic, produce_request::partition& part) { @@ -183,10 +206,9 @@ static ss::future produce_topic_partition( auto shard = octx.rctx.shards().shard_for(ntp); if (!shard) { - return ss::make_ready_future( - produce_response::partition{ - .partition_index = ntp.tp.partition, - .error_code = error_code::unknown_topic_or_partition}); + return make_ready_stage(produce_response::partition{ + .partition_index = ntp.tp.partition, + .error_code = error_code::unknown_topic_or_partition}); } // steal the batch from the adapter @@ -214,55 +236,106 @@ static ss::future produce_topic_partition( auto num_records = batch.record_count(); auto reader = reader_from_lcore_batch(std::move(batch)); auto start = std::chrono::steady_clock::now(); - auto f = octx.rctx.partition_manager().invoke_on( - *shard, - octx.ssg, - [reader = std::move(reader), - ntp = std::move(ntp), - num_records, - bid, - acks = octx.request.data.acks](cluster::partition_manager& mgr) mutable { - auto partition = mgr.get(ntp); - if (!partition) { - return ss::make_ready_future( - produce_response::partition{ - .partition_index = ntp.tp.partition, - .error_code = error_code::unknown_topic_or_partition}); - } - if (unlikely(!partition->is_leader())) { - return ss::make_ready_future( - produce_response::partition{ - .partition_index = ntp.tp.partition, - .error_code = error_code::not_leader_for_partition}); - } - return partition_append( - ntp.tp.partition, - ss::make_lw_shared(std::move(partition)), - bid, - std::move(reader), - acks, - num_records); - }); - return f.then([&octx, start](produce_response::partition p) { - if (p.error_code == error_code::none) { - auto dur = std::chrono::steady_clock::now() - start; - octx.rctx.connection()->server().update_produce_latency(dur); - } - return p; - }); + + auto dispatch = std::make_unique>(); + auto dispatch_f = dispatch->get_future(); + auto f + = octx.rctx.partition_manager() + .invoke_on( + *shard, + octx.ssg, + [reader = std::move(reader), + ntp = std::move(ntp), + dispatch = std::move(dispatch), + num_records, + bid, + acks = octx.request.data.acks, + source_shard = ss::this_shard_id()]( + cluster::partition_manager& mgr) mutable { + auto partition = mgr.get(ntp); + if (!partition) { + // submit back to promise source shard + (void)ss::smp::submit_to( + source_shard, [dispatch = std::move(dispatch)]() mutable { + dispatch->set_value(); + dispatch.reset(); + }); + return ss::make_ready_future( + produce_response::partition{ + .partition_index = ntp.tp.partition, + .error_code = error_code::unknown_topic_or_partition}); + } + if (unlikely(!partition->is_leader())) { + // submit back to promise source shard + (void)ss::smp::submit_to( + source_shard, [dispatch = std::move(dispatch)]() mutable { + dispatch->set_value(); + dispatch.reset(); + }); + return ss::make_ready_future( + produce_response::partition{ + .partition_index = ntp.tp.partition, + .error_code = error_code::not_leader_for_partition}); + } + auto stages = partition_append( + ntp.tp.partition, + ss::make_lw_shared( + std::move(partition)), + bid, + std::move(reader), + acks, + num_records); + return stages.dispatched + .then_wrapped([source_shard, dispatch = std::move(dispatch)]( + ss::future<> f) mutable { + if (f.failed()) { + (void)ss::smp::submit_to( + source_shard, + [dispatch = std::move(dispatch), + e = f.get_exception()]() mutable { + dispatch->set_exception(e); + dispatch.reset(); + }); + return; + } + (void)ss::smp::submit_to( + source_shard, + [dispatch = std::move(dispatch)]() mutable { + dispatch->set_value(); + dispatch.reset(); + }); + }) + .then([f = std::move(stages.produced)]() mutable { + return std::move(f); + }); + }) + .then([&octx, start](produce_response::partition p) { + if (p.error_code == error_code::none) { + auto dur = std::chrono::steady_clock::now() - start; + octx.rctx.connection()->server().update_produce_latency(dur); + } + return p; + }); + return partition_produce_stages{ + .dispatched = std::move(dispatch_f), + .produced = std::move(f), + }; } /** * \brief Dispatch and collect topic partition produce responses */ -static ss::future +static topic_produce_stages produce_topic(produce_ctx& octx, produce_request::topic& topic) { - std::vector> partitions; - partitions.reserve(topic.partitions.size()); + std::vector> partitions_produced; + std::vector> partitions_dispatched; + partitions_produced.reserve(topic.partitions.size()); + partitions_dispatched.reserve(topic.partitions.size()); for (auto& part : topic.partitions) { if (!octx.rctx.authorized(security::acl_operation::write, topic.name)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -273,7 +346,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { if (!octx.rctx.metadata_cache().contains( model::topic_namespace_view(model::kafka_namespace, topic.name), part.partition_index)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -283,7 +357,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { // the record data on the wire was null value if (unlikely(!part.records)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -293,7 +368,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { // an error occured handling legacy messages (magic 0 or 1) if (unlikely(part.records->adapter.legacy_error)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -302,7 +378,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { } if (unlikely(!part.records->adapter.valid_crc)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -320,7 +397,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { if (unlikely( !part.records->adapter.v2_format || !part.records->adapter.batch)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -329,38 +407,42 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { } auto pr = produce_topic_partition(octx, topic, part); - partitions.push_back(std::move(pr)); + partitions_produced.push_back(std::move(pr.produced)); + partitions_dispatched.push_back(std::move(pr.dispatched)); } // collect partition responses and build the topic response - return when_all_succeed(partitions.begin(), partitions.end()) - .then([name = std::move(topic.name)]( - std::vector parts) mutable { - return produce_response::topic{ - .name = std::move(name), - .partitions = std::move(parts), - }; - }); + return topic_produce_stages{ + .dispatched = ss::when_all_succeed( + partitions_dispatched.begin(), partitions_dispatched.end()), + .produced + = ss::when_all_succeed( + partitions_produced.begin(), partitions_produced.end()) + .then([name = std::move(topic.name)]( + std::vector parts) mutable { + return produce_response::topic{ + .name = std::move(name), + .partitions = std::move(parts), + }; + }), + }; } /** * \brief Dispatch and collect topic produce responses */ -static std::vector> -produce_topics(produce_ctx& octx) { - std::vector> topics; +static std::vector produce_topics(produce_ctx& octx) { + std::vector topics; topics.reserve(octx.request.data.topics.size()); for (auto& topic : octx.request.data.topics) { - auto tr = produce_topic(octx, topic); - topics.push_back(std::move(tr)); + topics.push_back(produce_topic(octx, topic)); } return topics; } -template<> -ss::future +process_result_stages produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { produce_request request; request.decode(ctx.reader(), ctx.header().version); @@ -390,8 +472,9 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { */ if (request.has_transactional) { if (!ctx.are_transactions_enabled()) { - return ctx.respond(request.make_error_response( - error_code::transactional_id_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::transactional_id_authorization_failed))); } if ( @@ -399,8 +482,9 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { || !ctx.authorized( security::acl_operation::write, transactional_id(*request.data.transactional_id))) { - return ctx.respond(request.make_error_response( - error_code::transactional_id_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::transactional_id_authorization_failed))); } // Note that authorization to a transactionalId implies // ProducerId authorization @@ -409,13 +493,15 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { if (!ctx.authorized( security::acl_operation::idempotent_write, security::default_cluster_name)) { - return ctx.respond(request.make_error_response( - error_code::cluster_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::cluster_authorization_failed))); } if (!ctx.is_idempotence_enabled()) { - return ctx.respond(request.make_error_response( - error_code::cluster_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::cluster_authorization_failed))); } } else if (request.data.acks < -1 || request.data.acks > 1) { @@ -428,65 +514,95 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { "configuration/" "producer-configs.html", request.data.acks); - return ctx.respond( - request.make_error_response(error_code::invalid_required_acks)); + return process_result_stages::single_stage(ctx.respond( + request.make_error_response(error_code::invalid_required_acks))); } - - return ss::do_with( + ss::promise<> dispatched_promise; + auto dispatched_f = dispatched_promise.get_future(); + auto produced_f = ss::do_with( produce_ctx(std::move(ctx), std::move(request), ssg), - [](produce_ctx& octx) { + [dispatched_promise = std::move(dispatched_promise)]( + produce_ctx& octx) mutable { vlog(klog.trace, "handling produce request {}", octx.request); // dispatch produce requests for each topic - auto topics = produce_topics(octx); - - // collect topic responses - return when_all_succeed(topics.begin(), topics.end()) - .then([&octx](std::vector topics) { - octx.response.data.responses = std::move(topics); - }) - .then([&octx] { - // send response immediately - if (octx.request.data.acks != 0) { - return octx.rctx.respond(std::move(octx.response)); - } - - // acks = 0 is handled separately. first, check for - // errors - bool has_error = false; - for (const auto& topic : octx.response.data.responses) { - for (const auto& p : topic.partitions) { - if (p.error_code != error_code::none) { - has_error = true; - break; - } - } - } - - // in the absense of errors, acks = 0 results in the - // response being dropped, as the client does not expect - // a response. here we mark the response as noop, but - // let it flow back so that it can be accounted for in - // quota and stats tracking. it is dropped later during - // processing. - if (!has_error) { - return octx.rctx.respond(std::move(octx.response)) - .then([](response_ptr resp) { - resp->mark_noop(); - return resp; + auto stages = produce_topics(octx); + std::vector> dispatched; + std::vector> produced; + dispatched.reserve(stages.size()); + produced.reserve(stages.size()); + + for (auto& s : stages) { + dispatched.push_back(std::move(s.dispatched)); + produced.push_back(std::move(s.produced)); + } + return seastar::when_all_succeed(dispatched.begin(), dispatched.end()) + .then_wrapped([&octx, + dispatched_promise = std::move(dispatched_promise), + produced = std::move(produced)]( + ss::future<> f) mutable { + try { + f.get(); + dispatched_promise.set_value(); + // collect topic responses + return when_all_succeed(produced.begin(), produced.end()) + .then( + [&octx](std::vector topics) { + octx.response.data.responses = std::move(topics); + }) + .then([&octx] { + // send response immediately + if (octx.request.data.acks != 0) { + return octx.rctx.respond( + std::move(octx.response)); + } + + // acks = 0 is handled separately. first, check for + // errors + bool has_error = false; + for (const auto& topic : + octx.response.data.responses) { + for (const auto& p : topic.partitions) { + if (p.error_code != error_code::none) { + has_error = true; + break; + } + } + } + + // in the absense of errors, acks = 0 results in the + // response being dropped, as the client does not + // expect a response. here we mark the response as + // noop, but let it flow back so that it can be + // accounted for in quota and stats tracking. it is + // dropped later during processing. + if (!has_error) { + return octx.rctx.respond(std::move(octx.response)) + .then([](response_ptr resp) { + resp->mark_noop(); + return resp; + }); + } + + // errors in a response from an acks=0 produce request + // result in the connection being dropped to signal an + // issue to the client + return ss::make_exception_future( + std::runtime_error(fmt::format( + "Closing connection due to error in produce " + "response: {}", + octx.response))); }); + } catch (...) { + dispatched_promise.set_exception(std::current_exception()); + return ss::make_exception_future( + std::current_exception()); } - - // errors in a response from an acks=0 produce request - // result in the connection being dropped to signal an - // issue to the client - return ss::make_exception_future( - std::runtime_error(fmt::format( - "Closing connection due to error in produce " - "response: {}", - octx.response))); }); }); + + return process_result_stages( + std::move(dispatched_f), std::move(produced_f)); } } // namespace kafka diff --git a/src/v/kafka/server/handlers/produce.h b/src/v/kafka/server/handlers/produce.h index 179deb8e170e..a8a5aa73baea 100644 --- a/src/v/kafka/server/handlers/produce.h +++ b/src/v/kafka/server/handlers/produce.h @@ -14,6 +14,11 @@ namespace kafka { -using produce_handler = handler; +struct produce_handler { + using api = produce_api; + static constexpr api_version min_supported = api_version(0); + static constexpr api_version max_supported = api_version(7); + static process_result_stages handle(request_context, ss::smp_service_group); +}; -} +} // namespace kafka diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 8aa38bf61cc5..0cd8c7787b50 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -12,6 +12,7 @@ #include "kafka/protocol/errors.h" #include "model/fundamental.h" +#include "raft/types.h" #include "storage/types.h" #include @@ -108,17 +109,21 @@ ss::future> replicated_partition::replicate( return ret_t(_translator->to_kafka_offset(r.value().last_offset)); }); } -ss::future> replicated_partition::replicate( + +raft::replicate_stages replicated_partition::replicate( model::batch_identity batch_id, model::record_batch_reader&& rdr, raft::replicate_options opts) { - using ret_t = result; - return _partition->replicate(batch_id, std::move(rdr), opts) - .then([this](result r) { + using ret_t = result; + auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts); + res.replicate_finished = res.replicate_finished.then( + [this](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(_translator->to_kafka_offset(r.value().last_offset)); + return ret_t(raft::replicate_result{ + _translator->to_kafka_offset(r.value().last_offset)}); }); + return res; } } // namespace kafka diff --git a/src/v/kafka/server/replicated_partition.h b/src/v/kafka/server/replicated_partition.h index 5af41407da29..1499eed46bc4 100644 --- a/src/v/kafka/server/replicated_partition.h +++ b/src/v/kafka/server/replicated_partition.h @@ -49,7 +49,7 @@ class replicated_partition final : public kafka::partition_proxy::impl { ss::future> replicate(model::record_batch_reader, raft::replicate_options); - ss::future> replicate( + raft::replicate_stages replicate( model::batch_identity, model::record_batch_reader&&, raft::replicate_options); diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index e49c243bd34f..2dbb5e653bf8 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "kafka/server/handlers/handlers.h" +#include "kafka/server/handlers/produce.h" #include "kafka/server/request_context.h" #include "kafka/types.h" #include "utils/to_string.h" @@ -22,7 +23,7 @@ namespace kafka { * Dispatch request with version bounds checking. */ template -CONCEPT(requires(KafkaApiHandler)) +CONCEPT(requires(KafkaApiHandler || KafkaApiTwoPhaseHandler)) struct process_dispatch { static process_result_stages process(request_context&& ctx, ss::smp_service_group g) { @@ -54,9 +55,19 @@ struct process_dispatch { api_versions_handler::handle(std::move(ctx), g)); } }; +/** + * Produce request is currently the only one leveraging two stage processing + */ +template<> +struct process_dispatch { + static process_result_stages + process(request_context&& ctx, ss::smp_service_group g) { + return produce_handler::handle(std::move(ctx), g); + } +}; template -CONCEPT(requires(KafkaApiHandler)) +CONCEPT(requires(KafkaApiHandler || KafkaApiTwoPhaseHandler)) process_result_stages do_process(request_context&& ctx, ss::smp_service_group g) { vlog( From d0fab2c6b38b71b92810ca73dd4450395a17f9ab Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Mon, 31 May 2021 11:50:05 +0200 Subject: [PATCH 4/4] k/offset_commit: processing offset commit request in two stages Offsets commit handler uses raft to replicate offset commit requests. Leveraging raft two stage replicate processing to to handle multiple in-flight offset commit requests and prevent contention. Signed-off-by: Michal Maslanka --- src/v/kafka/server/group.cc | 35 ++++++------ src/v/kafka/server/group.h | 21 ++++++-- src/v/kafka/server/group_manager.cc | 7 ++- src/v/kafka/server/group_manager.h | 3 +- src/v/kafka/server/group_router.h | 57 +++++++++++++++++++- src/v/kafka/server/handlers/offset_commit.cc | 28 ++++++---- src/v/kafka/server/handlers/offset_commit.h | 8 ++- src/v/kafka/server/requests.cc | 10 +++- 8 files changed, 128 insertions(+), 41 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 61f1000f07c3..dd25389eb53c 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1600,8 +1600,7 @@ group::store_txn_offsets(txn_offset_commit_request r) { co_return txn_offset_commit_response(r, error_code::none); } -ss::future -group::store_offsets(offset_commit_request&& r) { +group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { cluster::simple_batch_builder builder( model::record_batch_type::raft_data, model::offset(0)); @@ -1642,12 +1641,13 @@ group::store_offsets(offset_commit_request&& r) { auto batch = std::move(builder).build(); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - return _partition - ->replicate( - std::move(reader), - raft::replicate_options(raft::consistency_level::quorum_ack)) - .then([this, req = std::move(r), commits = std::move(offset_commits)]( - result r) mutable { + auto replicate_stages = _partition->replicate_in_stages( + std::move(reader), + raft::replicate_options(raft::consistency_level::quorum_ack)); + + auto f = replicate_stages.replicate_finished.then( + [this, req = std::move(r), commits = std::move(offset_commits)]( + result r) mutable { error_code error = r ? error_code::none : error_code::not_coordinator; if (in_state(group_state::dead)) { return offset_commit_response(req, error); @@ -1666,6 +1666,8 @@ group::store_offsets(offset_commit_request&& r) { return offset_commit_response(req, error); }); + return offset_commit_stages( + std::move(replicate_stages.request_enqueued), std::move(f)); } ss::future @@ -1780,10 +1782,10 @@ group::handle_abort_tx(cluster::abort_group_tx_request r) { } } -ss::future +group::offset_commit_stages group::handle_offset_commit(offset_commit_request&& r) { if (in_state(group_state::dead)) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::coordinator_not_available)); } else if (r.data.generation_id < 0 && in_state(group_state::empty)) { @@ -1791,11 +1793,11 @@ group::handle_offset_commit(offset_commit_request&& r) { return store_offsets(std::move(r)); } else if (!contains_member(r.data.member_id)) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::unknown_member_id)); } else if (r.data.generation_id != generation()) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::illegal_generation)); } else if ( in_state(group_state::stable) @@ -1807,12 +1809,13 @@ group::handle_offset_commit(offset_commit_request&& r) { schedule_next_heartbeat_expiration(member); return store_offsets(std::move(r)); } else if (in_state(group_state::completing_rebalance)) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::rebalance_in_progress)); } else { - return ss::make_exception_future( - std::runtime_error( - fmt::format("Unexpected group state {} for {}", _state, *this))); + return offset_commit_stages( + ss::now(), + ss::make_exception_future(std::runtime_error( + fmt::format("Unexpected group state {} for {}", _state, *this)))); } } diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index d30ae62d2f66..1d39610feab9 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -14,6 +14,7 @@ #include "cluster/partition.h" #include "cluster/tx_utils.h" #include "kafka/protocol/fwd.h" +#include "kafka/protocol/offset_commit.h" #include "kafka/server/logger.h" #include "kafka/server/member.h" #include "kafka/types.h" @@ -114,6 +115,21 @@ class group { static constexpr int8_t commit_tx_record_version{0}; static constexpr int8_t aborted_tx_record_version{0}; + struct offset_commit_stages { + explicit offset_commit_stages(offset_commit_response resp) + : dispatched(ss::now()) + , committed( + ss::make_ready_future(std::move(resp))) {} + + offset_commit_stages( + ss::future<> dispatched, ss::future resp) + : dispatched(std::move(dispatched)) + , committed(std::move(resp)) {} + + ss::future<> dispatched; + ss::future committed; + }; + struct offset_metadata { model::offset log_offset; model::offset offset; @@ -430,7 +446,7 @@ class group { ss::future store_txn_offsets(txn_offset_commit_request r); - ss::future store_offsets(offset_commit_request&& r); + offset_commit_stages store_offsets(offset_commit_request&& r); ss::future handle_txn_offset_commit(txn_offset_commit_request r); @@ -444,8 +460,7 @@ class group { ss::future handle_abort_tx(cluster::abort_group_tx_request r); - ss::future - handle_offset_commit(offset_commit_request&& r); + offset_commit_stages handle_offset_commit(offset_commit_request&& r); ss::future handle_commit_tx(cluster::commit_group_tx_request r); diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 4d0914f45a2b..e441dd453af3 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -856,13 +856,12 @@ group_manager::abort_tx(cluster::abort_group_tx_request&& r) { }); } -ss::future +group::offset_commit_stages group_manager::offset_commit(offset_commit_request&& r) { auto error = validate_group_status( r.ntp, r.data.group_id, offset_commit_api::key); if (error != error_code::none) { - return ss::make_ready_future( - offset_commit_response(r, error)); + return group::offset_commit_stages(offset_commit_response(r, error)); } auto group = get_group(r.data.group_id); @@ -878,7 +877,7 @@ group_manager::offset_commit(offset_commit_request&& r) { } else { // or this is a request coming from an older generation. // either way, reject the commit - return ss::make_ready_future( + return group::offset_commit_stages( offset_commit_response(r, error_code::illegal_generation)); } } diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index d8fe7f03c0eb..0d1f6f06fa4f 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -135,8 +135,7 @@ class group_manager { ss::future leave_group(leave_group_request&& request); /// \brief Handle a OffsetCommit request - ss::future - offset_commit(offset_commit_request&& request); + group::offset_commit_stages offset_commit(offset_commit_request&& request); ss::future txn_offset_commit(txn_offset_commit_request&& request); diff --git a/src/v/kafka/server/group_router.h b/src/v/kafka/server/group_router.h index 1e4897ac55c6..0a4ea5d8d90f 100644 --- a/src/v/kafka/server/group_router.h +++ b/src/v/kafka/server/group_router.h @@ -29,7 +29,9 @@ #include #include #include +#include +#include #include namespace kafka { @@ -129,8 +131,59 @@ class group_router final { return route(std::move(request), &group_manager::leave_group); } - auto offset_commit(offset_commit_request&& request) { - return route(std::move(request), &group_manager::offset_commit); + group::offset_commit_stages offset_commit(offset_commit_request&& request) { + auto m = shard_for(request.data.group_id); + if (!m) { + return group::offset_commit_stages( + offset_commit_response(request, error_code::not_coordinator)); + } + request.ntp = std::move(m->first); + auto dispatched = std::make_unique>(); + auto dispatched_f = dispatched->get_future(); + auto f = with_scheduling_group( + _sg, + [this, + shard = m->second, + request = std::move(request), + dispatched = std::move(dispatched)]() mutable { + return _group_manager.invoke_on( + shard, + _ssg, + [request = std::move(request), + dispatched = std::move(dispatched), + source_shard = ss::this_shard_id()]( + group_manager& mgr) mutable { + auto stages = mgr.offset_commit(std::move(request)); + /** + * dispatched future is always ready before committed one, + * we do not have to use gate in here + */ + return stages.dispatched + .then_wrapped([source_shard, d = std::move(dispatched)]( + ss::future<> f) mutable { + if (f.failed()) { + (void)ss::smp::submit_to( + source_shard, + [d = std::move(d), + e = f.get_exception()]() mutable { + d->set_exception(e); + d.reset(); + }); + return; + } + (void)ss::smp::submit_to( + source_shard, [d = std::move(d)]() mutable { + d->set_value(); + d.reset(); + }); + }) + .then([f = std::move(stages.committed)]() mutable { + return std::move(f); + }); + }); + }); + return group::offset_commit_stages( + std::move(dispatched_f), std::move(f)); } auto txn_offset_commit(txn_offset_commit_request&& request) { diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index 61327ca7285f..a9f2f94ac1f3 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -53,16 +53,15 @@ struct offset_commit_ctx { , ssg(ssg) {} }; -template<> -ss::future +process_result_stages offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { offset_commit_request request; request.decode(ctx.reader(), ctx.header().version); klog.trace("Handling request {}", request); if (request.data.group_instance_id) { - return ctx.respond( - offset_commit_response(request, error_code::unsupported_version)); + return process_result_stages::single_stage(ctx.respond( + offset_commit_response(request, error_code::unsupported_version))); } // check authorization for this group @@ -186,13 +185,18 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { .partitions = std::move(topic.second), }); } - return octx.rctx.respond(std::move(resp)); + return process_result_stages::single_stage( + octx.rctx.respond(std::move(resp))); } - - return ss::do_with(std::move(octx), [](offset_commit_ctx& octx) { - return octx.rctx.groups() - .offset_commit(std::move(octx.request)) - .then([&octx](offset_commit_response resp) { + ss::promise<> dispatch; + auto dispatch_f = dispatch.get_future(); + auto f = ss::do_with( + std::move(octx), + [dispatch = std::move(dispatch)](offset_commit_ctx& octx) mutable { + auto stages = octx.rctx.groups().offset_commit( + std::move(octx.request)); + stages.dispatched.forward_to(std::move(dispatch)); + return stages.committed.then([&octx](offset_commit_response resp) { if (unlikely(!octx.nonexistent_tps.empty())) { /* * copy over partitions for topics that had some partitions @@ -229,7 +233,9 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { } return octx.rctx.respond(std::move(resp)); }); - }); + }); + + return process_result_stages(std::move(dispatch_f), std::move(f)); } } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_commit.h b/src/v/kafka/server/handlers/offset_commit.h index 6812be690fde..8882b78e380a 100644 --- a/src/v/kafka/server/handlers/offset_commit.h +++ b/src/v/kafka/server/handlers/offset_commit.h @@ -17,6 +17,10 @@ namespace kafka { // in version 0 kafka stores offsets in zookeeper. if we ever need to // support version 0 then we need to do some code review to see if this has // any implications on semantics. -using offset_commit_handler = handler; - +struct offset_commit_handler { + using api = offset_commit_api; + static constexpr api_version min_supported = api_version(1); + static constexpr api_version max_supported = api_version(7); + static process_result_stages handle(request_context, ss::smp_service_group); +}; } // namespace kafka diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index 2dbb5e653bf8..49af16449e0b 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -56,7 +56,7 @@ struct process_dispatch { } }; /** - * Produce request is currently the only one leveraging two stage processing + * Requests processed in two stages */ template<> struct process_dispatch { @@ -66,6 +66,14 @@ struct process_dispatch { } }; +template<> +struct process_dispatch { + static process_result_stages + process(request_context&& ctx, ss::smp_service_group g) { + return offset_commit_handler::handle(std::move(ctx), g); + } +}; + template CONCEPT(requires(KafkaApiHandler || KafkaApiTwoPhaseHandler)) process_result_stages