Skip to content

Commit

Permalink
ovsdb-server: Allow replication from older schema version servers.
Browse files Browse the repository at this point in the history
Presently, replication is not allowed if there is a schema version mismatch between
the schema returned by the active ovsdb-server and the local db schema. This is
causing failures in OVN DB HA deployments during uprades.

In the case of OpenStack tripleo deployment with OVN, OVN DB ovsdb-servers are
deployed on a multi node controller cluster in active/standby mode. During
minor updates or major upgrades, the cluster is updated one at a time. If
a node A is running active OVN DB ovsdb-servers and when it is updated, another
node B becomes active. After the update when OVN DB ovsdb-servers in A are started,
these ovsdb-servers fail to replicate from the active if there is a schema
version mismatch.

This patch addresses this issue by allowing replication even if there is a
schema version mismatch only if all the active db schema tables and its colums are
present in the local db schema.

This should not result in any data loss.

Signed-off-by: Numan Siddique <numans@ovn.org>
Signed-off-by: Ben Pfaff <blp@ovn.org>
  • Loading branch information
numansiddique authored and blp committed Oct 24, 2019
1 parent f049775 commit 7ce27c9
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 25 deletions.
152 changes: 127 additions & 25 deletions ovsdb/replication.c
Expand Up @@ -43,7 +43,7 @@ static struct uuid server_uuid;
static struct jsonrpc_session *session;
static unsigned int session_seqno = UINT_MAX;

static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
static void add_monitored_table(struct ovsdb_table_schema *table,
struct json *monitor_requests);

Expand Down Expand Up @@ -100,16 +100,27 @@ enum ovsdb_replication_state {
static enum ovsdb_replication_state state;


struct replication_db {
struct ovsdb *db;
bool schema_version_higher;
/* Points to the schema received from the active server if
* the local db schema version is higher. NULL otherwise. */
struct ovsdb_schema *active_db_schema;
};

static bool is_replication_possible(struct ovsdb_schema *local_db_schema,
struct ovsdb_schema *active_db_schema);

/* All DBs known to ovsdb-server. The actual replication dbs are stored
* in 'replication dbs', which is a subset of all dbs and remote dbs whose
* schema matches. */
static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
static struct shash *replication_dbs;

static struct shash *replication_db_clone(struct shash *dbs);
static struct shash *replication_dbs_create(void);
static void replication_dbs_destroy(void);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
static struct ovsdb* find_db(const char *db_name);
static struct replication_db *find_db(const char *db_name);


void
Expand Down Expand Up @@ -152,16 +163,16 @@ send_schema_requests(const struct json *result)
if (name->type == JSON_STRING) {
/* Send one schema request for each remote DB. */
const char *db_name = json_string(name);
struct ovsdb *db = find_db(db_name);
if (db) {
struct replication_db *rdb = find_db(db_name);
if (rdb) {
struct jsonrpc_msg *request =
jsonrpc_create_request(
"get_schema",
json_array_create_1(
json_string_create(db_name)),
NULL);

request_ids_add(request->id, db);
request_ids_add(request->id, rdb->db);
jsonrpc_session_send(session, request);
}
}
Expand Down Expand Up @@ -206,18 +217,19 @@ replication_run(void)
&& msg->params->array.n == 2
&& msg->params->array.elems[0]->type == JSON_STRING) {
char *db_name = msg->params->array.elems[0]->string;
struct ovsdb *db = find_db(db_name);
if (db) {
struct replication_db *rdb = find_db(db_name);
if (rdb) {
struct ovsdb_error *error;
error = process_notification(msg->params->array.elems[1],
db);
rdb->db);
if (error) {
ovsdb_error_assert(error);
state = RPL_S_ERR;
}
}
}
} else if (msg->type == JSONRPC_REPLY) {
struct replication_db *rdb;
struct ovsdb *db;
if (!request_ids_lookup_and_free(msg->id, &db)) {
VLOG_WARN("received unexpected reply");
Expand Down Expand Up @@ -256,7 +268,7 @@ replication_run(void)
jsonrpc_session_send(session, request);

replication_dbs_destroy();
replication_dbs = replication_db_clone(&local_dbs);
replication_dbs = replication_dbs_create();
state = RPL_S_DB_REQUESTED;
break;
}
Expand Down Expand Up @@ -284,17 +296,37 @@ replication_run(void)
state = RPL_S_ERR;
}

if (db != find_db(schema->name)) {
rdb = find_db(schema->name);
if (!rdb) {
/* Unexpected schema. */
VLOG_WARN("unexpected schema %s", schema->name);
state = RPL_S_ERR;
} else if (!ovsdb_schema_equal(schema, db->schema)) {
} else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
/* Schmea version mismatch. */
VLOG_INFO("Schema version mismatch, %s not replicated",
VLOG_INFO("Schema version mismatch, checking if %s can "
"still be replicated or not.",
schema->name);
shash_find_and_delete(replication_dbs, schema->name);
if (is_replication_possible(rdb->db->schema, schema)) {
VLOG_INFO("%s can be replicated.", schema->name);
rdb->schema_version_higher = true;
if (rdb->active_db_schema) {
ovsdb_schema_destroy(rdb->active_db_schema);
}
rdb->active_db_schema = schema;
} else {
VLOG_INFO("%s cannot be replicated.", schema->name);
struct replication_db *r =
shash_find_and_delete(replication_dbs,
schema->name);
if (r->active_db_schema) {
ovsdb_schema_destroy(r->active_db_schema);
}
free(r);
ovsdb_schema_destroy(schema);
}
} else {
ovsdb_schema_destroy(schema);
}
ovsdb_schema_destroy(schema);

/* After receiving schemas, reset the local databases that
* will be monitored and send out monitor requests for them. */
Expand All @@ -306,11 +338,13 @@ replication_run(void)
state = RPL_S_ERR;
} else {
SHASH_FOR_EACH (node, replication_dbs) {
db = node->data;
rdb = node->data;
struct jsonrpc_msg *request =
create_monitor_request(db);
create_monitor_request(
rdb->schema_version_higher ?
rdb->active_db_schema : rdb->db->schema);

request_ids_add(request->id, db);
request_ids_add(request->id, rdb->db);
jsonrpc_session_send(session, request);
VLOG_DBG("Send monitor requests");
state = RPL_S_MONITOR_REQUESTED;
Expand Down Expand Up @@ -509,7 +543,7 @@ replication_destroy(void)
shash_destroy(&local_dbs);
}

static struct ovsdb *
static struct replication_db *
find_db(const char *db_name)
{
return shash_find_data(replication_dbs, db_name);
Expand Down Expand Up @@ -541,11 +575,10 @@ reset_database(struct ovsdb *db)
* Caller is responsible for disposing 'request'.
*/
static struct jsonrpc_msg *
create_monitor_request(struct ovsdb *db)
create_monitor_request(struct ovsdb_schema *schema)
{
struct jsonrpc_msg *request;
struct json *monitor;
struct ovsdb_schema *schema = db->schema;
const char *db_name = schema->name;

struct json *monitor_request = json_object_create();
Expand Down Expand Up @@ -779,14 +812,18 @@ request_ids_clear(void)
}

static struct shash *
replication_db_clone(struct shash *dbs)
replication_dbs_create(void)
{
struct shash *new = xmalloc(sizeof *new);
shash_init(new);

struct shash_node *node;
SHASH_FOR_EACH (node, dbs) {
shash_add(new, node->name, node->data);
SHASH_FOR_EACH (node, &local_dbs) {
struct replication_db *repl_db = xmalloc(sizeof *repl_db);
repl_db->db = node->data;
repl_db->schema_version_higher = false;
repl_db->active_db_schema = NULL;
shash_add(new, node->name, repl_db);
}

return new;
Expand All @@ -795,7 +832,24 @@ replication_db_clone(struct shash *dbs)
static void
replication_dbs_destroy(void)
{
shash_destroy(replication_dbs);
if (!replication_dbs) {
return;
}

struct shash_node *node, *next;

SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
hmap_remove(&replication_dbs->map, &node->node);
struct replication_db *rdb = node->data;
if (rdb->active_db_schema) {
ovsdb_schema_destroy(rdb->active_db_schema);
}
free(rdb);
free(node->name);
free(node);
}

hmap_destroy(&replication_dbs->map);
free(replication_dbs);
replication_dbs = NULL;
}
Expand Down Expand Up @@ -877,6 +931,54 @@ replication_status(void)
return ds_steal_cstr(&ds);
}

/* Checks if it's possible to replicate to the local db from the active db
* schema. Returns true, if 'local_db_schema' has all the tables and columns
* of 'active_db_schema', false otherwise.
*/
static bool
is_replication_possible(struct ovsdb_schema *local_db_schema,
struct ovsdb_schema *active_db_schema)
{
struct shash_node *node;
SHASH_FOR_EACH (node, &active_db_schema->tables) {
struct ovsdb_table_schema *ldb_table_schema =
shash_find_data(&local_db_schema->tables, node->name);
if (!ldb_table_schema) {
VLOG_INFO("Table %s not present in the local db schema",
node->name);
return false;
}

/* Local schema table should have all the columns
* of active schema table. */
struct ovsdb_table_schema *adb_table_schema = node->data;
struct shash_node *n;
SHASH_FOR_EACH (n, &adb_table_schema->columns) {
struct ovsdb_column *ldb_col =
shash_find_data(&ldb_table_schema->columns, n->name);
if (!ldb_col) {
VLOG_INFO("Column - %s not present in the local "
"db schema table - %s", n->name, node->name);
return false;
}

struct json *ldb_col_json = ovsdb_column_to_json(ldb_col);
struct json *adb_col_json = ovsdb_column_to_json(n->data);
bool cols_equal = json_equal(ldb_col_json, adb_col_json);
json_destroy(ldb_col_json);
json_destroy(adb_col_json);

if (!cols_equal) {
VLOG_INFO("Column - %s mismatch in the local "
"db schema table - %s", n->name, node->name);
return false;
}
}
}

return true;
}

void
replication_usage(void)
{
Expand Down
23 changes: 23 additions & 0 deletions tests/ovsdb-replication.at
Expand Up @@ -19,6 +19,29 @@ replication_schema () {
}
EOF
}
replication_schema_v2 () {
cat <<'EOF'
{"name": "mydb",
"tables": {
"a": {
"columns": {
"number": {"type": "integer"},
"name": {"type": "string"}},
"indexes": [["number"]]},
"b": {
"columns": {
"number": {"type": "integer"},
"name": {"type": "string"},
"foo" : {"type": "string"}},
"indexes": [["number"]]},
"c": {
"columns": {
"number": {"type": "integer"},
"name": {"type": "string"}},
"indexes": [["number"]]}}
}
EOF
}
]
m4_divert_pop([PREPARE_TESTS])

Expand Down

0 comments on commit 7ce27c9

Please sign in to comment.