
[PUB/SUB] Subscriber slow motion video (queue keeps growing) #27

Closed · BrandonYuen opened this issue Nov 19, 2019 · 36 comments

@BrandonYuen commented Nov 19, 2019

Hi there,

Thanks for your work on ImageZMQ, it's been very useful for me!

I'm using a project setup which has:

  • A server that sends processed opencv frames via ImageZMQ to a receiver
  • A pi that shows the received frames on screen

I've successfully implemented the REQ/REP pattern into my project and it works well. The only issue for me is that the REQ/REP pattern blocks the server from processing as many images as it can, because it's waiting for the receiver's OK reply at every frame.

This is when I started trying the PUB/SUB pattern. For the server this works great. However, when I use PUB/SUB the video plays in slow motion on the receiver. By slow motion I mean that it's queuing all the frames it gets, but probably isn't fast enough to display them all. This creates an ever-growing queue of images. I've also tried it on a stronger machine (macbook), but it's the same result.

Any tips or ideas on how I could solve my issue? Any help is much appreciated!

Edit
I've changed my code a bit by re-instantiating the ImageHub object every loop iteration (instead of just once before the while True loop), and it seems to get rid of the queue problem. It doesn't play in slow motion anymore! However, I wonder if this is really the best solution, because re-instantiating ImageHub every loop doesn't seem like the most efficient way.

Before (queue/latency growing):

imageHub = imagezmq.ImageHub(open_port='tcp://{}:5555'.format(args["server"]), REQ_REP=False)
while True:
    rpiName, frame = imageHub.recv_image()
    cv2.imshow("Window", frame)
    cv2.waitKey(1)  # needed so imshow actually refreshes

After (steady latency):

while True:
    imageHub = imagezmq.ImageHub(open_port='tcp://{}:5555'.format(args["server"]), REQ_REP=False)
    rpiName, frame = imageHub.recv_image()
    cv2.imshow("Window", frame)
    cv2.waitKey(1)  # needed so imshow actually refreshes
@jeffbass (Owner)

Thanks for your testing of the PUB/SUB pattern and reporting the slowdown on the receiver. This is the first I've heard of this problem. Frankly, I don't know what could be causing it. And it is counterintuitive that re-instantiating the Hub fixes it. I'm going to leave this issue open to see if anyone else responds to it.
One hypothesis I have is that the latency might be related to cv2.imshow() interacting somehow with ZMQ, but I have no specific evidence for that. One experiment might be to just count frames received when cv2.imshow() is commented out versus counting frames received when it is being used in your latency growing example. That would rule out a cv2.imshow() / ZMQ interaction. I'll try some testing myself over the next few weeks. I am doing some timing testing on PUB/SUB and will try some experiments with this. I will report back on this thread, but not likely before 2 weeks or so.
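For instance, a minimal sketch of that frame-counting experiment (the address and the 30-second measurement window are placeholder choices of mine, not from this thread):

import time

import cv2
import imagezmq

image_hub = imagezmq.ImageHub(open_port='tcp://127.0.0.1:5555', REQ_REP=False)
count = 0
start = time.time()
while time.time() - start < 30:  # measure for 30 seconds
    rpi_name, frame = image_hub.recv_image()
    # cv2.imshow("Window", frame); cv2.waitKey(1)  # toggle this line between runs
    count += 1
print('received {} frames in 30 s -> {:.1f} FPS'.format(count, count / 30))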

@BrandonYuen (Author)

No problem, I will also do some more testing to see if I can figure something out and let you know the results.

@oggyjack

Hi Jeff,
Thanks a lot for this great code. I'm only a beginner in the Python world, but I can clearly see there's a great solution here. I also wish to use PiCams to supply frames to an imagezmq hub, but I don't want the Pi client script to block if the server goes down, or at least I want a way to restart both when the server restarts. So I had a look at the PUB/SUB option, which does indeed work well, except for the same very-slow-motion behavior BrandonYuen describes. Before implementing the ImageHub re-instantiation in the loop as Brandon did, I was wondering if you had any explanation of the issue. Regards, T

@zarar7576

Maybe ZMQ_CONFLATE, set somewhere or somehow, may solve the issue?

@jeffbass (Owner)

I am not a user of the PUB/SUB pattern, but I did see this on StackOverflow that might be helpful:
https://stackoverflow.com/questions/25523231/zmq-latency-with-pub-sub-slow-subscriber

@jeffbass (Owner)

If what you are trying to accomplish is to have the image sending program be robust to a server restart or similar glitch, I handle that in my imagenode program using the REQ/REP pattern and a signal timer to detect a non-response:

            while not node.send_q:
                node.read_cameras()
            while len(node.send_q) > 0:  # send until send_q is empty
                try:
                    with Patience(settings.patience):
                        text, image = node.send_q.popleft()
                        hub_reply = node.send_frame(text, image)
                except Patience.Timeout:  # if no timely response from hub
                    log.info('No imagehub reply for '
                        + str(int(settings.patience)) + ' seconds')
                    hub_reply = node.fix_comm_link()
                node.process_hub_reply(hub_reply)

The Patience() class, when instantiated, starts a timer using the system SIGALRM signal. It is used in a with clause to allow a blocking task to be interrupted if it does not return in a specified number of seconds (settings.patience is the number of seconds to wait before giving up and fixing the comm link).

The event loop code is here:
https://github.com/jeffbass/imagenode/blob/master/imagenode/imagenode.py
And the Patience class to restart the client after a server non-response is here (at line 45):
https://github.com/jeffbass/imagenode/blob/master/imagenode/tools/utils.py
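For readers who don't want to click through, a minimal sketch of such a SIGALRM-based context manager might look like this (a simplified stand-in for illustration, not the actual Patience class):

import signal

class Patience:
    """Interrupt a blocking task if it runs longer than `seconds`."""
    class Timeout(Exception):
        pass

    def __init__(self, seconds):
        self.seconds = seconds

    def _handler(self, signum, frame):
        raise Patience.Timeout()

    def __enter__(self):
        signal.signal(signal.SIGALRM, self._handler)
        signal.alarm(self.seconds)  # schedule a SIGALRM after `seconds`

    def __exit__(self, exc_type, exc_value, traceback):
        signal.alarm(0)  # cancel the pending alarm on normal exit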

@DaveXanatos commented Mar 6, 2020

I am having this same issue with ImageZMQ. In a receiver script with no significant image processing, the video is displayed with no increasing lag. However, when the receiving script processes the image for face recognition, the processed frame rate is about two per second, and it is processing from queued frames rather than just grabbing whatever frame is ready when it's ready to chew on a new one. As a result, the queue grows until the system eventually freezes. I am resizing frames to 400px on the TX side and, as I mentioned earlier, receiving this and displaying the window without the processing is quick.

If I set a frame send delay on the TX side of .5 (2 frames/second) the receiver keeps up, but anything faster and it eventually bombs out.

What I'd like is to have the TX side maintain its framerate, and have the RX side NOT queue images, but just grab the next frame coming over the link when it's ready. In this way, other nodes in my system that need to receive and process the image can do so at their own rate, while the slower scripts can crunch along at whatever frame rate they need without drowning in queued frames. Is this possible? Some way to have ImageZMQ only take every 5th frame or something? Alternatively, is there a way I can clear the queue in each loop, so it gets a frame, processes the frame, displays the frame, clears the queue, gets a new frame, repeat...? Thanks.

I have created a video that shows clearly the difference between streamed video without significant processing, and streamed video that is live-processed, both using ImageZMQ. Link is
https://youtu.be/664LyQNgHTo

Thanks for your attention.

@jeffbass (Owner)

Hi @DaveXanatos , I really appreciate your video and the time you took to make it. I don't believe that the imagezmq library is actually the root cause of the frames backing up. I believe that ZMQ is backing up the frames at the SUB end of its PUB / SUB protocol. It is probably related to the size of the frames (not that your frames are too large, but most ZMQ PUB / SUB usage is for small message packets). As a first step, would you be willing to modify your program to use the REQ / REP protocol? It shouldn't be too hard to create a version of your programs that use REQ / REP. If that fixes the problem with the queue backing up, then that at least narrows it down to the ZMQ PUB / SUB protocol. If you are willing to try that, let me know if it works. It may be we need to file an issue with ZMQ. I don't know of a way to get ZMQ to flush the SUB queue on the receiving end (though there may be one). Let me know what your performance is like with REQ / REP and we'll proceed from there.
Jeff

@hjinlee88

Maybe ZMQ_SNDHWM would be a solution?

@DaveXanatos

Thanks very much, Jeff. I will have a video for you this evening using REQ/REP. I agree that it's the SUB side that's backing up. My intent is to stream the video frames in a manner where fast digesters (like the frame displays upper-left in my initial video) can process without delays induced by slower digesters (like face recognition) operating on the same frame stream. Several Raspberry Pis will be watching the video streams, all with several scripts looking for and processing different aspects of the stream to create a robust set of interpretations of what the robot is "seeing" (face recognition, object detection, OCR processing with Tesseract, etc.). ImageZMQ seems like the perfect functionality here if we can solve the SUB-side backups. I appreciate your working with me, very much.
Dave

@DaveXanatos

Hi @jeffbass,
Thanks very much for helping me figure this out. Here is the overview of what the video will show:

1. When running REQ/REP, the images do NOT queue up, and frame processing runs at around 3 FPS.
2. When running REQ/REP, all other scripts that attempt to connect to the broadcast image stream bomb out with "address already in use". This effectively makes REQ/REP unusable for my applications, as it seems (unless I'm doing something wrong... always possible) to eliminate the possibility of multiple scripts watching the same image stream.

I tried changing start orders, etc., but in REQ/REP mode a "one to many" function seems not to be possible; still, the "one" that is connected runs at an acceptable frame rate and does not queue up frames leading to a system crash.

With the PUB/SUB mode, a "one to many" scenario can be established, but the frame backup on any slow SUB leads to a system crash (not to mention uselessly slow operation of the frame-processing script).

I feel frustratingly close to getting this to function perfectly :) Each piece offers partially correct function, but neither allows for fully correct function: multiple scripts watching the same stream, and slow scripts that neither hang the system nor operate on useless old queued-up frames.

Truly, honestly appreciate your time and thought on this issue. Let me know whatever you need me to do to help to figure this out. Thank you!

Dave

VIDEO LINK: https://youtu.be/jQGz_nnZm7M

@jeffbass (Owner)

Hi @DaveXanatos ,
You're doing a great job of tracking this down. I've also been doing some reading on stackoverflow and in the ZMQ documentation specific to PUB SUB. Here's what I believe is going on and what might be best to try next.

  1. You are correct in your observation that "one to many" won't work with REQ/REP. Each REQ connection can send to one and only one REP server. An existing REP server can receive from many REQ senders, but not the other way around. The many REQ senders that send to the same REP receiver is my own use case: I have 8-10 Raspberry Pis sending to a single REP hub that is saving the images from all of them. REQ/REP "many to one" is the exact reason I wrote imagezmq in the first place; it fits my own projects exactly. BUT, if you need multiple recipients to watch one stream, REQ/REP is not going to work for you.
  2. PUB/SUB should be perfect for you. It is designed for "one to many", but you and others (above) have found that a slow SUB receiver builds up a large and unreliable queue and eventually crashes. I have done some searches of Stack Overflow and of the ZMQ documentation. The PUB/SUB "slow subscriber building an excessive queue" problem has a number of questions on Stack Overflow and a special-case discussion in the ZMQ docs.
    If you search "zmq sub queue slowing down", you get led to a number of Stack Overflow questions from folks who have the exact issue you are having. Some are in C++, some are in Python, but they are all basically the same issue. You may want to take a look at some of them. But I learned the most when reading this section of the ZMQ docs: http://zguide.zeromq.org/php:chapter5#toc4 . Basically, a slow subscriber can cause the exact problems you are having, and the suggested "fix" is for the slow subscriber to watch for slowdowns and then restart itself. Seems like overkill to me. You'll see above in this issue thread that @BrandonYuen found a "restart the SUB side" workaround that eliminated the queueing problem: he moved the imagehub instantiation inside the frame receiving loop. What that does is close and restart the SUB side before reading every frame. It is overkill and inefficient, but it stopped the excessive queueing because, as the ZMQ docs link above says, killing the process (or the ZMQ SUB instance in the process) restarts the SUB queue. Definitely NOT an elegant or optimal solution. But restarting the ZMQ SUB link every "X" number of frames received should fix it in the slow subscriber.
    Here's a suggestion for your next step in debugging this:
  3. Go back to PUB/SUB. It is the ZMQ pattern that will work for you.
  4. Leave your SUB receivers that are not having problems as they are; they should work fine.
  5. In your SUB receiver that is building up the long queue and lagging:
    1. In your frame receiving loop, add a frame counter that counts frames received.
    2. Every 3 frames, reinstantiate the imagehub (thus restarting it) and set the frame counter back to 0.
    3. Experiment and adjust. Maybe reinstantiating every 2 frames or every 5 frames works better, but it is likely that some low number will fix your queueing problem. In @BrandonYuen's code snippet at the start of this issue, he is reinstantiating the imagehub SUB receiver every single frame. I think every 3rd (or 2nd or 5th) frame will be enough for your application. You are effectively "throwing away" frames to prevent the lag. It is a very inelegant way to do it, but it is likely to work.

What this workaround is doing is fixing the SUB latency queuing problem by restarting the SUB receiver. An easy way to do this is to re-instantiate the imagehub SUB receiver before the queue gets "bad". It only needs to be done in the program that is actually experiencing the lag, not the other programs.
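A minimal sketch of that counter-based restart, reusing @BrandonYuen's snippet from the top of this thread (the address and the restart interval of 3 are placeholder choices to experiment with):

import cv2
import imagezmq

address = 'tcp://127.0.0.1:5555'  # placeholder publisher address
RESTART_EVERY = 3                 # tune by experiment: 2, 3, 5, ...

image_hub = imagezmq.ImageHub(open_port=address, REQ_REP=False)
frames_received = 0
while True:
    rpi_name, frame = image_hub.recv_image()
    cv2.imshow("Window", frame)
    cv2.waitKey(1)
    frames_received += 1
    if frames_received >= RESTART_EVERY:
        image_hub.zmq_socket.close()  # discard the queued frames...
        image_hub = imagezmq.ImageHub(open_port=address, REQ_REP=False)  # ...and restart the SUB
        frames_received = 0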

This is definitely NOT the longer-term way to solve this problem. I suspect that it may be better to set the HWM (High Water Mark) option in the slow subscriber to ZMQ_RCVHWM=1, forcing the SUB receiver to drop any inbound frames when the queue is still full from the last one. You would only set this option on the slow subscriber. Another possibility is to use the ZMQ_CONFLATE option, which is a flag to keep only the last frame, discarding all others (it was mentioned by @zarar7576 above). This is not likely to work with imagezmq because imagezmq sends compound messages composed of 2 parts (text and image). Setting ZMQ_SNDHWM (mentioned by @hjinlee88 above) is less likely to help because it would have to be set on the PUB side, which is not where you are having a problem. In all the reading I have done, this is a problem with ZMQ slow subscribers, not with ZMQ publishers, so you would want to change HWM on the SUB side. Some options, like HWM, are available as attributes in the imagezmq classes. For example, once imagehub is instantiated, print(imagehub.zmq_socket.hwm) prints "1000", the default HWM size.

I suggest you try the steps above. That will answer the question whether re-instantiating the imagehub instance every 3 (or more or fewer) frames on the slow SUB subscriber prevents the queue lagging problem. Let me know how that works out.
Jeff

@DaveXanatos

Hi @jeffbass I'm uploading the video of tonight's testing now. It should be up by about 8:45pm EDT. THANK YOU for your detailed and thoughtful reply above. Much for me to work with. Spoilers: HWM did nothing, but putting the instantiation of image_hub inside the loop does make it workable for now, even if it is utterly repulsive aesthetically :) My hope is that ZMQ makes it so that the SUB side's HWM setting ignores incoming frames if it's still processing one...

I'll post when the video link is ready.

Dave

@DaveXanatos

The video is ready at https://youtu.be/WmLbb9kVUng

As always, thank you for being so helpful with getting this issue resolved.

Dave

@jeffbass (Owner)

Thanks again for your video. Moving the instantiation inside the frame loop is a very inefficient way to start with a new queue each frame (and throw away the frames that would have caused the queue to become slow), but having a new SUB queue for each frame actually seems to work. Each reinstantiation uses the same port, so it is not likely to be causing memory or port leaks. The only purpose of the test was to see if eliminating the SUB queue on your slow SUB would fix the problem, and it looks like it did. I am still hopeful that there is a more elegant and effective way to keep the SUB queue from growing, but I think the test has shown that the slow SUB receiver is indeed the problem. While you did set the HWM correctly in your other test program, I'm pretty sure it only takes effect if you close the connection and reopen it. Let me check on that and get back to you.
Jeff

@jeffbass (Owner)

I think setting the HWM in the slow SUB image receiver is worth trying again. From the documentation on PyZMQ:

New values only take effect for subsequent socket bind/connects.

That means that when you tried to set the HWM in your test, it did not actually affect the socket, since it was not a "subsequent socket connect". So instantiate before the loop (with no instantiation inside the frame loop) and try something like this:

imagehub = imagezmq.ImageHub(open_port='tcp://127.0.0.1:5555', REQ_REP=False)
imagehub.zmq_socket.disconnect('tcp://127.0.0.1:5555')
imagehub.zmq_socket.set_hwm(2)  # try the value 2 or 4 or 8; imagezmq uses 2 messages per send
imagehub.zmq_socket.connect('tcp://127.0.0.1:5555')

It is important to specify the address explicitly if you are going to disconnect and reconnect in this way. Defaults won't behave correctly. It would be OK to have the address in a string variable.

Let me know what happens when you try this.

@DaveXanatos

Thanks again for these great replies. What you're saying makes sense, and I'll have another video (hopefully shorter and not as rambling, lol) this evening. If this HWM test is successful, it will also mean that almost everyone I've seen trying to use it is doing it wrong (i.e., like you saw in my code test). HWM would be a much better way to do this than eternal re-instantiation. We should know by this evening. Thanks again!

@DaveXanatos commented Mar 12, 2020

Video complete. HWM did not fix the issue unfortunately. I should have the video up in an hour or so (dinner time... :) )

Thanks again!

@DaveXanatos

Here is the video link with the demo, code, commentary, etc. https://youtu.be/WTDTK8tX7mQ
Thanks!

Dave

@jeffbass (Owner)

@DaveXanatos , Thanks again for your video and willingness to experiment. I was hoping HWM on the SUB side would solve the problem. Sorry that it did not.

Where to go from here? As a temporary and very ugly workaround, you can use the re-instantiation inside the frame loop. If you do that, I would not re-instantiate the image hub every frame, but would use a frame counter and an if statement that does the re-instantiation every 2 or 4 or 6 or ? frames. How many would be determined by experiment. Other than that, I have to be honest that I am out of ideas to try. I don't have enough PUB/SUB experience to be helpful with deeper testing in the PUB/SUB arena.

I do not think the core ZMQ developers see this as a bug they want to spend time on fixing, because they have already produced a recommended workaround: the Suicidal Snail description in the ZMQ Guide makes their position clear: http://zguide.zeromq.org/php:chapter5#toc4 . It offers a number of recommended workarounds that all involve having the application code restart itself in some way, or having the publisher throttle its speed of sending messages. I have also not seen any better solutions on Stack Overflow or elsewhere.

You might want to consider setting up a deque with a small maxlen that receives frames at full speed in a separate thread. The frames would be received into the deque at full speed and thrown away when the deque reaches maxlen. Then your slow subscriber would read the last frame from the deque whenever it is ready for a new frame. I use the Python deque in my own projects in this way.
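A minimal sketch of that deque-in-a-thread idea, assuming the same imagezmq PUB/SUB setup used earlier in this thread (the class name and defaults are mine):

import threading
from collections import deque

import imagezmq

class LatestFrameReceiver:
    """Drain the SUB socket at full speed in a daemon thread;
    a small-maxlen deque silently discards older frames."""
    def __init__(self, address, maxlen=2):
        self.hub = imagezmq.ImageHub(open_port=address, REQ_REP=False)
        self.frames = deque(maxlen=maxlen)
        threading.Thread(target=self._receive, daemon=True).start()

    def _receive(self):
        while True:
            self.frames.append(self.hub.recv_image())  # (name, image) tuples

    def latest(self):
        return self.frames[-1] if self.frames else None  # None until the first frame arrives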

My best current understanding of this issue is: imagezmq PUB/SUB is not a good solution when there are slow subscribers, because the underlying ZMQ library is not a good solution when there are slow subscribers. I did not know that when I did my own testing of the PUB/SUB option, because I did not test it with any slow subscribers. The PUB/SUB option in imagezmq was contributed by @bigdaddymax, who did a great job with the code contribution. We both did testing, but did not encounter the slow subscriber issue. This issue thread is evidence that you and others have found a significant issue with slow SUB receivers. I am going to leave this issue open so that others can be aware of the problem, and hope that another user more experienced with PUB/SUB will have a suggestion for a solution. I will also add a description of this problem to the imagezmq PUB/SUB documentation.

If you come up with a better PUB/SUB solution as you continue development of your project, please comment back in this thread. If it involves a ZMQ alternative like MQTT or RabbitMQ, please let us know how that works out. If it involves a deque in a thread, let us know that, too.

Thanks again for all your work on this. Sorry we don't have a good solution yet.
Jeff

@DaveXanatos

Well crap. :) This is one of the saddest things I've read in a while... I'll keep playing around, see what I can do. On the bright side, glad I got to know you a little. My wife & I have a small organic farm here, and I have a recent background in biotech. Hopefully we can catch up on FB.

Thanks again for walking through this with me, it was a truly valuable experience, and at least I know I'm not alone in this experience. It seems like the core ZMQ folks could solve this easily.... but perhaps they are satisfied with inelegant workarounds... They probably have bigger fish to fry...

Dave

@jeffbass (Owner)

Looking forward to catching up on FB. Love my small organic farm (2 acres in Southern California).
As for the ZMQ folks, they are a great crew. They do most of their work for their core users who are passing lots of small messages in all kinds of ways really fast. ZMQ is used much less for large messages like images, so I don't think the Slow Subscriber problem is one of their top priorities. Good luck with your project! I'll be following along with interest.

@jeffbass (Owner)

I have added the PUB / SUB slow subscriber issue discussed in this thread to the imagezmq documentation. I am hopeful that someone will come up with a solution. Leaving this issue open until we find a better solution than just restarting the slow subscriber.

@DaveXanatos commented Mar 21, 2020

Hi Jeff,

I have added another video with some further behavioral oddities related to the differences between the original Raspberry Pi CM3 running Stretch and the newer Pi 4 (4GB) running Buster. While not strictly related only to ImageZMQ, it does affect it a great deal. The video and complete text description of the observed issues are up at https://youtu.be/mTo28NhR-aQ

If you have time and can review and offer any suggestions or observations, I would be very appreciative.

I hope you and all your loved ones are staying healthy & safe!

UPDATE: I fixed the imshow window by using these lines (primarily the cv2.WINDOW_GUI_NORMAL flag), although it had no appreciable effect on the delay vs. the CM3 running Stretch:

import cv2
import numpy as np

test_img = np.zeros((300, 400, 3), dtype=np.uint8)
cv2.namedWindow('OD Frame', cv2.WINDOW_GUI_NORMAL)
cv2.imshow('OD Frame', test_img)
cv2.moveWindow('OD Frame', 525, 0)
cv2.waitKey(1)

Still trying to figure out what's making the Pi 4 slower than the Pi 3. Makes no sense.

@jeffbass (Owner)

Hi Dave,
Watched the video. What versions of OpenCV are you running on the Pi CM3 vs. the Pi 4 (4G)?

@philipp-schmidt (Contributor)

Hello,

I'm not sure about all the details, but reading this thread I feel like you should try to offload receiving the messages on the Pi to a separate thread, and then access the received frames from another thread to display them. Lock a mutex, of course.

This way the Pi will probably not display every image and will skip a few (it might not be capable of displaying every one anyway), but it won't become a "slow subscriber".
This pattern is described here for zmq, but it is quite common in any near-realtime IO application.

I'll put together a few lines of code later. Gotta do it anyway for a different project.

@philipp-schmidt (Contributor) commented Apr 17, 2020

I think it's crucial to know that even if the Pi no longer slows down the PUB/SUB after this fix, a slow connection between server and Pi might still lead to similar behaviour...

@philipp-schmidt (Contributor) commented Apr 18, 2020

This should do the trick:

import threading

import zmq
from imagezmq import SerializingContext


class VideoStreamSubscriber:

    def __init__(self, hostname, port):
        self.hostname = hostname
        self.port = port
        self._stop = False
        self._data_ready = threading.Event()
        self._thread = threading.Thread(target=self._run, args=())
        self._thread.daemon = True
        self._thread.start()

    def receive(self, timeout=15.0):
        flag = self._data_ready.wait(timeout=timeout)
        if not flag:
            raise TimeoutError(
                f"Timeout while reading from subscriber tcp://{self.hostname}:{self.port}")
        self._data_ready.clear()
        return self._data

    def _run(self):
        self.zmq_context = SerializingContext()
        self.zmq_socket = self.zmq_context.socket(zmq.SUB)
        self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.zmq_socket.connect(f"tcp://{self.hostname}:{self.port}")
        while not self._stop:
            self._data = self.zmq_socket.recv_array()
            self._data_ready.set()
        self.zmq_socket.close()

    def close(self):
        self._stop = True

As @jeffbass correctly pointed out though:

I have learned that multiprocessing may be a better choice, since the RPi has 4 cores and with Python threading, only 1 core is used for all the threads.

So this might not be the ultimate solution for a Pi, but maybe worth a try?
The Pi will still receive all frames, but only the most recent is returned in receive(). The event makes sure a frame cannot be read twice.
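For context, using the class might look something like this (hostname, port, and window name are placeholders):

import cv2

receiver = VideoStreamSubscriber("127.0.0.1", 5555)
try:
    while True:
        msg, frame = receiver.receive()  # blocks until a fresh frame is ready
        cv2.imshow("Stream", frame)
        cv2.waitKey(1)
finally:
    receiver.close()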

@jeffbass (Owner)

Thanks @philipp-schmidt!
I'm going to play with that code example. Would you like to pull together an example pair of test programs, similar to the timing* test programs in the /tests folder of this repository? I've created a branch for submitting examples that might serve as a template; see the 'pub-sub-broadcast' branch of this code repository.
Thanks again,
Jeff.

@philipp-schmidt (Contributor)

Hey,

I put together a minimal example and documentation; see pull request #34.

Let me know what you think.

Best,
Philipp

@jeffbass (Owner)

Hello @BrandonYuen, @oggyjack, @hjinlee88, @zarar7576, @DaveXanatos,
I just merged @philipp-schmidt's example programs and documentation into the Master branch. They work well even with deliberately inserted receiver / subscriber delays. There is no growing ZMQ queue or other slowdowns. His examples use an elegant threaded class that fixes the "slow subscriber" problem discussed in this issue. His example programs are in the /examples folder. His documentation file is Advanced PUB/SUB example with multithreaded fast subscribers for realtime processing. The main README.rst has the link to the documentation file, also. I tested his example programs on multiple combinations of RPi's and Macs with both webcams and PiCameras. They worked well in all combinations I tried. Try it out and provide updates in this thread if it fixes your problem. Or let us know if it doesn't.
Thanks, (and thanks to @philipp-schmidt!)
Jeff

@jeffbass (Owner)

Closing this one for now. Philipp's fix is documented and mentioned in the examples and FAQs. Thanks again for the great contribution @philipp-schmidt !

@victorjourne

To prevent the queue from growing in the PUB/SUB pattern with a slow receiver, the CONFLATE=1 option keeps only the latest message.
But we can't easily use it in imagezmq, because the sender actually sends 2 elements: a JSON header with some image info, and the image itself.

My solution here was to override the init_pubsub method of ImageSender and ImageHub to add some extra socket options that keep the queues short, using the SNDHWM and RCVHWM options.

    def init_pubsub(self, address):
        socketType = zmq.PUB
        self.zmq_context = imagezmq.SerializingContext()
        self.zmq_socket = self.zmq_context.socket(socketType)
        self.zmq_socket.setsockopt(zmq.SNDHWM, 2)
        self.zmq_socket.bind(address)
        # Assign corresponding send methods for PUB/SUB mode
        self.send_image = self.send_image_pubsub
        self.send_jpg   = self.send_jpg_pubsub

and

    def init_pubsub(self, address):
        socketType = zmq.SUB
        self.zmq_context = imagezmq.SerializingContext()
        self.zmq_socket = self.zmq_context.socket(socketType)
        self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.zmq_socket.setsockopt(zmq.RCVHWM, 2)
        self.zmq_socket.connect(address)

@philipp-schmidt (Contributor) commented Sep 30, 2021

Hi,

Can you explain in more detail how the conflate option works? How exactly does it replace the data in the receive and send buffers?

If this works as you briefly described, there might not even be a need for a separate IO thread the way I implemented it.

@philipp-schmidt (Contributor) commented Sep 30, 2021

So the way I understand it: for each client, zmq will overwrite the "server"-side send buffer if the number of queued messages exceeds the limit set by zmq.SNDHWM?

There were some discussions and experiments in this thread already and they failed to solve the issue.

@victorjourne

Hi @philipp-schmidt.
The conflate option needs to be set before the socket connects, on the subscriber side (here in this issue, the Raspberry Pi). The subscriber then keeps only the last message, leading to no latency (I have no idea how exactly zmq replaces the data in this case).

However, in the imagezmq library, every image is sent together with some metadata as a multi-part message, which is not supported with the conflate option.
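For illustration, here is what the option looks like in plain pyzmq (a sketch; the address is a placeholder):

import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')
sub.setsockopt(zmq.CONFLATE, 1)  # must be set BEFORE connect()
sub.connect('tcp://127.0.0.1:5555')
msg = sub.recv()  # always the most recent single-part message

As noted above, this only works for single-part messages, which is why imagezmq's two-part (text + image) messages rule it out.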

So my solution was intended to control the size of the sending queue: if the limit has been reached, the socket drops the new message.

class ImageSenderSmallQueue(imagezmq.ImageSender):
    def init_pubsub(self, address):
        socketType = zmq.PUB
        self.zmq_context = imagezmq.SerializingContext()
        self.zmq_socket = self.zmq_context.socket(socketType)
        self.zmq_socket.setsockopt(zmq.SNDHWM, 2)
        self.zmq_socket.bind(address)

        # Assign corresponding send methods for PUB/SUB mode
        self.send_image = self.send_image_pubsub
        self.send_jpg   = self.send_jpg_pubsub

# Accept connections on all tcp addresses, port 5555
sender = ImageSenderSmallQueue(connect_to='tcp://*:5555', REQ_REP=False)

You could even limit the size of the receiving queue, but I have no idea what the outcome would be with several senders...

class ImageHubSmallQueue(imagezmq.ImageHub):
    def init_pubsub(self, address):
        socketType = zmq.SUB
        self.zmq_context = imagezmq.SerializingContext()
        self.zmq_socket = self.zmq_context.socket(socketType)
        self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.zmq_socket.setsockopt(zmq.RCVHWM, 2)
        self.zmq_socket.connect(address)

# Instantiate and provide the first sender / publisher address
image_hub = ImageHubSmallQueue(open_port='tcp://sender1:5555', REQ_REP=False)

I have tested it with a receiver slower than the sender. It gives correct results, with only a small delay between the two.
