From c16ec870dade60f5a588056f77eb3492c27d6d18 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 21 Jul 2023 11:49:18 +0300 Subject: [PATCH] gms: pass endpoint_state_ptr to endpoint_state change subscribers Now that the endpoint_state isn't change in place we do not need to copy it to each subscriber. We can rather just pass the lw_shared_ptr holding a snapshot of it. Signed-off-by: Benny Halevy --- cdc/generation.cc | 4 ++-- cdc/generation_service.hh | 10 +++++----- gms/feature_service.cc | 10 +++++----- gms/gossiper.cc | 17 +++++++---------- gms/gossiper.hh | 2 +- gms/i_endpoint_state_change_subscriber.hh | 10 +++++----- repair/row_level.cc | 10 +++++----- service/load_broadcaster.hh | 12 ++++++------ service/migration_manager.cc | 8 ++++---- service/migration_manager.hh | 10 +++++----- service/raft/raft_group_registry.cc | 20 ++++++++++---------- service/storage_service.cc | 12 ++++++------ service/storage_service.hh | 10 +++++----- service/view_update_backlog_broker.hh | 10 +++++----- streaming/stream_manager.cc | 4 ++-- streaming/stream_manager.hh | 11 ++++++----- 16 files changed, 79 insertions(+), 81 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index 29dcc2af32ce..5dd1c2fcdf72 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -802,10 +802,10 @@ future<> generation_service::leave_ring() { co_await _gossiper.unregister_(shared_from_this()); } -future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state, gms::permit_id pid) { +future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state_ptr ep_state, gms::permit_id pid) { assert_shard_zero(__PRETTY_FUNCTION__); - auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID); + auto val = ep_state->get_application_state_ptr(gms::application_state::CDC_GENERATION_ID); if (!val) { return make_ready_future(); } diff --git a/cdc/generation_service.hh b/cdc/generation_service.hh index 1d7f8756cf66..be268bb8fbf6 100644 --- a/cdc/generation_service.hh +++ b/cdc/generation_service.hh @@ -104,13 +104,13 @@ public: return _cdc_metadata; } - virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); } - virtual future<> on_alive(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_dead(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } + virtual future<> before_change(gms::inet_address, gms::endpoint_state_ptr, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); } + virtual future<> on_alive(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } + virtual future<> on_dead(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } virtual future<> on_remove(gms::inet_address, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_restart(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } + virtual future<> on_restart(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_join(gms::inet_address, gms::endpoint_state, gms::permit_id) override; + virtual future<> on_join(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override; virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&, gms::permit_id) override; future<> check_and_repair_cdc_streams(); diff --git a/gms/feature_service.cc b/gms/feature_service.cc index 87588c7a91f6..89120c595e79 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -223,7 +223,7 @@ class persistent_feature_enabler : public i_endpoint_state_change_subscriber { , _sys_ks(s) { } - future<> on_join(inet_address ep, endpoint_state state, gms::permit_id) override { + future<> on_join(inet_address ep, endpoint_state_ptr state, gms::permit_id) override { return enable_features(); } future<> on_change(inet_address ep, application_state state, const versioned_value&, gms::permit_id) override { @@ -232,11 +232,11 @@ class persistent_feature_enabler : public i_endpoint_state_change_subscriber { } return make_ready_future(); } - future<> before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { return make_ready_future(); } - future<> on_alive(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); } - future<> on_dead(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); } + future<> before_change(inet_address, endpoint_state_ptr, application_state, const versioned_value&) override { return make_ready_future(); } + future<> on_alive(inet_address, endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } + future<> on_dead(inet_address, endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } future<> on_remove(inet_address, gms::permit_id) override { return make_ready_future(); } - future<> on_restart(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); } + future<> on_restart(inet_address, endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } future<> enable_features(); }; diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 04a18d7dee30..e7895c076792 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1667,7 +1667,7 @@ future<> gossiper::real_mark_alive(inet_address addr) { } co_await _subscribers.for_each([addr, es, pid = permit.id()] (shared_ptr subscriber) -> future<> { - co_await subscriber->on_alive(addr, *es, pid); + co_await subscriber->on_alive(addr, es, pid); logger.trace("Notified {}", fmt::ptr(subscriber.get())); }); } @@ -1686,10 +1686,7 @@ future<> gossiper::mark_dead(inet_address addr, endpoint_state_ptr state, permit future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps, permit_id pid) { verify_permit(ep, pid); - std::optional eps_old; - if (auto p = get_endpoint_state_ptr(ep)) { - eps_old = *p; - } + endpoint_state_ptr eps_old = get_endpoint_state_ptr(ep); if (!is_dead_state(eps) && !is_in_shadow_round()) { if (_endpoint_state_map.contains(ep)) { @@ -1714,7 +1711,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta if (eps_old) { // the node restarted: it is up to the subscriber to take whatever action is necessary co_await _subscribers.for_each([ep, eps_old, pid] (shared_ptr subscriber) { - return subscriber->on_restart(ep, *eps_old, pid); + return subscriber->on_restart(ep, eps_old, pid); }); } @@ -1732,7 +1729,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta auto eps_new = get_endpoint_state_ptr(ep); if (eps_new) { co_await _subscribers.for_each([ep, eps_new, pid] (shared_ptr subscriber) { - return subscriber->on_join(ep, *eps_new, pid); + return subscriber->on_join(ep, eps_new, pid); }); } // check this at the end so nodes will learn about the endpoint @@ -1845,7 +1842,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_sta maybe_rethrow_exception(std::move(ep)); } -future<> gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) { +future<> gossiper::do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) { co_await _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { return subscriber->before_change(addr, ep_state, ap_state, new_value); }); @@ -1862,7 +1859,7 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) { co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr subscriber) { - return subscriber->on_dead(addr, *state, pid); + return subscriber->on_dead(addr, state, pid); }); } @@ -2190,7 +2187,7 @@ future<> gossiper::add_local_application_state(std::list do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value); + future<> do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value); // notify that an application state has changed // Must be called under lock_endpoint. diff --git a/gms/i_endpoint_state_change_subscriber.hh b/gms/i_endpoint_state_change_subscriber.hh index ae710d11c8a7..2168dd864604 100644 --- a/gms/i_endpoint_state_change_subscriber.hh +++ b/gms/i_endpoint_state_change_subscriber.hh @@ -42,15 +42,15 @@ public: * @param endpoint endpoint for which the state change occurred. * @param epState state that actually changed for the above endpoint. */ - virtual future<> on_join(inet_address endpoint, endpoint_state ep_state, permit_id) = 0; + virtual future<> on_join(inet_address endpoint, endpoint_state_ptr ep_state, permit_id) = 0; - virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0; + virtual future<> before_change(inet_address endpoint, endpoint_state_ptr current_state, application_state new_statekey, const versioned_value& newvalue) = 0; virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value, permit_id) = 0; - virtual future<> on_alive(inet_address endpoint, endpoint_state state, permit_id) = 0; + virtual future<> on_alive(inet_address endpoint, endpoint_state_ptr state, permit_id) = 0; - virtual future<> on_dead(inet_address endpoint, endpoint_state state, permit_id) = 0; + virtual future<> on_dead(inet_address endpoint, endpoint_state_ptr state, permit_id) = 0; virtual future<> on_remove(inet_address endpoint, permit_id) = 0; @@ -60,7 +60,7 @@ public: * previously marked down. It will have only if {@code state.isAlive() == false} * as {@code state} is from before the restarted node is marked up. */ - virtual future<> on_restart(inet_address endpoint, endpoint_state state, permit_id) = 0; + virtual future<> on_restart(inet_address endpoint, endpoint_state_ptr state, permit_id) = 0; }; } // namespace gms diff --git a/repair/row_level.cc b/repair/row_level.cc index f8a029b30717..98727ac8fbd2 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2985,13 +2985,13 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc } virtual future<> on_join( gms::inet_address endpoint, - gms::endpoint_state ep_state, + gms::endpoint_state_ptr ep_state, gms::permit_id) override { return make_ready_future(); } virtual future<> before_change( gms::inet_address endpoint, - gms::endpoint_state current_state, + gms::endpoint_state_ptr current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override { return make_ready_future(); @@ -3005,13 +3005,13 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc } virtual future<> on_alive( gms::inet_address endpoint, - gms::endpoint_state state, + gms::endpoint_state_ptr state, gms::permit_id) override { return make_ready_future(); } virtual future<> on_dead( gms::inet_address endpoint, - gms::endpoint_state state, + gms::endpoint_state_ptr state, gms::permit_id) override { return remove_row_level_repair(endpoint); } @@ -3022,7 +3022,7 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc } virtual future<> on_restart( gms::inet_address endpoint, - gms::endpoint_state ep_state, + gms::endpoint_state_ptr ep_state, gms::permit_id) override { return remove_row_level_repair(endpoint); } diff --git a/service/load_broadcaster.hh b/service/load_broadcaster.hh index 60715b878c40..8075028dc039 100644 --- a/service/load_broadcaster.hh +++ b/service/load_broadcaster.hh @@ -42,21 +42,21 @@ public: return make_ready_future(); } - virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id pid) override { - auto* local_value = ep_state.get_application_state_ptr(gms::application_state::LOAD); + virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id pid) override { + auto* local_value = ep_state->get_application_state_ptr(gms::application_state::LOAD); if (local_value) { return on_change(endpoint, gms::application_state::LOAD, *local_value, pid); } return make_ready_future(); } - virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override { return make_ready_future(); } + virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state_ptr current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override { return make_ready_future(); } - future<> on_alive(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } + future<> on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } - future<> on_dead(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } + future<> on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } - future<> on_restart(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } + future<> on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override { _load_info.erase(endpoint); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 34b29e0cdd76..ec940e007b6c 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -1199,8 +1199,8 @@ future get_column_mapping(db::system_keyspace& sys_ks, table_id return db::schema_tables::get_column_mapping(sys_ks, table_id, v); } -future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) { - schedule_schema_pull(endpoint, ep_state); +future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) { + schedule_schema_pull(endpoint, *ep_state); return make_ready_future(); } @@ -1218,8 +1218,8 @@ future<> migration_manager::on_change(gms::inet_address endpoint, gms::applicati return make_ready_future(); } -future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) { - schedule_schema_pull(endpoint, state); +future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) { + schedule_schema_pull(endpoint, *state); return make_ready_future(); } diff --git a/service/migration_manager.hh b/service/migration_manager.hh index ed57fb7d3f51..e72571cec44a 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -191,13 +191,13 @@ public: future get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms, abort_source* as = nullptr); private: - virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override; + virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override; virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) override; - virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override; - virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override { return make_ready_future(); } + virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override; + virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override { return make_ready_future(); } virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override { return make_ready_future(); } - virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); } + virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override { return make_ready_future(); } + virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state_ptr current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); } public: // For tests only. diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 65ccb8fea64b..4b3c9bcc51e8 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -81,12 +81,12 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang raft_address_map& _address_map; future<> - on_endpoint_change(gms::inet_address endpoint, gms::endpoint_state ep_state) { - auto app_state_ptr = ep_state.get_application_state_ptr(gms::application_state::HOST_ID); + on_endpoint_change(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state) { + auto app_state_ptr = ep_state->get_application_state_ptr(gms::application_state::HOST_ID); if (app_state_ptr) { raft::server_id id(utils::UUID(app_state_ptr->value())); rslog.debug("gossiper_state_change_subscriber_proxy::on_endpoint_change() {} {}", endpoint, id); - _address_map.add_or_update_entry(id, endpoint, ep_state.get_heart_beat_state().get_generation()); + _address_map.add_or_update_entry(id, endpoint, ep_state->get_heart_beat_state().get_generation()); } return make_ready_future<>(); } @@ -97,13 +97,13 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang {} virtual future<> - on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override { + on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override { return on_endpoint_change(endpoint, ep_state); } virtual future<> before_change(gms::inet_address endpoint, - gms::endpoint_state current_state, gms::application_state new_statekey, + gms::endpoint_state_ptr current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { // Raft server ID never changes - do nothing return make_ready_future<>(); @@ -117,9 +117,9 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang } virtual future<> - on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override { - co_await utils::get_local_injector().inject_with_handler("raft_group_registry::on_alive", [endpoint, &ep_state] (auto& handler) -> future<> { - auto app_state_ptr = ep_state.get_application_state_ptr(gms::application_state::HOST_ID); + on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override { + co_await utils::get_local_injector().inject_with_handler("raft_group_registry::on_alive", [endpoint, ep_state] (auto& handler) -> future<> { + auto app_state_ptr = ep_state->get_application_state_ptr(gms::application_state::HOST_ID); if (!app_state_ptr) { co_return; } @@ -139,7 +139,7 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang } virtual future<> - on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override { + on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override { return make_ready_future<>(); } @@ -152,7 +152,7 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang } virtual future<> - on_restart(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override { + on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override { return on_endpoint_change(endpoint, ep_state); } }; diff --git a/service/storage_service.cc b/service/storage_service.cc index b63e747c6cc2..719217e1f169 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3245,14 +3245,14 @@ future<> storage_service::handle_state_removed(inet_address endpoint, std::vecto } } -future<> storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id pid) { +future<> storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id pid) { slogger.debug("endpoint={} on_join: permit_id={}", endpoint, pid); - for (const auto& e : ep_state.get_application_state_map()) { + for (const auto& e : ep_state->get_application_state_map()) { co_await on_change(endpoint, e.first, e.second, pid); } } -future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id pid) { +future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id pid) { slogger.debug("endpoint={} on_alive: permit_id={}", endpoint, pid); bool is_normal_token_owner = get_token_metadata().is_normal_token_owner(endpoint); if (is_normal_token_owner) { @@ -3265,7 +3265,7 @@ future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_sta } } -future<> storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) { +future<> storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state_ptr current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) { slogger.debug("endpoint={} before_change: new app_state={}, new versioned_value={}", endpoint, new_state_key, new_value); return make_ready_future(); } @@ -3341,12 +3341,12 @@ future<> storage_service::on_remove(gms::inet_address endpoint, gms::permit_id p co_await replicate_to_all_cores(std::move(tmptr)); } -future<> storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id pid) { +future<> storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id pid) { slogger.debug("endpoint={} on_dead: permit_id={}", endpoint, pid); return notify_down(endpoint); } -future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id pid) { +future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id pid) { slogger.debug("endpoint={} on_restart: permit_id={}", endpoint, pid); // If we have restarted before the node was even marked down, we need to reset the connection pool if (endpoint != get_broadcast_address() && _gossiper.is_alive(endpoint)) { diff --git a/service/storage_service.hh b/service/storage_service.hh index ff96745a4b3f..3781c130e032 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -410,8 +410,8 @@ public: locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; public: - virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override; - virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override; + virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override; + virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state_ptr current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override; /* * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update @@ -445,10 +445,10 @@ public: * you should never bootstrap a new node during a removenode, decommission or move. */ virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value, gms::permit_id) override; - virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override; - virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override; + virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override; + virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override; virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override; - virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override; + virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override; public: // For migration_listener diff --git a/service/view_update_backlog_broker.hh b/service/view_update_backlog_broker.hh index c383beb5bd5a..d843b16fc3f1 100644 --- a/service/view_update_backlog_broker.hh +++ b/service/view_update_backlog_broker.hh @@ -43,11 +43,11 @@ public: virtual future<> on_remove(gms::inet_address, gms::permit_id) override; - virtual future<> on_join(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } - virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); } - virtual future<> on_alive(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_dead(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_restart(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); } + virtual future<> on_join(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } + virtual future<> before_change(gms::inet_address, gms::endpoint_state_ptr, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); } + virtual future<> on_alive(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } + virtual future<> on_dead(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } + virtual future<> on_restart(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override { return make_ready_future(); } }; } diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index 80fc1c2292d1..a4ac0c48d375 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -341,7 +341,7 @@ future<> stream_manager::on_remove(inet_address endpoint, gms::permit_id) { return make_ready_future(); } -future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state, gms::permit_id) { +future<> stream_manager::on_restart(inet_address endpoint, endpoint_state_ptr ep_state, gms::permit_id) { if (has_peer(endpoint)) { sslog.info("stream_manager: Close all stream_session with peer = {} in on_restart", endpoint); //FIXME: discarded future. @@ -354,7 +354,7 @@ future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_sta return make_ready_future(); } -future<> stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state, gms::permit_id) { +future<> stream_manager::on_dead(inet_address endpoint, endpoint_state_ptr ep_state, gms::permit_id) { if (has_peer(endpoint)) { sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint); //FIXME: discarded future. diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index 455bdf229f30..8a25b48ddbd5 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -69,6 +69,7 @@ struct stream_bytes { class stream_manager : public gms::i_endpoint_state_change_subscriber, public enable_shared_from_this, public peering_sharded_service { using inet_address = gms::inet_address; using endpoint_state = gms::endpoint_state; + using endpoint_state_ptr = gms::endpoint_state_ptr; using application_state = gms::application_state; using versioned_value = gms::versioned_value; /* @@ -165,13 +166,13 @@ public: shared_ptr get_session(streaming::plan_id plan_id, gms::inet_address from, const char* verb, std::optional cf_id = {}); public: - virtual future<> on_join(inet_address endpoint, endpoint_state ep_state, gms::permit_id) override { return make_ready_future(); } - virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_state_key, const versioned_value& new_value) override { return make_ready_future(); } + virtual future<> on_join(inet_address endpoint, endpoint_state_ptr ep_state, gms::permit_id) override { return make_ready_future(); } + virtual future<> before_change(inet_address endpoint, endpoint_state_ptr current_state, application_state new_state_key, const versioned_value& new_value) override { return make_ready_future(); } virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_alive(inet_address endpoint, endpoint_state state, gms::permit_id) override { return make_ready_future(); } - virtual future<> on_dead(inet_address endpoint, endpoint_state state, gms::permit_id) override; + virtual future<> on_alive(inet_address endpoint, endpoint_state_ptr state, gms::permit_id) override { return make_ready_future(); } + virtual future<> on_dead(inet_address endpoint, endpoint_state_ptr state, gms::permit_id) override; virtual future<> on_remove(inet_address endpoint, gms::permit_id) override; - virtual future<> on_restart(inet_address endpoint, endpoint_state ep_state, gms::permit_id) override; + virtual future<> on_restart(inet_address endpoint, endpoint_state_ptr ep_state, gms::permit_id) override; private: void fail_all_sessions();