Skip to content

Commit

Permalink
Merge 'alternator: fix isolation of concurrent modifications to tags'…
Browse files Browse the repository at this point in the history
… from Nadav Har'El

Alternator's implementation of TagResource, UntagResource and UpdateTimeToLive (the latter uses tags to store the TTL configuration) was unsafe for concurrent modifications - some of these modifications may be lost. This short series fixes the bug, and also adds (in the last patch) a test that reproduces the bug and verifies that it's fixed.

The cause of the incorrect isolation was that we read the old tags and wrote the modified tags as two separate steps. In this series we introduce a new function, `modify_tags()`, which does both under one lock, so concurrent tag operations are serialized and therefore isolated as expected.

Fixes #6389.

Closes #13150

* github.com:scylladb/scylladb:
  test/alternator: test concurrent TagResource / UntagResource
  db/tags: drop unsafe update_tags() utility function
  alternator: isolate concurrent modification to tags
  db/tags: add safe modify_tags() utility functions
  migration_manager: expose access to storage_proxy

(cherry picked from commit dba1d36)

Closes #16453
  • Loading branch information
denesb committed Dec 19, 2023
1 parent 23fd693 commit 46a29e9
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 32 deletions.
12 changes: 6 additions & 6 deletions alternator/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -764,16 +764,16 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
co_return api_error::access_denied("Incorrect resource identifier");
}
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
const rjson::value* tags = rjson::find(request, "Tags");
if (!tags || !tags->IsArray()) {
co_return api_error::validation("Cannot parse tags");
}
if (tags->Size() < 1) {
co_return api_error::validation("The number of tags must be at least 1") ;
}
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
co_await db::update_tags(_mm, schema, std::move(tags_map));
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
});
co_return json_string("");
}

Expand All @@ -791,9 +791,9 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli

schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));

std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
co_await db::update_tags(_mm, schema, std::move(tags_map));
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
});
co_return json_string("");
}

Expand Down
35 changes: 18 additions & 17 deletions alternator/ttl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,25 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
sstring attribute_name(v->GetString(), v->GetStringLength());

std::map<sstring, sstring> tags_map = get_tags_of_table_or_throw(schema);
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {
co_return api_error::validation("TTL is already enabled");
}
tags_map[TTL_TAG_KEY] = attribute_name;
} else {
auto i = tags_map.find(TTL_TAG_KEY);
if (i == tags_map.end()) {
co_return api_error::validation("TTL is already disabled");
} else if (i->second != attribute_name) {
co_return api_error::validation(format(
"Requested to disable TTL on attribute {}, but a different attribute {} is enabled.",
attribute_name, i->second));
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {
throw api_error::validation("TTL is already enabled");
}
tags_map[TTL_TAG_KEY] = attribute_name;
} else {
auto i = tags_map.find(TTL_TAG_KEY);
if (i == tags_map.end()) {
throw api_error::validation("TTL is already disabled");
} else if (i->second != attribute_name) {
throw api_error::validation(format(
"Requested to disable TTL on attribute {}, but a different attribute {} is enabled.",
attribute_name, i->second));
}
tags_map.erase(TTL_TAG_KEY);
}
tags_map.erase(TTL_TAG_KEY);
}
co_await db::update_tags(_mm, schema, std::move(tags_map));
});

// Prepare the response, which contains a TimeToLiveSpecification
// basically identical to the request's
rjson::value response = rjson::empty_object();
Expand Down
23 changes: 19 additions & 4 deletions db/tags/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "db/tags/extension.hh"
#include "schema_builder.hh"
#include "schema_registry.hh"
#include "service/storage_proxy.hh"
#include "data_dictionary/data_dictionary.hh"

namespace db {

Expand Down Expand Up @@ -38,14 +40,27 @@ std::optional<std::string> find_tag(const schema& s, const sstring& tag) {
}
}

future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> {
future<> modify_tags(service::migration_manager& mm, sstring ks, sstring cf,
std::function<void(std::map<sstring, sstring>&)> modify) {
co_await mm.container().invoke_on(0, [ks = std::move(ks), cf = std::move(cf), modify = std::move(modify)] (service::migration_manager& mm) -> future<> {
// FIXME: the following needs to be in a loop. If mm.announce() below
// fails, we need to retry the whole thing.
auto group0_guard = co_await mm.start_group0_operation();

// After getting the schema-modification lock, we need to read the
// table's *current* schema - it might have changed before we got
// the lock, by some concurrent modification. If the table is gone,
// this will throw no_such_column_family.
schema_ptr s = mm.get_storage_proxy().data_dictionary().find_schema(ks, cf);
const std::map<sstring, sstring>* tags_ptr = get_tags_of_table(s);
std::map<sstring, sstring> tags;
if (tags_ptr) {
// tags_ptr points to constant data owned by the schema. To allow
// modify() to change the tags, we must work on a copy.
tags = *tags_ptr;
}
modify(tags);
schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags));

auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());

Expand Down
19 changes: 14 additions & 5 deletions db/tags/utils.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ const std::map<sstring, sstring>* get_tags_of_table(schema_ptr schema);
// tags exist but not this tag.
std::optional<std::string> find_tag(const schema& s, const sstring& tag);

// FIXME: Updating tags currently relies on updating schema, which may be subject
// to races during concurrent updates of the same table. Once Scylla schema updates
// are fixed, this issue will automatically get fixed as well.
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map);

// modify_tags() atomically modifies the tags on a given table: It reads the
// existing tags, passes them as a map to the given function which can modify
// the map, and finally writes the modified tags. This read-modify-write
// operation is atomic - isolated from other concurrent schema operations.
//
// The isolation requirement is also why modify_tags() takes the table's name
// ks,cf and not a schema object - the current schema may not be relevant by
// the time the tags are modified, due to some other concurrent modification.
// If a table (ks, cf) doesn't exist, no_such_column_family is thrown.
//
// If the table didn't have the tags schema extension, it's fine: The function
// is passed an empty map, and the tags it adds will be added to the table.
//
// Parameters:
//   mm          - the migration manager through which the schema update is
//                 prepared and announced.
//   ks, cf      - keyspace and table name of the table whose tags are modified.
//   modify_func - called with a mutable copy of the table's current tags;
//                 the contents of the map when modify_func returns become
//                 the table's new tags.
//
// NOTE: the implementation invokes modify_func on shard 0 (invoke_on(0, ...)),
// which is not necessarily the shard that called modify_tags().
// TODO confirm that whatever modify_func captures is safe to use from there.
// NOTE(review): callers (e.g. update_time_to_live) throw from modify_func to
// reject a modification; presumably the exception reaches the caller of
// modify_tags() and no schema change is written - confirm.
future<> modify_tags(service::migration_manager& mm, sstring ks, sstring cf,
std::function<void(std::map<sstring, sstring>&)> modify_func);
}
2 changes: 2 additions & 0 deletions service/migration_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public:

migration_notifier& get_notifier() { return _notifier; }
const migration_notifier& get_notifier() const { return _notifier; }
// Accessors (mutable and const) returning this migration manager's
// _storage_proxy member.
service::storage_proxy& get_storage_proxy() { return _storage_proxy; }
const service::storage_proxy& get_storage_proxy() const { return _storage_proxy; }

future<> submit_migration_task(const gms::inet_address& endpoint, bool can_ignore_down_node = true);

Expand Down
51 changes: 51 additions & 0 deletions test/alternator/test_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from botocore.exceptions import ClientError
import re
import time
import threading
from util import multiset, create_test_table, unique_table_name, random_string
from packaging.version import Version

Expand Down Expand Up @@ -291,3 +292,53 @@ def test_tag_lsi_gsi(table_lsi_gsi):
table_lsi_gsi.meta.client.tag_resource(ResourceArn=gsi_arn, Tags=tags)
with pytest.raises(ClientError, match='ValidationException.*ResourceArn'):
table_lsi_gsi.meta.client.tag_resource(ResourceArn=lsi_arn, Tags=tags)

# Test that if we concurrently add tags A and B to a table, both survive.
# If adding a tag is implemented as reading the current tags, adding the new
# tag and then writing the whole set back, two concurrent additions of A and
# B may both read the state from before either change, and one addition then
# overwrites the other. To solve this, Scylla needs to serialize tag
# modifications. This test is designed to fail if this serialization is
# missing. Reproduces #6389
@pytest.mark.veryslow
def test_concurrent_tag(dynamodb, test_table):
    # Two threads repeatedly tag and untag the same table, each with its own
    # tag key. After every modification the thread verifies that its own
    # change is visible, i.e., that it was not overwritten by the other
    # thread's concurrent read-modify-write of the table's tags.
    client = test_table.meta.client
    arn = client.describe_table(TableName=test_table.name)['Table']['TableArn']

    # Unfortunately by default Python threads print their exceptions
    # (e.g., assertion failures) but don't propagate them to the join(),
    # so the overall test doesn't fail. The following Thread wrapper
    # causes join() to rethrow the exception, so the test will fail.
    class ThreadWrapper(threading.Thread):
        # Class-level defaults keep join() well-defined even when it is
        # called before run() has stored a result (e.g., join() with a
        # timeout that expires first).
        ret = None
        exception = None

        def run(self):
            try:
                self.ret = self._target(*self._args, **self._kwargs)
            except BaseException as e:
                # Remember the failure; join() re-raises it in the joiner.
                self.exception = e

        def join(self, timeout=None):
            super().join(timeout)
            if self.exception is not None:
                raise self.exception
            return self.ret

    def list_tag_values(tag):
        # Values currently stored on the table under the given tag key.
        got = client.list_tags_of_resource(ResourceArn=arn)['Tags']
        return [x['Value'] for x in got if x['Key'] == tag]

    def tag_untag_once(tag):
        client.tag_resource(ResourceArn=arn, Tags=[{'Key': tag, 'Value': 'Hello'}])
        # Check that the tag that we just added is still on the table (and
        # wasn't overwritten by a concurrent addition of a different tag):
        assert list_tag_values(tag) == ['Hello']
        client.untag_resource(ResourceArn=arn, TagKeys=[tag])
        # Likewise, the removal must not have been undone by a concurrent
        # modification of a different tag:
        assert list_tag_values(tag) == []

    def tag_loop(tag, count):
        for _ in range(count):
            tag_untag_once(tag)

    # The more iterations we do, the higher the chance of reproducing
    # this issue. On my laptop, count = 100 reproduces the bug every time.
    # Lower numbers have some chance of not catching the bug. If this test
    # ever stops catching the bug reliably, we may need to increase the count.
    count = 200
    t1 = ThreadWrapper(target=tag_loop, args=('A', count))
    t2 = ThreadWrapper(target=tag_loop, args=('B', count))
    t1.start()
    t2.start()
    # Always wait for the second thread too, even if the first one failed:
    # otherwise it would keep modifying the shared test table's tags in the
    # background after this test has already been reported as failed.
    try:
        t1.join()
    finally:
        t2.join()

0 comments on commit 46a29e9

Please sign in to comment.