Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add more documentation.

  • Loading branch information...
commit c999fd993b1975dfa3de1e7b0289f2a6977baab6 1 parent 00de992
@twittner authored
Showing with 148 additions and 24 deletions.
  1. +146 −22 src/System/ZMQ.hs
  2. +1 −1  src/System/ZMQ/Base.hsc
  3. +1 −1  zeromq-haskell.cabal
View
168 src/System/ZMQ.hs
@@ -6,7 +6,11 @@
-- Maintainer : toralf.wittner@gmail.com
-- Stability : experimental
-- Portability : non-portable
---
+--
+-- 0MQ haskell binding. The API closely follows the C-API of 0MQ with
+-- the main difference that sockets are typed.
+-- The documentation of the individual socket types and socket options
+-- is copied from 0MQ's man pages authored by Martin Sustrik.
module System.ZMQ (
@@ -61,10 +65,10 @@ import qualified Data.ByteString as SB
import qualified Data.ByteString.Unsafe as UB
import System.Posix.Types (Fd(..))
--- ^ A 0MQ context representation.
+-- | A 0MQ context representation.
newtype Context = Context { ctx :: ZMQCtx }
--- ^ A 0MQ Socket.
+-- | A 0MQ Socket.
newtype Socket a = Socket { sock :: ZMQSocket }
-- A 0MQ Message representation.
@@ -78,38 +82,74 @@ type Size = Word
class SType a where
zmqSocketType :: a -> ZMQSocketType
+-- | Socket to communicate with a single peer. Allows for only a
+-- single connect or a single bind. There's no message routing
+-- or message filtering involved. /Compatible peer sockets/: 'P2P'.
data P2P = P2P
instance SType P2P where
zmqSocketType = const p2p
+-- | Socket to distribute data. 'receive' function is not
+-- implemented for this socket type. Messages are distributed in
+-- fanout fashion to all the peers. /Compatible peer sockets/: 'Sub'.
data Pub = Pub
instance SType Pub where
zmqSocketType = const pub
+-- | Socket to subscribe for data. Send function is not implemented
+-- for this socket type. Initially, socket is subscribed for no
+-- messages. Use 'subscribe' to specify which messages to subscribe for.
+-- /Compatible peer sockets/: 'Pub'.
data Sub = Sub
instance SType Sub where
zmqSocketType = const sub
+-- | Socket to send requests and receive replies. Requests are
+-- load-balanced among all the peers. This socket type allows only an
+-- alternated sequence of send's and recv's.
+-- /Compatible peer sockets/: 'Rep', 'Xrep'.
data Req = Req
instance SType Req where
zmqSocketType = const request
+-- | Socket to receive requests and send replies. This socket type
+-- allows only an alternated sequence of receive's and send's. Each
+-- send is routed to the peer that issued the last received request.
+-- /Compatible peer sockets/: 'Req', 'XReq'.
data Rep = Rep
instance SType Rep where
zmqSocketType = const response
+-- | Special socket type to be used in request/reply middleboxes
+-- such as zmq_queue(7). Requests forwarded using this socket type
+-- should be tagged by a proper prefix identifying the original requester.
+-- Replies received by this socket are tagged with a proper postfix
+-- that can be use to route the reply back to the original requester.
+-- /Compatible peer sockets/: 'Rep', 'Xrep'.
data XReq = Xreq
instance SType XReq where
zmqSocketType = const xrequest
+-- | Special socket type to be used in request/reply middleboxes
+-- such as zmq_queue(7). Requests received using this socket are already
+-- properly tagged with prefix identifying the original requester. When
+-- sending a reply via XREP socket the message should be tagged with a
+-- prefix from a corresponding request.
+-- /Compatible peer sockets/: 'Req', 'Xreq'.
data XRep = Xrep
instance SType XRep where
zmqSocketType = const xresponse
+-- | Socket to receive messages from up the stream. Messages are
+-- fair-queued from among all the connected peers. Send function is not
+-- implemented for this socket type. /Compatible peer sockets/: 'Down'.
data Up = Up
instance SType Up where
zmqSocketType = const upstream
+-- | Socket to send messages down stream. Messages are load-balanced
+-- among all the connected peers. Send function is not implemented for
+-- this socket type. /Compatible peer sockets/: 'Up'.
data Down = Down
instance SType Down where
zmqSocketType = const downstream
@@ -119,7 +159,79 @@ instance SType Down where
class SubsType a
instance SubsType Sub
--- ^ The option to set on 0MQ sockets (cf. man zmq_setsockopt)
+-- | The option to set on 0MQ sockets (descriptions reproduced here from
+-- zmq_setsockopt(3) (cf. man zmq_setsockopt for further details)).
+--
+-- [@HighWM@] High watermark for the message pipes associated with the
+-- socket. The water mark cannot be exceeded. If the messages
+-- don't fit into the pipe emergency mechanisms of the
+-- particular socket type are used (block, drop etc.)
+-- If HWM is set to zero, there are no limits for the content
+-- of the pipe.
+-- /Default/: 0
+--
+-- [@LowWM@] Low watermark makes sense only if high watermark is
+-- defined (i.e. is non-zero). When the emergency state is
+-- reached when messages overflow the pipe, the emergency lasts
+-- at most till the size of the pipe decreases to low watermark.
+-- Normal state is resumed at that point.
+-- /Default/: 0
+--
+-- [@Swap@] Swap allows the pipe to exceed high watermark. However,
+-- the data are written to the disk rather than held in the memory.
+-- Until high watermark is exceeded there is no disk activity involved
+-- though. The value of the option defines maximal size of the swap file.
+-- /Default/: 0
+--
+-- [@Affinity@] Affinity defines which threads in the thread pool will
+-- be used to handle newly created sockets. This way you can dedicate
+-- some of the threads (CPUs) to a specific work. Value of 0 means no
+-- affinity. Work is distributed fairly among the threads in the
+-- thread pool. For non-zero values, the lowest bit corresponds to the
+-- thread 1, second lowest bit to the thread 2 etc. Thus, value of 3
+-- means that from now on newly created sockets will handle I/O activity
+-- exclusively using threads no. 1 and 2.
+-- /Default/: 0
+--
+-- [@Identity@] Identity of the socket. Identity is important when
+-- restarting applications. If the socket has no identity, each run of
+-- the application is completely separated from other runs. However,
+-- with identity application reconnects to existing infrastructure
+-- left by the previous run. Thus it may receive messages that were
+-- sent in the meantime, it shares pipe limits with the previous run etc.
+-- /Default/: NULL
+--
+-- [@Rate@] This option applies only to sending side of multicast
+-- transports (pgm & udp). It specifies maximal outgoing data rate that
+-- an individual sender socket can send.
+-- /Default/: 100
+--
+-- [@RecoveryIVL@] This option applies only to multicast transports
+-- (pgm & udp). It specifies how long can the receiver socket survive
+-- when the sender is inaccessible. Keep in mind that large recovery
+-- intervals at high data rates result in very large recovery buffers,
+-- meaning that you can easily overload your box by setting say 1 minute
+-- recovery interval at 1Gb/s rate (requires 7GB in-memory buffer).
+-- /Default/: 10
+--
+-- [@McastLoop@] This option applies only to multicast transports
+-- (pgm & udp). Value of 1 means that the mutlicast packets can be
+-- received on the box they were sent from. Setting the value to 0
+-- disables the loopback functionality which can have negative impact on
+-- the performance. If possible, disable the loopback in production
+-- environments.
+-- /Default/: 1
+--
+-- [@SendBuf@] Sets the underlying kernel transmit buffer size to the
+-- specified size. See SO_SNDBUF POSIX socket option. Value of zero
+-- means leaving the OS default unchanged.
+-- /Default/: 0
+--
+-- [@ReceiveBuf@] Sets the underlying kernel receive buffer size to
+-- the specified size. See SO_RCVBUF POSIX socket option. Value of
+-- zero means leaving the OS default unchanged.
+-- /Default/: 0
+--
data SocketOption =
HighWM Int64 -- ^ ZMQ_HWM
| LowWM Int64 -- ^ ZMQ_LWM
@@ -133,27 +245,39 @@ data SocketOption =
| ReceiveBuf Word64 -- ^ ZMQ_RCVBUF
deriving (Eq, Ord, Show)
--- ^ Flags to apply on send operations (cf. man zmq_send)
+-- | Flags to apply on send operations (cf. man zmq_send)
+--
+-- [@NoBlock@] Send operation should be performed in non-blocking mode.
+-- If it cannot be performed immediatley an error will be thrown (errno
+-- is set to EAGAIN).
+--
+-- [@NoFlush@] 'send' should not flush the message downstream immediately,
+-- instead it should batch messages send with 'NoFlush' and really send them
+-- only when 'flush' is invoked. zmq_send(3) states: \"This is an optimisation
+-- for cases where several messages are sent in a single business transaction.
+-- However, the effect is measurable only in extremely high-perf scenarios
+-- (million messages a second or so). If that's not your case, use standard
+-- flushing send instead.\"
data Flag =
NoBlock -- ^ ZMQ_NOBLOCK
| NoFlush -- ^ ZMQ_NOFLUSH
deriving (Eq, Ord, Show)
--- ^ The events to wait for in poll (cf. man zmq_poll)
+-- | The events to wait for in poll (cf. man zmq_poll)
data PollEvent =
- In -- ^ ZMQ_POLLIN
- | Out -- ^ ZMQ_POLLOUT
+ In -- ^ ZMQ_POLLIN (incoming messages)
+ | Out -- ^ ZMQ_POLLOUT (outgoing messages, i.e. at least 1 byte can be written)
| InOut -- ^ ZMQ_POLLIN | ZMQ_POLLOUT
deriving (Eq, Ord, Show)
--- ^ Type representing a descriptor, poll is waiting for
+-- | Type representing a descriptor, poll is waiting for
-- (either a 0MQ socket or a file descriptor) plus the type
-- of event of wait for.
data Poll =
forall a. S (Socket a) PollEvent
| F Fd PollEvent
--- ^ Initialize a 0MQ context (cf. zmq_init).
+-- | Initialize a 0MQ context (cf. zmq_init for details).
init :: Size -> Size -> Bool -> IO Context
init appThreads ioThreads doPoll = do
c <- throwErrnoIfNull "init" $
@@ -162,21 +286,21 @@ init appThreads ioThreads doPoll = do
(if doPoll then usePoll else 0)
return (Context c)
--- ^ Terminate 0MQ context (cf. zmq_term).
+-- | Terminate 0MQ context (cf. zmq_term).
term :: Context -> IO ()
term = throwErrnoIfMinus1_ "term" . c_zmq_term . ctx
--- ^ Create a new 0MQ socket within the given context.
+-- | Create a new 0MQ socket within the given context.
socket :: SType a => Context -> a -> IO (Socket a)
socket (Context c) t =
let zt = typeVal . zmqSocketType $ t
in Socket <$> throwErrnoIfNull "socket" (c_zmq_socket c zt)
--- ^ Close a 0MQ socket.
+-- | Close a 0MQ socket.
close :: Socket a -> IO ()
close = throwErrnoIfMinus1_ "close" . c_zmq_close . sock
--- ^ Set the given option on the socket. Please note that there are
+-- | Set the given option on the socket. Please note that there are
-- certain combatibility constraints w.r.t the socket type (cf. man
-- zmq_setsockopt).
--
@@ -194,34 +318,34 @@ setOption s (McastLoop o) = setIntOpt s mcastLoop o
setOption s (SendBuf o) = setIntOpt s sendBuf o
setOption s (ReceiveBuf o) = setIntOpt s receiveBuf o
--- ^ Subscribe Socket to given subscription.
+-- | Subscribe Socket to given subscription.
subscribe :: SubsType a => Socket a -> String -> IO ()
subscribe s = setStrOpt s B.subscribe
--- ^ Unsubscribe Socket from given subscription.
+-- | Unsubscribe Socket from given subscription.
unsubscribe :: SubsType a => Socket a -> String -> IO ()
unsubscribe s = setStrOpt s B.unsubscribe
--- ^ Bind the socket to the given address (zmq_bind)
+-- | Bind the socket to the given address (zmq_bind)
bind :: Socket a -> String -> IO ()
bind (Socket s) str = throwErrnoIfMinus1_ "bind" $
withCString str (c_zmq_bind s)
--- ^ Connect the socket to the given address (zmq_connect).
+-- | Connect the socket to the given address (zmq_connect).
connect :: Socket a -> String -> IO ()
connect (Socket s) str = throwErrnoIfMinus1_ "connect" $
withCString str (c_zmq_connect s)
--- ^ Send the given 'ByteString' over the socket (zmq_send).
+-- | Send the given 'ByteString' over the socket (zmq_send).
send :: Socket a -> SB.ByteString -> [Flag] -> IO ()
send (Socket s) val fls = bracket (messageOf val) messageClose $ \m ->
throwErrnoIfMinus1_ "send" $ c_zmq_send s (msgPtr m) (combine fls)
--- ^ Flush the given socket (useful for 'send's with 'NoFlush').
+-- | Flush the given socket (useful for 'send's with 'NoFlush').
flush :: Socket a -> IO ()
flush = throwErrnoIfMinus1_ "flush" . c_zmq_flush . sock
--- ^ Receive a 'ByteString' from socket (zmq_recv).
+-- | Receive a 'ByteString' from socket (zmq_recv).
receive :: Socket a -> [Flag] -> IO (SB.ByteString)
receive (Socket s) fls = bracket messageInit messageClose $ \m -> do
throwErrnoIfMinus1_ "receive" $ c_zmq_recv s (msgPtr m) (combine fls)
@@ -229,7 +353,7 @@ receive (Socket s) fls = bracket messageInit messageClose $ \m -> do
size <- c_zmq_msg_size (msgPtr m)
SB.packCStringLen (data_ptr, fromIntegral size)
--- ^ Polls for events on the given 'Poll' descriptors. Returns the
+-- | Polls for events on the given 'Poll' descriptors. Returns the
-- list of 'Poll' descriptors for which an event occured (cf. zmq_poll).
poll :: [Poll] -> Timeout -> IO [Poll]
poll fds to = do
View
2  src/System/ZMQ/Base.hsc
@@ -23,7 +23,7 @@ instance Storable ZMQMsg where
alignment _ = #{alignment zmq_msg_t}
sizeOf _ = #{size zmq_msg_t}
peek p = ZMQMsg <$> #{peek zmq_msg_t, content} p
- poke p (ZMQMsg c) = #{poke zmq_msg_t, content} p c
+ poke p (ZMQMsg c) = #{poke zmq_msg_t, content} p c
data ZMQPoll = ZMQPoll
{ pSocket :: ZMQSocket
View
2  zeromq-haskell.cabal
@@ -1,5 +1,5 @@
name: zeromq-haskell
-version: 0.1.1
+version: 0.1.2
synopsis: bindings to zeromq
description: Bindings to zeromq (http://zeromq.org)
category: System, FFI
Please sign in to comment.
Something went wrong with that request. Please try again.