Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
Rewrite support for Pipeline in Pipeline (Block in Block technically)
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed Oct 1, 2021
1 parent ad7265e commit a24ca4a
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 72 deletions.
3 changes: 3 additions & 0 deletions src/core/rkd/core/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ def main(self, argv: list):
num += 1
self.print_err(io, err, num)

if pre_parsed_args['print_event_history']:
executor.get_observer().print_event_history()

sys.exit(1)

except HandledExitException as err:
Expand Down
1 change: 0 additions & 1 deletion src/core/rkd/core/exception.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from types import FunctionType
from typing import List, Type
from jsonschema import ValidationError
from .argparsing.model import TaskArguments, ArgumentBlock


class RiotKitDoException(Exception):
Expand Down
138 changes: 97 additions & 41 deletions src/core/rkd/core/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@


class OneByOneTaskExecutor(ExecutorInterface, TaskIterator):
""" Executes tasks one-by-one, providing a context that includes eg. parsed arguments """
"""
Executes tasks one-by-one, providing a context that includes eg. parsed arguments
"""

_ctx: ApplicationContext
_observer: ProgressObserver
Expand All @@ -43,7 +45,7 @@ def fail_fast(self) -> bool:
def process_task(self, scheduled: DeclarationScheduledToRun, task_num: int):
self.execute(scheduled, task_num)

def execute(self, scheduled_declaration: DeclarationScheduledToRun, task_num: int):
def execute(self, scheduled_declaration: DeclarationScheduledToRun, task_num: int, inherited: bool = False):
"""
Prepares all dependencies, then triggers execution
"""
Expand Down Expand Up @@ -97,7 +99,7 @@ def execute(self, scheduled_declaration: DeclarationScheduledToRun, task_num: in
# When: Task has a failure
#
temp.finally_clean_up()
self._on_failure(scheduled_declaration, keep_going, task_num, e)
self._on_failure(scheduled_declaration, keep_going, task_num, e, inherited=inherited)

return

Expand All @@ -109,10 +111,10 @@ def execute(self, scheduled_declaration: DeclarationScheduledToRun, task_num: in
if result is True:
self._observer.task_succeed(scheduled_declaration)
else:
self._on_failure(scheduled_declaration, keep_going, task_num, None)
self._on_failure(scheduled_declaration, keep_going, task_num, None, inherited=inherited)

def _on_failure(self, scheduled_declaration: DeclarationScheduledToRun, keep_going: bool, task_num: int,
exception: Optional[Exception] = None):
exception: Optional[Exception] = None, inherited: bool = False):
"""
Executed when task fails: Goes through all nested blocks and tries to rescue the situation or notify an error
Expand All @@ -124,37 +126,42 @@ def _on_failure(self, scheduled_declaration: DeclarationScheduledToRun, keep_goi
eg. second time after failing first time
"""

# todo: blocks should be in DeclarationScheduledToRun?
blocks = scheduled_declaration.get_blocks_ordered_by_children_to_parent()
last_block: ArgumentBlock = blocks[0]
block_num = 0

self.io.internal(f'declaration={scheduled_declaration}')
self.io.internal(f'last_block={last_block}')

for block in blocks:
block_num += 1
self.io.internal(f'Handling failure of {scheduled_declaration} in block #{block_num} {block}')
if not inherited:
for block in blocks:
block_num += 1

if block.is_default_empty_block:
self.io.internal('Skipping default empty block')
continue
self.io.internal(f'Handling failure of {scheduled_declaration} in block #{block_num} {block}')

# NOTE: we need to mark blocks as resolved to avoid loops, as the execution process is triggered by
# upper layer - TaskResolver, that may not be aware of what is done there
if block.is_default_empty_block:
self.io.internal('Skipping default empty block')
continue

if block.is_already_failed_for(scheduled_declaration.declaration):
self.io.internal(f'{scheduled_declaration} already failed in {block}')
continue
# NOTE: we need to mark blocks as resolved to avoid loops, as the execution process is triggered by
# upper layer - TaskResolver, that may not be aware of what is done there

self._handle_failure_in_specific_block(
scheduled_declaration, exception, block,
allow_retrying_single_task=block == last_block, # @retry works only for latest block
task_num=task_num
)
if block.is_already_failed_for(scheduled_declaration.declaration):
self.io.internal(f'{scheduled_declaration} already failed in {block}')
continue

is_failure_repaired = self._handle_failure_in_specific_block(
scheduled_declaration, exception, block,
task_num=task_num
)

self.io.internal(f'Marking {scheduled_declaration} as failed in {block}')
block.mark_as_failed_for(scheduled_declaration)
# if Block modifiers worked, and the Task result is repaired, then do not raise exception at the end
# also do not process next blocks
if is_failure_repaired:
return

self.io.internal(f'Marking {scheduled_declaration} as failed in {block}')
block.mark_as_failed_for(scheduled_declaration)

# break the whole pipeline only if not --keep-going
if not keep_going:
Expand All @@ -163,40 +170,87 @@ def _on_failure(self, scheduled_declaration: DeclarationScheduledToRun, keep_goi
def _handle_failure_in_specific_block(self, scheduled_declaration: DeclarationScheduledToRun,
exception: Exception,
block: ArgumentBlock,
allow_retrying_single_task: bool,
task_num: int):
task_num: int) -> bool:

if allow_retrying_single_task and block.should_task_be_retried(scheduled_declaration.declaration):
# ==============================================================================
# @retry: Repeat a Task multiple times, until it hits the maximum repeat count
# or the repeated Task will end with success
# ==============================================================================
while block.should_task_be_retried(scheduled_declaration.declaration):
block.task_retried(scheduled_declaration.declaration)
self._observer.task_retried(scheduled_declaration)
self.execute(scheduled_declaration, task_num)

return
try:
self.execute(scheduled_declaration, task_num, inherited=True)

elif block.should_block_be_retried():
except InterruptExecution:
continue

# if not "continue" then it is a success (no exception)
return True

# ==============================================================================
# @retry-block: Repeat all Tasks in current Block until success, or
# ==============================================================================
while block.should_block_be_retried():
self.io.internal(f'Got block to retry: {block}')
self._observer.group_of_tasks_retried(block)

for task in block.resolved_body_tasks():
self.execute(task, task_num)
succeed_count = 0
expected_tasks_to_succeed = len(block.resolved_body_tasks())

return
for task_in_block in block.resolved_body_tasks():
try:
self.execute(task_in_block, task_num, inherited=True)

except InterruptExecution:
continue

# if not raised the exception, then continue not worked
succeed_count += 1

if succeed_count == expected_tasks_to_succeed:
return True

# regardless of @error & @rescue there should be a visible information that task failed
self._notify_error(scheduled_declaration, exception)

# block modifiers (issue #50): @error and @rescue
# ===================================================================================================
# @error: Send an error notification, execute something in case of a failure
# ===================================================================================================
if block.has_action_on_error():
for resolved in block.resolved_error_tasks():
resolved: DeclarationScheduledToRun

try:
self.execute(resolved, resolved.created_task_num, inherited=True)

except InterruptExecution:
# immediately exit, when any of @on-error Task will fail
return False

# ===================================================================================================
# @rescue: Let's execute a Task instead of our original Task in case, when our original Task fails
# ===================================================================================================
if block.should_rescue_task():
self._observer.task_rescue_attempt(scheduled_declaration)

for resolved in block.resolved_rescue_tasks():
resolved: DeclarationScheduledToRun
self.execute(resolved, resolved.created_task_num)

elif block.has_action_on_error():
for resolved in block.resolved_error_tasks():
resolved: DeclarationScheduledToRun
self.execute(resolved, resolved.created_task_num)
try:
self.execute(resolved, resolved.created_task_num, inherited=True)

except InterruptExecution:
return False # it is expected, that all @rescue tasks will succeed

# if there is no any InterruptException, then we rescued the Task!
return True

# there were no method that was able to rescue the situation
self.io.internal('No valid modifier found in Block to change Task result')

return False

def _notify_error(self, scheduled_to_run: DeclarationScheduledToRun,
exception: Optional[Exception] = None):
Expand All @@ -216,7 +270,8 @@ def _notify_error(self, scheduled_to_run: DeclarationScheduledToRun,

def _execute_directly_or_forked(self, cmdline_become: str, task: TaskInterface, temp: TempManager,
ctx: ExecutionContext):
"""Execute directly or pass to a forked process
"""
Execute directly or pass to a forked process
"""

# unset incrementing variables
Expand All @@ -240,7 +295,8 @@ def _execute_directly_or_forked(self, cmdline_become: str, task: TaskInterface,

@staticmethod
def _execute_as_forked_process(become: str, task: TaskInterface, temp: TempManager, ctx: ExecutionContext):
"""Execute task code as a separate Python process
"""
Execute task code as a separate Python process
The communication between processes is with serialized data and text files.
One text file is a script, the task code is passed with stdin together with a whole context
Expand Down
24 changes: 12 additions & 12 deletions src/core/rkd/core/iterator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import List
from typing import List, Tuple
from .api.syntax import DeclarationScheduledToRun
from .argparsing.model import ArgumentBlock
from .exception import AggregatedResolvingFailure, InterruptExecution
Expand All @@ -14,29 +14,31 @@ def iterate(self, tasks: ResolvedTaskBag):

for scheduled in tasks.scheduled_declarations_to_run:
task_num += 1
aggregated_exceptions += self._handle_task(scheduled, task_num)

try:
aggregated_exceptions += self._handle_task(scheduled, task_num, aggregated_exceptions)

except InterruptExecution:
raise AggregatedResolvingFailure(aggregated_exceptions)

if self.iterate_blocks:
for block in scheduled.blocks:
if block not in blocks_collected:
blocks_collected.append(block)

# iterate over @error and @rescue
# OPTIONALLY iterate over @error and @rescue
if self.iterate_blocks:
for block in blocks_collected:
try:
tasks: List[DeclarationScheduledToRun] = block.resolved_error_tasks() + block.resolved_rescue_tasks()
except:
raise Exception("\n".join(block.trace))
tasks: List[DeclarationScheduledToRun] = block.resolved_error_tasks() + block.resolved_rescue_tasks()

for scheduled in tasks:
task_num += 1
aggregated_exceptions += self._handle_task(scheduled, task_num)
aggregated_exceptions += self._handle_task(scheduled, task_num, aggregated_exceptions)

if aggregated_exceptions:
raise AggregatedResolvingFailure(aggregated_exceptions)

def _handle_task(self, scheduled: DeclarationScheduledToRun, task_num: int):
def _handle_task(self, scheduled: DeclarationScheduledToRun, task_num: int, aggregated_exceptions: list):
"""
Provides error handling for process_task()
Expand All @@ -45,13 +47,11 @@ def _handle_task(self, scheduled: DeclarationScheduledToRun, task_num: int):
:return:
"""

aggregated_exceptions = []

try:
self.process_task(scheduled, task_num)

except InterruptExecution:
return aggregated_exceptions
raise

except Exception as err:
if self.fail_fast is False:
Expand Down
44 changes: 26 additions & 18 deletions src/core/tests/test_functional_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ def test_yaml_written_pipeline_parses_list_of_tasks_correctly(self):
with self.with_temporary_workspace_containing({'.rkd/makefile.yaml': makefile}):
out, exit_code = self.run_and_capture_output([':example'])

self.assertIn('Executing :sh -c echo "Bakunin" [part of :example]', out)
self.assertIn('Executing :sh -c echo "Kropotkin" [part of :example]', out)
self.assertIn('Executing :sh -c echo "Malatesta" [part of :example]', out)
self.assertIn('[1] Executing `:sh -c echo "Bakunin"` [part of :example]', out)
self.assertIn('[2] Executing `:sh -c echo "Kropotkin"` [part of :example]', out)
self.assertIn('[3] Executing `:sh -c echo "Malatesta"` [part of :example]', out)

def test_yaml_written_block_is_parsed_as_block(self):
makefile = dedent('''
Expand All @@ -325,8 +325,8 @@ def test_yaml_written_block_is_parsed_as_block(self):
with self.with_temporary_workspace_containing({'.rkd/makefile.yaml': makefile}):
out, exit_code = self.run_and_capture_output([':example'])

self.assertIn('Executing :sh -c echo "Rocker" [part of :example]', out)
self.assertIn('Executing :sh -c echo Kropotkin [part of :example]', out)
self.assertIn('[1] Executing `:sh -c echo "Rocker"` [part of :example]', out)
self.assertIn('[2] Executing `:sh -c echo Kropotkin` [part of :example]', out)

def test_pipeline_in_pipeline_retry_attribute_inheritance(self):
"""
Expand Down Expand Up @@ -375,23 +375,31 @@ def test_pipeline_in_pipeline_retry_attribute_inheritance(self):

self.assertEqual(
[
# todo: Fix: Conquest of Bread" and "Modern Science and Anarchism" should be as a part of :books
">> Executing :sh -c echo 'Rocker' [part of :example]",
'The task ":sh" [part of :example] succeed.',
">> Executing :sh -c echo 'Kropotkin' [part of :example]",
'The task ":sh" [part of :example] succeed.',
'>> Executing :sh -c echo "The Conquest of Bread"; exit 1 [part of :example]',
'>> Retrying :sh -c echo "The Conquest of Bread"; exit 1 [part of :example]',
'The task ":sh" [part of :example] ended with a failure',
'>> Task ":sh" rescue attempt started',
'>> Executing :sh -c exit 0',
'The task ":sh" succeed.',
'>> Executing :sh -c echo "Modern Science and Anarchism"; [part of :example]',
'The task ":sh" [part of :example] succeed.'
# before block
">> [1] Executing `:sh -c echo 'Rocker'` [part of :example]",
"The task `:sh -c echo 'Rocker'` [part of :example] succeed.",

# first task in block succeeds
">> [2] Executing `:sh -c echo 'Kropotkin'` [part of :example]",
"The task `:sh -c echo 'Kropotkin'` [part of :example] succeed.",

# task inside :books is failing, and is going to be retried 1 times, not 5 times
# (@retry importance = closest block decides if retry is defined in it)
'>> [3] Executing `:sh -c echo "The Conquest of Bread"; exit 1` [part of :example]',
'>> [3] Retrying `:sh -c echo "The Conquest of Bread"; exit 1` [part of :example]',

# rescue from inside :books is called
'The task `:sh -c echo "The Conquest of Bread"; exit 1` [part of :example] ended with a failure',
'>> [3] Task ":sh -c echo "The Conquest of Bread"; exit 1" rescue attempt started',
'>> [4] Executing `:sh -c exit 0`',
'The task `:sh -c exit 0` succeed.',
'The task `:sh -c echo "The Conquest of Bread"; exit 1` [part of :example] ended with a failure'
],
parsed_log
)

self.assertEqual(0, exit_code)

def test_pipeline_in_pipeline_parent_rescue_block_works(self):
"""
Single depth @rescue block test
Expand Down

0 comments on commit a24ca4a

Please sign in to comment.