Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Code cleanup #348

Merged
merged 7 commits into from

2 participants

This page is out of date. Refresh to see the latest.
View
4 src/devpoll.cpp
@@ -70,7 +70,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
}
}
- assert (!fd_table [fd_].valid);
+ zmq_assert (!fd_table [fd_].valid);
fd_table [fd_].events = 0;
fd_table [fd_].reactor = reactor_;
@@ -88,7 +88,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
void zmq::devpoll_t::rm_fd (handle_t handle_)
{
- assert (fd_table [handle_].valid);
+ zmq_assert (fd_table [handle_].valid);
devpoll_ctl (handle_, POLLREMOVE);
fd_table [handle_].valid = false;
View
38 src/err.cpp
@@ -214,46 +214,36 @@ void zmq::win_error (char *buffer_, size_t buffer_size_)
zmq_assert (rc);
}
-void zmq::wsa_error_to_errno ()
+int zmq::wsa_error_to_errno (int errcode)
{
- int errcode = WSAGetLastError ();
switch (errcode) {
case WSAEINPROGRESS:
- errno = EAGAIN;
- return;
+ return EAGAIN;
case WSAEBADF:
- errno = EBADF;
- return;
+ return EBADF;
case WSAEINVAL:
- errno = EINVAL;
- return;
+ return EINVAL;
case WSAEMFILE:
- errno = EMFILE;
- return;
+ return EMFILE;
case WSAEFAULT:
- errno = EFAULT;
- return;
+ return EFAULT;
case WSAEPROTONOSUPPORT:
- errno = EPROTONOSUPPORT;
- return;
+ return EPROTONOSUPPORT;
case WSAENOBUFS:
- errno = ENOBUFS;
- return;
+ return ENOBUFS;
case WSAENETDOWN:
- errno = ENETDOWN;
- return;
+ return ENETDOWN;
case WSAEADDRINUSE:
- errno = EADDRINUSE;
- return;
+ return EADDRINUSE;
case WSAEADDRNOTAVAIL:
- errno = EADDRNOTAVAIL;
- return;
+ return EADDRNOTAVAIL;
case WSAEAFNOSUPPORT:
- errno = EAFNOSUPPORT;
- return;
+ return EAFNOSUPPORT;
default:
wsa_assert (false);
}
+ // Not reachable
+ return 0;
}
#endif
View
2  src/err.hpp
@@ -57,7 +57,7 @@ namespace zmq
const char *wsa_error ();
const char *wsa_error_no (int no_);
void win_error (char *buffer_, size_t buffer_size_);
- void wsa_error_to_errno ();
+ int wsa_error_to_errno (int errcode);
}
// Provides convenient way to check WSA-style errors on Windows.
View
3  src/i_engine.hpp
@@ -37,9 +37,6 @@ namespace zmq
virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_) = 0;
- // Unplug the engine from the session.
- virtual void unplug () = 0;
-
// Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination.
virtual void terminate () = 0;
View
4 src/pgm_receiver.hpp
@@ -59,7 +59,6 @@ namespace zmq
// i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_);
- void unplug ();
void terminate ();
void activate_in ();
void activate_out ();
@@ -70,6 +69,9 @@ namespace zmq
private:
+ // Unplug the engine from the session.
+ void unplug ();
+
// PGM is not able to move subscriptions upstream. Thus, drop all
// the pending subscriptions.
void drop_subscriptions ();
View
4 src/pgm_sender.hpp
@@ -57,7 +57,6 @@ namespace zmq
// i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_);
- void unplug ();
void terminate ();
void activate_in ();
void activate_out ();
@@ -69,6 +68,9 @@ namespace zmq
private:
+ // Unplug the engine from the session.
+ void unplug ();
+
// TX and RX timeout timer ID's.
enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1};
View
2  src/pgm_socket.cpp
@@ -454,7 +454,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
// We have to write all data as one packet.
if (nbytes > 0) {
zmq_assert (status == PGM_IO_STATUS_NORMAL);
- zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
+ zmq_assert (nbytes == data_len_);
} else {
zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
status == PGM_IO_STATUS_WOULD_BLOCK);
View
4 src/poll.cpp
@@ -57,7 +57,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
pollfd pfd = {fd_, 0, 0};
pollset.push_back (pfd);
- assert (fd_table [fd_].index == retired_fd);
+ zmq_assert (fd_table [fd_].index == retired_fd);
fd_table [fd_].index = pollset.size() - 1;
fd_table [fd_].events = events_;
@@ -71,7 +71,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
void zmq::poll_t::rm_fd (handle_t handle_)
{
fd_t index = fd_table [handle_].index;
- assert (index != retired_fd);
+ zmq_assert (index != retired_fd);
// Mark the fd as unused.
pollset [index].fd = retired_fd;
View
6 src/router.cpp
@@ -219,8 +219,8 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_)
}
// Identity is not expected
- assert ((msg_->flags () & msg_t::identity) == 0);
- assert (pipe != NULL);
+ zmq_assert ((msg_->flags () & msg_t::identity) == 0);
+ zmq_assert (pipe != NULL);
// If we are in the middle of reading a message, just return the next part.
if (more_in)
@@ -273,7 +273,7 @@ bool zmq::router_t::xhas_in ()
return false;
// Identity is not expected
- assert ((prefetched_msg.flags () & msg_t::identity) == 0);
+ zmq_assert ((prefetched_msg.flags () & msg_t::identity) == 0);
blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ());
View
8 src/session_base.cpp
@@ -280,13 +280,7 @@ void zmq::session_base_t::process_plug ()
void zmq::session_base_t::process_attach (i_engine *engine_)
{
- // If some other object (e.g. init) notifies us that the connection failed
- // without creating an engine we need to start the reconnection process.
- if (!engine_) {
- zmq_assert (!engine);
- detached ();
- return;
- }
+ zmq_assert (engine_ != NULL);
// Create the pipe if it does not exist yet.
if (!pipe && !is_terminating ()) {
View
36 src/stream_engine.cpp
@@ -52,7 +52,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
outsize (0),
encoder (out_batch_size),
session (NULL),
- leftover_session (NULL),
options (options_),
plugged (false)
{
@@ -109,7 +108,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
{
zmq_assert (!plugged);
plugged = true;
- leftover_session = NULL;
// Connect to session object.
zmq_assert (!session);
@@ -144,7 +142,6 @@ void zmq::stream_engine_t::unplug ()
// Disconnect from session object.
encoder.set_session (NULL);
decoder.set_session (NULL);
- leftover_session = session;
session = NULL;
endpoint.clear();
}
@@ -185,12 +182,8 @@ void zmq::stream_engine_t::in_event ()
else {
// Stop polling for input if we got stuck.
- if (processed < insize) {
-
- // This may happen if queue limits are in effect.
- if (plugged)
- reset_pollin (handle);
- }
+ if (processed < insize)
+ reset_pollin (handle);
// Adjust the buffer.
inpos += processed;
@@ -198,20 +191,14 @@ void zmq::stream_engine_t::in_event ()
}
// Flush all messages the decoder may have produced.
- // If IO handler has unplugged engine, flush transient IO handler.
- if (unlikely (!plugged)) {
- zmq_assert (leftover_session);
- leftover_session->flush ();
- } else {
- session->flush ();
- }
+ session->flush ();
// Input error has occurred. If the last decoded
// message has already been accepted, we terminate
// the engine immediately. Otherwise, we stop
// waiting for input events and postpone the termination
// until after the session has accepted the message.
- if (session != NULL && disconnection) {
+ if (disconnection) {
input_error = true;
if (decoder.stalled ())
reset_pollin (handle);
@@ -228,13 +215,6 @@ void zmq::stream_engine_t::out_event ()
outpos = NULL;
encoder.get_data (&outpos, &outsize);
- // If IO handler has unplugged engine, flush transient IO handler.
- if (unlikely (!plugged)) {
- zmq_assert (leftover_session);
- leftover_session->flush ();
- return;
- }
-
// If there is no data to send, stop polling for output.
if (outsize == 0) {
reset_pollout (handle);
@@ -312,7 +292,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_)
return 0;
// Signalise peer failure.
- if (nbytes == -1 && (
+ if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAEHOSTUNREACH ||
@@ -322,7 +302,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_)
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
- return (size_t) nbytes;
+ return nbytes;
#else
@@ -358,7 +338,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
return 0;
// Connection failure.
- if (nbytes == -1 && (
+ if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAECONNABORTED ||
@@ -374,7 +354,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
if (nbytes == 0)
return -1;
- return (size_t) nbytes;
+ return nbytes;
#else
View
4 src/stream_engine.hpp
@@ -51,7 +51,6 @@ namespace zmq
// i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_);
- void unplug ();
void terminate ();
void activate_in ();
void activate_out ();
@@ -62,6 +61,9 @@ namespace zmq
private:
+ // Unplug the engine from the session.
+ void unplug ();
+
// Function to handle network disconnections.
void error ();
View
17 src/tcp_connecter.cpp
@@ -193,7 +193,7 @@ int zmq::tcp_connecter_t::open ()
s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) {
- wsa_error_to_errno ();
+ errno = wsa_error_to_errno (WSAGetLastError ());
return -1;
}
#else
@@ -218,20 +218,17 @@ int zmq::tcp_connecter_t::open ()
if (rc == 0)
return 0;
- // Translate other error codes indicating asynchronous connect has been
+ // Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
- if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS ||
- WSAGetLastError () == WSAEWOULDBLOCK)) {
+ const int error_code = WSAGetLastError ();
+ if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
errno = EINPROGRESS;
- return -1;
- }
- wsa_error_to_errno ();
+ else
+ errno = wsa_error_to_errno (error_code);
#else
- if (rc == -1 && errno == EINTR) {
+ if (errno == EINTR)
errno = EINPROGRESS;
- return -1;
- }
#endif
return -1;
}
View
8 src/tcp_listener.cpp
@@ -156,7 +156,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET)
- wsa_error_to_errno ();
+ errno = wsa_error_to_errno (WSAGetLastError ());
#endif
// IPv6 address family not supported, try automatic downgrade to IPv4.
@@ -170,7 +170,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) {
- wsa_error_to_errno ();
+ errno = wsa_error_to_errno (WSAGetLastError ());
return -1;
}
// On Windows, preventing sockets to be inherited by child processes.
@@ -203,7 +203,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
rc = bind (s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
- wsa_error_to_errno ();
+ errno = wsa_error_to_errno (WSAGetLastError ());
return -1;
}
#else
@@ -215,7 +215,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
rc = listen (s, options.backlog);
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
- wsa_error_to_errno ();
+ errno = wsa_error_to_errno (WSAGetLastError ());
return -1;
}
#else
View
12 src/zmq.cpp
@@ -853,17 +853,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_HAVE_WINDOWS
int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
- zmq::wsa_error_to_errno ();
- if (errno == ENOTSOCK)
- return -1;
- wsa_assert (false);
+ errno = zmq::wsa_error_to_errno (WSAGetLastError ());
+ wsa_assert (errno == ENOTSOCK);
+ return -1;
}
#else
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) {
- if (errno == EINTR || errno == EBADF)
- return -1;
- errno_assert (false);
+ errno_assert (errno == EINTR || errno == EBADF);
+ return -1;
}
#endif
break;
Something went wrong with that request. Please try again.