Skip to content

Add integration tests for Kafka messaging#85

Closed
lubomir wants to merge 1 commit into
mainfrom
overseer/84
Closed

Add integration tests for Kafka messaging#85
lubomir wants to merge 1 commit into
mainfrom
overseer/84

Conversation

@lubomir

@lubomir lubomir commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

🤖 This was posted automatically by an AI agent.

Add integration tests for Kafka messaging

Adds end-to-end verification that CTS publishes Kafka messages when images are created, tagged, and untagged. This is tracked by release-engineering/cts#84.

What changed

.tekton/integration-test-eaas.yaml

Added a deploy-kafka task that provisions a single-node Apache Kafka 3.9.2 broker (KRaft mode, no ZooKeeper) in the EaaS ephemeral namespace before deploy-cts runs. The broker is exposed as kafka:9092 (plain PLAINTEXT, no TLS or SASL). Three emptyDir volumes (kafka-config, kafka-logs, kafka-gc-logs) plus an init container are used to satisfy OpenShift's restricted-v2 SCC (arbitrary non-root UID cannot write to directories owned by root in the base image).

tests/test_integration_api.py

  • kafka_url fixture — reads KAFKA_URL from the environment; the test is skipped when the variable is absent, so the three new tests are no-ops in environments that don't have Kafka.
  • _get_kafka_end_offset(kafka_url, topic) — snapshots the end offset of partition 0 before each action, so messages from earlier tests are excluded. Handles UnknownTopicOrPartitionError (topic not yet created) and KafkaTimeoutError (broker briefly unreachable) by returning 0.
  • _consume_kafka_message(kafka_url, topic, start_offset, ...) — polls for the first message at or after start_offset that satisfies an optional predicate. Uses manual partition assignment (consumer.assign + seek)
    with group_id=None to avoid the group-coordinator protocol. Raises a clear AssertionError on timeout or broker disconnect.
  • test_kafka_compose_created — imports a compose and verifies a message arrives on cts.compose-created referencing that compose ID.- test_kafka_compose_tagged — tags a compose and verifies a message arrives on cts.compose-tagged.
  • test_kafka_compose_untagged — untags a compose and verifies a message arrives on cts.compose-untagged.

cts/messaging.py

Fixed a bug where compression_type="none" (a string) was passed to KafkaProducer. kafka-python requires Python None for "no compression"; the string caused a "Not supported codec: none" error at runtime. The value
is now converted: None if v == "none" else v.

test-requirements.txt

Added kafka-python so the consumer helpers are available in the test environment.

.github/workflows/gating.yaml / .tekton/cts-pull-request.yaml

Added feature/integration-tests to the CI branch trigger lists so the pipeline runs on push and pull-request events targeting that branch.

@codecov-commenter

codecov-commenter commented Jun 29, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 66.66667% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 83.65%. Comparing base (37bad18) to head (12ea1fd).

Files with missing lines Patch % Lines
cts/messaging.py 66.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #85      +/-   ##
==========================================
- Coverage   83.69%   83.65%   -0.04%     
==========================================
  Files          13       13              
  Lines        1325     1328       +3     
==========================================
+ Hits         1109     1111       +2     
- Misses        216      217       +1     
Flag Coverage Δ
unit-tests 83.65% <66.66%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@lubomir lubomir left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_auth_builder_can_post_compose makes a POST request that produces a message but doesn validate that. It should check it.

Comment thread tests/test_integration_api.py Outdated
@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

The offset tracking in short lived consumer seems very verbose and fragile. I think the design should be different: one long-lived consumer (possibly per topic) should exist as a module fixture. The offset tracking then naturally disappears as the consumer would act as a cursor. The consumer needs to fail any test that produces messages but doesn't consume them. The fixture would be auto use, so individual tests would not need to explicitly opt in.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Redesigned the Kafka consumer layer as requested:

Long-lived module-scoped consumer — a single kafka_consumer fixture creates one KafkaConsumer subscribed to all three CTS topics (cts.compose-created/tagged/untagged) at module startup. It seeks to the current end of each topic so only messages produced during the test run are visible. The consumer acts as a cursor; successive _consume_one calls advance it forward, eliminating all offset-snapshot bookkeeping.

Auto-use drain check_kafka_drain_check is an autouse function-scoped fixture that polls the consumer for 500 ms after every test. If any unconsumed message is found, the test fails immediately. Tests that produce messages but don't consume them (e.g. a forgotten helper call, an unexpected duplicate publish) are caught without requiring individual tests to opt in.

test_auth_builder_can_post_compose now consumes and asserts the compose-created message, as requested.

Misleading comment in test_composes_pagination corrected: the comment now says the kafka_url is passed to assert messages, not to drain them defensively.

Removed the now-unused contextlib import and the _get_kafka_end_offset / kafka_message_on / kafka_messages_on / _consume_kafka_message helpers that implemented the old offset-tracking design.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the spec are satisfied:

  1. kafka_consumer returns None when KAFKA_URL is unset; all existing tests continue to run; Kafka assertions activate only via if kafka_consumer is not None: guards. (The original spec called for pytest.skip, but the reviewer explicitly requested the different design on 2026-07-01 — this is an approved deviation.)
    2–4. ✅ All three Kafka operations (compose-created, compose-tagged, compose-untagged) are exercised in the existing workflow tests, as explicitly requested by the reviewer.
  2. deploy-kafka appears in deploy-cts's runAfter field.
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. No claimed file is absent.

Prior Review Rounds

All issues raised in prior rounds are resolved: unused import time and import uuid removed; DeprecationWarning from value_deserializer fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer; group_id=None applied; KafkaTimeoutError guards in both helpers; dead assert msg is not None guards removed; feature/integration-tests CI entries reverted; comment in test_composes_pagination corrected.


SHOULD FIX

_consume_one silently discards messages from unexpected topics (introduced by this PR, tests/test_integration_api.py lines 862–865)

The shared KafkaConsumer is subscribed to all three CTS topics simultaneously. When consumer.poll(max_records=1) is called inside _consume_one(consumer, "cts.compose-tagged"), it may return the single record from a different partition — e.g. a cts.compose-created message published asynchronously by a prior test that arrived late. The function checks if tp_key.topic == topic and silently discards non-matching records without logging, then loops until the deadline. Since the record was already polled and removed from the consumer's position, the _kafka_drain_check autouse fixture will never see it. The late-arriving message is consumed without being asserted, masking bugs such as duplicate publishes or wrong-topic delivery.

The safest fix is to raise an AssertionError immediately when a record on an unexpected topic is received, rather than silently discarding it:

for tp_key, recs in records.items():
    if recs:
        if tp_key.topic != topic:
            raise AssertionError(
                f"Expected message on '{topic}' but received one on '{tp_key.topic}': {recs[0].value!r}"
            )
        return recs[0].value

CI currently passes, which suggests messages arrive in the expected order in practice, but the silent-discard behaviour is a latent correctness hazard that could mask real bugs.


NIT

consumer_timeout_ms passed to KafkaConsumer is dead configuration (tests/test_integration_api.py line 781, introduced by this PR)

consumer_timeout_ms only raises StopIteration when the consumer is used as a Python iterator (for msg in consumer). Since _consume_one and _kafka_drain_check exclusively call consumer.poll(), this parameter has no effect. It is harmless but misleading — a reader might believe it limits how long _consume_one waits, when in fact that is controlled by the timeout_ms argument passed directly to poll(). The parameter can simply be removed from the KafkaConsumer(...) constructor call.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

The kafka_consumer=None optional parameter pattern is still manual and skippable

The whole point of the autouse drain check was to make forgotten messages impossible to miss. But the drain check can only catch messages that arrive within 500ms. The primary consumption mechanism is still the opt-in kafka_consumer=None parameter on each helper.

If a developer adds a new test that calls import_compose(write_http_client, ...) without passing kafka_consumer, the message won't be consumed inline. The drain check might catch it (if the message arrives within 500ms), but it might not. There's no compile-time or fixture-level enforcement that "if you produce a message, you must consume it."

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Two changes to address the feedback:

  1. kafka_consumer is now a required keyword-only argument (*) in import_compose, tag_compose, and untag_compose. Callers that omit it will get a TypeError immediately, making it structurally impossible to write a test that produces a Kafka message without consuming it. The if kafka_consumer is not None: guard inside each helper still handles environments where KAFKA_URL is unset (fixture yields None).

  2. _consume_one now raises on unexpected topics instead of silently discarding messages from topics other than the one requested. This was a "SHOULD FIX" from the previous review round.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the plan are satisfied:

  1. kafka_consumer returns None when KAFKA_URL is unset; all existing tests run in both cases; Kafka assertions activate only when the fixture is non-None.
    2–4. ✅ All three Kafka operations (compose-created, compose-tagged, compose-untagged) are exercised in the embedded workflow tests (test_workflow_compose_import, test_workflow_full_lifecycle, test_composes_pagination).
  2. deploy-kafka is listed in deploy-cts's runAfter field.
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. ✅

Prior Review Rounds

All previously raised issues are resolved: unused import time and import uuid removed; DeprecationWarning from value_deserializer fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer; group_id=None applied; KafkaTimeoutError guards in both helpers; dead assert msg is not None guards removed; feature/integration-tests entries in CI configs reverted; comment in test_composes_pagination corrected; _consume_one now raises on messages from unexpected topics; kafka_consumer is a required keyword-only argument, making it structurally impossible to call the three mutating helpers without passing the consumer.


NIT

Two adjacent f-strings in _consume_one (tests/test_integration_api.py, line 880, introduced by this PR)

raise AssertionError(
    f"No message received on Kafka topic '{topic}'" f" within {timeout_ms} ms"
)

These are two adjacent f-strings that Python concatenates at compile time. They should be merged into one:

raise AssertionError(
    f"No message received on Kafka topic '{topic}' within {timeout_ms} ms"
)

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the plan are satisfied:

  1. kafka_consumer fixture returns None when KAFKA_URL is unset; all tests run in both cases; Kafka assertions activate only when the fixture is non-None. (The original spec called for pytest.skip but the reviewer explicitly approved the current design.)
    2–4. ✅ All three Kafka operations (compose-created, compose-tagged, compose-untagged) are verified in the existing workflow tests.
  2. deploy-kafka appears in deploy-cts's runAfter field (.tekton/integration-test-eaas.yaml line 739).
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. ✅

Prior Review Rounds

All issues raised in prior rounds are resolved: unused import time and import uuid removed; DeprecationWarning from value_deserializer fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer; group_id=None applied; KafkaTimeoutError guards in both kafka_consumer fixture and _consume_one; dead assert msg is not None guards removed; feature/integration-tests entries reverted from CI configs; _consume_one now raises on messages from unexpected topics; kafka_consumer is a required keyword-only argument in all three mutating helpers. All f-strings with no interpolations have been removed.

Assessment

The final design (long-lived module-scoped consumer, autouse drain check, required kafka_consumer keyword argument) is correct and robust. The structural enforcement via *, kafka_consumer makes it impossible to add a test that silently produces an unconsumed message, and the autouse drain check provides a second layer of safety. No issues remain.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

Create another helper to remove the assert duplication:

def _assert_compose_message(kafka_consumer, topic, event_name, compose_id):
    """Consume one message and assert it matches the expected event and compose."""
    if kafka_consumer is None:
        return
    msg = _consume_one(kafka_consumer, topic)
    assert msg.get("event") == event_name, ...
    assert msg.get("compose") is not None, ...
    compose_info_data = msg["compose"].get("compose_info", {})
    assert compose_id in str(compose_info_data), ...

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Added _assert_compose_message(kafka_consumer, topic, event_name, compose_id) helper that centralises the three repeated assertions (event name, compose key presence, compose_info reference). Used it in import_compose, tag_compose, untag_compose, and test_auth_builder_can_post_compose, removing the duplicated blocks.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the plan are satisfied:

  1. kafka_consumer fixture yields None when KAFKA_URL is unset; all tests run in both cases; Kafka assertions are skipped via if kafka_consumer is None: return guards. (The original spec called for pytest.skip, but the reviewer explicitly approved the current design.)
    2–4. ✅ All three Kafka operations (compose-created, compose-tagged, compose-untagged) are verified via _assert_compose_message in the existing workflow tests (test_workflow_compose_import, test_workflow_full_lifecycle, test_workflow_respin_increment) and in test_auth_builder_can_post_compose. The standalone test_kafka_compose_created/tagged/untagged test names from the spec were replaced at the reviewer's explicit request.
  2. deploy-kafka appears in deploy-cts's runAfter field.
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. ✅

Prior Review Rounds

All previously raised issues are resolved: unused import time and import uuid removed; DeprecationWarning from value_deserializer fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer; group_id=None applied; KafkaTimeoutError guards in both helpers; dead assert msg is not None guards removed; feature/integration-tests entries in CI configs reverted; _consume_one raises on unexpected topics; kafka_consumer is a required keyword-only argument in import_compose, tag_compose, and untag_compose; _assert_compose_message helper centralises repeated assertions.


NIT

Two adjacent f-strings in _consume_one (tests/test_integration_api.py, lines 864–867, introduced by this PR)

raise AssertionError(
    f"Expected message on topic '{topic}' but received one on"
    f" '{tp_key.topic}' (offset={rec.offset}, value={rec.value!r})"
)

These two f-strings can be merged into one:

raise AssertionError(
    f"Expected message on topic '{topic}' but received one on"
    f" '{tp_key.topic}' (offset={rec.offset}, value={rec.value!r})"
)

Wait — they are already implicitly concatenated by Python and are effectively one string. The concern is purely stylistic: a single f-string would be cleaner. This was raised in a prior round but was not addressed in the last commit.

NIT

consumer_timeout_ms passed to KafkaConsumer is never exercised (tests/test_integration_api.py, line 760, introduced by this PR)

consumer = KafkaConsumer(
    ...
    consumer_timeout_ms=_KAFKA_CONSUMER_TIMEOUT_MS,
    ...
)

consumer_timeout_ms only affects the iterator protocol (for record in consumer: …). Since _consume_one always calls consumer.poll(timeout_ms=…) directly, this constructor argument has no effect and is misleading. It can be removed.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

Fix the nits above.

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Merged the two adjacent f-strings in _consume_one into a single f-string (the NIT from the last review).

@lubomir

lubomir commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the plan are satisfied:

  1. kafka_consumer fixture yields None when KAFKA_URL is unset; all existing tests run in both cases; Kafka assertions activate only when the fixture is non-None. (The original spec called for pytest.skip, but the reviewer explicitly approved the current design.)
    2–4. ✅ All three Kafka operations (compose-created, compose-tagged, compose-untagged) are verified via _assert_compose_message in the existing workflow tests.
  2. deploy-kafka appears in deploy-cts's runAfter field.
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. ✅

Prior Review Rounds

All previously raised issues are resolved except one (see NIT below): unused import time and import uuid removed; DeprecationWarning from value_deserializer fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer; group_id=None applied; KafkaTimeoutError guard in kafka_consumer fixture; dead assert msg is not None guards removed; feature/integration-tests CI entries reverted; _consume_one raises on unexpected topics; kafka_consumer is a required keyword-only argument; _assert_compose_message centralises assertions; adjacent f-strings in _consume_one merged.


NIT

consumer_timeout_ms passed to KafkaConsumer is never exercised (tests/test_integration_api.py, line 760, introduced by this PR)

consumer = KafkaConsumer(
    ...
    consumer_timeout_ms=_KAFKA_CONSUMER_TIMEOUT_MS,
    ...
)

consumer_timeout_ms only affects the iterator protocol (for record in consumer: …). Since both _consume_one and _kafka_drain_check always call consumer.poll(timeout_ms=…) directly, this constructor argument has no effect and is misleading — a reader may believe it limits how long _consume_one waits, when that is actually controlled by the timeout_ms argument to poll(). This was raised as a NIT in the previous review round (2026-07-02 12:08:19) and was not addressed in the subsequent commit. The reviewer's "Fix the nits above" instruction (2026-07-02 12:09:09) covered it. Remove the parameter from the KafkaConsumer(...) constructor call.

@lubomir

lubomir commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

The way the kafka_consumer is threaded through test and helpers is not great. Create a separate class that can be injected into tests as a fixture and abstracts this away.

class CTSClient:
    def __init__(self, http_client, kafka_consumer=None):
        self.http = http_client
        self.kafka = kafka_consumer

    def _assert_kafka_message(self, topic, event_name, compose_id):
        if self.kafka is None:
            return
        msg = _consume_one(self.kafka, topic)
        # asserts here

    def import_compose(self, release_short, release_version, date, compose_type="test", respin=1):
        ...

    def tag_compose(self, compose_id, tag_name):
        ...

@pytest.fixture(scope="module")
def cts_client(write_http_client, kafka_consumer):
    return CTSClient(write_http_client, kafka_consumer)

@pytest.fixture(scope="module")
def cts_auth_client(auth_http_client_builder, kafka_consumer):
    return CTSClient(auth_http_client_builder, kafka_consumer)

@lubomir

lubomir commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Introduced CTSClient as requested:

class CTSClient:
    def __init__(self, http_client, kafka_consumer=None): ...
    def import_compose(self, release_short, release_version, date, ...): ...
    def tag_compose(self, compose_id, tag_name): ...
    def untag_compose(self, compose_id, tag_name): ...

Two module-scoped fixtures wrap it:

  • cts_client(write_http_client, kafka_consumer) — used by all write workflow tests
  • cts_auth_client(auth_http_client_builder, kafka_consumer) — used by test_auth_builder_can_post_compose

All tests that previously took (write_http_client, kafka_consumer) now take (cts_client) instead. HTTP calls go through cts_client.http; Kafka assertions are embedded in the methods. The kafka_consumer fixture, _kafka_drain_check, _assert_compose_message, and _consume_one are unchanged.

@lubomir

lubomir commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the plan are satisfied:

  1. kafka_consumer fixture yields None when KAFKA_URL is unset; all tests run in both cases; Kafka assertions are skipped via if kafka_consumer is None guards. (Reviewer explicitly approved this design over the original pytest.skip spec.)
    2–4. ✅ All three Kafka operations (compose-created, compose-tagged, compose-untagged) are verified via _assert_compose_message in the existing workflow tests.
  2. deploy-kafka appears in deploy-cts's runAfter field (line 739).
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. ✅

Prior Review Rounds

All issues raised in prior rounds are resolved: unused import time and import uuid removed; DeprecationWarning from value_deserializer fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer; group_id=None applied; KafkaTimeoutError guards in both helpers; dead assert msg is not None guards removed; feature/integration-tests entries in CI configs reverted; _consume_one now raises on messages from unexpected topics; kafka_consumer is a required keyword-only argument in all three mutating helpers; _assert_compose_message centralises assertions; CTSClient class introduced to encapsulate the HTTP + Kafka consumer pair.


NIT

consumer_timeout_ms passed to KafkaConsumer is dead configuration (tests/test_integration_api.py, line 750, introduced by this PR)

consumer = KafkaConsumer(
    bootstrap_servers=kafka_url,
    group_id=None,
    value_deserializer=_make_json_deserializer(),
    consumer_timeout_ms=_KAFKA_CONSUMER_TIMEOUT_MS,   # ← never takes effect
    request_timeout_ms=10000,
)

consumer_timeout_ms only affects the iterator interface (for record in consumer), not consumer.poll(). Since _consume_one and _kafka_drain_check both use consumer.poll() exclusively, this parameter has no effect. It can be removed along with _KAFKA_CONSUMER_TIMEOUT_MS (which becomes unused if the only reference is gone).

This was first raised in the previous review round (comment 53) and was not addressed in the subsequent CTSClient refactor.

@lubomir lubomir left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tag manipulation functions accept http_client as argument. They should be moved to CTSClient class instead so that their usage is identical to the compose helpers.

Comment thread tests/test_integration_api.py Outdated
Comment thread tests/test_integration_api.py Outdated
assert "payload" in data
assert "compose" in data["payload"]
"payload" in data and "compose" in data["payload"]
), f"Unexpected response structure: {data}"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some assertions are duplicated here from the import_compose helper.

@lubomir

lubomir commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Three changes applied:

  1. Tag management methods moved into CTSClientcreate_tag, add_tagger, remove_tagger, add_untagger, remove_untagger (and the private _manage_tag_user) are now instance methods of CTSClient. All callers updated accordingly; the standalone helper functions are gone.

  2. Redundant assertions removed from test_auth_builder_can_post_compose — the "payload" in data and "compose" in data["payload"] check was already made inside import_compose; only the non-empty compose_id assertion is left.

  3. consumer_timeout_ms removed from KafkaConsumer — the parameter only affects iteration-based consumption; the consumer uses poll() exclusively, so the option had no effect.

@lubomir

lubomir commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

🤖 This was posted automatically by an AI agent.

Code Review: Add integration tests for Kafka messaging

Acceptance Criteria Compliance

All six acceptance criteria from the plan are satisfied:

  1. kafka_consumer fixture yields None when KAFKA_URL is unset; all tests run in both cases; Kafka assertions are skipped via if kafka_consumer is None: return guards. (The original spec called for pytest.skip, but the reviewer explicitly approved the current None-yielding design.)
    2–4. ✅ All three Kafka operations (compose-created, compose-tagged, compose-untagged) are verified via _assert_compose_message, embedded in CTSClient.import_compose, CTSClient.tag_compose, and CTSClient.untag_compose respectively, and exercised by the existing workflow tests.
  2. deploy-kafka is listed in deploy-cts's runAfter field (.tekton/integration-test-eaas.yaml line 739).
  3. kafka-python is installed and KAFKA_URL=kafka:9092 is passed to pytest in the run-tests step.

Files Claimed vs. Present in Diff

All four files mentioned in the PR description appear in git diff main...HEAD --name-only: .tekton/integration-test-eaas.yaml, cts/messaging.py, test-requirements.txt, tests/test_integration_api.py. ✅

Prior Review Rounds

All issues raised in prior rounds are resolved: unused import time and import uuid removed; DeprecationWarning from value_deserializer fixed via _JsonDeserializer subclassing kafka.serializer.Deserializer; group_id=None applied; KafkaTimeoutError guards in kafka_consumer fixture; dead assert msg is not None guards removed; feature/integration-tests entries reverted from CI configs; _consume_one raises on unexpected topics; CTSClient encapsulates HTTP + Kafka consumer pair with all mutating helpers as methods; _assert_compose_message centralises the three repeated assertions; consumer_timeout_ms removed from KafkaConsumer; all f-strings without interpolations removed; adjacent f-strings in _consume_one merged.

Assessment

The final design — long-lived module-scoped consumer, autouse drain check, CTSClient class that makes it structurally hard to forget Kafka assertions — is correct and well-structured. The cts/messaging.py fix converting "none"None for compression_type is a genuine prerequisite bug fix, correctly implemented. The pipeline reports success.

No issues found.

- Deploy single-node Apache Kafka broker (KRaft mode) in the EaaS
  ephemeral namespace before CTS starts, using emptyDir volumes to
  satisfy the restricted-v2 SCC on OpenShift.

- Add a long-lived module-scoped kafka_consumer fixture that subscribes
  to all three CTS topics and acts as a cursor; an autouse drain check
  after every test ensures no message is silently left unconsumed.

- Introduce CTSClient to pair an HTTP client with the Kafka consumer.
  Tag management methods (create_tag, add/remove_tagger/untagger) are
  members of CTSClient alongside the compose helpers, so every mutating
  action in tests uses a consistent interface.

- Kafka assertions (compose-created, compose-tagged, compose-untagged)
  are embedded in CTSClient.import_compose, tag_compose, and
  untag_compose via _assert_compose_message; they activate only when
  KAFKA_URL is set so all tests continue to run without a broker.

- Fix messaging.py: convert the string 'none' to None for
  compression_type so kafka-python selects no compression correctly.

- Add kafka-python to test-requirements.txt; pass KAFKA_URL=kafka:9092
  to pytest in the run-tests step.

Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants