From f20094c8e2cafeacfb3cc09af66c828a6a1bd4c2 Mon Sep 17 00:00:00 2001 From: Cornelius Soell Date: Mon, 19 Jun 2023 18:00:04 +0200 Subject: [PATCH] Problem: long flag isn't set for subscriptions if topic has between 246 and 255 characters Solution: fix V3.1 encoder to calculate long flag after evaluating the subscribe and cancel commands --- src/v3_1_encoder.cpp | 6 ++- tests/CMakeLists.txt | 1 + tests/test_xpub_topic.cpp | 81 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 tests/test_xpub_topic.cpp diff --git a/src/v3_1_encoder.cpp b/src/v3_1_encoder.cpp index fcf8cc6366..f6b04c3549 100644 --- a/src/v3_1_encoder.cpp +++ b/src/v3_1_encoder.cpp @@ -29,8 +29,6 @@ void zmq::v3_1_encoder_t::message_ready () protocol_flags = 0; if (in_progress ()->flags () & msg_t::more) protocol_flags |= v2_protocol_t::more_flag; - if (in_progress ()->size () > UCHAR_MAX) - protocol_flags |= v2_protocol_t::large_flag; if (in_progress ()->flags () & msg_t::command || in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) { protocol_flags |= v2_protocol_t::command_flag; @@ -39,6 +37,10 @@ void zmq::v3_1_encoder_t::message_ready () else if (in_progress ()->is_cancel ()) size += zmq::msg_t::cancel_cmd_name_size; } + // Calculate large_flag after command_flag. Subscribe or cancel commands + // increase the message size. + if (size > UCHAR_MAX) + protocol_flags |= v2_protocol_t::large_flag; // Encode the message length. For messages less then 256 bytes, // the length is encoded as 8-bit unsigned integer. For larger diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 66c44b1243..6b7f6cfa83 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -64,6 +64,7 @@ set(tests test_srcfd test_stream_timeout test_xpub_manual + test_xpub_topic test_xpub_welcome_msg test_xpub_verbose test_base85 diff --git a/tests/test_xpub_topic.cpp b/tests/test_xpub_topic.cpp new file mode 100644 index 0000000000..3fceafe915 --- /dev/null +++ b/tests/test_xpub_topic.cpp @@ -0,0 +1,81 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +SETUP_TEARDOWN_TESTCONTEXT + +const char bind_address[] = "tcp://127.0.0.1:*"; +char connect_address[MAX_SOCKET_STRING]; + +// 245 chars + 10 chars for subscribe command = 255 chars +const char short_topic[] = "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDE"; + +// 246 chars + 10 chars for subscribe command = 256 chars +const char long_topic[] = "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEF"; + + +template +void test_subscribe_cancel(void *xpub, void *sub, const char (&topic)[SIZE]) +{ + // Ignore '\0' terminating the topic string. + const size_t topic_len = SIZE - 1; + + // Subscribe for topic + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic, topic_len)); + + // Allow receiving more than the expected number of bytes + char buffer[topic_len + 5]; + + // Receive subscription + int rc = TEST_ASSERT_SUCCESS_ERRNO ( + zmq_recv (xpub, buffer, sizeof (buffer), 0)); + TEST_ASSERT_EQUAL_INT (topic_len + 1, rc); + TEST_ASSERT_EQUAL_UINT8 (1, buffer[0]); + TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len); + + // Unsubscribe from topic + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic, topic_len)); + + // Receive unsubscription + rc = TEST_ASSERT_SUCCESS_ERRNO ( + zmq_recv (xpub, buffer, sizeof (buffer), 0)); + TEST_ASSERT_EQUAL_INT (topic_len + 1, rc); + TEST_ASSERT_EQUAL_UINT8 (0, buffer[0]); + TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len); +} + +void test_xpub_subscribe_long_topic () +{ + void *xpub = test_context_socket (ZMQ_XPUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (xpub, bind_address)); + size_t len = MAX_SOCKET_STRING; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (xpub, ZMQ_LAST_ENDPOINT, connect_address, &len)); + + void *sub = test_context_socket (ZMQ_SUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, connect_address)); + + test_subscribe_cancel (xpub, sub, short_topic); + test_subscribe_cancel (xpub, sub, long_topic); + + // Clean up. + test_context_socket_close (xpub); + test_context_socket_close (sub); +} + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_xpub_subscribe_long_topic); + + return UNITY_END (); +}