Skip to content

Commit

Permalink
Merge 'Add a method to return a list of shared sstables co-owned by a…
Browse files Browse the repository at this point in the history
… node ' from Pavel Solodovnikov

Also, a couple of code cleanups for CQL syntax in other shared_sstables methods.

Expose a `system_distributed_keyspace` sharded service in the `cql_test_env` to be able to write tests using `system_distributed*` keyspaces.

Closes scylladb#7

* github.com:avikivity/scylla:
  db: shared_sstables: add method to return the list of shared sstables owned by a node
  db: system_distributed_keyspace: shared_sstables: CQL syntax cleanup
  test: add system_distributed_keyspace to cql test environment
  • Loading branch information
avikivity committed Oct 25, 2021
2 parents b6f6b71 + 51ca01b commit 7c6d211
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
29 changes: 22 additions & 7 deletions db/system_distributed_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -835,34 +835,49 @@ future<> system_distributed_keyspace::drop_service_level(sstring service_level_n
}

future<> system_distributed_keyspace::add_shared_sstable_owner(utils::UUID table_id, sstring sstable, gms::inet_address owner) {
static const sstring insert_new_query = format("INSERT INTO {}.{} (table, sstable, owners) VALUES (?, ?, {{?}}) IF NOT EXISTS", NAME_EVERYWHERE, SHARED_SSTABLES);
static const sstring insert_new_query = format("INSERT INTO {}.{} (\"table\", sstable, owners) VALUES (?, ?, ?) IF NOT EXISTS", NAME_EVERYWHERE, SHARED_SSTABLES);
const auto insert_res = co_await _qp.execute_internal(insert_new_query,
db::consistency_level::SERIAL,
internal_distributed_query_state(),
{table_id, sstable, owner.addr()});
{table_id, sstable, make_set_value(inet_addr_type, {owner.addr()})});
if (insert_res->one().get_as<bool>("[applied]")) {
co_return;
}
static const sstring update_query = format("UPDATE {}.{} SET owners=owners+{{?}} WHERE table=? AND sstable=? IF EXISTS", NAME_EVERYWHERE, SHARED_SSTABLES);
static const sstring update_query = format("UPDATE {}.{} SET owners=owners+? WHERE \"table\"=? AND sstable=? IF EXISTS", NAME_EVERYWHERE, SHARED_SSTABLES);
co_return co_await _qp.execute_internal(update_query,
db::consistency_level::SERIAL,
internal_distributed_query_state(),
{table_id, sstable, owner.addr()}).discard_result();
{table_id, sstable, make_set_value(inet_addr_type, {owner.addr()})}).discard_result();
}

future<bool> system_distributed_keyspace::remove_shared_sstable_owner(utils::UUID table_id, sstring sstable, gms::inet_address owner) {
static const sstring update_query = format("UPDATE {}.{} SET owners=owners-{{?}} WHERE table=? AND sstable=? IF owners!={{}}", NAME_EVERYWHERE, SHARED_SSTABLES);
static const sstring update_query = format("UPDATE {}.{} SET owners=owners-? WHERE \"table\"=? AND sstable=? IF owners!={{}}", NAME_EVERYWHERE, SHARED_SSTABLES);
co_await _qp.execute_internal(update_query,
db::consistency_level::SERIAL,
internal_distributed_query_state(),
{table_id, sstable, owner.addr()}).discard_result();
{table_id, sstable, make_set_value(inet_addr_type, {owner.addr()})}).discard_result();

static const sstring delete_empty_query = format("DELETE FROM {}.{} WHERE table=? AND sstable=? IF owners!={{}}", NAME_EVERYWHERE, SHARED_SSTABLES);
static const sstring delete_empty_query = format("DELETE FROM {}.{} WHERE \"table\"=? AND sstable=? IF owners!={{}}", NAME_EVERYWHERE, SHARED_SSTABLES);
const auto delete_res = co_await _qp.execute_internal(delete_empty_query,
db::consistency_level::SERIAL,
internal_distributed_query_state(),
{table_id, sstable});
co_return delete_res->one().get_as<bool>("[applied]");
}

future<std::vector<sstring>> system_distributed_keyspace::shared_sstables_owned_by(gms::inet_address owner, utils::UUID table_id) {
static const sstring query = format("SELECT sstable FROM {}.{} WHERE \"table\"=? AND owners CONTAINS ? ALLOW FILTERING", NAME_EVERYWHERE, SHARED_SSTABLES);
auto res = co_await _qp.execute_internal(query,
db::consistency_level::ONE,
internal_distributed_query_state(),
{table_id, owner.addr()});

std::vector<sstring> sstables;
sstables.reserve(res->size());
for (const auto& row : *res) {
sstables.emplace_back(row.get_as<sstring>("sstable"));
}
co_return std::move(sstables);
}

}
1 change: 1 addition & 0 deletions db/system_distributed_keyspace.hh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public:

future<> add_shared_sstable_owner(utils::UUID table_id, sstring sstable, gms::inet_address owner);
future<bool> remove_shared_sstable_owner(utils::UUID table_id, sstring sstable, gms::inet_address owner);
future<std::vector<sstring>> shared_sstables_owned_by(gms::inet_address owner, utils::UUID table_id);
};

}
11 changes: 9 additions & 2 deletions test/lib/cql_test_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class single_node_cql_env : public cql_test_env {
sharded<service::migration_notifier>& _mnotifier;
sharded<qos::service_level_controller>& _sl_controller;
sharded<service::migration_manager>& _mm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
private:
struct core_local_state {
service::client_state client_state;
Expand Down Expand Up @@ -181,7 +182,8 @@ class single_node_cql_env : public cql_test_env {
sharded<db::view::view_update_generator>& view_update_generator,
sharded<service::migration_notifier>& mnotifier,
sharded<service::migration_manager>& mm,
sharded<qos::service_level_controller> &sl_controller)
sharded<qos::service_level_controller> &sl_controller,
sharded<db::system_distributed_keyspace>& sys_dist_ks)
: _db(db)
, _qp(qp)
, _auth_service(auth_service)
Expand All @@ -190,6 +192,7 @@ class single_node_cql_env : public cql_test_env {
, _mnotifier(mnotifier)
, _sl_controller(sl_controller)
, _mm(mm)
, _sys_dist_ks(sys_dist_ks)
{
adjust_rlimit();
}
Expand Down Expand Up @@ -395,6 +398,10 @@ class single_node_cql_env : public cql_test_env {
});
}

virtual sharded<db::system_distributed_keyspace>& sys_dist_ks() override {
return _sys_dist_ks;
}

future<> start() {
return _core_local.start(std::ref(_auth_service), std::ref(_sl_controller));
}
Expand Down Expand Up @@ -766,7 +773,7 @@ class single_node_cql_env : public cql_test_env {
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}

single_node_cql_env env(db, qp, auth_service, view_builder, view_update_generator, mm_notif, mm, std::ref(sl_controller));
single_node_cql_env env(db, qp, auth_service, view_builder, view_update_generator, mm_notif, mm, std::ref(sl_controller), sys_dist_ks);
env.start().get();
auto stop_env = defer([&env] { env.stop().get(); });

Expand Down
3 changes: 3 additions & 0 deletions test/lib/cql_test_env.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public:

namespace db {
class config;
class system_distributed_keyspace;
}

struct scheduling_groups {
Expand Down Expand Up @@ -163,6 +164,8 @@ public:
virtual sharded<service::migration_manager>& migration_manager() = 0;

virtual future<> refresh_client_state() = 0;

virtual sharded<db::system_distributed_keyspace>& sys_dist_ks() = 0;
};

future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, cql_test_config = {});
Expand Down

0 comments on commit 7c6d211

Please sign in to comment.