diff --git a/Makefile.am b/Makefile.am index ce79601b5..6ad9ba2ec 100644 --- a/Makefile.am +++ b/Makefile.am @@ -50,6 +50,7 @@ NANOMSG_CORE = \ src/core/global.h \ src/core/global.c \ src/core/pipe.c \ + src/core/poll.c \ src/core/sock.h \ src/core/sock.c \ src/core/sockbase.c \ @@ -336,7 +337,8 @@ MAN3 = \ doc/nn_sendmsg.txt \ doc/nn_recvmsg.txt \ doc/nn_device.txt \ - doc/nn_cmsg.txt + doc/nn_cmsg.txt \ + doc/nn_poll.txt MAN1 = \ doc/nanocat.txt diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ad0d77c2e..b0b937e90 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -43,6 +43,7 @@ set (NN_SOURCES core/global.h core/global.c core/pipe.c + core/poll.c core/sock.h core/sock.c core/sockbase.c diff --git a/src/core/poll.c b/src/core/poll.c new file mode 100644 index 000000000..63b67c399 --- /dev/null +++ b/src/core/poll.c @@ -0,0 +1,105 @@ +/* + Copyright (c) 2013 250bpm s.r.o. All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "../nn.h" + +#include "../utils/alloc.h" +#include "../utils/fast.h" +#include "../utils/err.h" + +#include +#include + +int nn_poll (struct nn_pollfd *fds, int nfds, int timeout) +{ + int rc; + int i; + int pos; + int fd; + int res; + size_t sz; + struct pollfd *pfd; + + /* Construct a pollset to be used with OS-level 'poll' function. */ + pfd = nn_alloc (sizeof (struct pollfd) * nfds * 2, "pollset"); + alloc_assert (pfd); + pos = 0; + for (i = 0; i != nfds; ++i) { + if (fds [i].events & NN_POLLIN) { + sz = sizeof (fd); + rc = nn_getsockopt (fds [i].fd, NN_SOL_SOCKET, NN_RCVFD, &fd, &sz); + if (nn_slow (rc < 0)) { + nn_free (pfd); + errno = -rc; + return -1; + } + nn_assert (sz == sizeof (fd)); + pfd [pos].fd = fd; + pfd [pos].events = POLLIN; + ++pos; + } + if (fds [i].events & NN_POLLOUT) { + sz = sizeof (fd); + rc = nn_getsockopt (fds [i].fd, NN_SOL_SOCKET, NN_SNDFD, &fd, &sz); + if (nn_slow (rc < 0)) { + nn_free (pfd); + errno = -rc; + return -1; + } + nn_assert (sz == sizeof (fd)); + pfd [pos].fd = fd; + pfd [pos].events = POLLIN; + ++pos; + } + } + + /* Do the polling itself. */ + rc = poll (pfd, pos, timeout); + if (nn_slow (rc <= 0)) { + nn_free (pfd); + errno = -rc; + return -1; + } + + /* Move the results from OS-level poll to nn_poll's pollset. */ + res = 0; + pos = 0; + for (i = 0; i != nfds; ++i) { + fds [i].revents = 0; + if (fds [i].events & NN_POLLIN) { + if (pfd [pos].revents & POLLIN) + fds [i].revents |= NN_POLLIN; + ++pos; + } + if (fds [i].events & NN_POLLOUT) { + if (pfd [pos].revents & POLLIN) + fds [i].revents |= NN_POLLOUT; + ++pos; + } + if (fds [i].revents) + ++res; + } + + nn_free (pfd); + return res; +} + diff --git a/src/nn.h b/src/nn.h index 17b8002c7..24119dc58 100644 --- a/src/nn.h +++ b/src/nn.h @@ -306,6 +306,21 @@ NN_EXPORT int nn_recv (int s, void *buf, size_t len, int flags); NN_EXPORT int nn_sendmsg (int s, const struct nn_msghdr *msghdr, int flags); NN_EXPORT int nn_recvmsg (int s, struct nn_msghdr *msghdr, int flags); +/******************************************************************************/ +/* Socket mutliplexing support. */ +/******************************************************************************/ + +#define NN_POLLIN 1 +#define NN_POLLOUT 2 + +struct nn_pollfd { + int fd; + short events; + short revents; +}; + +NN_EXPORT int nn_poll (struct nn_pollfd *fds, int nfds, int timeout); + /******************************************************************************/ /* Built-in support for devices. */ /******************************************************************************/ diff --git a/tests/poll.c b/tests/poll.c index dfdda4c55..33a62b934 100644 --- a/tests/poll.c +++ b/tests/poll.c @@ -125,6 +125,26 @@ int main () int sb; char buf [3]; struct nn_thread thread; + struct nn_pollfd pfd [2]; + + /* Test nn_poll() function. */ + sb = test_socket (AF_SP, NN_PAIR); + test_bind (sb, SOCKET_ADDRESS); + sc = test_socket (AF_SP, NN_PAIR); + test_connect (sc, SOCKET_ADDRESS); + test_send (sc, "ABC"); + nn_sleep (100); + pfd [0].fd = sb; + pfd [0].events = NN_POLLIN | NN_POLLOUT; + pfd [1].fd = sc; + pfd [1].events = NN_POLLIN | NN_POLLOUT; + rc = nn_poll (pfd, 2, -1); + errno_assert (rc >= 0); + nn_assert (rc == 2); + nn_assert (pfd [0].revents == NN_POLLIN | NN_POLLOUT); + nn_assert (pfd [1].revents == NN_POLLOUT); + test_close (sc); + test_close (sb); /* Create a simple topology. */ sb = test_socket (AF_SP, NN_PAIR);