Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix event handling.

0MQ returns events as bitmasked integrals. The old approach of using
equality comparison to figure out which event has occurred would not
work when events have been OR'ed together. This fix will utilise lists
of events to denote the possibility of multiple events, i.e. the
functions "events" now returns [Event] and "poll" uses [Event] as
interest sets.

Additionally we now accept an optional handler function for each Poll
instance which will be invoked iff events occured for which an interest
was expressed. The handlers will be called sequentially, so they should
not block and also not cause exceptions.
  • Loading branch information...
commit 07c31e47637e0e8c7567ae648415d17179d67cb1 1 parent 014776f
@twittner authored
View
89 src/System/ZMQ3.hs
@@ -1,4 +1,4 @@
-{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE GADTs #-}
-- |
-- Module : System.ZMQ3
-- Copyright : (c) 2010-2012 Toralf Wittner
@@ -206,6 +206,8 @@ import qualified Prelude as P
import Control.Applicative
import Control.Exception
import Control.Monad (unless, void)
+import Control.Monad.IO.Class
+import Data.List (intersect, foldl')
import Data.Restricted
import Foreign hiding (throwIf, throwIf_, throwIfNull, void)
import Foreign.C.String
@@ -357,17 +359,15 @@ instance Sender Push
data Event =
In -- ^ ZMQ_POLLIN (incoming messages)
| Out -- ^ ZMQ_POLLOUT (outgoing messages, i.e. at least 1 byte can be written)
- | InOut -- ^ ZMQ_POLLIN | ZMQ_POLLOUT
- | Native -- ^ ZMQ_POLLERR
- | None
+ | Err -- ^ ZMQ_POLLERR
deriving (Eq, Ord, Read, Show)
-- | Type representing a descriptor, poll is waiting for
-- (either a 0MQ socket or a file descriptor) plus the type
-- of event to wait for.
-data Poll =
- forall s. S (Socket s) Event
- | F Fd Event
+data Poll m where
+ Sock :: Socket s -> [Event] -> Maybe ([Event] -> m ()) -> Poll m
+ File :: Fd -> [Event] -> Maybe ([Event] -> m ()) -> Poll m
-- | Return the runtime version of the underlying 0MQ library as a
-- (major, minor, patch) triple.
@@ -438,8 +438,8 @@ unsubscribe s = setByteStringOpt s B.unsubscribe
-- Read Only
-- | Cf. @zmq_getsockopt ZMQ_EVENTS@
-events :: Socket a -> IO Event
-events s = toEvent <$> getIntOpt s B.events 0
+events :: Socket a -> IO [Event]
+events s = toEvents <$> getIntOpt s B.events 0
-- | Cf. @zmq_getsockopt ZMQ_FD@
fileDescriptor :: Socket a -> IO Fd
@@ -692,7 +692,7 @@ send :: Sender a => Socket a -> [Flag] -> SB.ByteString -> IO ()
send sock fls val = bracket (messageOf val) messageClose $ \m ->
onSocket "send" sock $ \s ->
retry "send" (waitWrite sock) $
- c_zmq_sendmsg s (msgPtr m) (combine (DontWait : fls))
+ c_zmq_sendmsg s (msgPtr m) (combineFlags (DontWait : fls))
-- | Send the given 'LB.ByteString' over the socket (cf. zmq_sendmsg).
-- This is operationally identical to @send socket (Strict.concat
@@ -706,7 +706,7 @@ send' :: Sender a => Socket a -> [Flag] -> LB.ByteString -> IO ()
send' sock fls val = bracket (messageOfLazy val) messageClose $ \m ->
onSocket "send'" sock $ \s ->
retry "send'" (waitWrite sock) $
- c_zmq_sendmsg s (msgPtr m) (combine (DontWait : fls))
+ c_zmq_sendmsg s (msgPtr m) (combineFlags (DontWait : fls))
-- | Send a multi-part message.
-- This function applies the 'SendMore' 'Flag' between all message parts.
@@ -781,42 +781,47 @@ monitor es ctx sock = do
-- same list of 'Poll' descriptors with an "updated" 'PollEvent' field
-- (cf. zmq_poll). Sockets which have seen no activity have 'None' in
-- their 'PollEvent' field.
-poll :: [Poll] -> Timeout -> IO [Poll]
-poll fds to = do
- let len = length fds
- ps = map createZMQPoll fds
- withArray ps $ \ptr -> do
+poll :: MonadIO m => Timeout -> [Poll m] -> m [[Event]]
+poll to desc = do
+ let len = length desc
+ let ps = map toZMQPoll desc
+ ps' <- liftIO $ withArray ps $ \ptr -> do
throwIfMinus1Retry_ "poll" $
c_zmq_poll ptr (fromIntegral len) (fromIntegral to)
- ps' <- peekArray len ptr
- return $ map createPoll (zip ps' fds)
+ peekArray len ptr
+ mapM fromZMQPoll (zip desc ps')
where
- createZMQPoll :: Poll -> ZMQPoll
- createZMQPoll (S (Socket (SocketRepr s _)) e) =
- ZMQPoll s 0 (fromEvent e) 0
- createZMQPoll (F (Fd s) e) =
- ZMQPoll nullPtr (fromIntegral s) (fromEvent e) 0
-
- createPoll :: (ZMQPoll, Poll) -> Poll
- createPoll (zp, S (Socket (SocketRepr s t)) _) =
- S (Socket (SocketRepr s t)) (toEvent . fromIntegral . pRevents $ zp)
- createPoll (zp, F fd _) =
- F fd (toEvent . fromIntegral . pRevents $ zp)
+ toZMQPoll :: MonadIO m => Poll m -> ZMQPoll
+ toZMQPoll (Sock (Socket (SocketRepr s _)) e _) =
+ ZMQPoll s 0 (combine (map fromEvent e)) 0
+
+ toZMQPoll (File (Fd s) e _) =
+ ZMQPoll nullPtr (fromIntegral s) (combine (map fromEvent e)) 0
+
+ fromZMQPoll :: MonadIO m => (Poll m, ZMQPoll) -> m [Event]
+ fromZMQPoll (p, zp) = do
+ let e = toEvents . fromIntegral . pRevents $ zp
+ let (e', f) = case p of
+ (Sock _ x g) -> (x, g)
+ (File _ x g) -> (x, g)
+ unless (null (e `intersect` e')) $
+ maybe (return ()) ($ e) f
+ return e
fromEvent :: Event -> CShort
- fromEvent In = fromIntegral . pollVal $ pollIn
- fromEvent Out = fromIntegral . pollVal $ pollOut
- fromEvent InOut = fromIntegral . pollVal $ pollInOut
- fromEvent Native = fromIntegral . pollVal $ pollerr
- fromEvent None = 0
-
--- Convert bit-masked word into Event.
-toEvent :: Word32 -> Event
-toEvent e | e == (fromIntegral . pollVal $ pollIn) = In
- | e == (fromIntegral . pollVal $ pollOut) = Out
- | e == (fromIntegral . pollVal $ pollInOut) = InOut
- | e == (fromIntegral . pollVal $ pollerr) = Native
- | otherwise = None
+ fromEvent In = fromIntegral . pollVal $ pollIn
+ fromEvent Out = fromIntegral . pollVal $ pollOut
+ fromEvent Err = fromIntegral . pollVal $ pollerr
+
+-- Convert bit-masked word into Event list.
+toEvents :: Word32 -> [Event]
+toEvents e = foldl' (\es f -> f e es) [] tests
+ where
+ tests =
+ [ \i xs -> if i .&. (fromIntegral . pollVal $ pollIn) /= 0 then In:xs else xs
+ , \i xs -> if i .&. (fromIntegral . pollVal $ pollOut) /= 0 then Out:xs else xs
+ , \i xs -> if i .&. (fromIntegral . pollVal $ pollerr) /= 0 then Err:xs else xs
+ ]
retry :: String -> IO () -> IO CInt -> IO ()
retry msg wait act = throwIfMinus1RetryMayBlock_ msg act wait
View
3  src/System/ZMQ3/Base.hsc
@@ -146,8 +146,7 @@ newtype ZMQPollEvent = ZMQPollEvent { pollVal :: CShort } deriving (Eq, Ord)
#{enum ZMQPollEvent, ZMQPollEvent,
pollIn = ZMQ_POLLIN,
pollOut = ZMQ_POLLOUT,
- pollerr = ZMQ_POLLERR,
- pollInOut = ZMQ_POLLIN | ZMQ_POLLOUT
+ pollerr = ZMQ_POLLERR
}
-- general initialization
View
8 src/System/ZMQ3/Internal.hs
@@ -29,6 +29,7 @@ module System.ZMQ3.Internal
, toZMQFlag
, combine
+ , combineFlags
, mkSocketRepr
, closeSock
, onSocket
@@ -243,8 +244,11 @@ toZMQFlag :: Flag -> ZMQFlag
toZMQFlag DontWait = dontWait
toZMQFlag SendMore = sndMore
-combine :: [Flag] -> CInt
-combine = fromIntegral . foldr ((.|.) . flagVal . toZMQFlag) 0
+combineFlags :: [Flag] -> CInt
+combineFlags = fromIntegral . combine . map (flagVal . toZMQFlag)
+
+combine :: (Integral i, Bits i) => [i] -> i
+combine = foldr (.|.) 0
bool2cint :: Bool -> CInt
bool2cint True = 1
View
7 src/System/ZMQ3/Monadic.hs
@@ -68,7 +68,7 @@ module System.ZMQ3.Monadic (
, unsubscribe
, proxy
, monitor
- , poll
+ , Z.poll
-- * Socket Options (Read)
, affinity
@@ -283,9 +283,6 @@ unsubscribe s = liftIO . Z.unsubscribe s
proxy :: MonadZMQ m => Z.Socket a -> Z.Socket b -> Maybe (Z.Socket c) -> m ()
proxy a b = liftIO . Z.proxy a b
-poll :: MonadZMQ m => [Z.Poll] -> Z.Timeout -> m [Z.Poll]
-poll xs = liftIO . Z.poll xs
-
monitor :: MonadZMQ m => [Z.EventType] -> Z.Socket t -> m (Bool -> IO (Maybe Z.EventMsg))
monitor es s = onContext $ \ctx -> Z.monitor es ctx s
@@ -300,7 +297,7 @@ backlog = liftIO . Z.backlog
delayAttachOnConnect :: MonadZMQ m => Z.Socket t -> m Bool
delayAttachOnConnect = liftIO . Z.delayAttachOnConnect
-events :: MonadZMQ m => Z.Socket t -> m Z.Event
+events :: MonadZMQ m => Z.Socket t -> m [Z.Event]
events = liftIO . Z.events
fileDescriptor :: MonadZMQ m => Z.Socket t -> m Fd
Please sign in to comment.
Something went wrong with that request. Please try again.