test(nodes): fill RTMP protocol test coverage gaps#503
Conversation
Add unit tests to increase coverage of the RTMP implementation: rtmp_client.rs: - send_audio() happy path (Publishing state) - send_audio() before Publishing (error path) - handle_error() transitions to Disconnecting - handle_on_status() with non-Publish.Start code - maybe_send_ack() when window exceeded - amf0_decode for unsupported EcmaArray marker - ChunkDecoder 2-byte and 3-byte basic header forms rtmp.rs: - stamp() with None timestamp (fallback behaviour) - Config defaults (sample_rate=48000, channels=2) - convert_annexb_to_avcc with SEI/AUD NAL types Signed-off-by: Devin AI <devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: streamkit-devin <devin@streamkit.dev>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
| let status_msg = server_encode(&mut srv_enc, 4, MSG_COMMAND_AMF0, 1, status_payload); | ||
| conn.feed_recv_buf(&status_msg).unwrap(); | ||
|
|
||
| assert_ne!(conn.state(), RtmpConnectionState::Publishing); |
There was a problem hiding this comment.
🟡 BadName rejection test accepts a stuck publish state
The new BadName scenario builds a NetStream.Publish.BadName response but only asserts that the connection is not Publishing. With the current production logic, this response falls through the handle_on_status match because only codes containing Error, Failed, or Rejected are treated as terminal (crates/nodes/src/transport/rtmp_client.rs:1510-1523), so the connection remains PublishPending. In the real node handshake loop, PublishPending is neither success nor failure (crates/nodes/src/transport/rtmp.rs:493-497), causing an invalid stream-name/server rejection to wait until the 10s handshake timeout instead of surfacing the server's error. The test should require the terminal disconnect/error behavior rather than accepting any non-publishing state.
Prompt for agents
Tighten the BadName publish-rejection handling and test. The new test in crates/nodes/src/transport/rtmp_client.rs creates an onStatus response with code NetStream.Publish.BadName, which is a failed publish response, but it currently only asserts the connection is not Publishing. The existing handle_on_status implementation only treats codes containing Error, Failed, or Rejected as terminal, so BadName leaves the connection in PublishPending. RtmpPublishNode::drive_handshake only exits on Publishing or Disconnecting, so this becomes a timeout in production. Update the production status-code handling to classify BadName (and any other known terminal publish failures you consider in scope) as a disconnect/rejection, and update the test to assert Disconnecting and the emitted DisconnectedByPeer reason instead of assert_ne!(Publishing).
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Good catch. Updated the test to pin the current production behavior: assert_eq!(conn.state(), RtmpConnectionState::PublishPending). The comment now documents that BadName falls through the catch-all arm because it doesn't match Error/Failed/Rejected.
Filed #504 to track the production fix — handle_on_status should classify BadName (and similar non-Start codes) as terminal to avoid a silent 10s timeout.
| let mut wire = Vec::new(); | ||
| enc.encode_message(&msg, &mut wire); | ||
|
|
||
| // Verify the basic header uses the 2-byte form. |
There was a problem hiding this comment.
🟡 Added assertion comments restate the following code
The repository's mandatory comment guidelines say not to add line narration that restates what the next line does. This added comment says the test is verifying the 2-byte basic header immediately before an assert_eq! whose message already says the same thing; the same pattern is repeated for the 3-byte case at crates/nodes/src/transport/rtmp_client.rs:2703-2704. These comments should be removed or replaced with non-obvious protocol rationale if there is one.
Prompt for agents
Remove the redundant line-narration comments added around the basic-header assertions in crates/nodes/src/transport/rtmp_client.rs. The assertion messages already state that the csid field should indicate the 2-byte or 3-byte form, so the comments do not add non-obvious context and violate the repository comment guidelines. If there is a protocol quirk worth documenting, keep a short why-focused comment instead of restating the assertion.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Removed — the assertion messages already carry the intent.
| #[test] | ||
| fn stamp_with_no_metadata_defaults_to_zero() { | ||
| let mut state = RtmpTimestampState::new(); | ||
| let pkt = Packet::Binary { | ||
| data: bytes::Bytes::from_static(&[0]), | ||
| metadata: None, | ||
| content_type: None, | ||
| }; | ||
| let ts = state.stamp(&pkt, Track::Video, "test"); | ||
| assert_eq!(ts, 0); |
There was a problem hiding this comment.
📝 Info: New no-metadata timestamp test documents an existing fallback, not a new runtime path
The added stamp_with_no_metadata_defaults_to_zero test covers the current RtmpTimestampState::stamp fallback where missing PacketMetadata::timestamp_us becomes pkt_ms = 0 (crates/nodes/src/transport/rtmp.rs:682-693). I did not flag this as a bug because encoded media packets normally carry metadata through the video/audio encoders, and the test is documenting a defensive fallback rather than changing behavior. If reviewers expect RTMP publishing to reject untimestamped media instead of collapsing timestamps to zero/monotonic increments, that would be a product-contract decision outside this test-only PR.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| #[test] | ||
| fn convert_annexb_sei_and_aud_in_video_data() { | ||
| let mut annexb = Vec::new(); | ||
| // SEI NAL (type 6) | ||
| annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); | ||
| let sei = [0x06, 0x05, 0x04, 0x03]; | ||
| annexb.extend_from_slice(&sei); | ||
| // AUD NAL (type 9) | ||
| annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); | ||
| let aud = [0x09, 0x10]; | ||
| annexb.extend_from_slice(&aud); | ||
| // IDR slice (type 5) | ||
| annexb.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]); | ||
| let idr = [0x65, 0x88, 0x84]; | ||
| annexb.extend_from_slice(&idr); | ||
|
|
||
| let result = convert_annexb_to_avcc(&annexb); | ||
|
|
||
| assert!(result.sps_list.is_empty()); | ||
| assert!(result.pps_list.is_empty()); | ||
|
|
||
| // All three NALs (SEI, AUD, IDR) should appear in video_data. | ||
| let avcc = &result.video_data; | ||
| let mut nal_types = Vec::new(); | ||
| let mut offset = 0; | ||
| while offset + 4 <= avcc.len() { | ||
| let len = u32::from_be_bytes([ | ||
| avcc[offset], | ||
| avcc[offset + 1], | ||
| avcc[offset + 2], | ||
| avcc[offset + 3], | ||
| ]) as usize; | ||
| offset += 4; | ||
| assert!(offset + len <= avcc.len(), "AVCC data truncated"); | ||
| nal_types.push(avcc[offset] & H264_NAL_TYPE_MASK); | ||
| offset += len; | ||
| } | ||
|
|
||
| assert!(nal_types.contains(&6), "SEI should be in video_data"); | ||
| assert!(nal_types.contains(&9), "AUD should be in video_data"); | ||
| assert!(nal_types.contains(&5), "IDR should be in video_data"); |
There was a problem hiding this comment.
📝 Info: SEI/AUD retention test intentionally matches current AVCC conversion semantics
The new convert_annexb_sei_and_aud_in_video_data test asserts that non-SPS/PPS NAL units are preserved in video_data. That matches the converter, which only extracts SPS and PPS and length-prefixes every other NAL (crates/nodes/src/transport/rtmp.rs:1035-1048). I considered whether AUD should be stripped for RTMP/AVCC payloads, but there is no existing repository contract requiring that, and this test merely locks in the current broad "exclude parameter sets only" behavior.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| fn drive_to_publishing() -> (RtmpPublishClientConnection, ChunkEncoder) { | ||
| let url = RtmpUrl::parse("rtmp://x.rtmp.youtube.com/live2/stream-key").unwrap(); | ||
| let mut conn = RtmpPublishClientConnection::new(url); | ||
|
|
||
| let c0c1 = conn.send_buf().to_vec(); | ||
| conn.advance_send_buf(c0c1.len()); | ||
|
|
||
| let mut s0s1s2 = Vec::with_capacity(1 + HANDSHAKE_SIZE * 2); | ||
| s0s1s2.push(0x03); | ||
| s0s1s2.extend_from_slice(&vec![0xBB; HANDSHAKE_SIZE]); | ||
| s0s1s2.extend_from_slice(&c0c1[1..=HANDSHAKE_SIZE]); | ||
| conn.feed_recv_buf(&s0s1s2).unwrap(); | ||
| conn.advance_send_buf(conn.send_buf().len()); | ||
|
|
||
| let mut srv_enc = ChunkEncoder::new(); | ||
|
|
||
| let win_ack = server_encode( | ||
| &mut srv_enc, | ||
| 2, | ||
| MSG_WIN_ACK_SIZE, | ||
| 0, | ||
| 2_500_000u32.to_be_bytes().to_vec(), | ||
| ); | ||
| let mut set_bw_payload = 59_768_832u32.to_be_bytes().to_vec(); | ||
| set_bw_payload.push(2); | ||
| let set_bw = server_encode(&mut srv_enc, 2, MSG_SET_PEER_BANDWIDTH, 0, set_bw_payload); | ||
| let mut server_msg = Vec::new(); | ||
| server_msg.extend_from_slice(&win_ack); | ||
| server_msg.extend_from_slice(&set_bw); | ||
| conn.feed_recv_buf(&server_msg).unwrap(); | ||
| conn.advance_send_buf(conn.send_buf().len()); | ||
|
|
||
| // connect _result | ||
| let mut result_payload = Vec::new(); | ||
| amf0_encode(&Amf0Value::String("_result".to_string()), &mut result_payload).unwrap(); | ||
| amf0_encode(&Amf0Value::Number(1.0), &mut result_payload).unwrap(); | ||
| amf0_encode( | ||
| &Amf0Value::Object(vec![ | ||
| ("fmsVer".to_string(), Amf0Value::String("FMS/3,5,7,7009".to_string())), | ||
| ("capabilities".to_string(), Amf0Value::Number(31.0)), | ||
| ]), | ||
| &mut result_payload, | ||
| ) | ||
| .unwrap(); | ||
| amf0_encode( | ||
| &Amf0Value::Object(vec![ | ||
| ("level".to_string(), Amf0Value::String("status".to_string())), | ||
| ( | ||
| "code".to_string(), | ||
| Amf0Value::String("NetConnection.Connect.Success".to_string()), | ||
| ), | ||
| ("description".to_string(), Amf0Value::String("Connection succeeded".to_string())), | ||
| ]), | ||
| &mut result_payload, | ||
| ) | ||
| .unwrap(); | ||
| let result_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, result_payload); | ||
| conn.feed_recv_buf(&result_msg).unwrap(); | ||
| conn.advance_send_buf(conn.send_buf().len()); | ||
|
|
||
| // createStream _result | ||
| let mut cs_payload = Vec::new(); | ||
| amf0_encode(&Amf0Value::String("_result".to_string()), &mut cs_payload).unwrap(); | ||
| amf0_encode(&Amf0Value::Number(2.0), &mut cs_payload).unwrap(); | ||
| amf0_encode(&Amf0Value::Null, &mut cs_payload).unwrap(); | ||
| amf0_encode(&Amf0Value::Number(1.0), &mut cs_payload).unwrap(); | ||
| let cs_msg = server_encode(&mut srv_enc, 3, MSG_COMMAND_AMF0, 0, cs_payload); | ||
| conn.feed_recv_buf(&cs_msg).unwrap(); | ||
| conn.advance_send_buf(conn.send_buf().len()); | ||
|
|
||
| // onStatus → NetStream.Publish.Start | ||
| let mut status_payload = Vec::new(); | ||
| amf0_encode(&Amf0Value::String("onStatus".to_string()), &mut status_payload).unwrap(); | ||
| amf0_encode(&Amf0Value::Number(0.0), &mut status_payload).unwrap(); | ||
| amf0_encode(&Amf0Value::Null, &mut status_payload).unwrap(); | ||
| amf0_encode( | ||
| &Amf0Value::Object(vec![ | ||
| ("level".to_string(), Amf0Value::String("status".to_string())), | ||
| ("code".to_string(), Amf0Value::String("NetStream.Publish.Start".to_string())), | ||
| ("description".to_string(), Amf0Value::String("Publishing".to_string())), | ||
| ]), | ||
| &mut status_payload, | ||
| ) | ||
| .unwrap(); | ||
| let status_msg = server_encode(&mut srv_enc, 4, MSG_COMMAND_AMF0, 1, status_payload); | ||
| conn.feed_recv_buf(&status_msg).unwrap(); | ||
| conn.advance_send_buf(conn.send_buf().len()); | ||
|
|
||
| assert_eq!(conn.state(), RtmpConnectionState::Publishing); | ||
| (conn, srv_enc) |
There was a problem hiding this comment.
📝 Info: Publishing helper reuses one server encoder so compressed chunk headers remain realistic
drive_to_publishing returns the same ChunkEncoder it used for the simulated server handshake/status flow. That matters because later tests such as maybe_send_ack_when_window_exceeded send additional server messages through this encoder, preserving server-side chunk-stream header state instead of always resetting to fmt=0. I checked the decoder path (crates/nodes/src/transport/rtmp_client.rs:702-823) and did not find this helper to be masking a bug; it actually keeps tests closer to real RTMP chunk compression behavior.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| #[test] | ||
| fn amf0_decode_ecma_array_unsupported() { | ||
| // AMF0 EcmaArray (type marker 0x08) is not supported by this | ||
| // minimal codec. Verify it produces a descriptive error. | ||
| let mut buf = Vec::new(); | ||
| buf.push(0x08); // EcmaArray marker | ||
| buf.extend_from_slice(&4u32.to_be_bytes()); // associative-count | ||
| // key-value pair: "key" → Number(1.0) | ||
| buf.extend_from_slice(&2u16.to_be_bytes()); | ||
| buf.extend_from_slice(b"ab"); | ||
| buf.push(AMF0_NUMBER); | ||
| buf.extend_from_slice(&1.0f64.to_be_bytes()); | ||
| buf.extend_from_slice(&AMF0_OBJECT_END); | ||
|
|
||
| let err = amf0_decode(&buf).unwrap_err(); | ||
| assert!( | ||
| err.to_string().contains("0x08"), | ||
| "error should mention the unsupported marker: {err}" | ||
| ); |
There was a problem hiding this comment.
📝 Info: ECMA-array test codifies a deliberate unsupported-marker behavior
The added amf0_decode_ecma_array_unsupported test verifies that marker 0x08 produces a descriptive unsupported-type error. I considered whether lack of ECMA Array support is itself an RTMP interoperability bug because some servers can use ECMA arrays in metadata, but the current client command flow only decodes command/status response values needed for publishing and already has a "minimal codec" scope (crates/nodes/src/transport/rtmp_client.rs:287-303, crates/nodes/src/transport/rtmp_client.rs:1376-1524). Without evidence that supported publish responses in this code path require ECMA arrays, I left it as an analysis rather than a bug.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
…comments - BadName test now asserts PublishPending (current production behavior) with a comment noting the catch-all arm gap - Removed line-narration comments on basic header assertions Signed-off-by: Devin AI <devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: streamkit-devin <devin@streamkit.dev>
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #503 +/- ##
==========================================
+ Coverage 77.88% 78.08% +0.20%
==========================================
Files 232 232
Lines 64928 65234 +306
Branches 1909 1909
==========================================
+ Hits 50567 50938 +371
+ Misses 14355 14290 -65
Partials 6 6
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
- Simplify EcmaArray test fixture to single-byte marker (decode errors at byte 0, extra bytes were never read) - Remove restating comment above AVCC NAL-type assertions Signed-off-by: Devin AI <devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: streamkit-devin <devin@streamkit.dev>
Summary
crates/nodes/:send_audio()happy/error paths,_errorcommand handling,onStatusrejection for non-Publish.Start codes, ACK window enforcement, unsupported AMF0 type handling, and chunk decoder 2-byte/3-byte basic header forms.rtmp.rscovering timestamp fallback with missing metadata, config defaults, and SEI/AUD NAL pass-through in Annex B → AVCC conversion.drive_to_publishing()test helper that drives a connection through the full handshake → connect → createStream → publish flow.Review & Validation
cargo test -p streamkit-nodes -- transport::rtmp— all 84 tests passjust lintpassesNotes
handle_on_status_bad_name_does_not_publishtest pins the current production behavior whereNetStream.Publish.BadNamefalls through thehandle_on_statuscatch-all arm and the connection staysPublishPending. This is a real gap — filed bug: handle_on_status does not treat NetStream.Publish.BadName as terminal #507 to track the production fix.Link to Devin session: https://staging.itsdev.in/sessions/3faa05c8346d45b1a2d5b85a9702ea47
Requested by: @streamer45
Devin Review
09e0f87(HEAD isa95ff38)