Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added checks, warnings, and exceptions for invalid extension_director… #45

Merged
merged 1 commit into from
Jan 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions flowmancer/flowmancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import inspect
import os
import pkgutil
import sys
import time
from argparse import ArgumentParser
from collections import namedtuple
Expand Down Expand Up @@ -44,6 +43,14 @@ class NoTasksLoadedError(Exception):
pass


class ExtensionsDirectoryNotFoundError(Exception):
pass


class NotAPackageError(Exception):
pass


# Need to explicitly manage loop in case multiple instances of Flowmancer are run.
@contextlib.contextmanager
def _create_loop():
Expand All @@ -54,15 +61,35 @@ def _create_loop():
loop.close()


def _load_extensions_path(path: str, add_to_path: bool = True, package_chain: List[str] = []):
def _load_extensions_path(path: str, package_chain: Optional[List[str]] = None):
if not path.startswith('/'):
path = os.path.join(os.path.dirname(os.path.abspath(inspect.stack()[-1][1])), path)
if add_to_path:
sys.path.append(path)
path = os.path.abspath(
os.path.join(
os.path.dirname(
os.path.abspath(inspect.stack()[-1][1])
),
path
)
)

if not os.path.exists(path):
raise ExtensionsDirectoryNotFoundError(f"No such directory: '{path}'")
if os.path.isfile(path):
raise NotAPackageError(f"Only packages (directories) are allowed. The following is not a dir: '{path}'")
if not os.path.exists(os.path.join(path, '__init__.py')):
print(f"WARNING: The '{path}' dir is not a package (no __init__.py file found). Modules will not be imported.")

if not package_chain:
package_chain = [os.path.basename(path)]

for x in pkgutil.iter_modules(path=[path]):
importlib.import_module('.'.join(package_chain+[x.name]))
try:
print(f"importing: {'.'.join(package_chain+[x.name])}")
importlib.import_module('.'.join(package_chain+[x.name]))
except Exception as e:
print(f"Skipping import for '{'.'.join(package_chain+[x.name])}' due to {type(e).__name__}: {str(e)}")
if x.ispkg:
_load_extensions_path(os.path.join(path, x.name), False, package_chain+[x.name])
_load_extensions_path(os.path.join(path, x.name), package_chain+[x.name])


class Flowmancer:
Expand Down Expand Up @@ -318,8 +345,15 @@ def load_job_definition(self, j: Union[JobDefinition, str], filetype: str = 'yam

# Recursively import any modules found in the following paths in order to trigger the registration of any
# decorated classes.
search_paths = ['./tasks', './extensions', './loggers'] + jobdef.config.extension_directories
for p in search_paths:
for p in ['./tasks', './extensions', './loggers']:
try:
_load_extensions_path(p)
except ExtensionsDirectoryNotFoundError:
# Don't error on the absence of dirs that are searched by default.
pass

# Allow for missing dir exceptions for passed-in paths.
for p in jobdef.config.extension_directories:
_load_extensions_path(p)

for p in jobdef.config.extension_packages:
Expand Down