Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
hooks:
- id: black
args: ["--line-length=120"]
language_version: python3.9
language_version: python3.10
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
Expand Down
44 changes: 27 additions & 17 deletions taskbadger/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import subprocess
import time
from datetime import datetime
from typing import Optional, Tuple

import typer
Expand All @@ -10,6 +7,7 @@
from taskbadger import Action, StatusEnum, Task, __version__, integrations
from taskbadger.config import get_config, write_config
from taskbadger.exceptions import ConfigurationError
from taskbadger.process import ProcessRunner

app = typer.Typer(
rich_markup_mode="rich",
Expand Down Expand Up @@ -49,6 +47,7 @@ def run(
show_default=False,
help="Action definition e.g. 'success,error email to:me@email.com'",
),
capture_output: bool = typer.Option(False, help="Capture stdout and stderr."),
):
"""Execute a command using the CLI and create a Task to track its outcome.

Expand Down Expand Up @@ -81,33 +80,44 @@ def run(
else:
print(f"Task created: {task.public_url}")
env = {"TASKBADGER_TASK_ID": task.id} if task else None
last_update = datetime.utcnow()
try:
process = subprocess.Popen(ctx.args, env=env, shell=True)
while process.poll() is None:
try:
time.sleep(0.1)
if task and _should_update_task(last_update, update_frequency):
last_update = datetime.utcnow()
task.ping()
except Exception as e:
err_console.print(f"Error updating task status: {e}")
process = ProcessRunner(ctx.args, env, capture_output=capture_output, update_frequency=update_frequency)
for output in process.run():
_update_task(task, **(output or {}))
except Exception as e:
task and task.error(data={"exception": str(e)})
_update_task(task, exception=str(e))
raise typer.Exit(1)

if task:
if process.returncode == 0:
task.success(value=100)
else:
task.error(data={"return_code": process.returncode})
_update_task(task, status=StatusEnum.ERROR, return_code=process.returncode)

if process.returncode != 0:
raise typer.Exit(process.returncode)


def _should_update_task(last_update: datetime, update_frequency_seconds):
return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds
def _update_task(task, status=None, **data_kwargs):
"""Update the task and merge the data"""
if not task:
return

task_data = task.data or {}
for key, value in data_kwargs.items():
if key in ("stdout", "stderr"):
if key in task_data and value:
task_data[key] += value
elif value:
task_data[key] = value
else:
task_data[key] = value

print(task_data)
try:
task.update(status=status, data=task_data or None)
except Exception as e:
err_console.print(f"Error updating task status: {e}")


@app.command()
Expand Down
78 changes: 78 additions & 0 deletions taskbadger/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import subprocess
import threading
import time
from datetime import datetime


class ProcessRunner:
def __init__(self, process_args, env, capture_output: bool, update_frequency: int = 5):
self.process_args = process_args
self.env = env
self.capture_output = capture_output
self.update_frequency = update_frequency
self.returncode = None

def run(self):
last_update = datetime.utcnow()

kwargs = {}
if self.capture_output:
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.PIPE

process = subprocess.Popen(self.process_args, env=self.env, shell=True, **kwargs)
if self.capture_output:
stdout = Reader(process.stdout).start()
stderr = Reader(process.stderr).start()

while process.poll() is None:
time.sleep(0.1)
if _should_update(last_update, self.update_frequency):
last_update = datetime.utcnow()
if self.capture_output:
yield {"stdout": stdout.read(), "stderr": stderr.read()}
else:
yield

if self.capture_output and (stdout or stderr):
yield {"stdout": stdout.read(), "stderr": stderr.read()}

self.returncode = process.returncode


class Reader:
def __init__(self, source):
self.source = source
self.data = []
self._lock = threading.Lock()

def start(self):
self._thread = threading.Thread(name="reader-thread", target=self._reader, daemon=True)
self._thread.start()
return self

def _reader(self):
"""Read data from source until EOF, adding it to collector."""
while True:
data = self.source.read1().decode()
self._lock.acquire()
self.data.append(data)
self._lock.release()
if not data:
break
return

def read(self):
"""Read data written by the process to its standard output."""
self._lock.acquire()
outdata = "".join(self.data)
del self.data[:]
self._lock.release()
return outdata

def __bool__(self):
return bool(self.data)


def _should_update(last_update: datetime, update_frequency_seconds):
return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds
53 changes: 47 additions & 6 deletions tests/test_cli_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from taskbadger.cli import app
from taskbadger.internal.models import PatchedTaskRequest, PatchedTaskRequestData, StatusEnum, TaskRequest
from taskbadger.internal.types import Response
from taskbadger.internal.types import UNSET, Response
from taskbadger.sdk import Badger
from tests.utils import task_for_test

Expand Down Expand Up @@ -36,10 +36,42 @@ def test_cli_long_run():
def _should_update_task(last_update, update_frequency_seconds):
return True

with mock.patch("taskbadger.cli._should_update_task", new=_should_update_task):
with mock.patch("taskbadger.process._should_update", new=_should_update_task):
_test_cli_run(["echo test; sleep 0.11"], 0, args=["task_name"], update_call_count=3)


def test_cli_capture_output():
update_patch = _test_cli_run(["echo test"], 0, args=["task_name", "--capture-output"], update_call_count=2)

body = PatchedTaskRequest(status=UNSET, data=PatchedTaskRequestData.from_dict({"stdout": "test\n"}))
update_patch.assert_any_call(
client=Badger.current.settings.client,
organization_slug="org",
project_slug="project",
id="test_id",
json_body=body,
)


def test_cli_capture_output_append():
def _should_update_task(last_update, update_frequency_seconds):
return True

with mock.patch("taskbadger.process._should_update", new=_should_update_task):
update_patch = _test_cli_run(
["echo test; sleep 0.11; echo 123"], 0, args=["task_name", "--capture-output"], update_call_count=3
)

body = PatchedTaskRequest(status=UNSET, data=PatchedTaskRequestData.from_dict({"stdout": "test\n123\n"}))
update_patch.assert_any_call(
client=Badger.current.settings.client,
organization_slug="org",
project_slug="project",
id="test_id",
json_body=body,
)


def test_cli_run_error():
_test_cli_run(["not-a-command"], 127, args=["task_name"])

Expand All @@ -63,14 +95,22 @@ def test_cli_run_webhook():


def _test_cli_run(command, return_code, args=None, action=None, update_call_count=1):
update_mock = mock.MagicMock()

def _update(*args, **kwargs):
update_mock(*args, **kwargs)

# handle updating task data
data = kwargs["json_body"].data
return Response(HTTPStatus.OK, b"", {}, task_for_test(data=data.additional_properties if data else None))

with (
mock.patch("taskbadger.sdk.task_create.sync_detailed") as create,
mock.patch("taskbadger.sdk.task_partial_update.sync_detailed") as update,
mock.patch("taskbadger.sdk.task_partial_update.sync_detailed", new=_update) as update,
):
task = task_for_test()
create.return_value = Response(HTTPStatus.OK, b"", {}, task)

update.return_value = Response(HTTPStatus.OK, b"", {}, task)
args = args or []
result = runner.invoke(app, ["run"] + args + ["--"] + command, catch_exceptions=False)
print(result.output)
Expand All @@ -91,7 +131,8 @@ def _test_cli_run(command, return_code, args=None, action=None, update_call_coun
status=StatusEnum.ERROR, data=PatchedTaskRequestData.from_dict({"return_code": return_code})
)

assert update.call_count == update_call_count
update.assert_called_with(
assert update_mock.call_count == update_call_count
update_mock.assert_called_with(
client=settings.client, organization_slug="org", project_slug="project", id="test_id", json_body=body
)
return update_mock