From 4ee62a3c302f6d2de83b5c98f9046167c5262467 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Thu, 26 Nov 2020 08:58:54 +0100
Subject: [PATCH 1/4] k/fetch_request: remove obsolete code comment

Signed-off-by: Michal Maslanka
---
 src/v/kafka/requests/fetch_request.cc | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/v/kafka/requests/fetch_request.cc b/src/v/kafka/requests/fetch_request.cc
index 5c6a1ea9ea16..ba2dae0743ed 100644
--- a/src/v/kafka/requests/fetch_request.cc
+++ b/src/v/kafka/requests/fetch_request.cc
@@ -523,7 +523,6 @@ static ss::future<> fetch_topic_partitions(op_context& octx) {
 
     octx.for_each_fetch_partition(
       [&resp_it, &octx, &fetches](fetch_partition p) {
-          // we use gate as we may not wait for all the fetch results
          return fetches.push_back(fetch_topic_partition(octx, p, resp_it++));
       });
 
@@ -540,7 +539,7 @@ static ss::future<> fetch_topic_partitions(op_context& octx) {
           fetch_reads_debounce_timeout, octx.request.max_wait_time));
       });
     });
-} // namespace kafka
+}
 
 ss::future<response_ptr>
 fetch_api::process(request_context&& rctx, ss::smp_service_group ssg) {
@@ -710,7 +709,6 @@ op_context::response_iterator::response_iterator(
 
 void op_context::response_iterator::set(
   fetch_response::partition_response&& response) {
-    // we are already done, for now ignore what we read
     vassert(
       response.id == _it->partition_response->id,
       "Response and current partition ids have to be the same. Current "

From cd7b7fabbebed8fbbad63b7daa9096da397d4613 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Thu, 26 Nov 2020 09:39:49 +0100
Subject: [PATCH 2/4] configuration: changed default
 group_max_session_timeout_ms to 300s

In order to have the same defaults as Kafka and to work out of the box
with most use cases, change the default `group_max_session_timeout_ms`
to align with Kafka's default. With the previous value the Faust quick
start guide failed.

Signed-off-by: Michal Maslanka
---
 src/v/config/configuration.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 487dddc66148..a8902fca8f37 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -184,7 +184,7 @@ configuration::configuration()
       "heartbeats at the cost of a longer time to detect failures. "
       "Default quota tracking window size in milliseconds",
       required::no,
-      30'000ms)
+      300s)
   , group_initial_rebalance_delay(
       *this,
       "group_initial_rebalance_delay",
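Patch 2 changes the default's unit as well as its magnitude: the old value
was 30 seconds, the new one is 5 minutes. Since the property is declared in
milliseconds, `300s` relies on std::chrono's implicit conversion between
duration types. A quick sanity check of that relationship (an editor's
sketch, not part of the patch series):

    #include <chrono>

    using namespace std::chrono_literals;

    // The property is stored in milliseconds; 300s converts losslessly.
    static_assert(300s == 300'000ms, "new default: 5 minutes");
    static_assert(300s == 10 * 30'000ms, "ten times the old 30 s default");
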
From 0b12ddf2e91283424f997e7f77160a4cab3e7c93 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Thu, 26 Nov 2020 09:45:35 +0100
Subject: [PATCH 3/4] k/fetch_request: fixed iterating over partition responses

The fetch_response iterator wrapper created by the operation context was
incorrectly incrementing only the partition_response iterator. This
caused it to access out-of-range elements when multiple topics were
present in the request.

Fixes: #180

Signed-off-by: Michal Maslanka
---
 src/v/kafka/requests/fetch_request.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/v/kafka/requests/fetch_request.cc b/src/v/kafka/requests/fetch_request.cc
index ba2dae0743ed..8bcfc3b2c92d 100644
--- a/src/v/kafka/requests/fetch_request.cc
+++ b/src/v/kafka/requests/fetch_request.cc
@@ -750,7 +750,7 @@ void op_context::response_iterator::set(
 }
 
 op_context::response_iterator& op_context::response_iterator::operator++() {
-    _it->partition_response++;
+    _it++;
     return *this;
 }
 
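The one-line fix above works because fetch_response's own iterator (the
`_it` member) already flattens the nested topics/partition-responses
structure, advancing to the next topic once the current topic's responses
are exhausted; the wrapper only needs to forward the increment to it. A
minimal sketch of such a flattening `operator++`, using hypothetical
stand-in types rather than redpanda's actual classes, and assuming every
topic holds at least one response, as in the test data of the next patch
(an editor's sketch, not part of the patch series):

    #include <cassert>
    #include <string>
    #include <vector>

    // Stand-ins for the nested fetch response layout: one entry per topic,
    // each holding that topic's partition responses.
    struct partition_response {
        int id;
    };
    struct topic_partitions {
        std::string name;
        std::vector<partition_response> responses;
    };

    // Flattening iterator over every partition_response across all topics.
    struct flat_iterator {
        using topic_it_t = std::vector<topic_partitions>::iterator;

        topic_it_t topic_it;
        topic_it_t topic_end;
        std::vector<partition_response>::iterator part_it;

        flat_iterator(topic_it_t begin, topic_it_t end)
          : topic_it(begin)
          , topic_end(end) {
            if (topic_it != topic_end) {
                part_it = topic_it->responses.begin();
            }
        }

        bool done() const { return topic_it == topic_end; }
        partition_response& operator*() { return *part_it; }

        // Incrementing only part_it here (the bug) would run past the first
        // topic's responses instead of moving on to the next topic.
        flat_iterator& operator++() {
            if (
              ++part_it == topic_it->responses.end()
              && ++topic_it != topic_end) {
                part_it = topic_it->responses.begin();
            }
            return *this;
        }
    };

    int main() {
        // Same shape as the unit test in the next patch: 3 + 1 + 2 responses.
        std::vector<topic_partitions> topics{
          {"tp-1", {{0}, {1}, {2}}},
          {"tp-2", {{0}}},
          {"tp-3", {{0}, {1}}},
        };
        int visited = 0;
        for (flat_iterator it(topics.begin(), topics.end()); !it.done();
             ++it) {
            ++visited;
        }
        assert(visited == 6); // all six responses, across topic boundaries
        return 0;
    }
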
From ee87535b1bf6e9cb4b5b4b89e35fd707bac63951 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Thu, 26 Nov 2020 09:48:06 +0100
Subject: [PATCH 4/4] k/fetch_test: added multiple topic fetch test

Added a test reproducing #180, i.e. a redpanda failure when partitions
of multiple topics were present in a fetch request.

Signed-off-by: Michal Maslanka
---
 src/v/kafka/tests/fetch_test.cc | 132 ++++++++++++++++++++++++++++----
 1 file changed, 116 insertions(+), 16 deletions(-)

diff --git a/src/v/kafka/tests/fetch_test.cc b/src/v/kafka/tests/fetch_test.cc
index 685ba6de67c8..50c8a52c5ac9 100644
--- a/src/v/kafka/tests/fetch_test.cc
+++ b/src/v/kafka/tests/fetch_test.cc
@@ -253,14 +253,12 @@ FIXTURE_TEST(fetch_one, redpanda_thread_fixture) {
     }
 }
 
-SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) {
-    kafka::fetch_response response;
-
-    auto make_partition = [](ss::sstring topic) {
+FIXTURE_TEST(fetch_response_iterator_test, redpanda_thread_fixture) {
+    static auto make_partition = [](ss::sstring topic) {
         return kafka::fetch_response::partition(model::topic(std::move(topic)));
     };
 
-    auto make_partition_response = [](int id) {
+    static auto make_partition_response = [](int id) {
         kafka::fetch_response::partition_response resp;
         resp.error = kafka::error_code::none;
         resp.id = model::partition_id(id);
@@ -267,23 +265,29 @@ SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) {
         return resp;
     };
 
-    response.partitions.push_back(make_partition("tp-1"));
-    response.partitions.push_back(make_partition("tp-2"));
-    response.partitions.push_back(make_partition("tp-3"));
-
-    response.partitions[0].responses.push_back(make_partition_response(0));
-    response.partitions[0].responses.push_back(make_partition_response(1));
-    response.partitions[0].responses.push_back(make_partition_response(2));
-
-    response.partitions[1].responses.push_back(make_partition_response(0));
-    response.partitions[2].responses.push_back(make_partition_response(0));
-    response.partitions[2].responses.push_back(make_partition_response(1));
+    auto make_test_fetch_response = []() {
+        kafka::fetch_response response;
+        response.partitions.push_back(make_partition("tp-1"));
+        response.partitions.push_back(make_partition("tp-2"));
+        response.partitions.push_back(make_partition("tp-3"));
+        response.partitions[0].responses.push_back(make_partition_response(0));
+        response.partitions[0].responses.push_back(make_partition_response(1));
+        response.partitions[0].responses.push_back(make_partition_response(2));
+        response.partitions[1].responses.push_back(make_partition_response(0));
+        response.partitions[2].responses.push_back(make_partition_response(0));
+        response.partitions[2].responses.push_back(make_partition_response(1));
+        return response;
+    };
+    kafka::op_context ctx(
+      make_request_context(), ss::default_smp_service_group());
+    auto response = make_test_fetch_response();
+    ctx.response = make_test_fetch_response();
+    auto wrapper_iterator = ctx.response_begin();
     int i = 0;
     for (auto it = response.begin(); it != response.end(); ++it) {
         if (i < 3) {
             BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-1");
             BOOST_REQUIRE_EQUAL(it->partition_response->id(), i);
-
         } else if (i == 3) {
             BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-2");
             BOOST_REQUIRE_EQUAL(it->partition_response->id(), 0);
@@ -295,7 +300,13 @@ SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) {
         } else {
             BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-3");
             BOOST_REQUIRE_EQUAL(it->partition_response->id(), i - 4);
         }
+        BOOST_REQUIRE_EQUAL(
+          it->partition->name, wrapper_iterator->partition->name);
+        BOOST_REQUIRE_EQUAL(
+          it->partition_response->id,
+          wrapper_iterator->partition_response->id);
         ++i;
+        ++wrapper_iterator;
     }
 };
@@ -465,3 +476,92 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) {
     BOOST_REQUIRE(resp.partitions[0].responses[0].record_set);
     BOOST_REQUIRE(resp.partitions[0].responses[0].record_set->size_bytes() > 0);
 }
+
+FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) {
+    // create topic partitions with some data
+    model::topic topic_1("foo");
+    model::topic topic_2("bar");
+    model::offset zero(0);
+    wait_for_controller_leadership().get0();
+
+    add_topic(model::topic_namespace(model::ns("kafka"), topic_1), 6).get();
+    add_topic(model::topic_namespace(model::ns("kafka"), topic_2), 1).get();
+
+    std::vector<model::ntp> ntps = {};
+    // topic 1
+    for (int i = 0; i < 6; ++i) {
+        ntps.push_back(make_default_ntp(topic_1, model::partition_id(i)));
+        wait_for_partition_offset(ntps.back(), model::offset(0)).get0();
+    }
+    // topic 2
+    ntps.push_back(make_default_ntp(topic_2, model::partition_id(0)));
+    wait_for_partition_offset(ntps.back(), model::offset(0)).get0();
+
+    // request
+    kafka::fetch_request req;
+    req.max_bytes = std::numeric_limits<int32_t>::max();
+    req.min_bytes = 1;
+    req.max_wait_time = std::chrono::milliseconds(3000);
+    req.session_id = kafka::invalid_fetch_session_id;
+    req.topics = {
+      {
+        .name = topic_1,
+        .partitions = {},
+      },
+      {
+        .name = topic_2,
+        .partitions = {},
+      }};
+
+    for (auto& ntp : ntps) {
+        kafka::fetch_request::partition p;
+        p.id = model::partition_id(ntp.tp.partition);
+        p.log_start_offset = zero;
+        p.fetch_offset = zero;
+        p.partition_max_bytes = std::numeric_limits<int32_t>::max();
+        auto idx = ntp.tp.topic == topic_1 ? 0 : 1;
+        req.topics[idx].partitions.push_back(p);
+    }
+
+    auto client = make_kafka_client().get0();
+    client.connect().get();
+    // add data to all partitions
+    for (auto& ntp : ntps) {
+        auto shard = app.shard_table.local().shard_for(ntp);
+        auto r = app.partition_manager
+                   .invoke_on(
+                     *shard,
+                     [ntp](cluster::partition_manager& mgr) {
+                         auto partition = mgr.get(ntp);
+                         auto batches = storage::test::make_random_batches(
+                           model::offset(0), 5);
+                         auto rdr = model::make_memory_record_batch_reader(
+                           std::move(batches));
+                         return partition->replicate(
+                           std::move(rdr),
+                           raft::replicate_options(
+                             raft::consistency_level::quorum_ack));
+                     })
+                   .get0();
+    }
+
+    auto resp = client.dispatch(req, kafka::api_version(4)).get0();
+    client.stop().then([&client] { client.shutdown(); }).get();
+
+    BOOST_REQUIRE_EQUAL(resp.partitions.size(), 2);
+    BOOST_REQUIRE_EQUAL(resp.partitions[0].name, topic_1);
+    BOOST_REQUIRE_EQUAL(resp.partitions[1].name, topic_2);
+    BOOST_REQUIRE_EQUAL(resp.partitions[0].responses.size(), 6);
+    BOOST_REQUIRE_EQUAL(resp.partitions[1].responses.size(), 1);
+    size_t total_size = 0;
+    for (int i = 0; i < 6; ++i) {
+        BOOST_REQUIRE_EQUAL(
+          resp.partitions[0].responses[i].error, kafka::error_code::none);
+        BOOST_REQUIRE_EQUAL(
+          resp.partitions[0].responses[i].id, model::partition_id(i));
+        BOOST_REQUIRE(resp.partitions[0].responses[i].record_set);
+
+        total_size += resp.partitions[0].responses[i].record_set->size_bytes();
+    }
+    BOOST_REQUIRE_GT(total_size, 0);
+}
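
The index arithmetic in patch 4's fetch_response_iterator_test encodes the
flattened order of the test data: tp-1 yields ids 0..2 at i = 0..2, tp-2
yields id 0 at i = 3, and tp-3 yields ids 0..1 at i = 4..5, hence the
`i - 4`. Spelled out as a standalone check (an editor's sketch;
`expected_at` is a hypothetical helper, not part of the patch series):

    #include <cassert>
    #include <string>
    #include <utility>

    // Maps the flat index i to the (topic, partition id) pair the iterator
    // is expected to yield for the 3 + 1 + 2 test layout.
    std::pair<std::string, int> expected_at(int i) {
        if (i < 3) {
            return {"tp-1", i}; // tp-1 holds ids 0, 1, 2
        }
        if (i == 3) {
            return {"tp-2", 0}; // tp-2 holds the single id 0
        }
        return {"tp-3", i - 4}; // tp-3 holds ids 0, 1 (i = 4, 5)
    }

    int main() {
        assert((expected_at(0) == std::pair<std::string, int>{"tp-1", 0}));
        assert((expected_at(3) == std::pair<std::string, int>{"tp-2", 0}));
        assert((expected_at(5) == std::pair<std::string, int>{"tp-3", 1}));
        return 0;
    }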