diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 8bce82c328bd..4f5be8130876 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -115,7 +115,8 @@ ss::future<> connection_context::handle_auth_v0(const size_t size) { std::move(request_buf), 0s); auto resp = co_await kafka::process_request( - std::move(ctx), _proto.smp_group()); + std::move(ctx), _proto.smp_group()) + .response; auto data = std::move(*resp).release(); response.decode(std::move(data), version); } @@ -224,16 +225,14 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { std::move(buf), sres.backpressure_delay); /* - * until authentication is complete, process requests in order - * since all subsequent requests are dependent on authentication - * having completed. + * we process requests in order since all subsequent requests + * are dependent on authentication having completed. * - * the other important reason for disabling pipeling before - * authentication completes is because when a sasl handshake - * with version=0 is processed, the next data on the wire is - * _not_ another request: it is a size-prefixed authentication - * payload without a request envelope, and requires special - * handling. + * the other important reason for disabling pipeling is because + * when a sasl handshake with version=0 is processed, the next + * data on the wire is _not_ another request: it is a + * size-prefixed authentication payload without a request + * envelope, and requires special handling. * * a well behaved client should implicitly provide a data stream * that invokes this behavior in the server: that is, it won't @@ -245,47 +244,52 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { * stream at best and at worst some odd behavior. */ - if (unlikely(!sasl().complete())) { - return do_process(std::move(rctx)) - .handle_exception([self](std::exception_ptr e) { - vlog( - klog.info, - "Detected error processing request: {}", - e); - self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(sres), self] {}); - } - - (void)ss::try_with_gate( - _rs.conn_gate(), - [this, rctx = std::move(rctx)]() mutable { - return do_process(std::move(rctx)); + const auto correlation = rctx.header().correlation; + const sequence_id seq = _seq_idx; + _seq_idx = _seq_idx + sequence_id(1); + auto res = kafka::process_request( + std::move(rctx), _proto.smp_group()); + /** + * first stage processed in a foreground. + */ + return res.dispatched + .then([this, + f = std::move(res.response), + seq, + correlation, + self, + s = std::move(sres)]() mutable { + /** + * second stage processed in background. + */ + (void)ss::try_with_gate( + _rs.conn_gate(), + [this, f = std::move(f), seq, correlation]() mutable { + return f.then( + [this, seq, correlation](response_ptr r) mutable { + r->set_correlation(correlation); + _responses.insert({seq, std::move(r)}); + return process_next_response(); + }); + }) + .handle_exception([self](std::exception_ptr e) { + vlog( + klog.info, + "Detected error processing request: {}", + e); + self->_rs.conn->shutdown_input(); + }) + .finally([s = std::move(s), self] {}); }) .handle_exception([self](std::exception_ptr e) { vlog( klog.info, "Detected error processing request: {}", e); self->_rs.conn->shutdown_input(); - }) - .finally([s = std::move(sres), self] {}); - - return ss::now(); + }); }); }); } -ss::future<> connection_context::do_process(request_context ctx) { - const auto correlation = ctx.header().correlation; - const sequence_id seq = _seq_idx; - _seq_idx = _seq_idx + sequence_id(1); - return kafka::process_request(std::move(ctx), _proto.smp_group()) - .then([this, seq, correlation](response_ptr r) mutable { - r->set_correlation(correlation); - _responses.insert({seq, std::move(r)}); - return process_next_response(); - }); -} - ss::future<> connection_context::process_next_response() { return ss::repeat([this]() mutable { auto it = _responses.find(_next_response); diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 61f1000f07c3..dd25389eb53c 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1600,8 +1600,7 @@ group::store_txn_offsets(txn_offset_commit_request r) { co_return txn_offset_commit_response(r, error_code::none); } -ss::future -group::store_offsets(offset_commit_request&& r) { +group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { cluster::simple_batch_builder builder( model::record_batch_type::raft_data, model::offset(0)); @@ -1642,12 +1641,13 @@ group::store_offsets(offset_commit_request&& r) { auto batch = std::move(builder).build(); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - return _partition - ->replicate( - std::move(reader), - raft::replicate_options(raft::consistency_level::quorum_ack)) - .then([this, req = std::move(r), commits = std::move(offset_commits)]( - result r) mutable { + auto replicate_stages = _partition->replicate_in_stages( + std::move(reader), + raft::replicate_options(raft::consistency_level::quorum_ack)); + + auto f = replicate_stages.replicate_finished.then( + [this, req = std::move(r), commits = std::move(offset_commits)]( + result r) mutable { error_code error = r ? error_code::none : error_code::not_coordinator; if (in_state(group_state::dead)) { return offset_commit_response(req, error); @@ -1666,6 +1666,8 @@ group::store_offsets(offset_commit_request&& r) { return offset_commit_response(req, error); }); + return offset_commit_stages( + std::move(replicate_stages.request_enqueued), std::move(f)); } ss::future @@ -1780,10 +1782,10 @@ group::handle_abort_tx(cluster::abort_group_tx_request r) { } } -ss::future +group::offset_commit_stages group::handle_offset_commit(offset_commit_request&& r) { if (in_state(group_state::dead)) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::coordinator_not_available)); } else if (r.data.generation_id < 0 && in_state(group_state::empty)) { @@ -1791,11 +1793,11 @@ group::handle_offset_commit(offset_commit_request&& r) { return store_offsets(std::move(r)); } else if (!contains_member(r.data.member_id)) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::unknown_member_id)); } else if (r.data.generation_id != generation()) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::illegal_generation)); } else if ( in_state(group_state::stable) @@ -1807,12 +1809,13 @@ group::handle_offset_commit(offset_commit_request&& r) { schedule_next_heartbeat_expiration(member); return store_offsets(std::move(r)); } else if (in_state(group_state::completing_rebalance)) { - return ss::make_ready_future( + return offset_commit_stages( offset_commit_response(r, error_code::rebalance_in_progress)); } else { - return ss::make_exception_future( - std::runtime_error( - fmt::format("Unexpected group state {} for {}", _state, *this))); + return offset_commit_stages( + ss::now(), + ss::make_exception_future(std::runtime_error( + fmt::format("Unexpected group state {} for {}", _state, *this)))); } } diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index d30ae62d2f66..1d39610feab9 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -14,6 +14,7 @@ #include "cluster/partition.h" #include "cluster/tx_utils.h" #include "kafka/protocol/fwd.h" +#include "kafka/protocol/offset_commit.h" #include "kafka/server/logger.h" #include "kafka/server/member.h" #include "kafka/types.h" @@ -114,6 +115,21 @@ class group { static constexpr int8_t commit_tx_record_version{0}; static constexpr int8_t aborted_tx_record_version{0}; + struct offset_commit_stages { + explicit offset_commit_stages(offset_commit_response resp) + : dispatched(ss::now()) + , committed( + ss::make_ready_future(std::move(resp))) {} + + offset_commit_stages( + ss::future<> dispatched, ss::future resp) + : dispatched(std::move(dispatched)) + , committed(std::move(resp)) {} + + ss::future<> dispatched; + ss::future committed; + }; + struct offset_metadata { model::offset log_offset; model::offset offset; @@ -430,7 +446,7 @@ class group { ss::future store_txn_offsets(txn_offset_commit_request r); - ss::future store_offsets(offset_commit_request&& r); + offset_commit_stages store_offsets(offset_commit_request&& r); ss::future handle_txn_offset_commit(txn_offset_commit_request r); @@ -444,8 +460,7 @@ class group { ss::future handle_abort_tx(cluster::abort_group_tx_request r); - ss::future - handle_offset_commit(offset_commit_request&& r); + offset_commit_stages handle_offset_commit(offset_commit_request&& r); ss::future handle_commit_tx(cluster::commit_group_tx_request r); diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 4d0914f45a2b..e441dd453af3 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -856,13 +856,12 @@ group_manager::abort_tx(cluster::abort_group_tx_request&& r) { }); } -ss::future +group::offset_commit_stages group_manager::offset_commit(offset_commit_request&& r) { auto error = validate_group_status( r.ntp, r.data.group_id, offset_commit_api::key); if (error != error_code::none) { - return ss::make_ready_future( - offset_commit_response(r, error)); + return group::offset_commit_stages(offset_commit_response(r, error)); } auto group = get_group(r.data.group_id); @@ -878,7 +877,7 @@ group_manager::offset_commit(offset_commit_request&& r) { } else { // or this is a request coming from an older generation. // either way, reject the commit - return ss::make_ready_future( + return group::offset_commit_stages( offset_commit_response(r, error_code::illegal_generation)); } } diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index d8fe7f03c0eb..0d1f6f06fa4f 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -135,8 +135,7 @@ class group_manager { ss::future leave_group(leave_group_request&& request); /// \brief Handle a OffsetCommit request - ss::future - offset_commit(offset_commit_request&& request); + group::offset_commit_stages offset_commit(offset_commit_request&& request); ss::future txn_offset_commit(txn_offset_commit_request&& request); diff --git a/src/v/kafka/server/group_router.h b/src/v/kafka/server/group_router.h index 1e4897ac55c6..0a4ea5d8d90f 100644 --- a/src/v/kafka/server/group_router.h +++ b/src/v/kafka/server/group_router.h @@ -29,7 +29,9 @@ #include #include #include +#include +#include #include namespace kafka { @@ -129,8 +131,59 @@ class group_router final { return route(std::move(request), &group_manager::leave_group); } - auto offset_commit(offset_commit_request&& request) { - return route(std::move(request), &group_manager::offset_commit); + group::offset_commit_stages offset_commit(offset_commit_request&& request) { + auto m = shard_for(request.data.group_id); + if (!m) { + return group::offset_commit_stages( + offset_commit_response(request, error_code::not_coordinator)); + } + request.ntp = std::move(m->first); + auto dispatched = std::make_unique>(); + auto dispatched_f = dispatched->get_future(); + auto f = with_scheduling_group( + _sg, + [this, + shard = m->second, + request = std::move(request), + dispatched = std::move(dispatched)]() mutable { + return _group_manager.invoke_on( + shard, + _ssg, + [request = std::move(request), + dispatched = std::move(dispatched), + source_shard = ss::this_shard_id()]( + group_manager& mgr) mutable { + auto stages = mgr.offset_commit(std::move(request)); + /** + * dispatched future is always ready before committed one, + * we do not have to use gate in here + */ + return stages.dispatched + .then_wrapped([source_shard, d = std::move(dispatched)]( + ss::future<> f) mutable { + if (f.failed()) { + (void)ss::smp::submit_to( + source_shard, + [d = std::move(d), + e = f.get_exception()]() mutable { + d->set_exception(e); + d.reset(); + }); + return; + } + (void)ss::smp::submit_to( + source_shard, [d = std::move(d)]() mutable { + d->set_value(); + d.reset(); + }); + }) + .then([f = std::move(stages.committed)]() mutable { + return std::move(f); + }); + }); + }); + return group::offset_commit_stages( + std::move(dispatched_f), std::move(f)); } auto txn_offset_commit(txn_offset_commit_request&& request) { diff --git a/src/v/kafka/server/handlers/handler.h b/src/v/kafka/server/handlers/handler.h index 6a902b02b483..61fbd120dc82 100644 --- a/src/v/kafka/server/handlers/handler.h +++ b/src/v/kafka/server/handlers/handler.h @@ -41,6 +41,15 @@ concept KafkaApiHandler = requires (T h, request_context&& ctx, ss::smp_service_ { T::handle(std::move(ctx), g) } -> std::same_as>; }; ) +CONCEPT( +template +concept KafkaApiTwoPhaseHandler = requires (T h, request_context&& ctx, ss::smp_service_group g) { + KafkaApi; + { T::min_supported } -> std::convertible_to; + { T::max_supported } -> std::convertible_to; + { T::handle(std::move(ctx), g) } -> std::same_as; +}; +) // clang-format on } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index 61327ca7285f..a9f2f94ac1f3 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -53,16 +53,15 @@ struct offset_commit_ctx { , ssg(ssg) {} }; -template<> -ss::future +process_result_stages offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { offset_commit_request request; request.decode(ctx.reader(), ctx.header().version); klog.trace("Handling request {}", request); if (request.data.group_instance_id) { - return ctx.respond( - offset_commit_response(request, error_code::unsupported_version)); + return process_result_stages::single_stage(ctx.respond( + offset_commit_response(request, error_code::unsupported_version))); } // check authorization for this group @@ -186,13 +185,18 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { .partitions = std::move(topic.second), }); } - return octx.rctx.respond(std::move(resp)); + return process_result_stages::single_stage( + octx.rctx.respond(std::move(resp))); } - - return ss::do_with(std::move(octx), [](offset_commit_ctx& octx) { - return octx.rctx.groups() - .offset_commit(std::move(octx.request)) - .then([&octx](offset_commit_response resp) { + ss::promise<> dispatch; + auto dispatch_f = dispatch.get_future(); + auto f = ss::do_with( + std::move(octx), + [dispatch = std::move(dispatch)](offset_commit_ctx& octx) mutable { + auto stages = octx.rctx.groups().offset_commit( + std::move(octx.request)); + stages.dispatched.forward_to(std::move(dispatch)); + return stages.committed.then([&octx](offset_commit_response resp) { if (unlikely(!octx.nonexistent_tps.empty())) { /* * copy over partitions for topics that had some partitions @@ -229,7 +233,9 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { } return octx.rctx.respond(std::move(resp)); }); - }); + }); + + return process_result_stages(std::move(dispatch_f), std::move(f)); } } // namespace kafka diff --git a/src/v/kafka/server/handlers/offset_commit.h b/src/v/kafka/server/handlers/offset_commit.h index 6812be690fde..8882b78e380a 100644 --- a/src/v/kafka/server/handlers/offset_commit.h +++ b/src/v/kafka/server/handlers/offset_commit.h @@ -17,6 +17,10 @@ namespace kafka { // in version 0 kafka stores offsets in zookeeper. if we ever need to // support version 0 then we need to do some code review to see if this has // any implications on semantics. -using offset_commit_handler = handler; - +struct offset_commit_handler { + using api = offset_commit_api; + static constexpr api_version min_supported = api_version(1); + static constexpr api_version max_supported = api_version(7); + static process_result_stages handle(request_context, ss::smp_service_group); +}; } // namespace kafka diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index a75bdd4e14d3..296284e3d7dc 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -78,6 +78,24 @@ struct produce_ctx { , ssg(ssg) {} }; +struct partition_produce_stages { + ss::future<> dispatched; + ss::future produced; +}; + +struct topic_produce_stages { + ss::future<> dispatched; + ss::future produced; +}; + +partition_produce_stages make_ready_stage(produce_response::partition p) { + return partition_produce_stages{ + .dispatched = ss::now(), + .produced = ss::make_ready_future( + std::move(p)), + }; +} + static raft::replicate_options acks_to_replicate_options(int16_t acks) { switch (acks) { case -1: @@ -136,40 +154,45 @@ static error_code map_produce_error_code(std::error_code ec) { * Caller is expected to catch errors that may be thrown while the kafka * batch is being deserialized (see reader_from_kafka_batch). */ -static ss::future partition_append( +static partition_produce_stages partition_append( model::partition_id id, ss::lw_shared_ptr partition, model::batch_identity bid, model::record_batch_reader reader, int16_t acks, int32_t num_records) { - return partition - ->replicate(bid, std::move(reader), acks_to_replicate_options(acks)) - .then_wrapped([partition, id, num_records = num_records]( - ss::future> f) { - produce_response::partition p{.partition_index = id}; - try { - auto r = f.get0(); - if (r.has_value()) { - // have to subtract num_of_records - 1 as base_offset - // is inclusive - p.base_offset = model::offset(r.value() - (num_records - 1)); - p.error_code = error_code::none; - partition->probe().add_records_produced(num_records); - } else { - p.error_code = map_produce_error_code(r.error()); - } - } catch (...) { - p.error_code = error_code::unknown_server_error; - } - return p; - }); + auto stages = partition->replicate( + bid, std::move(reader), acks_to_replicate_options(acks)); + return partition_produce_stages{ + .dispatched = std::move(stages.request_enqueued), + .produced = stages.replicate_finished.then_wrapped( + [partition, id, num_records = num_records]( + ss::future> f) { + produce_response::partition p{.partition_index = id}; + try { + auto r = f.get0(); + if (r.has_value()) { + // have to subtract num_of_records - 1 as base_offset + // is inclusive + p.base_offset = model::offset( + r.value().last_offset - (num_records - 1)); + p.error_code = error_code::none; + partition->probe().add_records_produced(num_records); + } else { + p.error_code = map_produce_error_code(r.error()); + } + } catch (...) { + p.error_code = error_code::unknown_server_error; + } + return p; + }), + }; } /** * \brief handle writing to a single topic partition. */ -static ss::future produce_topic_partition( +static partition_produce_stages produce_topic_partition( produce_ctx& octx, produce_request::topic& topic, produce_request::partition& part) { @@ -183,10 +206,9 @@ static ss::future produce_topic_partition( auto shard = octx.rctx.shards().shard_for(ntp); if (!shard) { - return ss::make_ready_future( - produce_response::partition{ - .partition_index = ntp.tp.partition, - .error_code = error_code::unknown_topic_or_partition}); + return make_ready_stage(produce_response::partition{ + .partition_index = ntp.tp.partition, + .error_code = error_code::unknown_topic_or_partition}); } // steal the batch from the adapter @@ -214,55 +236,106 @@ static ss::future produce_topic_partition( auto num_records = batch.record_count(); auto reader = reader_from_lcore_batch(std::move(batch)); auto start = std::chrono::steady_clock::now(); - auto f = octx.rctx.partition_manager().invoke_on( - *shard, - octx.ssg, - [reader = std::move(reader), - ntp = std::move(ntp), - num_records, - bid, - acks = octx.request.data.acks](cluster::partition_manager& mgr) mutable { - auto partition = mgr.get(ntp); - if (!partition) { - return ss::make_ready_future( - produce_response::partition{ - .partition_index = ntp.tp.partition, - .error_code = error_code::unknown_topic_or_partition}); - } - if (unlikely(!partition->is_leader())) { - return ss::make_ready_future( - produce_response::partition{ - .partition_index = ntp.tp.partition, - .error_code = error_code::not_leader_for_partition}); - } - return partition_append( - ntp.tp.partition, - ss::make_lw_shared(std::move(partition)), - bid, - std::move(reader), - acks, - num_records); - }); - return f.then([&octx, start](produce_response::partition p) { - if (p.error_code == error_code::none) { - auto dur = std::chrono::steady_clock::now() - start; - octx.rctx.connection()->server().update_produce_latency(dur); - } - return p; - }); + + auto dispatch = std::make_unique>(); + auto dispatch_f = dispatch->get_future(); + auto f + = octx.rctx.partition_manager() + .invoke_on( + *shard, + octx.ssg, + [reader = std::move(reader), + ntp = std::move(ntp), + dispatch = std::move(dispatch), + num_records, + bid, + acks = octx.request.data.acks, + source_shard = ss::this_shard_id()]( + cluster::partition_manager& mgr) mutable { + auto partition = mgr.get(ntp); + if (!partition) { + // submit back to promise source shard + (void)ss::smp::submit_to( + source_shard, [dispatch = std::move(dispatch)]() mutable { + dispatch->set_value(); + dispatch.reset(); + }); + return ss::make_ready_future( + produce_response::partition{ + .partition_index = ntp.tp.partition, + .error_code = error_code::unknown_topic_or_partition}); + } + if (unlikely(!partition->is_leader())) { + // submit back to promise source shard + (void)ss::smp::submit_to( + source_shard, [dispatch = std::move(dispatch)]() mutable { + dispatch->set_value(); + dispatch.reset(); + }); + return ss::make_ready_future( + produce_response::partition{ + .partition_index = ntp.tp.partition, + .error_code = error_code::not_leader_for_partition}); + } + auto stages = partition_append( + ntp.tp.partition, + ss::make_lw_shared( + std::move(partition)), + bid, + std::move(reader), + acks, + num_records); + return stages.dispatched + .then_wrapped([source_shard, dispatch = std::move(dispatch)]( + ss::future<> f) mutable { + if (f.failed()) { + (void)ss::smp::submit_to( + source_shard, + [dispatch = std::move(dispatch), + e = f.get_exception()]() mutable { + dispatch->set_exception(e); + dispatch.reset(); + }); + return; + } + (void)ss::smp::submit_to( + source_shard, + [dispatch = std::move(dispatch)]() mutable { + dispatch->set_value(); + dispatch.reset(); + }); + }) + .then([f = std::move(stages.produced)]() mutable { + return std::move(f); + }); + }) + .then([&octx, start](produce_response::partition p) { + if (p.error_code == error_code::none) { + auto dur = std::chrono::steady_clock::now() - start; + octx.rctx.connection()->server().update_produce_latency(dur); + } + return p; + }); + return partition_produce_stages{ + .dispatched = std::move(dispatch_f), + .produced = std::move(f), + }; } /** * \brief Dispatch and collect topic partition produce responses */ -static ss::future +static topic_produce_stages produce_topic(produce_ctx& octx, produce_request::topic& topic) { - std::vector> partitions; - partitions.reserve(topic.partitions.size()); + std::vector> partitions_produced; + std::vector> partitions_dispatched; + partitions_produced.reserve(topic.partitions.size()); + partitions_dispatched.reserve(topic.partitions.size()); for (auto& part : topic.partitions) { if (!octx.rctx.authorized(security::acl_operation::write, topic.name)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -273,7 +346,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { if (!octx.rctx.metadata_cache().contains( model::topic_namespace_view(model::kafka_namespace, topic.name), part.partition_index)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -283,7 +357,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { // the record data on the wire was null value if (unlikely(!part.records)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -293,7 +368,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { // an error occured handling legacy messages (magic 0 or 1) if (unlikely(part.records->adapter.legacy_error)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -302,7 +378,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { } if (unlikely(!part.records->adapter.valid_crc)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -320,7 +397,8 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { if (unlikely( !part.records->adapter.v2_format || !part.records->adapter.batch)) { - partitions.push_back( + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( ss::make_ready_future( produce_response::partition{ .partition_index = part.partition_index, @@ -329,38 +407,42 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { } auto pr = produce_topic_partition(octx, topic, part); - partitions.push_back(std::move(pr)); + partitions_produced.push_back(std::move(pr.produced)); + partitions_dispatched.push_back(std::move(pr.dispatched)); } // collect partition responses and build the topic response - return when_all_succeed(partitions.begin(), partitions.end()) - .then([name = std::move(topic.name)]( - std::vector parts) mutable { - return produce_response::topic{ - .name = std::move(name), - .partitions = std::move(parts), - }; - }); + return topic_produce_stages{ + .dispatched = ss::when_all_succeed( + partitions_dispatched.begin(), partitions_dispatched.end()), + .produced + = ss::when_all_succeed( + partitions_produced.begin(), partitions_produced.end()) + .then([name = std::move(topic.name)]( + std::vector parts) mutable { + return produce_response::topic{ + .name = std::move(name), + .partitions = std::move(parts), + }; + }), + }; } /** * \brief Dispatch and collect topic produce responses */ -static std::vector> -produce_topics(produce_ctx& octx) { - std::vector> topics; +static std::vector produce_topics(produce_ctx& octx) { + std::vector topics; topics.reserve(octx.request.data.topics.size()); for (auto& topic : octx.request.data.topics) { - auto tr = produce_topic(octx, topic); - topics.push_back(std::move(tr)); + topics.push_back(produce_topic(octx, topic)); } return topics; } -template<> -ss::future +process_result_stages produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { produce_request request; request.decode(ctx.reader(), ctx.header().version); @@ -390,8 +472,9 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { */ if (request.has_transactional) { if (!ctx.are_transactions_enabled()) { - return ctx.respond(request.make_error_response( - error_code::transactional_id_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::transactional_id_authorization_failed))); } if ( @@ -399,8 +482,9 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { || !ctx.authorized( security::acl_operation::write, transactional_id(*request.data.transactional_id))) { - return ctx.respond(request.make_error_response( - error_code::transactional_id_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::transactional_id_authorization_failed))); } // Note that authorization to a transactionalId implies // ProducerId authorization @@ -409,13 +493,15 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { if (!ctx.authorized( security::acl_operation::idempotent_write, security::default_cluster_name)) { - return ctx.respond(request.make_error_response( - error_code::cluster_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::cluster_authorization_failed))); } if (!ctx.is_idempotence_enabled()) { - return ctx.respond(request.make_error_response( - error_code::cluster_authorization_failed)); + return process_result_stages::single_stage( + ctx.respond(request.make_error_response( + error_code::cluster_authorization_failed))); } } else if (request.data.acks < -1 || request.data.acks > 1) { @@ -428,65 +514,95 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { "configuration/" "producer-configs.html", request.data.acks); - return ctx.respond( - request.make_error_response(error_code::invalid_required_acks)); + return process_result_stages::single_stage(ctx.respond( + request.make_error_response(error_code::invalid_required_acks))); } - - return ss::do_with( + ss::promise<> dispatched_promise; + auto dispatched_f = dispatched_promise.get_future(); + auto produced_f = ss::do_with( produce_ctx(std::move(ctx), std::move(request), ssg), - [](produce_ctx& octx) { + [dispatched_promise = std::move(dispatched_promise)]( + produce_ctx& octx) mutable { vlog(klog.trace, "handling produce request {}", octx.request); // dispatch produce requests for each topic - auto topics = produce_topics(octx); - - // collect topic responses - return when_all_succeed(topics.begin(), topics.end()) - .then([&octx](std::vector topics) { - octx.response.data.responses = std::move(topics); - }) - .then([&octx] { - // send response immediately - if (octx.request.data.acks != 0) { - return octx.rctx.respond(std::move(octx.response)); - } - - // acks = 0 is handled separately. first, check for - // errors - bool has_error = false; - for (const auto& topic : octx.response.data.responses) { - for (const auto& p : topic.partitions) { - if (p.error_code != error_code::none) { - has_error = true; - break; - } - } - } - - // in the absense of errors, acks = 0 results in the - // response being dropped, as the client does not expect - // a response. here we mark the response as noop, but - // let it flow back so that it can be accounted for in - // quota and stats tracking. it is dropped later during - // processing. - if (!has_error) { - return octx.rctx.respond(std::move(octx.response)) - .then([](response_ptr resp) { - resp->mark_noop(); - return resp; + auto stages = produce_topics(octx); + std::vector> dispatched; + std::vector> produced; + dispatched.reserve(stages.size()); + produced.reserve(stages.size()); + + for (auto& s : stages) { + dispatched.push_back(std::move(s.dispatched)); + produced.push_back(std::move(s.produced)); + } + return seastar::when_all_succeed(dispatched.begin(), dispatched.end()) + .then_wrapped([&octx, + dispatched_promise = std::move(dispatched_promise), + produced = std::move(produced)]( + ss::future<> f) mutable { + try { + f.get(); + dispatched_promise.set_value(); + // collect topic responses + return when_all_succeed(produced.begin(), produced.end()) + .then( + [&octx](std::vector topics) { + octx.response.data.responses = std::move(topics); + }) + .then([&octx] { + // send response immediately + if (octx.request.data.acks != 0) { + return octx.rctx.respond( + std::move(octx.response)); + } + + // acks = 0 is handled separately. first, check for + // errors + bool has_error = false; + for (const auto& topic : + octx.response.data.responses) { + for (const auto& p : topic.partitions) { + if (p.error_code != error_code::none) { + has_error = true; + break; + } + } + } + + // in the absense of errors, acks = 0 results in the + // response being dropped, as the client does not + // expect a response. here we mark the response as + // noop, but let it flow back so that it can be + // accounted for in quota and stats tracking. it is + // dropped later during processing. + if (!has_error) { + return octx.rctx.respond(std::move(octx.response)) + .then([](response_ptr resp) { + resp->mark_noop(); + return resp; + }); + } + + // errors in a response from an acks=0 produce request + // result in the connection being dropped to signal an + // issue to the client + return ss::make_exception_future( + std::runtime_error(fmt::format( + "Closing connection due to error in produce " + "response: {}", + octx.response))); }); + } catch (...) { + dispatched_promise.set_exception(std::current_exception()); + return ss::make_exception_future( + std::current_exception()); } - - // errors in a response from an acks=0 produce request - // result in the connection being dropped to signal an - // issue to the client - return ss::make_exception_future( - std::runtime_error(fmt::format( - "Closing connection due to error in produce " - "response: {}", - octx.response))); }); }); + + return process_result_stages( + std::move(dispatched_f), std::move(produced_f)); } } // namespace kafka diff --git a/src/v/kafka/server/handlers/produce.h b/src/v/kafka/server/handlers/produce.h index 179deb8e170e..a8a5aa73baea 100644 --- a/src/v/kafka/server/handlers/produce.h +++ b/src/v/kafka/server/handlers/produce.h @@ -14,6 +14,11 @@ namespace kafka { -using produce_handler = handler; +struct produce_handler { + using api = produce_api; + static constexpr api_version min_supported = api_version(0); + static constexpr api_version max_supported = api_version(7); + static process_result_stages handle(request_context, ss::smp_service_group); +}; -} +} // namespace kafka diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 8aa38bf61cc5..0cd8c7787b50 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -12,6 +12,7 @@ #include "kafka/protocol/errors.h" #include "model/fundamental.h" +#include "raft/types.h" #include "storage/types.h" #include @@ -108,17 +109,21 @@ ss::future> replicated_partition::replicate( return ret_t(_translator->to_kafka_offset(r.value().last_offset)); }); } -ss::future> replicated_partition::replicate( + +raft::replicate_stages replicated_partition::replicate( model::batch_identity batch_id, model::record_batch_reader&& rdr, raft::replicate_options opts) { - using ret_t = result; - return _partition->replicate(batch_id, std::move(rdr), opts) - .then([this](result r) { + using ret_t = result; + auto res = _partition->replicate_in_stages(batch_id, std::move(rdr), opts); + res.replicate_finished = res.replicate_finished.then( + [this](result r) { if (!r) { return ret_t(r.error()); } - return ret_t(_translator->to_kafka_offset(r.value().last_offset)); + return ret_t(raft::replicate_result{ + _translator->to_kafka_offset(r.value().last_offset)}); }); + return res; } } // namespace kafka diff --git a/src/v/kafka/server/replicated_partition.h b/src/v/kafka/server/replicated_partition.h index 5af41407da29..1499eed46bc4 100644 --- a/src/v/kafka/server/replicated_partition.h +++ b/src/v/kafka/server/replicated_partition.h @@ -49,7 +49,7 @@ class replicated_partition final : public kafka::partition_proxy::impl { ss::future> replicate(model::record_batch_reader, raft::replicate_options); - ss::future> replicate( + raft::replicate_stages replicate( model::batch_identity, model::record_batch_reader&&, raft::replicate_options); diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 176024940764..0a38f4c8bf5e 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -184,7 +184,6 @@ class request_context { }; // Executes the API call identified by the specified request_context. -ss::future -process_request(request_context&&, ss::smp_service_group); +process_result_stages process_request(request_context&&, ss::smp_service_group); } // namespace kafka diff --git a/src/v/kafka/server/requests.cc b/src/v/kafka/server/requests.cc index 22e56f3281e6..49af16449e0b 100644 --- a/src/v/kafka/server/requests.cc +++ b/src/v/kafka/server/requests.cc @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0 #include "kafka/server/handlers/handlers.h" +#include "kafka/server/handlers/produce.h" #include "kafka/server/request_context.h" #include "kafka/types.h" #include "utils/to_string.h" @@ -22,20 +23,20 @@ namespace kafka { * Dispatch request with version bounds checking. */ template -CONCEPT(requires(KafkaApiHandler)) +CONCEPT(requires(KafkaApiHandler || KafkaApiTwoPhaseHandler)) struct process_dispatch { - static ss::future + static process_result_stages process(request_context&& ctx, ss::smp_service_group g) { if ( ctx.header().version < Request::min_supported || ctx.header().version > Request::max_supported) { - return ss::make_exception_future( - std::runtime_error(fmt::format( - "Unsupported version {} for {} API", - ctx.header().version, - Request::api::name))); + throw std::runtime_error(fmt::format( + "Unsupported version {} for {} API", + ctx.header().version, + Request::api::name)); } - return Request::handle(std::move(ctx), g); + return process_result_stages::single_stage( + Request::handle(std::move(ctx), g)); } }; @@ -48,16 +49,35 @@ struct process_dispatch { */ template<> struct process_dispatch { - static ss::future + static process_result_stages process(request_context&& ctx, ss::smp_service_group g) { - return api_versions_handler::handle(std::move(ctx), g); + return process_result_stages::single_stage( + api_versions_handler::handle(std::move(ctx), g)); + } +}; +/** + * Requests processed in two stages + */ +template<> +struct process_dispatch { + static process_result_stages + process(request_context&& ctx, ss::smp_service_group g) { + return produce_handler::handle(std::move(ctx), g); + } +}; + +template<> +struct process_dispatch { + static process_result_stages + process(request_context&& ctx, ss::smp_service_group g) { + return offset_commit_handler::handle(std::move(ctx), g); } }; template -CONCEPT(requires(KafkaApiHandler)) -ss::future do_process( - request_context&& ctx, ss::smp_service_group g) { +CONCEPT(requires(KafkaApiHandler || KafkaApiTwoPhaseHandler)) +process_result_stages + do_process(request_context&& ctx, ss::smp_service_group g) { vlog( klog.trace, "Processing name:{}, key:{}, version:{} for {}", @@ -85,7 +105,7 @@ handle_auth_handshake(request_context&& ctx, ss::smp_service_group g) { conn->sasl().set_handshake_v0(); } return do_process(std::move(ctx), g) - .then([conn = std::move(conn)](response_ptr r) { + .response.then([conn = std::move(conn)](response_ptr r) { if (conn->sasl().has_mechanism()) { conn->sasl().set_state( security::sasl_server::sasl_state::authenticate); @@ -146,7 +166,7 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) { } auto conn = ctx.connection(); return do_process(std::move(ctx), g) - .then([conn = std::move(conn)](response_ptr r) { + .response.then([conn = std::move(conn)](response_ptr r) { /* * there may be multiple authentication round-trips so it is fine * to return without entering an end state like complete/failed. @@ -179,7 +199,7 @@ handle_auth(request_context&& ctx, ss::smp_service_group g) { } } -ss::future +process_result_stages process_request(request_context&& ctx, ss::smp_service_group g) { /* * requests are handled as normal when auth is disabled. otherwise no @@ -187,14 +207,15 @@ process_request(request_context&& ctx, ss::smp_service_group g) { */ if (unlikely(!ctx.sasl().complete())) { auto conn = ctx.connection(); - return handle_auth(std::move(ctx), g) - .then_wrapped([conn](ss::future f) { - if (f.failed()) { - conn->sasl().set_state( - security::sasl_server::sasl_state::failed); - } - return f; - }); + return process_result_stages::single_stage( + handle_auth(std::move(ctx), g) + .then_wrapped([conn](ss::future f) { + if (f.failed()) { + conn->sasl().set_state( + security::sasl_server::sasl_state::failed); + } + return f; + })); } switch (ctx.header().key) { @@ -235,14 +256,15 @@ process_request(request_context&& ctx, ss::smp_service_group g) { case describe_groups_handler::api::key: return do_process(std::move(ctx), g); case sasl_handshake_handler::api::key: - return ctx.respond( - sasl_handshake_response(error_code::illegal_sasl_state, {})); + return process_result_stages::single_stage(ctx.respond( + sasl_handshake_response(error_code::illegal_sasl_state, {}))); case sasl_authenticate_handler::api::key: { sasl_authenticate_response_data data{ .error_code = error_code::illegal_sasl_state, .error_message = "Authentication process already completed", }; - return ctx.respond(sasl_authenticate_response(std::move(data))); + return process_result_stages::single_stage( + ctx.respond(sasl_authenticate_response(std::move(data)))); } case init_producer_id_handler::api::key: return do_process(std::move(ctx), g); @@ -267,8 +289,8 @@ process_request(request_context&& ctx, ss::smp_service_group g) { case end_txn_handler::api::key: return do_process(std::move(ctx), g); }; - return ss::make_exception_future( - std::runtime_error(fmt::format("Unsupported API {}", ctx.header().key))); + throw std::runtime_error( + fmt::format("Unsupported API {}", ctx.header().key)); } std::ostream& operator<<(std::ostream& os, const request_header& header) { diff --git a/src/v/kafka/server/response.h b/src/v/kafka/server/response.h index 0691a50f2f0c..9b43a4721ef3 100644 --- a/src/v/kafka/server/response.h +++ b/src/v/kafka/server/response.h @@ -16,6 +16,7 @@ #include "kafka/protocol/types.h" #include "seastarx.h" +#include #include #include @@ -54,4 +55,43 @@ class response { using response_ptr = ss::foreign_ptr>; +struct process_result_stages { + process_result_stages( + ss::future<> dispatched_f, ss::future response_f) + : dispatched(std::move(dispatched_f)) + , response(std::move(response_f)) {} + + explicit process_result_stages(response_ptr response) + : dispatched(ss::now()) + , response(ss::make_ready_future(std::move(response))) {} + + /** + * Single stage method is a helper to execute whole request in foreground. + * The dispatch phase if finished after response phase this way when + * response is processed in background it is already resolved future. + */ + static process_result_stages single_stage(ss::future f) { + ss::promise response; + auto response_f = response.get_future(); + auto dispatch = f.then_wrapped( + [response = std::move(response)](ss::future f) mutable { + try { + auto r = f.get(); + response.set_value(std::move(r)); + } catch (...) { + response.set_exception(std::current_exception()); + } + }); + + return process_result_stages( + std::move(dispatch), std::move(response_f)); + } + + // after this future resolved request is dispatched for processing and + // processing order is set + ss::future<> dispatched; + // the response future is intended to be executed in background + ss::future response; +}; + } // namespace kafka