-
-
Notifications
You must be signed in to change notification settings - Fork 607
/
subsystem.py
230 lines (177 loc) · 9.39 KB
/
subsystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import importlib
import inspect
import logging
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union, cast
from pants.option.option_value_container import OptionValueContainer
from pants.option.optionable import Optionable
from pants.option.options import Options
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin, SubsystemDependency
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class SubsystemError(Exception):
    """An error in a subsystem.

    Base class for subsystem-related exceptions defined in this module, so callers can catch
    this single type to handle any of them.
    """
# Type variable bound to `Subsystem`. Used as `cls: Type[_S]` / `-> _S` on the instance-returning
# classmethods so that they are typed as returning the subclass they were called on.
_S = TypeVar("_S", bound="Subsystem")
class Subsystem(SubsystemClientMixin, Optionable):
    """A separable piece of functionality that may be reused across multiple tasks or other code.

    Subsystems encapsulate the configuration and initialization of things like JVMs,
    Python interpreters, SCMs and so on.

    Subsystem instances can be global or per-optionable. Global instances are useful for
    representing global concepts, such as the SCM used in the workspace. Per-optionable instances
    allow individual Optionable objects (notably, tasks) to have their own configuration for things
    such as artifact caches.

    Each subsystem type has an option scope. The global instance of that subsystem initializes
    itself from options in that scope. An optionable-specific instance initializes itself from
    options in an appropriate subscope, which defaults back to the global scope.

    For example, the global artifact cache options would be in scope `cache`, but the
    compile.java task can override those options in scope `cache.compile.java`.

    Subsystems may depend on other subsystems, and therefore mix in SubsystemClientMixin.

    :API: public
    """

    class UninitializedSubsystemError(SubsystemError):
        """Raised when a subsystem instance is requested before any options have been set."""

        def __init__(self, class_name, scope):
            super().__init__(f'Subsystem "{class_name}" not initialized for scope "{scope}"')

    @classmethod
    def is_subsystem_type(cls, obj):
        """Returns True if `obj` is a class that is `cls` or one of its subclasses."""
        # The `isclass` check comes first because `issubclass` raises TypeError for non-classes.
        return inspect.isclass(obj) and issubclass(obj, cls)

    @classmethod
    def scoped(cls, optionable, removal_version=None, removal_hint=None):
        """Returns a dependency on this subsystem, scoped to `optionable`.

        :param optionable: The object whose `options_scope` the dependency is scoped to.
        :param removal_version: An optional deprecation version for this scoped Subsystem
                                dependency.
        :param removal_hint: An optional hint to accompany a deprecation removal_version.

        Return value is suitable for use in SubsystemClientMixin.subsystem_dependencies().
        """
        return SubsystemDependency(cls, optionable.options_scope, removal_version, removal_hint)

    @classmethod
    def get_scope_info(cls, subscope=None):
        """Returns the ScopeInfo for this subsystem, optionally for a subscope of it.

        `cls.options_scope` is always passed through `validate_scope_name_component` first.

        :param subscope: If None, the inherited `get_scope_info()` result is returned. Otherwise
                         a ScopeInfo is built for `cls.subscope(subscope)`.
        """
        cls.validate_scope_name_component(cls.options_scope)
        if subscope is None:
            return super().get_scope_info()
        return ScopeInfo(cls.subscope(subscope), cls)

    # The full Options object for this pants run. Will be set after options are parsed.
    # TODO: A less clunky way to make option values available?
    _options: Optional[Options] = None

    @classmethod
    def set_options(cls, options: Options) -> None:
        """Stores the parsed Options object that subsystem instances will be created from."""
        cls._options = options

    @classmethod
    def is_initialized(cls) -> bool:
        """Returns True once options have been set (see `set_options`)."""
        return cls._options is not None

    # A cache of (cls, scope) -> the instance of cls tied to that scope.
    # NB: it would be ideal to use `_S` rather than `Subsystem`, but we can't do this because
    # MyPy complains that `_S` would not be properly constrained. Specifically, it suggests that we'd
    # have to use typing.Generic or typing.Protocol to properly constrain the type var, which we
    # don't want to do.
    _scoped_instances: Dict[Tuple[Type["Subsystem"], str], "Subsystem"] = {}

    @classmethod
    def global_instance(cls: Type[_S]) -> _S:
        """Returns the global instance of this subsystem.

        :API: public

        :returns: The global subsystem instance.

        Note that `global_instance` is a v1-idiom only. v2 rules should always request a subsystem
        as a rule input, rather than trying to call <subsystem>.global_instance() in the body of an
        `@rule`.
        """
        return cls._instance_for_scope(cls.options_scope)  # type: ignore[arg-type]  # MyPy is treating cls.options_scope as a Callable, rather than `str`

    @classmethod
    def scoped_instance(cls: Type[_S], optionable: Union[Optionable, Type[Optionable]]) -> _S:
        """Returns an instance of this subsystem for exclusive use by the given `optionable`.

        :API: public

        :param optionable: An optionable type or instance to scope this subsystem under.
        :returns: The scoped subsystem instance.
        :raises TypeError: If `optionable` is neither an Optionable instance nor an Optionable
                           subclass.
        """
        # `issubclass` itself raises an opaque TypeError when handed a non-class (e.g. a str), so
        # only consult it for actual classes. That way every bad argument gets the message below.
        is_optionable = isinstance(optionable, Optionable) or (
            inspect.isclass(optionable) and issubclass(optionable, Optionable)
        )
        if not is_optionable:
            raise TypeError(
                "Can only scope an instance against an Optionable, given {} of type {}.".format(
                    optionable, type(optionable)
                )
            )
        return cls._instance_for_scope(cls.subscope(optionable.options_scope))

    @classmethod
    def _instance_for_scope(cls: Type[_S], scope: str) -> _S:
        """Returns the cached instance of `cls` for `scope`, creating it on first request.

        :raises UninitializedSubsystemError: If no options have been set yet.
        """
        if cls._options is None:
            raise cls.UninitializedSubsystemError(cls.__name__, scope)
        key = (cls, scope)
        if key not in cls._scoped_instances:
            cls._scoped_instances[key] = cls(scope, cls._options.for_scope(scope))
        return cast(_S, cls._scoped_instances[key])

    @classmethod
    def reset(cls, reset_options: bool = True) -> None:
        """Forget all option values and cached subsystem instances.

        Used primarily for test isolation and to reset subsystem state for pantsd.

        :param reset_options: If True (the default), the stored options are discarded as well.
                              If False, only the instance cache is replaced.
        """
        # NB: Both assignments rebind the attribute on `cls` itself, so invoking this on a subclass
        # leaves the attributes stored on `Subsystem` untouched.
        if reset_options:
            cls._options = None
        cls._scoped_instances = {}

    def __init__(self, scope: str, scoped_options: OptionValueContainer) -> None:
        """Note: A subsystem has no access to options in scopes other than its own.

        TODO: We'd like that to be true of Tasks some day. Subsystems will help with that.

        Code should call scoped_instance() or global_instance() to get a subsystem instance.
        It should not invoke this constructor directly.

        :API: public

        :param scope: The options scope this instance is tied to.
        :param scoped_options: The option values for `scope`.
        """
        super().__init__()
        self._scope = scope
        self._scoped_options = scoped_options
        self._fingerprint = None

    # It's safe to override the signature from Optionable because we validate
    # that every Optionable has `options_scope` defined as a `str` in the __init__. This code is
    # complex, though, and may be worth refactoring.
    @property
    def options_scope(self) -> str:  # type: ignore[override]
        """Returns the scope this particular instance was created for."""
        return self._scope

    @property
    def options(self) -> OptionValueContainer:
        """Returns the option values for this subsystem's scope.

        :API: public
        """
        return self._scoped_options

    def get_options(self) -> OptionValueContainer:
        """Returns the option values for this subsystem's scope.

        Method form of the `options` property; both return the same object.

        :API: public
        """
        return self._scoped_options

    @staticmethod
    def get_streaming_workunit_callbacks(subsystem_names: Iterable[str]) -> List[Callable]:
        """This method is used to dynamically generate a list of callables intended to be passed to
        StreamingWorkunitHandler.

        The caller provides a collection of strings representing a Python import path to a class
        that implements the `Subsystem` class. It will then inspect these classes for the presence
        of a special method called `handle_workunits`, which will be called with a set of kwargs -
        see the docstring for StreamingWorkunitHandler.

        For instance, you might invoke this method with something like:

        `Subsystem.get_streaming_workunit_callbacks(["pants.reporting.workunits.Workunit"])`

        And this will result in the method attempting to dynamically-import a
        module called "pants.reporting.workunits", inspecting it for the presence
        of a class called `Workunit`, getting a global instance of this Subsystem,
        and returning a list containing a single reference to the
        `handle_workunits` method defined on it.

        Any name for which one of these steps fails is skipped with a logged warning, so the
        result contains callbacks only for the names that resolved (and may be empty).

        NB: Only AttributeError is handled around `global_instance()`; an
        UninitializedSubsystemError raised because options have not been set will propagate.

        :param subsystem_names: Dotted `module.path.ClassName` strings to resolve.
        :returns: The bound `handle_workunits` methods of the resolved subsystems, in input order.
        """
        callables = []

        for name in subsystem_names:
            # Step 1: split "pkg.module.ClassName" and resolve the class object.
            try:
                name_components = name.split(".")
                module_name = ".".join(name_components[:-1])
                class_name = name_components[-1]
                module = importlib.import_module(module_name)
                subsystem_class = getattr(module, class_name)
            except (IndexError, AttributeError, ModuleNotFoundError, ValueError) as e:
                logger.warning(f"Invalid module name: {name}: {e}")
                continue
            except ImportError as e:
                # ModuleNotFoundError is handled above; this covers other import-time failures.
                logger.warning(f"Could not import {module_name}: {e}")
                continue

            # Step 2: obtain the global instance of the resolved class.
            try:
                subsystem = subsystem_class.global_instance()
            except AttributeError:
                logger.warning(f"{subsystem_class} is not a global subsystem.")
                continue

            # Step 3: collect the callback, if the subsystem defines one.
            try:
                callables.append(subsystem.handle_workunits)
            except AttributeError:
                logger.warning(
                    f"{subsystem_class} does not have a method named `handle_workunits` defined."
                )
                continue

        return callables