76 changes: 65 additions & 11 deletions service/storage_service.cc
@@ -53,6 +53,7 @@
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "replica/database.hh"
#include "replica/tablets.hh"
#include <seastar/core/metrics.hh>
#include "cdc/generation.hh"
#include "cdc/generation_service.hh"
@@ -411,6 +412,10 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
on_fatal_internal_error(slogger, format("Unexpected state {} for node {}", rs.state, id));
}
}

if (_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
tmptr->set_tablets(co_await replica::read_tablet_metadata(*_qp));
}
}));

if (auto gen_id = _topology_state_machine._topology.current_cdc_generation_id) {
@@ -1836,13 +1841,13 @@ storage_service::get_range_to_address_map(const sstring& keyspace) const {
}

future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm) const {
storage_service::get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm) const {
return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens());
}

// Caller is responsible for keeping token_metadata valid until the returned future resolves
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm,
storage_service::get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm,
const std::vector<token>& sorted_tokens) const {
co_return co_await construct_range_to_endpoint_map(erm, co_await get_all_ranges(sorted_tokens));
}
@@ -2539,8 +2544,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt

std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
pending_token_metadata_ptr.resize(smp::count);
std::vector<std::unordered_map<sstring, locator::effective_replication_map_ptr>> pending_effective_replication_maps;
std::vector<std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr>> pending_effective_replication_maps;
pending_effective_replication_maps.resize(smp::count);
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms;
pending_table_erms.resize(smp::count);

try {
auto base_shard = this_shard_id();
@@ -2560,17 +2567,37 @@
auto keyspaces = db.get_all_keyspaces();
for (auto& ks_name : keyspaces) {
auto rs = db.find_keyspace(ks_name).get_replication_strategy_ptr();
if (rs->is_per_table()) {
continue;
}
auto erm = co_await get_erm_factory().create_effective_replication_map(rs, tmptr);
pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm));
}
co_await container().invoke_on_others([&] (storage_service& ss) -> future<> {
auto& db = ss._db.local();
for (auto& ks_name : keyspaces) {
auto rs = db.find_keyspace(ks_name).get_replication_strategy_ptr();
if (rs->is_per_table()) {
continue;
}
auto tmptr = pending_token_metadata_ptr[this_shard_id()];
auto erm = co_await ss.get_erm_factory().create_effective_replication_map(rs, std::move(tmptr));
pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm));

}
});
// Prepare per-table erms.
co_await container().invoke_on_all([&] (storage_service& ss) {
auto& db = ss._db.local();
auto tmptr = pending_token_metadata_ptr[this_shard_id()];
for (auto&& [id, cf] : db.get_column_families()) { // Safe because we iterate without preemption
auto rs = db.find_keyspace(cf->schema()->keypace_name()).get_replication_strategy_ptr();
locator::effective_replication_map_ptr erm;
if (auto pt_rs = rs->maybe_as_per_table()) {
erm = pt_rs->make_replication_map(id, tmptr);
} else {
erm = pending_effective_replication_maps[this_shard_id()][cf->schema()->keypace_name()];
}
pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
}
});
} catch (...) {
@@ -2583,6 +2610,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
co_await smp::invoke_on_all([&] () -> future<> {
auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]);
auto erms = std::move(pending_effective_replication_maps[this_shard_id()]);
auto table_erms = std::move(pending_table_erms[this_shard_id()]);

co_await utils::clear_gently(table_erms);
co_await utils::clear_gently(erms);
co_await utils::clear_gently(tmptr);
@@ -2598,14 +2626,21 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
try {
co_await container().invoke_on_all([&] (storage_service& ss) {
ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()]));
auto& db = ss._db.local();

auto& erms = pending_effective_replication_maps[this_shard_id()];
for (auto it = erms.begin(); it != erms.end(); ) {
auto& db = ss._db.local();
auto& ks = db.find_keyspace(it->first);
ks.update_effective_replication_map(std::move(it->second));
it = erms.erase(it);
}

auto& table_erms = pending_table_erms[this_shard_id()];
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
auto& cf = db.find_column_family(it->first);
cf.update_effective_replication_map(std::move(it->second));
it = table_erms.erase(it);
}
});
} catch (...) {
// applying the changes on all shards should never fail
@@ -2780,7 +2815,7 @@ future<std::map<gms::inet_address, float>> storage_service::get_ownership() {

future<std::map<gms::inet_address, float>> storage_service::effective_ownership(sstring keyspace_name) {
return run_with_no_api_lock([keyspace_name] (storage_service& ss) mutable -> future<std::map<gms::inet_address, float>> {
locator::effective_replication_map_ptr erm;
locator::vnode_effective_replication_map_ptr erm;
if (keyspace_name != "") {
//find throws no such keyspace if it is missing
const replica::keyspace& ks = ss._db.local().find_keyspace(keyspace_name);
@@ -3259,7 +3294,7 @@ future<> storage_service::decommission() {

ss.update_pending_ranges(format("decommission {}", endpoint)).get();

auto non_system_keyspaces = db.get_non_local_strategy_keyspaces();
auto non_system_keyspaces = db.get_non_local_vnode_based_strategy_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
if (ss.get_token_metadata().has_pending_ranges(keyspace_name, ss.get_broadcast_address())) {
throw std::runtime_error("data is currently moving to this node; unable to leave the ring");
@@ -4112,7 +4147,8 @@ int32_t storage_service::get_exception_count() {
return 0;
}

future<std::unordered_multimap<dht::token_range, inet_address>> storage_service::get_changed_ranges_for_leaving(locator::effective_replication_map_ptr erm, inet_address endpoint) {
future<std::unordered_multimap<dht::token_range, inet_address>>
storage_service::get_changed_ranges_for_leaving(locator::vnode_effective_replication_map_ptr erm, inet_address endpoint) {
// First get all ranges the leaving endpoint is responsible for
auto ranges = get_ranges_for_endpoint(erm, endpoint);

@@ -4462,7 +4498,7 @@ future<> storage_service::shutdown_protocol_servers() {
}

future<std::unordered_multimap<inet_address, dht::token_range>>
storage_service::get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const {
storage_service::get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const {
auto my_address = get_broadcast_address();
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_addresses = co_await erm->get_range_addresses();
std::unordered_multimap<inet_address, dht::token_range> source_ranges;
@@ -4510,7 +4546,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_

future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::construct_range_to_endpoint_map(
locator::effective_replication_map_ptr erm,
locator::vnode_effective_replication_map_ptr erm,
const dht::token_range_vector& ranges) const {
std::unordered_map<dht::token_range, inet_address_vector_replica_set> res;
res.reserve(ranges.size());
@@ -4595,6 +4631,24 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
});
}

void storage_service::on_update_tablet_metadata() {
if (this_shard_id() != 0) {
// replicate_to_all_cores() takes care of other shards.
return;
}
// FIXME: Avoid reading whole tablet metadata on partial changes.
load_tablet_metadata().get();
}

future<> storage_service::load_tablet_metadata() {
if (!_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
return make_ready_future<>();
}
return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> {
tmptr->set_tablets(co_await replica::read_tablet_metadata(*_qp));
}, acquire_merge_lock::no);
}

future<> storage_service::snitch_reconfigured() {
assert(this_shard_id() == 0);
auto& snitch = _snitch.local();
@@ -5016,7 +5070,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
};

dht::token_range_vector
storage_service::get_ranges_for_endpoint(const locator::effective_replication_map_ptr& erm, const gms::inet_address& ep) const {
storage_service::get_ranges_for_endpoint(const locator::vnode_effective_replication_map_ptr& erm, const gms::inet_address& ep) const {
return erm->get_ranges(ep);
}

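The replicate_to_all_cores() changes above introduce a second layer of replication maps: every keyspace keeps its vnode-based map, and every table additionally gets an effective_replication_map, built per table id for tablet-aware (per-table) strategies and shared with the keyspace-level map otherwise. A minimal sketch of that dispatch, assuming the maybe_as_per_table()/make_replication_map() interfaces shown in the hunks above (the helper name and flattened signature are hypothetical):

// Sketch only: per-table ERM selection as done in replicate_to_all_cores().
locator::effective_replication_map_ptr
erm_for_table(replica::database& db, locator::token_metadata_ptr tmptr, const schema& s,
        const std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr>& ks_erms) {
    auto rs = db.find_keyspace(s.keypace_name()).get_replication_strategy_ptr();
    if (auto pt_rs = rs->maybe_as_per_table()) {
        // Tablet-aware strategy: replication is resolved per table id.
        return pt_rs->make_replication_map(s.id(), std::move(tmptr));
    }
    // Vnode-based keyspace: all of its tables share the keyspace-level map.
    return ks_erms.at(s.keypace_name());
}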
19 changes: 13 additions & 6 deletions service/storage_service.hh
@@ -118,6 +118,7 @@ private:
gms::gossiper& _gossiper;
sharded<netw::messaging_service>& _messaging;
sharded<service::migration_manager>& _migration_manager;
cql3::query_processor* _qp = nullptr;
sharded<repair_service>& _repair;
sharded<streaming::stream_manager>& _stream_manager;
sharded<locator::snitch_ptr>& _snitch;
@@ -164,6 +165,7 @@ public:
void init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks);
future<> uninit_messaging_service();

future<> load_tablet_metadata();
private:
using acquire_merge_lock = bool_class<class acquire_merge_lock_tag>;

@@ -267,6 +269,10 @@ public:
_protocol_servers.push_back(&server);
}

void set_query_processor(cql3::query_processor& qp) {
_qp = &qp;
}

// All pointers are valid.
const std::vector<protocol_server*>& protocol_servers() const {
return _protocol_servers;
@@ -368,9 +374,9 @@ public:
sstring get_rpc_address(const inet_address& endpoint) const;

future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(const sstring& keyspace) const;
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::effective_replication_map_ptr erm) const;
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm) const;

future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::effective_replication_map_ptr erm,
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm,
const std::vector<token>& sorted_tokens) const;

/**
@@ -404,7 +410,7 @@ public:
* @return mapping of ranges to the replicas responsible for them.
*/
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> construct_range_to_endpoint_map(
locator::effective_replication_map_ptr erm,
locator::vnode_effective_replication_map_ptr erm,
const dht::token_range_vector& ranges) const;
public:
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
@@ -462,6 +468,7 @@ public:
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
virtual void on_update_tablet_metadata() override;

virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
@@ -553,7 +560,7 @@ private:
* @param ranges the ranges to find sources for
* @return multimap of addresses to ranges the address is responsible for
*/
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;

/**
* Sends a notification to a node indicating we have finished replicating data.
@@ -577,7 +584,7 @@ private:
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);

// needs to be modified to accept either a keyspace or ARS.
future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(locator::effective_replication_map_ptr erm, inet_address endpoint);
future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(locator::vnode_effective_replication_map_ptr erm, inet_address endpoint);

future<> maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip);
public:
@@ -595,7 +602,7 @@ public:
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
dht::token_range_vector get_ranges_for_endpoint(const locator::effective_replication_map_ptr& erm, const gms::inet_address& ep) const;
dht::token_range_vector get_ranges_for_endpoint(const locator::vnode_effective_replication_map_ptr& erm, const gms::inet_address& ep) const;

/**
* Get all ranges that span the ring given a set
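Note how the header changes pair up: set_query_processor() hands storage_service the query processor it needs to read system.tablets, and load_tablet_metadata() performs that read into token_metadata. A minimal wiring sketch (the startup order shown here is an assumption; the cql_test_env.cc hunk later in this diff performs the set_query_processor() half):

// Hypothetical startup snippet: qp and ss are the usual sharded services.
ss.invoke_on_all([&] (service::storage_service& local_ss) {
    local_ss.set_query_processor(qp.local());
}).get();
// No-op unless the experimental TABLETS feature is enabled.
ss.local().load_tablet_metadata().get();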
90 changes: 90 additions & 0 deletions service/tablet_allocator.cc
@@ -0,0 +1,90 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include "locator/tablets.hh"
#include "replica/tablets.hh"
#include "locator/tablet_replication_strategy.hh"
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"

using namespace locator;
using namespace replica;

namespace service {

class tablet_allocator_impl : public tablet_allocator::impl
, public service::migration_listener::empty_listener {
service::migration_notifier& _migration_notifier;
replica::database& _db;
bool _stopped = false;
public:
tablet_allocator_impl(service::migration_notifier& mn, replica::database& db)
: _migration_notifier(mn)
, _db(db) {
_migration_notifier.register_listener(this);
}

tablet_allocator_impl(tablet_allocator_impl&&) = delete; // "this" captured.

~tablet_allocator_impl() {
assert(_stopped);
}

future<> stop() {
co_await _migration_notifier.unregister_listener(this);
_stopped = true;
}

void on_before_create_column_family(const schema& s, std::vector<mutation>& muts, api::timestamp_type ts) override {
keyspace& ks = _db.find_keyspace(s.ks_name());
auto&& rs = ks.get_replication_strategy();
if (auto&& tablet_rs = rs.maybe_as_tablet_aware()) {
auto tm = _db.get_shared_token_metadata().get();
auto map = tablet_rs->allocate_tablets_for_new_table(s.shared_from_this(), tm).get0();
muts.emplace_back(tablet_map_to_mutation(map, s.id(), s.keypace_name(), s.cf_name(), ts).get0());
}
}

void on_before_drop_column_family(const schema& s, std::vector<mutation>& muts, api::timestamp_type ts) override {
keyspace& ks = _db.find_keyspace(s.ks_name());
auto&& rs = ks.get_replication_strategy();
if (rs.uses_tablets()) {
auto tm = _db.get_shared_token_metadata().get();
muts.emplace_back(make_drop_tablet_map_mutation(s.keypace_name(), s.id(), ts));
}
}

void on_before_drop_keyspace(const sstring& keyspace_name, std::vector<mutation>& muts, api::timestamp_type ts) override {
keyspace& ks = _db.find_keyspace(keyspace_name);
auto&& rs = ks.get_replication_strategy();
if (rs.uses_tablets()) {
auto tm = _db.get_shared_token_metadata().get();
for (auto&& [name, s] : ks.metadata()->cf_meta_data()) {
muts.emplace_back(make_drop_tablet_map_mutation(keyspace_name, s->id(), ts));
}
}
}

// FIXME: Handle materialized views.
};

tablet_allocator::tablet_allocator(service::migration_notifier& mn, replica::database& db)
: _impl(std::make_unique<tablet_allocator_impl>(mn, db)) {
}

future<> tablet_allocator::stop() {
return impl().stop();
}

tablet_allocator_impl& tablet_allocator::impl() {
return static_cast<tablet_allocator_impl&>(*_impl);
}

}
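tablet_allocator is a regular sharded service whose only hook into the system is the migration_listener registered in its constructor, so its lifecycle is plain start/stop. A condensed sketch of the wiring, mirroring the test/lib/cql_test_env.cc hunk later in this diff:

// Construct per shard; ensure stop() runs so the listener is
// unregistered (the destructor asserts _stopped).
distributed<service::tablet_allocator> allocator;
allocator.start(std::ref(mm_notif), std::ref(db)).get();
auto stop_allocator = defer([&] { allocator.stop().get(); });
// From here on, CREATE TABLE / DROP TABLE / DROP KEYSPACE announcements
// get tablet-map mutations appended via the on_before_* hooks above.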
34 changes: 34 additions & 0 deletions service/tablet_allocator.hh
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#pragma once

#include "replica/database.hh"
#include "service/migration_manager.hh"
#include <any>

namespace service {

class tablet_allocator_impl;

class tablet_allocator {
public:
class impl {
public:
virtual ~impl() = default;
};
private:
std::unique_ptr<impl> _impl;
tablet_allocator_impl& impl();
public:
tablet_allocator(service::migration_notifier& mn, replica::database& db);
public:
future<> stop();
};

}
2 changes: 1 addition & 1 deletion sstables_loader.cc
@@ -123,7 +123,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
auto s = table.schema();
const auto cf_id = s->id();
const auto reason = streaming::stream_reason::repair;
auto erm = _db.local().find_keyspace(ks_name).get_effective_replication_map();
auto erm = _db.local().find_column_family(s).get_effective_replication_map();

// By sorting SSTables by their primary key, we allow SSTable runs to be
// incrementally streamed.
2 changes: 1 addition & 1 deletion test/boost/mutation_test.cc
@@ -103,7 +103,7 @@ with_column_family(schema_ptr s, replica::column_family::config cfg, sstables::s
auto cm = make_lw_shared<compaction_manager>(tm, compaction_manager::for_testing_tag{});
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto s_opts = make_lw_shared<replica::storage_options>();
auto cf = make_lw_shared<replica::column_family>(s, cfg, s_opts, replica::column_family::no_commitlog(), *cm, sm, *cl_stats, *tracker);
auto cf = make_lw_shared<replica::column_family>(s, cfg, s_opts, replica::column_family::no_commitlog(), *cm, sm, *cl_stats, *tracker, nullptr);
cf->mark_ready_for_writes();
co_await func(*cf);
co_await cf->stop();
163 changes: 151 additions & 12 deletions test/boost/network_topology_strategy_test.cc
@@ -19,6 +19,7 @@
#include <seastar/core/sstring.hh>
#include "log.hh"
#include "gms/gossiper.hh"
#include "schema/schema_builder.hh"
#include <vector>
#include <string>
#include <map>
@@ -64,19 +65,19 @@ static void verify_sorted(const dht::token_range_vector& trv) {
BOOST_CHECK(boost::adjacent_find(trv, not_strictly_before) == trv.end());
}

static void check_ranges_are_sorted(effective_replication_map_ptr erm, gms::inet_address ep) {
static void check_ranges_are_sorted(vnode_effective_replication_map_ptr erm, gms::inet_address ep) {
verify_sorted(erm->get_ranges(ep));
verify_sorted(erm->get_primary_ranges(ep));
verify_sorted(erm->get_primary_ranges_within_dc(ep));
}

void strategy_sanity_check(
abstract_replication_strategy::ptr_type ars_ptr,
replication_strategy_ptr ars_ptr,
const token_metadata& tm,
const std::map<sstring, sstring>& options) {

network_topology_strategy* nts_ptr =
dynamic_cast<network_topology_strategy*>(ars_ptr.get());
const network_topology_strategy* nts_ptr =
dynamic_cast<const network_topology_strategy*>(ars_ptr.get());

//
// Check that both total and per-DC RFs in options match the corresponding
@@ -94,13 +95,24 @@
}

void endpoints_check(
abstract_replication_strategy::ptr_type ars_ptr,
replication_strategy_ptr ars_ptr,
const token_metadata& tm,
inet_address_vector_replica_set& endpoints,
const inet_address_vector_replica_set& endpoints,
const locator::topology& topo) {

auto&& nodes_per_dc = tm.get_topology().get_datacenter_endpoints();
const network_topology_strategy* nts_ptr =
dynamic_cast<const network_topology_strategy*>(ars_ptr.get());

size_t total_rf = 0;
for (auto&& [dc, nodes] : nodes_per_dc) {
auto effective_rf = std::min<size_t>(nts_ptr->get_replication_factor(dc), nodes.size());
total_rf += effective_rf;
}

// Check the total RF
BOOST_CHECK(endpoints.size() == ars_ptr->get_replication_factor(tm));
BOOST_CHECK(endpoints.size() == total_rf);
BOOST_CHECK(total_rf <= ars_ptr->get_replication_factor(tm));

// Check the uniqueness
std::unordered_set<inet_address> ep_set(endpoints.begin(), endpoints.end());
@@ -119,10 +131,9 @@
}
}

network_topology_strategy* nts_ptr =
dynamic_cast<network_topology_strategy*>(ars_ptr.get());
for (auto& rf : dc_rf) {
BOOST_CHECK(rf.second == nts_ptr->get_replication_factor(rf.first));
for (auto&& [dc, rf] : dc_rf) {
auto effective_rf = std::min<size_t>(nts_ptr->get_replication_factor(dc), nodes_per_dc.at(dc).size());
BOOST_CHECK(rf == effective_rf);
}
}

@@ -145,7 +156,7 @@ auto d2t = [](double d) -> int64_t {
*/
void full_ring_check(const std::vector<ring_point>& ring_points,
const std::map<sstring, sstring>& options,
abstract_replication_strategy::ptr_type ars_ptr,
replication_strategy_ptr ars_ptr,
locator::token_metadata_ptr tmptr) {
auto& tm = *tmptr;
const auto& topo = tm.get_topology();
@@ -177,6 +188,33 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
}
}

void full_ring_check(const tablet_map& tmap,
const std::map<sstring, sstring>& options,
replication_strategy_ptr rs_ptr,
locator::token_metadata_ptr tmptr) {
auto& tm = *tmptr;
const auto& topo = tm.get_topology();

auto get_endpoint_for_host_id = [&] (host_id host) {
auto endpoint_opt = tm.get_endpoint_for_host_id(host);
assert(endpoint_opt);
return *endpoint_opt;
};

auto to_endpoint_set = [&] (const tablet_replica_set& replicas) {
inet_address_vector_replica_set result;
result.reserve(replicas.size());
for (auto&& replica : replicas) {
result.emplace_back(get_endpoint_for_host_id(replica.host));
}
return result;
};

for (tablet_id tb : tmap.tablet_ids()) {
endpoints_check(rs_ptr, tm, to_endpoint_set(tmap.get_tablet_info(tb).replicas), topo);
}
}

locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) {
// This resembles rack_inferring_snitch dc/rack generation which is
// still in use by this test via token_metadata internals
@@ -348,6 +386,107 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_heavy) {
return heavy_origin_test();
}

SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));

// Create the RackInferringSnitch
snitch_config cfg;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();

locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_host_id = host_id::create_random_id();
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);

std::vector<ring_point> ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 2.0, inet_address("192.101.10.1") },
{ 3.0, inet_address("192.102.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 5.0, inet_address("192.101.20.1") },
{ 6.0, inet_address("192.102.20.1") },
{ 7.0, inet_address("192.100.30.1") },
{ 8.0, inet_address("192.101.30.1") },
{ 9.0, inet_address("192.102.30.1") },
{ 10.0, inet_address("192.102.40.1") },
{ 11.0, inet_address("192.102.40.2") }
};

// Initialize the token_metadata
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
tm.update_host_id(id, endpoint);
co_await tm.update_normal_tokens(std::move(tokens), endpoint);
}
}).get();

/////////////////////////////////////
// Create the replication strategy
std::map<sstring, sstring> options323 = {
{"100", "3"},
{"101", "2"},
{"102", "3"},
{"initial_tablets", "100"}
};

auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", options323);

auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);

auto s = schema_builder("ks", "tb")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();

auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
full_ring_check(tmap, options323, ars_ptr, stm.get());

///////////////
// Create the replication strategy
std::map<sstring, sstring> options320 = {
{"100", "3"},
{"101", "2"},
{"102", "0"},
{"initial_tablets", "100"}
};

ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", options320);
tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);

tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
full_ring_check(tmap, options320, ars_ptr, stm.get());

// Test the case of not enough nodes to meet RF in DC 101
std::map<sstring, sstring> options324 = {
{"100", "3"},
{"101", "4"},
{"102", "2"},
{"initial_tablets", "100"}
};

ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", options324);
tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);

tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
full_ring_check(tmap, options324, ars_ptr, stm.get());
}

/**
* static impl of "old" network topology strategy endpoint calculation.
*/
20 changes: 20 additions & 0 deletions test/boost/partitioner_test.cc
@@ -603,3 +603,23 @@ SEASTAR_THREAD_TEST_CASE(test_dht_subtract_ranges) {
BOOST_REQUIRE(contained);
}
}

SEASTAR_THREAD_TEST_CASE(test_split_token_range_msb) {
dht::token_comparator cmp;
for (auto msb : {0, 1, 2, 3, 8}) {
auto ranges = dht::split_token_range_msb(msb);
BOOST_REQUIRE_EQUAL(ranges.size(), 1 << msb);

std::optional<dht::token> prev_last_token;
for (int i = 0; i < ranges.size(); i++) {
auto t = dht::last_token_of_compaction_group(msb, i);
testlog.debug("msb: {}, t: {}, range: {}", msb, t, ranges[i]);
BOOST_REQUIRE(ranges[i].contains(t, cmp));
if (prev_last_token) {
BOOST_REQUIRE(ranges[i].contains(dht::next_token(*prev_last_token), cmp));
BOOST_REQUIRE(!ranges[i].contains(*prev_last_token, cmp));
}
prev_last_token = t;
}
}
}
4 changes: 3 additions & 1 deletion test/boost/schema_change_test.cc
@@ -477,7 +477,7 @@ SEASTAR_TEST_CASE(test_merging_creates_a_table_even_if_keyspace_was_recreated) {
{
auto group0_guard = mm.start_group0_operation().get();
const auto ts = group0_guard.write_timestamp();
auto muts = e.migration_manager().local().prepare_keyspace_drop_announcement("ks", ts);
auto muts = e.migration_manager().local().prepare_keyspace_drop_announcement("ks", ts).get0();
boost::copy(muts, std::back_inserter(all_muts));
mm.announce(muts, std::move(group0_guard)).get();
}
@@ -523,6 +523,7 @@ class counting_migration_listener : public service::migration_listener {
int update_function_count = 0;
int update_aggregate_count = 0;
int update_view_count = 0;
int update_tablets = 0;
int drop_keyspace_count = 0;
int drop_column_family_count = 0;
int drop_user_type_count = 0;
@@ -545,6 +546,7 @@ class counting_migration_listener : public service::migration_listener {
virtual void on_update_function(const sstring&, const sstring&) override { ++update_function_count; }
virtual void on_update_aggregate(const sstring&, const sstring&) override { ++update_aggregate_count; }
virtual void on_update_view(const sstring&, const sstring&, bool) override { ++update_view_count; }
virtual void on_update_tablet_metadata() override { ++update_tablets; }
virtual void on_drop_keyspace(const sstring&) override { ++drop_keyspace_count; }
virtual void on_drop_column_family(const sstring&, const sstring&) override { ++drop_column_family_count; }
virtual void on_drop_user_type(const sstring&, const sstring&) override { ++drop_user_type_count; }
2 changes: 1 addition & 1 deletion test/boost/sstable_3_x_test.cc
@@ -3025,7 +3025,7 @@ static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_pt
auto cm = make_lw_shared<compaction_manager_for_testing>(false);
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto tracker = make_lw_shared<cache_tracker>();
auto cf = make_lw_shared<replica::column_family>(s, env.make_table_config(), make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), **cm, env.manager(), *cl_stats, *tracker);
auto cf = make_lw_shared<replica::column_family>(s, env.make_table_config(), make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), **cm, env.manager(), *cl_stats, *tracker, nullptr);
cf->mark_ready_for_writes();
lw_shared_ptr<replica::memtable> mt = make_lw_shared<replica::memtable>(s);

2 changes: 1 addition & 1 deletion test/boost/sstable_compaction_test.cc
@@ -4205,7 +4205,7 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
cfg.enable_commitlog = false;
cfg.enable_incremental_backups = false;

auto cf = make_lw_shared<replica::column_family>(s, cfg, make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), *cm, env.manager(), *cl_stats, *tracker);
auto cf = make_lw_shared<replica::column_family>(s, cfg, make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), *cm, env.manager(), *cl_stats, *tracker, nullptr);
cf->start();
cf->mark_ready_for_writes();
tables.push_back(cf);
22 changes: 1 addition & 21 deletions test/boost/storage_proxy_test.cc
@@ -43,7 +43,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {

auto check = [&s](locator::token_metadata_ptr tmptr, dht::partition_range input,
dht::partition_range_vector expected) {
query_ranges_to_vnodes_generator ranges_to_vnodes(tmptr, s, {input});
query_ranges_to_vnodes_generator ranges_to_vnodes(locator::make_splitter(tmptr), s, {input});
auto actual = ranges_to_vnodes(1000);
if (!std::equal(actual.begin(), actual.end(), expected.begin(), [&s](auto&& r1, auto&& r2) {
return r1.equal(r2, dht::ring_position_comparator(*s));
Expand All @@ -52,26 +52,6 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
}
};

{
const auto endpoint = gms::inet_address("10.0.0.1");
const auto endpoint_token = ring[2].token();
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{});
tmptr->update_topology(endpoint, {"dc1", "rack1"});
tmptr->update_normal_tokens({endpoint_token}, endpoint).get();

const auto next_token = dht::token::from_int64(dht::token::to_int64(endpoint_token) + 1);
const auto endpoint_token_ending_bound = dht::partition_range::bound {
dht::ring_position::ending_at(endpoint_token), true
};
const auto input = dht::partition_range::make(
endpoint_token_ending_bound,
{dht::ring_position::starting_at(next_token), false});
const auto expected_output = dht::partition_range::make(
endpoint_token_ending_bound,
endpoint_token_ending_bound);
check(tmptr, input, { expected_output });
}

{
// Ring with minimum token
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{});
270 changes: 270 additions & 0 deletions test/boost/tablets_test.cc
@@ -0,0 +1,270 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/



#include "test/lib/scylla_test_case.hh"
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/log.hh"
#include "db/config.hh"
#include "schema/schema_builder.hh"

#include "replica/tablets.hh"
#include "locator/tablets.hh"
#include "locator/tablet_replication_strategy.hh"

using namespace locator;
using namespace replica;

static api::timestamp_type next_timestamp = api::new_timestamp();

static
void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm) {
save_tablet_metadata(env.local_db(), tm, next_timestamp++).get();
auto tm2 = read_tablet_metadata(env.local_qp()).get0();
BOOST_REQUIRE_EQUAL(tm, tm2);
}

static
cql_test_config tablet_cql_test_config() {
cql_test_config c;
c.db_config->experimental_features({
db::experimental_features_t::feature::TABLETS,
}, db::config::config_source::CommandLine);
c.db_config->consistent_cluster_management(true);
return c;
}

static
future<table_id> add_table(cql_test_env& e) {
auto id = table_id(utils::UUID_gen::get_time_UUID());
co_await e.create_table([id] (std::string_view ks_name) {
return *schema_builder(ks_name, id.to_sstring(), id)
.with_column("p1", utf8_type, column_kind::partition_key)
.with_column("r1", int32_type)
.build();
});
co_return id;
}

SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());

auto table1 = add_table(e).get0();
auto table2 = add_table(e).get0();

{
tablet_metadata tm;

// Empty
verify_tablet_metadata_persistence(e, tm);

// Add table1
{
tablet_map tmap(1);
tmap.set_tablet(tmap.first_tablet(), tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 3},
tablet_replica {h3, 1},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}

verify_tablet_metadata_persistence(e, tm);

// Add table2
{
tablet_map tmap(4);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 3},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h2, 2},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 1},
}
});
tm.set_tablet_map(table2, std::move(tmap));
}

verify_tablet_metadata_persistence(e, tm);

// Increase RF of table2
{
auto&& tmap = tm.get_tablet_map(table2);
auto tb = tmap.first_tablet();
tb = *tmap.next_tablet(tb);

tmap.set_tablet_transition_info(tb, tablet_transition_info{
tablet_replica_set {
tablet_replica {h3, 3},
tablet_replica {h1, 7},
},
tablet_replica {h1, 7}
});

tb = *tmap.next_tablet(tb);
tmap.set_tablet_transition_info(tb, tablet_transition_info{
tablet_replica_set {
tablet_replica {h1, 4},
tablet_replica {h2, 2},
},
tablet_replica {h1, 4}
});
}

verify_tablet_metadata_persistence(e, tm);

// Reduce tablet count in table2
{
tablet_map tmap(2);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 3},
}
});
tm.set_tablet_map(table2, std::move(tmap));
}

verify_tablet_metadata_persistence(e, tm);

// Reduce RF for table1, increasing tablet count
{
tablet_map tmap(2);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 7},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}

verify_tablet_metadata_persistence(e, tm);

// Reduce tablet count for table1
{
tablet_map tmap(1);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}

verify_tablet_metadata_persistence(e, tm);

// Change replica of table1
{
tablet_map tmap(1);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 7},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}

verify_tablet_metadata_persistence(e, tm);
}
}, tablet_cql_test_config());
}

SEASTAR_TEST_CASE(test_large_tablet_metadata) {
return do_with_cql_env_thread([] (cql_test_env& e) {
tablet_metadata tm;

auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());

const int nr_tables = 100;
const int tablets_per_table = 1024;

for (int i = 0; i < nr_tables; ++i) {
tablet_map tmap(tablets_per_table);

for (tablet_id j : tmap.tablet_ids()) {
tmap.set_tablet(j, tablet_info {
tablet_replica_set {{h1, 0}, {h2, 1}, {h3, 2},}
});
}

auto id = add_table(e).get0();
tm.set_tablet_map(id, std::move(tmap));
}

verify_tablet_metadata_persistence(e, tm);
}, tablet_cql_test_config());
}

SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) {
const auto real_min_token = dht::token(dht::token_kind::key, std::numeric_limits<int64_t>::min() + 1);
const auto real_max_token = dht::token(dht::token_kind::key, std::numeric_limits<int64_t>::max());

for (auto&& tmap : {
tablet_map(1),
tablet_map(2),
tablet_map(4),
tablet_map(16),
tablet_map(1024),
}) {
testlog.debug("tmap: {}", tmap);

BOOST_REQUIRE_EQUAL(real_min_token, tmap.get_first_token(tmap.first_tablet()));
BOOST_REQUIRE_EQUAL(real_max_token, tmap.get_last_token(tmap.last_tablet()));

std::optional<tablet_id> prev_tb;
for (tablet_id tb : tmap.tablet_ids()) {
testlog.debug("first: {}, last: {}", tmap.get_first_token(tb), tmap.get_last_token(tb));
BOOST_REQUIRE_EQUAL(tb, tmap.get_tablet_id(tmap.get_first_token(tb)));
BOOST_REQUIRE_EQUAL(tb, tmap.get_tablet_id(tmap.get_last_token(tb)));
if (prev_tb) {
BOOST_REQUIRE_EQUAL(dht::next_token(tmap.get_last_token(*prev_tb)), tmap.get_first_token(tb));
}
prev_tb = tb;
}
}
}
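The first/last-token and contiguity assertions above pin down the tablet addressing invariant: 2^k tablets split the signed 64-bit token space into equal, contiguous ranges, so a token's tablet is determined by its top bits. A standalone sketch of that arithmetic (an illustration of the invariant under test, not necessarily the exact production formula):

#include <cassert>
#include <cstdint>

// Assumed scheme: n = 2^k tablets. Bias the signed token into unsigned
// space so ring order matches integer order, then take the top k bits.
uint64_t tablet_index(int64_t token, unsigned k) {
    uint64_t biased = static_cast<uint64_t>(token) + (uint64_t(1) << 63);
    return k == 0 ? 0 : biased >> (64 - k);
}

int main() {
    // With 4 tablets (k = 2), the minimum key token maps to tablet 0 and
    // the maximum token to tablet 3, matching the first/last checks above.
    assert(tablet_index(INT64_MIN + 1, 2) == 0);
    assert(tablet_index(INT64_MAX, 2) == 3);
}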
13 changes: 11 additions & 2 deletions test/lib/cql_test_env.cc
@@ -29,6 +29,7 @@
#include <seastar/core/coroutine.hh>
#include "utils/UUID_gen.hh"
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"
#include "compaction/compaction_manager.hh"
#include "message/messaging_service.hh"
#include "service/raft/raft_address_map.hh"
@@ -298,9 +299,7 @@ class single_node_cql_env : public cql_test_env {
}

virtual future<> create_table(std::function<schema(std::string_view)> schema_maker) override {
auto id = table_id(utils::UUID_gen::get_time_UUID());
schema_builder builder(make_lw_shared<schema>(schema_maker(ks_name)));
builder.set_uuid(id);
auto s = builder.build(schema_builder::compact_storage::no);
auto group0_guard = co_await _mm.local().start_group0_operation();
auto ts = group0_guard.write_timestamp();
@@ -767,6 +766,12 @@ class single_node_cql_env : public cql_test_env {
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });

distributed<service::tablet_allocator> the_tablet_allocator;
the_tablet_allocator.start(std::ref(mm_notif), std::ref(db)).get();
auto stop_tablet_allocator = defer([&] {
the_tablet_allocator.stop().get();
});

cql3::query_processor::memory_config qp_mcfg;
if (cfg_in.qp_mcfg) {
qp_mcfg = *cfg_in.qp_mcfg;
@@ -813,6 +818,10 @@ class single_node_cql_env : public cql_test_env {
std::ref(snitch)).get();
auto stop_storage_service = defer([&ss] { ss.stop().get(); });

ss.invoke_on_all([&] (service::storage_service& ss) {
ss.set_query_processor(qp.local());
}).get();

for (const auto p: all_system_table_load_phases) {
replica::distributed_loader::init_system_keyspace(sys_ks, db, ss, gossiper, raft_gr, *cfg, p).get();
}
2 changes: 1 addition & 1 deletion test/lib/test_services.cc
@@ -128,7 +128,7 @@ table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, s
_data->cfg.cf_stats = &_data->cf_stats;
_data->cfg.enable_commitlog = false;
_data->cm.enable();
_data->cf = make_lw_shared<replica::column_family>(_data->s, _data->cfg, make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker);
_data->cf = make_lw_shared<replica::column_family>(_data->s, _data->cfg, make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker, nullptr);
_data->cf->mark_ready_for_writes();
_data->table_s = std::make_unique<table_state>(*_data, sstables_manager);
_data->cm.add(*_data->table_s);
1 change: 1 addition & 0 deletions test/perf/entry_point.hh
@@ -12,5 +12,6 @@ int scylla_fast_forward_main(int argc, char** argv);
int scylla_row_cache_update_main(int argc, char**argv);
int scylla_simple_query_main(int argc, char** argv);
int scylla_sstable_main(int argc, char** argv);
int scylla_tablets_main(int argc, char**argv);

} // namespace tools
2 changes: 1 addition & 1 deletion test/perf/perf_sstable.hh
@@ -228,7 +228,7 @@ public:
cell_locker_stats cl_stats;
tasks::task_manager tm;
auto cm = make_lw_shared<compaction_manager>(tm, compaction_manager::for_testing_tag{});
auto cf = make_lw_shared<replica::column_family>(s, env.make_table_config(), make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), *cm, env.manager(), cl_stats, tracker);
auto cf = make_lw_shared<replica::column_family>(s, env.make_table_config(), make_lw_shared<replica::storage_options>(), replica::column_family::no_commitlog(), *cm, env.manager(), cl_stats, tracker, nullptr);

auto start = perf_sstable_test_env::now();

176 changes: 176 additions & 0 deletions test/perf/perf_tablets.cc
@@ -0,0 +1,176 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include <seastar/core/distributed.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/reactor.hh>

#include "locator/tablets.hh"
#include "replica/tablets.hh"
#include "locator/tablet_replication_strategy.hh"
#include "db/config.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "db/system_keyspace.hh"

#include "test/perf/perf.hh"
#include "test/lib/log.hh"
#include "test/lib/cql_test_env.hh"

using namespace locator;
using namespace replica;

seastar::abort_source aborted;

static const size_t MiB = 1 << 20;

static
cql_test_config tablet_cql_test_config() {
cql_test_config c;
c.db_config->experimental_features({
db::experimental_features_t::feature::TABLETS,
db::experimental_features_t::feature::RAFT
}, db::config::config_source::CommandLine);
return c;
}

static
future<table_id> add_table(cql_test_env& e) {
auto id = table_id(utils::UUID_gen::get_time_UUID());
co_await e.create_table([id] (std::string_view ks_name) {
return *schema_builder(ks_name, id.to_sstring(), id)
.with_column("p1", utf8_type, column_kind::partition_key)
.with_column("r1", int32_type)
.build();
});
co_return id;
}

static future<> test_basic_operations(app_template& app) {
return do_with_cql_env_thread([&] (cql_test_env& e) {
tablet_metadata tm;

auto h1 = host_id(utils::UUID_gen::get_time_UUID());

int nr_tables = app.configuration()["tables"].as<int>();
int tablets_per_table = app.configuration()["tablets-per-table"].as<int>();
int rf = app.configuration()["rf"].as<int>();

size_t total_tablets = 0;

std::vector<table_id> ids;
ids.resize(nr_tables);
for (int i = 0; i < nr_tables; ++i) {
ids[i] = add_table(e).get0();
}

testlog.info("Generating tablet metadata");

for (int i = 0; i < nr_tables; ++i) {
tablet_map tmap(tablets_per_table);

for (tablet_id j : tmap.tablet_ids()) {
aborted.check();
thread::maybe_yield();
tablet_replica_set replicas;
for (int k = 0; k < rf; ++k) {
replicas.push_back({h1, 0});
}
assert(replicas.size() == rf);
tmap.set_tablet(j, tablet_info{std::move(replicas)});
++total_tablets;
}

tm.set_tablet_map(ids[i], std::move(tmap));
}

testlog.info("Total tablet count: {}", total_tablets);

testlog.info("Size of tablet_metadata in memory: {} KiB",
(tm.external_memory_usage() + sizeof(tablet_metadata)) / 1024);

tablet_metadata tm2;
auto time_to_copy = duration_in_seconds([&] {
tm2 = tm;
});

testlog.info("Copied in {:.6f} [ms]", time_to_copy.count() * 1000);

auto time_to_clear = duration_in_seconds([&] {
tm2.clear_gently().get();
});

testlog.info("Cleared in {:.6f} [ms]", time_to_clear.count() * 1000);

auto time_to_save = duration_in_seconds([&] {
save_tablet_metadata(e.local_db(), tm, api::new_timestamp()).get();
});

testlog.info("Saved in {:.6f} [ms]", time_to_save.count() * 1000);

auto time_to_read = duration_in_seconds([&] {
tm2 = read_tablet_metadata(e.local_qp()).get0();
});
assert(tm == tm2);

testlog.info("Read in {:.6f} [ms]", time_to_read.count() * 1000);

std::vector<canonical_mutation> muts;
auto time_to_read_muts = duration_in_seconds([&] {
muts = replica::read_tablet_mutations(e.local_qp().proxy().container()).get0();
});

testlog.info("Read mutations in {:.6f} [ms]", time_to_read_muts.count() * 1000);

size_t cm_size = 0;
for (auto&& cm : muts) {
cm_size += cm.representation().size();
}

testlog.info("Size of canonical mutations: {:.6f} [MiB]", double(cm_size) / MiB);

auto&& tablets_table = e.local_db().find_column_family(db::system_keyspace::tablets());
testlog.info("Disk space used by system.tablets: {:.6f} [MiB]", double(tablets_table.get_stats().live_disk_space_used) / MiB);
}, tablet_cql_test_config());
}

namespace perf {

int scylla_tablets_main(int argc, char** argv) {
namespace bpo = boost::program_options;
app_template app;
app.add_options()
("tables", bpo::value<int>()->default_value(100), "Number of tables to create.")
("tablets-per-table", bpo::value<int>()->default_value(10000), "Number of tablets per table.")
("rf", bpo::value<int>()->default_value(3), "Number of replicas per tablet.")
("verbose", "Enables standard logging")
;
return app.run(argc, argv, [&] {
return seastar::async([&] {
if (!app.configuration().contains("verbose")) {
auto testlog_level = logging::logger_registry().get_logger_level("testlog");
logging::logger_registry().set_all_loggers_level(seastar::log_level::warn);
logging::logger_registry().set_logger_level("testlog", testlog_level);
}
engine().at_exit([] {
aborted.request_abort();
return make_ready_future();
});
logalloc::prime_segment_pool(memory::stats().total_memory(), memory::min_free_memory()).get();
try {
test_basic_operations(app).get();
} catch (seastar::abort_requested_exception&) {
// Ignore
}
});
});
}

} // namespace perf
1 change: 1 addition & 0 deletions test/topology/suite.yaml
@@ -4,3 +4,4 @@ cluster_size: 3
extra_scylla_config_options:
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
experimental_features: [tablets]
99 changes: 99 additions & 0 deletions test/topology/test_tablets.py
@@ -0,0 +1,99 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#

from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
import pytest
import asyncio
import logging


logger = logging.getLogger(__name__)


async def inject_error_one_shot_on(manager, error_name, servers):
errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)


@pytest.mark.asyncio
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
"""Test that you can create a table and insert and query data"""

servers = await manager.running_servers()

s0 = servers[0].server_id
not_s0 = servers[1:]

# s0 should miss schema and tablet changes
await manager.server_stop_gracefully(s0)

manager.cql.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 3, 'initial_tablets': 100};")

# force s0 to catch up later from the snapshot and not the raft log
await inject_error_one_shot_on(manager, 'raft_server_force_snapshot', not_s0)
manager.cql.execute("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")

keys = range(10)
await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 1);") for k in keys])

rows = list(manager.cql.execute("SELECT * FROM test.test;"))
assert len(rows) == len(keys)
for r in rows:
assert r.c == 1

await manager.server_start(s0, wait_others=2)

manager.driver_close()
await manager.driver_connect(server=servers[0])

# Trigger a schema change to invoke schema agreement waiting to make sure that s0 has the latest schema
manager.cql.execute("CREATE KEYSPACE test_dummy WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 1, 'initial_tablets': 1};")

await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist')
for k in keys])

rows = list(manager.cql.execute("SELECT * FROM test.test;"))
assert len(rows) == len(keys)
for r in rows:
assert r.c == 2

# Check that after rolling restart the tablet metadata is still there
for s in servers:
await manager.server_restart(s.server_id, wait_others=2)

await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist')
for k in keys])

rows = list(manager.cql.execute("SELECT * FROM test.test;"))
assert len(rows) == len(keys)
for r in rows:
assert r.c == 3

manager.cql.execute("DROP KEYSPACE test;")
manager.cql.execute("DROP KEYSPACE test_dummy;")


@pytest.mark.asyncio
async def test_scans(manager: ManagerClient):
manager.cql.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 1, 'initial_tablets': 8};")
manager.cql.execute("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")

keys = range(100)
await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])

rows = manager.cql.execute("SELECT count(*) FROM test.test;")
assert rows.one().count == len(keys)

rows = list(manager.cql.execute("SELECT * FROM test.test;"))
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk

manager.cql.execute("DROP KEYSPACE test;")
2 changes: 1 addition & 1 deletion thrift/handler.cc
@@ -947,7 +947,7 @@ class thrift_handler : public CassandraCobSvIf {
throw NotFoundException();
}

co_return mm.prepare_keyspace_drop_announcement(keyspace, ts);
co_return co_await mm.prepare_keyspace_drop_announcement(keyspace, ts);
});
});
}
3 changes: 1 addition & 2 deletions tombstone_gc.cc
@@ -167,9 +167,8 @@ static bool needs_repair_before_gc(const replica::database& db, sstring ks_name)
// need to run repair even if tombstone_gc mode = repair.
auto& ks = db.find_keyspace(ks_name);
auto& rs = ks.get_replication_strategy();
auto erm = ks.get_effective_replication_map();
bool needs_repair = rs.get_type() != locator::replication_strategy_type::local
&& erm->get_replication_factor() != 1;
&& rs.get_replication_factor(db.get_token_metadata()) != 1;
return needs_repair;
}

2 changes: 2 additions & 0 deletions transport/event_notifier.cc
@@ -156,6 +156,8 @@ void cql_server::event_notifier::on_update_aggregate(const sstring& ks_name, con
elogger.warn("%s event ignored", __func__);
}

void cql_server::event_notifier::on_update_tablet_metadata() {}

void cql_server::event_notifier::on_drop_keyspace(const sstring& ks_name)
{
for (auto&& conn : _schema_change_listeners) {
1 change: 1 addition & 0 deletions transport/server.hh
@@ -315,6 +315,7 @@ public:
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override;
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
virtual void on_update_tablet_metadata() override;

virtual void on_drop_keyspace(const sstring& ks_name) override;
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override;
11 changes: 11 additions & 0 deletions utils/chunked_vector.hh
@@ -51,6 +51,7 @@
#include <utility>
#include <algorithm>
#include <stdexcept>
#include <malloc.h>

namespace utils {

@@ -193,6 +194,8 @@ public:
size_t memory_size() const {
return _capacity * sizeof(T);
}

size_t external_memory_usage() const;
public:
template <class ValueType>
class iterator_type {
@@ -301,6 +304,14 @@ public:
}
};

template<typename T, size_t max_contiguous_allocation>
size_t chunked_vector<T, max_contiguous_allocation>::external_memory_usage() const {
size_t result = 0;
for (auto&& chunk : _chunks) {
result += ::malloc_usable_size(chunk.get());
}
return result;
}

template <typename T, size_t max_contiguous_allocation>
chunked_vector<T, max_contiguous_allocation>::chunked_vector(const chunked_vector& x)
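external_memory_usage() reports what the chunks actually cost on the heap (malloc_usable_size includes allocator rounding), unlike memory_size(), which is just capacity * sizeof(T); this is what lets tablet_metadata report a realistic footprint in perf_tablets.cc above. A small usage sketch:

// Sketch: logical capacity cost vs. true allocated bytes.
utils::chunked_vector<int64_t> v;
for (int64_t i = 0; i < 100000; ++i) {
    v.push_back(i);
}
size_t logical = v.memory_size();         // capacity * sizeof(T)
size_t heap = v.external_memory_usage();  // sum of malloc_usable_size() per chunk
// heap is typically >= logical, since usable size includes rounding.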
8 changes: 8 additions & 0 deletions utils/small_vector.hh
@@ -18,6 +18,7 @@
#include <initializer_list>
#include <memory>
#include <stdexcept>
#include <malloc.h>

namespace utils {

@@ -234,6 +235,13 @@ public:
}
}

size_t external_memory_usage() const {
if (uses_internal_storage()) {
return 0;
}
return ::malloc_usable_size(_begin);
}

void reserve(size_t n) {
if (__builtin_expect(_begin + n > _capacity_end, false)) {
expand(n);