Skip to content

Commit

Permalink
migration_manager: Append actual keyspace mutations with schema notif…
Browse files Browse the repository at this point in the history
…ications

There is a workaround for notification race, which attaches keyspace
mutations to other schema changes in case the target node missed the
keyspace creation. Currently that generated keyspace mutations on the
spot instead of using the ones stored in schema tables. Those
mutations would have current timestamp, as if the keyspace has been
just modified. This is problematic because this may generate an
overwrite of keyspace parameters with newer timestamp but with stale
values, if the node is not up to date with keyspace metadata.

That's especially the case when booting up a node without enabling
auto_bootstrap. In such case the node will not wait for schema sync
before creating auth tables. Such table creation will attach
potentially out of date mutations for keyspace metadata, which may
overwrite changes made to keyspace paramteters made earlier in the
cluster.

Refs #2129.
  • Loading branch information
tgrabiec committed Mar 7, 2017
1 parent 22199ab commit 06d4ad1
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 58 deletions.
87 changes: 53 additions & 34 deletions db/schema_tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ namespace schema_tables {

logging::logger logger("schema_tables");

struct push_back_and_return {
std::vector<mutation> muts;

std::vector<mutation> operator()(mutation&& m) {
muts.emplace_back(std::move(m));
return std::move(muts);
}
};

struct qualified_name {
sstring keyspace_name;
sstring table_name;
Expand Down Expand Up @@ -547,6 +556,14 @@ read_schema_partition_for_table(distributed<service::storage_proxy>& proxy, sche
return query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key));
}

future<mutation>
read_keyspace_mutation(distributed<service::storage_proxy>& proxy, const sstring& keyspace_name) {
schema_ptr s = keyspaces();
auto key = partition_key::from_singular(*s, keyspace_name);
auto cmd = make_lw_shared<query::read_command>(s->id(), s->version(), query::full_slice);
return query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
}

static semaphore the_merge_lock {1};

future<> merge_lock() {
Expand Down Expand Up @@ -1182,39 +1199,40 @@ void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp,
mutations.emplace_back(std::move(m));
}

std::vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
future<std::vector<mutation>> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
std::vector<mutation> mutations;
add_type_to_schema_mutation(type, timestamp, mutations);
return mutations;
}

std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

future<std::vector<mutation>> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
{
std::vector<mutation> mutations;
schema_ptr s = usertypes();
auto pkey = partition_key::from_singular(*s, type->_keyspace);
auto ckey = clustering_key::from_singular(*s, type->get_name_as_string());
mutation m{pkey, s};
m.partition().apply_delete(*s, ckey, tombstone(timestamp, gc_clock::now()));
mutations.emplace_back(std::move(m));

return mutations;
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

/*
* Table metadata serialization/deserialization.
*/

std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
future<std::vector<mutation>> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
std::vector<mutation> mutations;
add_table_or_view_to_schema_mutation(table, timestamp, true, mutations);
return mutations;

// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

static schema_mutations make_table_mutations(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers)
Expand Down Expand Up @@ -1347,15 +1365,13 @@ static void make_update_columns_mutations(schema_ptr old_table,
mutations.emplace_back(std::move(columns_mutation));
}

std::vector<mutation> make_update_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
future<std::vector<mutation>> make_update_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
schema_ptr old_table,
schema_ptr new_table,
api::timestamp_type timestamp,
bool from_thrift)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);

std::vector<mutation> mutations;
add_table_or_view_to_schema_mutation(new_table, timestamp, false, mutations);

make_update_columns_mutations(std::move(old_table), std::move(new_table), timestamp, from_thrift, mutations);
Expand All @@ -1373,7 +1389,8 @@ std::vector<mutation> make_update_table_mutations(lw_shared_ptr<keyspace_metadat
addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);

#endif
return mutations;
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

static void make_drop_table_or_view_mutations(schema_ptr schema_table,
Expand All @@ -1390,10 +1407,9 @@ static void make_drop_table_or_view_mutations(schema_ptr schema_table,
}
}

std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
future<std::vector<mutation>> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
std::vector<mutation> mutations;
make_drop_table_or_view_mutations(columnfamilies(), std::move(table), timestamp, mutations);

#if 0
Expand All @@ -1405,7 +1421,8 @@ std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata>
for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
#endif
return mutations;
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s)
Expand Down Expand Up @@ -1899,37 +1916,39 @@ schema_mutations make_schema_mutations(schema_ptr s, api::timestamp_type timesta
return s->is_view() ? make_view_mutations(view_ptr(s), timestamp, with_columns) : make_table_mutations(s, timestamp, with_columns);
}

std::vector<mutation> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp)
future<std::vector<mutation>> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
std::vector<mutation> mutations;
// And also the serialized base table.
auto base = keyspace->cf_meta_data().at(view->view_info()->base_name());
add_table_or_view_to_schema_mutation(base, timestamp, true, mutations);
add_table_or_view_to_schema_mutation(view, timestamp, true, mutations);
return mutations;

// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
future<std::vector<mutation>> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
view_ptr old_view,
view_ptr new_view,
api::timestamp_type timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
std::vector<mutation> mutations;
// And also the serialized base table.
auto base = keyspace->cf_meta_data().at(new_view->view_info()->base_name());
add_table_or_view_to_schema_mutation(base, timestamp, true, mutations);
add_table_or_view_to_schema_mutation(new_view, timestamp, false, mutations);
make_update_columns_mutations(old_view, new_view, timestamp, false, mutations);
return mutations;
}

std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp) {
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

future<std::vector<mutation>> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp) {
std::vector<mutation> mutations;
make_drop_table_or_view_mutations(views(), view, timestamp, mutations);
return mutations;
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
}

#if 0
Expand Down
17 changes: 9 additions & 8 deletions db/schema_tables.hh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ future<std::vector<frozen_mutation>> convert_schema_to_mutations(distributed<ser

future<schema_result_value_type>
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, const sstring& keyspace_name);

future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations);

Expand All @@ -95,17 +96,17 @@ std::vector<mutation> make_drop_keyspace_mutations(lw_shared_ptr<keyspace_metada

lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const schema_result_value_type& partition);

std::vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
future<std::vector<mutation>> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);

std::vector<user_type> create_types_from_schema_partition(const schema_result_value_type& result);

std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
future<std::vector<mutation>> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);

void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp, std::vector<mutation>& mutations);

std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
future<std::vector<mutation>> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);

std::vector<mutation> make_update_table_mutations(
future<std::vector<mutation>> make_update_table_mutations(
lw_shared_ptr<keyspace_metadata> keyspace,
schema_ptr old_table,
schema_ptr new_table,
Expand All @@ -114,7 +115,7 @@ std::vector<mutation> make_update_table_mutations(

future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result);

std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
future<std::vector<mutation>> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);

future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& proxy, const sstring& keyspace, const sstring& table);

Expand Down Expand Up @@ -149,11 +150,11 @@ schema_mutations make_schema_mutations(schema_ptr s, api::timestamp_type timesta

void add_table_or_view_to_schema_mutation(schema_ptr view, api::timestamp_type timestamp, bool with_columns, std::vector<mutation>& mutations);

std::vector<mutation> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
future<std::vector<mutation>> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);

std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr old_view, view_ptr new_view, api::timestamp_type timestamp);
future<std::vector<mutation>> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr old_view, view_ptr new_view, api::timestamp_type timestamp);

std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
future<std::vector<mutation>> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);

sstring serialize_kind(column_kind kind);
column_kind deserialize_kind(sstring kind);
Expand Down

0 comments on commit 06d4ad1

Please sign in to comment.