100% CPU usage with zeromq Push socket with 0 listeners #55

Closed
solatis opened this Issue Aug 10, 2014 · 13 comments

3 participants

solatis commented Aug 10, 2014

I originally submitted this as a StackOverflow question, but at this point I feel submitting this as an issue here is more appropriate.

I am currently in the process of implementing a communication pipeline between several processes using ZeroMQ, all using the Push/Pull mechanism. The pipeline starts with a 'ventilator' that generates tasks, and here is where my problem also starts: ZeroMQ seems to be using 100% CPU load when no workers are connected.

Here is the code in question, which attempts to send just one message:

module Main where

import System.ZMQ4.Monadic
import Data.ByteString.Char8 (pack)

main :: IO ()
main = do
     runZMQ $ do
            publisher <- socket Push
            bind publisher "tcp://*:10150"

            send publisher [] (pack "foo")

            close publisher

As you can see, this code is extremely simple and just attempts to send the message "foo" to any subscriber. I would expect this code to queue this message in the background, but instead it appears to get into a never-ending loop with the send command; when I strace the process, this is what I see happening:

poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
--- SIGVTALRM {si_signo=SIGVTALRM, si_code=SI_TIMER, si_pid=0, si_uid=0, si_value=0} ---
rt_sigreturn()                          = 1
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
--- SIGVTALRM {si_signo=SIGVTALRM, si_code=SI_TIMER, si_pid=0, si_uid=0, si_value=0} ---
rt_sigreturn()                          = 1

This is why I believe this might be a bug, or a problem in my code. There is an example in the zguide that is similar to what I'm trying to achieve: https://github.com/imatix/zguide/blob/master/examples/Haskell/taskvent.hs

In this example, they explicitly request user input to start sending (specifically, 'press enter when workers are ready') -- is this the way to work around this problem?

To elaborate, the following program (with a connected listener) works perfectly:

module Main where

import System.ZMQ4.Monadic
import Data.ByteString.Char8 (pack, unpack)
import Control.Applicative ((<$>))

main :: IO ()
main = do
     runZMQ $ do
            publisher <- socket Push
            receiver  <- socket Pull

            bind    publisher "tcp://*:10150"
            connect receiver "tcp://127.0.0.1:10150"

            send publisher [] (pack "foo")    
            message <- unpack <$> receive receiver

            liftIO $ putStrLn ("received data: " ++ message)

twittner added a commit that referenced this issue Aug 16, 2014

Issue #55: Restructure wait loop.
The current implementation of `wait'` first retrieves the socket's
file descriptor and uses GHC's `threadWaitRead` and `threadWaitWrite`
to wait. Then it checks the 0MQ socket events if writing or reading
could be done in a non-blocking way.

AFAICS the file descriptor should only be polled for read events, as it
most likely isn't the actual FD where data is sent through. This aligns
with what api.zeromq.org states:

  "[...] the ØMQ library shall signal any pending events on the socket
  in an edge-triggered fashion by making the file descriptor become
  ready for reading." [1]

The behaviour described in #55 is caused by using `threadWaitWrite` on
this FD which immediately returns, but asking for `ZMQ_POLLOUT` events
is answered negatively as no clients have connected. Hence the loop is
restarted ad infinitum.

A related issue is the edge-triggered fashion in which the FD is
signaled. Currently we might wait longer than necessary for subsequent
messages as we always wait on the file descriptor first which would be
correct for level triggering.

This commit changes the behaviour in that we always inspect the 0MQ
socket events first. If they match with what has been requested we
directly continue writing or reading. Only if this test fails do we wait
with `threadWaitRead` for the underlying FD to become ready.

---
[1] http://api.zeromq.org/4-0:zmq-getsockopt#toc24
4a6bc23
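
For readers who want to see the loop described in the commit message in code form, here is a minimal sketch. It is not the code from commit 4a6bc23: `waitFor` and `roundTrip` are hypothetical names chosen for illustration, and the sketch assumes the `events` and `fileDescriptor` functions exported by `System.ZMQ4` together with GHC's `threadWaitRead`. It checks the 0MQ socket events first and waits on the file descriptor, for readability only, when the requested event is not yet available.

```haskell
module Main (main) where

import Control.Concurrent (threadWaitRead)
import Control.Monad (unless)
import Data.ByteString.Char8 (pack, unpack)
import qualified System.ZMQ4 as Z

-- | Block the calling Haskell thread until the 0MQ socket reports @ev@.
-- (Hypothetical helper, not part of zeromq4-haskell.)
waitFor :: Z.Event -> Z.Socket a -> IO ()
waitFor ev sock = do
    ready <- elem ev <$> Z.events sock   -- 1. inspect the 0MQ socket events first
    unless ready $ do
        fd <- Z.fileDescriptor sock      -- 2. otherwise fetch the notification FD ...
        threadWaitRead fd                --    ... and wait for it to become readable
        waitFor ev sock                  -- 3. re-check, as the FD is edge-triggered

-- | Send one message from a Push socket to a Pull socket in the same process.
roundTrip :: IO String
roundTrip =
    Z.withContext $ \ctx ->
    Z.withSocket ctx Z.Push $ \push ->
    Z.withSocket ctx Z.Pull $ \pull -> do
        Z.bind    push "tcp://127.0.0.1:10150"
        Z.connect pull "tcp://127.0.0.1:10150"
        waitFor Z.Out push               -- blocks until a peer is connected
        Z.send push [] (pack "foo")
        waitFor Z.In pull                -- blocks until the message has arrived
        unpack <$> Z.receive pull

main :: IO ()
main = roundTrip >>= putStrLn . ("received data: " ++)
```

The library's own `send` and `receive` already wait internally, so the explicit `waitFor` calls are only there to make the order of the checks visible.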
Owner

twittner commented Aug 16, 2014

Thank you for your report. The Push socket type you are using blocks in "mute state". A different socket type (e.g. Pub) will drop the message and the program exits immediately. So the behaviour is correct, apart from the 100% CPU consumption. The problem seems to be the combined usage of threadWaitWrite and 0MQ socket event checking. Commit 4a6bc23 is an attempt to fix this. If you test this code you should still observe the same blocking behaviour, but CPU usage should drop to 0.
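
To illustrate the Pub case mentioned above, here is a sketch that is simply the reporter's first program with the socket type swapped. The name `sendToNobody` is made up for this example. With no subscribers connected, the message should be dropped and the program should return instead of blocking.

```haskell
module Main (main) where

import Data.ByteString.Char8 (pack)
import System.ZMQ4.Monadic

-- | Bind a Pub socket and send one message while nobody is subscribed.
-- (Hypothetical name, used only for this illustration.)
sendToNobody :: IO ()
sendToNobody = runZMQ $ do
    publisher <- socket Pub            -- Pub drops messages when it has no peers
    bind publisher "tcp://*:10150"
    send publisher [] (pack "foo")     -- does not block, unlike Push in mute state
    close publisher

main :: IO ()
main = sendToNobody
```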

solatis commented Aug 17, 2014

Thank you Toralf, sorry for being unclear: the 'blocking' behaviour was what I was expecting, of course -- I just didn't expect it to use 100% CPU while blocking.

Thanks for the fix, I will attempt to test it ASAP.

mwotton commented Aug 26, 2014

Another angle on this:


{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
module Main where

import           Control.Concurrent       (threadDelay)
import           Control.Concurrent.Async (async)
import           Control.Concurrent.Async (wait)
import           Control.Monad            (forever)
import qualified Data.ByteString.Char8    as BS
import           Data.IORef               (newIORef)
import           Data.IORef               (atomicModifyIORef')
import           Data.List.NonEmpty       (NonEmpty (..))
import           Data.Maybe               (fromJust)
import qualified System.ZMQ4              as Z

main = do
    r2 <- newIORef (0::Integer)

    z <- async $ forever $ do
      Z.withContext $ \zmq_c ->
        Z.withSocket zmq_c Z.Push $ \push -> do

          let go n = do
                n <- atomicModifyIORef' r2 (\x -> (x+1,x))
                putStrLn $ show n
                Z.sendMulti push (BS.pack (show n):|[])
                go (n-1)
          Z.bind push "tcp://127.0.0.1:7777"
          Z.sendHighWM push >>= \x -> print ("HWM before", x)
          Z.setSendHighWM  (fromJust $ Z.toRestricted 1) push
          Z.sendHighWM push >>= \x -> print ("HWM", x)
          go 1000

    Z.withContext $ \zmq_c ->
      Z.withSocket zmq_c Z.Pull $ \pull -> do
        Z.setReceiveHighWM  (fromJust $ Z.toRestricted 1) pull
        Z.connect pull "tcp://127.0.0.1:7777"
        forever $ do
          [dom] <- Z.receiveMulti pull
          threadDelay 1000000
          print ("got", dom)

    wait z
    return ()

My expectation of this code is that the sender will only send one message, as the buffer fills up immediately, and will then have to wait for the pull socket. The actual behaviour is that the pusher keeps writing without bound. I will try 4a6bc23.

mwotton commented Aug 26, 2014

ok, tried the branch - same behaviour.

Owner

twittner commented Aug 26, 2014

Given the small size of your messages I think you need to lower the send and receive buffers to observe the effect of your watermark settings.

mwotton commented Aug 26, 2014

      Z.setSendHighWM  (fromJust $ Z.toRestricted 1) push

isn't that the lowest the buffer can go?

Owner

twittner commented Aug 26, 2014

I am referring to ZMQ_RCVBUF and ZMQ_SNDBUF.

mwotton commented Aug 27, 2014

{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
module Main where

import           Control.Concurrent       (threadDelay)
import           Control.Concurrent.Async (async)
import           Control.Concurrent.Async (wait)
import           Control.Monad            (forever)
import qualified Data.ByteString.Char8    as BS
import           Data.IORef               (newIORef)
import           Data.IORef               (atomicModifyIORef')
import           Data.List.NonEmpty       (NonEmpty (..))
import           Data.Maybe               (fromJust)
import qualified System.ZMQ4              as Z

main = do
    r2 <- newIORef (0::Integer)

    z <- async $ forever $ do
      Z.withContext $ \zmq_c ->
        Z.withSocket zmq_c Z.Push $ \push -> do

          let go n = do
                n <- atomicModifyIORef' r2 (\x -> (x+1,x))
                putStrLn $ show n
                Z.sendMulti push (BS.pack (show n):|[])
                go (n-1)
          Z.bind push "tcp://127.0.0.1:7777"
          Z.sendHighWM push >>= \x -> print ("HWM before", x)
          Z.setSendHighWM  (fromJust $ Z.toRestricted 1) push
          Z.setSendBuffer   (fromJust $ Z.toRestricted 1) push

          Z.sendHighWM push >>= \x -> print ("HWM", x)
          go 1000

    Z.withContext $ \zmq_c ->
      Z.withSocket zmq_c Z.Pull $ \pull -> do
        Z.setReceiveHighWM  (fromJust $ Z.toRestricted 1) pull
        Z.setReceiveBuffer   (fromJust $ Z.toRestricted 1) pull
        Z.connect pull "tcp://127.0.0.1:7777"
        forever $ do
          [dom] <- Z.receiveMulti pull
          threadDelay 1000000
          print ("got", dom)

    wait z
    return ()

Still sends 96500 messages before blocking.

Owner

twittner commented Aug 27, 2014

You need to apply those settings before connect/bind for them to have an effect. Cf. http://api.zeromq.org/4-0:zmq-setsockopt#toc1. Also, socket buffer sizes are usually larger than 1 byte. I have modified your example slightly:

{-# LANGUAGE OverloadedStrings #-}

module Main (main) where

import Control.Concurrent (threadDelay)
import Control.Monad
import Data.ByteString.Char8 (pack)
import Data.Time
import System.ZMQ4.Monadic

recvDelay :: Int
recvDelay = 3000000

main :: IO ()
main = do
    putStrLn $ "receive delay = " ++ show recvDelay
    runZMQ $ do
        push <- socket Push
        setSendHighWM (restrict 1) push
        setSendBuffer (restrict 8192) push

        pull <- socket Pull
        setReceiveHighWM (restrict 1) pull
        setReceiveBuffer (restrict 8192) pull

        bind    push "tcp://127.0.0.1:7777"
        connect pull "tcp://127.0.0.1:7777"

        void . async $ loop push

        forever $ do
            liftIO $ threadDelay recvDelay
            void   $ receive pull
            showTime "R @ "
  where
    loop s = do
        let msg = pack (replicate 256000 'x')
        forever $ do
            send s [] msg
            showTime "S @ "

    showTime pref =
        liftIO $ putStrLn . (pref ++) . show =<< getCurrentTime

Running it yields:

$ /tmp/test 
receive delay = 3000000
S @ 2014-08-27 18:56:52.725888 UTC
S @ 2014-08-27 18:56:52.726498 UTC
S @ 2014-08-27 18:56:52.728965 UTC
S @ 2014-08-27 18:56:52.73134 UTC
R @ 2014-08-27 18:56:55.699119 UTC
S @ 2014-08-27 18:56:55.699861 UTC
R @ 2014-08-27 18:56:58.704432 UTC
S @ 2014-08-27 18:56:58.705356 UTC
R @ 2014-08-27 18:57:01.70867 UTC
S @ 2014-08-27 18:57:01.709735 UTC
R @ 2014-08-27 18:57:04.712766 UTC
S @ 2014-08-27 18:57:04.713723 UTC
R @ 2014-08-27 18:57:07.717016 UTC
S @ 2014-08-27 18:57:07.718061 UTC
R @ 2014-08-27 18:57:10.721206 UTC
S @ 2014-08-27 18:57:10.722257 UTC
R @ 2014-08-27 18:57:13.725467 UTC
S @ 2014-08-27 18:57:13.726532 UTC
R @ 2014-08-27 18:57:16.729036 UTC
S @ 2014-08-27 18:57:16.729919 UTC
^C

mwotton commented Aug 28, 2014

Ah, terrific. Yes, I was just putting them at the minimum I could to make it clear that they weren't being applied... I wonder if it would be possible to enforce that ordering in the type system? Backwards incompatible, of course.

Owner

twittner commented Aug 28, 2014

Right. I just wanted to point out that I am not sure if a value of 1 would be honoured by the OS. The socket man page on Linux states for instance: "SO_SNDBUF [...] The minimum (doubled) value for this option is 2048".

Regarding the usage of types to ensure relevant options are applied to a socket in the right order, I think it might be possible. Not sure, though, when I can get around to implementing it.
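
One way such an ordering could be expressed is with a phantom type parameter that records whether the socket has been bound yet. The following is only a toy model under that assumption: `Sock`, `Unbound`, `Bound`, `newSock`, `setSendHWM` and `bindSock` are all hypothetical names and none of them are zeromq4-haskell API. The socket is a plain record so the sketch runs without libzmq.

```haskell
module Main (main) where

-- Phantom tags recording whether the socket has been bound yet.
data Unbound
data Bound

-- | A toy stand-in for a socket; NOT a zeromq4-haskell type.
data Sock state = Sock
    { sockSendHWM  :: Int
    , sockEndpoint :: Maybe String
    } deriving (Show, Eq)

-- | A fresh socket starts out unbound (the initial HWM is an arbitrary toy value).
newSock :: Sock Unbound
newSock = Sock { sockSendHWM = 1000, sockEndpoint = Nothing }

-- | Options can only be set while the socket is still unbound.
setSendHWM :: Int -> Sock Unbound -> Sock Unbound
setSendHWM n s = s { sockSendHWM = n }

-- | Binding moves the socket into the 'Bound' state.
bindSock :: String -> Sock Unbound -> Sock Bound
bindSock ep s = Sock { sockSendHWM = sockSendHWM s, sockEndpoint = Just ep }

main :: IO ()
main = do
    let s = bindSock "tcp://127.0.0.1:7777" (setSendHWM 1 newSock)
    print (sockSendHWM s)
    -- Setting the option after binding would be rejected by the type checker:
    -- print (sockSendHWM (setSendHWM 1 s))
```

A real design would also have to cover `connect`, options that may legitimately change after binding, and the monadic interface, which is part of why such a change would be backwards incompatible.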

Anyway, back to the current issue. Can I assume that the current master branch has resolved the problem? @solatis?

Owner

twittner commented Aug 29, 2014

Fixed with version 0.6.1 which has been published to hackage.

twittner closed this Aug 29, 2014

solatis commented Sep 8, 2014

@twittner Sorry for the late reply, but I can confirm that your patch fixes this issue.
