diff --git a/pelix/rsa/__init__.py b/pelix/rsa/__init__.py index 28d7fd76..424fc58e 100644 --- a/pelix/rsa/__init__.py +++ b/pelix/rsa/__init__.py @@ -313,6 +313,17 @@ def match_sr(self, svc_ref, cid=None): """ raise Exception("{0}.get_description not implemented".format(self)) + def update(self, properties): + # type: (dictionary) -> EndpointDescription + """ + Updates ExportRegistration with new properties. + + :param properties a dictionary of new properties. May be None. + :return: EndpointDescription for ExportRegistration, or None if not + updated. + """ + raise Exception("{0}.update not implemented".format(self)) + def close(self): # type: () -> None """ @@ -514,6 +525,17 @@ def get_description(self): """ raise Exception("{0}.get_description not implemented".format(self)) + def update(self, endpoint_description): + # type: (EndpointDescription) -> boolean + """ + Update the service properties of the imported service. + + :param endpoint_description: EndpointDescription for updated endpoint. Will not + be None. + :return: True if update completed successfully, False if not. + """ + raise Exception("{0}.update not implemented".format(self)) + def close(self): # type: () -> None """ @@ -737,6 +759,60 @@ def fromexportreg(cls, bundle, export_reg): export_reg.get_description(), ) + @classmethod + def fromexportupdate(cls, bundle, export_reg): + # type: (Bundle, ExportRegistration) -> RemoteServiceAdminEvent + exc = export_reg.get_exception() + if exc: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.EXPORT_ERROR, + bundle, + export_reg.get_export_container_id(), + export_reg.get_remoteservice_id(), + None, + export_reg.get_export_reference(), + None, + export_reg.get_description(), + ) + else: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.EXPORT_UPDATE, + bundle, + export_reg.get_export_container_id(), + export_reg.get_remoteservice_id(), + None, + export_reg.get_export_reference(), + None, + export_reg.get_description(), + ) + + @classmethod + def fromimportupdate(cls, bundle, import_reg): + # type: (Bundle, ImportRegistration) -> RemoteServiceAdminEvent + exc = import_reg.get_exception() + if exc: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.IMPORT_ERROR, + bundle, + import_reg.get_import_container_id(), + import_reg.get_remoteservice_id(), + None, + None, + exc, + import_reg.get_description(), + ) + else: + return RemoteServiceAdminEvent( + RemoteServiceAdminEvent.IMPORT_UPDATE, + bundle, + import_reg.get_import_container_id(), + import_reg.get_remoteservice_id(), + import_reg.get_import_reference(), + None, + None, + import_reg.get_description(), + ) + @classmethod def fromimportunreg(cls, bundle, cid, rsid, import_ref, exception, ed): # type: (Bundle, Tuple[str, str], Tuple[Tuple[str, str], int], ImportReference, Optional[Tuple[Any, Any, Any]], EndpointDescription) -> RemoteServiceAdminEvent diff --git a/pelix/rsa/providers/discovery/__init__.py b/pelix/rsa/providers/discovery/__init__.py index 6b615eae..ab4312c0 100644 --- a/pelix/rsa/providers/discovery/__init__.py +++ b/pelix/rsa/providers/discovery/__init__.py @@ -86,6 +86,27 @@ def advertise_endpoint(self, endpoint_description): if advertise_result: self._add_advertised(endpoint_description, advertise_result) return True + else: + return False + + def update_endpoint(self, updated_ed): + """ + Update a previously advertised endpoint_description. + + :param endpoint_description an instance of EndpointDescription to update. Must not be None. + :return True if advertised, False if not (e.g. it's already been advertised) + """ + endpointid = updated_ed.get_id() + with self._published_endpoints_lock: + if self.get_advertised_endpoint(endpointid) == None: + return False + advertise_result = self._update(updated_ed) + if advertise_result: + self._remove_advertised(endpointid) + self._add_advertised(updated_ed, advertise_result) + return True + else: + return False def unadvertise_endpoint(self, endpointid): """ @@ -151,6 +172,9 @@ def _remove_advertised(self, endpointid): def _advertise(self, endpoint_description): raise Exception("Endpoint._advertise must be overridden by subclasses") + def _update(self, endpoint_description): + raise Exception("Endpoint._update must be overrridden by subclasses") + def _unadvertise(self, advertised): raise Exception( "Endpoint._unadvertise must be overridden by subclasses" @@ -295,6 +319,10 @@ def _get_matching_endpoint_event_listeners(self, ed): result.append((l[0], matching_filter)) return result + def _has_discovered_endpoint(self, ed_id): + with self._discovered_endpoints_lock: + return self._discovered_endpoints.get(ed_id,None) + def _add_discovered_endpoint(self, ed): with self._discovered_endpoints_lock: _logger.debug("_add_discovered_endpoint ed=%s", ed) diff --git a/pelix/rsa/providers/discovery/etcd.py b/pelix/rsa/providers/discovery/etcd.py index 357b056a..01dcf520 100644 --- a/pelix/rsa/providers/discovery/etcd.py +++ b/pelix/rsa/providers/discovery/etcd.py @@ -158,12 +158,8 @@ def _validate_component(self): def _invalidate(self, _): self._disconnect() - # implementation of EndpointAdvertiser service. These methods - # are called when (e.g.) RSA asks us to advertise/unadvertise - # an endpoint_description - def _advertise(self, endpoint_description): + def _write_description(self, endpoint_description): # type: (EndpointDescription) -> etcd.EtcdResult - _logger.debug("advertising ed=%s", endpoint_description) # encode props as string -> string encoded_props = encode_endpoint_props(endpoint_description) # get copy of service props @@ -186,6 +182,19 @@ def _advertise(self, endpoint_description): value=props_json, ) + # implementation of EndpointAdvertiser service. These methods + # are called when (e.g.) RSA asks us to advertise/unadvertise + # an endpoint_description + def _advertise(self, endpoint_description): + # type: (EndpointDescription) -> etcd.EtcdResult + _logger.debug("advertising ed=%s", endpoint_description) + return self._write_description(endpoint_description) + + def _update(self, endpoint_description): + # type: (EndpointDescription) -> etcd.EtcdResult + _logger.debug("updating ed=%s", endpoint_description) + return self._write_description(endpoint_description) + def _unadvertise(self, advertised): # type: (List[EndpointDescription]) -> etcd.EtcdResult _logger.debug("unadvertising ed=%s", advertised[0]) @@ -306,12 +315,45 @@ def _handle_add_nodes(self, nodes): } # decode decoded_props = decode_endpoint_props(raw_props) - ed = EndpointDescription(properties=decoded_props) - # add discovered endpoint to our internal list - self._add_discovered_endpoint(ed) - # dispatch - self._fire_endpoint_event(EndpointEvent.ADDED, ed) - + new_ed = EndpointDescription(properties=decoded_props) + old_ed = self._has_discovered_endpoint(new_ed.get_id()) + if not old_ed: + # add discovered endpoint to our internal list + self._add_discovered_endpoint(new_ed) + # dispatch + self._fire_endpoint_event(EndpointEvent.ADDED, new_ed) + else: + # get timestamp and make sure new one is newer (an update) + old_ts = old_ed.get_timestamp() + new_ts = new_ed.get_timestamp() + if new_ts > old_ts: + self._remove_discovered_endpoint(old_ed.get_id()) + self._add_discovered_endpoint(new_ed) + self._fire_endpoint_event(EndpointEvent.MODIFIED, new_ed) + + def _handle_update_nodes(self, nodes): + for node in nodes: + # we only care about properties + node_val = node.value + if node_val: + json_obj = json.loads(node_val) + if isinstance(json_obj, dict): + json_properties = json_obj["properties"] + # get the name and value from each entry + raw_props = { + entry["name"]: entry["value"] + for entry in json_properties + if entry["type"] == "string" + } + # decode + decoded_props = decode_endpoint_props(raw_props) + new_ed = EndpointDescription(properties=decoded_props) + old_ed = self._remove_discovered_endpoint(new_ed.get_id()) + if old_ed: + self._add_discovered_endpoint(new_ed) + # dispatch + self._fire_endpoint_event(EndpointEvent.MODIFIED, new_ed) + def _handle_remove_node(self, endpointid): ed = self._remove_discovered_endpoint(endpointid) if ed: @@ -352,26 +394,26 @@ def _watch_job(self): # we are done return else: - if action != "update": - # split id into [sessionid] or [sessionid,endpointid] - splitid = key[len(self._top_path) + 1 :].split("/") - sessionid = splitid[0] - if self._sessionid != sessionid: - if isinstance(splitid, list): - endpointid = splitid[len(splitid) - 1] - else: - endpointid = None - if not endpointid: - if action in self.REMOVE_ACTIONS: - # other session deleted - self._handle_remove_dir(sessionid) - elif action in self.ADD_ACTIONS: - self._handle_add_dir(result) - else: - if action in self.REMOVE_ACTIONS: - self._handle_remove_node(endpointid) - elif action in self.ADD_ACTIONS: - self._handle_add_nodes([result]) + # split id into [sessionid] or [sessionid,endpointid] + splitid = key[len(self._top_path) + 1 :].split("/") + sessionid = splitid[0] + if self._sessionid != sessionid: + if isinstance(splitid, list): + endpointid = splitid[len(splitid) - 1] + else: + endpointid = None + if not endpointid: + if action in self.REMOVE_ACTIONS: + # other session deleted + self._handle_remove_dir(sessionid) + elif action in self.ADD_ACTIONS: + self._handle_add_dir(result) + else: + if action in self.REMOVE_ACTIONS: + self._handle_remove_node(endpointid) + elif action in self.ADD_ACTIONS: + self._handle_add_nodes([result]) + except: _logger.exception("watch_job:Exception in watch loop") diff --git a/pelix/rsa/providers/distribution/__init__.py b/pelix/rsa/providers/distribution/__init__.py index cb1bde57..f09e939e 100644 --- a/pelix/rsa/providers/distribution/__init__.py +++ b/pelix/rsa/providers/distribution/__init__.py @@ -173,7 +173,9 @@ def _match_required_configs(self, required_configs): self._supported_configs to make sure that all required configs are present for this distribution provider. """ - if required_configs is None or not self._supported_configs: + if not required_configs: + return True + if not self._supported_configs: return False return len( [x for x in required_configs if x in self._supported_configs] diff --git a/pelix/rsa/remoteserviceadmin.py b/pelix/rsa/remoteserviceadmin.py index c2762f0a..6711e890 100644 --- a/pelix/rsa/remoteserviceadmin.py +++ b/pelix/rsa/remoteserviceadmin.py @@ -931,7 +931,7 @@ def get_exception(self): with self.__lock: return ( self.__updateexception - if self.__closed + if self.__updateexception or self.__closed else self.__exportref.get_exception() ) @@ -945,6 +945,34 @@ def get_description(self): with self.__lock: return None if self.__closed else self.__exportref.get_description() + def update(self, properties): + # type: (dictionary) -> Optional[EndpointDescription] + with self.__lock: + if self.__closed: + self.__updateexception = ValueError("Update failed since ExportRegistration already closed") + return None + # if properties is set then copy + props = properties.copy() if properties else dict() + try: + updated_ed = self.__exportref.update(props) + except Exception as e: + self.__updateexception = e + return None + + if not updated_ed: + self.__updatexception = ValueError("Update failed because ExportEndpoint was None") + return None + + self.__updateexception = None + if self.__rsa: + self.__rsa._publish_event( + RemoteServiceAdminEvent.fromexportupdate( + self.__rsa._get_bundle(), + self + ) + ) + return updated_ed + def close(self): """ Cleans up the export endpoint @@ -1237,6 +1265,7 @@ def __init__(self, endpoint=None, exception=None, errored=None): endpoint._add_import_registration(self) self.__importref = ImportReferenceImpl.fromendpoint(endpoint) self.__closed = False + self.__updateexception = None self.__lock = threading.RLock() def _import_endpoint(self): @@ -1288,13 +1317,37 @@ def get_reference(self): def get_exception(self): # type: () -> Optional[Tuple[Any, Any, Any]] with self.__lock: - return None if self.__closed else self.__importref.get_exception() - + return ( + self.__updateexception + if self.__updateexception or self.__closed + else self.__importref.get_exception() + ) + def get_description(self): # type: () -> Optional[EndpointDescription] with self.__lock: return None if self.__closed else self.__importref.get_description() + def update(self, endpoint_description): + # type: (EndpointDescription) -> boolean + with self.__lock: + if self.__closed: + self.__updateexception = ValueError("Update failed since ImportRegistration already closed") + return False + try: + self.__importref.update(endpoint_description) + except Exception as e: + self.__updateexception = e + return False + if self.__rsa: + self.__rsa._publish_event(RemoteServiceAdminEvent.fromimportupdate( + self.__rsa._get_bundle(), + self) + ) + return True + else: + return False + def close(self): publish = False importerid = rsid = import_ref = exception = ed = None diff --git a/pelix/rsa/topologymanagers/__init__.py b/pelix/rsa/topologymanagers/__init__.py index 0aba9bd7..a295325b 100644 --- a/pelix/rsa/topologymanagers/__init__.py +++ b/pelix/rsa/topologymanagers/__init__.py @@ -62,6 +62,7 @@ ) from pelix.rsa.endpointdescription import EndpointDescription +from pelix.rsa.endpointdescription import EndpointDescription # ------------------------------------------------------------------------------ # Module version @@ -113,7 +114,14 @@ def _unimport_removed_endpoint(self, endpoint_description): for import_reg in import_regs: if import_reg.match_ed(endpoint_description): import_reg.close() - + + def _update_imported_endpoint(self, endpoint_description): + # type: (EndpointDescription) -> None + import_regs = self._rsa._get_import_regs() + for import_reg in import_regs: + if import_reg.match_ed(endpoint_description): + import_reg.update(endpoint_description) + def _handle_service_registered(self, service_ref): # type: (ServiceReference) -> None exp_intfs = get_exported_interfaces(service_ref) @@ -138,12 +146,23 @@ def _handle_service_unregistering(self, service_ref): export_reg.close() def _handle_service_modified(self, service_ref): - # type: (ServiceReference) -> None + # type: (ServiceReference) -> EndpointDescription export_regs = self._rsa._get_export_regs() if export_regs: for export_reg in export_regs: if export_reg.match_sr(service_ref): - export_reg.get_export_reference().update({}) + _logger.debug( + "_handle_service_modified. updating " + "export_registration for service reference=%s", + service_ref + ) + + # actually update the export_reg here + if not export_reg.update(None): + _logger.warning( + "_handle_service_modified. updating" + "update for service_ref=%s failed", service_ref + ) def _handle_event(self, service_event): # type: (ServiceEvent) -> None @@ -173,6 +192,17 @@ def _advertise_endpoint(self, ed): adv, ed ) + def _update_endpoint(self, ed): + # type: (EndpointDescription) -> None + for adv in self._advertisers: + try: + adv.update_endpoint(ed) + except: + _logger.exception( + "Exception in update_endpoint for advertiser=%s " + "endpoint=%s", adv, ed + ) + def _unadvertise_endpoint(self, ed): # type: (EndpointDescription) -> None for adv in self._advertisers: @@ -192,6 +222,8 @@ def remote_admin_event(self, event): self._advertise_endpoint(event.get_description()) elif kind == RemoteServiceAdminEvent.EXPORT_UNREGISTRATION: self._unadvertise_endpoint(event.get_description()) + elif kind == RemoteServiceAdminEvent.EXPORT_UPDATE: + self._update_endpoint(event.get_description()) def endpoint_changed(self, endpoint_event, matched_filter): # type: (EndpointEvent, Any) -> None diff --git a/pelix/rsa/topologymanagers/basic.py b/pelix/rsa/topologymanagers/basic.py index 40f61d2b..c23221b9 100644 --- a/pelix/rsa/topologymanagers/basic.py +++ b/pelix/rsa/topologymanagers/basic.py @@ -115,6 +115,12 @@ def endpoint_changed(self, endpoint_event, matched_filter): "BasicTopologyManager: endpoint removed. endpoint.id=%s", ed_id ) + elif event_type == EndpointEvent.MODIFIED: + self._update_imported_endpoint(ed) + _logger.debug( + "BasicTopologyManager: endpoint updated. endpoint.id=%s", + ed_id + ) def remote_admin_event(self, event): # type: (RemoteServiceAdminEvent) -> None diff --git a/samples/rsa/helloconsumer_xmlrpc.py b/samples/rsa/helloconsumer_xmlrpc.py new file mode 100644 index 00000000..0cf59bc0 --- /dev/null +++ b/samples/rsa/helloconsumer_xmlrpc.py @@ -0,0 +1,41 @@ +# This remote service consumer requires some java or python distribution provider +# (e.g. py4j or xmlrpc) and an implementation service (java or python) that +# exports the org.eclipse.ecf.examples.hello.IHello service interface: +# https://github.com/ECF/AsyncRemoteServiceExamples/blob/master/hello/org.eclipse.ecf.examples.hello.javahost/src/org/eclipse/ecf/examples/hello/javahost/HelloImpl.java +# +# When the IHello remote service impl is discovered and imported via discovery provider +# and topology manager, or imported via RSA importservice command, a IHello proxy will +# be injected by ipopo into the _helloservice field of the RemoteHelloConsumer instance, +# and _validate will then be called. With the implementation below, the _validate +# method then calls the proxy's IHello.sayHello, sayHelloAsync, and sayHelloPromise on +# the remote service. +from pelix.ipopo.decorators import ComponentFactory,Instantiate,Requires,Validate + +from concurrent.futures import ThreadPoolExecutor + +@ComponentFactory("remote-hello-consumer-factory") +# The '(service.imported=*)' filter only allows remote services to be injected +@Requires("_helloservice", "org.eclipse.ecf.examples.hello.IHello", False, False, "(service.imported=*)", False) +@Instantiate("remote-hello-consumer") +class RemoteHelloConsumer(object): + + def __init__(self): + self._helloservice = None + self._name = 'Python' + self._msg = 'Hello Java' + self._executor = ThreadPoolExecutor() + + @Validate + def _validate(self,bundle_context): + # call it! + resp = self._helloservice.sayHello(self._name+'Sync', self._msg) + print("{0} IHello service consumer received sync response: {1}".format(self._name,resp)) + # call sayHelloAsync which returns Future and we add lambda to print the result when done + self._executor.submit(self._helloservice.sayHelloAsync,self._name+'Async', self._msg).add_done_callback(lambda f: print('async response: {0}'.format(f.result()))) + print("done with sayHelloAsync method") + # call sayHelloAsync which returns Future and we add lambda to print the result when done + self._executor.submit(self._helloservice.sayHelloPromise,self._name+'Promise', self._msg).add_done_callback(lambda f: print('promise response: {0}'.format(f.result()))) + print("done with sayHelloPromise method") + + + \ No newline at end of file diff --git a/samples/rsa/helloimpl_update.py b/samples/rsa/helloimpl_update.py new file mode 100644 index 00000000..7359a5d1 --- /dev/null +++ b/samples/rsa/helloimpl_update.py @@ -0,0 +1,24 @@ +from pelix.ipopo.decorators import ComponentFactory,Instantiate,Validate + +from samples.rsa.helloimpl import HelloImpl + +@ComponentFactory("remote-hello-consumer-update-factory") +@Instantiate("remote-hello-consumer-update") +class RemoteHelloConsumer(object): + + @Validate + def _validate(self,bundle_context): + # first register + registration = bundle_context.register_service("org.eclipse.ecf.examples.hello.IHello", + HelloImpl(), + { 'service.exported.interfaces':'*', + 'service.intents': ['osgi.async'], + 'osgi.basic.timeout':60000 + } + ) + # this will trigger an export update, publish the update via discovery (if any) + # which will propagate to import side + registration.set_properties({ 'one': 'myone', 'two': 2 }) + + + \ No newline at end of file diff --git a/samples/rsa/helloimpl_xmlrpc.py b/samples/rsa/helloimpl_xmlrpc.py index 6a74ca9b..a06ddc37 100644 --- a/samples/rsa/helloimpl_xmlrpc.py +++ b/samples/rsa/helloimpl_xmlrpc.py @@ -12,13 +12,11 @@ # rsa importservice command. from pelix.ipopo.decorators import Instantiate, ComponentFactory, Provides from samples.rsa.helloimpl import HelloImpl + @ComponentFactory('helloimpl-xmlrpc-factory') @Provides('org.eclipse.ecf.examples.hello.IHello') # Provides IHello interface as specified by Java interface. #See IHello service interface @Instantiate('helloimpl-xmlrpc', { 'service.intents': ['osgi.async'], # Required to use osgi.async intent 'osgi.basic.timeout':60000}) class XmlRpcHelloImpl(HelloImpl): - ''' - All method impls handled by HelloImpl superclass. See samples.rsa.helloimpl module. - ''' pass \ No newline at end of file diff --git a/samples/run_rsa_etcd_xmlrpc.py b/samples/run_rsa_etcd_xmlrpc.py index bcf4d674..58d57cec 100644 --- a/samples/run_rsa_etcd_xmlrpc.py +++ b/samples/run_rsa_etcd_xmlrpc.py @@ -59,7 +59,7 @@ def main(): 'pelix.rsa.providers.discovery.etcd', # etcd discovery provider (opt) 'pelix.rsa.topologymanagers.basic', # basic topology manager (opt) 'pelix.rsa.shell', # RSA shell commands (opt) - 'samples.rsa.helloconsumer' ] # Example helloconsumer. Only uses remote proxies + 'samples.rsa.helloconsumer_xmlrpc' ] # Example helloconsumer. Only uses remote proxies # Use the utility method to create, run and delete the framework framework = pelix.create_framework( diff --git a/samples/run_rsa_xmlrpc.py b/samples/run_rsa_xmlrpc.py index caa51299..cc9f20aa 100644 --- a/samples/run_rsa_xmlrpc.py +++ b/samples/run_rsa_xmlrpc.py @@ -55,7 +55,7 @@ def main(): 'pelix.rsa.providers.distribution.xmlrpc', # xmlrpc distribution provider (opt) 'pelix.rsa.topologymanagers.basic', # basic topology manager (opt) 'pelix.rsa.shell', # RSA shell commands (opt) - 'samples.rsa.helloconsumer' ] # Example helloconsumer. Only uses remote proxies + 'samples.rsa.helloconsumer_xmlrpc' ] # Example helloconsumer. Only uses remote proxies # Use the utility method to create, run and delete the framework framework = pelix.create_framework(