40 changes: 40 additions & 0 deletions .github/workflows/test_toolbox_dsl.yml
@@ -0,0 +1,40 @@
# Unit tests for projects/core/dsl (task decorators, execute_tasks, failure/always/skip).
name: Toolbox DSL tests

on:
pull_request:
branches: [main]
push:
branches: [main]
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
pytest:
runs-on: ubuntu-latest
env:
PYTHONPATH: ${{ github.workspace }}

steps:
- uses: actions/checkout@v4

- uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Install test dependencies
run: |
set -o errexit
python -m pip install --upgrade pip
python -m pip install pytest pyyaml jinja2

- name: Run projects/core/tests
run: |
set -o errexit
# Tree + docstrings (what is being tested), then execute with one line per test + result.
python -m pytest --collect-only -vv -o addopts='-ra --strict-markers --strict-config'
echo ""
python -m pytest -vv -o addopts='-ra --strict-markers --strict-config'
136 changes: 136 additions & 0 deletions docs/toolbox/dsl.md
@@ -0,0 +1,136 @@
# Toolbox DSL

The FORGE toolbox uses a small Python **domain-specific language** (DSL) so each command is a standalone script: a **public entrypoint** (plain function with typed parameters), a **linear list of tasks** (`@task` functions), and **verbose logging** under an artifact directory for post-mortem review.

## Command entrypoint and CLI

The public API for orchestration (or humans) is a single function, for example `run(...)`, documented with an `Args:` section in its docstring.

- **From Python**: call that function; it typically ends with `return execute_tasks(locals())` so every `@task` defined in the same module runs in registration order.
- **From the shell**: use `projects.core.dsl.toolbox.create_toolbox_main(run)` so `argparse` options are generated from the function signature (`toolbox.run_toolbox_command`).

Tasks are **not** the entrypoint; they are internal steps registered at import time.
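The signature-to-CLI step can be pictured with a minimal stand-in. This is a toy model only: the real implementation is `projects.core.dsl.toolbox.create_toolbox_main`, and the `make_cli` helper, its default handling, and the `run` entrypoint below are illustrative assumptions, not the actual API.

```python
import argparse
import inspect


def make_cli(fn):
    """Toy model of create_toolbox_main: derive argparse options from fn's signature."""
    parser = argparse.ArgumentParser(description=fn.__doc__)
    for name, param in inspect.signature(fn).parameters.items():
        flag = "--" + name.replace("_", "-")
        if param.default is inspect.Parameter.empty:
            # No default: required option, typed from the annotation.
            parser.add_argument(flag, required=True, type=param.annotation)
        elif param.default is False:
            # False defaults become boolean switches.
            parser.add_argument(flag, action="store_true")
        else:
            parser.add_argument(flag, default=param.default, type=type(param.default))

    def main(argv=None):
        return fn(**vars(parser.parse_args(argv)))

    return main


def run(cluster_name: str, attempts: int = 3, dry_run: bool = False):
    """Example entrypoint; CLI options are generated from these parameters."""
    return cluster_name, attempts, dry_run
```

With this sketch, `make_cli(run)(["--cluster-name", "demo", "--dry-run"])` calls `run("demo", 3, True)`; required parameters become required flags and defaults become optional ones.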

## Tasks (`@task`)

Each task is a function `(args, ctx) -> value`; returning `None` is fine.

- **`args`**: read-only view of the command parameters plus `artifact_dir` (see `projects.core.dsl.context.ReadOnlyArgs`).
- **`ctx`**: mutable per-task context that is merged back into a shared namespace after each task so later steps can read values previous tasks set on `ctx`.

Registration order in the file is execution order (see `ScriptManager`).

```python
@task
def ensure_project(args, ctx):
"""Describe the step; shown in logs."""
ctx.project_ready = True
return "ok"
```
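The ctx-merge behavior can be modeled in a few lines. This is a toy sketch under stated assumptions; the real logic lives in `projects.core.dsl.context` and `ScriptManager`, and `run_tasks` here is invented for illustration.

```python
from types import SimpleNamespace


def run_tasks(tasks, args):
    """Toy driver: seed each task's ctx from shared state, then merge it back."""
    shared = SimpleNamespace()
    for fn in tasks:
        ctx = SimpleNamespace(**vars(shared))  # fresh view of the shared namespace
        fn(args, ctx)
        vars(shared).update(vars(ctx))  # later tasks see what this one set
    return shared


def ensure_project(args, ctx):
    ctx.project_ready = True


def deploy(args, ctx):
    assert ctx.project_ready  # value set by the previous task
    ctx.deployed = True
```

Running `run_tasks([ensure_project, deploy], args=None)` returns a namespace carrying both `project_ready` and `deployed`, which is the shape later tasks (and the caller) rely on.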

## Conditional tasks (`@when`)

`@when` takes a **zero-argument** callable (usually a `lambda`) evaluated when the task is reached. If it returns a falsy value, the task is skipped (and logged as skipped).

**Decorator order:** write `@when(...)` on the line **above** `@task` (same as `@retry`). Python applies the bottom decorator first, so `@task` registers the step, then `@when` attaches the condition and updates the script registry entry so `execute_tasks` sees it.

```python
@when(lambda: some_other_task.status.return_value is True)
@task
def follow_up(args, ctx):
...
```

Because the condition is called with **no arguments**, anything dynamic must come from a **closure**, module-level state, or another task’s `.status.return_value` (see `TaskResult` in `script_manager.py`).
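A stand-in `when` shows why dynamic state must reach the predicate through a closure. This is a toy model only; the real decorator and its registry updates live in `projects.core.dsl.task`.

```python
def when(condition):
    """Toy @when: evaluate the zero-argument predicate when the task is reached."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            if not condition():
                return "SKIPPED"  # the real DSL logs the skip instead
            return fn(*args, **kwargs)
        return wrapper
    return decorator


state = {"ready": False}


@when(lambda: state["ready"])  # closure over module-level state
def follow_up():
    return "ran"
```

Because the lambda re-reads `state` each time, flipping `state["ready"]` between calls changes whether `follow_up` runs or is skipped.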

## Retries (`@retry`)

**Order:** `@retry(...)` above `@task` above `def` (same pattern as waiting for OpenShift resources).

By default, retries apply when the task **returns a falsy** value (`False`, `None`, `[]`, …). Each attempt runs the full `@task` wrapper (logging, result capture). Delays use `time.sleep`.

Parameters:

| Parameter | Meaning |
|-----------|---------|
| `attempts` | Maximum attempts |
| `delay` | Initial sleep in seconds before the next attempt |
| `backoff` | Multiplier for the delay after each retry |
| `retry_on_exceptions` | If `True`, **also** retry when the task raises (never retries on `KeyboardInterrupt`) |

```python
@retry(attempts=60, delay=30, backoff=1.0)
@task
def wait_until_ready(args, ctx):
...
return False # try again after delay
```

```python
@retry(attempts=5, delay=2, backoff=1.5, retry_on_exceptions=True)
@task
def call_flaky_api(args, ctx):
...
```

If all attempts fail, `RetryFailure` is raised. During `execute_tasks` it is wrapped in `TaskExecutionError`, with the underlying `RetryFailure` available as `TaskExecutionError.__cause__`.
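The retry-on-falsy semantics can be sketched with a stand-in decorator. Illustrative only: the real `@retry` in `projects.core.dsl.task` also runs the full `@task` wrapper (logging, result capture) on each attempt.

```python
import time


class RetryFailure(Exception):
    """Mirrors the DSL's RetryFailure: no attempt succeeded."""


def retry(attempts=3, delay=0.0, backoff=1.0, retry_on_exceptions=False):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            sleep, last_exc = delay, None
            for attempt in range(attempts):
                try:
                    result = fn(*args, **kwargs)
                except KeyboardInterrupt:
                    raise  # never retried
                except Exception as exc:
                    if not retry_on_exceptions:
                        raise
                    last_exc = exc
                else:
                    if result:  # truthy return ends the retry loop
                        return result
                if attempt < attempts - 1:
                    time.sleep(sleep)
                    sleep *= backoff
            raise RetryFailure(f"{fn.__name__}: no success in {attempts} attempts") from last_exc
        return wrapper
    return decorator
```

A falsy return schedules another attempt after `sleep`, the delay grows by `backoff`, and exhausting `attempts` raises `RetryFailure` chained to the last exception (if any).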

## Always tasks (`@always`)

Mark cleanup or artifact steps that must run **after a failure** in the main sequence. `@always` may appear **above or below** `@task` on the same function (see `always()` in `task.py`).

If a normal task raises, the remaining non-`@always` tasks are **skipped** (each pending non-always task is logged as skipped; its body is not run), while pending `@always` tasks still run in file order. The original error is re-raised after the always-tasks finish; if an always-task itself fails and no earlier error exists, that failure becomes the primary error.

Place `@always` tasks **after** the main pipeline so they behave as teardown (see toolbox scripts under `projects/*/toolbox/`).
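The failure → skip → `@always` ordering can be modeled with a toy driver. The real control flow lives in `execute_tasks` / `ScriptManager`; `run_pipeline` below only illustrates the sequencing.

```python
def run_pipeline(tasks):
    """tasks: list of (fn, is_always). Skip pending non-always tasks after a failure."""
    events, error = [], None
    for fn, is_always in tasks:
        if error is not None and not is_always:
            events.append(f"SKIPPING TASK: {fn.__name__}")
            continue
        try:
            fn()
            events.append(f"ran {fn.__name__}")
        except Exception as exc:
            if error is None:
                error = exc  # first failure becomes the primary error
    if error is not None:
        raise error  # re-raised after always-tasks finished
    return events
```

With tasks `setup`, `boom` (raises), `report`, and an always-task `cleanup`, the driver runs `setup` and `cleanup`, skips `report`, and re-raises `boom`'s exception at the end.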

## Execution driver (`execute_tasks`)

`execute_tasks(locals())` (or a filtered dict of parameters):

- Opens a nested artifact directory (`env.NextArtifactDir`).
- Writes metadata (`_meta/metadata.yaml`, `_meta/restart.sh`) and `task.log`.
- Runs tasks from the **calling file** only: the path `ScriptManager` computes at execution time (`Path(__file__).relative_to(FORGE_HOME)`) must match the `os.path.relpath` recorded at task definition, so run commands from the repository root, as the toolbox does.

Interrupts (`KeyboardInterrupt`, `SignalError`) stop execution and still emit completion banners where implemented (not covered by `test_dsl_toolbox.py`; see `runtime.py`).
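Callers can lean on the exception chaining described earlier. A toy model (the real classes live in `projects.core.dsl.runtime` and `projects.core.dsl.task`; `execute` here is a hypothetical stand-in for the wrapping done inside `execute_tasks`):

```python
class RetryFailure(Exception):
    """Stand-in for the DSL's RetryFailure."""


class TaskExecutionError(Exception):
    """Stand-in for the DSL's TaskExecutionError."""


def execute(fn):
    """Toy wrapper: chain the task's failure as __cause__, as execute_tasks does."""
    try:
        return fn()
    except Exception as exc:
        raise TaskExecutionError(f"task {fn.__name__} failed") from exc


def gave_up():
    raise RetryFailure("no success in 3 attempts")
```

A caller catches `TaskExecutionError` and inspects `__cause__` to distinguish exhausted retries from other failures.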

### Trace and artifacts (post-mortem)

Each run is intended to be reviewable without re-executing the command:

| Output | Role |
|--------|------|
| `task.log` | Full DSL log stream for the run |
| `_meta/metadata.yaml` | Timestamp, command file, artifact dir, serialized arguments |
| `_meta/restart.sh` | Replay helper with the same CLI-style flags |
| Console / DSL logger | Step headers, skip lines (`==> SKIPPING TASK: …` when pending steps are skipped after a failure), retry banners |

Keep secrets out of entrypoint parameters where possible, since arguments are serialized into the metadata (follow project norms for redaction if you must pass any).

### Standalone parameters (entrypoint contract)

Declare orchestration and operator inputs on the **public entrypoint** (typed parameters and an `Args:` section in the docstring). Prefer those parameters (and values derived in tasks) over ad hoc reads of undeclared environment variables, so the command stays **standalone** and reviewable, except where FORGE already documents global conventions (for example `FORGE_HOME` and the artifact layout).

## Related modules

- `projects.core.dsl.task` — `@task`, `@when`, `@retry`, `@always`, `RetryFailure`
- `projects.core.dsl.runtime` — `execute_tasks`, `TaskExecutionError`
- `projects.core.dsl.toolbox` — CLI wrapper
- `projects.core.dsl.shell`, `template`, … — helpers used inside tasks

## Tests

`projects/core/tests/test_dsl_toolbox.py` exercises the behaviors below. Run: `python -m pytest projects/core/tests/test_dsl_toolbox.py -v`.

| Area | What is asserted |
|------|------------------|
| Task order | `first` / `second` run in **source definition order** when all succeed. |
| Failure → skip | After a task raises, **later non-`@always`** tasks do not run: **`task.log`** has `SKIPPING TASK: …` and the “not @always” line; unique return markers for pending steps **do not** appear in the log. |
| Failure → `@always` | After a task raises, a **later** `@always` task **still runs** (event order); **`task.log`** contains that task’s return value; the failure re-raised is **`TaskExecutionError`** with the original **`RuntimeError`** as **`__cause__`**. |
| `@when` | If the predicate is falsy, the task body does not run. |
| `@retry` (falsy → success) | Falsy return values are retried until a truthy result. |
| `@retry` (falsy exhausted) | If every attempt returns falsy, **`RetryFailure`** is raised and wrapped so **`TaskExecutionError.__cause__`** is **`RetryFailure`**. |
| `@retry` (exceptions) | With `retry_on_exceptions=True`, exceptions are retried until success; if every attempt raises, **`TaskExecutionError.__cause__`** is **`RetryFailure`**. |
| Decorator stack | `@retry` / `@when` **without** `@task` raise **`TypeError` at definition time** with the “Put `@task` BELOW …” message. |
| Success return | `execute_tasks` returns **`shared_context`** with task attributes and **`artifact_dir`** set. |

Not covered in that file: interrupt handling (`KeyboardInterrupt` / `SignalError`) and CLI wiring (`create_toolbox_main` / `run_toolbox_command`); those are documented above but not exercised by these unit tests.
18 changes: 1 addition & 17 deletions projects/cluster/toolbox/rebuild_image/main.py
@@ -20,23 +20,7 @@
toolbox,
)


# Set up clean logger without prefix
def setup_clean_logger(name: str):
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)

if not logger.handlers:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(console_handler)

logger.propagate = False
return logger


logger = setup_clean_logger("TOOLBOX")
logger = logging.getLogger("DSL")


def _capture_all_container_logs(pod_name: str, namespace: str, artifact_dir):
44 changes: 18 additions & 26 deletions projects/core/ci_entrypoint/github/pr_args.py
@@ -18,21 +18,14 @@
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)

REQUIRED_AUTHOR_ASSOCIATION = "CONTRIBUTOR"

DEFAULT_REPO_OWNER = "openshift-psap"
DEFAULT_REPO_NAME = "forge"


def setup_logging():
"""Set up logging configuration."""
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s: %(message)s",
handlers=[logging.StreamHandler(sys.stderr)],
)


def get_directive_handlers() -> dict[str, Callable[[str], dict[str, Any]]]:
"""
Get a mapping of directive prefixes to their handler functions.
@@ -89,7 +82,7 @@ def handle_test_directive(line: str) -> dict[str, Any]:
}
)

logging.info(f"Jump CI configuration: target_project={target_project}, args={args}")
logger.info(f"Jump CI configuration: target_project={target_project}, args={args}")
else:
# Build result with test info and PR positional arguments
result.update(
@@ -313,7 +306,7 @@ def parse_directives(text: str) -> tuple[dict[str, Any], list[str]]:
raise ValueError(f"Error parsing directive '{line}': {e}") from e
else:
# Unknown directive - log warning but still track it
logging.warning(f"Unknown directive ignored: {line}")
logger.warning(f"Unknown directive ignored: {line}")
found_directives.append(f"# UNKNOWN: {line}")

if not has_test:
@@ -339,12 +332,12 @@ def fetch_url(url: str, cache_file: Path | None = None) -> dict[str, Any]:

# Check cache first
if cache_file and cache_file.exists():
logging.info(f"Using cached file: {cache_file}")
logger.info(f"Using cached file: {cache_file}")
with open(cache_file) as f:
return json.load(f)

# Fetch from URL
logging.info(f"Fetching from URL: {url}")
logger.info(f"Fetching from URL: {url}")
try:
with urllib.request.urlopen(url) as response:
data = json.load(response)
@@ -403,8 +396,8 @@ def parse_pr_arguments(
f"https://api.github.com/repos/{repo_owner}/{repo_name}/issues/{pull_number}/comments"
)

logging.info(f"# PR URL: {pr_url}")
logging.info(f"# PR comments URL: {pr_comments_url}")
logger.info(f"# PR URL: {pr_url}")
logger.info(f"# PR comments URL: {pr_comments_url}")

# Set up caching for OpenShift CI
pr_cache_file = None
@@ -434,7 +427,7 @@

test_anchor = f"/test {test_name}"

logging.info(
logger.info(
f"# Looking for comments from author '{pr_author}' or '{REQUIRED_AUTHOR_ASSOCIATION}' containing '{test_anchor}'"
)

@@ -469,15 +462,15 @@ def main():
"""
Main function for testing the PR arguments parser.

Reads environment variables and logging.infos the parsed configuration to stdout.
Reads environment variables and prints the parsed configuration to stdout.
Special argument '--help-directives' shows supported directives.
"""
# Handle special help argument
if len(sys.argv) > 1 and sys.argv[1] == "--help-directives":
logging.info("Supported GitHub PR directives:")
print("Supported GitHub PR directives:")
for directive, description in get_supported_directives().items():
logging.info(f" {directive}: {description}")
logging.info(f"\nSupported prefixes: {', '.join(get_directive_prefixes())}")
print(f" {directive}: {description}")
print(f"\nSupported prefixes: {', '.join(get_directive_prefixes())}")
return

try:
@@ -488,21 +481,21 @@
pull_number_str = os.environ.get("PULL_NUMBER") or 1

if not repo_owner:
logging.error("REPO_OWNER environment variable not defined")
logger.error("REPO_OWNER environment variable not defined")
sys.exit(1)

if not repo_name:
logging.error("REPO_NAME environment variable not defined")
logger.error("REPO_NAME environment variable not defined")
sys.exit(1)

if not pull_number_str:
logging.error("PULL_NUMBER environment variable not defined")
logger.error("PULL_NUMBER environment variable not defined")
sys.exit(1)

try:
pull_number = int(pull_number_str)
except ValueError:
logging.error(f"PULL_NUMBER must be an integer, got: {pull_number_str}")
logger.error(f"PULL_NUMBER must be an integer, got: {pull_number_str}")
sys.exit(1)

# Optional parameters
@@ -529,10 +522,9 @@
print(f"{key}: {value}")

except Exception as e:
logging.error(f"ERROR: {e}")
logger.exception(f"{e.__class__.__name__}: {e}")
sys.exit(1)


if __name__ == "__main__":
setup_logging()
main()
10 changes: 5 additions & 5 deletions projects/core/ci_entrypoint/prepare_ci.py
@@ -77,15 +77,15 @@ def setup_dual_output():
artifact_dir = os.environ.get("ARTIFACT_DIR")

if not artifact_dir:
logging.warning("ARTIFACT_DIR not defined, not saving $ARTIFACT_DIR/run.log")
logger.warning("ARTIFACT_DIR not defined, not saving $ARTIFACT_DIR/run.log")
return None

log_file_path = Path(artifact_dir) / "run.log"

try:
log_file_path.parent.mkdir(parents=True, exist_ok=True)
except Exception as e:
logging.warning(f"Failed to create directory: {e}")
logger.warning(f"Failed to create directory: {e}")
return None

if log_file_path.exists():
@@ -135,11 +135,11 @@ def communicate():
log_file.flush()
except (OSError, ValueError) as e:
# Pipe was closed, exit gracefully
logging.exception(f"Dual output thread file operations failed: {e}")
logger.exception(f"Dual output thread file operations failed: {e}")
break
# If no data, loop continues and checks stop_event
except Exception as e:
logging.exception(f"Dual output thread failed: {e}")
logger.exception(f"Dual output thread failed: {e}")
pass # Exit gracefully on any error

# 4. Start a background thread to act as the 'tee' process
@@ -340,7 +340,7 @@ def ci_banner(project: str, operation: str, args: list[str]):
if result.returncode == 0:
lines = result.stdout.split("\n")[:10] # head 10
for line in lines:
logging.info(line)
logger.info(line)
else:
logger.warning("Could not access git history (main..) ...")
except Exception as e: