diff --git a/pelix/framework.py b/pelix/framework.py index 6f02ebe..1cff60f 100644 --- a/pelix/framework.py +++ b/pelix/framework.py @@ -222,16 +222,16 @@ def __get_activator_method(self, method_name: str): ) return getattr(activator, method_name, None) - def _fire_bundle_event(self, kind: int) -> None: + async def _fire_bundle_event(self, kind: int) -> None: """ - Fires a bundle event of the given kind + Async Fires a bundle event of the given kind :param kind: Kind of event """ - return self.__framework._dispatcher.fire_bundle_event(BundleEvent(kind, self)) + return await self.__framework._dispatcher.fire_bundle_event(BundleEvent(kind, self)) async def _registered_service(self, registration: Type[ServiceRegistration]) -> None: """ - Bundle is notified by the framework that a service has been registered + Async Bundle is notified by the framework that a service has been registered in the name of this bundle. :param registration: The service registration object """ @@ -240,7 +240,7 @@ async def _registered_service(self, registration: Type[ServiceRegistration]) -> async def _unregistered_service(self, registration: Type[ServiceRegistration]) -> None: """ - Bundle is notified by the framework that a service has been + Async Bundle is notified by the framework that a service has been unregistered in the name of this bundle. :param registration: The service registration object """ @@ -275,9 +275,9 @@ def get_module(self) -> types.ModuleType: """ return self.__module - def get_registered_services(self) -> List[ServiceReference]: + async def get_registered_services(self) -> List[ServiceReference]: """ - Returns this bundle's ServiceReference list for all services it has + Async Returns this bundle's ServiceReference list for all services it has registered or an empty list The list is valid at the time of the call to this method, however, as the Framework is a very dynamic environment, services can be modified @@ -290,11 +290,11 @@ def get_registered_services(self) -> List[ServiceReference]: "Can't call 'get_registered_services' on an " "uninstalled bundle" ) - return self.__framework._registry.get_bundle_registered_services(self) + return await self.__framework._registry.get_bundle_registered_services(self) - def get_services_in_use(self) -> List[ServiceReference]: + async def get_services_in_use(self) -> List[ServiceReference]: """ - Returns this bundle's ServiceReference list for all services it is + Async Returns this bundle's ServiceReference list for all services it is using or an empty list. A bundle is considered to be using a service if its use count for that service is greater than zero. @@ -308,7 +308,7 @@ def get_services_in_use(self) -> List[ServiceReference]: raise BundleException( "Can't call 'get_services_in_use' on an uninstalled bundle" ) - return self.__framework._registry.get_bundle_imported_services(self) + return await self.__framework._registry.get_bundle_imported_services(self) def get_state(self) -> int: """ @@ -345,7 +345,7 @@ def get_version(self) -> str: async def start(self) -> None: """ - Starts the bundle. Does nothing if the bundle is already starting or + Async Starts the bundle. Does nothing if the bundle is already starting or active. :raise BundleException: The framework is not yet started or the bundle activator failed. @@ -361,23 +361,19 @@ async def start(self) -> None: # Already started bundle, do nothing return - # Get Event Loop - loop = asyncio.get_running_loop() - # Store the bundle current state previous_state = self._state # Starting... self._state = Bundle.STARTING - self._fire_bundle_event(BundleEvent.STARTING) + await self._fire_bundle_event(BundleEvent.STARTING) # Retriving Method starter = self.__get_activator_method("start") # Call the activator, if any if starter is not None: try: - method_starter: asyncio.Task = asyncio.create_task(starter(self.__context)) - await method_starter + await starter(self.__context) except (FrameworkException, BundleException): # Restore previous state @@ -400,11 +396,11 @@ async def start(self) -> None: # Bundle is now active self._state = Bundle.ACTIVE - self._fire_bundle_event(BundleEvent.STARTED) + await self._fire_bundle_event(BundleEvent.STARTED) async def stop(self) -> None: """ - Stops the bundle. Does nothing if the bundle is already stopped. + Async Stops the bundle. Does nothing if the bundle is already stopped. :raise BundleException: The bundle activator failed. """ if self._state != Bundle.ACTIVE: @@ -412,23 +408,22 @@ async def stop(self) -> None: return exception = None - async with self._lock: - # Get Event Loop - loop = asyncio.get_running_loop() + # Get EventLoop + loop = asyncio.get_running_loop() + async with self._lock: # Store the bundle current state previous_state = self._state # Stopping... self._state = Bundle.STOPPING - self._fire_bundle_event(BundleEvent.STOPPING) + await self._fire_bundle_event(BundleEvent.STOPPING) # Call the activator, if any stopper = self.__get_activator_method("stop") if stopper is not None: try: - method_stopper: asyncio.Task = asyncio.create_task(stopper(self.__context)) - await method_stopper + await stopper(self.__context) except (FrameworkException, BundleException) as ex: # Restore previous state @@ -447,24 +442,22 @@ async def stop(self) -> None: exception = BundleException(ex) # Hide remaining services - self.__framework._hide_bundle_services(self) + await loop.create_task(self.__framework._hide_bundle_services(self)) # Intermediate bundle event : activator should have cleaned up # everything, but some element could stay (iPOPO components, ...) - self._fire_bundle_event(BundleEvent.STOPPING_PRECLEAN) + await self._fire_bundle_event(BundleEvent.STOPPING_PRECLEAN) # Remove remaining services (the hard way) - unregister_services = asyncio.create_task( - self.__unregister_services() - ) - await unregister_services + await self.__unregister_services() # Cleanup service usages - self.__framework._unget_used_services(self) + await loop.create_task(self.__framework._unget_used_services(self)) # Bundle is now stopped and all its services have been unregistered self._state = Bundle.RESOLVED - self._fire_bundle_event(BundleEvent.STOPPED) + # Schedule STOPPED BundleEvent and await until it's done + await self._fire_bundle_event(BundleEvent.STOPPED) # Raise the exception, if any # pylint: disable=E0702 @@ -474,18 +467,21 @@ async def stop(self) -> None: async def __unregister_services(self) -> None: """ - Unregisters all bundle services + Async Unregisters all bundle services """ # Copy the services list, as it will be modified during the process async with self.__registration_lock: registered_services = self.__registered_services.copy() - workload = [registration.unregister() for registration in registered_services] - try: - await asyncio.gather(*workload) - except BundleException: - # Ignore errors at this level - pass + loop = asyncio.get_running_loop() + # Schedule the unregistration of all registered_services + registered_services = [loop.create_task(registration.unregister()) for registration in registered_services] + for unregistration in registered_services: + try: + await unregistration + except Exception: + # Ignore errors at this level + pass if self.__registered_services: _logger.warning("Not all services have been unregistered...") @@ -496,7 +492,7 @@ async def __unregister_services(self) -> None: async def uninstall(self) -> None: """ - Uninstalls the bundle + Async Uninstalls the bundle """ async with self._lock: if self._state == Bundle.ACTIVE: @@ -506,28 +502,25 @@ async def uninstall(self) -> None: self._state = Bundle.UNINSTALLED # Call the framework - uninstall_bundle = asyncio.create_task( - self.__framework.uninstall_bundle(self) - ) - await uninstall_bundle + await self.__framework.uninstall_bundle(self) async def update(self) -> None: """ - Updates the bundle + Async Updates the bundle """ async with self._lock: # Was it active ? restart = self._state == Bundle.ACTIVE # Send the update event - self._fire_bundle_event(BundleEvent.UPDATE_BEGIN) + await self._fire_bundle_event(BundleEvent.UPDATE_BEGIN) try: # Stop the bundle await self.stop() except: # Something wrong occurred, notify listeners - self._fire_bundle_event(BundleEvent.UPDATE_FAILED) + await self._fire_bundle_event(BundleEvent.UPDATE_FAILED) raise # Change the source file age @@ -590,11 +583,11 @@ async def update(self) -> None: await self.start() except: # Something wrong occurred, notify listeners - self._fire_bundle_event(BundleEvent.UPDATE_FAILED) + await self._fire_bundle_event(BundleEvent.UPDATE_FAILED) raise # Bundle update finished - self._fire_bundle_event(BundleEvent.UPDATED) + await self._fire_bundle_event(BundleEvent.UPDATED) # ------------------------------------------------------------------------------ @@ -658,7 +651,7 @@ def __init__(self, properties=None) -> None: async def add_property(self, name: str, value: object) -> bool: """ - Adds a property to the framework **if it is not yet set**. + Async Adds a property to the framework **if it is not yet set**. If the property already exists (same name), then nothing is done. Properties can't be updated. :param name: The property name @@ -673,11 +666,11 @@ async def add_property(self, name: str, value: object) -> bool: self.__properties[name] = value return True - def find_service_references( + async def find_service_references( self, clazz: Optional[str] = None, ldap_filter: Optional[str] = None, only_one: bool = False ) -> Optional[List[ServiceReference]]: """ - Finds all services references matching the given filter. + Async Finds all services references matching the given filter. :param clazz: Class implemented by the service :param ldap_filter: Service filter :param only_one: Return the first matching service reference only @@ -685,13 +678,13 @@ def find_service_references( :raise BundleException: An error occurred looking for service references """ - return self._registry.find_service_references( + return await self._registry.find_service_references( clazz, ldap_filter, only_one ) async def get_bundle_by_id(self, bundle_id: int) -> Union[Bundle, Type[Bundle]]: """ - Retrieves the bundle with the given ID + Async Retrieves the bundle with the given ID :param bundle_id: ID of an installed bundle :return: The requested bundle :raise BundleException: The ID is invalid @@ -708,7 +701,7 @@ async def get_bundle_by_id(self, bundle_id: int) -> Union[Bundle, Type[Bundle]]: async def get_bundle_by_name(self, bundle_name: str) -> Optional[Bundle]: """ - Retrieves the bundle with the given name + Async Retrieves the bundle with the given name :param bundle_name: Name of the bundle to look for :return: The requested bundle, None if not found """ @@ -731,7 +724,7 @@ async def get_bundle_by_name(self, bundle_name: str) -> Optional[Bundle]: async def get_bundles(self) -> List[Bundle]: """ - Returns the list of all installed bundles + Async Returns the list of all installed bundles :return: the list of all installed bundles """ async with self.__bundles_lock: @@ -742,14 +735,14 @@ async def get_bundles(self) -> List[Bundle]: async def get_properties(self) -> dict: """ - Retrieves a copy of the stored framework properties. + Async Retrieves a copy of the stored framework properties. """ async with self.__properties_lock: return self.__properties.copy() async def get_property(self, name: str) -> object: """ - Retrieves a framework or system property. As framework properties don't + Async Retrieves a framework or system property. As framework properties don't change while it's running, this method don't need to be protected. :param name: The property name """ @@ -758,15 +751,15 @@ async def get_property(self, name: str) -> object: async def get_property_keys(self) -> tuple: """ - Returns an array of the keys in the properties of the service + Async Returns an array of the keys in the properties of the service :return: An array of property keys. """ async with self.__properties_lock: return tuple(self.__properties.keys()) - def get_service(self, bundle: Bundle, reference: ServiceReference) -> Any: + async def get_service(self, bundle: Bundle, reference: ServiceReference) -> Any: """ - Retrieves the service corresponding to the given reference + Async Retrieves the service corresponding to the given reference :param bundle: The bundle requiring the service :param reference: A service reference :return: The requested service @@ -778,11 +771,12 @@ def get_service(self, bundle: Bundle, reference: ServiceReference) -> Any: elif not isinstance(reference, ServiceReference): raise TypeError("Second argument must be a ServiceReference object") - try: + if reference in self.__unregistering_services: # Unregistering service, just give it return self.__unregistering_services[reference] - except KeyError: - return self._registry.get_service(bundle, reference) + else: + # Await registry.get_service result + return await self._registry.get_service(bundle, reference) def _get_service_objects(self, bundle: Bundle, reference: ServiceReference): """ @@ -803,7 +797,7 @@ def get_symbolic_name(self) -> str: async def install_bundle(self, name: str, path: str = None) -> Bundle: """ - Installs the bundle with the given name + Async Installs the bundle with the given name *Note:* Before Pelix 0.5.0, this method returned the ID of the installed bundle, instead of the Bundle object. **WARNING:** The behavior of the loading process is subject to changes, @@ -827,10 +821,10 @@ async def install_bundle(self, name: str, path: str = None) -> Bundle: # Use the given path in priority sys.path.insert(0, path) - try: + if name in sys.modules: # The module has already been loaded module_ = sys.modules[name] - except KeyError: + else: # Load the module # __import__(name) -> package level # import_module -> module level @@ -864,12 +858,13 @@ async def install_bundle(self, name: str, path: str = None) -> Bundle: # Fire the bundle installed event event = BundleEvent(BundleEvent.INSTALLED, bundle) - self._dispatcher.fire_bundle_event(event) + # Schedule dispatcher.fire_bundle_event coroutine and await until it's done + await self._dispatcher.fire_bundle_event(event) return bundle async def install_package(self, path: str, recursive: bool = False, prefix: str = None) -> tuple: """ - Installs all the modules found in the given package + Async Installs all the modules found in the given package :param path: Path of the package (folder) :param recursive: If True, install the sub-packages too :param prefix: (**internal**) Prefix for all found modules @@ -925,7 +920,7 @@ def visitor(fullname, is_package, module_path): async def install_visiting(self, path: str, visitor, prefix: str = None) -> tuple: """ - Installs all the modules found in the given path if they are accepted + Async Installs all the modules found in the given path if they are accepted by the visitor. The visitor must be a callable accepting 3 parameters: * fullname: The full name of the module @@ -1006,7 +1001,7 @@ async def register_service( prototype: bool = False, ) -> ServiceRegistration: """ - Registers a service and calls the listeners + Async Registers a service and calls the listeners :param bundle: The bundle registering the service :param clazz: Name(s) of the interface(s) implemented by service :param service: The service to register @@ -1049,29 +1044,29 @@ async def register_service( # Class OK classes.append(svc_clazz) - # Make the service registration - registration = self._registry.register( - bundle, classes, properties, service, factory, prototype - ) + # Get EventLoop + loop = asyncio.get_running_loop() - # Update the bundle registration information - update_registration: asyncio.Task = asyncio.create_task( - bundle._registered_service(registration) + # Schedule the service registration and await the ServiceRegistation Instance + registration: ServiceRegistration = await loop.create_task( + self._registry.register(bundle, classes, properties, service, factory, prototype) ) - await update_registration + + # Schedule the Update of the bundle registration information and await it's completition + await bundle._registered_service(registration) if send_event: # Call the listeners event = ServiceEvent( ServiceEvent.REGISTERED, registration.get_reference() ) - self._dispatcher.fire_service_event(event) + await self._dispatcher.fire_service_event(event) return registration async def start(self) -> bool: """ - Starts the framework + Async Starts the framework :return: True if the bundle has been started, False if it was already running :raise BundleException: A bundle failed to start @@ -1086,14 +1081,16 @@ async def start(self) -> bool: # Starting... self._state = Bundle.STARTING - self._dispatcher.fire_bundle_event( - BundleEvent(BundleEvent.STARTING, self) - ) + # Schedule _dispatcher.fire_bundle_event coroutine and await it's completition + await self._dispatcher.fire_bundle_event(BundleEvent(BundleEvent.STARTING, self)) + # Get Event Loop + loop = asyncio.get_running_loop() # Start all registered bundles (use a copy, just in case...) - for bundle in self.__bundles.copy().values(): + bundle_start = {bundle : loop.create_task(bundle.start()) for bundle in self.__bundles.copy().values()} + for bundle in bundle_start: try: - await bundle.start() + await bundle_start[bundle] except FrameworkException as ex: # Important error _logger.exception( @@ -1114,7 +1111,7 @@ async def start(self) -> bool: async def stop(self) -> bool: """ - Stops the framework + Async Stops the framework :return: True if the framework stopped, False it wasn't running """ async with self._lock: @@ -1122,19 +1119,23 @@ async def stop(self) -> bool: # Invalid state return False - + # Get Event Loop + loop = asyncio.get_running_loop() # Hide all services (they will be deleted by bundle.stop()) - for bundle in self.__bundles.values(): - self._registry.hide_bundle_services(bundle) + hide_bundle_services = [ + loop.create_task(self._registry.hide_bundle_services(bundle)) for bundle in self.__bundles.values() + ] + for services in hide_bundle_services: + await services # Stopping... self._state = Bundle.STOPPING - self._dispatcher.fire_bundle_event( - BundleEvent(BundleEvent.STOPPING, self) - ) - # Notify listeners that the bundle is stopping - self._dispatcher.fire_framework_stopping() + # Schedule _dispatcher.fire_bundle_event coroutine and await it's completition + await self._dispatcher.fire_bundle_event(BundleEvent(BundleEvent.STOPPING, self)) + + # Notify listeners that the bundle is stopping nad await until the task it's done + await self._dispatcher.fire_framework_stopping() bid = self.__next_bundle_id - 1 while bid > 0: @@ -1157,20 +1158,18 @@ async def stop(self) -> bool: # Framework is now stopped self._state = Bundle.RESOLVED - self._dispatcher.fire_bundle_event( - BundleEvent(BundleEvent.STOPPED, self) - ) + await self._dispatcher.fire_bundle_event(BundleEvent(BundleEvent.STOPPED, self)) # All bundles have been stopped, release "wait_for_stop" self._fw_stop_event.set() # Force the registry clean up - self._registry.clear() + await self._registry.clear() return True async def delete(self, force: bool = False) -> bool: """ - Deletes the current framework + Async Deletes the current framework :param force: If True, stops the framework before deleting it :return: True if the framework has been delete, False if is couldn't """ @@ -1194,7 +1193,7 @@ def uninstall(self) -> None: async def uninstall_bundle(self, bundle: Bundle) -> None: """ - Ends the uninstallation of the given bundle (must be called by Bundle) + Async Ends the uninstallation of the given bundle (must be called by Bundle) :param bundle: The bundle to uninstall :raise BundleException: Invalid bundle """ @@ -1211,9 +1210,7 @@ async def uninstall_bundle(self, bundle: Bundle) -> None: raise BundleException("Invalid bundle {0}".format(bundle)) # Notify listeners - self._dispatcher.fire_bundle_event( - BundleEvent(BundleEvent.UNINSTALLED, bundle) - ) + await self._dispatcher.fire_bundle_event(BundleEvent(BundleEvent.UNINSTALLED, bundle)) # Remove it from the dictionary del self.__bundles[bundle_id] @@ -1221,24 +1218,22 @@ async def uninstall_bundle(self, bundle: Bundle) -> None: # Remove it from the system => avoid unintended behaviors and # forces a complete module reload if it is re-installed name = bundle.get_symbolic_name() - try: + if name in sys.modules: del sys.modules[name] - except KeyError: - # Ignore - pass - try: - # Clear reference in parent - parent, basename = name.rsplit(".", 1) - if parent: - delattr(sys.modules[parent], basename) - except (KeyError, AttributeError, ValueError): - # Ignore errors - pass + # Clear reference in parent + parent, basename = name.rsplit(".", 1) + if parent: + if parent in sys.modules: + try: + delattr(sys.modules[parent], basename) + except (AttributeError, ValueError): + # Ignore errors + pass async def unregister_service(self, registration: ServiceRegistration) -> bool: """ - Unregisters the given service + Async Unregisters the given service :param registration: A ServiceRegistration to the service to unregister :raise BundleException: Invalid reference """ @@ -1246,44 +1241,41 @@ async def unregister_service(self, registration: ServiceRegistration) -> bool: reference = registration.get_reference() # Remove the service from the registry - svc_instance = self._registry.unregister(reference) + svc_instance = await self._registry.unregister(reference) # Keep a track of the unregistering reference self.__unregistering_services[reference] = svc_instance # Call the listeners event = ServiceEvent(ServiceEvent.UNREGISTERING, reference) - self._dispatcher.fire_service_event(event) + await self._dispatcher.fire_service_event(event) # Update the bundle registration information - bundle = reference.get_bundle() - update_registration: asyncio.Task = asyncio.create_task( - bundle._unregistered_service(registration) - ) - await update_registration + bundle: Bundle = reference.get_bundle() + await bundle._unregistered_service(registration) # Remove the unregistering reference del self.__unregistering_services[reference] return True - def _hide_bundle_services(self, bundle: Bundle) -> List[ServiceReference]: + async def _hide_bundle_services(self, bundle: Bundle) -> List[ServiceReference]: """ - Hides the services of the given bundle in the service registry + Async Hides the services of the given bundle in the service registry :param bundle: The bundle providing services :return: The references of the hidden services """ - return self._registry.hide_bundle_services(bundle) + return await self._registry.hide_bundle_services(bundle) - def _unget_used_services(self, bundle: Bundle) -> None: + async def _unget_used_services(self, bundle: Bundle) -> None: """ - Cleans up all service usages of the given bundle + Async Cleans up all service usages of the given bundle :param bundle: Bundle to be cleaned up """ - self._registry.unget_used_services(bundle) + await self._registry.unget_used_services(bundle) async def update(self) -> None: """ - Stops and starts the framework, if the framework is active. + Async Stops and starts the framework, if the framework is active. :raise BundleException: Something wrong occurred while stopping or starting the framework. """ @@ -1294,7 +1286,7 @@ async def update(self) -> None: async def wait_for_stop(self, timeout: Optional[int] = None) -> bool: """ - Waits for the framework to stop. Does nothing if the framework bundle + Async Waits for the framework to stop. Does nothing if the framework bundle is not in ACTIVE state. Uses a threading.Condition object :param timeout: The maximum time to wait (in seconds) @@ -1331,11 +1323,11 @@ def __init__(self, registry: ServiceRegistry, bundle: Bundle, svc_ref: ServiceRe self.__bundle = bundle self.__reference = svc_ref - def get_service(self) -> Any: + async def get_service(self) -> Any: """ - Returns a service object for the associated service. + Async Returns a service object for the associated service. """ - return self.__registry.get_service(self.__bundle, self.__reference) + return await self.__registry.get_service(self.__bundle, self.__reference) def get_service_reference(self) -> ServiceReference: """ @@ -1345,13 +1337,13 @@ def get_service_reference(self) -> ServiceReference: """ return self.__reference - def unget_service(self, service: Any) -> bool: + async def unget_service(self, service: Any) -> bool: """ - Releases a service object for the associated service. + Async Releases a service object for the associated service. :param service: An instance of a service returned by ``get_service()`` :return: True if the bundle usage has been removed """ - return self.__registry.unget_service( + return await self.__registry.unget_service( self.__bundle, self.__reference, service ) @@ -1378,9 +1370,9 @@ def __str__(self) -> str: """ return "BundleContext({0})".format(self.__bundle) - def add_bundle_listener(self, listener) -> bool: + async def add_bundle_listener(self, listener) -> bool: """ - Registers a bundle listener, which will be notified each time a bundle + Async Registers a bundle listener, which will be notified each time a bundle is installed, started, stopped or updated. The listener must be a callable accepting a single parameter:\ * **event** -- The description of the event @@ -1389,11 +1381,11 @@ def add_bundle_listener(self, listener) -> bool: :return: True if the listener has been registered, False if it already was """ - return self.__framework._dispatcher.add_bundle_listener(listener) + return await self.__framework._dispatcher.add_bundle_listener(listener) - def add_framework_stop_listener(self, listener) -> bool: + async def add_framework_stop_listener(self, listener) -> bool: """ - Registers a listener that will be called back right before the + Async Registers a listener that will be called back right before the framework stops The framework listener must have a method with the following prototype:: def framework_stopping(self): @@ -1404,13 +1396,13 @@ def framework_stopping(self): :param listener: The framework stop listener :return: True if the listener has been registered """ - return self.__framework._dispatcher.add_framework_listener(listener) + return await self.__framework._dispatcher.add_framework_listener(listener) - def add_service_listener( + async def add_service_listener( self, listener, ldap_filter: Optional[str] = None, specification: Optional[str] = None ) -> bool: """ - Registers a service listener + Async Registers a service listener The service listener must have a method with the following prototype:: def service_changed(self, event): ''' @@ -1426,13 +1418,13 @@ def service_changed(self, event): (optional, None to accept all services) :return: True if the listener has been successfully registered """ - return self.__framework._dispatcher.add_service_listener( + return await self.__framework._dispatcher.add_service_listener( self, listener, specification, ldap_filter ) - def get_all_service_references(self, clazz: Type[object], ldap_filter: Optional[str] = None): + async def get_all_service_references(self, clazz: Type[object], ldap_filter: Optional[str] = None): """ - Returns an array of ServiceReference objects. + Async Returns an array of ServiceReference objects. The returned array of ServiceReference objects contains services that were registered under the specified class and match the specified filter expression. @@ -1440,11 +1432,11 @@ def get_all_service_references(self, clazz: Type[object], ldap_filter: Optional[ :param ldap_filter: Service filter :return: The sorted list of all matching service references, or None """ - return self.__framework.find_service_references(clazz, ldap_filter) + return await self.__framework.find_service_references(clazz, ldap_filter) async def get_bundle(self, bundle_id: Union[Bundle, int] = None) -> Bundle: """ - Retrieves the :class:`~pelix.framework.Bundle` object for the bundle + Async Retrieves the :class:`~pelix.framework.Bundle` object for the bundle matching the given ID (int). If no ID is given (None), the bundle associated to this context is returned. :param bundle_id: A bundle ID (optional) @@ -1462,7 +1454,7 @@ async def get_bundle(self, bundle_id: Union[Bundle, int] = None) -> Bundle: async def get_bundles(self) -> List[Bundle]: """ - Returns the list of all installed bundles + Async Returns the list of all installed bundles :return: A list of :class:`~pelix.framework.Bundle` objects """ return await self.__framework.get_bundles() @@ -1477,19 +1469,19 @@ def get_framework(self) -> Framework: async def get_property(self, name: str) -> object: """ - Returns the value of a property of the framework, else returns the OS + Async Returns the value of a property of the framework, else returns the OS environment value. :param name: A property name """ return await self.__framework.get_property(name) - def get_service(self, reference: ServiceReference) -> Any: + async def get_service(self, reference: ServiceReference) -> Any: """ - Returns the service described with the given reference + Async Returns the service described with the given reference :param reference: A ServiceReference object :return: The service object itself """ - return self.__framework.get_service(self.__bundle, reference) + return await self.__framework.get_service(self.__bundle, reference) def get_service_objects(self, reference: ServiceReference) -> ServiceObjects: """ @@ -1500,15 +1492,15 @@ def get_service_objects(self, reference: ServiceReference) -> ServiceObjects: """ return self.__framework._get_service_objects(self.__bundle, reference) - def get_service_reference(self, clazz: Optional[str], ldap_filter: Optional[str] = None) -> Optional[ServiceReference]: + async def get_service_reference(self, clazz: Optional[str], ldap_filter: Optional[str] = None) -> Optional[ServiceReference]: """ - Returns a ServiceReference object for a service that implements and + Async Returns a ServiceReference object for a service that implements and was registered under the specified class :param clazz: The class name with which the service was registered. :param ldap_filter: A filter on service properties :return: A service reference, None if not found """ - result = self.__framework.find_service_references( + result = await self.__framework.find_service_references( clazz, ldap_filter, True ) try: @@ -1516,16 +1508,16 @@ def get_service_reference(self, clazz: Optional[str], ldap_filter: Optional[str] except TypeError: return None - def get_service_references(self, clazz: Optional[str], ldap_filter: Optional[str] = None) -> Optional[List[ServiceReference]]: + async def get_service_references(self, clazz: Optional[str], ldap_filter: Optional[str] = None) -> Optional[List[ServiceReference]]: """ - Returns the service references for services that were registered under + Async Returns the service references for services that were registered under the specified class by this bundle and matching the given filter :param clazz: The class name with which the service was registered. :param ldap_filter: A filter on service properties :return: The list of references to the services registered by the calling bundle and matching the filters. """ - refs = self.__framework.find_service_references(clazz, ldap_filter) + refs = await self.__framework.find_service_references(clazz, ldap_filter) if refs: for ref in refs: if ref.get_bundle() is not self.__bundle: @@ -1535,7 +1527,7 @@ def get_service_references(self, clazz: Optional[str], ldap_filter: Optional[str async def install_bundle(self, name: str, path: str = None) -> Bundle: """ - Installs the bundle (module) with the given name. + Async Installs the bundle (module) with the given name. If a path is given, it is inserted in first place in the Python loading path (``sys.path``). All modules loaded alongside this bundle, *i.e.* by this bundle or its dependencies, will be looked after in this path @@ -1573,9 +1565,9 @@ async def install_package(self, path: str, recursive: bool = False) -> tuple: async def install_visiting(self, path: str, visitor) -> tuple: """ - Looks for modules in the given path and installs those accepted by the + Async Looks for modules in the given path and installs those accepted by the given visitor. - The visitor must be a callable accepting 3 parameters:\ + The visitor must be a callable accepting 3 parameters: * **fullname** -- The full name of the module * **is_package** -- If True, the module is a package * **module_path** -- The path to the module file @@ -1598,7 +1590,7 @@ async def register_service( prototype: bool = False, ) -> ServiceRegistration: """ - Registers a service + Async Registers a service :param clazz: Class or Classes (list) implemented by this service :param service: The service instance :param properties: The services properties (dictionary) @@ -1619,38 +1611,38 @@ async def register_service( prototype, ) - def remove_bundle_listener(self, listener) -> bool: + async def remove_bundle_listener(self, listener) -> bool: """ - Unregisters the given bundle listener + Async Unregisters the given bundle listener :param listener: The bundle listener to remove :return: True if the listener has been unregistered, False if it wasn't registered """ - return self.__framework._dispatcher.remove_bundle_listener(listener) + return await self.__framework._dispatcher.remove_bundle_listener(listener) - def remove_framework_stop_listener(self, listener) -> bool: + async def remove_framework_stop_listener(self, listener) -> bool: """ - Unregisters a framework stop listener + Async Unregisters a framework stop listener :param listener: The framework stop listener :return: True if the listener has been unregistered """ - return self.__framework._dispatcher.remove_framework_listener(listener) + return await self.__framework._dispatcher.remove_framework_listener(listener) - def remove_service_listener(self, listener) -> bool: + async def remove_service_listener(self, listener) -> bool: """ - Unregisters a service listener + Async Unregisters a service listener :param listener: The service listener :return: True if the listener has been unregistered """ - return self.__framework._dispatcher.remove_service_listener(listener) + return await self.__framework._dispatcher.remove_service_listener(listener) - def unget_service(self, reference: ServiceReference) -> bool: + async def unget_service(self, reference: ServiceReference) -> bool: """ - Disables a reference to the service + Async Disables a reference to the service :return: True if the bundle was using this reference, else False """ # Lose the dependency - return self.__framework._registry.unget_service( + return await self.__framework._registry.unget_service( self.__bundle, reference ) @@ -1698,7 +1690,7 @@ def is_framework_running(cls, framework: Optional[Framework] = None) -> bool: async def delete_framework(cls, framework: Optional[Framework] = None) -> bool: # pylint: disable=W0212 """ - Removes the framework singleton + Async Removes the framework singleton :return: True on success, else False """ if framework is None: @@ -1711,30 +1703,20 @@ async def delete_framework(cls, framework: Optional[Framework] = None) -> bool: except: _logger.exception("Error stopping the framework") + loop = asyncio.get_running_loop() # Uninstall its bundles - bundles = await framework.get_bundles() - for bundle in bundles: + bundles: List[Bundle] = await framework.get_bundles() + bundle_uninstall = {bundle : loop.create_task(bundle.uninstall()) for bundle in bundles} + for bundle in bundle_uninstall: try: - await bundle.uninstall() + await bundle_uninstall[bundle] except: _logger.exception( - "Error uninstalling bundle %s", - bundle.get_symbolic_name(), + "Error uninstalling bundle %s", bundle.get_symbolic_name() ) - #workload = [bundle.uninstall() for bundle in bundles] - #results = await asyncio.gather(*workload, return_exceptions=True) - - # Collect all failed bundle - #failed_bundles = [i for i, v in enumerate(results) if v is not None] - #if len(failed_bundles) > 0: - # for bundle in failed_bundles: - # _logger.exception( - # "Error uninstalling bundle %s", bundles[bundle].get_symbolic_name() - # ) - # Clear the event dispatcher - framework._dispatcher.clear() + await framework._dispatcher.clear() # Clear the singleton cls.__singleton = None @@ -1754,7 +1736,7 @@ async def create_framework( auto_delete: bool = False, ) -> Framework: """ - Creates a Pelix framework, installs the given bundles and returns its + Async Creates a Pelix framework, installs the given bundles and returns its instance reference. If *auto_start* is True, the framework will be started once all bundles will have been installed @@ -1780,10 +1762,12 @@ async def create_framework( # Create the framework framework = FrameworkFactory.get_framework(properties) + loop = asyncio.get_running_loop() # Install bundles context = framework.get_bundle_context() - workload = [context.install_bundle(bundle) for bundle in bundles] - await asyncio.gather(*workload) + install_bundle = [loop.create_task(context.install_bundle(bundle)) for bundle in bundles] + for bundle in install_bundle: + await bundle if auto_start: # Automatically start the framework diff --git a/pelix/internals/hooks.py b/pelix/internals/hooks.py index becfd3e..3d0500d 100644 --- a/pelix/internals/hooks.py +++ b/pelix/internals/hooks.py @@ -93,7 +93,7 @@ def __len__(self): return len(self._delegate) -class ListenerInfo(object): +class ListenerInfo: """ Keeps information about a listener """ @@ -172,9 +172,9 @@ class EventListenerHook: pattern """ - def event(self, service_event, listener_dict): + async def event(self, service_event, listener_dict): """ - Method called when a service event is triggered. + Async Method called when a service event is triggered. :param service_event: The ServiceEvent being triggered :param listener_dict: A dictionary associating a bundle context to a diff --git a/pelix/internals/registry.py b/pelix/internals/registry.py index 855313d..d88acc6 100644 --- a/pelix/internals/registry.py +++ b/pelix/internals/registry.py @@ -29,10 +29,10 @@ import bisect import logging import asyncio -import threading # pylint: disable=W0611 from typing import Any, Dict, List, Optional, Set, Tuple, Union, Type - +from aioitertools import iter, next, list +from pelix.utilities import RLock # Pelix beans from pelix.constants import ( OBJECTCLASS, @@ -46,7 +46,8 @@ BundleException, ) from pelix.services import SERVICE_EVENT_LISTENER_HOOK -from pelix.internals.events import ServiceEvent +# Import BundleEvent only for typing purpose +from pelix.internals.events import ServiceEvent, BundleEvent # Pelix utility modules import pelix.ldapfilter as ldapfilter @@ -110,7 +111,7 @@ class _FactoryCounter: __slots__ = ("__bundle", "__factored") - def __init__(self, bundle): + def __init__(self, bundle) -> None: """ Sets up members @@ -119,7 +120,7 @@ def __init__(self, bundle): self.__bundle = bundle # Service Factory Reference -> (Service instance, Usage counter) - self.__factored = {} + self.__factored: Dict[ServiceReference, Tuple[ServiceRegistration, _UsageCounter]] = {} def is_used(self) -> bool: """ @@ -129,22 +130,22 @@ def is_used(self) -> bool: """ return bool(self.__factored) - def _get_from_factory(self, factory: Any, svc_registration: Type[object]) -> Any: + async def _get_from_factory(self, factory: Any, svc_registration: Type[object]) -> Any: """ - Returns a service instance from a Prototype Service Factory + Async Returns a service instance from a Prototype Service Factory :param factory: The prototype service factory :param svc_registration: The ServiceRegistration object :return: The requested service instance returned by the factory """ - svc_ref = svc_registration.get_reference() - try: + svc_ref: ServiceReference = svc_registration.get_reference() + if svc_ref in self.__factored: # Use the existing service service, counter = self.__factored[svc_ref] counter.inc() - except KeyError: + else: # Create the service - service = factory.get_service(self.__bundle, svc_registration) + service = await factory.get_service(self.__bundle, svc_registration) counter = _UsageCounter() counter.inc() @@ -153,23 +154,23 @@ def _get_from_factory(self, factory: Any, svc_registration: Type[object]) -> Any return service - def _get_from_prototype(self, factory: Any, svc_registration: Type[object]) -> Any: + async def _get_from_prototype(self, factory: Any, svc_registration: Type[object]) -> Any: """ - Returns a service instance from a Prototype Service Factory + Async Returns a service instance from a Prototype Service Factory :param factory: The service factory :param svc_registration: The ServiceRegistration object :return: The requested service instance returned by the factory """ - svc_ref = svc_registration.get_reference() - service = factory.get_service(self.__bundle, svc_registration) + svc_ref: ServiceReference = svc_registration.get_reference() + service = await factory.get_service(self.__bundle, svc_registration) - try: + if svc_ref in self.__factored: # Check if the service already exists services, counter = self.__factored[svc_ref] services.append(service) counter.inc() - except KeyError: + else: counter = _UsageCounter() counter.inc() @@ -178,13 +179,13 @@ def _get_from_prototype(self, factory: Any, svc_registration: Type[object]) -> A return service - def get_service( + async def get_service( self, factory: Any, svc_registration: Type[object] ) -> Any: """ - Returns the service required by the bundle. The Service Factory is + Async Returns the service required by the bundle. The Service Factory is called only when necessary while the Prototype Service Factory is called each time @@ -192,20 +193,20 @@ def get_service( :param svc_registration: The ServiceRegistration object :return: The requested service instance (created if necessary) """ - svc_ref = svc_registration.get_reference() + svc_ref: ServiceReference = svc_registration.get_reference() if svc_ref.is_prototype(): - return self._get_from_prototype(factory, svc_registration) + return await self._get_from_prototype(factory, svc_registration) - return self._get_from_factory(factory, svc_registration) + return await self._get_from_factory(factory, svc_registration) - def unget_service( + async def unget_service( self, factory: Any, svc_registration: Type[object], service: Optional[Any] = None ) -> bool: """ - Releases references to the given service reference + Async Releases references to the given service reference :param factory: The service factory :param svc_registration: The ServiceRegistration object @@ -213,17 +214,13 @@ def unget_service( :return: True if all service references to this service factory have been released """ - svc_ref = svc_registration.get_reference() - try: + svc_ref: ServiceReference = svc_registration.get_reference() + if svc_ref in self.__factored: _, counter = self.__factored[svc_ref] - except KeyError: - logging.warning( - "Trying to release an unknown service factory: %s", svc_ref - ) - else: + if svc_ref.is_prototype(): # Notify the factory to clean up this instance - factory.unget_service_instance( + await factory.unget_service_instance( self.__bundle, svc_registration, service ) @@ -232,51 +229,64 @@ def unget_service( del self.__factored[svc_ref] # Call the factory - factory.unget_service(self.__bundle, svc_registration) + await factory.unget_service(self.__bundle, svc_registration) # No more reference to this service return True + else: + logging.warning( + "Trying to release an unknown service factory: %s", svc_ref + ) # Some references are still there return False - def cleanup_service( + async def cleanup_service( self, factory: Any, svc_registration: Type[object] ) -> bool: """ - If this bundle used that factory, releases the reference; else does + Async If this bundle used that factory, releases the reference; else does nothing :param factory: The service factory :param svc_registration: The ServiceRegistration object :return: True if the bundle was using the factory, else False """ - svc_ref = svc_registration.get_reference() - try: + svc_ref: ServiceReference = svc_registration.get_reference() + if svc_ref in self.__factored: # "service" for factories, "services" for prototypes services, _ = self.__factored.pop(svc_ref) - except KeyError: - return False - else: + if svc_ref.is_prototype() and services: - for service in services: - try: + #Get EventLoop + loop = asyncio.get_running_loop() + unget_service = [ + loop.create_task( factory.unget_service_instance( - self.__bundle, svc_registration, service + self.__bundle, + svc_registration, + service ) - except Exception: - # Ignore instance-level exceptions, potential errors - # will reappear in unget_service() + ) + for service in services + ] + for instance in unget_service: + try: + await instance + except: pass + # Ignore instance-level exceptions, potential errors + # will reappear in unget_service() # Call the factory - factory.unget_service(self.__bundle, svc_registration) + await factory.unget_service(self.__bundle, svc_registration) # No more association - svc_ref.unused_by(self.__bundle) + await svc_ref.unused_by(self.__bundle) return True + return False # ------------------------------------------------------------------------------ @@ -314,10 +324,10 @@ def __init__(self, bundle, properties): ) # Properties lock (used by ServiceRegistration too) - self._props_lock = threading.RLock() + self._props_lock = RLock() # Usage lock - self.__usage_lock = threading.Lock() + self.__usage_lock = asyncio.Lock() # Service details self.__bundle = bundle @@ -399,39 +409,39 @@ def get_bundle(self): """ return self.__bundle - def get_using_bundles(self): + async def get_using_bundles(self): """ - Returns the list of bundles that use this service + Async Returns the list of bundles that use this service :return: A list of Bundle objects """ - return list(self.__using_bundles.keys()) + return await list(self.__using_bundles.keys()) - def get_properties(self): + async def get_properties(self): """ - Returns a copy of the service properties + Async Returns a copy of the service properties :return: A copy of the service properties """ - with self._props_lock: + async with self._props_lock: return self.__properties.copy() - def get_property(self, name): + async def get_property(self, name): """ - Retrieves the property value for the given name + Async Retrieves the property value for the given name :return: The property value, None if not found """ - with self._props_lock: + async with self._props_lock: return self.__properties.get(name) - def get_property_keys(self): + async def get_property_keys(self): """ - Returns an array of the keys in the properties of the service + Async Returns an array of the keys in the properties of the service :return: An array of property keys. """ - with self._props_lock: + async with self._props_lock: return tuple(self.__properties.keys()) def is_factory(self): @@ -453,9 +463,9 @@ def is_prototype(self): """ return self.__properties[SERVICE_SCOPE] == SCOPE_PROTOTYPE - def unused_by(self, bundle): + async def unused_by(self, bundle): """ - Indicates that this reference is not being used anymore by the given + Async Indicates that this reference is not being used anymore by the given bundle. This method should only be used by the framework. @@ -465,19 +475,15 @@ def unused_by(self, bundle): # Ignore return - with self.__usage_lock: - try: + async with self.__usage_lock: + if bundle in self.__using_bundles: if not self.__using_bundles[bundle].dec(): - # This bundle has cleaner all of its usages of this - # reference + # This bundle has cleaner all of its usages of this reference del self.__using_bundles[bundle] - except KeyError: - # Ignore error - pass - def used_by(self, bundle): + async def used_by(self, bundle): """ - Indicates that this reference is being used by the given bundle. + Async Indicates that this reference is being used by the given bundle. This method should only be used by the framework. :param bundle: A bundle using this reference @@ -486,7 +492,7 @@ def used_by(self, bundle): # Ignore return - with self.__usage_lock: + async with self.__usage_lock: self.__using_bundles.setdefault(bundle, _UsageCounter()).inc() def __compute_key(self): @@ -533,7 +539,7 @@ class ServiceRegistration: "__update_callback", ) - def __init__(self, framework, reference, properties, update_callback): + def __init__(self, framework, reference: ServiceReference, properties, update_callback): """ :param framework: The host framework :param reference: A service reference @@ -560,9 +566,9 @@ def get_reference(self) -> ServiceReference: """ return self.__reference - def set_properties(self, properties): + async def set_properties(self, properties): """ - Updates the service properties + Async Updates the service properties :param properties: The new properties :raise TypeError: The argument is not a dictionary @@ -570,42 +576,37 @@ def set_properties(self, properties): if not isinstance(properties, dict): raise TypeError("Waiting for dictionary") - # Keys that must not be updated - for forbidden_key in OBJECTCLASS, SERVICE_ID: - try: - del properties[forbidden_key] - except KeyError: - pass - - to_delete = [] - for key, value in properties.items(): - if self.__properties.get(key) == value: - # No update - to_delete.append(key) - - for key in to_delete: - # Remove unchanged properties - del properties[key] + if not isinstance(properties, dict): + raise TypeError("Waiting for dictionary") - if not properties: + # Properties that will be updated + update = { + key: properties[key] + for key, value in properties.items() + # Keys that must be updated + if + key not in {OBJECTCLASS, SERVICE_ID} + and + value != self.__properties.get(key) + } + + if not update: # Nothing to do return # Ensure that the service has a valid service ranking - try: - properties[SERVICE_RANKING] = int(properties[SERVICE_RANKING]) - except (ValueError, TypeError): - # Bad value: ignore update - del properties[SERVICE_RANKING] - except KeyError: - # Service ranking not updated: ignore - pass + if SERVICE_RANKING in update: + try: + update[SERVICE_RANKING] = int(update[SERVICE_RANKING]) + except (ValueError, TypeError): + # Bad value: ignore update + del update[SERVICE_RANKING] # pylint: disable=W0212 - with self.__reference._props_lock: + async with self.__reference._props_lock: # Update the properties previous = self.__properties.copy() - self.__properties.update(properties) + self.__properties.update(update) if self.__reference.needs_sort_update(): # The sort key and the registry must be updated @@ -616,11 +617,11 @@ def set_properties(self, properties): ServiceEvent.MODIFIED, self.__reference, previous ) - self.__framework._dispatcher.fire_service_event(event) + await self.__framework._dispatcher.fire_service_event(event) async def unregister(self): """ - Unregisters the service + Async Unregisters the service """ return await self.__framework.unregister_service(self) @@ -640,41 +641,41 @@ def __init__(self, registry, logger=None): :param registry: The service registry :param logger: The logger to be used """ - self._registry = registry + self._registry: ServiceRegistry = registry # Logger self._logger = logger or logging.getLogger("EventDispatcher") # Bundle listeners self.__bnd_listeners = [] - self.__bnd_lock = threading.Lock() + self.__bnd_lock = asyncio.Lock() # Service listeners (specification -> listener bean) self.__svc_listeners = {} # listener instance -> listener bean self.__listeners_data = {} - self.__svc_lock = threading.Lock() + self.__svc_lock = asyncio.Lock() # Framework stop listeners self.__fw_listeners = [] - self.__fw_lock = threading.Lock() + self.__fw_lock = asyncio.Lock() - def clear(self): + async def clear(self): """ - Clears the event dispatcher + Async Clears the event dispatcher """ - with self.__bnd_lock: + async with self.__bnd_lock: self.__bnd_listeners = [] - with self.__svc_lock: + async with self.__svc_lock: self.__svc_listeners.clear() - with self.__fw_lock: + async with self.__fw_lock: self.__fw_listeners = [] - def add_bundle_listener(self, listener): + async def add_bundle_listener(self, listener): """ - Adds a bundle listener + Async Adds a bundle listener :param listener: The bundle listener to register :return: True if the listener has been registered, False if it was @@ -684,7 +685,7 @@ def add_bundle_listener(self, listener): if listener is None or not hasattr(listener, "bundle_changed"): raise BundleException("Invalid bundle listener given") - with self.__bnd_lock: + async with self.__bnd_lock: if listener in self.__bnd_listeners: self._logger.warning( "Already known bundle listener '%s'", listener @@ -694,9 +695,9 @@ def add_bundle_listener(self, listener): self.__bnd_listeners.append(listener) return True - def add_framework_listener(self, listener): + async def add_framework_listener(self, listener): """ - Registers a listener that will be called back right before the + Async Registers a listener that will be called back right before the framework stops. :param listener: The framework stop listener @@ -707,7 +708,7 @@ def add_framework_listener(self, listener): if listener is None or not hasattr(listener, "framework_stopping"): raise BundleException("Invalid framework listener given") - with self.__fw_lock: + async with self.__fw_lock: if listener in self.__fw_listeners: self._logger.warning( "Already known framework listener '%s'", listener @@ -717,11 +718,11 @@ def add_framework_listener(self, listener): self.__fw_listeners.append(listener) return True - def add_service_listener( + async def add_service_listener( self, bundle_context, listener, specification=None, ldap_filter=None ): """ - Registers a service listener + Async Registers a service listener :param bundle_context: The bundle_context of the service listener :param listener: The service listener @@ -736,7 +737,7 @@ def add_service_listener( if listener is None or not hasattr(listener, "service_changed"): raise BundleException("Invalid service listener given") - with self.__svc_lock: + async with self.__svc_lock: if listener in self.__listeners_data: self._logger.warning( "Already known service listener '%s'", listener @@ -755,95 +756,107 @@ def add_service_listener( self.__svc_listeners.setdefault(specification, []).append(stored) return True - def remove_bundle_listener(self, listener): + async def remove_bundle_listener(self, listener): """ - Unregisters a bundle listener + Async Unregisters a bundle listener :param listener: The bundle listener to unregister :return: True if the listener has been unregistered, else False """ - with self.__bnd_lock: + async with self.__bnd_lock: if listener not in self.__bnd_listeners: return False self.__bnd_listeners.remove(listener) return True - def remove_framework_listener(self, listener): + async def remove_framework_listener(self, listener): """ - Unregisters a framework stop listener + Async Unregisters a framework stop listener :param listener: The framework listener to unregister :return: True if the listener has been unregistered, else False """ - with self.__fw_lock: + async with self.__fw_lock: try: self.__fw_listeners.remove(listener) return True except ValueError: return False - def remove_service_listener(self, listener): + async def remove_service_listener(self, listener): """ - Unregisters a service listener + Async Unregisters a service listener :param listener: The service listener :return: True if the listener has been unregistered """ - with self.__svc_lock: - try: + async with self.__svc_lock: + if listener in self.__listeners_data: data = self.__listeners_data.pop(listener) - spec_listeners = self.__svc_listeners[data.specification] - spec_listeners.remove(data) - if not spec_listeners: - del self.__svc_listeners[data.specification] - return True - except KeyError: + if data.specification in self.__svc_listeners: + spec_listeners = self.__svc_listeners[data.specification] + spec_listeners.remove(data) + if not spec_listeners: + del self.__svc_listeners[data.specification] + return True return False + return False - def fire_bundle_event(self, event): + async def fire_bundle_event(self, event): """ - Notifies bundle events listeners of a new event in the calling thread. + Async Notifies bundle events listeners of a new event in the calling thread. :param event: The bundle event """ - with self.__bnd_lock: + # Get EventLoop + loop = asyncio.get_running_loop() + + async with self.__bnd_lock: # Copy the list of listeners listeners = self.__bnd_listeners[:] + # Schedule all + listener_list = [loop.create_task(listener.bundle_changed(event)) for listener in listeners] # Call'em all - for listener in listeners: + for listener in listener_list: try: - listener.bundle_changed(event) + await listener except: self._logger.exception("Error calling a bundle listener") - def fire_framework_stopping(self): + async def fire_framework_stopping(self): """ - Calls all framework listeners, telling them that the framework is + Async Calls all framework listeners, telling them that the framework is stopping """ - with self.__fw_lock: + # Get EventLoop + loop = asyncio.get_running_loop() + + async with self.__fw_lock: # Copy the list of listeners listeners = self.__fw_listeners[:] - - for listener in listeners: + + # Schedule all + listener_list = [loop.create_task(listener.framework_stopping()) for listener in listeners] + # Call'em all + for listener in listener_list: try: - listener.framework_stopping() + await listener except: self._logger.exception( "An error occurred calling one of the " "framework stop listeners" ) - def fire_service_event(self, event): + async def fire_service_event(self, event: ServiceEvent): """ - Notifies service events listeners of a new event in the calling thread. + Async Notifies service events listeners of a new event in the calling thread. :param event: The service event """ # Get the service properties - properties = event.get_service_reference().get_properties() + properties = await event.get_service_reference().get_properties() svc_specs = properties[OBJECTCLASS] previous = None endmatch_event = None @@ -858,23 +871,19 @@ def fire_service_event(self, event): previous, ) - with self.__svc_lock: + async with self.__svc_lock: # Get the listeners for this specification listeners = set() for spec in svc_specs: - try: + if spec in self.__svc_listeners: listeners.update(self.__svc_listeners[spec]) - except KeyError: - pass # Add those which listen to any specification - try: + if None in self.__svc_listeners: listeners.update(self.__svc_listeners[None]) - except KeyError: - pass # Filter listeners with EventListenerHooks - listeners = self._filter_with_hooks(event, listeners) + listeners = await self._filter_with_hooks(event, listeners) # Get the listeners for this specification for data in listeners: @@ -898,21 +907,21 @@ def fire_service_event(self, event): # Call'em try: - data.listener.service_changed(sent_event) + await data.listener.service_changed(sent_event) except: self._logger.exception("Error calling a service listener") - def _filter_with_hooks(self, svc_event, listeners): + async def _filter_with_hooks(self, svc_event: ServiceEvent, listeners): """ - Filters listeners with EventListenerHooks + Async Filters listeners with EventListenerHooks :param svc_event: ServiceEvent being triggered :param listeners: Listeners to filter :return: A list of listeners with hook references """ - svc_ref = svc_event.get_service_reference() + svc_ref: ServiceReference = svc_event.get_service_reference() # Get EventListenerHooks service refs from registry - hook_refs = self._registry.find_service_references( + hook_refs = await self._registry.find_service_references( SERVICE_EVENT_LISTENER_HOOK ) # only do something if there are some hook_refs @@ -937,20 +946,20 @@ def _filter_with_hooks(self, svc_event, listeners): # Get the bundle of the hook service hook_bundle = hook_ref.get_bundle() # lookup service from registry - hook_svc = self._registry.get_service(hook_bundle, hook_ref) + hook_svc = await self._registry.get_service(hook_bundle, hook_ref) if hook_svc is not None: # call event method of the hook service, # pass in svc_event and shrinkable_ctx_listeners # (which can be modified by hook) try: - hook_svc.event(svc_event, shrinkable_ctx_listeners) + await hook_svc.event(svc_event, shrinkable_ctx_listeners) except: self._logger.exception( "Error calling EventListenerHook" ) finally: # Clean up the service - self._registry.unget_service(hook_bundle, hook_ref) + await self._registry.unget_service(hook_bundle, hook_ref) # Convert the shrinkable_ctx_listeners back to a list of listeners # before returning @@ -999,26 +1008,26 @@ def __init__(self, framework, logger=None): # Specification -> Service references[] (always sorted) self.__svc_specs = {} - # Services published: Bundle -> set(Service references) + # Services published -> set(Service references) self.__bundle_svc: Dict[Any, Set[ServiceReference]] = {} - # Services consumed: Bundle -> {Service reference -> UsageCounter} + # Services consumed -> {Service reference -> UsageCounter} self.__bundle_imports: Dict[Any, Dict[ServiceReference, _UsageCounter]] = {} - # Service factories consumption: Bundle -> _FactoryCounter + # Service factories consumption -> _FactoryCounter self.__factory_usage: Dict[Any, _FactoryCounter] = {} # Locks - self.__svc_lock = threading.RLock() + self.__svc_lock = RLock() # Pending unregistration: Service reference -> Service instance self.__pending_services: Dict[ServiceReference, Any] = {} - def clear(self): + async def clear(self): """ - Clears the registry + Async Clears the registry """ - with self.__svc_lock: + async with self.__svc_lock: self.__svc_registry.clear() self.__svc_factories.clear() self.__svc_specs.clear() @@ -1027,11 +1036,11 @@ def clear(self): self.__factory_usage.clear() self.__pending_services.clear() - def register( + async def register( self, bundle, classes, properties, svc_instance, factory, prototype ): """ - Registers a service. + Async Registers a service. :param bundle: The bundle that registers the service :param classes: The classes implemented by the service @@ -1042,7 +1051,7 @@ def register( factory (the factory argument is considered True) :return: The ServiceRegistration object """ - with self.__svc_lock: + async with self.__svc_lock: # Prepare properties service_id = self.__next_service_id self.__next_service_id += 1 @@ -1059,9 +1068,12 @@ def register( properties[SERVICE_SCOPE] = SCOPE_SINGLETON # Force to have a valid service ranking - try: - properties[SERVICE_RANKING] = int(properties[SERVICE_RANKING]) - except (KeyError, ValueError, TypeError): + if SERVICE_RANKING in properties: + try: + properties[SERVICE_RANKING] = int(properties[SERVICE_RANKING]) + except (ValueError, TypeError): + properties[SERVICE_RANKING] = 0 + else: properties[SERVICE_RANKING] = 0 # Make the service reference @@ -1088,19 +1100,20 @@ def register( bundle_services.add(svc_ref) return svc_registration - def __sort_registry(self, svc_ref: ServiceReference) -> None: + async def __sort_registry(self, svc_ref: ServiceReference) -> None: """ - Sorts the registry, after the update of the sort key of given service + Async Sorts the registry, after the update of the sort key of given service reference :param svc_ref: A service reference with a modified sort key """ - with self.__svc_lock: + async with self.__svc_lock: if svc_ref not in self.__svc_registry: raise BundleException("Unknown service: {0}".format(svc_ref)) + get_property = await svc_ref.get_property(OBJECTCLASS) # Remove current references - for spec in svc_ref.get_property(OBJECTCLASS): + for spec in get_property: # Use bisect to remove the reference (faster) spec_refs = self.__svc_specs[spec] idx = bisect.bisect_left(spec_refs, svc_ref) @@ -1109,26 +1122,25 @@ def __sort_registry(self, svc_ref: ServiceReference) -> None: # ... use the new sort key svc_ref.update_sort_key() - for spec in svc_ref.get_property(OBJECTCLASS): + for spec in get_property: # ... and insert it again spec_refs = self.__svc_specs[spec] bisect.insort_left(spec_refs, svc_ref) - def unregister(self, svc_ref: ServiceReference) -> Any: + async def unregister(self, svc_ref: ServiceReference) -> Any: """ - Unregisters a service + Async Unregisters a service :param svc_ref: A service reference :return: The unregistered service instance :raise BundleException: Unknown service reference """ - with self.__svc_lock: - try: + async with self.__svc_lock: + if svc_ref in self.__pending_services: # Try in pending services return self.__pending_services.pop(svc_ref) - except KeyError: - # Not pending: continue - pass + + # else Not pending: continue if svc_ref not in self.__svc_registry: raise BundleException("Unknown service: {0}".format(svc_ref)) @@ -1139,7 +1151,8 @@ def unregister(self, svc_ref: ServiceReference) -> Any: # Get the service instance service = self.__svc_registry.pop(svc_ref) - for spec in svc_ref.get_property(OBJECTCLASS): + get_property = await svc_ref.get_property(OBJECTCLASS) + for spec in get_property: spec_services = self.__svc_specs[spec] # Use bisect to remove the reference (faster) idx = bisect.bisect_left(spec_services, svc_ref) @@ -1152,7 +1165,7 @@ def unregister(self, svc_ref: ServiceReference) -> Any: # Call unget_service for all client bundle factory, svc_reg = self.__svc_factories.pop(svc_ref) for counter in self.__factory_usage.values(): - counter.cleanup_service(factory, svc_reg) + await counter.cleanup_service(factory, svc_reg) else: # Delete bundle association bundle_services = self.__bundle_svc[bundle] @@ -1163,21 +1176,22 @@ def unregister(self, svc_ref: ServiceReference) -> Any: return service - def hide_bundle_services(self, bundle): + async def hide_bundle_services(self, bundle): """ - Hides the services of the given bundle (removes them from lists, but + Async Hides the services of the given bundle (removes them from lists, but lets them be unregistered) :param bundle: The bundle providing services :return: The references of the hidden services """ - with self.__svc_lock: - try: - svc_refs = self.__bundle_svc.pop(bundle) - except KeyError: + async with self.__svc_lock: + if bundle not in self.__bundle_svc: # Nothing to do return set() + else: + svc_refs = self.__bundle_svc.pop(bundle) + # Clean the registry specs = set() for svc_ref in svc_refs: @@ -1188,27 +1202,27 @@ def hide_bundle_services(self, bundle): self.__pending_services[svc_ref] = self.__svc_registry.pop( svc_ref ) - specs.update(svc_ref.get_property(OBJECTCLASS)) + get_property = await svc_ref.get_property(OBJECTCLASS) + specs.update(get_property) # Clean the specifications cache - for spec in svc_ref.get_property(OBJECTCLASS): + for spec in get_property: spec_services = self.__svc_specs[spec] # Use bisect to remove the reference (faster) idx = bisect.bisect_left(spec_services, svc_ref) del spec_services[idx] if not spec_services: del self.__svc_specs[spec] - return svc_refs - def find_service_references( + async def find_service_references( self, clazz: Optional[Type[object]] = None, ldap_filter: Optional[ldapfilter.LDAPFilter] = None, only_one: bool = False ) -> Union[list, None]: """ - Finds all services references matching the given filter. + Async Finds all services references matching the given filter. :param clazz: Class implemented by the service :param ldap_filter: Service filter @@ -1217,7 +1231,7 @@ def find_service_references( :raise BundleException: An error occurred looking for service references """ - with self.__svc_lock: + async with self.__svc_lock: if clazz is None and ldap_filter is None: # Return a sorted copy of the keys list # Do not return None, as the whole content was required @@ -1234,10 +1248,10 @@ def find_service_references( # Directly use the given filter refs_set = sorted(self.__svc_registry.keys()) else: - try: + if clazz in self.__svc_specs: # Only for references with the given specification refs_set = iter(self.__svc_specs[clazz]) - except KeyError: + else: # No matching specification return None @@ -1252,24 +1266,23 @@ def find_service_references( # walk-through refs_set = ( ref - for ref in refs_set - if new_filter.matches(ref.get_properties()) + async for ref in refs_set + if new_filter.matches(await ref.get_properties()) ) - if only_one: # Return the first element in the list/generator try: - return [next(refs_set)] + return [await next(refs_set)] except StopIteration: # No match return None # Get all the matching references - return list(refs_set) or None + return await list(refs_set) or None - def get_bundle_imported_services(self, bundle): + async def get_bundle_imported_services(self, bundle): """ - Returns this bundle's ServiceReference list for all services it is + Async Returns this bundle's ServiceReference list for all services it is using or returns None if this bundle is not using any services. A bundle is considered to be using a service if its use count for that service is greater than zero. @@ -1281,49 +1294,49 @@ def get_bundle_imported_services(self, bundle): :param bundle: The bundle to look into :return: The references of the services used by this bundle """ - with self.__svc_lock: + async with self.__svc_lock: return sorted(self.__bundle_imports.get(bundle, [])) - def get_bundle_registered_services(self, bundle: Any) -> List[ServiceReference]: + async def get_bundle_registered_services(self, bundle) -> List[ServiceReference]: """ - Retrieves the services registered by the given bundle. Returns None + Async Retrieves the services registered by the given bundle. Returns None if the bundle didn't register any service. :param bundle: The bundle to look into :return: The references to the services registered by the bundle """ - with self.__svc_lock: + async with self.__svc_lock: return sorted(self.__bundle_svc.get(bundle, [])) - def get_service(self, bundle: Any, reference: ServiceReference) -> Any: + async def get_service(self, bundle, reference: ServiceReference) -> Any: """ - Retrieves the service corresponding to the given reference + Async Retrieves the service corresponding to the given reference :param bundle: The bundle requiring the service :param reference: A service reference :return: The requested service :raise BundleException: The service could not be found """ - with self.__svc_lock: + async with self.__svc_lock: if reference.is_factory(): - return self.__get_service_from_factory(bundle, reference) + return await self.__get_service_from_factory(bundle, reference) # Be sure to have the instance - try: + if reference in self.__svc_registry: service = self.__svc_registry[reference] # Indicate the dependency imports = self.__bundle_imports.setdefault(bundle, {}) imports.setdefault(reference, _UsageCounter()).inc() - reference.used_by(bundle) + await reference.used_by(bundle) return service - except KeyError: + else: # Not found raise BundleException( "Service not found (reference: {0})".format(reference) ) - def __get_service_from_factory(self, bundle: Any, reference: ServiceReference) -> Any: + async def __get_service_from_factory(self, bundle, reference: ServiceReference) -> Any: """ Returns a service instance from a service factory or a prototype service factory @@ -1333,7 +1346,7 @@ def __get_service_from_factory(self, bundle: Any, reference: ServiceReference) - :return: The requested service :raise BundleException: The service could not be found """ - try: + if reference in self.__svc_factories: factory, svc_reg = self.__svc_factories[reference] # Indicate the dependency @@ -1344,105 +1357,99 @@ def __get_service_from_factory(self, bundle: Any, reference: ServiceReference) - usage_counter = _UsageCounter() usage_counter.inc() imports[reference] = usage_counter - reference.used_by(bundle) + await reference.used_by(bundle) # Check the per-bundle usage counter factory_counter = self.__factory_usage.setdefault( bundle, _FactoryCounter(bundle) ) - return factory_counter.get_service(factory, svc_reg) - except KeyError: + return await factory_counter.get_service(factory, svc_reg) + else: # Not found raise BundleException( "Service not found (reference: {0})".format(reference) ) - def unget_used_services(self, bundle): + async def unget_used_services(self, bundle): """ - Cleans up all service usages of the given bundle. + Async Cleans up all service usages of the given bundle. - :param bundle: Bundle to be cleaned up + :param bundle to be cleaned up """ # Pop used references - try: - imported_refs = list(self.__bundle_imports.pop(bundle)) - except KeyError: + if bundle in self.__bundle_imports: + imported_refs = await list(self.__bundle_imports.pop(bundle)) + else: # Nothing to do return for svc_ref in imported_refs: # Remove usage marker - svc_ref.unused_by(bundle) + await svc_ref.unused_by(bundle) if svc_ref.is_prototype(): # Get factory information and clean up the service from the # factory counter factory_counter = self.__factory_usage.pop(bundle) factory, svc_reg = self.__svc_factories[svc_ref] - factory_counter.cleanup_service(factory, svc_reg) + await factory_counter.cleanup_service(factory, svc_reg) elif svc_ref.is_factory(): # Factory service, release it the standard way - self.__unget_service_from_factory(bundle, svc_ref) + await self.__unget_service_from_factory(bundle, svc_ref) # Clean up local structures - try: + if bundle in self.__factory_usage: del self.__factory_usage[bundle] - except KeyError: - pass - try: + if bundle in self.__bundle_imports: self.__bundle_imports.pop(bundle).clear() - except KeyError: - pass - def unget_service( + async def unget_service( self, bundle: Any, reference: ServiceReference, service: Optional[Any] = None ) -> bool: """ - Removes the usage of a service by a bundle + Async Removes the usage of a service by a bundle :param bundle: The bundle that used the service :param reference: A service reference :param service: Service instance (for Prototype Service Factories) :return: True if the bundle usage has been removed """ - with self.__svc_lock: + async with self.__svc_lock: if reference.is_prototype(): - return self.__unget_service_from_factory( + return await self.__unget_service_from_factory( bundle, reference, service ) elif reference.is_factory(): - return self.__unget_service_from_factory(bundle, reference) + return await self.__unget_service_from_factory(bundle, reference) - try: + if bundle in self.__bundle_imports: # Remove the service reference from the bundle imports = self.__bundle_imports[bundle] if not imports[reference].dec(): # No more reference to it del imports[reference] - except KeyError: - # Unknown reference - return False - else: # Clean up if not imports: del self.__bundle_imports[bundle] # Update the service reference - reference.unused_by(bundle) + await reference.unused_by(bundle) return True + else: + return False - def __unget_service_from_factory( + async def __unget_service_from_factory( self, bundle: Any, reference: ServiceReference, service: Optional[Any] = None ) -> bool: """ - Removes the usage of a a service factory or a prototype + Async Removes the usage of a a service factory or a prototype service factory by a bundle :param bundle: The bundle that used the service @@ -1450,37 +1457,32 @@ def __unget_service_from_factory( :param service: Service instance (for prototype factories) :return: True if the bundle usage has been removed """ - try: + if reference in self.__svc_factories: factory, svc_reg = self.__svc_factories[reference] - except KeyError: + else: # Unknown service reference return False # Check the per-bundle usage counter - try: + if bundle in self.__factory_usage: counter = self.__factory_usage[bundle] - except KeyError: + else: # Unknown reference to a factory return False - else: - if counter.unget_service(factory, svc_reg, service): - try: - # No more dependency - reference.unused_by(bundle) - - # All references have been taken away: clean up - if not self.__factory_usage[bundle].is_used(): - del self.__factory_usage[bundle] - - # Remove the service reference from the bundle - imports = self.__bundle_imports[bundle] - del imports[reference] - except KeyError: - # Unknown reference - return False - else: - # Clean up - if not imports: - del self.__bundle_imports[bundle] - + if await counter.unget_service(factory, svc_reg, service): + # No more dependency + await reference.unused_by(bundle) + # All references have been taken away: clean up + if not self.__factory_usage[bundle].is_used(): + del self.__factory_usage[bundle] + if bundle in self.__bundle_imports: + # Remove the service reference from the bundle + imports = self.__bundle_imports[bundle] + del imports[reference] + # Clean up + if not imports: + del self.__bundle_imports[bundle] + else: + # Unknown reference + return False return True