-
Notifications
You must be signed in to change notification settings - Fork 28
/
pipelinerunner.py
291 lines (234 loc) · 10.7 KB
/
pipelinerunner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
"""pypyr pipeline runner.
Runs the pipeline specified by the input pipeline_name parameter.
Pipelines must have a "steps" list-like attribute.
Attributes:
pipeline_cache: global instance of the pipeline yaml cache.
Use this attribute to access the cache from elsewhere.
"""
import logging
import pypyr.context
import pypyr.log.logger
import pypyr.moduleloader
from pypyr.cache.parsercache import contextparser_cache
from pypyr.errors import Stop, StopPipeline
from pypyr.cache.pipelinecache import pipeline_cache
from pypyr.stepsrunner import StepsRunner
import pypyr.yaml
# use pypyr logger to ensure loglevel is set correctly
logger = logging.getLogger(__name__)
def get_parsed_context(pipeline, context_in_args):
    """Execute get_parsed_context handler if specified.

    Dynamically load the module named by the context_parser key on the
    pipeline dict and invoke that module's get_parsed_context function
    against the console input args.

    Args:
        pipeline: dict. Pipeline object.
        context_in_args: list of string. Input arguments from console.

    Returns:
        pypyr.context.Context() instance.

    Raises:
        AttributeError: parser specified on pipeline missing
            get_parsed_context function.
    """
    logger.debug("starting")

    if 'context_parser' not in pipeline:
        logger.debug("pipeline does not have custom context parser. Using "
                     "empty context.")
        logger.debug("done")
        # initialize to an empty dictionary because you want to be able to
        # run with no context.
        return pypyr.context.Context()

    parser_module_name = pipeline['context_parser']
    logger.debug("context parser specified: %s", parser_module_name)

    # cache resolves + imports the parser module's get_parsed_context fn.
    parser_fn = contextparser_cache.get_context_parser(parser_module_name)

    logger.debug("running parser %s", parser_module_name)
    parsed = parser_fn(context_in_args)
    logger.debug("context parse %s done", parser_module_name)

    # Downstream steps likely to expect context not to be None, hence
    # empty rather than None.
    if parsed is None:
        logger.debug(
            "%s returned None. Using empty context instead",
            parser_module_name
        )
        return pypyr.context.Context()

    return pypyr.context.Context(parsed)
def main(
        pipeline_name,
        pipeline_context_input,
        working_dir,
        log_level,
        log_path,
        groups=None,
        success_group=None,
        failure_group=None
):
    """Entry point for pypyr pipeline runner.

    Call this once per pypyr run — it performs one-off 1st-time
    initialization (logging, working directory) before running the actual
    pipeline. If you want to run a pypyr pipeline from your own code, this
    is the function to call for the very first pipeline.

    pipeline_name.yaml should be in the working_dir/pipelines/ directory.

    Args:
        pipeline_name: string. Name of pipeline, sans .yaml at end.
        pipeline_context_input: string. Initialize the pypyr context with
            this string.
        working_dir: path. looks for ./pipelines and modules in this
            directory.
        log_level: int. Standard python log level enumerated value.
        log_path: os.path. Append log to this path.
        groups: list of str. step-group names to run in pipeline.
        success_group: str. step-group name to run on success completion.
        failure_group: str. step-group name to run on pipeline failure.

    Returns:
        None
    """
    pypyr.log.logger.set_root_logger(log_level, log_path)

    logger.debug("starting pypyr")

    # pipelines specify steps in python modules that load dynamically.
    # make it easy for the operator so that the cwd is automatically included
    # without needing to pip install a package 1st.
    pypyr.moduleloader.set_working_directory(working_dir)

    try:
        load_and_run_pipeline(pipeline_name=pipeline_name,
                              pipeline_context_input=pipeline_context_input,
                              success_group=success_group,
                              failure_group=failure_group,
                              groups=groups)
    except Stop:
        # Stop is a deliberate, clean halt — not an error condition.
        logger.debug("Stop: stopped pypyr")

    logger.debug("pypyr done")
def prepare_context(pipeline, context_in_args, context):
    """Prepare context for pipeline run.

    Runs the pipeline's context parser (if any) over context_in_args and
    merges the result into the given context instance in place.

    Args:
        pipeline: dict. Dictionary representing the pipeline.
        context_in_args: list of str. Args used to initialize context.
        context: pypyr.context.Context. Merge any new context generated from
            context_in_string into this context instance.

    Returns:
        None. The context instance to use for the pipeline run is contained
        in the context arg, it's not passed back as a function return.
    """
    logger.debug("starting")

    # mutate the caller's context rather than returning a new one.
    context.update(get_parsed_context(pipeline=pipeline,
                                      context_in_args=context_in_args))

    logger.debug("done")
def load_and_run_pipeline(pipeline_name,
                          pipeline_context_input=None,
                          context=None,
                          parse_input=True,
                          loader=None,
                          groups=None,
                          success_group=None,
                          failure_group=None):
    """Load and run the specified pypyr pipeline.

    This function runs the actual pipeline by name. If you are running
    another pipeline from within a pipeline, call this, not main(). Do call
    main() instead for your 1st pipeline if there are pipelines calling
    pipelines.

    By default pypyr uses file loader. This means that pipeline_name.yaml
    should be in the working_dir/ directory if you're using fileloader.

    Look for pipelines and modules in the working_dir. Set the working_dir
    by calling pypyr.moduleloader.set_working_directory('/my/dir')

    Args:
        pipeline_name (str): Name of pipeline, sans .yaml at end.
        pipeline_context_input (str): Initialize the pypyr context with this
            string.
        context (pypyr.context.Context): Use if you already have a
            Context object, such as if you are running a pipeline from
            within a pipeline and you want to re-use the same context
            object for the child pipeline. Any mutations of the context by
            the pipeline will be against this instance of it.
        parse_input (bool): run context_parser in pipeline.
        loader (str): str. optional. Absolute name of pipeline loader
            module. If not specified will use pypyr.pypeloaders.fileloader.
        groups: list of str. step-group names to run in pipeline.
        success_group: str. step-group name to run on success completion.
        failure_group: str. step-group name to run on pipeline failure.

    Returns:
        None
    """
    logger.debug("you asked to run pipeline: %s", pipeline_name)
    logger.debug("you set the initial context to: %s", pipeline_context_input)

    if context is None:
        context = pypyr.context.Context()

    context.pipeline_name = pipeline_name
    context.working_dir = pypyr.moduleloader.get_working_directory()

    # pipeline loading deliberately outside of try catch. The try catch will
    # try to run a failure-handler from the pipeline, but if the pipeline
    # doesn't exist there is no failure handler that can possibly run so
    # this is very much a fatal stop error.
    loaded_pipeline = pipeline_cache.get_pipeline(pipeline_name=pipeline_name,
                                                  loader=loader)

    run_pipeline(pipeline=loaded_pipeline,
                 context=context,
                 pipeline_context_input=pipeline_context_input,
                 parse_input=parse_input,
                 groups=groups,
                 success_group=success_group,
                 failure_group=failure_group)
def run_pipeline(pipeline,
                 context,
                 pipeline_context_input=None,
                 parse_input=True,
                 groups=None,
                 success_group=None,
                 failure_group=None):
    """Run the specified pypyr pipeline.

    This function runs the actual pipeline. If you are running another
    pipeline from within a pipeline don't call main(). Do call main()
    instead for your 1st pipeline, if there are subsequent pipelines calling
    pipelines use load_and_run_pipeline or run_pipeline.

    Pipeline and context should be already loaded. If pipeline not loaded
    yet, you probably want to call load_and_run_pipeline instead.

    If none of groups, success_group & failure_group specified, defaults to
    ['steps'], on_success, on_failure. If any of groups, success_group or
    failure_group specified, will ONLY run the specified (i.e if you specify
    groups you don't get on_success/on_failure groups unless you specify
    these explicitly.)

    Args:
        pipeline (dict): Dictionary representing the pipeline.
        context (pypyr.context.Context): Reusable context object.
        pipeline_context_input (str): Initialize the pypyr context with this
            string.
        parse_input (bool): run context_parser in pipeline.
        groups: list of str. step-group names to run in pipeline.
        success_group: str. step-group name to run on success completion.
        failure_group: str. step-group name to run on pipeline failure.

    Returns:
        None
    """
    logger.debug("starting")

    if not groups:
        groups = ['steps']

        # on_success/on_failure only default in when the operator didn't
        # pick explicit groups — an explicit selection runs ONLY what was
        # asked for.
        if not success_group and not failure_group:
            success_group = 'on_success'
            failure_group = 'on_failure'

    runner = StepsRunner(pipeline_definition=pipeline, context=context)

    try:
        if not parse_input:
            logger.debug("skipping context_parser")
        else:
            logger.debug("executing context_parser")
            prepare_context(pipeline=pipeline,
                            context_in_args=pipeline_context_input,
                            context=context)
    except Exception:
        # yes, yes, don't catch Exception. Have to, though, to run the
        # failure handler. Also, it does raise it back up.
        logger.error("Something went wrong. Will now try to run %s",
                     failure_group)

        # failure_step_group will log but swallow any errors
        runner.run_failure_step_group(failure_group)
        logger.debug("Raising original exception to caller.")
        raise

    try:
        runner.run_step_groups(groups=groups,
                               success_group=success_group,
                               failure_group=failure_group)
    except StopPipeline:
        # a step asked for a clean stop of just this pipeline, not pypyr.
        logger.debug("StopPipeline: stopped %s", context.pipeline_name)