diff --git a/pelix/framework.py b/pelix/framework.py index 37a68c42..f4d55bef 100644 --- a/pelix/framework.py +++ b/pelix/framework.py @@ -2,24 +2,17 @@ # -- Content-Encoding: UTF-8 -- """ Core module for Pelix. - Pelix is a Python framework that aims to act as OSGi as much as possible - :author: Thomas Calmant :copyright: Copyright 2018, Thomas Calmant :license: Apache License 2.0 :version: 0.8.2 - .. - Copyright 2018 Thomas Calmant - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,11 +25,9 @@ import importlib import inspect import logging -from concurrent.futures import Future import os import sys -import asyncio -#import threading +import threading import uuid # pylint: disable=W0611 from typing import Any, List, Optional, Set, Union @@ -66,7 +57,6 @@ def reload_module(module_): """ Reloads a module using ``importlib.reload()`` when available - :param module_: The module to update :return: The new version of the module :raise ImportError: Error looking for file @@ -81,7 +71,6 @@ def walk_modules(path): """ Code from ``pkgutil.ImpImporter.iter_modules()``: walks through a folder and yields all loadable packages and modules. - :param path: Path where to look for modules :return: Generator to walk through found packages and modules """ @@ -160,7 +149,6 @@ class Bundle(object): "_state", "__registered_services", "__registration_lock", - "__loop" ) UNINSTALLED = 1 @@ -185,16 +173,13 @@ def __init__(self, framework, bundle_id, name, module_): # type: (Framework, int, str, types.ModuleType) -> None """ Sets up the bundle descriptor - :param framework: The host framework :param bundle_id: The ID of the bundle in the host framework :param name: The bundle symbolic name :param module_: The bundle module """ - # Main Thread EventLoop - self.__loop = asyncio.get_event_loop() - # A lock for synchronization - self._lock = asyncio.Lock() + # A re-entrant lock for synchronization + self._lock = threading.RLock() # Bundle self.__context = BundleContext(framework, self) @@ -207,7 +192,7 @@ def __init__(self, framework, bundle_id, name, module_): # Registered services self.__registered_services = set() # type: Set[ServiceRegistration] - self.__registration_lock = asyncio.Lock() + self.__registration_lock = threading.Lock() def __str__(self): """ @@ -215,10 +200,9 @@ def __str__(self): """ return "Bundle(ID={0}, Name={1})".format(self.__id, self.__name) - async def __get_activator_method(self, method_name): + def __get_activator_method(self, method_name): """ Retrieves the requested method of the activator, or returns None - :param method_name: A method name :return: A method, or None """ @@ -237,42 +221,38 @@ async def __get_activator_method(self, method_name): ) return getattr(activator, method_name, None) - async def _fire_bundle_event(self, kind): + def _fire_bundle_event(self, kind): # type: (int) -> None """ Fires a bundle event of the given kind - :param kind: Kind of event """ self.__framework._dispatcher.fire_bundle_event(BundleEvent(kind, self)) - async def _registered_service(self, registration): + def _registered_service(self, registration): # type: (ServiceRegistration) -> None """ Bundle is notified by the framework that a service has been registered in the name of this bundle. - :param registration: The service registration object """ - async with self.__registration_lock: + with self.__registration_lock: self.__registered_services.add(registration) - async def _unregistered_service(self, registration): + def _unregistered_service(self, registration): # type: (ServiceRegistration) -> None """ Bundle is notified by the framework that a service has been unregistered in the name of this bundle. - :param registration: The service registration object """ - async with self.__registration_lock: + with self.__registration_lock: self.__registered_services.discard(registration) def get_bundle_context(self): # type: () -> BundleContext """ Retrieves the bundle context - :return: The bundle context """ return self.__context @@ -281,7 +261,6 @@ def get_bundle_id(self): # type: () -> int """ Retrieves the bundle ID - :return: The bundle ID """ return self.__id @@ -290,7 +269,6 @@ def get_location(self): # type: () -> str """ Retrieves the location of this module - :return: The location of the Pelix module, or an empty string """ return getattr(self.__module, "__file__", "") @@ -300,7 +278,6 @@ def get_module(self): # type: () -> types.ModuleType """ Retrieves the Python module corresponding to the bundle - :return: The Python module """ return self.__module @@ -310,11 +287,9 @@ def get_registered_services(self): """ Returns this bundle's ServiceReference list for all services it has registered or an empty list - The list is valid at the time of the call to this method, however, as the Framework is a very dynamic environment, services can be modified or unregistered at any time. - :return: An array of ServiceReference objects :raise BundleException: If the bundle has been uninstalled """ @@ -332,11 +307,9 @@ def get_services_in_use(self): using or an empty list. A bundle is considered to be using a service if its use count for that service is greater than zero. - The list is valid at the time of the call to this method, however, as the Framework is a very dynamic environment, services can be modified or unregistered at any time. - :return: An array of ServiceReference objects :raise BundleException: If the bundle has been uninstalled """ @@ -350,7 +323,6 @@ def get_state(self): # type: () -> int """ Retrieves the bundle state - :return: The bundle state """ return self._state @@ -359,7 +331,6 @@ def get_symbolic_name(self): # type: () -> str """ Retrieves the bundle symbolic name (its Python module name) - :return: The bundle symbolic name """ return self.__name @@ -369,7 +340,6 @@ def get_version(self): """ Retrieves the bundle version, using the ``__version__`` or ``__version_info__`` attributes of its module. - :return: The bundle version, "0.0.0" by default """ # Get the version value @@ -389,30 +359,6 @@ def start(self): """ Starts the bundle. Does nothing if the bundle is already starting or active. - - :raise BundleException: The framework is not yet started or the bundle - activator failed. - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_start(), - self.__loop) - try: - # Waiting for the end of the coroutine - result = future.result() - except BundleException: - raise BundleException - else: - try: - self.__loop.run_until_complete(self.async_start()) - except BundleException: - raise BundleException - - async def async_start(self): - """ - Async Starts the bundle. Does nothing if the bundle - is already starting or active. - :raise BundleException: The framework is not yet started or the bundle activator failed. """ @@ -422,8 +368,7 @@ async def async_start(self): "Framework must be started before its bundles" ) - loop = asyncio.get_running_loop() - async with self._lock: + with self._lock: if self._state in (Bundle.ACTIVE, Bundle.STARTING): # Already started bundle, do nothing return @@ -433,12 +378,10 @@ async def async_start(self): # Starting... self._state = Bundle.STARTING - starting = loop.create_task(self._fire_bundle_event(BundleEvent.STARTING)) - await starting + self._fire_bundle_event(BundleEvent.STARTING) # Call the activator, if any - activator = loop.create_task(self.__get_activator_method("start")) - starter = await activator + starter = self.__get_activator_method("start") if starter is not None: try: # Call the start method @@ -464,33 +407,11 @@ async def async_start(self): # Bundle is now active self._state = Bundle.ACTIVE - started = loop.create_task(self._fire_bundle_event(BundleEvent.STARTED)) - await started + self._fire_bundle_event(BundleEvent.STARTED) def stop(self): """ Stops the bundle. Does nothing if the bundle is already stopped. - - :raise BundleException: The bundle activator failed. - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_stop(), self.__loop) - try: - # Waiting for the end of the coroutine - result = future.result() - except BundleException: - raise BundleException - else: - try: - self.__loop.run_until_complete(self.async_stop()) - except BundleException: - raise BundleException - - async def async_stop(self): - """ - Async Stops the bundle. Does nothing if the bundle is already stopped. - :raise BundleException: The bundle activator failed. """ if self._state != Bundle.ACTIVE: @@ -498,19 +419,16 @@ async def async_stop(self): return exception = None - loop = asyncio.get_running_loop() - async with self._lock: + with self._lock: # Store the bundle current state previous_state = self._state # Stopping... self._state = Bundle.STOPPING - stopping = loop.create_task(self._fire_bundle_event(BundleEvent.STOPPING)) - await stopping + self._fire_bundle_event(BundleEvent.STOPPING) # Call the activator, if any - activator = loop.create_task(self.__get_activator_method("stop")) - stopper = await activator + stopper = self.__get_activator_method("stop") if stopper is not None: try: # Call the start method @@ -532,26 +450,21 @@ async def async_stop(self): exception = BundleException(ex) # Hide remaining services - hide_bundle_services = loop.create_task(self.__framework._hide_bundle_services(self)) - await hide_bundle_services + self.__framework._hide_bundle_services(self) # Intermediate bundle event : activator should have cleaned up # everything, but some element could stay (iPOPO components, ...) - stopping_preclean = loop.create_task(self._fire_bundle_event(BundleEvent.STOPPING_PRECLEAN)) - await stopping_preclean + self._fire_bundle_event(BundleEvent.STOPPING_PRECLEAN) # Remove remaining services (the hard way) - remove_services = loop.create_task(self.__unregister_services()) - await remove_services + self.__unregister_services() # Cleanup service usages - clean_services = loop.create_task(self.__framework._unget_used_services(self)) - await clean_services + self.__framework._unget_used_services(self) # Bundle is now stopped and all its services have been unregistered self._state = Bundle.RESOLVED - stopped = loop.create_task(self._fire_bundle_event(BundleEvent.STOPPED)) - await stopped + self._fire_bundle_event(BundleEvent.STOPPED) # Raise the exception, if any # pylint: disable=E0702 @@ -559,12 +472,12 @@ async def async_stop(self): if exception is not None: raise exception - async def __unregister_services(self): + def __unregister_services(self): """ Unregisters all bundle services """ # Copy the services list, as it will be modified during the process - async with self.__registration_lock: + with self.__registration_lock: registered_services = self.__registered_services.copy() for registration in registered_services: @@ -577,7 +490,7 @@ async def __unregister_services(self): if self.__registered_services: _logger.warning("Not all services have been unregistered...") - async with self.__registration_lock: + with self.__registration_lock: # Clear the list, just to be clean self.__registered_services.clear() @@ -585,20 +498,7 @@ def uninstall(self): """ Uninstalls the bundle """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_uninstall(), self.__loop) - # Waiting for the end of the coroutine - future.result() - else: - self.__loop.run_until_complete(self.async_uninstall()) - - async def async_uninstall(self): - """ - Async Uninstalls the bundle - """ - loop = asyncio.get_running_loop() - async with self._lock: + with self._lock: if self._state == Bundle.ACTIVE: self.stop() @@ -606,42 +506,25 @@ async def async_uninstall(self): self._state = Bundle.UNINSTALLED # Call the framework - uninstall = loop.create_task(self.__framework.uninstall_bundle(self)) - await uninstall + self.__framework.uninstall_bundle(self) def update(self): """ Updates the bundle """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_update(), self.__loop) - # Waiting for the end of the coroutine - future.result() - else: - self.__loop.run_until_complete(self.async_update()) - - async def async_update(self): - """ - Async Updates the bundle - """ - loop = asyncio.get_running_loop() - async with self._lock: + with self._lock: # Was it active ? restart = self._state == Bundle.ACTIVE # Send the update event - update_begin = loop.create_task(self._fire_bundle_event(BundleEvent.UPDATE_BEGIN)) - await update_begin + self._fire_bundle_event(BundleEvent.UPDATE_BEGIN) try: # Stop the bundle - stop = loop.create_task(self.async_stop()) - await stop + self.stop() except: # Something wrong occurred, notify listeners - update_fails = loop.create_task(self._fire_bundle_event(BundleEvent.UPDATE_FAILED)) - await update_fails + self._fire_bundle_event(BundleEvent.UPDATE_FAILED) raise # Change the source file age @@ -701,17 +584,14 @@ async def async_update(self): if restart: try: # Re-start the bundle - start = loop.create_task(self.async_start()) - await start + self.start() except: # Something wrong occurred, notify listeners - update_failed = loop.create_task(self._fire_bundle_event(BundleEvent.UPDATE_FAILED)) - await update_failed + self._fire_bundle_event(BundleEvent.UPDATE_FAILED) raise # Bundle update finished - updated = loop.create_task(self._fire_bundle_event(BundleEvent.UPDATED)) - await updated + self._fire_bundle_event(BundleEvent.UPDATED) # ------------------------------------------------------------------------------ @@ -727,12 +607,8 @@ class Framework(Bundle): def __init__(self, properties=None): """ Sets up the framework. - :param properties: The framework properties """ - # Main Thread EventLoop - self.__loop = asyncio.get_event_loop() - # Framework bundle set up Bundle.__init__( self, self, 0, self.get_symbolic_name(), sys.modules[__name__] @@ -755,7 +631,7 @@ def __init__(self, properties=None): self.__properties[OSGI_FRAMEWORK_UUID] = str(framework_uid) # Properties lock - self.__properties_lock = asyncio.Lock() + self.__properties_lock = threading.Lock() # Bundles (start at 1, as 0 is reserved for the framework itself) self.__next_bundle_id = 1 @@ -764,7 +640,7 @@ def __init__(self, properties=None): self.__bundles = {} # Bundles lock - self.__bundles_lock = asyncio.Lock() + self.__bundles_lock = threading.RLock() # Service registry self._registry = ServiceRegistry(self) @@ -774,41 +650,20 @@ def __init__(self, properties=None): self._dispatcher = EventDispatcher(self._registry) # The wait_for_stop event (initially stopped) - self._fw_stop_event = asyncio.Event() + self._fw_stop_event = threading.Event() self._fw_stop_event.set() def add_property(self, name, value): # type: (str, object) -> bool """ Adds a property to the framework **if it is not yet set**. - If the property already exists (same name), then nothing is done. Properties can't be updated. - :param name: The property name :param value: The value to set :return: True if the property was stored, else False """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_add_property(name, value), self.__loop) - # Waiting for the end of the coroutine - return future.result() - else: - return self.__loop.run_until_complete(self.async_uninstall()) - - async def async_add_property(self, name, value): - """ - Async Adds a property to the framework **if it is not yet set**. - - If the property already exists (same name), then nothing is done. - Properties can't be updated. - - :param name: The property name - :param value: The value to set - :return: True if the property was stored, else False - """ - async with self.__properties_lock: + with self.__properties_lock: if name in self.__properties: # Already stored property return False @@ -822,7 +677,6 @@ def find_service_references( # type: (Optional[str], Optional[str], bool) -> Optional[List[ServiceReference]] """ Finds all services references matching the given filter. - :param clazz: Class implemented by the service :param ldap_filter: Service filter :param only_one: Return the first matching service reference only @@ -835,33 +689,9 @@ def find_service_references( ) def get_bundle_by_id(self, bundle_id): - # type: (int) -> Union[Bundle, Framework] - """ - Retrieves the bundle with the given ID - - :param bundle_id: ID of an installed bundle - :return: The requested bundle - :raise BundleException: The ID is invalid - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_get_bundle_id(bundle_id), self.__loop) - # Waiting for the end of the coroutine - try: - return future.result() - except BundleException: - raise BundleException - else: - try: - return self.__loop.run_until_complete(self.async_get_bundle_id(bundle_id)) - except BundleException: - raise BundleException - - async def async_get_bundle_id(self, bundle_id): # type: (int) -> Union[Bundle, Framework] """ - Async Retrieves the bundle with the given ID - + Retrieves the bundle with the given ID :param bundle_id: ID of an installed bundle :return: The requested bundle :raise BundleException: The ID is invalid @@ -870,7 +700,7 @@ async def async_get_bundle_id(self, bundle_id): # "System bundle" return self - async with self.__bundles_lock: + with self.__bundles_lock: if bundle_id not in self.__bundles: raise BundleException("Invalid bundle ID {0}".format(bundle_id)) @@ -880,23 +710,6 @@ def get_bundle_by_name(self, bundle_name): # type: (str) -> Optional[Bundle] """ Retrieves the bundle with the given name - - :param bundle_name: Name of the bundle to look for - :return: The requested bundle, None if not found - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_get_bundle_name(bundle_name), self.__loop) - # Waiting for the end of the coroutine - return future.result() - else: - return self.__loop.run_until_complete(self.async_get_bundle_name(bundle_name)) - - async def async_get_bundle_name(self, bundle_name): - # type: (str) -> Optional[Bundle] - """ - Async Retrieves the bundle with the given name - :param bundle_name: Name of the bundle to look for :return: The requested bundle, None if not found """ @@ -908,7 +721,7 @@ async def async_get_bundle_name(self, bundle_name): # System bundle requested return self - async with self.__bundles_lock: + with self.__bundles_lock: for bundle in self.__bundles.values(): if bundle_name == bundle.get_symbolic_name(): # Found ! @@ -921,24 +734,9 @@ def get_bundles(self): # type: () -> List[Bundle] """ Returns the list of all installed bundles - :return: the list of all installed bundles """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_get_bundles(), self.__loop) - # Waiting for the end of the coroutine - return future.result() - else: - return self.__loop.run_until_complete(self.async_get_bundles()) - - async def async_get_bundles(self): - """ - Async Returns the list of all installed bundles - - :return: the list of all installed bundles - """ - async with self.__bundles_lock: + with self.__bundles_lock: return [ self.__bundles[bundle_id] for bundle_id in sorted(self.__bundles.keys()) @@ -949,20 +747,7 @@ def get_properties(self): """ Retrieves a copy of the stored framework properties. """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_get_properties(), self.__loop) - # Waiting for the end of the coroutine - return future.result() - else: - return self.__loop.run_until_complete(self.async_get_properties(), self.__loop) - - async def async_get_properties(self): - # type: () -> dict - """ - Async Retrieves a copy of the stored framework properties. - """ - async with self.__properties_lock: + with self.__properties_lock: return self.__properties.copy() def get_property(self, name): @@ -970,58 +755,24 @@ def get_property(self, name): """ Retrieves a framework or system property. As framework properties don't change while it's running, this method don't need to be protected. - :param name: The property name """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_get_property(name), self.__loop) - # Waiting for the end of the coroutine - return future.result() - else: - return self.__loop.run_until_complete(self.async_get_property(name)) - - async def async_get_property(self, name): - # type: (str) -> object - """ - Async Retrieves a framework or system property. As framework properties don't - change while it's running, this method don't need to be protected. - - :param name: The property name - """ - async with self.__properties_lock: + with self.__properties_lock: return self.__properties.get(name, os.getenv(name)) def get_property_keys(self): # type: () -> tuple """ Returns an array of the keys in the properties of the service - - :return: An array of property keys. - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_get_property_keys(), self.__loop) - # Waiting for the end of the coroutine - return future.result() - else: - return self.__loop.run_until_complete(self.async_get_property_keys(), self.__loop) - - async def async_get_property_keys(self): - # type: () -> tuple - """ - Async Returns an array of the keys in the properties of the service - :return: An array of property keys. """ - async with self.__properties_lock: + with self.__properties_lock: return tuple(self.__properties.keys()) def get_service(self, bundle, reference): # type: (Bundle, ServiceReference) -> Any """ Retrieves the service corresponding to the given reference - :param bundle: The bundle requiring the service :param reference: A service reference :return: The requested service @@ -1044,7 +795,6 @@ def _get_service_objects(self, bundle, reference): """ Returns the ServiceObjects object for the service referenced by the specified ServiceReference object. - :param bundle: The bundle requiring the service :param reference: Reference to a prototype service factory :return: An intermediate object to get more instances of a service @@ -1055,7 +805,6 @@ def get_symbolic_name(self): # type: () -> str """ Retrieves the framework symbolic name - :return: Always "pelix.framework" """ return "pelix.framework" @@ -1064,50 +813,17 @@ def install_bundle(self, name, path=None): # type: (str, str) -> Bundle """ Installs the bundle with the given name - *Note:* Before Pelix 0.5.0, this method returned the ID of the installed bundle, instead of the Bundle object. - **WARNING:** The behavior of the loading process is subject to changes, as it does not allow to safely run multiple frameworks in the same Python interpreter, as they might share global module values. - - :param name: A bundle name - :param path: Preferred path to load the module - :return: The installed Bundle object - :raise BundleException: Something happened - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_install_bundle(name, path), self.__loop) - try: - # Waiting for the end of the coroutine - return future.result() - except BundleException: - raise BundleException - else: - try: - return self.__loop.run_until_complete(self.async_install_bundle(name, path)) - except BundleException: - raise BundleException - - async def async_install_bundle(self, name, path=None): - """ - Async Installs the bundle with the given name - - *Note:* Before Pelix 0.5.0, this method returned the ID of the - installed bundle, instead of the Bundle object. - - **WARNING:** The behavior of the loading process is subject to changes, - as it does not allow to safely run multiple frameworks in the same - Python interpreter, as they might share global module values. - :param name: A bundle name :param path: Preferred path to load the module :return: The installed Bundle object :raise BundleException: Something happened """ - async with self.__bundles_lock: + with self.__bundles_lock: # A bundle can't be installed twice for bundle in self.__bundles.values(): if bundle.get_symbolic_name() == name: @@ -1164,33 +880,6 @@ def install_package(self, path, recursive=False, prefix=None): # type: (str, bool, str) -> tuple """ Installs all the modules found in the given package - - :param path: Path of the package (folder) - :param recursive: If True, install the sub-packages too - :param prefix: (**internal**) Prefix for all found modules - :return: A 2-tuple, with the list of installed bundles and the list - of failed modules names - :raise ValueError: Invalid path - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_install_package(path, recursive=recursive, prefix=prefix), self.__loop) - try: - # Waiting for the end of the coroutine - return future.result() - except ValueError: - raise ValueError - else: - try: - return self.__loop.run_until_complete(self.async_install_package(path, recursive=recursive, prefix=prefix)) - except ValueError: - raise ValueError - - async def async_install_package(self, path, recursive=False, prefix=None): - # type: (str, bool, str) -> tuple - """ - ASync Installs all the modules found in the given package - :param path: Path of the package (folder) :param recursive: If True, install the sub-packages too :param prefix: (**internal**) Prefix for all found modules @@ -1217,31 +906,30 @@ def visitor(fullname, is_package, module_path): """ return recursive or not is_package - loop = asyncio.get_running_loop() # Set up the prefix if needed if prefix is None: prefix = os.path.basename(path) bundles = set() # type: Set[Bundle] failed = set() # type: Set[str] - try: - install = loop.create_task(self.async_install_bundle(prefix, os.path.dirname(path))) - # Install the package first, resolved from the parent directory - installed_bundles = await install - bundles.add(installed_bundles) - # Visit the package - visiting = loop.create_task(self.async_install_visiting(path, visitor, prefix=prefix)) - visited, sub_failed = await visiting + with self.__bundles_lock: + try: + # Install the package first, resolved from the parent directory + bundles.add(self.install_bundle(prefix, os.path.dirname(path))) - # Update the sets - bundles.update(visited) - failed.update(sub_failed) + # Visit the package + visited, sub_failed = self.install_visiting( + path, visitor, prefix + ) - except BundleException as ex: - # Error loading the module - _logger.warning("Error loading package %s: %s", prefix, ex) - failed.add(prefix) + # Update the sets + bundles.update(visited) + failed.update(sub_failed) + except BundleException as ex: + # Error loading the module + _logger.warning("Error loading package %s: %s", prefix, ex) + failed.add(prefix) return bundles, failed @@ -1249,45 +937,10 @@ def install_visiting(self, path, visitor, prefix=None): """ Installs all the modules found in the given path if they are accepted by the visitor. - - The visitor must be a callable accepting 3 parameters: - - * fullname: The full name of the module - * is_package: If True, the module is a package - * module_path: The path to the module file - - :param path: Root search path - :param visitor: The visiting callable - :param prefix: (**internal**) Prefix for all found modules - :return: A 2-tuple, with the list of installed bundles and the list - of failed modules names - :raise ValueError: Invalid path or visitor - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_install_visiting(path, visitor, prefix=prefix), self.__loop) - try: - # Waiting for the end of the coroutine - return future.result() - except ValueError: - raise ValueError - else: - try: - return self.__loop.run_until_complete(self.async_install_visiting(path, visitor, prefix=prefix)) - except ValueError: - raise ValueError - - async def async_install_visiting(self, path, visitor, prefix=None): - """ - Async Installs all the modules found in the given path if they are accepted - by the visitor. - The visitor must be a callable accepting 3 parameters: - * fullname: The full name of the module * is_package: If True, the module is a package * module_path: The path to the module file - :param path: Root search path :param visitor: The visiting callable :param prefix: (**internal**) Prefix for all found modules @@ -1314,44 +967,41 @@ async def async_install_visiting(self, path, visitor, prefix=None): if prefix is None: prefix = os.path.basename(path) - loop = asyncio.get_running_loop() bundles = set() failed = set() - # Walk through the folder to find modules - for name, is_package in walk_modules(path): - # Ignore '__main__' modules - if name == "__main__": - continue - - # Compute the full name of the module - fullname = ".".join((prefix, name)) if prefix else name - try: - if visitor(fullname, is_package, path): - if is_package: - # Install the package - install = loop.create_task(self.async_install_bundle(fullname, path)) - installed_bundles = await install - bundles.add(installed_bundles) - - # Visit the package - sub_path = os.path.join(path, name) - visiting = loop.create_task(self.async_install_visiting(sub_path, visitor, fullname)) - sub_bundles, sub_failed = await visiting - bundles.update(sub_bundles) - failed.update(sub_failed) - else: - # Install the bundle - install = loop.create_task(self.async_install_bundle(fullname, path)) - installed_bundles = await install - bundles.add(installed_bundles) - except BundleException as ex: - # Error loading the module - _logger.warning("Error visiting %s: %s", fullname, ex) + with self.__bundles_lock: + # Walk through the folder to find modules + for name, is_package in walk_modules(path): + # Ignore '__main__' modules + if name == "__main__": + continue - # Try the next module - failed.add(fullname) - continue + # Compute the full name of the module + fullname = ".".join((prefix, name)) if prefix else name + try: + if visitor(fullname, is_package, path): + if is_package: + # Install the package + bundles.add(self.install_bundle(fullname, path)) + + # Visit the package + sub_path = os.path.join(path, name) + sub_bundles, sub_failed = self.install_visiting( + sub_path, visitor, fullname + ) + bundles.update(sub_bundles) + failed.update(sub_failed) + else: + # Install the bundle + bundles.add(self.install_bundle(fullname, path)) + except BundleException as ex: + # Error loading the module + _logger.warning("Error visiting %s: %s", fullname, ex) + + # Try the next module + failed.add(fullname) + continue return bundles, failed @@ -1368,64 +1018,6 @@ def register_service( # type: (Bundle, Union[List[Any], type, str], object, dict, bool, bool, bool) -> ServiceRegistration """ Registers a service and calls the listeners - - :param bundle: The bundle registering the service - :param clazz: Name(s) of the interface(s) implemented by service - :param service: The service to register - :param properties: Service properties - :param send_event: If not, doesn't trigger a service registered event - :param factory: If True, the given service is a service factory - :param prototype: If True, the given service is a prototype service - factory (the factory argument is considered True) - :return: A ServiceRegistration object - :raise BundleException: An error occurred while registering the service - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_register_service( - bundle, - clazz, - service, - properties, - send_event, - factory=factory, - prototype=prototype - ), self.__loop) - - try: - # Waiting for the end of the coroutine - return future.result() - except BundleException: - raise BundleException - else: - try: - return self.__loop.run_until_complite(self.async_register_service( - bundle, - clazz, - service, - properties, - send_event, - factory=factory, - prototype=prototype - )) - - except BundleException: - raise BundleException - - async def async_register_service( - self, - bundle, - clazz, - service, - properties, - send_event, - factory=False, - prototype=False, - ): - # type: (Bundle, Union[List[Any], type, str], object, dict, bool, bool, bool) -> ServiceRegistration - """ - Async Registers a service and calls the listeners - :param bundle: The bundle registering the service :param clazz: Name(s) of the interface(s) implemented by service :param service: The service to register @@ -1468,15 +1060,13 @@ async def async_register_service( # Class OK classes.append(svc_clazz) - loop = asyncio.get_running_loop() # Make the service registration registration = self._registry.register( bundle, classes, properties, service, factory, prototype ) # Update the bundle registration information - service_registration = loop.create_task(bundle._registered_service(registration)) - await service_registration + bundle._registered_service(registration) if send_event: # Call the listeners @@ -1491,36 +1081,11 @@ def start(self): # type: () -> bool """ Starts the framework - :return: True if the bundle has been started, False if it was already running :raise BundleException: A bundle failed to start """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_start(), self.__loop) - try: - # Waiting for the end of the coroutine - return future.result() - except BundleException: - raise BundleException - else: - try: - return self.__loop.run_until_complete(self.async_start()) - except BundleException: - raise BundleException - - async def async_start(self): - # type: () -> bool - """ - Async Starts the framework - - :return: True if the bundle has been started, False if it was already - running - :raise BundleException: A bundle failed to start - """ - loop = asyncio.get_running_loop() - async with self._lock: + with self._lock: if self._state in (Bundle.STARTING, Bundle.ACTIVE): # Already started framework return False @@ -1536,22 +1101,18 @@ async def async_start(self): # Start all registered bundles (use a copy, just in case...) for bundle in self.__bundles.copy().values(): - start = loop.create_task(bundle.start()) try: - await start - + bundle.start() except FrameworkException as ex: # Important error _logger.exception( "Important error starting bundle: %s", bundle - ) - + ) if ex.needs_stop: # Stop the framework (has to be in active state) self._state = Bundle.ACTIVE self.stop() return False - except BundleException: # A bundle failed to start : just log _logger.exception("Error starting bundle: %s", bundle) @@ -1564,23 +1125,9 @@ def stop(self): # type: () -> bool """ Stops the framework - - :return: True if the framework stopped, False it wasn't running - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_stop(), self.__loop) - return future.result() - else: - return self.__loop.run_until_complete(self.async_stop()) - - async def async_stop(self): - """ - Async Stops the framework - :return: True if the framework stopped, False it wasn't running """ - async with self._lock: + with self._lock: if self._state != Bundle.ACTIVE: # Invalid state return False @@ -1633,7 +1180,6 @@ async def async_stop(self): def delete(self, force=False): """ Deletes the current framework - :param force: If True, stops the framework before deleting it :return: True if the framework has been delete, False if is couldn't """ @@ -1650,7 +1196,6 @@ def delete(self, force=False): def uninstall(self): """ A framework can't be uninstalled - :raise BundleException: This method must not be called """ raise BundleException("A framework can't be uninstalled") @@ -1659,30 +1204,6 @@ def uninstall_bundle(self, bundle): # type: (Bundle) -> None """ Ends the uninstallation of the given bundle (must be called by Bundle) - - :param bundle: The bundle to uninstall - :raise BundleException: Invalid bundle - """ - if self.__loop.is_running(): - try: - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_uninstall_bundle(bundle), self.__loop) - # Waiting for the end of the coroutine - future.result() - except BundleException: - raise BundleException - - else: - try: - self.__loop.run_until_complite(self.async_uninstall_bundle(bundle)) - except BundleException: - raise BundleException - - async def async_uninstall_bundle(self, bundle): - # type: (Bundle) -> None - """ - Async Ends the uninstallation of the given bundle (must be called by Bundle) - :param bundle: The bundle to uninstall :raise BundleException: Invalid bundle """ @@ -1690,11 +1211,9 @@ async def async_uninstall_bundle(self, bundle): # Do nothing return - loop = asyncio.get_running_loop() - async with self.__bundles_lock: + with self.__bundles_lock: # Stop the bundle first - stop = loop.create_task(bundle.async_stop()) - await stop + bundle.stop() bundle_id = bundle.get_bundle_id() if bundle_id not in self.__bundles: @@ -1730,7 +1249,6 @@ def unregister_service(self, registration): # type: (ServiceRegistration) -> bool """ Unregisters the given service - :param registration: A ServiceRegistration to the service to unregister :raise BundleException: Invalid reference """ @@ -1755,21 +1273,19 @@ def unregister_service(self, registration): del self.__unregistering_services[reference] return True - async def _hide_bundle_services(self, bundle): + def _hide_bundle_services(self, bundle): # type: (Bundle) -> List[ServiceReference] """ Hides the services of the given bundle in the service registry - :param bundle: The bundle providing services :return: The references of the hidden services """ return self._registry.hide_bundle_services(bundle) - async def _unget_used_services(self, bundle): + def _unget_used_services(self, bundle): # type: (Bundle) -> None """ Cleans up all service usages of the given bundle - :param bundle: Bundle to be cleaned up """ self._registry.unget_used_services(bundle) @@ -1777,62 +1293,20 @@ async def _unget_used_services(self, bundle): def update(self): """ Stops and starts the framework, if the framework is active. - - :raise BundleException: Something wrong occurred while stopping or - starting the framework. - """ - if self.__loop.is_running(): - try: - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_update(), self.__loop) - # Waiting for the end of the coroutine - future.result() - except BundleException: - raise BundleException - else: - try: - self.__loop.run_until_complete(self.async_update()) - except BundleException: - raise BundleException - - async def async_update(self): - """ - Async Stops and starts the framework, if the framework is active. - :raise BundleException: Something wrong occurred while stopping or starting the framework. """ - if self._state == Bundle.ACTIVE: - await self.async_stop() - await self.async_start() + with self._lock: + if self._state == Bundle.ACTIVE: + self.stop() + self.start() def wait_for_stop(self, timeout=None): # type: (Optional[int]) -> bool """ Waits for the framework to stop. Does nothing if the framework bundle is not in ACTIVE state. - - Uses a Asyncio.Condition object - - :param timeout: The maximum time to wait (in seconds) - :return: True if the framework has stopped, False if the timeout raised - """ - if self.__loop.is_running(): - # I'm in another thread, so I'm scheduling the coroutine - future = asyncio.run_coroutine_threadsafe(self.async_wait_for_stop(timeout=timeout), self.__loop) - # Waiting for the end of the coroutine - return future.result() - else: - self.__loop.run_until_complete(self.async_wait_for_stop(timeout=timeout)) - - async def async_wait_for_stop(self, timeout=None): - # type: (Optional[int]) -> bool - """ - Async Waits for the framework to stop. Does nothing if the framework bundle - is not in ACTIVE state. - - Uses a Asyncio.Condition object - + Uses a threading.Condition object :param timeout: The maximum time to wait (in seconds) :return: True if the framework has stopped, False if the timeout raised """ @@ -1840,12 +1314,9 @@ async def async_wait_for_stop(self, timeout=None): # Inactive framework, ignore the call return True - try: - await asyncio.wait_for(self._fw_stop_event.wait(), timeout) - except asyncio.TimeoutError: - return False + self._fw_stop_event.wait(timeout) - async with self._lock: + with self._lock: # If the timeout raised, we should be in another state return self._state == Bundle.RESOLVED @@ -1880,7 +1351,6 @@ def get_service_reference(self): """ Returns the ServiceReference for the service associated with this object. - :return: The ServiceReference to the service associated to this object """ return self.__reference @@ -1889,7 +1359,6 @@ def unget_service(self, service): # type: (Any) -> bool """ Releases a service object for the associated service. - :param service: An instance of a service returned by ``get_service()`` :return: True if the bundle usage has been removed """ @@ -1925,12 +1394,9 @@ def add_bundle_listener(self, listener): """ Registers a bundle listener, which will be notified each time a bundle is installed, started, stopped or updated. - The listener must be a callable accepting a single parameter:\ - * **event** -- The description of the event (a :class:`~pelix.internals.events.BundleEvent` object). - :param listener: The bundle listener to register :return: True if the listener has been registered, False if it already was @@ -1941,15 +1407,12 @@ def add_framework_stop_listener(self, listener): """ Registers a listener that will be called back right before the framework stops - The framework listener must have a method with the following prototype:: - def framework_stopping(self): ''' No parameter given ''' # ... - :param listener: The framework stop listener :return: True if the listener has been registered """ @@ -1960,17 +1423,13 @@ def add_service_listener( ): """ Registers a service listener - The service listener must have a method with the following prototype:: - def service_changed(self, event): ''' Called by Pelix when some service properties changes - event: A ServiceEvent object ''' # ... - :param bundle_context: This bundle context :param listener: The listener to register :param ldap_filter: Filter that must match the service properties @@ -1989,7 +1448,6 @@ def get_all_service_references(self, clazz, ldap_filter=None): The returned array of ServiceReference objects contains services that were registered under the specified class and match the specified filter expression. - :param clazz: Class implemented by the service :param ldap_filter: Service filter :return: The sorted list of all matching service references, or None @@ -2002,7 +1460,6 @@ def get_bundle(self, bundle_id=None): Retrieves the :class:`~pelix.framework.Bundle` object for the bundle matching the given ID (int). If no ID is given (None), the bundle associated to this context is returned. - :param bundle_id: A bundle ID (optional) :return: The requested :class:`~pelix.framework.Bundle` object :raise BundleException: The given ID doesn't exist or is invalid @@ -2020,7 +1477,6 @@ def get_bundles(self): # type: () -> List[Bundle] """ Returns the list of all installed bundles - :return: A list of :class:`~pelix.framework.Bundle` objects """ return self.__framework.get_bundles() @@ -2030,7 +1486,6 @@ def get_framework(self): """ Returns the :class:`~pelix.FRAMEWORK.Framework` that created this bundle context - :return: The :class:`~pelix.framework.Framework` object """ return self.__framework @@ -2040,7 +1495,6 @@ def get_property(self, name): """ Returns the value of a property of the framework, else returns the OS environment value. - :param name: A property name """ return self.__framework.get_property(name) @@ -2049,7 +1503,6 @@ def get_service(self, reference): # type: (ServiceReference) -> Any """ Returns the service described with the given reference - :param reference: A ServiceReference object :return: The service object itself """ @@ -2060,7 +1513,6 @@ def get_service_objects(self, reference): """ Returns the ServiceObjects object for the service referenced by the specified ServiceReference object. - :param reference: Reference to a prototype service factory :return: An intermediate object to get more instances of a service """ @@ -2071,7 +1523,6 @@ def get_service_reference(self, clazz, ldap_filter=None): """ Returns a ServiceReference object for a service that implements and was registered under the specified class - :param clazz: The class name with which the service was registered. :param ldap_filter: A filter on service properties :return: A service reference, None if not found @@ -2089,7 +1540,6 @@ def get_service_references(self, clazz, ldap_filter=None): """ Returns the service references for services that were registered under the specified class by this bundle and matching the given filter - :param clazz: The class name with which the service was registered. :param ldap_filter: A filter on service properties :return: The list of references to the services registered by the @@ -2107,21 +1557,17 @@ def install_bundle(self, name, path=None): # type: (str, str) -> Bundle """ Installs the bundle (module) with the given name. - If a path is given, it is inserted in first place in the Python loading path (``sys.path``). All modules loaded alongside this bundle, *i.e.* by this bundle or its dependencies, will be looked after in this path in priority. - .. note:: Before Pelix 0.5.0, this method returned the ID of the installed bundle, instead of the Bundle object. - .. warning:: The behavior of the loading process is subject to changes, as it does not allow to safely run multiple frameworks in the same Python interpreter, as they might share global module values. - :param name: The name of the bundle to install :param path: Preferred path to load the module (optional) :return: The :class:`~pelix.framework.Bundle` object of the installed @@ -2138,7 +1584,6 @@ def install_package(self, path, recursive=False): It is a utility method working like :meth:`~pelix.framework.BundleContext.install_visiting`, with a visitor accepting every module found. - :param path: Path of the package (folder) :param recursive: If True, installs the modules found in sub-directories :return: A 2-tuple, with the list of installed bundles @@ -2152,13 +1597,10 @@ def install_visiting(self, path, visitor): """ Looks for modules in the given path and installs those accepted by the given visitor. - The visitor must be a callable accepting 3 parameters:\ - * **fullname** -- The full name of the module * **is_package** -- If True, the module is a package * **module_path** -- The path to the module file - :param path: Root search path (folder) :param visitor: The visiting callable :return: A 2-tuple, with the list of installed bundles @@ -2180,7 +1622,6 @@ def register_service( # type: (Union[List[Any], type, str], object, dict, bool, bool, bool) -> ServiceRegistration """ Registers a service - :param clazz: Class or Classes (list) implemented by this service :param service: The service instance :param properties: The services properties (dictionary) @@ -2204,7 +1645,6 @@ def register_service( def remove_bundle_listener(self, listener): """ Unregisters the given bundle listener - :param listener: The bundle listener to remove :return: True if the listener has been unregistered, False if it wasn't registered @@ -2214,7 +1654,6 @@ def remove_bundle_listener(self, listener): def remove_framework_stop_listener(self, listener): """ Unregisters a framework stop listener - :param listener: The framework stop listener :return: True if the listener has been unregistered """ @@ -2223,7 +1662,6 @@ def remove_framework_stop_listener(self, listener): def remove_service_listener(self, listener): """ Unregisters a service listener - :param listener: The service listener :return: True if the listener has been unregistered """ @@ -2233,7 +1671,6 @@ def unget_service(self, reference): # type: (ServiceReference) -> bool """ Disables a reference to the service - :return: True if the bundle was using this reference, else False """ # Lose the dependency @@ -2259,7 +1696,6 @@ def get_framework(cls, properties=None): """ If it doesn't exist yet, creates a framework with the given properties, else returns the current framework instance. - :return: A Pelix instance """ if cls.__singleton is None: @@ -2276,7 +1712,6 @@ def is_framework_running(cls, framework=None): Tests if the given framework has been constructed and not deleted. If *framework* is None, then the methods returns if at least one framework is running. - :param framework: The framework instance to be tested :return: True if the framework is running """ @@ -2291,7 +1726,6 @@ def delete_framework(cls, framework=None): # pylint: disable=W0212 """ Removes the framework singleton - :return: True on success, else False """ if framework is None: @@ -2346,7 +1780,6 @@ def create_framework( If *auto_delete* is True, the framework will be deleted once it has stopped, and the method will return None. This requires *wait_for_stop* and *auto_start* to be True. - :param bundles: Bundles to initially install (shouldn't be empty if *wait_for_stop* is True) :param properties: Optional framework properties @@ -2395,7 +1828,6 @@ def _package_exists(path): """ Checks if the given Python path matches a valid file or a valid container file - :param path: A Python path :return: True if the module or its container exists """ @@ -2445,4 +1877,4 @@ def normalize_path(): # builtin modules don't have a __path__ pass except ImportError: - pass + pass \ No newline at end of file