diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index 9892cbeefb4..4de370c047d 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -137,7 +137,7 @@ ovsdb_replication_init(struct shash *all_dbs) struct shash_node *node; SHASH_FOR_EACH (node, all_dbs) { struct db *db = node->data; - replication_add_db(db->db->schema->name, db->db); + replication_add_local_db(db->db->schema->name, db->db); } } @@ -188,7 +188,11 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs, ovsdb_jsonrpc_server_run(jsonrpc); if (is_backup_server) { - replication_run(); + replication_run(); + if (!replication_is_alive()) { + int retval = replication_get_last_error(); + ovs_fatal(retval, "replication connection failed"); + } } SHASH_FOR_EACH(node, all_dbs) { @@ -212,6 +216,7 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs, if (is_backup_server) { replication_wait(); } + ovsdb_jsonrpc_server_wait(jsonrpc); unixctl_server_wait(unixctl); SHASH_FOR_EACH(node, all_dbs) { @@ -231,7 +236,6 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs, } } - disconnect_active_server(); free(remotes_error); } @@ -1346,6 +1350,9 @@ ovsdb_server_add_database(struct unixctl_conn *conn, int argc OVS_UNUSED, error = open_db(config, filename); if (!error) { save_config(config); + if (is_backup_server) { + ovsdb_replication_init(config->all_dbs); + } unixctl_command_reply(conn, NULL); } else { unixctl_command_reply_error(conn, error); @@ -1376,6 +1383,9 @@ ovsdb_server_remove_database(struct unixctl_conn *conn, int argc OVS_UNUSED, shash_delete(config->all_dbs, node); save_config(config); + if (is_backup_server) { + ovsdb_replication_init(config->all_dbs); + } unixctl_command_reply(conn, NULL); } diff --git a/ovsdb/replication.c b/ovsdb/replication.c index b4aab50948a..22455667dc9 100644 --- a/ovsdb/replication.c +++ b/ovsdb/replication.c @@ -1,6 +1,6 @@ /* * (c) Copyright 2016 Hewlett Packard Enterprise Development LP - * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc. + * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,27 +38,16 @@ VLOG_DEFINE_THIS_MODULE(replication); static char *active_ovsdb_server; -static struct jsonrpc *rpc; -static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables); -static bool reset_dbs = true; - -static struct jsonrpc *open_jsonrpc(const char *server); -static struct ovsdb_error *check_jsonrpc_error(int error, - struct jsonrpc_msg **reply_); -static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs); -static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc, - const char *database); - -static void send_monitor_requests(void); +static struct jsonrpc_session *session = NULL; +static unsigned int session_seqno = UINT_MAX; + +static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db); static void add_monitored_table(struct ovsdb_table_schema *table, struct json *monitor_requests); -static void get_initial_db_state(struct ovsdb *db); -static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn); -static struct ovsdb_error *reset_databases(void); +static struct ovsdb_error *reset_database(struct ovsdb *db); -static void check_for_notifications(void); -static void process_notification(struct json *table_updates, struct ovsdb *db); +static struct ovsdb_error *process_notification(struct json *, struct ovsdb *); static struct ovsdb_error *process_table_update(struct json *table_update, const char *table_name, struct ovsdb *database, @@ -97,11 +86,23 @@ bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db); static void request_ids_destroy(void); void request_ids_clear(void); +enum ovsdb_replication_state { + RPL_S_DB_REQUESTED, + RPL_S_SCHEMA_REQUESTED, + RPL_S_MONITOR_REQUESTED, + RPL_S_REPLICATING, + RPL_S_ERR /* Error, no longer replicating. */ +}; +static enum ovsdb_replication_state state; + + +/* All DBs known to ovsdb-server. The actual replication dbs are stored + * in 'replication dbs', which is a subset of all dbs and remote dbs whose + * schema matches. */ +static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs); +static struct shash *replication_dbs = NULL; -/* Currently replicating DBs. - * replication_dbs is an shash of 'struct ovsdb *'s that stores the - * replicating dbs. */ -static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs); +static struct shash *replication_db_clone(struct shash *dbs); /* Find 'struct ovsdb' by name within 'replication_dbs' */ static struct ovsdb* find_db(const char *db_name); @@ -109,56 +110,211 @@ static struct ovsdb* find_db(const char *db_name); void replication_init(void) { - shash_clear(&replication_dbs); - if (rpc) { - disconnect_active_server(); + shash_destroy(replication_dbs); + replication_dbs = NULL; + + shash_clear(&local_dbs); + if (session) { + jsonrpc_session_close(session); } - reset_dbs = true; + + session = jsonrpc_session_open(active_ovsdb_server, true); + session_seqno = UINT_MAX; } void -replication_add_db(const char *database, struct ovsdb *db) +replication_add_local_db(const char *database, struct ovsdb *db) { struct shash_node *node = xmalloc(sizeof *node); - shash_add_assert(&replication_dbs, database, db); + shash_add_assert(&local_dbs, database, db); } void replication_run(void) { - if (sset_is_empty(&monitored_tables) && active_ovsdb_server) { - /* Reset local databases. */ - if (reset_dbs) { - struct ovsdb_error *error = reset_databases(); - if (error) { - /* In case reset DB fails, log the error before exiting. */ - char *msg = ovsdb_error_to_string(error); - ovsdb_error_destroy(error); - VLOG_FATAL("Failed to reset DB (%s).", msg); - } - reset_dbs = false; + if (!session) { + return; + } + + jsonrpc_session_run(session); + + for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) { + struct jsonrpc_msg *msg; + unsigned int seqno; + + seqno = jsonrpc_session_get_seqno(session); + if (seqno != session_seqno) { + session_seqno = seqno; + request_ids_clear(); + struct jsonrpc_msg *request; + request = jsonrpc_create_request("list_dbs", + json_array_create_empty(), NULL); + request_ids_add(request->id, NULL); + jsonrpc_session_send(session, request); + + shash_destroy(replication_dbs); + replication_dbs = replication_db_clone(&local_dbs); + + state = RPL_S_DB_REQUESTED; } - /* Open JSON-RPC. */ - jsonrpc_close(rpc); - rpc = open_jsonrpc(active_ovsdb_server); - if (!rpc) { - return; + msg = jsonrpc_session_recv(session); + if (!msg) { + continue; } - /* Send monitor requests. */ - send_monitor_requests(); - } - if (!sset_is_empty(&monitored_tables)) { - check_for_notifications(); + if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR + && !strcmp(msg->method, "update")) { + if (msg->params->type == JSON_ARRAY + && msg->params->u.array.n == 2 + && msg->params->u.array.elems[0]->type == JSON_STRING) { + char *db_name = msg->params->u.array.elems[0]->u.string; + struct ovsdb *db = find_db(db_name); + if (db) { + struct ovsdb_error *error; + error = process_notification(msg->params->u.array.elems[1], + db); + if (error) { + ovsdb_error_assert(error); + state = RPL_S_ERR; + } + } + } + } else if (msg->type == JSONRPC_REPLY) { + struct ovsdb *db; + if (!request_ids_lookup_and_free(msg->id, &db)) { + VLOG_WARN("received unexpected reply"); + goto next; + } + + switch (state) { + case RPL_S_DB_REQUESTED: + if (msg->result->type != JSON_ARRAY) { + struct ovsdb_error *error; + error = ovsdb_error("list-dbs failed", + "list_dbs response is not array"); + ovsdb_error_assert(error); + state = RPL_S_ERR; + } else { + size_t i; + for (i = 0; i < msg->result->u.array.n; i++) { + const struct json *name = msg->result->u.array.elems[i]; + if (name->type == JSON_STRING) { + /* Send one schema request for each remote DB. */ + const char *db_name = json_string(name); + struct ovsdb *db = find_db(db_name); + if (db) { + struct jsonrpc_msg *request = + jsonrpc_create_request( + "get_schema", + json_array_create_1( + json_string_create(db_name)), + NULL); + + request_ids_add(request->id, db); + jsonrpc_session_send(session, request); + } + } + } + state = RPL_S_SCHEMA_REQUESTED; + } + break; + + case RPL_S_SCHEMA_REQUESTED: { + struct ovsdb_schema *schema; + struct ovsdb_error *error; + + error = ovsdb_schema_from_json(msg->result, &schema); + if (error) { + ovsdb_error_assert(error); + state = RPL_S_ERR; + } + + if (db != find_db(schema->name)) { + /* Unexpected schema. */ + VLOG_WARN("unexpected schema %s", schema->name); + state = RPL_S_ERR; + } else if (!ovsdb_schema_equal(schema, db->schema)) { + /* Schmea version mismatch. */ + VLOG_INFO("Schema version mismatch, %s not replicated", + schema->name); + shash_find_and_delete(replication_dbs, schema->name); + } + ovsdb_schema_destroy(schema); + + /* After receiving schemas, reset the local databases that + * will be monitored and send out monitor requests for them. */ + if (hmap_is_empty(&request_ids)) { + struct shash_node *node, *next; + + SHASH_FOR_EACH_SAFE (node, next, replication_dbs) { + db = node->data; + struct ovsdb_error *error = reset_database(db); + if (error) { + const char *db_name = db->schema->name; + shash_find_and_delete(replication_dbs, db_name); + ovsdb_error_assert(error); + VLOG_WARN("Failed to reset database, " + "%s not replicated.", db_name); + } + } + + if (shash_is_empty(replication_dbs)) { + VLOG_WARN("Nothing to replicate."); + state = RPL_S_ERR; + } else { + SHASH_FOR_EACH (node, replication_dbs) { + db = node->data; + struct ovsdb *db = node->data; + struct jsonrpc_msg *request = + create_monitor_request(db); + + request_ids_add(request->id, db); + jsonrpc_session_send(session, request); + state = RPL_S_MONITOR_REQUESTED; + } + } + } + break; + } + + case RPL_S_MONITOR_REQUESTED: { + /* Reply to monitor requests. */ + struct ovsdb_error *error; + error = process_notification(msg->result, db); + if (error) { + ovsdb_error_assert(error); + state = RPL_S_ERR; + } else { + /* Transition to replicating state after receiving + * all replies of "monitor" requests. */ + if (hmap_is_empty(&request_ids)) { + state = RPL_S_REPLICATING; + } + } + break; + } + + case RPL_S_ERR: + /* Ignore all messages */ + break; + + case RPL_S_REPLICATING: + default: + OVS_NOT_REACHED(); + } + } + next: + jsonrpc_msg_destroy(msg); } } void replication_wait(void) { - if (rpc) { - jsonrpc_wait(rpc); + if (session) { + jsonrpc_session_wait(session); + jsonrpc_session_recv_wait(session); } } @@ -291,17 +447,13 @@ blacklist_tables_find(const char *database, const char *table) void disconnect_active_server(void) { - jsonrpc_close(rpc); - rpc = NULL; - sset_clear(&monitored_tables); - shash_clear(&replication_dbs); + jsonrpc_session_close(session); + session = NULL; } void replication_destroy(void) { - disconnect_active_server(); - sset_destroy(&monitored_tables); blacklist_tables_clear(); shash_destroy(&blacklist_tables); @@ -311,34 +463,22 @@ replication_destroy(void) } request_ids_destroy(); - shash_destroy(&replication_dbs); + shash_destroy(replication_dbs); + replication_dbs = NULL; + + shash_destroy(&local_dbs); } static struct ovsdb * find_db(const char *db_name) { - return shash_find_data(&replication_dbs, db_name); + return shash_find_data(replication_dbs, db_name); } static struct ovsdb_error * -reset_databases(void) -{ - struct shash_node *db_node; - struct ovsdb_error *error = NULL; - - SHASH_FOR_EACH (db_node, &replication_dbs) { - struct ovsdb *db = db_node->data; - struct ovsdb_txn *txn = ovsdb_txn_create(db); - reset_database(db, txn); - error = ovsdb_txn_commit(txn, false); - } - - return error; -} - -static void -reset_database(struct ovsdb *db, struct ovsdb_txn *txn) +reset_database(struct ovsdb *db) { + struct ovsdb_txn *txn = ovsdb_txn_create(db); struct shash_node *table_node; SHASH_FOR_EACH (table_node, &db->tables) { @@ -351,169 +491,45 @@ reset_database(struct ovsdb *db, struct ovsdb_txn *txn) } } } -} - -static struct jsonrpc * -open_jsonrpc(const char *server) -{ - struct stream *stream; - int error; - - error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT); - - return error ? NULL : jsonrpc_open(stream); -} - -static struct ovsdb_error * -check_jsonrpc_error(int error, struct jsonrpc_msg **reply_) -{ - struct jsonrpc_msg *reply = *reply_; - - if (error) { - return ovsdb_error("transaction failed", - "transaction returned error %d", - error); - } - - if (reply->error) { - return ovsdb_error("transaction failed", - "transaction returned error: %s", - json_to_string(reply->error, 0)); - } - return NULL; -} - -static void -fetch_dbs(struct jsonrpc *rpc, struct svec *dbs) -{ - struct jsonrpc_msg *request, *reply; - struct ovsdb_error *error; - size_t i; - - request = jsonrpc_create_request("list_dbs", json_array_create_empty(), - NULL); - - error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply), - &reply); - if (error) { - ovsdb_error_assert(error); - return; - } - - if (reply->result->type != JSON_ARRAY) { - ovsdb_error_assert(ovsdb_error("list-dbs failed", - "list_dbs response is not array")); - return; - } - - for (i = 0; i < reply->result->u.array.n; i++) { - const struct json *name = reply->result->u.array.elems[i]; - - if (name->type != JSON_STRING) { - ovsdb_error_assert(ovsdb_error( - "list_dbs failed", - "list_dbs response %"PRIuSIZE" is not string", - i)); - } - svec_add(dbs, name->u.string); - } - jsonrpc_msg_destroy(reply); - svec_sort(dbs); -} - -static struct ovsdb_schema * -fetch_schema(struct jsonrpc *rpc, const char *database) -{ - struct jsonrpc_msg *request, *reply; - struct ovsdb_schema *schema; - struct ovsdb_error *error; - - request = jsonrpc_create_request("get_schema", - json_array_create_1( - json_string_create(database)), - NULL); - error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply), - &reply); - if (error) { - jsonrpc_msg_destroy(reply); - ovsdb_error_assert(error); - return NULL; - } - - error = ovsdb_schema_from_json(reply->result, &schema); - if (error) { - jsonrpc_msg_destroy(reply); - ovsdb_error_assert(error); - return NULL; - } - jsonrpc_msg_destroy(reply); - return schema; + return ovsdb_txn_commit(txn, false); } -static void -send_monitor_requests(void) +/* Create a monitor request for 'db'. The monitor request will include + * any tables from 'blacklisted_tables' + * + * Caller is responsible for disposing 'request'. + */ +static struct jsonrpc_msg * +create_monitor_request(struct ovsdb *db) { - const char *db_name; - struct svec dbs; - size_t i; - - svec_init(&dbs); - fetch_dbs(rpc, &dbs); - SVEC_FOR_EACH (i, db_name, &dbs) { - struct ovsdb *db = find_db(db_name); + struct jsonrpc_msg *request; + struct json *monitor; + struct ovsdb_schema *schema = db->schema; + const char *db_name = schema->name; - if (db) { - struct ovsdb_schema *local_schema, *remote_schema; + struct json *monitor_request = json_object_create(); + size_t n = shash_count(&schema->tables); + const struct shash_node **nodes = shash_sort(&schema->tables); - local_schema = db->schema; - remote_schema = fetch_schema(rpc, db_name); - if (ovsdb_schema_equal(local_schema, remote_schema)) { - struct jsonrpc_msg *request; - struct json *monitor, *monitor_request; + for (int j = 0; j < n; j++) { + struct ovsdb_table_schema *table = nodes[j]->data; - monitor_request = json_object_create(); - size_t n = shash_count(&local_schema->tables); - const struct shash_node **nodes = shash_sort( - &local_schema->tables); - - for (int j = 0; j < n; j++) { - struct ovsdb_table_schema *table = nodes[j]->data; - - /* Monitor all tables not blacklisted. */ - if (!blacklist_tables_find(db_name, table->name)) { - add_monitored_table(table, monitor_request); - } - } - free(nodes); - - /* Send monitor request. */ - monitor = json_array_create_3( - json_string_create(db_name), - json_string_create(db_name), - monitor_request); - request = jsonrpc_create_request("monitor", monitor, NULL); - jsonrpc_send(rpc, request); - get_initial_db_state(db); - } - ovsdb_schema_destroy(remote_schema); + /* Monitor all tables not blacklisted. */ + if (!blacklist_tables_find(db_name, table->name)) { + add_monitored_table(table, monitor_request); } } - svec_destroy(&dbs); -} - -static void -get_initial_db_state(struct ovsdb *db) -{ - struct jsonrpc_msg *msg; - - jsonrpc_recv_block(rpc, &msg); + free(nodes); - if (msg->type == JSONRPC_REPLY) { - process_notification(msg->result, db); - } + /* Create a monitor request. */ + monitor = json_array_create_3( + json_string_create(db_name), + json_string_create(db_name), + monitor_request); + request = jsonrpc_create_request("monitor", monitor, NULL); - jsonrpc_msg_destroy(msg); + return request; } static void @@ -522,91 +538,44 @@ add_monitored_table(struct ovsdb_table_schema *table, { struct json *monitor_request_array; - sset_add(&monitored_tables, table->name); - monitor_request_array = json_array_create_empty(); json_array_add(monitor_request_array, json_object_create()); json_object_put(monitor_request, table->name, monitor_request_array); } - -static void -check_for_notifications(void) -{ - struct jsonrpc_msg *msg; - int error; - - error = jsonrpc_recv(rpc, &msg); - if (error == EAGAIN) { - return; - } else if (error) { - jsonrpc_close(rpc); - rpc = open_jsonrpc(active_ovsdb_server); - if (!rpc) { - /* Active server went down. */ - disconnect_active_server(); - } - jsonrpc_msg_destroy(msg); - return; - } - if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { - jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params), - msg->id)); - } else if (msg->type == JSONRPC_NOTIFY - && !strcmp(msg->method, "update")) { - struct json *params = msg->params; - if (params->type == JSON_ARRAY - && params->u.array.n == 2) { - char *db_name = params->u.array.elems[0]->u.string; - struct ovsdb *db = find_db(db_name); - if (db) { - process_notification(params->u.array.elems[1], db); - } - } - } - jsonrpc_msg_destroy(msg); - jsonrpc_run(rpc); -} -static void + +static struct ovsdb_error * process_notification(struct json *table_updates, struct ovsdb *db) { - struct ovsdb_error *error; + struct ovsdb_error *error = NULL; struct ovsdb_txn *txn; - if (table_updates->type != JSON_OBJECT) { - sset_clear(&monitored_tables); - return; - } - - txn = ovsdb_txn_create(db); - error = NULL; - - /* Process each table update. */ - struct shash_node *node; - SHASH_FOR_EACH (node, json_object(table_updates)) { - struct json *table_update = node->data; - if (table_update) { - error = process_table_update(table_update, node->name, db, txn); - if (error) { - break; + if (table_updates->type == JSON_OBJECT) { + txn = ovsdb_txn_create(db); + + /* Process each table update. */ + struct shash_node *node; + SHASH_FOR_EACH (node, json_object(table_updates)) { + struct json *table_update = node->data; + if (table_update) { + error = process_table_update(table_update, node->name, db, txn); + if (error) { + break; + } } } - } - if (error) { - ovsdb_txn_abort(txn); - goto error; + if (error) { + ovsdb_txn_abort(txn); + return error; + } else { + /* Commit transaction. */ + error = ovsdb_txn_commit(txn, false); + } } - /* Commit transaction. */ - error = ovsdb_txn_commit(txn, false); - -error: - if (error) { - ovsdb_error_assert(error); - disconnect_active_server(); - } + return error; } static struct ovsdb_error * @@ -839,6 +808,53 @@ request_ids_clear(void) hmap_init(&request_ids); } +static struct shash * +replication_db_clone(struct shash *dbs) +{ + struct shash *new = xmalloc(sizeof *new); + shash_init(new); + + struct shash_node *node; + SHASH_FOR_EACH (node, dbs) { + shash_add(new, node->name, node->data); + } + + return new; +} + +/* Return true if replication just started or is ongoing. + * Return false if the connection failed, or the replication + * was not able to start. */ +bool +replication_is_alive(void) +{ + if (session) { + return jsonrpc_session_is_alive(session) && state != RPL_S_ERR; + } + return false; +} + +/* Return the last error reported on a connection by 'session'. The + * return value is 0 if replication is not currently running, or + * if replication session has not encountered any error. + * + * Return a negative value if replication session has error, or the + * replication was not able to start. */ +int +replication_get_last_error(void) +{ + int err = 0; + + if (session) { + err = jsonrpc_session_get_last_error(session); + if (!err) { + err = (state == RPL_S_ERR) ? ENOENT : 0; + } + } + + return err; +} + void replication_usage(void) { diff --git a/ovsdb/replication.h b/ovsdb/replication.h index cffeebf09d8..c87364875e7 100644 --- a/ovsdb/replication.h +++ b/ovsdb/replication.h @@ -26,7 +26,9 @@ void replication_run(void); void replication_wait(void); void replication_destroy(void); void replication_usage(void); -void replication_add_db(const char *databse, struct ovsdb *db); +void replication_add_local_db(const char *databse, struct ovsdb *db); +bool replication_is_alive(void); +int replication_get_last_error(void); void set_active_ovsdb_server(const char *remote_server); const char *get_active_ovsdb_server(void);