Refactor slot response streaming messages #3017

Open
markphelps wants to merge 1 commit into main from cog-streaming-1

Conversation

@markphelps
Contributor

Summary

  • Rename slot response log and output messages to explicit streaming-oriented variants.
  • Add protocol version and binary chunk variants for future transport extensions.
  • Include output chunk indexes and update parent/worker routing tests.

Test Plan

  • mise run fmt:rust:fix
  • mise run lint:rust
  • mise run test:rust

markphelps requested a review from a team as a code owner May 13, 2026 18:11
@markphelps
Contributor Author

/bonk review

markphelps requested a review from michaeldwan May 13, 2026 18:28
@ask-bonk
Contributor

ask-bonk Bot commented May 13, 2026

@markphelps Bonk workflow was cancelled.

View workflow run · To retry, trigger Bonk again.

@ask-bonk
Contributor

ask-bonk Bot commented May 13, 2026

Based on my review of the code changes in PR #3017, here's my assessment:

Review Summary

LGTM - The PR refactors slot response streaming messages cleanly and adds extensibility for future transport protocols.

Key Changes

  1. Renamed message variants for clarity:

    • Log → LogLine (explicit streaming-oriented name)
    • Output → OutputChunk (explicit streaming-oriented name)
  2. Added protocol versioning with ProtocolVersion message and SLOT_RESPONSE_PROTOCOL_VERSION = 1 constant for future compatibility checking

  3. Added BinaryChunk for future WebSocket streaming support (currently logged and ignored)

  4. Added index: u64 to OutputChunk for output chunk ordering/tracking

  5. Updated all call sites in orchestrator, worker, and tests to use new variant names

Code Quality

  • Clean separation of concerns - protocol changes isolated to protocol.rs
  • Backward-compatible approach using explicit protocol version
  • Good test coverage for new variants
  • Proper handling of unknown protocol versions (logs warning, continues)

Minor Observation

The index field in OutputChunk is received but not currently used in the orchestrator (marked with _). This appears to be intentional groundwork for future ordering/reassembly logic.
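For readers without the diff open, the message set described above might look roughly like the sketch below. This is not the PR's code: the field types, the `Action` enum, and the `route` function are hypothetical, and serde attributes are left out to keep the sketch dependency-free. Only the variant names, `index: u64`, the `mime_type`/`data` fields, and `SLOT_RESPONSE_PROTOCOL_VERSION = 1` come from the discussion.

```rust
/// Constant named in the PR; the integer type is an assumption.
pub const SLOT_RESPONSE_PROTOCOL_VERSION: u32 = 1;

/// Hypothetical shape of the renamed and newly added variants.
#[derive(Debug, Clone, PartialEq)]
pub enum SlotResponse {
    ProtocolVersion { version: u32 },
    LogLine { source: String, data: String },
    OutputChunk { index: u64, output: String },
    BinaryChunk { mime_type: String, data: Vec<u8> },
}

/// Hypothetical result type so the routing decision can be inspected.
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    ForwardLog(String),
    ForwardOutput(String),
    Warn(String),
    Ignore,
}

/// Mirrors the behaviour described in the review: unknown versions only
/// produce a warning, `index` is received but unused, and `BinaryChunk`
/// is ignored for now.
pub fn route(msg: SlotResponse) -> Action {
    match msg {
        SlotResponse::ProtocolVersion { version }
            if version != SLOT_RESPONSE_PROTOCOL_VERSION =>
        {
            Action::Warn(format!(
                "unknown slot protocol version {}, expected {}",
                version, SLOT_RESPONSE_PROTOCOL_VERSION
            ))
        }
        SlotResponse::ProtocolVersion { .. } => Action::Ignore,
        SlotResponse::LogLine { source, data } => {
            Action::ForwardLog(format!("[{}] {}", source, data))
        }
        SlotResponse::OutputChunk { index: _, output } => Action::ForwardOutput(output),
        SlotResponse::BinaryChunk { .. } => Action::Ignore,
    }
}
```

Calling `route(SlotResponse::ProtocolVersion { version: 2 })` yields an `Action::Warn(..)` rather than an error, which matches the "logs warning, continues" behaviour noted under Code Quality.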

Member

michaeldwan left a comment

Clean refactor. The renames from Log/Output to LogLine/OutputChunk are a clear win -- the old names were ambiguous, especially Output which clashed with the concept of final prediction output.

A few things to address:

  • Orphaned snapshot files. The old slot_log_serializes and slot_output_serializes snapshots are still on disk now that the tests were renamed and switched to inline assertions. Delete them or run cargo insta test --unreferenced=delete.
  • BinaryChunk encoding. Vec<u8> serializes as a JSON array of integers, which is wildly inefficient for binary payloads. If this is intentionally a placeholder for a future binary transport, a doc comment saying so would prevent someone from accidentally using it over the current JSON IPC.
  • ProtocolVersion scaffolding. Nothing sends this message yet. A brief comment on the intent would help future readers.

Minor nit: send_output burns an index even when the output spills to disk as FileOutput (where index is unused), creating gaps in the sequence. Not a bug since the orchestrator ignores index today, but worth knowing if index continuity ever matters.
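To make the gap concrete, here is a minimal sketch of the two allocation orders. Everything in it is hypothetical: `OutputSender`, `Sent`, `send_eager`, `send_contiguous`, and the size-based `spill_threshold` are illustration-only names and are not taken from the PR; only the idea that an index is consumed before the FileOutput decision comes from the comment above.

```rust
/// Hypothetical record of what the worker would emit for one output.
#[derive(Debug, Clone, PartialEq)]
pub enum Sent {
    OutputChunk { index: u64 },
    FileOutput,
}

/// Hypothetical sender; spilling on payload size is an assumption.
pub struct OutputSender {
    next_index: u64,
    spill_threshold: usize,
}

impl OutputSender {
    pub fn new(spill_threshold: usize) -> Self {
        Self { next_index: 0, spill_threshold }
    }

    /// Order described in the review: the index is consumed first, so a
    /// spilled output leaves a hole in the sequence.
    pub fn send_eager(&mut self, payload: &[u8]) -> Sent {
        let index = self.next_index;
        self.next_index += 1;
        if payload.len() > self.spill_threshold {
            Sent::FileOutput
        } else {
            Sent::OutputChunk { index }
        }
    }

    /// Alternative order: only inline chunks consume an index, so the
    /// sequence stays contiguous.
    pub fn send_contiguous(&mut self, payload: &[u8]) -> Sent {
        if payload.len() > self.spill_threshold {
            return Sent::FileOutput;
        }
        let index = self.next_index;
        self.next_index += 1;
        Sent::OutputChunk { index }
    }
}
```

With a threshold of 4 bytes, sending a small, a large, and another small payload through `send_eager` yields indexes 0 and 2, while `send_contiguous` yields 0 and 1. Either is fine as long as the orchestrator ignores `index`.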

BinaryChunk {
    mime_type: String,
    data: Vec<u8>,
},
Member

Vec<u8> with serde's default JSON serialization produces an array of integers -- a 1 MB audio chunk becomes ~4 MB of JSON. If this will only ever transit a future binary WebSocket transport (bypassing the JSON codec), a doc comment saying so would be helpful. If it might ever go over the current JSON IPC, consider serde_bytes or base64.
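A rough, dependency-free way to check that estimate is to count the characters a compact JSON integer array would need and compare against a base64 string. The function names below are hypothetical helpers written for this comment, not part of the PR or of serde; the sketch assumes compact output with no whitespace, which is what `serde_json::to_string` produces.

```rust
/// Length of a compact JSON array of integers such as `[0,17,255]`:
/// one to three digits per byte, a comma between elements, two brackets.
pub fn json_int_array_len(data: &[u8]) -> usize {
    if data.is_empty() {
        return 2; // "[]"
    }
    let digits: usize = data
        .iter()
        .map(|&b| if b >= 100 { 3 } else if b >= 10 { 2 } else { 1 })
        .sum();
    digits + (data.len() - 1) + 2
}

/// Length of the same payload as a padded base64 JSON string,
/// including the two surrounding quotes.
pub fn base64_json_len(byte_count: usize) -> usize {
    (byte_count + 2) / 3 * 4 + 2
}

/// Builds a payload that cycles through every byte value, as a stand-in
/// for roughly uniform binary data such as compressed audio.
pub fn cycling_payload(byte_count: usize) -> Vec<u8> {
    (0..byte_count).map(|i| (i % 256) as u8).collect()
}
```

For a 1 MiB `cycling_payload`, the integer-array form comes out at a bit over 3.5 bytes of JSON per input byte, while the base64 form stays near 1.33, which is consistent with the "~4 MB" figure above.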

serde_json::to_value(resp).unwrap(),
json!({
"type": "log_line",
"source": "stdout",
Member

The old snapshot files slot_log_serializes.snap and slot_output_serializes.snap are now orphaned since these tests were renamed and switched to inline assertions. They should be deleted.

Some((slot_id, result)) = slot_msg_rx.recv() => {
match result {
Ok(SlotResponse::Log { source, data }) => {
Ok(SlotResponse::ProtocolVersion { version }) => {
Member

Nothing in the worker sends ProtocolVersion yet -- this arm is dead code for now. A quick comment like // TODO: worker sends ProtocolVersion on slot connect (or wherever it'll be sent) would clarify the intent for future readers.

Member

michaeldwan left a comment

Approving -- the inline comments from my previous review are all minor/nits. Nothing blocking.


2 participants