Skip to content

Commit

Permalink
id_allocator: make it return a result<...> instead of a homemade type
Browse files Browse the repository at this point in the history
That's mostly for later use in raft_fixture::retry_with_leader
  • Loading branch information
bashtanov committed May 10, 2024
1 parent 94e99f0 commit 07e714f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 29 deletions.
16 changes: 8 additions & 8 deletions src/v/cluster/id_allocator_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ reset_id_handler::process(ss::shard_id shard, reset_id_allocator_request req) {
}
return stm->reset_next_id(id, timeout)
.then([](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
r.assume_error().message());
return reset_id_allocator_reply{errc::replication_error};
}

Expand All @@ -146,7 +146,7 @@ allocate_id_handler::process(ss::shard_id shard, allocate_id_request req) {
"can't get partition by {} ntp",
model::id_allocator_ntp);
return ss::make_ready_future<allocate_id_reply>(
allocate_id_reply{0, errc::topic_not_exists});
0, errc::topic_not_exists);
}
auto stm = partition->id_allocator_stm();
if (!stm) {
Expand All @@ -155,19 +155,19 @@ allocate_id_handler::process(ss::shard_id shard, allocate_id_request req) {
"can't get id allocator stm of the {}' partition",
model::id_allocator_ntp);
return ss::make_ready_future<allocate_id_reply>(
allocate_id_reply{0, errc::topic_not_exists});
0, errc::topic_not_exists);
}
return stm->allocate_id(timeout).then(
[](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
return allocate_id_reply{r.id, errc::replication_error};
r.assume_error().message());
return allocate_id_reply{-1, errc::replication_error};
}

return allocate_id_reply{r.id, errc::success};
return allocate_id_reply{r.assume_value(), errc::success};
});
});
}
Expand Down
30 changes: 16 additions & 14 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,28 @@ id_allocator_stm::reset_next_id(
return _lock
.with(
timeout, [this, id, timeout]() { return advance_state(id, timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
});
.handle_exception_type(
[](const ss::semaphore_timed_out&) -> stm_allocation_result {
return raft::make_error_code(raft::errc::timeout);
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::advance_state(
int64_t value, model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
if (value < _curr_id) {
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return _curr_id;
}
_curr_id = value;
auto success = co_await set_state(_curr_id + _batch_size, timeout);
if (!success) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
_curr_batch = _batch_size;
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return stm_allocation_result(_curr_id);
}

ss::future<bool> id_allocator_stm::set_state(
Expand All @@ -123,31 +124,32 @@ ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::allocate_id(model::timeout_clock::duration timeout) {
return _lock
.with(timeout, [this, timeout]() { return do_allocate_id(timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
});
.handle_exception_type(
[](const ss::semaphore_timed_out&) -> stm_allocation_result {
return raft::make_error_code(raft::errc::timeout);
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::do_allocate_id(model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}

if (_curr_batch == 0) {
_curr_id = _state;
if (!co_await set_state(_curr_id + _batch_size, timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
_curr_batch = _batch_size;
}

auto id = _curr_id;
int64_t id = _curr_id;

_curr_id += 1;
_curr_batch -= 1;

co_return stm_allocation_result{id, raft::errc::success};
co_return stm_allocation_result{id};
}

ss::future<> id_allocator_stm::apply(const model::record_batch& b) {
Expand Down
5 changes: 1 addition & 4 deletions src/v/cluster/id_allocator_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ class id_allocator_stm final : public raft::persisted_stm<> {
public:
static constexpr std::string_view name = "id_allocator_stm";

struct stm_allocation_result {
int64_t id;
raft::errc raft_status{raft::errc::success};
};
using stm_allocation_result = result<int64_t>;

explicit id_allocator_stm(ss::logger&, raft::consensus*);

Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/tests/id_allocator_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ struct id_allocator_stm_fixture : simple_raft_fixture {
for (int i = 0; i < n; i++) {
auto result = _stm->allocate_id(1s).get0();

BOOST_REQUIRE_EQUAL(raft::errc::success, result.raft_status);
BOOST_REQUIRE_LT(cur_last_id, result.id);
BOOST_REQUIRE_EQUAL(result.has_value(), true);
BOOST_REQUIRE_LT(cur_last_id, result.assume_value());

cur_last_id = result.id;
cur_last_id = result.assume_value();
}
return cur_last_id;
}
Expand Down

0 comments on commit 07e714f

Please sign in to comment.