diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0d8d8d1..e317a63 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,7 +11,7 @@ repos: hooks: - id: black args: ["--line-length=120"] - language_version: python3.9 + language_version: python3.10 - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.3.0 hooks: diff --git a/taskbadger/cli.py b/taskbadger/cli.py index 1be7248..42be911 100644 --- a/taskbadger/cli.py +++ b/taskbadger/cli.py @@ -1,6 +1,3 @@ -import subprocess -import time -from datetime import datetime from typing import Optional, Tuple import typer @@ -10,6 +7,7 @@ from taskbadger import Action, StatusEnum, Task, __version__, integrations from taskbadger.config import get_config, write_config from taskbadger.exceptions import ConfigurationError +from taskbadger.process import ProcessRunner app = typer.Typer( rich_markup_mode="rich", @@ -49,6 +47,7 @@ def run( show_default=False, help="Action definition e.g. 'success,error email to:me@email.com'", ), + capture_output: bool = typer.Option(False, help="Capture stdout and stderr."), ): """Execute a command using the CLI and create a Task to track its outcome. @@ -81,33 +80,44 @@ def run( else: print(f"Task created: {task.public_url}") env = {"TASKBADGER_TASK_ID": task.id} if task else None - last_update = datetime.utcnow() try: - process = subprocess.Popen(ctx.args, env=env, shell=True) - while process.poll() is None: - try: - time.sleep(0.1) - if task and _should_update_task(last_update, update_frequency): - last_update = datetime.utcnow() - task.ping() - except Exception as e: - err_console.print(f"Error updating task status: {e}") + process = ProcessRunner(ctx.args, env, capture_output=capture_output, update_frequency=update_frequency) + for output in process.run(): + _update_task(task, **(output or {})) except Exception as e: - task and task.error(data={"exception": str(e)}) + _update_task(task, exception=str(e)) raise typer.Exit(1) if task: if process.returncode == 0: task.success(value=100) else: - task.error(data={"return_code": process.returncode}) + _update_task(task, status=StatusEnum.ERROR, return_code=process.returncode) if process.returncode != 0: raise typer.Exit(process.returncode) -def _should_update_task(last_update: datetime, update_frequency_seconds): - return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds +def _update_task(task, status=None, **data_kwargs): + """Update the task and merge the data""" + if not task: + return + + task_data = task.data or {} + for key, value in data_kwargs.items(): + if key in ("stdout", "stderr"): + if key in task_data and value: + task_data[key] += value + elif value: + task_data[key] = value + else: + task_data[key] = value + + print(task_data) + try: + task.update(status=status, data=task_data or None) + except Exception as e: + err_console.print(f"Error updating task status: {e}") @app.command() diff --git a/taskbadger/process.py b/taskbadger/process.py new file mode 100644 index 0000000..43c8115 --- /dev/null +++ b/taskbadger/process.py @@ -0,0 +1,78 @@ +import subprocess +import threading +import time +from datetime import datetime + + +class ProcessRunner: + def __init__(self, process_args, env, capture_output: bool, update_frequency: int = 5): + self.process_args = process_args + self.env = env + self.capture_output = capture_output + self.update_frequency = update_frequency + self.returncode = None + + def run(self): + last_update = datetime.utcnow() + + kwargs = {} + if self.capture_output: + kwargs["stdout"] = subprocess.PIPE + kwargs["stderr"] = subprocess.PIPE + + process = subprocess.Popen(self.process_args, env=self.env, shell=True, **kwargs) + if self.capture_output: + stdout = Reader(process.stdout).start() + stderr = Reader(process.stderr).start() + + while process.poll() is None: + time.sleep(0.1) + if _should_update(last_update, self.update_frequency): + last_update = datetime.utcnow() + if self.capture_output: + yield {"stdout": stdout.read(), "stderr": stderr.read()} + else: + yield + + if self.capture_output and (stdout or stderr): + yield {"stdout": stdout.read(), "stderr": stderr.read()} + + self.returncode = process.returncode + + +class Reader: + def __init__(self, source): + self.source = source + self.data = [] + self._lock = threading.Lock() + + def start(self): + self._thread = threading.Thread(name="reader-thread", target=self._reader, daemon=True) + self._thread.start() + return self + + def _reader(self): + """Read data from source until EOF, adding it to collector.""" + while True: + data = self.source.read1().decode() + self._lock.acquire() + self.data.append(data) + self._lock.release() + if not data: + break + return + + def read(self): + """Read data written by the process to its standard output.""" + self._lock.acquire() + outdata = "".join(self.data) + del self.data[:] + self._lock.release() + return outdata + + def __bool__(self): + return bool(self.data) + + +def _should_update(last_update: datetime, update_frequency_seconds): + return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds diff --git a/tests/test_cli_run.py b/tests/test_cli_run.py index fc957f4..e44011d 100644 --- a/tests/test_cli_run.py +++ b/tests/test_cli_run.py @@ -7,7 +7,7 @@ from taskbadger.cli import app from taskbadger.internal.models import PatchedTaskRequest, PatchedTaskRequestData, StatusEnum, TaskRequest -from taskbadger.internal.types import Response +from taskbadger.internal.types import UNSET, Response from taskbadger.sdk import Badger from tests.utils import task_for_test @@ -36,10 +36,42 @@ def test_cli_long_run(): def _should_update_task(last_update, update_frequency_seconds): return True - with mock.patch("taskbadger.cli._should_update_task", new=_should_update_task): + with mock.patch("taskbadger.process._should_update", new=_should_update_task): _test_cli_run(["echo test; sleep 0.11"], 0, args=["task_name"], update_call_count=3) +def test_cli_capture_output(): + update_patch = _test_cli_run(["echo test"], 0, args=["task_name", "--capture-output"], update_call_count=2) + + body = PatchedTaskRequest(status=UNSET, data=PatchedTaskRequestData.from_dict({"stdout": "test\n"})) + update_patch.assert_any_call( + client=Badger.current.settings.client, + organization_slug="org", + project_slug="project", + id="test_id", + json_body=body, + ) + + +def test_cli_capture_output_append(): + def _should_update_task(last_update, update_frequency_seconds): + return True + + with mock.patch("taskbadger.process._should_update", new=_should_update_task): + update_patch = _test_cli_run( + ["echo test; sleep 0.11; echo 123"], 0, args=["task_name", "--capture-output"], update_call_count=3 + ) + + body = PatchedTaskRequest(status=UNSET, data=PatchedTaskRequestData.from_dict({"stdout": "test\n123\n"})) + update_patch.assert_any_call( + client=Badger.current.settings.client, + organization_slug="org", + project_slug="project", + id="test_id", + json_body=body, + ) + + def test_cli_run_error(): _test_cli_run(["not-a-command"], 127, args=["task_name"]) @@ -63,14 +95,22 @@ def test_cli_run_webhook(): def _test_cli_run(command, return_code, args=None, action=None, update_call_count=1): + update_mock = mock.MagicMock() + + def _update(*args, **kwargs): + update_mock(*args, **kwargs) + + # handle updating task data + data = kwargs["json_body"].data + return Response(HTTPStatus.OK, b"", {}, task_for_test(data=data.additional_properties if data else None)) + with ( mock.patch("taskbadger.sdk.task_create.sync_detailed") as create, - mock.patch("taskbadger.sdk.task_partial_update.sync_detailed") as update, + mock.patch("taskbadger.sdk.task_partial_update.sync_detailed", new=_update) as update, ): task = task_for_test() create.return_value = Response(HTTPStatus.OK, b"", {}, task) - update.return_value = Response(HTTPStatus.OK, b"", {}, task) args = args or [] result = runner.invoke(app, ["run"] + args + ["--"] + command, catch_exceptions=False) print(result.output) @@ -91,7 +131,8 @@ def _test_cli_run(command, return_code, args=None, action=None, update_call_coun status=StatusEnum.ERROR, data=PatchedTaskRequestData.from_dict({"return_code": return_code}) ) - assert update.call_count == update_call_count - update.assert_called_with( + assert update_mock.call_count == update_call_count + update_mock.assert_called_with( client=settings.client, organization_slug="org", project_slug="project", id="test_id", json_body=body ) + return update_mock