Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message queue keeps increasing in ZMQ_CONFLATE sub socket #3171

Closed
wangkf1 opened this issue Jun 21, 2018 · 3 comments
Closed

Message queue keeps increasing in ZMQ_CONFLATE sub socket #3171

wangkf1 opened this issue Jun 21, 2018 · 3 comments

Comments

@wangkf1
Copy link

@wangkf1 wangkf1 commented Jun 21, 2018

Issue description

When setting a SUB socket to use ZMQ_CONFLATE, I expected the message queue to hold only one newest message, and free the old one. But in a test case where a PUB program is sending messages in a loop, and a SUB program sits there collecting these messages, the SUB program collects memory. Some discussion on the mailing list here: https://lists.zeromq.org/pipermail/zeromq-dev/2018-June/032561.html

Environment

Ubuntu 16.04, ZMQ 4.2.4

Minimal test code / Steps to reproduce the issue

  1. Code is in the first email in the link above. Although I am using CZMQ, another person reported that the same thing happens when using libzmq functions.
  2. Build and run the two processes. I included the getline() just so it sits and gathers up messages.
  3. Watch the SUB process skyrocket in memory

What's the expected result?

I would have expected that when ZMQ_CONFLATE is set, that the queue doesn't collect messages, and does what it says: only collect the most recent message and free/replace old with the new message.

James Harvey, on mailing list thread, mentioned that in config.hpp, I can compile libzmq with inbound_poll_rate set to 1, but I assume that has its own tradeoffs and would apply to all sockets I create.

I am unsure if this is a bug, since I expected ZMQ_CONFLATE to not build up memory like a normal SUB socket would. Either this is something that can be fixed for ZMQ_CONFLATE sockets, or there is some setting/configuration/behavior that I should be using that I don't know about

@jamesdillonharvey

This comment has been minimized.

Copy link
Contributor

@jamesdillonharvey jamesdillonharvey commented Jun 22, 2018

Adding a little more info from my investigation.

The memory rises until zmq_recv (and zmq::socket_base_t::recv ) is called 100 times. It looks like an optimization, batching the work so it doesn't have to call process_commands every time if there are messages in the queue.

if (++_ticks == inbound_poll_rate) {

process_commands must somewhere further down the stack trigger the purging of the memory associated with the incoming messages that are to be dropped as ZMQ_CONFLATE is turned on.

Its an odd use case but maybe there should be a timer that can trigger the process_commands if in conflate mode and recv is not being called. Or is there another way to trigger the cleanup and conflate?

@laplaceyang

This comment has been minimized.

Copy link
Contributor

@laplaceyang laplaceyang commented Jun 26, 2018

You can add a timer for call getsockopt(ZMQ_EVENTS) in user code. Since function getsockopt with param ZMQ_EVENTS will trigger process_commands also as to prefetch the msg. Thus this action is same as call the recv, but actually not recv any msg. @jamesdillonharvey @wangkf1

@jamesdillonharvey

This comment has been minimized.

Copy link
Contributor

@jamesdillonharvey jamesdillonharvey commented Jun 26, 2018

Thanks @laplaceyang, that makes sense and it allows the caller to control how often they want the queue to be conflated.

I tried it out and it works like a charm, here is the test code which forces the memory prune to make the results clear. Even with heavy publisher the memory never climbs above the baseline.

#include "zmq.h"
#include <iostream>
#include <unistd.h>
#include <malloc.h>


int main(int argc, char* argv[])
{
   void *ctx = zmq_ctx_new ();
  //  Create a subscriber
    void *sub = zmq_socket (ctx, ZMQ_SUB);
    zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
    int conflate = 1;
    zmq_setsockopt (sub, ZMQ_CONFLATE, &conflate, sizeof(conflate));

    int rcvbuf = 4;
    zmq_connect (sub, "tcp://127.0.0.1:49152");

    uint64_t count =0;

    while(1){
        usleep(1000000);
        int event;
        size_t event_size = sizeof(event);

        // Here we are forcing the process_commands() to be triggered in zmq pipe. This causes the queued messages to be conflated.
        // This would normally not be needed as calling recv triggers the process_commands() after a threshold number of calls.
        zmq_getsockopt (sub, ZMQ_EVENTS, &event, &event_size);

        if(count%10 == 0)
        {
          printf("\n\ncount: %u\n", count);
          malloc_trim(0);
          malloc_stats();
        }
        ++count;
    }


  return 0;
}

bluca added a commit that referenced this issue Jun 26, 2018
* More info on conflate queue
@bluca bluca closed this Jun 26, 2018
wangkf1 added a commit to SpikeGadgets/TrodesNetwork that referenced this issue Jun 26, 2018
MohammadAlTurany added a commit to FairRootGroup/libzmq that referenced this issue Jan 28, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.