Skip to content

Commit

Permalink
gms: pass endpoint_state_ptr to endpoint_state change subscribers
Browse files Browse the repository at this point in the history
Now that the endpoint_state isn't change in place
we do not need to copy it to each subscriber.
We can rather just pass the lw_shared_ptr holding
a snapshot of it.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Aug 31, 2023
1 parent 1d04242 commit c16ec87
Show file tree
Hide file tree
Showing 16 changed files with 79 additions and 81 deletions.
4 changes: 2 additions & 2 deletions cdc/generation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,10 @@ future<> generation_service::leave_ring() {
co_await _gossiper.unregister_(shared_from_this());
}

future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state, gms::permit_id pid) {
future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state_ptr ep_state, gms::permit_id pid) {
assert_shard_zero(__PRETTY_FUNCTION__);

auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID);
auto val = ep_state->get_application_state_ptr(gms::application_state::CDC_GENERATION_ID);
if (!val) {
return make_ready_future();
}
Expand Down
10 changes: 5 additions & 5 deletions cdc/generation_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public:
return _cdc_metadata;
}

virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); }
virtual future<> on_alive(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_dead(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
virtual future<> before_change(gms::inet_address, gms::endpoint_state_ptr, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); }
virtual future<> on_alive(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_dead(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_remove(gms::inet_address, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_restart(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_restart(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }

virtual future<> on_join(gms::inet_address, gms::endpoint_state, gms::permit_id) override;
virtual future<> on_join(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override;
virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&, gms::permit_id) override;

future<> check_and_repair_cdc_streams();
Expand Down
10 changes: 5 additions & 5 deletions gms/feature_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class persistent_feature_enabler : public i_endpoint_state_change_subscriber {
, _sys_ks(s)
{
}
future<> on_join(inet_address ep, endpoint_state state, gms::permit_id) override {
future<> on_join(inet_address ep, endpoint_state_ptr state, gms::permit_id) override {
return enable_features();
}
future<> on_change(inet_address ep, application_state state, const versioned_value&, gms::permit_id) override {
Expand All @@ -232,11 +232,11 @@ class persistent_feature_enabler : public i_endpoint_state_change_subscriber {
}
return make_ready_future();
}
future<> before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { return make_ready_future(); }
future<> on_alive(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); }
future<> on_dead(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); }
future<> before_change(inet_address, endpoint_state_ptr, application_state, const versioned_value&) override { return make_ready_future(); }
future<> on_alive(inet_address, endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }
future<> on_dead(inet_address, endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }
future<> on_remove(inet_address, gms::permit_id) override { return make_ready_future(); }
future<> on_restart(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); }
future<> on_restart(inet_address, endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }

future<> enable_features();
};
Expand Down
17 changes: 7 additions & 10 deletions gms/gossiper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,7 @@ future<> gossiper::real_mark_alive(inet_address addr) {
}

co_await _subscribers.for_each([addr, es, pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_alive(addr, *es, pid);
co_await subscriber->on_alive(addr, es, pid);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
Expand All @@ -1686,10 +1686,7 @@ future<> gossiper::mark_dead(inet_address addr, endpoint_state_ptr state, permit
future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps, permit_id pid) {
verify_permit(ep, pid);

std::optional<endpoint_state> eps_old;
if (auto p = get_endpoint_state_ptr(ep)) {
eps_old = *p;
}
endpoint_state_ptr eps_old = get_endpoint_state_ptr(ep);

if (!is_dead_state(eps) && !is_in_shadow_round()) {
if (_endpoint_state_map.contains(ep)) {
Expand All @@ -1714,7 +1711,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
if (eps_old) {
// the node restarted: it is up to the subscriber to take whatever action is necessary
co_await _subscribers.for_each([ep, eps_old, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_restart(ep, *eps_old, pid);
return subscriber->on_restart(ep, eps_old, pid);
});
}

Expand All @@ -1732,7 +1729,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
auto eps_new = get_endpoint_state_ptr(ep);
if (eps_new) {
co_await _subscribers.for_each([ep, eps_new, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_join(ep, *eps_new, pid);
return subscriber->on_join(ep, eps_new, pid);
});
}
// check this at the end so nodes will learn about the endpoint
Expand Down Expand Up @@ -1845,7 +1842,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_sta
maybe_rethrow_exception(std::move(ep));
}

future<> gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) {
future<> gossiper::do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) {
co_await _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->before_change(addr, ep_state, ap_state, new_value);
});
Expand All @@ -1862,7 +1859,7 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati

future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) {
co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_dead(addr, *state, pid);
return subscriber->on_dead(addr, state, pid);
});
}

Expand Down Expand Up @@ -2190,7 +2187,7 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
auto& value = p.second;
// Fire "before change" notifications:
// Not explicit, but apparently we allow this to defer (inside out implicit seastar::async)
gossiper.do_before_change_notifications(ep_addr, *ep_state_before, state, value).get();
gossiper.do_before_change_notifications(ep_addr, ep_state_before, state, value).get();
}

auto es = gossiper.get_endpoint_state_ptr(ep_addr);
Expand Down
2 changes: 1 addition & 1 deletion gms/gossiper.hh
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ private:

// notify that a local application state is going to change (doesn't get triggered for remote changes)
// Must be called under lock_endpoint.
future<> do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value);
future<> do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value);

// notify that an application state has changed
// Must be called under lock_endpoint.
Expand Down
10 changes: 5 additions & 5 deletions gms/i_endpoint_state_change_subscriber.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public:
* @param endpoint endpoint for which the state change occurred.
* @param epState state that actually changed for the above endpoint.
*/
virtual future<> on_join(inet_address endpoint, endpoint_state ep_state, permit_id) = 0;
virtual future<> on_join(inet_address endpoint, endpoint_state_ptr ep_state, permit_id) = 0;

virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0;
virtual future<> before_change(inet_address endpoint, endpoint_state_ptr current_state, application_state new_statekey, const versioned_value& newvalue) = 0;

virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value, permit_id) = 0;

virtual future<> on_alive(inet_address endpoint, endpoint_state state, permit_id) = 0;
virtual future<> on_alive(inet_address endpoint, endpoint_state_ptr state, permit_id) = 0;

virtual future<> on_dead(inet_address endpoint, endpoint_state state, permit_id) = 0;
virtual future<> on_dead(inet_address endpoint, endpoint_state_ptr state, permit_id) = 0;

virtual future<> on_remove(inet_address endpoint, permit_id) = 0;

Expand All @@ -60,7 +60,7 @@ public:
* previously marked down. It will have only if {@code state.isAlive() == false}
* as {@code state} is from before the restarted node is marked up.
*/
virtual future<> on_restart(inet_address endpoint, endpoint_state state, permit_id) = 0;
virtual future<> on_restart(inet_address endpoint, endpoint_state_ptr state, permit_id) = 0;
};

} // namespace gms
10 changes: 5 additions & 5 deletions repair/row_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2985,13 +2985,13 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc
}
virtual future<> on_join(
gms::inet_address endpoint,
gms::endpoint_state ep_state,
gms::endpoint_state_ptr ep_state,
gms::permit_id) override {
return make_ready_future();
}
virtual future<> before_change(
gms::inet_address endpoint,
gms::endpoint_state current_state,
gms::endpoint_state_ptr current_state,
gms::application_state new_state_key,
const gms::versioned_value& new_value) override {
return make_ready_future();
Expand All @@ -3005,13 +3005,13 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc
}
virtual future<> on_alive(
gms::inet_address endpoint,
gms::endpoint_state state,
gms::endpoint_state_ptr state,
gms::permit_id) override {
return make_ready_future();
}
virtual future<> on_dead(
gms::inet_address endpoint,
gms::endpoint_state state,
gms::endpoint_state_ptr state,
gms::permit_id) override {
return remove_row_level_repair(endpoint);
}
Expand All @@ -3022,7 +3022,7 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc
}
virtual future<> on_restart(
gms::inet_address endpoint,
gms::endpoint_state ep_state,
gms::endpoint_state_ptr ep_state,
gms::permit_id) override {
return remove_row_level_repair(endpoint);
}
Expand Down
12 changes: 6 additions & 6 deletions service/load_broadcaster.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ public:
return make_ready_future();
}

virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id pid) override {
auto* local_value = ep_state.get_application_state_ptr(gms::application_state::LOAD);
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id pid) override {
auto* local_value = ep_state->get_application_state_ptr(gms::application_state::LOAD);
if (local_value) {
return on_change(endpoint, gms::application_state::LOAD, *local_value, pid);
}
return make_ready_future();
}

virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override { return make_ready_future(); }
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state_ptr current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override { return make_ready_future(); }

future<> on_alive(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
future<> on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }

future<> on_dead(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
future<> on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }

future<> on_restart(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
future<> on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); }

virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override {
_load_info.erase(endpoint);
Expand Down
8 changes: 4 additions & 4 deletions service/migration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1199,8 +1199,8 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id
return db::schema_tables::get_column_mapping(sys_ks, table_id, v);
}

future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) {
schedule_schema_pull(endpoint, ep_state);
future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) {
schedule_schema_pull(endpoint, *ep_state);
return make_ready_future();
}

Expand All @@ -1218,8 +1218,8 @@ future<> migration_manager::on_change(gms::inet_address endpoint, gms::applicati
return make_ready_future();
}

future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) {
schedule_schema_pull(endpoint, state);
future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) {
schedule_schema_pull(endpoint, *state);
return make_ready_future();
}

Expand Down
10 changes: 5 additions & 5 deletions service/migration_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,13 @@ public:
future<schema_ptr> get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms, abort_source* as = nullptr);

private:
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override;
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override;
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) override;
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override;
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override;
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override { return make_ready_future(); }
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override { return make_ready_future(); }
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); }
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override { return make_ready_future(); }
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state_ptr current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); }

public:
// For tests only.
Expand Down
20 changes: 10 additions & 10 deletions service/raft/raft_group_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang
raft_address_map& _address_map;

future<>
on_endpoint_change(gms::inet_address endpoint, gms::endpoint_state ep_state) {
auto app_state_ptr = ep_state.get_application_state_ptr(gms::application_state::HOST_ID);
on_endpoint_change(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state) {
auto app_state_ptr = ep_state->get_application_state_ptr(gms::application_state::HOST_ID);
if (app_state_ptr) {
raft::server_id id(utils::UUID(app_state_ptr->value()));
rslog.debug("gossiper_state_change_subscriber_proxy::on_endpoint_change() {} {}", endpoint, id);
_address_map.add_or_update_entry(id, endpoint, ep_state.get_heart_beat_state().get_generation());
_address_map.add_or_update_entry(id, endpoint, ep_state->get_heart_beat_state().get_generation());
}
return make_ready_future<>();
}
Expand All @@ -97,13 +97,13 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang
{}

virtual future<>
on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override {
on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override {
return on_endpoint_change(endpoint, ep_state);
}

virtual future<>
before_change(gms::inet_address endpoint,
gms::endpoint_state current_state, gms::application_state new_statekey,
gms::endpoint_state_ptr current_state, gms::application_state new_statekey,
const gms::versioned_value& newvalue) override {
// Raft server ID never changes - do nothing
return make_ready_future<>();
Expand All @@ -117,9 +117,9 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang
}

virtual future<>
on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override {
co_await utils::get_local_injector().inject_with_handler("raft_group_registry::on_alive", [endpoint, &ep_state] (auto& handler) -> future<> {
auto app_state_ptr = ep_state.get_application_state_ptr(gms::application_state::HOST_ID);
on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override {
co_await utils::get_local_injector().inject_with_handler("raft_group_registry::on_alive", [endpoint, ep_state] (auto& handler) -> future<> {
auto app_state_ptr = ep_state->get_application_state_ptr(gms::application_state::HOST_ID);
if (!app_state_ptr) {
co_return;
}
Expand All @@ -139,7 +139,7 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang
}

virtual future<>
on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override {
on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override {
return make_ready_future<>();
}

Expand All @@ -152,7 +152,7 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang
}

virtual future<>
on_restart(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override {
on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override {
return on_endpoint_change(endpoint, ep_state);
}
};
Expand Down

0 comments on commit c16ec87

Please sign in to comment.