Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/random: futurize make_random_batches #18388

Merged
merged 4 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/v/archival/tests/archival_service_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,8 @@ class archiver_cluster_fixture
acks);

// Make test data
auto batches = model::test::make_random_batches(
model::offset(0), count);
auto batches
= model::test::make_random_batches(model::offset(0), count).get();

auto partition = node->partition_manager.local().get(ntp);

Expand Down
42 changes: 24 additions & 18 deletions src/v/archival/tests/service_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,19 @@ segment_layout write_random_batches(
int full_batches_count = records / records_per_batch;
// If the segment already includes records, start at the next offset.
auto full_batches = model::test::make_random_batches(
model::test::record_batch_spec{
.offset
= seg->offsets().get_committed_offset() == model::offset{}
? seg->offsets().get_base_offset()
: (seg->offsets().get_committed_offset() + model::offset_delta{1}),
.allow_compression = true,
.count = full_batches_count,
.records = records_per_batch,
.max_key_cardinality = 1,
.timestamp = timestamp});
model::test::record_batch_spec{
.offset = seg->offsets().get_committed_offset()
== model::offset{}
? seg->offsets().get_base_offset()
: (
seg->offsets().get_committed_offset()
+ model::offset_delta{1}),
.allow_compression = true,
.count = full_batches_count,
.records = records_per_batch,
.max_key_cardinality = 1,
.timestamp = timestamp})
.get();

vlog(
fixt_log.debug,
Expand All @@ -146,14 +149,17 @@ segment_layout write_random_batches(
write_batches(seg, std::move(full_batches));

if (leftover_records != 0) {
auto leftover_batches = model::test::make_random_batches(
model::test::record_batch_spec{
.offset = seg->offsets().get_committed_offset() + model::offset(1),
.allow_compression = true,
.count = 1,
.records = leftover_records,
.max_key_cardinality = 1,
.timestamp = timestamp});
auto leftover_batches
= model::test::make_random_batches(
model::test::record_batch_spec{
.offset = seg->offsets().get_committed_offset()
+ model::offset(1),
.allow_compression = true,
.count = 1,
.records = leftover_records,
.max_key_cardinality = 1,
.timestamp = timestamp})
.get();

for (const auto& batch : leftover_batches) {
vlog(fixt_log.debug, "Generated random batch {}", batch);
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/common_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ inline iobuf iobuf_deep_copy(const iobuf& i) {
};

inline iobuf generate_segment(model::offset base_offset, int count) {
auto buff = model::test::make_random_batches(base_offset, count, false);
auto buff
= model::test::make_random_batches(base_offset, count, false).get();
iobuf result;
for (auto&& batch : buff) {
auto hdr = storage::batch_header_to_disk_iobuf(batch.header());
Expand Down
15 changes: 8 additions & 7 deletions src/v/cluster/tests/idempotency_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,14 @@ FIXTURE_TEST(test_rm_stm_prevents_odd_session_start_off, rm_stm_test_fixture) {

auto count = 5;
auto rdr = random_batches_reader(model::test::record_batch_spec{
.offset = model::offset(0),
.allow_compression = true,
.count = count,
.enable_idempotence = true,
.producer_id = 0,
.producer_epoch = 0,
.base_sequence = 1});
.offset = model::offset(0),
.allow_compression = true,
.count = count,
.enable_idempotence = true,
.producer_id = 0,
.producer_epoch = 0,
.base_sequence = 1})
.get();

auto bid = model::batch_identity{
.pid = model::producer_identity{0, 0},
Expand Down
51 changes: 27 additions & 24 deletions src/v/cluster/tests/rebalancing_tests_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,30 +131,33 @@ class rebalancing_tests_fixture : public cluster_test_fixture {
auto retries = 10;
foreign_batches_t ret;
auto single_retry = [count, ntp](cluster::partition_manager& pm) {
auto batches = model::test::make_random_batches(
model::offset(0), count);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
// replicate
auto f = pm.get(ntp)->raft()->replicate(
std::move(rdr),
raft::replicate_options(raft::consistency_level::quorum_ack));

return ss::with_timeout(
model::timeout_clock::now() + 2s, std::move(f))
.then([&pm, ntp](result<raft::replicate_result> res) {
auto p = pm.get(ntp);
return p->make_reader(storage::log_reader_config(
model::offset(0),
p->committed_offset(),
ss::default_priority_class()));
})
.then([](model::record_batch_reader r) {
return model::consume_reader_to_memory(
std::move(r), model::no_timeout)
.then([](batches_t batches) {
return ss::make_foreign<batches_ptr_t>(
ss::make_lw_shared<batches_t>(std::move(batches)));
return model::test::make_random_batches(model::offset(0), count)
.then([&pm, ntp](auto batches) {
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
// replicate
auto f = pm.get(ntp)->raft()->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack));

return ss::with_timeout(
model::timeout_clock::now() + 2s, std::move(f))
.then([&pm, ntp](result<raft::replicate_result> res) {
auto p = pm.get(ntp);
return p->make_reader(storage::log_reader_config(
model::offset(0),
p->committed_offset(),
ss::default_priority_class()));
})
.then([](model::record_batch_reader r) {
return model::consume_reader_to_memory(
std::move(r), model::no_timeout)
.then([](batches_t batches) {
return ss::make_foreign<batches_ptr_t>(
ss::make_lw_shared<batches_t>(
std::move(batches)));
});
});
});
};
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1944,8 +1944,8 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
const auto gold = model::test::make_random_batches(
model::offset(0), 20);
const auto gold
= model::test::make_random_batches(model::offset(0), 20).get();

// make a copy of the source batches for later comparison because the
// copy moved into the request will get eaten.
Expand Down
2 changes: 1 addition & 1 deletion src/v/compat/raft_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ struct instance_generator<raft::append_entries_request> {
instance_generator<raft::vnode>::random(),
instance_generator<raft::protocol_metadata>::random(),
model::make_memory_record_batch_reader(
model::test::make_random_batches(model::offset(0), 3, false)),
model::test::make_random_batches(model::offset(0), 3, false).get()),
raft::flush_after_append(tests::random_bool()),
};
}
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/protocol/tests/batch_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ struct context {
};

context make_context(model::offset base_offset, size_t batch_count) {
auto input = model::test::make_random_batches(base_offset, batch_count);
auto input
= model::test::make_random_batches(base_offset, batch_count).get();
BOOST_REQUIRE(!input.empty());
const auto last_offset = input.back().last_offset();

Expand Down
98 changes: 54 additions & 44 deletions src/v/kafka/server/tests/fetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ FIXTURE_TEST(fetch_leader_epoch, redpanda_thread_fixture) {
[ntp, this](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
{
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto batches
= model::test::make_random_batches(model::offset(0), 5).get();
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
partition->raft()
Expand All @@ -415,8 +415,8 @@ FIXTURE_TEST(fetch_leader_epoch, redpanda_thread_fixture) {
partition->raft()->step_down("trigger epoch change").get0();
wait_for_leader(ntp).get0();
{
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto batches
= model::test::make_random_batches(model::offset(0), 5).get();
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
partition->raft()
Expand Down Expand Up @@ -496,14 +496,16 @@ FIXTURE_TEST(fetch_multi_partitions_debounce, redpanda_thread_fixture) {
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(raft::consistency_level::quorum_ack));
return model::test::make_random_batches(model::offset(0), 5)
.then([ntp, &mgr](auto batches) {
auto partition = mgr.get(ntp);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack));
});
})
.discard_result()
.get0();
Expand Down Expand Up @@ -563,14 +565,16 @@ FIXTURE_TEST(fetch_leader_ack, redpanda_thread_fixture) {
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(raft::consistency_level::leader_ack));
return model::test::make_random_batches(model::offset(0), 5)
.then([ntp, &mgr](auto batches) {
auto partition = mgr.get(ntp);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::leader_ack));
});
})
.discard_result()
.get0();
Expand Down Expand Up @@ -620,14 +624,16 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) {
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(raft::consistency_level::quorum_ack));
return model::test::make_random_batches(model::offset(0), 5)
.then([ntp, &mgr](auto batches) {
auto partition = mgr.get(ntp);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack));
});
})
.discard_result()
.get0();
Expand Down Expand Up @@ -699,14 +705,16 @@ FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) {
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(raft::consistency_level::quorum_ack));
return model::test::make_random_batches(model::offset(0), 5)
.then([ntp, &mgr](auto batches) {
auto partition = mgr.get(ntp);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack));
});
})
.discard_result()
.get0();
Expand Down Expand Up @@ -750,14 +758,16 @@ FIXTURE_TEST(fetch_request_max_bytes, redpanda_thread_fixture) {
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto batches = model::test::make_random_batches(
model::offset(0), 20);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(raft::consistency_level::quorum_ack));
return model::test::make_random_batches(model::offset(0), 20)
.then([ntp, &mgr](auto batches) {
auto partition = mgr.get(ntp);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->raft()->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack));
});
})
.get();

Expand Down
23 changes: 12 additions & 11 deletions src/v/kafka/server/tests/topic_recreate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,17 +262,18 @@ FIXTURE_TEST(test_recreated_topic_does_not_lose_data, recreate_test_fixture) {
.invoke_on(
*shard_id,
[ntp](cluster::partition_manager& pm) {
auto batches = model::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
auto p = pm.get(ntp);
return p->raft()
->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack))
.then([p](auto) { return p->committed_offset(); });
return model::test::make_random_batches(model::offset(0), 5)
.then([&pm, ntp](auto batches) {
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
auto p = pm.get(ntp);
return p->raft()
->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack))
.then([p](auto) { return p->committed_offset(); });
});
})
.get0();
info("Restarting redpanda");
Expand Down
Loading
Loading