Background
Follow-up to #2192 (foundation) and PR #2193 (pytest + Zuul infrastructure). Part of Tier 4 (#2199). Companion to the connection-init issue: covers task-output streaming, task revocation, the ansible-vault password helper, the ansible-facts freshness check, and the first() iterator helper from osism/utils/__init__.py.
Scope
Add tests/unit/utils/test_init_task_output.py covering the helpers below in osism/utils/__init__.py.
Test targets
first(iterable, condition=lambda x: True) — __init__.py:348
Pure function (already has doctests).
- first((1, 2, 3), condition=lambda x: x % 2 == 0) → 2
- first(range(3, 100)) → 3 (default condition)
- first(()) → raises StopIteration
- No item satisfies condition → raises StopIteration
- Generator input (consumed lazily) → returns the first match without consuming the rest of the generator
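The cases above might translate into assertions like the following. This is only a sketch: the local first() is a stand-in written from the signature in the heading, not a copy of the production function, and check_first_cases is a hypothetical name. A real test would import first from osism.utils instead.

```python
def first(iterable, condition=lambda x: True):
    # Stand-in (assumption): mirrors the documented signature only.
    return next(x for x in iterable if condition(x))


def check_first_cases():
    assert first((1, 2, 3), condition=lambda x: x % 2 == 0) == 2
    assert first(range(3, 100)) == 3  # default condition accepts anything

    # Empty input and "no match" should both raise StopIteration.
    for args in (((),), ((1, 3, 5), lambda x: x % 2 == 0)):
        try:
            first(*args)
        except StopIteration:
            pass
        else:
            raise AssertionError("expected StopIteration")

    # Lazy consumption: record which items the generator actually yields.
    pulled = []

    def numbers():
        for n in range(10):
            pulled.append(n)
            yield n

    assert first(numbers(), condition=lambda x: x > 2) == 3
    assert pulled == [0, 1, 2, 3]  # nothing after the match was pulled
```

Recording the pulled items is one way to make the laziness claim checkable; in pytest the try/except pair would normally become pytest.raises(StopIteration).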
fetch_task_output(task_id, timeout=…, enable_play_recap=False) — __init__.py:371
Patch _init_redis to return a MagicMock redis client. Capture stdout via capsys.
- One stdout message + rc message + quit action → prints stdout, returns the rc value (as int)
- Default rc when no rc message is sent → returns 0
- enable_play_recap=True and stdout contains "PLAY RECAP" → log message about completion is emitted
- enable_play_recap=False and stdout contains "PLAY RECAP" → no extra log
- Each delivered message → xdel(task_id, last_id) called
- After processing one batch → last_id updated so the next xread uses it ({str(task_id): last_id})
- xread returns None for the entire timeout window → TimeoutError raised
- xread returning data resets the stop-time deadline
- quit action → r.close() called and rc returned immediately (return rc)
- Custom timeout from env (OSISM_TASK_TIMEOUT) propagates (the module reads it at import; one happy-path test that the function honours an explicit timeout=… kwarg is enough)
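To illustrate how side_effect can drive the read loop one batch at a time, here is a rough sketch. fetch_sketch is a hypothetical stand-in reconstructed from the bullets above (the production loop, its xread arguments, and the exact point at which xdel is called may differ); batch, run_happy_path and run_timeout are illustrative helper names. In the real test the function under test would be fetch_task_output with _init_redis patched.

```python
import time
from unittest.mock import MagicMock


def fetch_sketch(r, task_id, timeout=1):
    # Hypothetical stand-in for fetch_task_output (assumption, not the real code).
    rc = 0
    last_id = 0
    stoptime = time.time() + timeout
    while time.time() < stoptime:
        data = r.xread({str(task_id): last_id}, count=1, block=100)
        if not data:
            continue
        stoptime = time.time() + timeout  # delivered data resets the deadline
        for message_id, message in data[0][1]:
            last_id = message_id.decode()
            r.xdel(str(task_id), last_id)
            kind = message[b"type"].decode()
            content = message[b"content"].decode()
            if kind == "stdout":
                print(content, end="")
            elif kind == "rc":
                rc = int(content)
            elif kind == "action" and content == "quit":
                r.close()
                return rc
    raise TimeoutError


def batch(message_id, kind, content):
    # One xread result: a single stream holding a single message.
    return [(b"task-id", [(message_id, {b"type": kind, b"content": content})])]


def run_happy_path():
    r = MagicMock()
    r.xread.side_effect = [
        batch(b"1-0", b"stdout", b"hello\n"),
        batch(b"2-0", b"rc", b"2"),
        batch(b"3-0", b"action", b"quit"),
    ]
    return r, fetch_sketch(r, "task-id")


def run_timeout():
    r = MagicMock()
    r.xread.return_value = None  # the stream never delivers anything
    try:
        fetch_sketch(r, "task-id", timeout=0.01)
    except TimeoutError:
        return True
    return False
```

After run_happy_path(), assertions such as r.close.assert_called_once() and r.xread.call_args_list[1].args[0] == {"task-id": "1-0"} cover the quit and last_id bullets. A very small explicit timeout keeps the timeout case fast without patching time.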
push_task_output(task_id, line) — __init__.py:407
Patch _init_redis.
- Calls xadd(task_id, {"type": "stdout", "content": line}) exactly once
finish_task_output(task_id, rc=None) — __init__.py:411
- rc=None → only the quit action is published ({"type": "action", "content": "quit"})
- rc=0 → only quit is published (the if rc: check tests truthiness, so a zero rc is intentionally not sent)
- rc=1 → both rc message and quit action published, in that order
- Verify the order via xadd.call_args_list
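The ordering check could look roughly like this. finish_sketch is a hypothetical stand-in built from the bullets above, and the shape of the rc message ({"type": "rc", "content": rc}) is an assumption made by analogy with the stdout and quit messages; confirm it against the production code before copying the expected calls.

```python
from unittest.mock import MagicMock, call


def finish_sketch(r, task_id, rc=None):
    # Hypothetical stand-in for finish_task_output (assumption).
    if rc:  # truthiness test: None and 0 both skip the rc message
        r.xadd(task_id, {"type": "rc", "content": rc})  # assumed message shape
    r.xadd(task_id, {"type": "action", "content": "quit"})


QUIT = call("task-id", {"type": "action", "content": "quit"})


def published(rc):
    # Return the recorded xadd calls, in the order they were made.
    r = MagicMock()
    finish_sketch(r, "task-id", rc=rc)
    return r.xadd.call_args_list
```

Comparing call_args_list against a list of call(...) objects checks both the content and the order in a single assertion, for example published(1) == [call("task-id", {"type": "rc", "content": 1}), QUIT].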
revoke_task(task_id) — __init__.py:418
Patch celery.Celery and osism.tasks.Config.
- Happy path → instantiates Celery("task"), calls app.config_from_object(Config), then app.control.revoke(task_id, terminate=True), returns True
- Celery(...) raises → returns False, error logged
- app.control.revoke raises → returns False, error logged
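The three cases differ only in where the mock raises, which a sketch can make concrete. revoke_sketch is a hypothetical stand-in that takes the Celery class, the config, and the logger as parameters so the example runs without Celery installed; the real helper resolves these itself, which is why the issue asks for them to be patched. The log text is also invented for illustration.

```python
from unittest.mock import MagicMock


def revoke_sketch(task_id, celery_cls, config, log):
    # Hypothetical stand-in for revoke_task (assumption, dependencies injected).
    try:
        app = celery_cls("task")
        app.config_from_object(config)
        app.control.revoke(task_id, terminate=True)
        return True
    except Exception as exc:
        log.error(f"Failed to revoke task {task_id}: {exc}")  # illustrative text
        return False


def run(celery_cls):
    log = MagicMock()
    return revoke_sketch("task-id", celery_cls, config=object(), log=log), log


# Happy path: a plain MagicMock accepts every call.
celery_ok = MagicMock()
ok, ok_log = run(celery_ok)

# Constructor raises: side_effect on the class mock itself.
celery_broken = MagicMock(side_effect=RuntimeError("no broker"))
broken, broken_log = run(celery_broken)

# revoke raises: side_effect on the instance's control.revoke.
celery_revoke_fails = MagicMock()
celery_revoke_fails.return_value.control.revoke.side_effect = RuntimeError("boom")
failed, failed_log = run(celery_revoke_fails)
```

With patch("celery.Celery") the same side_effect assignments apply to the patched object instead of an injected one.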
get_ansible_vault_password() — __init__.py:318
Patch builtins.open (mock_open(read_data="<fernet-key>")), cryptography.fernet.Fernet, and _init_redis.
- Key file readable, redis returns encrypted bytes, Fernet().decrypt(...) returns valid password → returns the decoded password. The production code does not strip the value; it only checks password.strip() == "", so verify that the return value is the raw decoded text.
- Redis returns None for key "ansible_vault_password" → raises ValueError("Ansible vault password is not set in Redis"), error logged before re-raise
- Decryption returns empty string → ValueError("...empty or contains only whitespace")
- Decryption returns whitespace-only → same ValueError
- open(keyfile) raises FileNotFoundError → propagates after logging (test by patching open to raise)
- Fernet(key).decrypt(...) raises → propagates after logging
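A sketch of how mock_open and a mocked Fernet class fit together for the happy path and the whitespace case. vault_password_sketch is a hypothetical stand-in: the keyfile path is a placeholder, the Fernet class is injected rather than patched, and the second error text only reproduces the fragment quoted above, not the full production message.

```python
from unittest.mock import MagicMock, mock_open, patch


def vault_password_sketch(r, fernet_cls, keyfile="/hypothetical/keyfile"):
    # Hypothetical stand-in for get_ansible_vault_password (assumption).
    with open(keyfile, "r") as fp:
        key = fp.read()
    token = r.get("ansible_vault_password")
    if token is None:
        raise ValueError("Ansible vault password is not set in Redis")
    password = fernet_cls(key).decrypt(token).decode("utf-8")
    if password.strip() == "":
        # Placeholder text: only the quoted fragment of the real message.
        raise ValueError("empty or contains only whitespace")
    return password  # returned as decoded, not stripped


def run(decrypted):
    r = MagicMock()
    r.get.return_value = b"encrypted-token"
    fernet_cls = MagicMock()
    fernet_cls.return_value.decrypt.return_value = decrypted
    with patch("builtins.open", mock_open(read_data="<fernet-key>")):
        return vault_password_sketch(r, fernet_cls), fernet_cls


def raises_value_error(decrypted):
    try:
        run(decrypted)
    except ValueError:
        return True
    return False
```

Using a decrypted value with surrounding whitespace, such as b" secret \n", makes the "not stripped" expectation explicit: the test fails if the helper ever starts trimming the password.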
check_ansible_facts(max_age=None) — __init__.py:560
Patch _init_redis and osism.utils.settings.FACTS_MAX_AGE. Use frozen-time helpers (or pass max_age=... explicitly).
- r.scan raises → warning logged, function returns early without further work
- No keys found → warning "No Ansible facts found...", returns
- One stale host (epoch older than max_age) → warning logged listing the stale host with age in seconds
- One fresh host → no warning logged
- Mix of fresh and stale → only stale listed
- Hostname extraction: key b"ansible_facts<hostname>" → strips the "ansible_facts" prefix
- Bytes vs. str keys both handled
- r.get(key) returns None → host skipped (continue)
- r.get returns malformed JSON → caught (JSONDecodeError), debug log, host skipped
- epoch missing → debug log "facts missing ansible_date_time.epoch", host skipped
- epoch non-numeric (e.g. string "foo") → caught (ValueError/TypeError), debug log, host skipped
- r.scan paginates correctly: first call returns (cursor!=0, [k1]), second (0, [k2]) → both keys processed
- max_age=None → uses settings.FACTS_MAX_AGE
- Explicit max_age=10 → overrides settings
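The pagination and staleness cases can share one mocked redis client, sketched below. stale_hosts_sketch is a hypothetical stand-in for the scanning part of check_ansible_facts: the scan arguments, the nesting of epoch under ansible_date_time (inferred from the debug message quoted above), and the returned dict are assumptions, and the host names are invented.

```python
import json
import time
from unittest.mock import MagicMock

PREFIX = "ansible_facts"


def stale_hosts_sketch(r, max_age):
    # Hypothetical stand-in (assumption): returns {hostname: age_in_seconds}.
    keys, cursor = [], 0
    while True:
        cursor, found = r.scan(cursor=cursor, match=PREFIX + "*")
        keys.extend(found)
        if cursor == 0:  # redis signals the last page with cursor 0
            break
    stale, now = {}, time.time()
    for key in keys:
        raw = r.get(key)
        if raw is None:
            continue
        try:
            epoch = float(json.loads(raw)["ansible_date_time"]["epoch"])
        except (json.JSONDecodeError, KeyError, ValueError, TypeError):
            continue  # malformed, missing, or non-numeric epoch: skip host
        age = now - epoch
        if age > max_age:
            name = key.decode() if isinstance(key, bytes) else key
            stale[name[len(PREFIX):]] = int(age)
    return stale


def facts(age_seconds):
    # Facts payload whose epoch lies age_seconds in the past.
    epoch = str(int(time.time() - age_seconds))
    return json.dumps({"ansible_date_time": {"epoch": epoch}}).encode()


def build_redis():
    r = MagicMock()
    # Two pages: a non-zero cursor first, then the terminating 0.
    r.scan.side_effect = [
        (7, [b"ansible_factsnode1"]),
        (0, [b"ansible_factsnode2", "ansible_factsnode3"]),
    ]
    store = {
        b"ansible_factsnode1": facts(1),     # fresh
        b"ansible_factsnode2": facts(3600),  # stale
        "ansible_factsnode3": b"{not json",  # malformed, str key
    }
    r.get.side_effect = store.get
    return r
```

Crafting epoch relative to time.time() and passing max_age=10 explicitly leaves a wide margin on both sides, so the test stays stable without freezing the clock.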
Mocking hints
- For fetch_task_output, build the xread return value as the production code expects:
  data = [(b"task-id", [(b"123-0", {b"type": b"stdout", b"content": b"hello"})])]
  Use side_effect on the mock to return a sequence of payloads (one per loop iteration), ending with None only when you want the timeout path.
- For check_ansible_facts, prefer providing max_age=10 and crafting epoch values relative to time.time() rather than freezing time.
- _init_redis is the single dependency most of these helpers share: patch it once per test or via a fixture returning a MagicMock.
Definition of Done
- tests/unit/utils/test_init_task_output.py created
- pytest --cov=osism.utils for the targeted helpers ≥ 90 %
- pipenv run pytest tests/unit/utils/test_init_task_output.py passes locally
- flake8, mypy, python-black remain green
- python-osism-unit-tests passes
Dependencies