Skip to content

Commit

Permalink
Problem: long flag isn't set for subscriptions if topic has between 2…
Browse files Browse the repository at this point in the history
…46 and 255 characters

Solution: fix V3.1 encoder to calculate long flag after evaluating the subscribe and cancel commands
  • Loading branch information
soell committed Jun 19, 2023
1 parent 7af09a0 commit f20094c
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/v3_1_encoder.cpp
Expand Up @@ -29,8 +29,6 @@ void zmq::v3_1_encoder_t::message_ready ()
protocol_flags = 0;
if (in_progress ()->flags () & msg_t::more)
protocol_flags |= v2_protocol_t::more_flag;
if (in_progress ()->size () > UCHAR_MAX)
protocol_flags |= v2_protocol_t::large_flag;
if (in_progress ()->flags () & msg_t::command
|| in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) {
protocol_flags |= v2_protocol_t::command_flag;
Expand All @@ -39,6 +37,10 @@ void zmq::v3_1_encoder_t::message_ready ()
else if (in_progress ()->is_cancel ())
size += zmq::msg_t::cancel_cmd_name_size;
}
// Calculate large_flag after command_flag. Subscribe or cancel commands
// increase the message size.
if (size > UCHAR_MAX)
protocol_flags |= v2_protocol_t::large_flag;

// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Expand Up @@ -64,6 +64,7 @@ set(tests
test_srcfd
test_stream_timeout
test_xpub_manual
test_xpub_topic
test_xpub_welcome_msg
test_xpub_verbose
test_base85
Expand Down
81 changes: 81 additions & 0 deletions tests/test_xpub_topic.cpp
@@ -0,0 +1,81 @@
/* SPDX-License-Identifier: MPL-2.0 */

#include "testutil.hpp"
#include "testutil_unity.hpp"

SETUP_TEARDOWN_TESTCONTEXT

const char bind_address[] = "tcp://127.0.0.1:*";
char connect_address[MAX_SOCKET_STRING];

// 245 chars + 10 chars for subscribe command = 255 chars
const char short_topic[] = "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDE";

// 246 chars + 10 chars for subscribe command = 256 chars
const char long_topic[] = "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEF";


template <size_t SIZE>
void test_subscribe_cancel(void *xpub, void *sub, const char (&topic)[SIZE])
{
// Ignore '\0' terminating the topic string.
const size_t topic_len = SIZE - 1;

// Subscribe for topic
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic, topic_len));

// Allow receiving more than the expected number of bytes
char buffer[topic_len + 5];

// Receive subscription
int rc = TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (xpub, buffer, sizeof (buffer), 0));
TEST_ASSERT_EQUAL_INT (topic_len + 1, rc);
TEST_ASSERT_EQUAL_UINT8 (1, buffer[0]);
TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len);

// Unsubscribe from topic
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic, topic_len));

// Receive unsubscription
rc = TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (xpub, buffer, sizeof (buffer), 0));
TEST_ASSERT_EQUAL_INT (topic_len + 1, rc);
TEST_ASSERT_EQUAL_UINT8 (0, buffer[0]);
TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len);
}

void test_xpub_subscribe_long_topic ()
{
void *xpub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (xpub, bind_address));
size_t len = MAX_SOCKET_STRING;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (xpub, ZMQ_LAST_ENDPOINT, connect_address, &len));

void *sub = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, connect_address));

test_subscribe_cancel (xpub, sub, short_topic);
test_subscribe_cancel (xpub, sub, long_topic);

// Clean up.
test_context_socket_close (xpub);
test_context_socket_close (sub);
}

int main ()
{
setup_test_environment ();

UNITY_BEGIN ();
RUN_TEST (test_xpub_subscribe_long_topic);

return UNITY_END ();
}

0 comments on commit f20094c

Please sign in to comment.