Skip to content

Commit

Permalink
ovsdb: (Re)hide struct db in ovsdb-server.c
Browse files Browse the repository at this point in the history
It seems odd that the currently replication implementation moves the
struct db from ovsdb-server.c (file private) to replication.h (global).

This patch moves the 'struct db' defintion back into ovsdb-server.c,

Signed-off-by: Andy Zhou <azhou@ovn.org>
  • Loading branch information
azhou-nicira committed Sep 4, 2016
1 parent 39e7e4e commit f72fc2e
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 70 deletions.
65 changes: 51 additions & 14 deletions ovsdb/ovsdb-server.c
Expand Up @@ -60,15 +60,23 @@

VLOG_DEFINE_THIS_MODULE(ovsdb_server);

struct db;
struct db {
/* Initialized in main(). */
char *filename;
struct ovsdb_file *file;
struct ovsdb *db;

/* Only used by update_remote_status(). */
struct ovsdb_txn *txn;
};

/* SSL configuration. */
static char *private_key_file;
static char *certificate_file;
static char *ca_cert_file;
static bool bootstrap_ca_cert;

/* Replication configuration. */
/* Replication current state. */
static bool is_backup_server;

static unixctl_cb_func ovsdb_server_exit;
Expand Down Expand Up @@ -122,6 +130,17 @@ static void save_config(struct server_config *);
static void load_config(FILE *config_file, struct sset *remotes,
struct sset *db_filenames);

static void
ovsdb_replication_init(struct shash *all_dbs)
{
replication_init();
struct shash_node *node;
SHASH_FOR_EACH (node, all_dbs) {
struct db *db = node->data;
replication_add_db(db->db->schema->name, db->db);
}
}

static void
main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
struct unixctl_server *unixctl, struct sset *remotes,
Expand Down Expand Up @@ -169,7 +188,7 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
ovsdb_jsonrpc_server_run(jsonrpc);

if (is_backup_server) {
replication_run(all_dbs);
replication_run();
}

SHASH_FOR_EACH(node, all_dbs) {
Expand Down Expand Up @@ -240,7 +259,6 @@ main(int argc, char *argv[])
service_start(&argc, &argv);
fatal_ignore_sigpipe();
process_init();
replication_init();

parse_options(&argc, &argv, &remotes, &unixctl_path, &run_command);
daemon_become_new_user(false);
Expand Down Expand Up @@ -358,13 +376,13 @@ main(int argc, char *argv[])
ovsdb_server_get_active_ovsdb_server, NULL);
unixctl_command_register("ovsdb-server/connect-active-ovsdb-server", "",
0, 0, ovsdb_server_connect_active_ovsdb_server,
NULL);
&all_dbs);
unixctl_command_register("ovsdb-server/disconnect-active-ovsdb-server", "",
0, 0, ovsdb_server_disconnect_active_ovsdb_server,
NULL);
unixctl_command_register("ovsdb-server/set-sync-excluded-tables", "",
0, 1, ovsdb_server_set_sync_excluded_tables,
NULL);
&all_dbs);
unixctl_command_register("ovsdb-server/get-sync-excluded-tables", "",
0, 0, ovsdb_server_get_sync_excluded_tables,
NULL);
Expand All @@ -374,6 +392,10 @@ main(int argc, char *argv[])
unixctl_command_register("ovsdb-server/disable-monitor-cond", "", 0, 0,
ovsdb_server_disable_monitor_cond, jsonrpc);

if (is_backup_server) {
ovsdb_replication_init(&all_dbs);
}

main_loop(jsonrpc, &all_dbs, unixctl, &remotes, run_process, &exiting);

ovsdb_jsonrpc_server_destroy(jsonrpc);
Expand Down Expand Up @@ -470,6 +492,21 @@ open_db(struct server_config *config, const char *filename)
return error;
}

static const struct db *
find_db(const struct shash *all_dbs, const char *db_name)
{
struct shash_node *node;

SHASH_FOR_EACH (node, all_dbs) {
struct db *db = node->data;
if (!strcmp(db->db->schema->name, db_name)) {
return db;
}
}

return NULL;
}

static char * OVS_WARN_UNUSED_RESULT
parse_db_column__(const struct shash *all_dbs,
const char *name_, char *name,
Expand Down Expand Up @@ -1059,8 +1096,6 @@ ovsdb_server_set_active_ovsdb_server(struct unixctl_conn *conn,
void *arg_ OVS_UNUSED)
{
set_active_ovsdb_server(argv[1]);
is_backup_server = true;
VLOG_INFO("become a backup server");
unixctl_command_reply(conn, NULL);
}

Expand All @@ -1077,10 +1112,12 @@ static void
ovsdb_server_connect_active_ovsdb_server(struct unixctl_conn *conn,
int argc OVS_UNUSED,
const char *argv[] OVS_UNUSED,
void *arg_ OVS_UNUSED)
void *all_dbs_)
{
if (is_backup_server) {
replication_init();
struct shash *all_dbs = all_dbs_;

if (!is_backup_server) {
ovsdb_replication_init(all_dbs);
}
is_backup_server = true;
unixctl_command_reply(conn, NULL);
Expand All @@ -1094,21 +1131,21 @@ ovsdb_server_disconnect_active_ovsdb_server(struct unixctl_conn *conn,
{
disconnect_active_server();
is_backup_server = false;
VLOG_INFO("become an active server");
unixctl_command_reply(conn, NULL);
}

static void
ovsdb_server_set_sync_excluded_tables(struct unixctl_conn *conn,
int argc OVS_UNUSED,
const char *argv[],
void *arg_ OVS_UNUSED)
void *all_dbs_)
{
struct shash *all_dbs = all_dbs_;
char *err = set_blacklist_tables(argv[1], true);

if (!err) {
if (is_backup_server) {
replication_init();
ovsdb_replication_init(all_dbs);
}
err = set_blacklist_tables(argv[1], false);
}
Expand Down
91 changes: 50 additions & 41 deletions ovsdb/replication.c
Expand Up @@ -49,17 +49,16 @@ static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
const char *database);

static void send_monitor_requests(struct shash *all_dbs);
static void send_monitor_requests(void);
static void add_monitored_table(struct ovsdb_table_schema *table,
struct json *monitor_requests);

static void get_initial_db_state(const struct db *database);
static void get_initial_db_state(struct ovsdb *db);
static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
static struct ovsdb_error *reset_databases(struct shash *all_dbs);
static struct ovsdb_error *reset_databases(void);

static void check_for_notifications(struct shash *all_dbs);
static void process_notification(struct json *table_updates,
struct ovsdb *database);
static void check_for_notifications(void);
static void process_notification(struct json *table_updates, struct ovsdb *db);
static struct ovsdb_error *process_table_update(struct json *table_update,
const char *table_name,
struct ovsdb *database,
Expand Down Expand Up @@ -99,22 +98,38 @@ static void request_ids_destroy(void);
void request_ids_clear(void);


/* Currently replicating DBs.
* replication_dbs is an shash of 'struct ovsdb *'s that stores the
* replicating dbs. */
static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
static struct ovsdb* find_db(const char *db_name);


void
replication_init(void)
{
shash_clear(&replication_dbs);
if (rpc) {
disconnect_active_server();
}
reset_dbs = true;
}

void
replication_run(struct shash *all_dbs)
replication_add_db(const char *database, struct ovsdb *db)
{
struct shash_node *node = xmalloc(sizeof *node);
shash_add_assert(&replication_dbs, database, db);
}

void
replication_run(void)
{
if (sset_is_empty(&monitored_tables) && active_ovsdb_server) {
/* Reset local databases. */
if (reset_dbs) {
struct ovsdb_error *error = reset_databases(all_dbs);
struct ovsdb_error *error = reset_databases();
if (error) {
/* In case reset DB fails, log the error before exiting. */
char *msg = ovsdb_error_to_string(error);
Expand All @@ -132,10 +147,10 @@ replication_run(struct shash *all_dbs)
}

/* Send monitor requests. */
send_monitor_requests(all_dbs);
send_monitor_requests();
}
if (!sset_is_empty(&monitored_tables)) {
check_for_notifications(all_dbs);
check_for_notifications();
}
}

Expand Down Expand Up @@ -176,6 +191,7 @@ set_blacklist_tables(const char *blacklist, bool dryrun)
const char *longname;

if (!dryrun) {
/* Can only add to an empty shash. */
blacklist_tables_clear();
}

Expand Down Expand Up @@ -278,6 +294,7 @@ disconnect_active_server(void)
jsonrpc_close(rpc);
rpc = NULL;
sset_clear(&monitored_tables);
shash_clear(&replication_dbs);
}

void
Expand All @@ -294,33 +311,25 @@ replication_destroy(void)
}

request_ids_destroy();
shash_destroy(&replication_dbs);
}

const struct db *
find_db(const struct shash *all_dbs, const char *db_name)
static struct ovsdb *
find_db(const char *db_name)
{
struct shash_node *node;

SHASH_FOR_EACH (node, all_dbs) {
struct db *db = node->data;
if (!strcmp(db->db->schema->name, db_name)) {
return db;
}
}

return NULL;
return shash_find_data(&replication_dbs, db_name);
}

static struct ovsdb_error *
reset_databases(struct shash *all_dbs)
reset_databases(void)
{
struct shash_node *db_node;
struct ovsdb_error *error = NULL;

SHASH_FOR_EACH (db_node, all_dbs) {
struct db *db = db_node->data;
struct ovsdb_txn *txn = ovsdb_txn_create(db->db);
reset_database(db->db, txn);
SHASH_FOR_EACH (db_node, &replication_dbs) {
struct ovsdb *db = db_node->data;
struct ovsdb_txn *txn = ovsdb_txn_create(db);
reset_database(db, txn);
error = ovsdb_txn_commit(txn, false);
}

Expand Down Expand Up @@ -443,7 +452,7 @@ fetch_schema(struct jsonrpc *rpc, const char *database)
}

static void
send_monitor_requests(struct shash *all_dbs)
send_monitor_requests(void)
{
const char *db_name;
struct svec dbs;
Expand All @@ -452,12 +461,12 @@ send_monitor_requests(struct shash *all_dbs)
svec_init(&dbs);
fetch_dbs(rpc, &dbs);
SVEC_FOR_EACH (i, db_name, &dbs) {
const struct db *database = find_db(all_dbs, db_name);
struct ovsdb *db = find_db(db_name);

if (database) {
if (db) {
struct ovsdb_schema *local_schema, *remote_schema;

local_schema = database->db->schema;
local_schema = db->schema;
remote_schema = fetch_schema(rpc, db_name);
if (ovsdb_schema_equal(local_schema, remote_schema)) {
struct jsonrpc_msg *request;
Expand Down Expand Up @@ -485,7 +494,7 @@ send_monitor_requests(struct shash *all_dbs)
monitor_request);
request = jsonrpc_create_request("monitor", monitor, NULL);
jsonrpc_send(rpc, request);
get_initial_db_state(database);
get_initial_db_state(db);
}
ovsdb_schema_destroy(remote_schema);
}
Expand All @@ -494,14 +503,14 @@ send_monitor_requests(struct shash *all_dbs)
}

static void
get_initial_db_state(const struct db *database)
get_initial_db_state(struct ovsdb *db)
{
struct jsonrpc_msg *msg;

jsonrpc_recv_block(rpc, &msg);

if (msg->type == JSONRPC_REPLY) {
process_notification(msg->result, database->db);
process_notification(msg->result, db);
}

jsonrpc_msg_destroy(msg);
Expand All @@ -522,7 +531,7 @@ add_monitored_table(struct ovsdb_table_schema *table,
}

static void
check_for_notifications(struct shash *all_dbs)
check_for_notifications(void)
{
struct jsonrpc_msg *msg;
int error;
Expand All @@ -549,9 +558,9 @@ check_for_notifications(struct shash *all_dbs)
if (params->type == JSON_ARRAY
&& params->u.array.n == 2) {
char *db_name = params->u.array.elems[0]->u.string;
const struct db *database = find_db(all_dbs, db_name);
if (database) {
process_notification(params->u.array.elems[1], database->db);
struct ovsdb *db = find_db(db_name);
if (db) {
process_notification(params->u.array.elems[1], db);
}
}
}
Expand All @@ -560,7 +569,7 @@ check_for_notifications(struct shash *all_dbs)
}

static void
process_notification(struct json *table_updates, struct ovsdb *database)
process_notification(struct json *table_updates, struct ovsdb *db)
{
struct ovsdb_error *error;
struct ovsdb_txn *txn;
Expand All @@ -570,15 +579,15 @@ process_notification(struct json *table_updates, struct ovsdb *database)
return;
}

txn = ovsdb_txn_create(database);
txn = ovsdb_txn_create(db);
error = NULL;

/* Process each table update. */
struct shash_node *node;
SHASH_FOR_EACH (node, json_object(table_updates)) {
struct json *table_update = node->data;
if (table_update) {
error = process_table_update(table_update, node->name, database, txn);
error = process_table_update(table_update, node->name, db, txn);
if (error) {
break;
}
Expand Down

0 comments on commit f72fc2e

Please sign in to comment.