Skip to content

Commit

Permalink
Merge "Maintain dc/rack by topology" from Pavel Emelyanov
Browse files Browse the repository at this point in the history
"
There's an ongoing effort to move the endpoint -> {dc/rack} mappings
from the snitch onto the topology object, and this series finalizes it.
After it, the snitch service no longer depends on the gossiper or the
system keyspace and is ready for de-globalization. As a nice side effect,
the system keyspace no longer needs to maintain the dc/rack info cache,
and its startup code becomes simpler.

refs: scylladb#2737
refs: scylladb#2795
"

* 'br-snitch-dont-mess-with-topology-data-2' of https://github.com/xemul/scylla: (23 commits)
  system_keyspace: Don't maintain dc/rack cache
  system_keyspace: Indentation fix after previous patch
  system_keyspace: Coroutinize build_dc_rack_info()
  topology: Move all post-configuration to topology::config
  snitch: Start early
  gossiper: Do not export system keyspace
  snitch: Remove gossiper reference
  snitch: Mark get_datacenter/_rack methods const
  snitch: Drop some dead dependency knots
  snitch, code: Make get_datacenter() report local dc only
  snitch, code: Make get_rack() report local rack only
  storage_service: Populate pending endpoint in on_alive()
  code: Populate pending locations
  topology: Put local dc/rack on topology early
  topology: Add pending locations collection
  topology: Make get_location() errors more verbose
  token_metadata: Add config, spread everywhere
  token_metadata: Hide token_metadata_impl copy constructor
  gossiper: Remove messaging service getter
  snitch: Get local address to gossip via config
  ...
  • Loading branch information
denesb committed Oct 19, 2022
2 parents 7fbad8d + 8b8b37c commit 2d581e9
Show file tree
Hide file tree
Showing 25 changed files with 244 additions and 263 deletions.
17 changes: 15 additions & 2 deletions api/endpoint_snitch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "locator/token_metadata.hh"
#include "locator/snitch_base.hh"
#include "locator/production_snitch_base.hh"
#include "endpoint_snitch.hh"
#include "api/api-doc/endpoint_snitch_info.json.hh"
#include "utils/fb_utilities.hh"
Expand All @@ -22,12 +23,24 @@ void set_endpoint_snitch(http_context& ctx, routes& r) {

httpd::endpoint_snitch_info_json::get_datacenter.set(r, [&ctx](const_req req) {
auto& topology = ctx.shared_token_metadata.local().get()->get_topology();
return topology.get_datacenter(host_or_broadcast(req));
auto ep = host_or_broadcast(req);
if (!topology.has_endpoint(ep, locator::topology::pending::yes)) {
// Cannot return error here, nodetool status can race, request
// info about just-left node and not handle it nicely
return sstring(locator::production_snitch_base::default_dc);
}
return topology.get_datacenter(ep);
});

httpd::endpoint_snitch_info_json::get_rack.set(r, [&ctx](const_req req) {
auto& topology = ctx.shared_token_metadata.local().get()->get_topology();
return topology.get_rack(host_or_broadcast(req));
auto ep = host_or_broadcast(req);
if (!topology.has_endpoint(ep, locator::topology::pending::yes)) {
// Cannot return error here, nodetool status can race, request
// info about just-left node and not handle it nicely
return sstring(locator::production_snitch_base::default_rack);
}
return topology.get_rack(ep);
});

httpd::endpoint_snitch_info_json::get_snitch_name.set(r, [] (const_req req) {
Expand Down
72 changes: 21 additions & 51 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1267,8 +1267,8 @@ future<> system_keyspace::setup_version(sharded<netw::messaging_service>& ms) {
cql3::query_processor::CQL_VERSION,
::cassandra::thrift_version,
to_sstring(cql_serialization_format::latest_version),
snitch->get_datacenter(utils::fb_utilities::get_broadcast_address()),
snitch->get_rack(utils::fb_utilities::get_broadcast_address()),
snitch->get_datacenter(),
snitch->get_rack(),
sstring(cfg.partitioner()),
utils::fb_utilities::get_broadcast_rpc_address().addr(),
utils::fb_utilities::get_broadcast_address().addr(),
Expand All @@ -1283,38 +1283,31 @@ future<> system_keyspace::save_local_supported_features(const std::set<std::stri
::join(",", feats)).discard_result();
}

// Changing the real load_dc_rack_info into a future would trigger a tidal wave of futurization that would spread
// even into simple string operations like get_rack() / get_dc(). We will cache those at startup, and then change
// our view of it every time we do updates on those values.
//
// The cache must be distributed, because the values themselves may not update atomically, so a shard reading that
// is different than the one that wrote, may see a corrupted value. invoke_on_all will be used to guarantee that all
// updates are propagated correctly.
struct local_cache {
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> _cached_dc_rack_info;
locator::endpoint_dc_rack _local_dc_rack_info;
system_keyspace::bootstrap_state _state;
};

future<> system_keyspace::build_dc_rack_info() {
return execute_cql(format("SELECT peer, data_center, rack from system.{}", PEERS)).then([this] (::shared_ptr<cql3::untyped_result_set> msg) {
return do_for_each(*msg, [this] (auto& row) {
net::inet_address peer = row.template get_as<net::inet_address>("peer");
if (!row.has("data_center") || !row.has("rack")) {
return make_ready_future<>();
}
gms::inet_address gms_addr(std::move(peer));
sstring dc = row.template get_as<sstring>("data_center");
sstring rack = row.template get_as<sstring>("rack");
future<std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>> system_keyspace::load_dc_rack_info() {
auto msg = co_await execute_cql(format("SELECT peer, data_center, rack from system.{}", PEERS));

locator::endpoint_dc_rack element = { dc, rack };
return container().invoke_on_all([gms_addr = std::move(gms_addr), element = std::move(element)] (auto& sys_ks) {
sys_ks._cache->_cached_dc_rack_info.emplace(gms_addr, element);
});
}).then([msg] {
// Keep msg alive.
});
});
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> ret;
for (const auto& row : *msg) {
net::inet_address peer = row.template get_as<net::inet_address>("peer");
if (!row.has("data_center") || !row.has("rack")) {
continue;
}
gms::inet_address gms_addr(std::move(peer));
sstring dc = row.template get_as<sstring>("data_center");
sstring rack = row.template get_as<sstring>("rack");

ret.emplace(gms_addr, locator::endpoint_dc_rack{ dc, rack });
}

co_return ret;
}

future<> system_keyspace::build_bootstrap_info() {
Expand Down Expand Up @@ -1342,7 +1335,6 @@ future<> system_keyspace::setup(sharded<netw::messaging_service>& ms) {

co_await setup_version(ms);
co_await update_schema_version(_db.local().get_version());
co_await build_dc_rack_info();
co_await build_bootstrap_info();
co_await check_health();
co_await db::schema_tables::save_system_keyspace_schema(_qp.local());
Expand Down Expand Up @@ -1596,20 +1588,6 @@ future<> system_keyspace::update_cached_values(gms::inet_address ep, sstring col
return make_ready_future<>();
}

template <>
future<> system_keyspace::update_cached_values(gms::inet_address ep, sstring column_name, sstring value) {
return container().invoke_on_all([ep = std::move(ep),
column_name = std::move(column_name),
value = std::move(value)] (auto& sys_ks) {
if (column_name == "data_center") {
sys_ks._cache->_cached_dc_rack_info[ep].dc = value;
} else if (column_name == "rack") {
sys_ks._cache->_cached_dc_rack_info[ep].rack = value;
}
return make_ready_future<>();
});
}

template <typename Value>
future<> system_keyspace::update_peer_info(gms::inet_address ep, sstring column_name, Value value) {
if (ep == utils::fb_utilities::get_broadcast_address()) {
Expand Down Expand Up @@ -1668,9 +1646,6 @@ future<> system_keyspace::update_schema_version(table_schema_version version) {
* Remove stored tokens being used by another node
*/
future<> system_keyspace::remove_endpoint(gms::inet_address ep) {
co_await container().invoke_on_all([ep] (auto& sys_ks) {
sys_ks._cache->_cached_dc_rack_info.erase(ep);
});
sstring req = format("DELETE FROM system.{} WHERE peer = ?", PEERS);
co_await execute_cql(req, ep.addr()).discard_result();
co_await force_blocking_flush(PEERS);
Expand Down Expand Up @@ -1896,7 +1871,7 @@ class cluster_status_table : public memtable_filling_virtual_table {
set_cell(cr, "host_id", hostid->uuid());
}

if (tm.get_topology().has_endpoint(endpoint)) {
if (tm.is_member(endpoint)) {
sstring dc = tm.get_topology().get_location(endpoint).dc;
set_cell(cr, "dc", dc);
}
Expand Down Expand Up @@ -2804,11 +2779,6 @@ future<locator::host_id> system_keyspace::set_local_host_id(locator::host_id hos
co_return host_id;
}

std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>
system_keyspace::load_dc_rack_info() {
return _cache->_cached_dc_rack_info;
}

// Returns a copy of the local node's dc/rack pair stored in the cache
// (_cache->_local_dc_rack_info). The dc and rack fields of that entry are
// assigned in system_keyspace::start() from the local snitch.
locator::endpoint_dc_rack system_keyspace::local_dc_rack() const {
return _cache->_local_dc_rack_info;
}
Expand Down Expand Up @@ -3370,8 +3340,8 @@ future<> system_keyspace::start() {
// the system.local table. However, cql_test_env needs cached local_dc_rack strings,
// but it doesn't call system_keyspace::setup() and thus ::setup_version() either
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
_cache->_local_dc_rack_info.dc = snitch->get_datacenter(utils::fb_utilities::get_broadcast_address());
_cache->_local_dc_rack_info.rack = snitch->get_rack(utils::fb_utilities::get_broadcast_address());
_cache->_local_dc_rack_info.dc = snitch->get_datacenter();
_cache->_local_dc_rack_info.rack = snitch->get_rack();

co_return;
}
Expand Down
3 changes: 1 addition & 2 deletions db/system_keyspace.hh
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ class system_keyspace : public seastar::peering_sharded_service<system_keyspace>
future<> setup_version(sharded<netw::messaging_service>& ms);
future<> check_health();
static future<> force_blocking_flush(sstring cfname);
future<> build_dc_rack_info();
future<> build_bootstrap_info();
future<> cache_truncation_record();
template <typename Value>
Expand Down Expand Up @@ -284,7 +283,7 @@ public:
/**
* Return a map of IP addresses containing a map of dc and rack info
*/
std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> load_dc_rack_info();
future<std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>> load_dc_rack_info();
locator::endpoint_dc_rack local_dc_rack() const;

enum class bootstrap_state {
Expand Down
2 changes: 0 additions & 2 deletions gms/gossiper.hh
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ public:
}
const std::set<inet_address>& get_seeds() const noexcept;

netw::messaging_service& get_local_messaging() const noexcept { return _messaging; }
sharded<db::system_keyspace>& get_system_keyspace() const noexcept { return _sys_ks; }
public:
static clk::time_point inline now() noexcept { return clk::now(); }
public:
Expand Down
6 changes: 4 additions & 2 deletions locator/gossiping_property_file_snitch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ future<bool> gossiping_property_file_snitch::property_file_was_modified() {
}

gossiping_property_file_snitch::gossiping_property_file_snitch(const snitch_config& cfg)
: production_snitch_base(cfg), _file_reader_cpu_id(cfg.io_cpu_id) {
: production_snitch_base(cfg)
, _file_reader_cpu_id(cfg.io_cpu_id)
, _listen_address(cfg.listen_address) {
if (this_shard_id() == _file_reader_cpu_id) {
io_cpu_id() = _file_reader_cpu_id;
}
Expand Down Expand Up @@ -102,7 +104,7 @@ void gossiping_property_file_snitch::periodic_reader_callback() {
}

std::list<std::pair<gms::application_state, gms::versioned_value>> gossiping_property_file_snitch::get_app_states() const {
sstring ip = format("{}", local().get_local_gossiper().get_local_messaging().listen_address());
sstring ip = format("{}", _listen_address);
return {
{gms::application_state::DC, gms::versioned_value::datacenter(_my_dc)},
{gms::application_state::RACK, gms::versioned_value::rack(_my_rack)},
Expand Down
1 change: 1 addition & 0 deletions locator/gossiping_property_file_snitch.hh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ private:
unsigned _file_reader_cpu_id;
snitch_signal_t _reconfigured;
promise<> _io_is_stopped;
gms::inet_address _listen_address;

void reset_io_state() {
// Reset the promise to allow repeating
Expand Down
53 changes: 4 additions & 49 deletions locator/production_snitch_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,63 +33,18 @@ production_snitch_base::production_snitch_base(snitch_config cfg)
}


sstring production_snitch_base::get_rack(inet_address endpoint) {
if (endpoint == utils::fb_utilities::get_broadcast_address()) {
return _my_rack;
}

return get_endpoint_info(endpoint,
gms::application_state::RACK,
default_rack);
// Returns this node's own rack name (_my_rack). The method takes no endpoint
// argument, so it only ever reports the local rack.
sstring production_snitch_base::get_rack() const {
return _my_rack;
}

sstring production_snitch_base::get_datacenter(inet_address endpoint) {
if (endpoint == utils::fb_utilities::get_broadcast_address()) {
return _my_dc;
}

return get_endpoint_info(endpoint,
gms::application_state::DC,
default_dc);
// Returns this node's own datacenter name (_my_dc). The method takes no
// endpoint argument, so it only ever reports the local datacenter.
sstring production_snitch_base::get_datacenter() const {
return _my_dc;
}

// Remembers the address of the given snitch_ptr in _backreference.
// The pointer is stored as-is (non-owning); NOTE(review): presumably the
// snitch_ptr must outlive this snitch instance — confirm against the caller.
void production_snitch_base::set_backreference(snitch_ptr& d) {
_backreference = &d;
}

std::optional<sstring> production_snitch_base::get_endpoint_info(inet_address endpoint, gms::application_state key) {
gms::gossiper& local_gossiper = local().get_local_gossiper();
auto* ep_state = local_gossiper.get_application_state_ptr(endpoint, key);
return ep_state ? std::optional(ep_state->value) : std::nullopt;
}

sstring production_snitch_base::get_endpoint_info(inet_address endpoint, gms::application_state key,
const sstring& default_val) {
auto val = get_endpoint_info(endpoint, key);
auto& gossiper = local().get_local_gossiper();

if (val) {
return *val;
}
// ...if not found - look in the SystemTable...
if (!_saved_endpoints) {
_saved_endpoints = gossiper.get_system_keyspace().local().load_dc_rack_info();
}

auto it = _saved_endpoints->find(endpoint);

if (it != _saved_endpoints->end()) {
if (key == gms::application_state::RACK) {
return it->second.rack;
} else { // gms::application_state::DC
return it->second.dc;
}
}

// ...if still not found - return a default value
return default_val;
}

void production_snitch_base::set_my_dc_and_rack(const sstring& new_dc, const sstring& new_rack) {
if (!new_dc.empty()) {
_my_dc = new_dc;
Expand Down
8 changes: 2 additions & 6 deletions locator/production_snitch_base.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,11 @@ public:

explicit production_snitch_base(snitch_config);

virtual sstring get_rack(inet_address endpoint) override;
virtual sstring get_datacenter(inet_address endpoint) override;
virtual sstring get_rack() const override;
virtual sstring get_datacenter() const override;
virtual void set_backreference(snitch_ptr& d) override;

private:
std::optional<sstring> get_endpoint_info(inet_address endpoint, gms::application_state key);
sstring get_endpoint_info(inet_address endpoint, gms::application_state key,
const sstring& default_val);
virtual void set_my_dc_and_rack(const sstring& new_dc, const sstring& new_rack) override;
virtual void set_prefer_local(bool prefer_local) override;
void parse_property_file();
Expand All @@ -76,7 +73,6 @@ protected:
void throw_incomplete_file() const;

protected:
std::optional<addr2dc_rack_map> _saved_endpoints;
std::string _prop_file_contents;
sstring _prop_file_name;
std::unordered_map<sstring, sstring> _prop_values;
Expand Down
10 changes: 6 additions & 4 deletions locator/rack_inferring_snitch.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@ using inet_address = gms::inet_address;
*/
struct rack_inferring_snitch : public snitch_base {
rack_inferring_snitch(const snitch_config& cfg) {
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
_my_dc = get_datacenter();
_my_rack = get_rack();

// This snitch is ready on creation
set_snitch_ready();
}

virtual sstring get_rack(inet_address endpoint) override {
virtual sstring get_rack() const override {
auto endpoint = utils::fb_utilities::get_broadcast_address();
return std::to_string(uint8_t(endpoint.bytes()[2]));
}

virtual sstring get_datacenter(inet_address endpoint) override {
virtual sstring get_datacenter() const override {
auto endpoint = utils::fb_utilities::get_broadcast_address();
return std::to_string(uint8_t(endpoint.bytes()[1]));
}

Expand Down
8 changes: 4 additions & 4 deletions locator/simple_snitch.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ namespace locator {
*/
struct simple_snitch : public snitch_base {
simple_snitch(const snitch_config& cfg) {
_my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address());
_my_rack = get_rack(utils::fb_utilities::get_broadcast_address());
_my_dc = get_datacenter();
_my_rack = get_rack();

// This snitch is ready on creation
set_snitch_ready();
}

virtual sstring get_rack(inet_address endpoint) override {
virtual sstring get_rack() const override {
return "rack1";
}

virtual sstring get_datacenter(inet_address endpoint) override {
virtual sstring get_datacenter() const override {
return "datacenter1";
}

Expand Down
5 changes: 2 additions & 3 deletions locator/snitch_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
*/

#include "locator/snitch_base.hh"
#include "gms/gossiper.hh"
#include "gms/application_state.hh"

namespace locator {
Expand All @@ -21,8 +20,8 @@ std::list<std::pair<gms::application_state, gms::versioned_value>> snitch_base::
};
}

snitch_ptr::snitch_ptr(const snitch_config cfg, sharded<gms::gossiper>& g)
: _gossiper(g) {
snitch_ptr::snitch_ptr(const snitch_config cfg)
{
i_endpoint_snitch::ptr_type s;
try {
s = create_object<i_endpoint_snitch>(cfg.name, cfg);
Expand Down

0 comments on commit 2d581e9

Please sign in to comment.