Skip to content

Commit fa3a50c

Browse files
authored
Merge pull request #4 from taskbadger/sk/capture-output
capture process output
2 parents 0b93c99 + 67c010f commit fa3a50c

File tree

4 files changed

+153
-24
lines changed

4 files changed

+153
-24
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repos:
1111
hooks:
1212
- id: black
1313
args: ["--line-length=120"]
14-
language_version: python3.9
14+
language_version: python3.10
1515
- repo: https://github.com/pre-commit/pre-commit-hooks
1616
rev: v4.4.0
1717
hooks:

taskbadger/cli.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
import subprocess
2-
import time
3-
from datetime import datetime
41
from typing import Optional, Tuple
52

63
import typer
@@ -10,6 +7,7 @@
107
from taskbadger import Action, StatusEnum, Task, __version__, integrations
118
from taskbadger.config import get_config, write_config
129
from taskbadger.exceptions import ConfigurationError
10+
from taskbadger.process import ProcessRunner
1311

1412
app = typer.Typer(
1513
rich_markup_mode="rich",
@@ -49,6 +47,7 @@ def run(
4947
show_default=False,
5048
help="Action definition e.g. 'success,error email to:me@email.com'",
5149
),
50+
capture_output: bool = typer.Option(False, help="Capture stdout and stderr."),
5251
):
5352
"""Execute a command using the CLI and create a Task to track its outcome.
5453
@@ -81,33 +80,44 @@ def run(
8180
else:
8281
print(f"Task created: {task.public_url}")
8382
env = {"TASKBADGER_TASK_ID": task.id} if task else None
84-
last_update = datetime.utcnow()
8583
try:
86-
process = subprocess.Popen(ctx.args, env=env, shell=True)
87-
while process.poll() is None:
88-
try:
89-
time.sleep(0.1)
90-
if task and _should_update_task(last_update, update_frequency):
91-
last_update = datetime.utcnow()
92-
task.ping()
93-
except Exception as e:
94-
err_console.print(f"Error updating task status: {e}")
84+
process = ProcessRunner(ctx.args, env, capture_output=capture_output, update_frequency=update_frequency)
85+
for output in process.run():
86+
_update_task(task, **(output or {}))
9587
except Exception as e:
96-
task and task.error(data={"exception": str(e)})
88+
_update_task(task, exception=str(e))
9789
raise typer.Exit(1)
9890

9991
if task:
10092
if process.returncode == 0:
10193
task.success(value=100)
10294
else:
103-
task.error(data={"return_code": process.returncode})
95+
_update_task(task, status=StatusEnum.ERROR, return_code=process.returncode)
10496

10597
if process.returncode != 0:
10698
raise typer.Exit(process.returncode)
10799

108100

109-
def _should_update_task(last_update: datetime, update_frequency_seconds):
110-
return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds
101+
def _update_task(task, status=None, **data_kwargs):
102+
"""Update the task and merge the data"""
103+
if not task:
104+
return
105+
106+
task_data = task.data or {}
107+
for key, value in data_kwargs.items():
108+
if key in ("stdout", "stderr"):
109+
if key in task_data and value:
110+
task_data[key] += value
111+
elif value:
112+
task_data[key] = value
113+
else:
114+
task_data[key] = value
115+
116+
print(task_data)
117+
try:
118+
task.update(status=status, data=task_data or None)
119+
except Exception as e:
120+
err_console.print(f"Error updating task status: {e}")
111121

112122

113123
@app.command()

taskbadger/process.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import subprocess
2+
import threading
3+
import time
4+
from datetime import datetime
5+
6+
7+
class ProcessRunner:
8+
def __init__(self, process_args, env, capture_output: bool, update_frequency: int = 5):
9+
self.process_args = process_args
10+
self.env = env
11+
self.capture_output = capture_output
12+
self.update_frequency = update_frequency
13+
self.returncode = None
14+
15+
def run(self):
16+
last_update = datetime.utcnow()
17+
18+
kwargs = {}
19+
if self.capture_output:
20+
kwargs["stdout"] = subprocess.PIPE
21+
kwargs["stderr"] = subprocess.PIPE
22+
23+
process = subprocess.Popen(self.process_args, env=self.env, shell=True, **kwargs)
24+
if self.capture_output:
25+
stdout = Reader(process.stdout).start()
26+
stderr = Reader(process.stderr).start()
27+
28+
while process.poll() is None:
29+
time.sleep(0.1)
30+
if _should_update(last_update, self.update_frequency):
31+
last_update = datetime.utcnow()
32+
if self.capture_output:
33+
yield {"stdout": stdout.read(), "stderr": stderr.read()}
34+
else:
35+
yield
36+
37+
if self.capture_output and (stdout or stderr):
38+
yield {"stdout": stdout.read(), "stderr": stderr.read()}
39+
40+
self.returncode = process.returncode
41+
42+
43+
class Reader:
44+
def __init__(self, source):
45+
self.source = source
46+
self.data = []
47+
self._lock = threading.Lock()
48+
49+
def start(self):
50+
self._thread = threading.Thread(name="reader-thread", target=self._reader, daemon=True)
51+
self._thread.start()
52+
return self
53+
54+
def _reader(self):
55+
"""Read data from source until EOF, adding it to collector."""
56+
while True:
57+
data = self.source.read1().decode()
58+
self._lock.acquire()
59+
self.data.append(data)
60+
self._lock.release()
61+
if not data:
62+
break
63+
return
64+
65+
def read(self):
66+
"""Read data written by the process to its standard output."""
67+
self._lock.acquire()
68+
outdata = "".join(self.data)
69+
del self.data[:]
70+
self._lock.release()
71+
return outdata
72+
73+
def __bool__(self):
74+
return bool(self.data)
75+
76+
77+
def _should_update(last_update: datetime, update_frequency_seconds):
78+
return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds

tests/test_cli_run.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from taskbadger.cli import app
99
from taskbadger.internal.models import PatchedTaskRequest, PatchedTaskRequestData, StatusEnum, TaskRequest
10-
from taskbadger.internal.types import Response
10+
from taskbadger.internal.types import UNSET, Response
1111
from taskbadger.sdk import Badger
1212
from tests.utils import task_for_test
1313

@@ -36,10 +36,42 @@ def test_cli_long_run():
3636
def _should_update_task(last_update, update_frequency_seconds):
3737
return True
3838

39-
with mock.patch("taskbadger.cli._should_update_task", new=_should_update_task):
39+
with mock.patch("taskbadger.process._should_update", new=_should_update_task):
4040
_test_cli_run(["echo test; sleep 0.11"], 0, args=["task_name"], update_call_count=3)
4141

4242

43+
def test_cli_capture_output():
44+
update_patch = _test_cli_run(["echo test"], 0, args=["task_name", "--capture-output"], update_call_count=2)
45+
46+
body = PatchedTaskRequest(status=UNSET, data=PatchedTaskRequestData.from_dict({"stdout": "test\n"}))
47+
update_patch.assert_any_call(
48+
client=Badger.current.settings.client,
49+
organization_slug="org",
50+
project_slug="project",
51+
id="test_id",
52+
json_body=body,
53+
)
54+
55+
56+
def test_cli_capture_output_append():
57+
def _should_update_task(last_update, update_frequency_seconds):
58+
return True
59+
60+
with mock.patch("taskbadger.process._should_update", new=_should_update_task):
61+
update_patch = _test_cli_run(
62+
["echo test; sleep 0.11; echo 123"], 0, args=["task_name", "--capture-output"], update_call_count=3
63+
)
64+
65+
body = PatchedTaskRequest(status=UNSET, data=PatchedTaskRequestData.from_dict({"stdout": "test\n123\n"}))
66+
update_patch.assert_any_call(
67+
client=Badger.current.settings.client,
68+
organization_slug="org",
69+
project_slug="project",
70+
id="test_id",
71+
json_body=body,
72+
)
73+
74+
4375
def test_cli_run_error():
4476
_test_cli_run(["not-a-command"], 127, args=["task_name"])
4577

@@ -63,14 +95,22 @@ def test_cli_run_webhook():
6395

6496

6597
def _test_cli_run(command, return_code, args=None, action=None, update_call_count=1):
98+
update_mock = mock.MagicMock()
99+
100+
def _update(*args, **kwargs):
101+
update_mock(*args, **kwargs)
102+
103+
# handle updating task data
104+
data = kwargs["json_body"].data
105+
return Response(HTTPStatus.OK, b"", {}, task_for_test(data=data.additional_properties if data else None))
106+
66107
with (
67108
mock.patch("taskbadger.sdk.task_create.sync_detailed") as create,
68-
mock.patch("taskbadger.sdk.task_partial_update.sync_detailed") as update,
109+
mock.patch("taskbadger.sdk.task_partial_update.sync_detailed", new=_update) as update,
69110
):
70111
task = task_for_test()
71112
create.return_value = Response(HTTPStatus.OK, b"", {}, task)
72113

73-
update.return_value = Response(HTTPStatus.OK, b"", {}, task)
74114
args = args or []
75115
result = runner.invoke(app, ["run"] + args + ["--"] + command, catch_exceptions=False)
76116
print(result.output)
@@ -91,7 +131,8 @@ def _test_cli_run(command, return_code, args=None, action=None, update_call_coun
91131
status=StatusEnum.ERROR, data=PatchedTaskRequestData.from_dict({"return_code": return_code})
92132
)
93133

94-
assert update.call_count == update_call_count
95-
update.assert_called_with(
134+
assert update_mock.call_count == update_call_count
135+
update_mock.assert_called_with(
96136
client=settings.client, organization_slug="org", project_slug="project", id="test_id", json_body=body
97137
)
138+
return update_mock

0 commit comments

Comments
 (0)