Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
#66: Implement Pipelines in Python syntax - fix various errors in exi…
Browse files Browse the repository at this point in the history
…sting code made by external contributor
  • Loading branch information
blackandred committed Aug 26, 2021
1 parent 9ad93fd commit 54ea637
Show file tree
Hide file tree
Showing 13 changed files with 697 additions and 69 deletions.
10 changes: 10 additions & 0 deletions src/core/rkd/core/api/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,13 @@ def __init__(self, args: list = None, kwargs: dict = None):

self.args = args
self.kwargs = kwargs


class PipelinePartInterface(object):
"""
Partial element of a Pipeline - string that is being converted to GroupDeclaration
"""

@abstractmethod
def to_pipeline_part(self) -> List[str]:
pass
27 changes: 24 additions & 3 deletions src/core/rkd/core/api/inputoutput.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import re
import shutil
import sys
import os
import subprocess
Expand Down Expand Up @@ -334,10 +335,19 @@ def print_opt_line(self):

self.opt_outln('')

def print_separator(self):
"""Prints a text separator (optional output)
def print_separator(self, status: bool = None):
"""
self.opt_outln("\x1B[37m%s\x1B[0m" % '-----------------------------------')
Prints a text separator (optional output)
"""

color = '37m'

if status is True:
color = '92m'
elif status is False:
color = '91m'

self.opt_outln(f"\x1B[{color}%s\x1B[0m" % ("-" * get_terminal_width()))

#
# Statuses
Expand Down Expand Up @@ -677,3 +687,14 @@ def get_environment_copy() -> dict:
"""

return dict(deepcopy(os.environ))


# reused from PyTest
def get_terminal_width() -> int:
width, _ = shutil.get_terminal_size(fallback=(80, 24))

# The Windows get_terminal_size may be bogus, let's sanify a bit.
if width < 40:
width = 80

return width
144 changes: 132 additions & 12 deletions src/core/rkd/core/api/syntax.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
Classes used in a declaration syntax in makefile.py
"""
from dataclasses import dataclass
from types import FunctionType
from typing import List, Dict, Optional, Union
from copy import deepcopy
from uuid import uuid4

from .contract import TaskDeclarationInterface, ExtendableTaskInterface
from .contract import TaskDeclarationInterface, ExtendableTaskInterface, PipelinePartInterface
from .contract import GroupDeclarationInterface
from .contract import TaskInterface
from .inputoutput import get_environment_copy, ReadableStreamType
Expand Down Expand Up @@ -158,12 +158,14 @@ def with_user_overridden_env(self, env_list: list):
return copy

def with_connected_block(self, block: ArgumentBlock):
"""Immutable arguments setter. Produces new object each time
Block should be a REFERENCE to an object, not a copy
"""
Immutable arguments setter. Produces new object each time
Block should be a REFERENCE to an object, not a copy
"""

copy = self._clone()
copy._block = block
block.register_resolved_task(copy) # register a both side relation

return copy

Expand Down Expand Up @@ -282,7 +284,7 @@ def get_input(self) -> Optional[ReadableStreamType]:
return None

def __str__(self):
return 'TaskDeclaration<%s>' % self.get_task_to_execute().get_full_name()
return 'TaskDeclaration<%s>' % self.to_full_name()


class ExtendedTaskDeclaration(object):
Expand Down Expand Up @@ -391,8 +393,12 @@ def is_internal(self) -> bool:
return False


class TaskAliasDeclaration(object):
""" Allows to define a custom task name that triggers other tasks in proper order """
class Pipeline(object):
"""
Task Caller
Has a name like a Task, but itself does not do anything than calling other tasks in selected order
"""

_name: str
_arguments: List[str]
Expand All @@ -402,12 +408,14 @@ class TaskAliasDeclaration(object):
_workdir: str
_project_name: str

def __init__(self, name: str, to_execute: List[str], env: Dict[str, str] = None, description: str = ''):
def __init__(self, name: str, to_execute: List[Union[str, PipelinePartInterface]],
env: Dict[str, str] = None, description: str = ''):

if env is None:
env = {}

self._name = name
self._arguments = to_execute
self._arguments = self._resolve_pipeline_parts(to_execute)
self._env = merge_env(env)
self._user_defined_env = list(env.keys())
self._description = description
Expand All @@ -434,12 +442,12 @@ def get_user_overridden_envs(self) -> list:
def get_description(self) -> str:
return self._description

def _clone(self) -> 'TaskAliasDeclaration':
def _clone(self) -> 'Pipeline':
"""Clone securely the object. There fields shared across objects as references could be kept"""

return deepcopy(self)

def as_part_of_subproject(self, workdir: str, subproject_name: str) -> 'TaskAliasDeclaration':
def as_part_of_subproject(self, workdir: str, subproject_name: str) -> 'Pipeline':
copy = self._clone()

copy._workdir = merge_workdir(copy._workdir, workdir)
Expand All @@ -458,9 +466,121 @@ def project_name(self) -> str:
def is_part_of_subproject(self) -> bool:
return isinstance(self._project_name, str) and len(self._project_name) > 1

@staticmethod
def _resolve_pipeline_parts(parts: List[Union[str, PipelinePartInterface]]) -> List[str]:
resolved = []

for part in parts:
if isinstance(part, PipelinePartInterface):
resolved += part.to_pipeline_part()
else:
resolved += part

return resolved

def __str__(self) -> str:
return f'Pipeline<{self.get_name()}>'


class TaskAliasDeclaration(Pipeline):
"""
Deprecated: Name will be removed in RKD 6.0
"""


class PipelineTask(PipelinePartInterface):
"""
Represents a single task in a Pipeline
.. code:: python
from rkd.core.api.syntax import Pipeline
PIPELINES = [
Pipeline(
name=':build',
to_execute=[
Task(':server:build'),
Task(':client:build')
]
)
]
"""

task_args: List[str]

def __init__(self, *task_args):
self.task_args = task_args

def to_pipeline_part(self) -> List[str]:
return self.task_args


@dataclass
class PipelineBlock(PipelinePartInterface):
"""
Represents block of tasks
Example of generated block:
{@retry 3} :some-task {/@}
.. code:: python
from rkd.core.api.syntax import Pipeline, PipelineTask as Task, PipelineBlock as Block, TaskDeclaration
Pipeline(
name=':error-handling-example',
description=':notify should be invoked after "doing exit" task, and execution of a BLOCK should be interrupted',
to_execute=[
Task(':server:build'),
Block(error=':notify -c "echo \'Build failed\'"', retry=3, tasks=[
Task(':docs:build', '--test'),
Task(':sh', '-c', 'echo "doing exit"; exit 1'),
Task(':client:build')
]),
Task(':server:clear')
]
)
"""

tasks: List[PipelineTask]
retry: Optional[int] = None
retry_block: Optional[int] = None
error: Optional[str] = None
rescue: Optional[str] = None

def to_pipeline_part(self) -> List[str]:
partial = []
block_body = ['{']

if self.error:
block_body.append(f'@error {self.error} ')

if self.rescue:
block_body.append(f'@rescue {self.rescue} ')

if self.retry:
block_body.append(f'@retry {self.retry} ')

if self.retry_block:
block_body.append(f'@retry-block {self.retry_block} ')

block_body.append('} ')
partial.append(''.join(block_body))

for task in self.tasks:
partial += task.to_pipeline_part()

partial.append('{/@}')

return partial


def merge_env(env: Dict[str, str]):
"""Merge custom environment variables set per-task with system environment
"""
Merge custom environment variables set per-task with system environment
"""

merged_dict = deepcopy(env)
Expand Down
29 changes: 29 additions & 0 deletions src/core/rkd/core/api/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
"""

import os
import subprocess
import sys
from tempfile import TemporaryDirectory
from typing import Tuple, Dict, List, Union
from unittest import TestCase
from io import StringIO
Expand All @@ -25,6 +27,7 @@
from rkd.core.api.inputoutput import NullSystemIO
from rkd.core.api.inputoutput import BufferedSystemIO
from rkd.core.context import ApplicationContext
from rkd.process import switched_workdir


class OutputCapturingSafeTestCase(TestCase):
Expand Down Expand Up @@ -282,3 +285,29 @@ def execute_mocked_task_and_get_output(self, task: TaskInterface, args=None, env
raise

return ctx.io.get_value() + "\n" + str_io.getvalue() + "\nTASK_EXIT_RESULT=" + str(result)

@contextmanager
def with_temporary_workspace_containing(self, files: Dict[str, str]):
"""
Creates a temporary directory as a workspace
and fills up with files specified in "file" parameter
:param files: Dict of [filename: contents to write to a file]
:return:
"""

with TemporaryDirectory() as tempdir:
with switched_workdir(tempdir):

# create files from a predefined content
for filename, content in files.items():
directory = os.path.dirname(filename)

if directory:
subprocess.check_call(['mkdir', '-p', directory])

with open(filename, 'w') as f:
f.write(content)

# execute code
yield

0 comments on commit 54ea637

Please sign in to comment.