Skip to content

Commit

Permalink
db/legacy_schema_tables: Merge schema locking
Browse files Browse the repository at this point in the history
Add locking to merge_schema() to ensure only one CPU is able to fiddle
with internals at a time.

Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
  • Loading branch information
Pekka Enberg authored and avikivity committed Jul 22, 2015
1 parent 6a9d049 commit ea668b5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
29 changes: 25 additions & 4 deletions db/legacy_schema_tables.cc
Expand Up @@ -479,6 +479,16 @@ future<> save_system_keyspace_schema() {
}
#endif

static semaphore the_merge_lock;

future<> merge_lock() {
return smp::submit_to(0, [] { return the_merge_lock.wait(); });
}

future<> merge_unlock() {
return smp::submit_to(0, [] { the_merge_lock.signal(); });
}

/**
* Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
* (which also involves fs operations on add/drop ks/cf)
Expand All @@ -490,12 +500,25 @@ future<> save_system_keyspace_schema() {
*/
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations)
{
return merge_schema(proxy, std::move(mutations), true).then([&proxy] {
return update_schema_version_and_announce(proxy);
return merge_lock().then([&proxy, mutations = std::move(mutations)] {
return do_merge_schema(proxy, std::move(mutations), true).then([&proxy] {
return update_schema_version_and_announce(proxy);
});
}).finally([] {
return merge_unlock();
});
}

future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush)
{
return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] {
return merge_schema(proxy, std::move(mutations), do_flush);
}).finally([] {
return merge_unlock();
});
}

future<> do_merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush)
{
return seastar::async([&proxy, mutations = std::move(mutations), do_flush] {
schema_ptr s = keyspaces();
Expand Down Expand Up @@ -532,8 +555,6 @@ future<> save_system_keyspace_schema() {
/*auto& new_functions = */read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
/*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();

// FIXME: Make the update atomic like in Origin.

std::set<sstring> keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0();
merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)).get0();
#if 0
Expand Down
2 changes: 2 additions & 0 deletions db/legacy_schema_tables.hh
Expand Up @@ -65,6 +65,8 @@ future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutat

future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush);

future<> do_merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush);

future<std::set<sstring>> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after);

std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
Expand Down

0 comments on commit ea668b5

Please sign in to comment.