From 13cee08c7832e1452854a8337ae8011a6a42eda7 Mon Sep 17 00:00:00 2001 From: slewis Date: Sat, 23 Jun 2018 22:45:59 -0700 Subject: [PATCH] Added support for update of remote service properties. On export side, if properties are changed via ServiceRegistration.set_properties(new_props), the ExportRegistration.update is called by the TopologyManager, and this results in the updates being made to the ExportEndpoint, and then a RemoteServiceAdminEvent EXPORT_UPDATE event. If discovery is used (e.g. etcd has been changed to support), this will update the advertisement and on the import side an EndpointEvent.MODIFIED will be generated by the discovery mechanism. Via the topology manager, this will result in an ImportRegistrationImpl.update call, which updates both the proxy properties and the import endpoint description. --- pelix/rsa/__init__.py | 76 ++++++++++++++ pelix/rsa/providers/discovery/__init__.py | 28 +++++ pelix/rsa/providers/discovery/etcd.py | 104 +++++++++++++------ pelix/rsa/providers/distribution/__init__.py | 4 +- pelix/rsa/remoteserviceadmin.py | 59 ++++++++++- pelix/rsa/topologymanagers/__init__.py | 38 ++++++- pelix/rsa/topologymanagers/basic.py | 6 ++ samples/rsa/helloconsumer_xmlrpc.py | 41 ++++++++ samples/rsa/helloimpl_update.py | 24 +++++ samples/rsa/helloimpl_xmlrpc.py | 4 +- samples/run_rsa_etcd_xmlrpc.py | 2 +- samples/run_rsa_xmlrpc.py | 2 +- 12 files changed, 345 insertions(+), 43 deletions(-) create mode 100644 samples/rsa/helloconsumer_xmlrpc.py create mode 100644 samples/rsa/helloimpl_update.py diff --git a/pelix/rsa/__init__.py b/pelix/rsa/__init__.py index 28d7fd76..424fc58e 100644 --- a/pelix/rsa/__init__.py +++ b/pelix/rsa/__init__.py @@ -313,6 +313,17 @@ def match_sr(self, svc_ref, cid=None): """ raise Exception("{0}.get_description not implemented".format(self)) + def update(self, properties): + # type: (dictionary) -> EndpointDescription + """ + Updates ExportRegistration with new properties. + + :param properties a dictionary of new properties. May be None. + :return: EndpointDescription for ExportRegistration, or None if not + updated. + """ + raise Exception("{0}.update not implemented".format(self)) + def close(self): # type: () -> None """ @@ -514,6 +525,17 @@ def get_description(self): """ raise Exception("{0}.get_description not implemented".format(self)) + def update(self, endpoint_description): + # type: (EndpointDescription) -> boolean + """ + Update the service properties of the imported service. + + :param endpoint_description: EndpointDescription for updated endpoint. Will not + be None. + :return: True if update completed successfully, False if not. + """ + raise Exception("{0}.update not implemented".format(self)) + def close(self): # type: () -> None """ @@ -737,6 +759,60 @@ def fromexportreg(cls, bundle, export_reg): export_reg.get_description(), ) + @classmethod + def fromexportupdate(cls, bundle, export_reg): + # type: (Bundle, ExportRegistration) -> RemoteServiceAdminEvent + exc = export_reg.get_exception() + if exc: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.EXPORT_ERROR, + bundle, + export_reg.get_export_container_id(), + export_reg.get_remoteservice_id(), + None, + export_reg.get_export_reference(), + None, + export_reg.get_description(), + ) + else: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.EXPORT_UPDATE, + bundle, + export_reg.get_export_container_id(), + export_reg.get_remoteservice_id(), + None, + export_reg.get_export_reference(), + None, + export_reg.get_description(), + ) + + @classmethod + def fromimportupdate(cls, bundle, import_reg): + # type: (Bundle, ImportRegistration) -> RemoteServiceAdminEvent + exc = import_reg.get_exception() + if exc: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.IMPORT_ERROR, + bundle, + import_reg.get_import_container_id(), + import_reg.get_remoteservice_id(), + None, + None, + exc, + import_reg.get_description(), + ) + else: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.IMPORT_UPDATE, + bundle, + import_reg.get_import_container_id(), + import_reg.get_remoteservice_id(), + import_reg.get_import_reference(), + None, + None, + import_reg.get_description(), + ) + @classmethod def fromimportunreg(cls, bundle, cid, rsid, import_ref, exception, ed): # type: (Bundle, Tuple[str, str], Tuple[Tuple[str, str], int], ImportReference, Optional[Tuple[Any, Any, Any]], EndpointDescription) -> RemoteServiceAdminEvent diff --git a/pelix/rsa/providers/discovery/__init__.py b/pelix/rsa/providers/discovery/__init__.py index 6b615eae..ab4312c0 100644 --- a/pelix/rsa/providers/discovery/__init__.py +++ b/pelix/rsa/providers/discovery/__init__.py @@ -86,6 +86,27 @@ def advertise_endpoint(self, endpoint_description): if advertise_result: self._add_advertised(endpoint_description, advertise_result) return True + else: + return False + + def update_endpoint(self, updated_ed): + """ + Update a previously advertised endpoint_description. + + :param endpoint_description an instance of EndpointDescription to update. Must not be None. + :return True if advertised, False if not (e.g. it's already been advertised) + """ + endpointid = updated_ed.get_id() + with self._published_endpoints_lock: + if self.get_advertised_endpoint(endpointid) == None: + return False + advertise_result = self._update(updated_ed) + if advertise_result: + self._remove_advertised(endpointid) + self._add_advertised(updated_ed, advertise_result) + return True + else: + return False def unadvertise_endpoint(self, endpointid): """ @@ -151,6 +172,9 @@ def _remove_advertised(self, endpointid): def _advertise(self, endpoint_description): raise Exception("Endpoint._advertise must be overridden by subclasses") + def _update(self, endpoint_description): + raise Exception("Endpoint._update must be overrridden by subclasses") + def _unadvertise(self, advertised): raise Exception( "Endpoint._unadvertise must be overridden by subclasses" @@ -295,6 +319,10 @@ def _get_matching_endpoint_event_listeners(self, ed): result.append((l[0], matching_filter)) return result + def _has_discovered_endpoint(self, ed_id): + with self._discovered_endpoints_lock: + return self._discovered_endpoints.get(ed_id,None) + def _add_discovered_endpoint(self, ed): with self._discovered_endpoints_lock: _logger.debug("_add_discovered_endpoint ed=%s", ed) diff --git a/pelix/rsa/providers/discovery/etcd.py b/pelix/rsa/providers/discovery/etcd.py index 357b056a..01dcf520 100644 --- a/pelix/rsa/providers/discovery/etcd.py +++ b/pelix/rsa/providers/discovery/etcd.py @@ -158,12 +158,8 @@ def _validate_component(self): def _invalidate(self, _): self._disconnect() - # implementation of EndpointAdvertiser service. These methods - # are called when (e.g.) RSA asks us to advertise/unadvertise - # an endpoint_description - def _advertise(self, endpoint_description): + def _write_description(self, endpoint_description): # type: (EndpointDescription) -> etcd.EtcdResult - _logger.debug("advertising ed=%s", endpoint_description) # encode props as string -> string encoded_props = encode_endpoint_props(endpoint_description) # get copy of service props @@ -186,6 +182,19 @@ def _advertise(self, endpoint_description): value=props_json, ) + # implementation of EndpointAdvertiser service. These methods + # are called when (e.g.) RSA asks us to advertise/unadvertise + # an endpoint_description + def _advertise(self, endpoint_description): + # type: (EndpointDescription) -> etcd.EtcdResult + _logger.debug("advertising ed=%s", endpoint_description) + return self._write_description(endpoint_description) + + def _update(self, endpoint_description): + # type: (EndpointDescription) -> etcd.EtcdResult + _logger.debug("updating ed=%s", endpoint_description) + return self._write_description(endpoint_description) + def _unadvertise(self, advertised): # type: (List[EndpointDescription]) -> etcd.EtcdResult _logger.debug("unadvertising ed=%s", advertised[0]) @@ -306,12 +315,45 @@ def _handle_add_nodes(self, nodes): } # decode decoded_props = decode_endpoint_props(raw_props) - ed = EndpointDescription(properties=decoded_props) - # add discovered endpoint to our internal list - self._add_discovered_endpoint(ed) - # dispatch - self._fire_endpoint_event(EndpointEvent.ADDED, ed) - + new_ed = EndpointDescription(properties=decoded_props) + old_ed = self._has_discovered_endpoint(new_ed.get_id()) + if not old_ed: + # add discovered endpoint to our internal list + self._add_discovered_endpoint(new_ed) + # dispatch + self._fire_endpoint_event(EndpointEvent.ADDED, new_ed) + else: + # get timestamp and make sure new one is newer (an update) + old_ts = old_ed.get_timestamp() + new_ts = new_ed.get_timestamp() + if new_ts > old_ts: + self._remove_discovered_endpoint(old_ed.get_id()) + self._add_discovered_endpoint(new_ed) + self._fire_endpoint_event(EndpointEvent.MODIFIED, new_ed) + + def _handle_update_nodes(self, nodes): + for node in nodes: + # we only care about properties + node_val = node.value + if node_val: + json_obj = json.loads(node_val) + if isinstance(json_obj, dict): + json_properties = json_obj["properties"] + # get the name and value from each entry + raw_props = { + entry["name"]: entry["value"] + for entry in json_properties + if entry["type"] == "string" + } + # decode + decoded_props = decode_endpoint_props(raw_props) + new_ed = EndpointDescription(properties=decoded_props) + old_ed = self._remove_discovered_endpoint(new_ed.get_id()) + if old_ed: + self._add_discovered_endpoint(new_ed) + # dispatch + self._fire_endpoint_event(EndpointEvent.MODIFIED, new_ed) + def _handle_remove_node(self, endpointid): ed = self._remove_discovered_endpoint(endpointid) if ed: @@ -352,26 +394,26 @@ def _watch_job(self): # we are done return else: - if action != "update": - # split id into [sessionid] or [sessionid,endpointid] - splitid = key[len(self._top_path) + 1 :].split("/") - sessionid = splitid[0] - if self._sessionid != sessionid: - if isinstance(splitid, list): - endpointid = splitid[len(splitid) - 1] - else: - endpointid = None - if not endpointid: - if action in self.REMOVE_ACTIONS: - # other session deleted - self._handle_remove_dir(sessionid) - elif action in self.ADD_ACTIONS: - self._handle_add_dir(result) - else: - if action in self.REMOVE_ACTIONS: - self._handle_remove_node(endpointid) - elif action in self.ADD_ACTIONS: - self._handle_add_nodes([result]) + # split id into [sessionid] or [sessionid,endpointid] + splitid = key[len(self._top_path) + 1 :].split("/") + sessionid = splitid[0] + if self._sessionid != sessionid: + if isinstance(splitid, list): + endpointid = splitid[len(splitid) - 1] + else: + endpointid = None + if not endpointid: + if action in self.REMOVE_ACTIONS: + # other session deleted + self._handle_remove_dir(sessionid) + elif action in self.ADD_ACTIONS: + self._handle_add_dir(result) + else: + if action in self.REMOVE_ACTIONS: + self._handle_remove_node(endpointid) + elif action in self.ADD_ACTIONS: + self._handle_add_nodes([result]) + except: _logger.exception("watch_job:Exception in watch loop") diff --git a/pelix/rsa/providers/distribution/__init__.py b/pelix/rsa/providers/distribution/__init__.py index cb1bde57..f09e939e 100644 --- a/pelix/rsa/providers/distribution/__init__.py +++ b/pelix/rsa/providers/distribution/__init__.py @@ -173,7 +173,9 @@ def _match_required_configs(self, required_configs): self._supported_configs to make sure that all required configs are present for this distribution provider. """ - if required_configs is None or not self._supported_configs: + if not required_configs: + return True + if not self._supported_configs: return False return len( [x for x in required_configs if x in self._supported_configs] diff --git a/pelix/rsa/remoteserviceadmin.py b/pelix/rsa/remoteserviceadmin.py index c2762f0a..6711e890 100644 --- a/pelix/rsa/remoteserviceadmin.py +++ b/pelix/rsa/remoteserviceadmin.py @@ -931,7 +931,7 @@ def get_exception(self): with self.__lock: return ( self.__updateexception - if self.__closed + if self.__updateexception or self.__closed else self.__exportref.get_exception() ) @@ -945,6 +945,34 @@ def get_description(self): with self.__lock: return None if self.__closed else self.__exportref.get_description() + def update(self, properties): + # type: (dictionary) -> Optional[EndpointDescription] + with self.__lock: + if self.__closed: + self.__updateexception = ValueError("Update failed since ExportRegistration already closed") + return None + # if properties is set then copy + props = properties.copy() if properties else dict() + try: + updated_ed = self.__exportref.update(props) + except Exception as e: + self.__updateexception = e + return None + + if not updated_ed: + self.__updatexception = ValueError("Update failed because ExportEndpoint was None") + return None + + self.__updateexception = None + if self.__rsa: + self.__rsa._publish_event( + RemoteServiceAdminEvent.fromexportupdate( + self.__rsa._get_bundle(), + self + ) + ) + return updated_ed + def close(self): """ Cleans up the export endpoint @@ -1237,6 +1265,7 @@ def __init__(self, endpoint=None, exception=None, errored=None): endpoint._add_import_registration(self) self.__importref = ImportReferenceImpl.fromendpoint(endpoint) self.__closed = False + self.__updateexception = None self.__lock = threading.RLock() def _import_endpoint(self): @@ -1288,13 +1317,37 @@ def get_reference(self): def get_exception(self): # type: () -> Optional[Tuple[Any, Any, Any]] with self.__lock: - return None if self.__closed else self.__importref.get_exception() - + return ( + self.__updateexception + if self.__updateexception or self.__closed + else self.__importref.get_exception() + ) + def get_description(self): # type: () -> Optional[EndpointDescription] with self.__lock: return None if self.__closed else self.__importref.get_description() + def update(self, endpoint_description): + # type: (EndpointDescription) -> boolean + with self.__lock: + if self.__closed: + self.__updateexception = ValueError("Update failed since ImportRegistration already closed") + return False + try: + self.__importref.update(endpoint_description) + except Exception as e: + self.__updateexception = e + return False + if self.__rsa: + self.__rsa._publish_event(RemoteServiceAdminEvent.fromimportupdate( + self.__rsa._get_bundle(), + self) + ) + return True + else: + return False + def close(self): publish = False importerid = rsid = import_ref = exception = ed = None diff --git a/pelix/rsa/topologymanagers/__init__.py b/pelix/rsa/topologymanagers/__init__.py index 0aba9bd7..a295325b 100644 --- a/pelix/rsa/topologymanagers/__init__.py +++ b/pelix/rsa/topologymanagers/__init__.py @@ -62,6 +62,7 @@ ) from pelix.rsa.endpointdescription import EndpointDescription +from pelix.rsa.endpointdescription import EndpointDescription # ------------------------------------------------------------------------------ # Module version @@ -113,7 +114,14 @@ def _unimport_removed_endpoint(self, endpoint_description): for import_reg in import_regs: if import_reg.match_ed(endpoint_description): import_reg.close() - + + def _update_imported_endpoint(self, endpoint_description): + # type: (EndpointDescription) -> None + import_regs = self._rsa._get_import_regs() + for import_reg in import_regs: + if import_reg.match_ed(endpoint_description): + import_reg.update(endpoint_description) + def _handle_service_registered(self, service_ref): # type: (ServiceReference) -> None exp_intfs = get_exported_interfaces(service_ref) @@ -138,12 +146,23 @@ def _handle_service_unregistering(self, service_ref): export_reg.close() def _handle_service_modified(self, service_ref): - # type: (ServiceReference) -> None + # type: (ServiceReference) -> EndpointDescription export_regs = self._rsa._get_export_regs() if export_regs: for export_reg in export_regs: if export_reg.match_sr(service_ref): - export_reg.get_export_reference().update({}) + _logger.debug( + "_handle_service_modified. updating " + "export_registration for service reference=%s", + service_ref + ) + + # actually update the export_reg here + if not export_reg.update(None): + _logger.warning( + "_handle_service_modified. updating" + "update for service_ref=%s failed", service_ref + ) def _handle_event(self, service_event): # type: (ServiceEvent) -> None @@ -173,6 +192,17 @@ def _advertise_endpoint(self, ed): adv, ed ) + def _update_endpoint(self, ed): + # type: (EndpointDescription) -> None + for adv in self._advertisers: + try: + adv.update_endpoint(ed) + except: + _logger.exception( + "Exception in update_endpoint for advertiser=%s " + "endpoint=%s", adv, ed + ) + def _unadvertise_endpoint(self, ed): # type: (EndpointDescription) -> None for adv in self._advertisers: @@ -192,6 +222,8 @@ def remote_admin_event(self, event): self._advertise_endpoint(event.get_description()) elif kind == RemoteServiceAdminEvent.EXPORT_UNREGISTRATION: self._unadvertise_endpoint(event.get_description()) + elif kind == RemoteServiceAdminEvent.EXPORT_UPDATE: + self._update_endpoint(event.get_description()) def endpoint_changed(self, endpoint_event, matched_filter): # type: (EndpointEvent, Any) -> None diff --git a/pelix/rsa/topologymanagers/basic.py b/pelix/rsa/topologymanagers/basic.py index 40f61d2b..c23221b9 100644 --- a/pelix/rsa/topologymanagers/basic.py +++ b/pelix/rsa/topologymanagers/basic.py @@ -115,6 +115,12 @@ def endpoint_changed(self, endpoint_event, matched_filter): "BasicTopologyManager: endpoint removed. endpoint.id=%s", ed_id ) + elif event_type == EndpointEvent.MODIFIED: + self._update_imported_endpoint(ed) + _logger.debug( + "BasicTopologyManager: endpoint updated. endpoint.id=%s", + ed_id + ) def remote_admin_event(self, event): # type: (RemoteServiceAdminEvent) -> None diff --git a/samples/rsa/helloconsumer_xmlrpc.py b/samples/rsa/helloconsumer_xmlrpc.py new file mode 100644 index 00000000..0cf59bc0 --- /dev/null +++ b/samples/rsa/helloconsumer_xmlrpc.py @@ -0,0 +1,41 @@ +# This remote service consumer requires some java or python distribution provider +# (e.g. py4j or xmlrpc) and an implementation service (java or python) that +# exports the org.eclipse.ecf.examples.hello.IHello service interface: +# https://github.com/ECF/AsyncRemoteServiceExamples/blob/master/hello/org.eclipse.ecf.examples.hello.javahost/src/org/eclipse/ecf/examples/hello/javahost/HelloImpl.java +# +# When the IHello remote service impl is discovered and imported via discovery provider +# and topology manager, or imported via RSA importservice command, a IHello proxy will +# be injected by ipopo into the _helloservice field of the RemoteHelloConsumer instance, +# and _validate will then be called. With the implementation below, the _validate +# method then calls the proxy's IHello.sayHello, sayHelloAsync, and sayHelloPromise on +# the remote service. +from pelix.ipopo.decorators import ComponentFactory,Instantiate,Requires,Validate + +from concurrent.futures import ThreadPoolExecutor + +@ComponentFactory("remote-hello-consumer-factory") +# The '(service.imported=*)' filter only allows remote services to be injected +@Requires("_helloservice", "org.eclipse.ecf.examples.hello.IHello", False, False, "(service.imported=*)", False) +@Instantiate("remote-hello-consumer") +class RemoteHelloConsumer(object): + + def __init__(self): + self._helloservice = None + self._name = 'Python' + self._msg = 'Hello Java' + self._executor = ThreadPoolExecutor() + + @Validate + def _validate(self,bundle_context): + # call it! + resp = self._helloservice.sayHello(self._name+'Sync', self._msg) + print("{0} IHello service consumer received sync response: {1}".format(self._name,resp)) + # call sayHelloAsync which returns Future and we add lambda to print the result when done + self._executor.submit(self._helloservice.sayHelloAsync,self._name+'Async', self._msg).add_done_callback(lambda f: print('async response: {0}'.format(f.result()))) + print("done with sayHelloAsync method") + # call sayHelloAsync which returns Future and we add lambda to print the result when done + self._executor.submit(self._helloservice.sayHelloPromise,self._name+'Promise', self._msg).add_done_callback(lambda f: print('promise response: {0}'.format(f.result()))) + print("done with sayHelloPromise method") + + + \ No newline at end of file diff --git a/samples/rsa/helloimpl_update.py b/samples/rsa/helloimpl_update.py new file mode 100644 index 00000000..7359a5d1 --- /dev/null +++ b/samples/rsa/helloimpl_update.py @@ -0,0 +1,24 @@ +from pelix.ipopo.decorators import ComponentFactory,Instantiate,Validate + +from samples.rsa.helloimpl import HelloImpl + +@ComponentFactory("remote-hello-consumer-update-factory") +@Instantiate("remote-hello-consumer-update") +class RemoteHelloConsumer(object): + + @Validate + def _validate(self,bundle_context): + # first register + registration = bundle_context.register_service("org.eclipse.ecf.examples.hello.IHello", + HelloImpl(), + { 'service.exported.interfaces':'*', + 'service.intents': ['osgi.async'], + 'osgi.basic.timeout':60000 + } + ) + # this will trigger an export update, publish the update via discovery (if any) + # which will propagate to import side + registration.set_properties({ 'one': 'myone', 'two': 2 }) + + + \ No newline at end of file diff --git a/samples/rsa/helloimpl_xmlrpc.py b/samples/rsa/helloimpl_xmlrpc.py index 6a74ca9b..a06ddc37 100644 --- a/samples/rsa/helloimpl_xmlrpc.py +++ b/samples/rsa/helloimpl_xmlrpc.py @@ -12,13 +12,11 @@ # rsa importservice command. from pelix.ipopo.decorators import Instantiate, ComponentFactory, Provides from samples.rsa.helloimpl import HelloImpl + @ComponentFactory('helloimpl-xmlrpc-factory') @Provides('org.eclipse.ecf.examples.hello.IHello') # Provides IHello interface as specified by Java interface. #See IHello service interface @Instantiate('helloimpl-xmlrpc', { 'service.intents': ['osgi.async'], # Required to use osgi.async intent 'osgi.basic.timeout':60000}) class XmlRpcHelloImpl(HelloImpl): - ''' - All method impls handled by HelloImpl superclass. See samples.rsa.helloimpl module. - ''' pass \ No newline at end of file diff --git a/samples/run_rsa_etcd_xmlrpc.py b/samples/run_rsa_etcd_xmlrpc.py index bcf4d674..58d57cec 100644 --- a/samples/run_rsa_etcd_xmlrpc.py +++ b/samples/run_rsa_etcd_xmlrpc.py @@ -59,7 +59,7 @@ def main(): 'pelix.rsa.providers.discovery.etcd', # etcd discovery provider (opt) 'pelix.rsa.topologymanagers.basic', # basic topology manager (opt) 'pelix.rsa.shell', # RSA shell commands (opt) - 'samples.rsa.helloconsumer' ] # Example helloconsumer. Only uses remote proxies + 'samples.rsa.helloconsumer_xmlrpc' ] # Example helloconsumer. Only uses remote proxies # Use the utility method to create, run and delete the framework framework = pelix.create_framework( diff --git a/samples/run_rsa_xmlrpc.py b/samples/run_rsa_xmlrpc.py index caa51299..cc9f20aa 100644 --- a/samples/run_rsa_xmlrpc.py +++ b/samples/run_rsa_xmlrpc.py @@ -55,7 +55,7 @@ def main(): 'pelix.rsa.providers.distribution.xmlrpc', # xmlrpc distribution provider (opt) 'pelix.rsa.topologymanagers.basic', # basic topology manager (opt) 'pelix.rsa.shell', # RSA shell commands (opt) - 'samples.rsa.helloconsumer' ] # Example helloconsumer. Only uses remote proxies + 'samples.rsa.helloconsumer_xmlrpc' ] # Example helloconsumer. Only uses remote proxies # Use the utility method to create, run and delete the framework framework = pelix.create_framework(