Skip to content

Commit

Permalink
Added support for update of remote service properties. On export side,
Browse files Browse the repository at this point in the history
if properties are changed via
ServiceRegistration.set_properties(new_props), the
ExportRegistration.update is called by the TopologyManager, and this
results in the updates being made to the ExportEndpoint, and then a
RemoteServiceAdminEvent EXPORT_UPDATE event.  If discovery is used (e.g.
etcd has been changed to support), this will update the advertisement
and on the import side an EndpointEvent.MODIFIED will be generated by
the discovery mechanism.  Via the topology manager, this will result in
an ImportRegistrationImpl.update call, which updates both the proxy
properties and the import endpoint description.
  • Loading branch information
scottslewis committed Jun 24, 2018
1 parent 31460ce commit 13cee08
Show file tree
Hide file tree
Showing 12 changed files with 345 additions and 43 deletions.
76 changes: 76 additions & 0 deletions pelix/rsa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,17 @@ def match_sr(self, svc_ref, cid=None):
"""
raise Exception("{0}.get_description not implemented".format(self))

def update(self, properties):
# type: (dictionary) -> EndpointDescription
"""
Updates ExportRegistration with new properties.
:param properties a dictionary of new properties. May be None.
:return: EndpointDescription for ExportRegistration, or None if not
updated.
"""
raise Exception("{0}.update not implemented".format(self))

def close(self):
# type: () -> None
"""
Expand Down Expand Up @@ -514,6 +525,17 @@ def get_description(self):
"""
raise Exception("{0}.get_description not implemented".format(self))

def update(self, endpoint_description):
# type: (EndpointDescription) -> boolean
"""
Update the service properties of the imported service.
:param endpoint_description: EndpointDescription for updated endpoint. Will not
be None.
:return: True if update completed successfully, False if not.
"""
raise Exception("{0}.update not implemented".format(self))

def close(self):
# type: () -> None
"""
Expand Down Expand Up @@ -737,6 +759,60 @@ def fromexportreg(cls, bundle, export_reg):
export_reg.get_description(),
)

@classmethod
def fromexportupdate(cls, bundle, export_reg):
# type: (Bundle, ExportRegistration) -> RemoteServiceAdminEvent
exc = export_reg.get_exception()
if exc:
return RemoteServiceAdminEvent(
RemoteServiceAdminEvent.EXPORT_ERROR,
bundle,
export_reg.get_export_container_id(),
export_reg.get_remoteservice_id(),
None,
export_reg.get_export_reference(),
None,
export_reg.get_description(),
)
else:
return RemoteServiceAdminEvent(
RemoteServiceAdminEvent.EXPORT_UPDATE,
bundle,
export_reg.get_export_container_id(),
export_reg.get_remoteservice_id(),
None,
export_reg.get_export_reference(),
None,
export_reg.get_description(),
)

@classmethod
def fromimportupdate(cls, bundle, import_reg):
# type: (Bundle, ImportRegistration) -> RemoteServiceAdminEvent
exc = import_reg.get_exception()
if exc:
return RemoteServiceAdminEvent(
RemoteServiceAdminEvent.IMPORT_ERROR,
bundle,
import_reg.get_import_container_id(),
import_reg.get_remoteservice_id(),
None,
None,
exc,
import_reg.get_description(),
)
else:
return RemoteServiceAdminEvent(
RemoteServiceAdminEvent.IMPORT_UPDATE,
bundle,
import_reg.get_import_container_id(),
import_reg.get_remoteservice_id(),
import_reg.get_import_reference(),
None,
None,
import_reg.get_description(),
)

@classmethod
def fromimportunreg(cls, bundle, cid, rsid, import_ref, exception, ed):
# type: (Bundle, Tuple[str, str], Tuple[Tuple[str, str], int], ImportReference, Optional[Tuple[Any, Any, Any]], EndpointDescription) -> RemoteServiceAdminEvent
Expand Down
28 changes: 28 additions & 0 deletions pelix/rsa/providers/discovery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ def advertise_endpoint(self, endpoint_description):
if advertise_result:
self._add_advertised(endpoint_description, advertise_result)
return True
else:
return False

def update_endpoint(self, updated_ed):
"""
Update a previously advertised endpoint_description.
:param endpoint_description an instance of EndpointDescription to update. Must not be None.
:return True if advertised, False if not (e.g. it's already been advertised)
"""
endpointid = updated_ed.get_id()
with self._published_endpoints_lock:
if self.get_advertised_endpoint(endpointid) == None:
return False
advertise_result = self._update(updated_ed)
if advertise_result:
self._remove_advertised(endpointid)
self._add_advertised(updated_ed, advertise_result)
return True
else:
return False

def unadvertise_endpoint(self, endpointid):
"""
Expand Down Expand Up @@ -151,6 +172,9 @@ def _remove_advertised(self, endpointid):
def _advertise(self, endpoint_description):
raise Exception("Endpoint._advertise must be overridden by subclasses")

def _update(self, endpoint_description):
raise Exception("Endpoint._update must be overrridden by subclasses")

def _unadvertise(self, advertised):
raise Exception(
"Endpoint._unadvertise must be overridden by subclasses"
Expand Down Expand Up @@ -295,6 +319,10 @@ def _get_matching_endpoint_event_listeners(self, ed):
result.append((l[0], matching_filter))
return result

def _has_discovered_endpoint(self, ed_id):
with self._discovered_endpoints_lock:
return self._discovered_endpoints.get(ed_id,None)

def _add_discovered_endpoint(self, ed):
with self._discovered_endpoints_lock:
_logger.debug("_add_discovered_endpoint ed=%s", ed)
Expand Down
104 changes: 73 additions & 31 deletions pelix/rsa/providers/discovery/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,8 @@ def _validate_component(self):
def _invalidate(self, _):
self._disconnect()

# implementation of EndpointAdvertiser service. These methods
# are called when (e.g.) RSA asks us to advertise/unadvertise
# an endpoint_description
def _advertise(self, endpoint_description):
def _write_description(self, endpoint_description):
# type: (EndpointDescription) -> etcd.EtcdResult
_logger.debug("advertising ed=%s", endpoint_description)
# encode props as string -> string
encoded_props = encode_endpoint_props(endpoint_description)
# get copy of service props
Expand All @@ -186,6 +182,19 @@ def _advertise(self, endpoint_description):
value=props_json,
)

# implementation of EndpointAdvertiser service. These methods
# are called when (e.g.) RSA asks us to advertise/unadvertise
# an endpoint_description
def _advertise(self, endpoint_description):
# type: (EndpointDescription) -> etcd.EtcdResult
_logger.debug("advertising ed=%s", endpoint_description)
return self._write_description(endpoint_description)

def _update(self, endpoint_description):
# type: (EndpointDescription) -> etcd.EtcdResult
_logger.debug("updating ed=%s", endpoint_description)
return self._write_description(endpoint_description)

def _unadvertise(self, advertised):
# type: (List[EndpointDescription]) -> etcd.EtcdResult
_logger.debug("unadvertising ed=%s", advertised[0])
Expand Down Expand Up @@ -306,12 +315,45 @@ def _handle_add_nodes(self, nodes):
}
# decode
decoded_props = decode_endpoint_props(raw_props)
ed = EndpointDescription(properties=decoded_props)
# add discovered endpoint to our internal list
self._add_discovered_endpoint(ed)
# dispatch
self._fire_endpoint_event(EndpointEvent.ADDED, ed)

new_ed = EndpointDescription(properties=decoded_props)
old_ed = self._has_discovered_endpoint(new_ed.get_id())
if not old_ed:
# add discovered endpoint to our internal list
self._add_discovered_endpoint(new_ed)
# dispatch
self._fire_endpoint_event(EndpointEvent.ADDED, new_ed)
else:
# get timestamp and make sure new one is newer (an update)
old_ts = old_ed.get_timestamp()
new_ts = new_ed.get_timestamp()
if new_ts > old_ts:
self._remove_discovered_endpoint(old_ed.get_id())
self._add_discovered_endpoint(new_ed)
self._fire_endpoint_event(EndpointEvent.MODIFIED, new_ed)

def _handle_update_nodes(self, nodes):
for node in nodes:
# we only care about properties
node_val = node.value
if node_val:
json_obj = json.loads(node_val)
if isinstance(json_obj, dict):
json_properties = json_obj["properties"]
# get the name and value from each entry
raw_props = {
entry["name"]: entry["value"]
for entry in json_properties
if entry["type"] == "string"
}
# decode
decoded_props = decode_endpoint_props(raw_props)
new_ed = EndpointDescription(properties=decoded_props)
old_ed = self._remove_discovered_endpoint(new_ed.get_id())
if old_ed:
self._add_discovered_endpoint(new_ed)
# dispatch
self._fire_endpoint_event(EndpointEvent.MODIFIED, new_ed)

def _handle_remove_node(self, endpointid):
ed = self._remove_discovered_endpoint(endpointid)
if ed:
Expand Down Expand Up @@ -352,26 +394,26 @@ def _watch_job(self):
# we are done
return
else:
if action != "update":
# split id into [sessionid] or [sessionid,endpointid]
splitid = key[len(self._top_path) + 1 :].split("/")
sessionid = splitid[0]
if self._sessionid != sessionid:
if isinstance(splitid, list):
endpointid = splitid[len(splitid) - 1]
else:
endpointid = None
if not endpointid:
if action in self.REMOVE_ACTIONS:
# other session deleted
self._handle_remove_dir(sessionid)
elif action in self.ADD_ACTIONS:
self._handle_add_dir(result)
else:
if action in self.REMOVE_ACTIONS:
self._handle_remove_node(endpointid)
elif action in self.ADD_ACTIONS:
self._handle_add_nodes([result])
# split id into [sessionid] or [sessionid,endpointid]
splitid = key[len(self._top_path) + 1 :].split("/")
sessionid = splitid[0]
if self._sessionid != sessionid:
if isinstance(splitid, list):
endpointid = splitid[len(splitid) - 1]
else:
endpointid = None
if not endpointid:
if action in self.REMOVE_ACTIONS:
# other session deleted
self._handle_remove_dir(sessionid)
elif action in self.ADD_ACTIONS:
self._handle_add_dir(result)
else:
if action in self.REMOVE_ACTIONS:
self._handle_remove_node(endpointid)
elif action in self.ADD_ACTIONS:
self._handle_add_nodes([result])

except:
_logger.exception("watch_job:Exception in watch loop")

Expand Down
4 changes: 3 additions & 1 deletion pelix/rsa/providers/distribution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ def _match_required_configs(self, required_configs):
self._supported_configs to make sure that all required configs are
present for this distribution provider.
"""
if required_configs is None or not self._supported_configs:
if not required_configs:
return True
if not self._supported_configs:
return False
return len(
[x for x in required_configs if x in self._supported_configs]
Expand Down
59 changes: 56 additions & 3 deletions pelix/rsa/remoteserviceadmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ def get_exception(self):
with self.__lock:
return (
self.__updateexception
if self.__closed
if self.__updateexception or self.__closed
else self.__exportref.get_exception()
)

Expand All @@ -945,6 +945,34 @@ def get_description(self):
with self.__lock:
return None if self.__closed else self.__exportref.get_description()

def update(self, properties):
# type: (dictionary) -> Optional[EndpointDescription]
with self.__lock:
if self.__closed:
self.__updateexception = ValueError("Update failed since ExportRegistration already closed")
return None
# if properties is set then copy
props = properties.copy() if properties else dict()
try:
updated_ed = self.__exportref.update(props)
except Exception as e:
self.__updateexception = e
return None

if not updated_ed:
self.__updatexception = ValueError("Update failed because ExportEndpoint was None")
return None

self.__updateexception = None
if self.__rsa:
self.__rsa._publish_event(
RemoteServiceAdminEvent.fromexportupdate(
self.__rsa._get_bundle(),
self
)
)
return updated_ed

def close(self):
"""
Cleans up the export endpoint
Expand Down Expand Up @@ -1237,6 +1265,7 @@ def __init__(self, endpoint=None, exception=None, errored=None):
endpoint._add_import_registration(self)
self.__importref = ImportReferenceImpl.fromendpoint(endpoint)
self.__closed = False
self.__updateexception = None
self.__lock = threading.RLock()

def _import_endpoint(self):
Expand Down Expand Up @@ -1288,13 +1317,37 @@ def get_reference(self):
def get_exception(self):
# type: () -> Optional[Tuple[Any, Any, Any]]
with self.__lock:
return None if self.__closed else self.__importref.get_exception()

return (
self.__updateexception
if self.__updateexception or self.__closed
else self.__importref.get_exception()
)

def get_description(self):
# type: () -> Optional[EndpointDescription]
with self.__lock:
return None if self.__closed else self.__importref.get_description()

def update(self, endpoint_description):
# type: (EndpointDescription) -> boolean
with self.__lock:
if self.__closed:
self.__updateexception = ValueError("Update failed since ImportRegistration already closed")
return False
try:
self.__importref.update(endpoint_description)
except Exception as e:
self.__updateexception = e
return False
if self.__rsa:
self.__rsa._publish_event(RemoteServiceAdminEvent.fromimportupdate(
self.__rsa._get_bundle(),
self)
)
return True
else:
return False

def close(self):
publish = False
importerid = rsid = import_ref = exception = ed = None
Expand Down
Loading

0 comments on commit 13cee08

Please sign in to comment.