feat(stream): kbagent stream command group for Data Streams (OTLP) (0.50.0) #364
padak
left a comment
Review of #364 — feat(stream): kbagent stream command group for Data Streams (OTLP) (0.50.0)
Generated by `kbagent-pr-reviewer` subagent. Verdict and findings below are advisory; the human author retains every veto. CI-coverable issues (lint, format, tests) are confirmed via `make check`, not duplicated here.
Summary
This PR introduces the kbagent stream command group (4 commands: list, create-source, detail, delete) for Keboola Data Streams (OpenTelemetry/OTLP), targeting v0.50.0. The implementation correctly follows the 3-layer architecture, all plugin synchronization surfaces are updated, OPERATION_REGISTRY is populated, the secret-masking logic is in the right layer, version tags are present in gotchas.md, and make check passes (3798 tests, ruff/ty clean). The PR is well-structured and the author walked the full checklist. The only finding is a cosmetic notation inconsistency between CLAUDE.md and context.py on the stream delete flag documentation. APPROVE.
Verdict
- Verdict: APPROVE
- Blocking findings: 0
- Non-blocking findings: 0
- Nits: 1
Blocking findings
(none)
Non-blocking findings
(none)
Nits
[NIT-1] `CLAUDE.md:378` vs `src/keboola_agent_cli/commands/context.py` (stream delete section): `CLAUDE.md` documents `stream delete` as `[--yes] [--force]` (independent optional flags), while `context.py`'s `AGENT_CONTEXT` renders the same flags as `[--yes|--force]` (either/or notation). Both are technically accurate (the flags are independent aliases in the implementation), but the two surfaces now describe the same command differently. Using `[--yes|--force]` consistently in both would match how the `--force` help text ("Alias for --yes") frames the relationship.
Verification log
- `gh pr view 364 --json title,body,files,additions,deletions,baseRefName,headRefName,labels,state` → 26 files, +1959/-5, conventional `feat(stream):` prefix, state=OPEN ✓
- `git rev-parse --abbrev-ref HEAD` (worktree) → `claude/stupefied-leakey-7625f2` matches PR branch ✓
- `make check` → 3798 passed, 8 skipped, 14 warnings in 80s; ruff/ty/changelog-check/check-error-codes all green ✓
- Layer violation check (`grep typer/click in services/`, `grep httpx in commands/`, `grep formatter in client`) → all empty ✓
- 3-layer architecture: `StreamClient` inherits `BaseHttpClient`; only imported in `stream_service.py`; `commands/stream.py` imports no client directly ✓
- OPERATION_REGISTRY (`permissions.py:45-48`): `stream.list=read`, `stream.detail=read`, `stream.create-source=write`, `stream.delete=destructive` ✓
- `commands/context.py` AGENT_CONTEXT: stream commands present under correct heading ✓
- `CLAUDE.md` `## All CLI Commands`: `kbagent stream` group added at lines 375-378 ✓
- `plugins/kbagent/agents/keboola-expert.md`: version gate entry `stream command group = 0.50.0+` at line 116; tool selection matrix row for OTLP provisioning at line 142 ✓
- `plugins/kbagent/skills/kbagent/references/commands-reference.md`: stream section added at lines 121-125 ✓
- `plugins/kbagent/skills/kbagent/references/gotchas.md`: ``## `stream`: two hosts, secret-in-URL, no auto-sinks (since v0.50.0)`` at line 2262, version tag present ✓
- `plugins/kbagent/skills/kbagent/references/stream-workflow.md`: new file created, mental model + end-to-end OTLP walkthrough ✓
- `plugins/kbagent/skills/kbagent/SKILL.md`: `stream` trigger keywords and decision-table rows added ✓
- Secret masking: `StreamService._mask()` replaces secret in every endpoint surface; `_sanitise_source()` redacts the `secret` key and all `url` fields in the raw source object; `--reveal` plumbs through correctly; `--json` output is sanitised the same way as human mode ✓
- Constants: `STREAM_API_TIMEOUT`, `STREAM_TASK_POLL_INTERVAL`, `STREAM_TASK_TIMEOUT`, `OTLP_SIGNAL_PATHS`, `OTLP_PROTOCOL`, `OTLP_SINK_SIGNALS` etc. all in `constants.py`; no magic numbers in production code ✓
- `--hint` definitions: no new entry added (correct, deprecated per CONTRIBUTING.md) ✓
- Server router (`server/routers/stream.py`): 4 routes (list, create-source, detail, delete) matching the 4 CLI commands; wired into `server/app.py` and `server/dependencies.py` ✓
- Test coverage: `test_stream_client.py` (14 tests), `test_stream_service.py` (16 tests), `test_stream_cli.py` (11 tests); E2E `test_stream_otlp_e2e` in `test_e2e.py` ✓
- Raw error_code string check: the single flagged instance (`error_code="API_ERROR"` in `test_stream_cli.py`) is in a test that constructs a `KeboolaApiError` side_effect, an established repo-wide test pattern (same pattern in `test_services.py:3922`, `test_cli.py:117`, etc.); `KeboolaApiError.error_code` accepts `str | ErrorCode` per `errors.py:159`. Production code uses the `ErrorCode` enum throughout ✓
- Token / secret in logs: no token appears in log lines; `mask_token()` inherited from `BaseHttpClient`; OTLP secret handled separately by `StreamService._mask()` ✓
- Behavior reproduction: could not run live E2E (no `E2E_API_TOKEN` / `E2E_URL` in this environment). PR description states live-validated against project 10539 with full OTLP round-trip (create → ingest → row in Storage → cleanup). Test infrastructure corroborates this claim via the E2E test fixture.
- Backward compat: new command group; no existing fields changed; new-only surfaces ✓
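For readers who want a concrete picture of the secret-masking check in the log above, here is a minimal sketch of the idea, not the PR's actual `StreamService._mask()` or `_sanitise_source()`. The function names, the `***` placeholder, and the shape of the source object are assumptions for illustration; only the `secret` key, the `url` fields, and the secret-in-path layout come from the review.

```python
from urllib.parse import urlsplit, urlunsplit

MASK = "***"  # hypothetical placeholder; the PR does not state the real mask string


def mask_otlp_url(url: str, secret: str, reveal: bool = False) -> str:
    """Replace the path segment equal to the secret, unless reveal is set."""
    if reveal or not secret:
        return url
    parts = urlsplit(url)
    segments = [MASK if seg == secret else seg for seg in parts.path.split("/")]
    return urlunsplit(parts._replace(path="/".join(segments)))


def sanitise_source(obj, secret: str, reveal: bool = False):
    """Return a redacted copy: every "secret" key and every "url" string is masked."""
    if reveal:
        return obj
    if isinstance(obj, dict):
        clean = {}
        for key, value in obj.items():
            if key == "secret":
                clean[key] = MASK
            elif key == "url" and isinstance(value, str):
                clean[key] = mask_otlp_url(value, secret)
            else:
                clean[key] = sanitise_source(value, secret)
        return clean
    if isinstance(obj, list):
        return [sanitise_source(item, secret) for item in obj]
    return obj
```

Matching whole path segments rather than substrings keeps a short secret from accidentally masking part of a host or source name; whether the real implementation does the same is not stated in the PR.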
Open questions for the author
(none)
…(0.50.0)
Add a `kbagent stream` command group so OpenTelemetry / OTLP Data Streams sources can be provisioned and introspected from the CLI instead of copy-pasting endpoints out of the Keboola UI (closes #357).
Commands: `stream list`, `stream create-source`, `stream detail`, `stream delete`.
Architecture:
- Stream control plane lives on a separate host derived from the project's Storage URL (connection.<region> -> stream.<region>, same scheme as ai./queue.) and authenticates with the per-project Storage token (X-StorageApi-Token) -- no manage token.
- The OTLP ingestion endpoint (stream-in.<region>/otlp/.../<secret>) is returned by the API in source.otlp.url (never derived) with the secret in the URL path -- masked by default in every surface, --reveal to print it.
- create-source --type otlp auto-provisions the logs/metrics/traces sinks (bucket in.c-otlp-<source>) so data actually lands in Storage, matching the UI; provisioning is idempotent and --no-sinks opts out.
- create/delete/sink-create are async Tasks polled to completion.
New layers: stream_client.py (StreamClient + create_sink + task polling), services/stream_service.py (alias resolution, secret masking, detail assembly, sink provisioning), commands/stream.py, server/routers/stream.py (1:1 serve REST). Wired into cli.py, permissions.py (read/write/destructive), constants.py, server dependencies/app.
Tests: test_stream_client.py (14), test_stream_service.py (16), test_stream_cli.py (11); E2E test_stream_otlp_e2e (make test-e2e-stream).
Live-validated against a real project: create source -> 3 sinks -> POST OTLP/HTTP logs -> 3 rows landed in in.c-otlp-<name>.logs -> read back via workspace query.
Docs synced (CLAUDE.md, context.py AGENT_CONTEXT, keboola-expert.md, SKILL.md, commands-reference.md, gotchas.md, new stream-workflow.md); version 0.50.0 + version-sync.
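The host derivation mentioned in the commit message (connection.<region> -> stream.<region>) can be sketched roughly as follows. This is an illustration under assumptions, not the code in `stream_client.py`: the function name `derive_stream_url` and the example hostname are hypothetical, and the real client may handle edge cases differently.

```python
from urllib.parse import urlsplit, urlunsplit

STORAGE_PREFIX = "connection."  # host prefix named in the commit message
STREAM_PREFIX = "stream."       # control-plane prefix named in the commit message


def derive_stream_url(storage_url: str) -> str:
    """Map a Storage URL on connection.<region> to the stream.<region> base URL."""
    parts = urlsplit(storage_url)
    host = parts.hostname or ""
    if not host.startswith(STORAGE_PREFIX):
        raise ValueError(f"not a connection.<region> host: {host!r}")
    stream_host = STREAM_PREFIX + host[len(STORAGE_PREFIX):]
    # Keep an explicit port if one was given; drop path, query, and fragment.
    netloc = stream_host if parts.port is None else f"{stream_host}:{parts.port}"
    return urlunsplit((parts.scheme, netloc, "", "", ""))


if __name__ == "__main__":
    # Hypothetical host, used only to show the shape of the mapping.
    print(derive_stream_url("https://connection.region.example.com/"))
```

Only the control-plane host is derived this way; per the commit message, the ingestion endpoint on stream-in.<region> is taken from `source.otlp.url` as returned by the API.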
… budget after 0.50.0 merge
69dbec8 to
7efac70
What
Adds a `kbagent stream` command group for Keboola Data Streams (OpenTelemetry / OTLP), so OTLP sources can be provisioned and introspected from the CLI instead of copy-pasting endpoints out of the UI. Closes #357.
Commands:
- `stream list --project P [--branch N]`
- `stream create-source --project P --name N [--type otlp|http] [--branch N] [--if-not-exists] [--no-sinks] [--reveal]`
- `stream detail [SOURCE_ID | --name N] --project P [--branch N] [--reveal]`
- `stream delete SOURCE_ID --project P [--branch N] [--dry-run] [--yes|--force]`
Why / key design points
- The Stream control plane lives on `stream.<region>` (derived from the project's `connection.<region>` Storage URL, same scheme as `ai.` / `queue.`) and authenticates with the per-project Storage token: no manage token, no extra prompt. The OTLP ingestion endpoint (`stream-in.<region>/otlp/<projectId>/<sourceName>/<secret>`) lives on a different host and is returned by the API in `source.otlp.url`; it is never derived.
- `detail` / `create-source` mask the secret in the endpoint, the per-signal endpoints, and the raw `--json` source echo. `--reveal` prints it (e.g. to wire `OTEL_EXPORTER_OTLP_ENDPOINT` for a daemon).
- `POST /sources` creates only a bare source (no sinks → data has nowhere to land). Matching the Keboola UI, `create-source --type otlp` auto-creates the three `logs` / `metrics` / `traces` table sinks into bucket `in.c-otlp-<source>` (mapping: `id` uuid + ingest `datetime` + `body` = full flattened OTLP record JSON). Idempotent (only missing signals are added); `--no-sinks` opts out.
Permission classes:
`stream.list` / `stream.detail` = read, `stream.create-source` = write, `stream.delete` = destructive.
Layers
`stream_client.py` (`StreamClient`: source/sink CRUD + task polling) · `services/stream_service.py` (alias resolution, secret masking, detail assembly, sink provisioning) · `commands/stream.py` · `server/routers/stream.py` (1:1 `kbagent serve` REST). Wired into `cli.py`, `permissions.py`, `constants.py`, server `dependencies.py` / `app.py`.
Tests
- `test_stream_client.py` (14), `test_stream_service.py` (16), `test_stream_cli.py` (11).
- `test_stream_otlp_e2e` (`make test-e2e-stream`): full create → detail (masked + `--reveal`) → delete round-trip, asserts auto-provisioned destination tables.
- `POST /v1/logs` (OTLP/HTTP JSON, HTTP 200) → 3 rows landed in `in.c-otlp-<name>.logs` within ~18s → read back via `workspace query`. All test resources cleaned up afterwards.
- `ruff` / `ruff format` / `ty` clean; `changelog-check` + `check-error-codes` green.
Docs / release
Version 0.50.0 (`make version-sync` → plugin.json / marketplace.json / uv.lock). Synced: `CLAUDE.md`, `context.py` AGENT_CONTEXT, `keboola-expert.md` (version gate + tool matrix), `SKILL.md` (decision table + triggers), `commands-reference.md`, `gotchas.md`, new `stream-workflow.md`.
Reviewer notes
The auto-provisioned sinks use a three-column mapping (`id` / `datetime` / `body`); the `body` column captures the full flattened OTLP record so nothing is lost, and users can refine per-signal columns in the UI. Per-signal column-mapping CRUD is out of scope (issue is read-first + create + delete).
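To make the live `POST /v1/logs` validation easier to reproduce, the sketch below builds an OTLP/HTTP JSON logs payload with three records and prepares, but does not send, the request. It is an illustration under assumptions: the helper names, the `manual-test` scope name, and the idea that `/v1/logs` is appended to the revealed endpoint are hypothetical, and the PR does not state that each log record maps to exactly one row.

```python
import json
import time
import urllib.request


def build_logs_payload(service_name: str, messages: list[str]) -> dict:
    """Build an OTLP/HTTP JSON logs payload with one log record per message."""
    now_ns = str(time.time_ns())  # OTLP JSON encodes 64-bit timestamps as strings
    records = [
        {"timeUnixNano": now_ns, "severityText": "INFO", "body": {"stringValue": msg}}
        for msg in messages
    ]
    resource = {
        "attributes": [{"key": "service.name", "value": {"stringValue": service_name}}]
    }
    scope_logs = [{"scope": {"name": "manual-test"}, "logRecords": records}]
    return {"resourceLogs": [{"resource": resource, "scopeLogs": scope_logs}]}


def build_logs_request(endpoint: str, payload: dict) -> urllib.request.Request:
    """Prepare the POST request; assumes /v1/logs is appended to the endpoint."""
    return urllib.request.Request(
        url=endpoint.rstrip("/") + "/v1/logs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    payload = build_logs_payload("demo-service", ["first", "second", "third"])
    # Placeholder endpoint; a real one would come from `stream detail --reveal`.
    request = build_logs_request("https://stream-in.example.com/otlp/1/demo/SECRET", payload)
    print(request.get_method(), request.full_url)
```

Sending the request would be a matter of passing it to `urllib.request.urlopen`; that step is left out here so the sketch runs without network access or a real secret.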