
Processing requests in order in Kafka layer #1496

Merged: 4 commits merged on Jun 15, 2021

Conversation

@mmaslankaprv (Member) commented May 28, 2021

Cover letter

Changed the way we process requests in Redpanda's Kafka protocol implementation. Previously we processed requests in the background, which allowed multiple requests to be handled at a time but could cause issues, since requests were not guaranteed to be processed in the same order as they were received. Kafka requests are now handled in the foreground, so we normally process one request at a time per connection, in exactly the same order as the requests were received.

Produce and offset commit requests are treated differently, as they leverage two-phase processing. The request dispatch is processed in the foreground; after this first phase, the request processing order is guaranteed. The second phase is then processed in the background. This approach allows us to process multiple produce and offset commit requests at a time without compromising ordering.
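Below is a minimal sketch of that pattern, with illustrative names (request_stages, process, pending) rather than the actual Redpanda types; it is an assumption-laden outline, not the implementation. Each request yields two futures: the connection waits only on the dispatch future before handling the next request, while the completion future is tracked in a gate and finishes in the background.

#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>

namespace ss = seastar;

// Two futures per request: `dispatched` resolves once the request's position
// in the processing order is fixed, `completed` once the request is fully done.
struct request_stages {
    ss::future<> dispatched;
    ss::future<> completed;
};

// Handle a single request on a connection: block further requests only until
// the dispatch phase resolves, then let the completion phase run concurrently.
ss::future<> process(request_stages stages, ss::gate& pending) {
    auto dispatched = std::move(stages.dispatched);
    auto completed = std::move(stages.completed);
    return dispatched.then(
      [&pending, completed = std::move(completed)]() mutable {
          // Track the background completion in a gate so the connection can
          // wait for all in-flight requests when it shuts down (error handling
          // of the background future is omitted for brevity).
          (void)ss::with_gate(pending, [completed = std::move(completed)]() mutable {
              return std::move(completed);
          });
      });
}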

Fixes: N/A

Release notes

  • Full compatibility with Kafka with regard to request ordering.

@mmaslankaprv force-pushed the queue-depth-one branch 3 times, most recently from 69fc92f to c120c50, on June 1, 2021 07:01
@mmaslankaprv changed the title from "Queue depth one" to "Processing requests in order in Kafka layer" on Jun 9, 2021
@mmaslankaprv marked this pull request as ready for review on June 9, 2021 08:23
@mmaslankaprv requested a review from a team as a code owner on June 9, 2021 08:23
@mmaslankaprv requested reviews from Lazin, dotnwat, rystsov and BenPope and removed the request for a team on June 9, 2021 08:23
@dotnwat (Member) left a comment:

This is awesome and I think it's really close to going in. The only really important things in my feedback are: (1) the setting of the exception that crosses cores (see the produce.cc feedback); I'm not sure it's an issue but it's worth looking at closely. (2) It seems like in all of the cases where the cross-core promise is set, that cross-core traffic could be done in the background (with appropriate tracking via gates), but (2) is probably a future optimization. (3) Can we add a comment to group_commit / produce about how the cross-core promise works (potentially removing the foreign pointer) and stating what the rules are for interacting with that pointer/promise?

Resolved review threads: src/v/kafka/server/requests.cc (2), src/v/kafka/server/connection_context.cc (2), src/v/kafka/server/handlers/produce.cc (3)
Inline comment on the following snippet from src/v/kafka/server/handlers/produce.cc:

auto dispatch = ss::make_foreign<std::unique_ptr<ss::promise<>>>(
  std::make_unique<ss::promise<>>());
Member:

I'm not sure a foreign pointer is necessary here, since you explicitly handle the cross-core signaling yourself; maybe consider using a normal unique_ptr? In either case, I do think there should be a short comment here about how the cross-core promise signaling works.

Member (Author):

I previously used std::unique_ptr; the problem is that the lambda is handled on a foreign core, so deletion of the lambda captures happens on the remote core, and that caused segmentation faults.

Member:

It sounds like the segmentation fault is a race condition? Normal memory can be freed on a remote core without a foreign pointer, which is only an optimization.

Member (Author):

It is indeed a race condition; using std::unique_ptr across shards isn't safe here. The logic inside the std::unique_ptr is triggered twice, and that may lead to use-after-free situations.

Member:

Can you explain? AFAICT the foreign pointer should be an optimization, unnecessary for correctness. Thus, it sounds like there is a bug.

Member:

Summary from an out-of-band conversation:

Hypothesis: the unique pointer is actually destroyed on the destination core (not the source core where the promise is created/set). In this case the promise destructor detects the issue and has a problem. The foreign pointer masks that, doing double duty as both an optimization and a correctness mechanism. I think that in this case it is less of an optimization, since we are adding additional round-trips.

In submit_to(source_core, ...), when we set the promise value, it seems like we could at that moment also take care of resetting the unique_ptr and destroying the promise on the correct core (basically the same thing the foreign pointer would be doing).
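For reference, here is a hedged sketch of the cross-core promise pattern being discussed (simplified, with illustrative names; not the actual produce.cc code). The promise is created and owned on the source shard, the dispatch work runs on the target shard, and the promise is resolved, and its foreign_ptr wrapper destroyed, back on the source shard via submit_to, so the foreign_ptr only has to guard the paths where the wrapper dies on the remote shard.

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <memory>

namespace ss = seastar;

// Returns a future that resolves once the work has been dispatched on the
// target shard; the promise behind it is owned by, and resolved on, the
// source shard.
ss::future<> dispatch_on(ss::shard_id target) {
    auto source_shard = ss::this_shard_id();
    auto dispatch = ss::make_foreign(std::make_unique<ss::promise<>>());
    auto dispatched = dispatch->get_future();

    (void)ss::smp::submit_to(
      target, [source_shard, dispatch = std::move(dispatch)]() mutable {
          // ... per-shard dispatch work would happen here ...
          // Hop back to the owner shard to set the value; the foreign_ptr is
          // also destroyed there, so no extra cross-core destruction is needed.
          return ss::smp::submit_to(
            source_shard, [dispatch = std::move(dispatch)]() mutable {
                dispatch->set_value();
            });
      });

    return dispatched;
}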

Member:

I tried this and I didn't have any issues, but I'm not really sure how to reproduce the issue you were seeing when the foreign pointer wasn't being used.

Also, after res.dispatched completes, can we also background the promise sets (not just in the error cases)?

diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc
index 8e0495caa..34aa9879a 100644
--- a/src/v/kafka/server/handlers/produce.cc
+++ b/src/v/kafka/server/handlers/produce.cc
@@ -237,7 +237,7 @@ static partition_produce_stages produce_topic_partition(
     auto reader = reader_from_lcore_batch(std::move(batch));
     auto start = std::chrono::steady_clock::now();
 
-    auto dispatch = ss::make_foreign<std::unique_ptr<ss::promise<>>>(
+    auto dispatch = std::unique_ptr<ss::promise<>>(
       std::make_unique<ss::promise<>>());
     auto dispatch_f = dispatch->get_future();
     auto f
@@ -259,6 +259,7 @@ static partition_produce_stages produce_topic_partition(
                     (void)ss::smp::submit_to(
                       source_shard, [dispatch = std::move(dispatch)]() mutable {
                           dispatch->set_value();
+                          dispatch.reset();
                       });
                     return ss::make_ready_future<produce_response::partition>(
                       produce_response::partition{
@@ -270,6 +271,7 @@ static partition_produce_stages produce_topic_partition(
                     (void)ss::smp::submit_to(
                       source_shard, [dispatch = std::move(dispatch)]() mutable {
                           dispatch->set_value();
+                          dispatch.reset();
                       });
                     return ss::make_ready_future<produce_response::partition>(
                       produce_response::partition{
@@ -288,18 +290,22 @@ static partition_produce_stages produce_topic_partition(
                   .then_wrapped([source_shard, dispatch = std::move(dispatch)](
                                   ss::future<> f) mutable {
                       if (f.failed()) {
-                          return ss::smp::submit_to(
+                          (void)ss::smp::submit_to(
                             source_shard,
                             [dispatch = std::move(dispatch),
                              e = f.get_exception()]() mutable {
                                 dispatch->set_exception(e);
+                                dispatch.reset();
                             });
+                          return ss::now();
                       }
-                      return ss::smp::submit_to(
+                      (void)ss::smp::submit_to(
                         source_shard,
                         [dispatch = std::move(dispatch)]() mutable {
                             dispatch->set_value();
+                            dispatch.reset();
                         });
+                      return ss::now();
                   })
                   .then([f = std::move(stages.produced)]() mutable {
                       return std::move(f);

Member (Author):

I was seeing an error after this code was deployed to a cluster and had executed for a while. I am going to check this.

Member:

I guess it is reasonable to use the foreign pointer as an RAII tool for cases like this, where the promise being moved around will panic if destroyed on the other core. But it was good to get to the bottom of the reasoning behind things being fragile.

Looking again, I think the right solution is to use the foreign pointer (for the RAII protection on exception paths), but to do the explicit reset on the source core to avoid the extra cross-core round-trip. That extra round-trip should be avoided in the foreign pointer destructor if it finds that the pointer has already been reset.

Resolved review thread: src/v/kafka/server/handlers/produce.cc
Commits

Introduced a type aggregating the futures representing the two stages of Kafka request processing. This way a request handler can decide which part of the processing should be executed in the foreground (blocking other requests from being handled) and which can be executed in the background, asynchronously to the processing of other requests.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
Split the handling of a Kafka request into two stages. The dispatch stage is executed in the foreground while the second stage is executed in the background. This way we can leverage the fact that the request processing order is established before processing has completely finished, and handle multiple requests at a time without compromising correct ordering.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
Implemented two-stage handling of the produce request. The two phases of produce request processing are reflected in the two phases of `cluster::partition::replicate`; this way Redpanda can handle multiple requests per connection while still not changing the request processing order.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
The offsets commit handler uses raft to replicate offset commit requests. It leverages raft's two-stage replicate processing to handle multiple in-flight offset commit requests and prevent contention.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
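Both the produce and offset commit commits rely on the same shape: the underlying replication layer exposes its own two phases, and the handler forwards the first phase as its dispatch stage while the second phase feeds the response. A rough sketch under assumed names (replicate_stages, request_enqueued, and replicate_finished are illustrative here, not necessarily the real raft/partition API):

#include <seastar/core/future.hh>

namespace ss = seastar;

// Assumed shape of the replication layer's result (illustrative names).
struct replicate_stages {
    ss::future<> request_enqueued;       // resolves once the entry's order is fixed
    ss::future<bool> replicate_finished; // resolves once replication completes
};

// The handler surfaces the same two stages to the connection layer: the first
// future is awaited in the foreground, the second becomes the response.
struct handler_stages {
    ss::future<> dispatched;
    ss::future<bool> response;
};

handler_stages make_handler_stages(replicate_stages stages) {
    return handler_stages{
      std::move(stages.request_enqueued),
      std::move(stages.replicate_finished).then([](bool success) {
          // In the real handlers this is where replication outcomes are
          // translated into protocol-level error codes.
          return success;
      })};
}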