Skip to content

Commit

Permalink
Monitor Database table to manage lifecycle of IDL client.
Browse files Browse the repository at this point in the history
The Python IDL implementation supports ovsdb cluster connections.
This patch is a follow up to commit 31e434f, it adds the option of
connecting to the leader (the default) in the Raft-based cluster. It mimics
the exisiting C IDL support for clusters introduced in commit 1b1d2e6.

The _Server database schema is first requested, then a monitor of the
Database table in the _Server Database. Method __check_server_db verifies
the eligibility of the server. If the attempt to obtain a monitor of the
_Server database fails and a cluster id was not provided this implementation
proceeds to request the data monitor. If a cluster id was provided via the
set_cluster_id method then the connection is aborted and a connection to a
different node is instead attempted, until a valid cluster node is found.
Thus, when supplied, cluster id is interpreted as the intention to only
allow connections to a clustered database. If not supplied, connections to
standalone nodes, or nodes that do not have the _Server database are
allowed. change_seqno is not incremented in the case of Database table
updates.
Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
Signed-off-by: 0-day Robot <robot@bytheb.org>
  • Loading branch information
ted-nutanix authored and ovsrobot committed Jan 4, 2019
1 parent 46df7fa commit d9fb0e5
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 45 deletions.
217 changes: 201 additions & 16 deletions python/ovs/db/idl.py
Expand Up @@ -38,6 +38,7 @@
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1

CLUSTERED = "clustered"

class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
Expand Down Expand Up @@ -92,10 +93,12 @@ class Idl(object):
"""

IDL_S_INITIAL = 0
IDL_S_MONITOR_REQUESTED = 1
IDL_S_MONITOR_COND_REQUESTED = 2
IDL_S_SERVER_SCHEMA_REQUESTED = 1
IDL_S_SERVER_MONITOR_REQUESTED = 2
IDL_S_DATA_MONITOR_REQUESTED = 3
IDL_S_DATA_MONITOR_COND_REQUESTED = 4

def __init__(self, remote, schema_helper, probe_interval=None):
def __init__(self, remote, schema_helper, leader_only=True, probe_interval=None):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
Expand All @@ -119,6 +122,9 @@ def __init__(self, remote, schema_helper, probe_interval=None):
The IDL uses and modifies 'schema' directly.
If 'leader_only' is set to True (default value) the IDL will only monitor
and transact with the leader of the cluster.
If "probe_interval" is zero it disables the connection keepalive
feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS.
Expand All @@ -137,6 +143,20 @@ def __init__(self, remote, schema_helper, probe_interval=None):
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()

# Server monitor.
self._server_schema_request_id = None
self._server_monitor_request_id = None
self._db_change_aware_request_id = None
self._server_db_name = '_Server'
self._server_db_table = 'Database'
self.server_tables = None
self._server_db = None
self.server_monitor_uuid = uuid.uuid1()
self.leader_only = leader_only
self.cluster_id = None
self._min_index = 0

self.state = self.IDL_S_INITIAL

# Database locking.
Expand Down Expand Up @@ -172,6 +192,15 @@ def _parse_remotes(self, remote):
remotes.append(r)
return remotes

def set_cluster_id(self, cluster_id):
"""Set the id of the cluster that this idl must connect to."""
if cluster_id:
self.cluster_id = str(cluster_id)
else:
self.cluster_id = None
if self.state != self.IDL_S_INITIAL:
self.force_reconnect()

def index_create(self, table, name):
"""Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name)
Expand Down Expand Up @@ -222,14 +251,15 @@ def run(self):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
self.__send_monitor_request()
self.__send_server_schema_request()
if self.lock_name:
self.__send_lock_request()
break

msg = self._session.recv()
if msg is None:
break

if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
Expand All @@ -239,7 +269,15 @@ def run(self):
and msg.method == "update"
and len(msg.params) == 2):
# Database contents changed.
self.__parse_update(msg.params[1], OVSDB_UPDATE)
if msg.params[0] == str(self.server_monitor_uuid):
self.__parse_update(msg.params[1], OVSDB_UPDATE,
tables=self.server_tables)
self.change_seqno = initial_change_seqno
if not self.__check_server_db():
self.force_reconnect()
break
else:
self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
Expand All @@ -248,16 +286,65 @@ def run(self):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
assert self.state == self.IDL_S_MONITOR_REQUESTED
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__parse_update(msg.result, OVSDB_UPDATE)

except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
% (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._server_schema_request_id is not None
and self._server_schema_request_id == msg.id):
# Reply to our "get_schema" of _Server request.
try:
self._server_schema_request_id = None
sh = SchemaHelper(None, msg.result)
sh.register_table(self._server_db_table)
schema = sh.get_idl_schema()
self._server_db = schema
self.server_tables = schema.tables
self.__send_server_monitor_request()
except error.Error as e:
vlog.err("%s: error receiving server schema: %s"
% (self._session.get_name(), e))
if self.cluster_id:
self.__error()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._server_monitor_request_id is not None
and self._server_monitor_request_id == msg.id):
# Reply to our "monitor" of _Server request.
try:
self._server_monitor_request_id = None
self.__parse_update(msg.result, OVSDB_UPDATE,
tables=self.server_tables)
self.change_seqno = initial_change_seqno
if self.__check_server_db():
self.__send_monitor_request()
self.__send_db_change_aware()
else:
self.force_reconnect()
break
except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
% (self._session.get_name(), e))
if self.cluster_id:
self.__error()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._db_change_aware_request_id is not None
and self._db_change_aware_request_id == msg.id):
# Reply to us notifying the server of our change awarness.
self._db_change_aware_request_id = None
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
and self._lock_request_id == msg.id):
Expand All @@ -275,10 +362,20 @@ def run(self):
# Reply to our echo request. Ignore it.
pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == self.IDL_S_MONITOR_COND_REQUESTED and
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self._server_schema_request_id is not None and
self._server_schema_request_id == msg.id):
self._server_schema_request_id = None
if self.cluster_id:
self.force_reconnect()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
Expand Down Expand Up @@ -440,12 +537,19 @@ def __parse_lock_notify(self, params, new_has_lock):
if not new_has_lock:
self.is_lock_contended = True

def __send_db_change_aware(self):
msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
[True])
self._db_change_aware_request_id = msg.id
self._session.send(msg)

def __send_monitor_request(self):
if self.state == self.IDL_S_INITIAL:
self.state = self.IDL_S_MONITOR_COND_REQUESTED
if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
self.IDL_S_INITIAL]):
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
else:
self.state = self.IDL_S_MONITOR_REQUESTED
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"

monitor_requests = {}
Expand All @@ -467,20 +571,50 @@ def __send_monitor_request(self):
self._monitor_request_id = msg.id
self._session.send(msg)

def __parse_update(self, update, version):
def __send_server_schema_request(self):
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
msg = ovs.jsonrpc.Message.create_request(
"get_schema", [self._server_db_name, str(self.uuid)])
self._server_schema_request_id = msg.id
res = self._session.send(msg)

def __send_server_monitor_request(self):
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
monitor_requests = {}
table = self.server_tables[self._server_db_table]
columns = [column for column in six.iterkeys(table.columns)]
for column in six.itervalues(table.columns):
if not hasattr(column, 'alert'):
column.alert = True
table.rows = custom_index.IndexedRows(table)
table.need_table = False
table.idl = self
monitor_request = {"columns": columns}
monitor_requests[table.name] = [monitor_request]
msg = ovs.jsonrpc.Message.create_request(
'monitor', [self._server_db.name,
str(self.server_monitor_uuid),
monitor_requests])
self._server_monitor_request_id = msg.id
self._session.send(msg)

def __parse_update(self, update, version, tables=None):
try:
self.__do_parse_update(update, version)
if not tables:
self.__do_parse_update(update, version, self.tables)
else:
self.__do_parse_update(update, version, tables)
except error.Error as e:
vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e))

def __do_parse_update(self, table_updates, version):
def __do_parse_update(self, table_updates, version, tables):
if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)

for table_name, table_update in six.iteritems(table_updates):
table = self.tables.get(table_name)
table = tables.get(table_name)
if not table:
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
Expand Down Expand Up @@ -605,6 +739,57 @@ def __process_update(self, table, uuid, old, new):
self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed

def __check_server_db(self):
"""Returns True if this is a valid ovsdb server, False otherwise."""
session_name = self._session.get_name()

if self._server_db_table not in self.server_tables:
vlog.info("%s: server does not have %s table in its %s database"
% (session_name, self._server_db_table, self._server_db_name))
return False

rows = self.server_tables[self._server_db_table].rows

database = None
for row in six.itervalues(rows):
if self.cluster_id:
if self.cluster_id in \
map(lambda x: str(x)[:4], row.cid):
database = row
break
elif row.name == self._db.name:
database = row
break

if not database:
vlog.info("%s: server does not have %s database"
% (session_name, self._db.name))
return False

if (database.model == CLUSTERED and
self._session.get_num_of_remotes() > 1):
if not database.schema:
vlog.info('%s: clustered database server has not yet joined '
'cluster; trying another server' % session_name)
return False
if not database.connected:
vlog.info('%s: clustered database server is disconnected '
'from cluster; trying another server' % session_name)
return False
if (self.leader_only and
not database.leader):
vlog.info('%s: clustered database server is not cluster '
'leader; trying another server' % session_name)
return False
if database.index:
if database.index[0] < self._min_index:
vlog.warn('%s: clustered database server has stale data; '
'trying another server' % session_name)
return False
self._min_index = database.index[0]

return True

def __column_name(self, column):
if column.type.key.type == ovs.db.types.UuidType:
return ovs.ovsuuid.to_json(column.type.key.type.default)
Expand Down
3 changes: 3 additions & 0 deletions python/ovs/reconnect.py
Expand Up @@ -344,6 +344,9 @@ def disconnected(self, now, error):
else:
self.info_level("%s: error listening for connections"
% self.name)
elif self.state == Reconnect.Reconnect:
self.info_level("%s: connection closed by client"
% self.name)
elif self.backoff < self.max_backoff:
if self.passive:
type_ = "listen"
Expand Down

0 comments on commit d9fb0e5

Please sign in to comment.