Skip to content

Commit

Permalink
alternator, db: move the tag code to db/tags
Browse files Browse the repository at this point in the history
Tags are a useful mechanism that could be used outside of the alternator
namespace. My motivation for moving tags_extension and other utilities to
db/tags/ was that I wanted to use them to mark "synchronous mode" views.

I have extracted the `get_tags_of_table`, `find_tag` and `update_tags`
functions to db/tags/utils.cc and moved alternator/tags_extension.hh to
db/tags/extension.hh.

The return type of `get_tags_of_table` was changed from `const
std::map<sstring, sstring>&` to `const std::map<sstring, sstring>*`.
The original behavior of this function was to throw an
`alternator::api_error` exception when the table had no tags extension.
This was undesirable, as it introduced a dependency on the alternator
module. I chose to change it to return a potentially null value, and
added a wrapper function to the alternator module -
`get_tags_of_table_or_throw` - to keep the previous throwing behavior.
  • Loading branch information
havaker authored and psarna committed Jul 25, 2022
1 parent 494e7fc commit 041cb77
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 66 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ resources
.pytest_cache
/expressions.tokens
tags
!db/tags/
testlog
test/*/*.reject
.vscode
Expand Down
70 changes: 17 additions & 53 deletions alternator/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
#include "collection_mutation.hh"
#include "db/query_context.hh"
#include "schema.hh"
#include "alternator/tags_extension.hh"
#include "db/tags/extension.hh"
#include "db/tags/utils.hh"
#include "alternator/rmw_operation.hh"
#include <seastar/core/coroutine.hh>
#include <boost/range/adaptors.hpp>
Expand Down Expand Up @@ -638,34 +639,6 @@ static schema_ptr get_table_from_arn(service::storage_proxy& proxy, std::string_
}
}

// Returns the tags map stored in the schema's tags extension.
// Throws api_error::validation if the schema has no tags extension; the
// error message is formatted with schema->ks_name().
const std::map<sstring, sstring>& get_tags_of_table(schema_ptr schema) {
    const auto& extensions = schema->extensions();
    const auto found = extensions.find(tags_extension::NAME);
    if (found != extensions.end()) {
        // The returned reference points into the extension object, which the
        // schema's extension map keeps holding after the temporary pointer
        // created by the cast goes away.
        return static_pointer_cast<alternator::tags_extension>(found->second)->tags();
    }
    throw api_error::validation(format("Table {} does not have valid tagging information", schema->ks_name()));
}

// find_tag() looks up a single tag on the given schema and returns its value,
// or std::nullopt if there is nothing to return. Unlike get_tags_of_table()
// above, a schema without the tags extension (e.g., one that is not an
// Alternator table) is not treated as an error: it yields std::nullopt, just
// like the case where the extension exists but does not contain this tag.
std::optional<std::string> find_tag(const schema& s, const sstring& tag) {
    const auto& extensions = s.extensions();
    const auto ext_it = extensions.find(tags_extension::NAME);
    if (ext_it == extensions.end()) {
        return std::nullopt;
    }
    const std::map<sstring, sstring>& tags_map =
            static_pointer_cast<alternator::tags_extension>(ext_it->second)->tags();
    if (const auto tag_it = tags_map.find(tag); tag_it != tags_map.end()) {
        return tag_it->second;
    }
    return std::nullopt;
}

static bool is_legal_tag_char(char c) {
// FIXME: According to docs, unicode strings should also be accepted.
// Alternator currently uses a simplified ASCII approach
Expand Down Expand Up @@ -760,22 +733,13 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
validate_tags(tags_map);
}

// FIXME: Updating tags currently relies on updating schema, which may be subject
// to races during concurrent updates of the same table. Once Scylla schema updates
// are fixed, this issue will automatically get fixed as well.
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> {
// FIXME: the following needs to be in a loop. If mm.announce() below
// fails, we need to retry the whole thing.
auto group0_guard = co_await mm.start_group0_operation();

schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));

auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());

co_await mm.announce(std::move(m), std::move(group0_guard));
});
// Alternator-side wrapper around db::get_tags_of_table(): returns the tags
// map by reference, or throws api_error::validation when
// db::get_tags_of_table() returns nullptr. The error message is formatted
// with schema->ks_name().
const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema) {
    const std::map<sstring, sstring>* tags = db::get_tags_of_table(schema);
    if (tags == nullptr) {
        throw api_error::validation(format("Table {} does not have valid tagging information", schema->ks_name()));
    }
    return *tags;
}

future<executor::request_return_type> executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) {
Expand All @@ -786,7 +750,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
co_return api_error::access_denied("Incorrect resource identifier");
}
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
const rjson::value* tags = rjson::find(request, "Tags");
if (!tags || !tags->IsArray()) {
co_return api_error::validation("Cannot parse tags");
Expand All @@ -795,7 +759,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
co_return api_error::validation("The number of tags must be at least 1") ;
}
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
co_await update_tags(_mm, schema, std::move(tags_map));
co_await db::update_tags(_mm, schema, std::move(tags_map));
co_return json_string("");
}

Expand All @@ -813,9 +777,9 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli

schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));

std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
co_await update_tags(_mm, schema, std::move(tags_map));
co_await db::update_tags(_mm, schema, std::move(tags_map));
co_return json_string("");
}

Expand All @@ -827,7 +791,7 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
}
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));

auto tags_map = get_tags_of_table(schema);
auto tags_map = get_tags_of_table_or_throw(schema);
rjson::value ret = rjson::empty_object();
rjson::add(ret, "Tags", rjson::empty_array());

Expand Down Expand Up @@ -1046,7 +1010,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
if (tags && tags->IsArray()) {
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
}
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));

schema_ptr schema = builder.build();
auto where_clause_it = where_clauses.begin();
Expand All @@ -1061,7 +1025,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
}
const bool include_all_columns = true;
view_builder.with_view_info(*schema, include_all_columns, *where_clause_it);
view_builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>());
view_builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>());
++where_clause_it;
}

Expand Down Expand Up @@ -1455,7 +1419,7 @@ std::optional<mutation> rmw_operation::apply(foreign_ptr<lw_shared_ptr<query::re
}

rmw_operation::write_isolation rmw_operation::get_write_isolation_for_schema(schema_ptr schema) {
const auto& tags = get_tags_of_table(schema);
const auto& tags = get_tags_of_table_or_throw(schema);
auto it = tags.find(WRITE_ISOLATION_TAG_KEY);
if (it == tags.end() || it->second.empty()) {
return default_write_isolation;
Expand Down
5 changes: 2 additions & 3 deletions alternator/executor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,10 @@ namespace parsed {
class path;
};

const std::map<sstring, sstring>& get_tags_of_table(schema_ptr schema);
std::optional<std::string> find_tag(const schema& s, const sstring& tag);
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map);
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
// Wraps the db::get_tags_of_table and throws if the table is missing the tags extension.
const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema);

// An attribute_path_map object is used to hold data for various attributes
// paths (parsed::path) in a hierarchy of attribute paths. Each attribute path
Expand Down
1 change: 0 additions & 1 deletion alternator/streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "gms/feature_service.hh"

#include "executor.hh"
#include "tags_extension.hh"
#include "rmw_operation.hh"

/**
Expand Down
9 changes: 5 additions & 4 deletions alternator/ttl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "alternator/serialization.hh"
#include "dht/sharder.hh"
#include "db/config.hh"
#include "db/tags/utils.hh"

#include "ttl.hh"

Expand Down Expand Up @@ -91,7 +92,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
sstring attribute_name(v->GetString(), v->GetStringLength());

std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {
co_return api_error::validation("TTL is already enabled");
Expand All @@ -108,7 +109,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
tags_map.erase(TTL_TAG_KEY);
}
co_await update_tags(_mm, schema, std::move(tags_map));
co_await db::update_tags(_mm, schema, std::move(tags_map));
// Prepare the response, which contains a TimeToLiveSpecification
// basically identical to the request's
rjson::value response = rjson::empty_object();
Expand All @@ -119,7 +120,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
future<executor::request_return_type> executor::describe_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.describe_time_to_live++;
schema_ptr schema = get_table(_proxy, request);
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
rjson::value desc = rjson::empty_object();
auto i = tags_map.find(TTL_TAG_KEY);
if (i == tags_map.end()) {
Expand Down Expand Up @@ -644,7 +645,7 @@ static future<bool> scan_table(
// Check if an expiration-time attribute is enabled for this table.
// If not, just return false immediately.
// FIXME: the setting of the TTL may change in the middle of a long scan!
std::optional<std::string> attribute_name = find_tag(*s, TTL_TAG_KEY);
std::optional<std::string> attribute_name = db::find_tag(*s, TTL_TAG_KEY);
if (!attribute_name) {
co_return false;
}
Expand Down
1 change: 1 addition & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ def find_headers(repodir, excluded_dirs):
'db/large_data_handler.cc',
'db/marshal/type_parser.cc',
'db/batchlog_manager.cc',
'db/tags/utils.cc',
'db/view/view.cc',
'db/view/view_update_generator.cc',
'db/view/row_locking.cc',
Expand Down
2 changes: 1 addition & 1 deletion alternator/tags_extension.hh → db/tags/extension.hh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "schema.hh"
#include "db/extensions.hh"

namespace alternator {
namespace db {

class tags_extension : public schema_extension {
public:
Expand Down
56 changes: 56 additions & 0 deletions db/tags/utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include "db/tags/utils.hh"

#include "db/tags/extension.hh"
#include "schema_builder.hh"
#include "schema_registry.hh"

namespace db {

// Returns a pointer to the tags map held by the schema's tags extension, or
// nullptr when the schema has no such extension. The pointer is non-owning:
// it refers to data reachable through the schema's extension map.
const std::map<sstring, sstring>* get_tags_of_table(schema_ptr schema) {
    const auto& extensions = schema->extensions();
    const auto found = extensions.find(tags_extension::NAME);
    if (found == extensions.end()) {
        return nullptr;
    }
    return &static_pointer_cast<tags_extension>(found->second)->tags();
}

// Looks up a single tag on the given schema. Returns the tag's value, or
// std::nullopt when either the schema has no tags extension or the extension
// does not contain the requested tag.
std::optional<std::string> find_tag(const schema& s, const sstring& tag) {
    const auto& extensions = s.extensions();
    const auto ext_it = extensions.find(tags_extension::NAME);
    if (ext_it == extensions.end()) {
        return std::nullopt;
    }
    const std::map<sstring, sstring>& tags_map =
            static_pointer_cast<tags_extension>(ext_it->second)->tags();
    if (const auto tag_it = tags_map.find(tag); tag_it != tags_map.end()) {
        return tag_it->second;
    }
    return std::nullopt;
}

// Rebuilds the given table schema with its tags extension set to one
// constructed from tags_map, and announces the resulting schema update
// through the migration manager.
//
// mm       - migration manager; the work is dispatched via
//            mm.container().invoke_on(0, ...).
// schema   - schema of the table whose tags are being replaced.
// tags_map - the complete new set of tags; it is moved into the dispatched
//            lambda.
//
// The returned future resolves once the dispatched lambda has finished.
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
// The schema is wrapped in a global_schema_ptr before being captured, and the
// lambda receives the migration_manager instance it is invoked on (which
// shadows the outer `mm` inside the lambda body).
co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> {
// FIXME: the following needs to be in a loop. If mm.announce() below
// fails, we need to retry the whole thing.
auto group0_guard = co_await mm.start_group0_operation();

// Start from the existing schema and set the tags extension to one built
// from a copy of the captured tags_map.
schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));

// Prepare the update announcement for the rebuilt schema, passing an empty
// list of views and the write timestamp taken from the group0 guard.
// NOTE(review): the meaning of the `false` argument is not visible here - confirm
// against prepare_column_family_update_announcement()'s declaration.
auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());

// Announcing consumes both the prepared announcement and the guard.
co_await mm.announce(std::move(m), std::move(group0_guard));
});
}

}
41 changes: 41 additions & 0 deletions db/tags/utils.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#pragma once

#include <functional>
#include <map>
#include <optional>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"

#include "schema.hh"
#include "service/client_state.hh"
#include "service/migration_manager.hh"

namespace db {

// get_tags_of_table() returns all tags associated with the given table, or
// nullptr if the table is missing the tags extension.
// The returned value is a non-owning pointer, which refers to the inner fields
// of the schema. It must not be used after the schema it was obtained from
// has been released.
const std::map<sstring, sstring>* get_tags_of_table(schema_ptr schema);

// find_tag() returns the value of a specific tag, or nothing if it doesn't
// exist. If the table is missing the tags extension (e.g., is not an
// Alternator table) it's not an error - we return nothing, as in the case that
// tags exist but not this tag.
std::optional<std::string> find_tag(const schema& s, const sstring& tag);

// update_tags() replaces the tags of the given table with tags_map (the full
// new set of tags, not a delta) by building an updated schema and announcing
// it through the migration manager. The returned future resolves when the
// announcement has completed.
// FIXME: Updating tags currently relies on updating schema, which may be subject
// to races during concurrent updates of the same table. Once Scylla schema updates
// are fixed, this issue will automatically get fixed as well.
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map);

}
4 changes: 2 additions & 2 deletions main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
#include "cdc/cdc_extension.hh"
#include "cdc/generation_service.hh"
#include "tombstone_gc_extension.hh"
#include "alternator/tags_extension.hh"
#include "db/tags/extension.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "service/storage_proxy.hh"
Expand Down Expand Up @@ -457,7 +457,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
app_template app(std::move(app_cfg));

auto ext = std::make_shared<db::extensions>();
ext->add_schema_extension<alternator::tags_extension>(alternator::tags_extension::NAME);
ext->add_schema_extension<db::tags_extension>(db::tags_extension::NAME);
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
ext->add_schema_extension<db::paxos_grace_seconds_extension>(db::paxos_grace_seconds_extension::NAME);
ext->add_schema_extension<tombstone_gc_extension>(tombstone_gc_extension::NAME);
Expand Down
4 changes: 2 additions & 2 deletions test/perf/perf_simple_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "cql3/query_processor.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "alternator/tags_extension.hh"
#include "db/tags/extension.hh"
#include "gms/gossiper.hh"

static const sstring table_name = "cf";
Expand Down Expand Up @@ -531,7 +531,7 @@ int main(int argc, char** argv) {
seastar::testing::local_random_engine.seed(seed + this_shard_id());
}).then([&app] () -> future<> {
auto ext = std::make_shared<db::extensions>();
ext->add_schema_extension<alternator::tags_extension>(alternator::tags_extension::NAME);
ext->add_schema_extension<db::tags_extension>(db::tags_extension::NAME);
auto db_cfg = ::make_shared<db::config>(ext);

const auto enable_cache = app.configuration()["enable-cache"].as<bool>();
Expand Down

0 comments on commit 041cb77

Please sign in to comment.