Skip to content

Commit

Permalink
Merge 'Allow materialized views to be synchronous' from Piotr Sarna
Browse files Browse the repository at this point in the history
This pull request introduces a "synchronous mode" for global views. In this mode, all view updates are applied synchronously as if the view was local.

Marking view as a synchronous one can be done using `CREATE MATERIALIZED VIEW` and `ALTER MATERIALIZED VIEW`. E.g.:
```cql
ALTER MATERIALIZED VIEW ks.v WITH synchronous_updates = true;
```

Marking view as a synchronous one was done using tags (originally used by alternator). No big modifications in the view's code were needed.

Fixes: #10545

Closes #11013

* github.com:scylladb/scylla:
  cql-pytest: extend synchronous mv test with new cases
  cql-pytest: allow extra parameters in new_materialized_view
  docs: add a paragraph on view synchronous updates
  test/boost/cql_query_test: add test setting synchronous updates property
  test: cql-pytest: add a test for synchronous mode materialized views
  db: view: react to synchronous updates tag
  cql3: statements: cf_prop_defs: apply synchronous updates tag
  alternator, db: move the tag code to db/tags
  cql3: statements: add a synchronous_updates property
  • Loading branch information
nyh committed Jul 26, 2022
2 parents 5014bd0 + 277aa30 commit cb8a67d
Show file tree
Hide file tree
Showing 20 changed files with 319 additions and 73 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -22,6 +22,7 @@ resources
.pytest_cache
/expressions.tokens
tags
!db/tags/
testlog
test/*/*.reject
.vscode
Expand Down
70 changes: 17 additions & 53 deletions alternator/executor.cc
Expand Up @@ -41,7 +41,8 @@
#include "collection_mutation.hh"
#include "db/query_context.hh"
#include "schema.hh"
#include "alternator/tags_extension.hh"
#include "db/tags/extension.hh"
#include "db/tags/utils.hh"
#include "alternator/rmw_operation.hh"
#include <seastar/core/coroutine.hh>
#include <boost/range/adaptors.hpp>
Expand Down Expand Up @@ -638,34 +639,6 @@ static schema_ptr get_table_from_arn(service::storage_proxy& proxy, std::string_
}
}

// Returns the tags map held by the tags extension attached to `schema`.
// Throws api_error::validation when the schema has no extension registered
// under tags_extension::NAME.
const std::map<sstring, sstring>& get_tags_of_table(schema_ptr schema) {
    const auto& extensions = schema->extensions();
    const auto ext_it = extensions.find(tags_extension::NAME);
    if (ext_it != extensions.end()) {
        // The extension object is owned by the schema, so the returned
        // reference stays valid after the temporary pointer goes away.
        return static_pointer_cast<alternator::tags_extension>(ext_it->second)->tags();
    }
    throw api_error::validation(format("Table {} does not have valid tagging information", schema->ks_name()));
}

// find_tag() looks up a single tag by name and returns its value, or
// std::nullopt when there is nothing to return. In contrast to
// get_tags_of_table() above, a schema without the tags extension (e.g., a
// table that is not an Alternator table) is not treated as an error: it is
// reported exactly like a schema whose tags simply lack the requested tag.
std::optional<std::string> find_tag(const schema& s, const sstring& tag) {
    const auto& extensions = s.extensions();
    const auto ext_it = extensions.find(tags_extension::NAME);
    if (ext_it != extensions.end()) {
        const std::map<sstring, sstring>& tags_map =
                static_pointer_cast<alternator::tags_extension>(ext_it->second)->tags();
        const auto tag_it = tags_map.find(tag);
        if (tag_it != tags_map.end()) {
            return tag_it->second;
        }
    }
    // Either the extension or the tag itself is missing.
    return std::nullopt;
}

static bool is_legal_tag_char(char c) {
// FIXME: According to docs, unicode strings should also be accepted.
// Alternator currently uses a simplified ASCII approach
Expand Down Expand Up @@ -760,22 +733,13 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
validate_tags(tags_map);
}

// FIXME: Updating tags currently relies on updating schema, which may be subject
// to races during concurrent updates of the same table. Once Scylla schema updates
// are fixed, this issue will automatically get fixed as well.
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> {
// FIXME: the following needs to be in a loop. If mm.announce() below
// fails, we need to retry the whole thing.
auto group0_guard = co_await mm.start_group0_operation();

schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));

auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());

co_await mm.announce(std::move(m), std::move(group0_guard));
});
// Throwing wrapper around db::get_tags_of_table(): returns the tags map it
// points to, or throws api_error::validation when it returns a null pointer.
const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema) {
    if (const auto* tags = db::get_tags_of_table(schema)) {
        return *tags;
    }
    throw api_error::validation(format("Table {} does not have valid tagging information", schema->ks_name()));
}

future<executor::request_return_type> executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) {
Expand All @@ -786,7 +750,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
co_return api_error::access_denied("Incorrect resource identifier");
}
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
const rjson::value* tags = rjson::find(request, "Tags");
if (!tags || !tags->IsArray()) {
co_return api_error::validation("Cannot parse tags");
Expand All @@ -795,7 +759,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
co_return api_error::validation("The number of tags must be at least 1") ;
}
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
co_await update_tags(_mm, schema, std::move(tags_map));
co_await db::update_tags(_mm, schema, std::move(tags_map));
co_return json_string("");
}

Expand All @@ -813,9 +777,9 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli

schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));

std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
co_await update_tags(_mm, schema, std::move(tags_map));
co_await db::update_tags(_mm, schema, std::move(tags_map));
co_return json_string("");
}

Expand All @@ -827,7 +791,7 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
}
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));

auto tags_map = get_tags_of_table(schema);
auto tags_map = get_tags_of_table_or_throw(schema);
rjson::value ret = rjson::empty_object();
rjson::add(ret, "Tags", rjson::empty_array());

Expand Down Expand Up @@ -1046,7 +1010,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
if (tags && tags->IsArray()) {
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
}
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));

schema_ptr schema = builder.build();
auto where_clause_it = where_clauses.begin();
Expand All @@ -1061,7 +1025,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
}
const bool include_all_columns = true;
view_builder.with_view_info(*schema, include_all_columns, *where_clause_it);
view_builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>());
view_builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>());
++where_clause_it;
}

Expand Down Expand Up @@ -1455,7 +1419,7 @@ std::optional<mutation> rmw_operation::apply(foreign_ptr<lw_shared_ptr<query::re
}

rmw_operation::write_isolation rmw_operation::get_write_isolation_for_schema(schema_ptr schema) {
const auto& tags = get_tags_of_table(schema);
const auto& tags = get_tags_of_table_or_throw(schema);
auto it = tags.find(WRITE_ISOLATION_TAG_KEY);
if (it == tags.end() || it->second.empty()) {
return default_write_isolation;
Expand Down
5 changes: 2 additions & 3 deletions alternator/executor.hh
Expand Up @@ -81,11 +81,10 @@ namespace parsed {
class path;
};

const std::map<sstring, sstring>& get_tags_of_table(schema_ptr schema);
std::optional<std::string> find_tag(const schema& s, const sstring& tag);
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map);
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
// Wraps the db::get_tags_of_table and throws if the table is missing the tags extension.
const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema);

// An attribute_path_map object is used to hold data for various attributes
// paths (parsed::path) in a hierarchy of attribute paths. Each attribute path
Expand Down
1 change: 0 additions & 1 deletion alternator/streams.cc
Expand Up @@ -33,7 +33,6 @@
#include "gms/feature_service.hh"

#include "executor.hh"
#include "tags_extension.hh"
#include "rmw_operation.hh"

/**
Expand Down
9 changes: 5 additions & 4 deletions alternator/ttl.cc
Expand Up @@ -46,6 +46,7 @@
#include "alternator/serialization.hh"
#include "dht/sharder.hh"
#include "db/config.hh"
#include "db/tags/utils.hh"

#include "ttl.hh"

Expand Down Expand Up @@ -91,7 +92,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
sstring attribute_name(v->GetString(), v->GetStringLength());

std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {
co_return api_error::validation("TTL is already enabled");
Expand All @@ -108,7 +109,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
tags_map.erase(TTL_TAG_KEY);
}
co_await update_tags(_mm, schema, std::move(tags_map));
co_await db::update_tags(_mm, schema, std::move(tags_map));
// Prepare the response, which contains a TimeToLiveSpecification
// basically identical to the request's
rjson::value response = rjson::empty_object();
Expand All @@ -119,7 +120,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
future<executor::request_return_type> executor::describe_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.describe_time_to_live++;
schema_ptr schema = get_table(_proxy, request);
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
rjson::value desc = rjson::empty_object();
auto i = tags_map.find(TTL_TAG_KEY);
if (i == tags_map.end()) {
Expand Down Expand Up @@ -644,7 +645,7 @@ static future<bool> scan_table(
// Check if an expiration-time attribute is enabled for this table.
// If not, just return false immediately.
// FIXME: the setting of the TTL may change in the middle of a long scan!
std::optional<std::string> attribute_name = find_tag(*s, TTL_TAG_KEY);
std::optional<std::string> attribute_name = db::find_tag(*s, TTL_TAG_KEY);
if (!attribute_name) {
co_return false;
}
Expand Down
1 change: 1 addition & 0 deletions configure.py
Expand Up @@ -887,6 +887,7 @@ def find_headers(repodir, excluded_dirs):
'db/large_data_handler.cc',
'db/marshal/type_parser.cc',
'db/batchlog_manager.cc',
'db/tags/utils.cc',
'db/view/view.cc',
'db/view/view_update_generator.cc',
'db/view/row_locking.cc',
Expand Down
4 changes: 4 additions & 0 deletions cql3/statements/alter_table_statement.cc
Expand Up @@ -341,6 +341,10 @@ std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_
}
}

if (_properties->get_synchronous_updates_flag()) {
throw exceptions::invalid_request_exception(format("The synchronous_updates option is only applicable to materialized views, not to base tables"));
}

_properties->apply_to_builder(cfm, std::move(schema_extensions));
}
break;
Expand Down
18 changes: 17 additions & 1 deletion cql3/statements/cf_prop_defs.cc
Expand Up @@ -11,6 +11,7 @@
#include "cql3/statements/cf_prop_defs.hh"
#include "data_dictionary/data_dictionary.hh"
#include "db/extensions.hh"
#include "db/tags/extension.hh"
#include "cdc/log.hh"
#include "cdc/cdc_extension.hh"
#include "gms/feature.hh"
Expand Down Expand Up @@ -40,6 +41,7 @@ const sstring cf_prop_defs::KW_MAX_INDEX_INTERVAL = "max_index_interval";
const sstring cf_prop_defs::KW_SPECULATIVE_RETRY = "speculative_retry";
const sstring cf_prop_defs::KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
const sstring cf_prop_defs::KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
const sstring cf_prop_defs::KW_SYNCHRONOUS_UPDATES = "synchronous_updates";

const sstring cf_prop_defs::KW_COMPACTION = "compaction";
const sstring cf_prop_defs::KW_COMPRESSION = "compression";
Expand Down Expand Up @@ -79,7 +81,8 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
KW_GCGRACESECONDS, KW_CACHING, KW_DEFAULT_TIME_TO_LIVE,
KW_MIN_INDEX_INTERVAL, KW_MAX_INDEX_INTERVAL, KW_SPECULATIVE_RETRY,
KW_BF_FP_CHANCE, KW_MEMTABLE_FLUSH_PERIOD, KW_COMPACTION,
KW_COMPRESSION, KW_CRC_CHECK_CHANCE, KW_ID, KW_PAXOSGRACESECONDS
KW_COMPRESSION, KW_CRC_CHECK_CHANCE, KW_ID, KW_PAXOSGRACESECONDS,
KW_SYNCHRONOUS_UPDATES
});
static std::set<sstring> obsolete_keywords({
sstring("index_interval"),
Expand Down Expand Up @@ -178,6 +181,10 @@ int32_t cf_prop_defs::get_gc_grace_seconds() const
return get_int(KW_GCGRACESECONDS, DEFAULT_GC_GRACE_SECONDS);
}

// Reads the boolean "synchronous_updates" property through get_boolean(),
// passing `false` as the fallback argument.
bool cf_prop_defs::get_synchronous_updates_flag() const {
    constexpr bool fallback = false;
    return get_boolean(KW_SYNCHRONOUS_UPDATES, fallback);
}

int32_t cf_prop_defs::get_paxos_grace_seconds() const {
return get_int(KW_PAXOSGRACESECONDS, DEFAULT_GC_GRACE_SECONDS);
}
Expand Down Expand Up @@ -330,6 +337,15 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
}

builder.set_extensions(std::move(schema_extensions));

if (has_property(KW_SYNCHRONOUS_UPDATES)) {
bool is_synchronous = get_synchronous_updates_flag();
std::map<sstring, sstring> tags_map = {
{db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY, is_synchronous ? "true" : "false"}
};

builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
}
}

void cf_prop_defs::validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const
Expand Down
2 changes: 2 additions & 0 deletions cql3/statements/cf_prop_defs.hh
Expand Up @@ -49,6 +49,7 @@ public:
static const sstring KW_SPECULATIVE_RETRY;
static const sstring KW_BF_FP_CHANCE;
static const sstring KW_MEMTABLE_FLUSH_PERIOD;
static const sstring KW_SYNCHRONOUS_UPDATES;

static const sstring KW_COMPACTION;
static const sstring KW_COMPRESSION;
Expand Down Expand Up @@ -100,6 +101,7 @@ public:
int32_t get_gc_grace_seconds() const;
int32_t get_paxos_grace_seconds() const;
std::optional<utils::UUID> get_id() const;
bool get_synchronous_updates_flag() const;

void apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions) const;
void validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const;
Expand Down
3 changes: 3 additions & 0 deletions cql3/statements/create_table_statement.cc
Expand Up @@ -181,6 +181,9 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
}

_properties.validate(db, keyspace(), _properties.properties()->make_schema_extensions(db.extensions()));
if (_properties.properties()->get_synchronous_updates_flag()) {
throw exceptions::invalid_request_exception(format("The synchronous_updates option is only applicable to materialized views, not to base tables"));
}
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;

auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());
Expand Down
7 changes: 6 additions & 1 deletion alternator/tags_extension.hh → db/tags/extension.hh
Expand Up @@ -12,7 +12,7 @@
#include "schema.hh"
#include "db/extensions.hh"

namespace alternator {
namespace db {

class tags_extension : public schema_extension {
public:
Expand All @@ -37,4 +37,9 @@ private:
std::map<sstring, sstring> _tags;
};

// Whether a view's updates are synchronous is recorded using the
// SYNCHRONOUS_VIEW_UPDATES_TAG_KEY tag. The value of this tag is stored as a
// serialized boolean value ("true" or "false").
//
// Declared `inline` rather than `static`: this definition lives in a header,
// and an inline variable is a single object shared by every translation unit
// that includes it, instead of one separately constructed copy per unit.
inline const sstring SYNCHRONOUS_VIEW_UPDATES_TAG_KEY("system:synchronous_view_updates");

}
56 changes: 56 additions & 0 deletions db/tags/utils.cc
@@ -0,0 +1,56 @@
/*
* Copyright 2022-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include "db/tags/utils.hh"

#include "db/tags/extension.hh"
#include "schema_builder.hh"
#include "schema_registry.hh"

namespace db {

// Returns a pointer to the tags map held by the schema's tags extension, or
// nullptr when the schema has no extension registered under
// tags_extension::NAME. The map is reached through the schema's extension
// entry, so the pointer refers to data held by that extension object.
const std::map<sstring, sstring>* get_tags_of_table(schema_ptr schema) {
    const auto& extensions = schema->extensions();
    const auto ext_it = extensions.find(tags_extension::NAME);
    if (ext_it != extensions.end()) {
        return &static_pointer_cast<tags_extension>(ext_it->second)->tags();
    }
    return nullptr;
}

// Returns the value stored under `tag` in the schema's tags extension.
// Yields std::nullopt both when the schema has no tags extension and when
// the extension exists but does not contain `tag`.
std::optional<std::string> find_tag(const schema& s, const sstring& tag) {
    const auto& extensions = s.extensions();
    const auto ext_it = extensions.find(tags_extension::NAME);
    if (ext_it != extensions.end()) {
        const std::map<sstring, sstring>& tags_map =
                static_pointer_cast<tags_extension>(ext_it->second)->tags();
        const auto tag_it = tags_map.find(tag);
        if (tag_it != tags_map.end()) {
            return tag_it->second;
        }
    }
    return std::nullopt;
}

// Publishes a schema update for `schema` whose tags extension is built from
// `tags_map`. The work is dispatched to shard 0 via invoke_on(0, ...).
//
// FIXME: Updating tags currently relies on updating schema, which may be subject
// to races during concurrent updates of the same table. Once Scylla schema updates
// are fixed, this issue will automatically get fixed as well.
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
    co_await mm.container().invoke_on(0, [gs = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& shard0_mm) -> future<> {
        // FIXME: the following needs to be in a loop. If the announce() call
        // below fails, we need to retry the whole thing.
        auto guard = co_await shard0_mm.start_group0_operation();

        // Start from the current schema and attach an extension carrying
        // the new tags under tags_extension::NAME.
        schema_builder tagged(gs);
        tagged.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
        schema_ptr updated_schema = tagged.build();

        auto announcement = co_await shard0_mm.prepare_column_family_update_announcement(
                std::move(updated_schema), false, std::vector<view_ptr>{}, guard.write_timestamp());

        co_await shard0_mm.announce(std::move(announcement), std::move(guard));
    });
}

}

0 comments on commit cb8a67d

Please sign in to comment.