Permalink
Browse files

Handle SIGPIPE when client closes closes before broker sends sasl mec…

…hanisms frame.
  • Loading branch information...
1 parent fd4a580 commit 120a4baf70baae54863efe86cf5d3ee9c0997e68 @ewalshe ewalshe committed Nov 8, 2011
View
1 TODO
@@ -0,0 +1 @@
+* Ignore SIGPIPE if SO_NOSIGPIPE is not available.
View
@@ -41,7 +41,7 @@ static inline int amqp_trailing_zeros_32(uint32_t value)
return amqp_mod_37_bit_position[value % 37];
}
-#if ULONG_MAX > 4294967295ULL
+#if defined(__LP__) || defined(_WIN64) || ULONG_MAX > 4294967295ULL
#define amqp_mask_t uint64_t
#define amqp_trailing_zeros amqp_trailing_zeros_64
#define amqp_next_power_two amqp_next_power_two_64
@@ -229,6 +229,7 @@ static void attempt_connection_to_next_address(amqp_connection_t *connection)
}
amqp_set_socket_to_nonblocking(connection->socket.fd);
+ amqp_set_socket_to_ignore_sigpipe(connection->socket.fd);
if (connect(connection->socket.fd, iterator->ai_addr, iterator->ai_addrlen) == 0)
{
@@ -20,6 +20,8 @@
#include "Transport/Connection/ConnectionTrace.h"
#include "Transport/Connection/ConnectionTestHook.h"
+#include "debug_helper.h"
+
#ifdef LIBAMQP_TRACE_CONNECT_STATE
#define save_old_state() const char* old_state_name = connection->state.connection.name
#define trace_transition(old_state_name) amqp_connection_trace_transition(connection, AMQP_TRACE_CONNECTION, old_state_name, connection->state.connection.name)
@@ -211,6 +213,7 @@ static void fail_while_accepting_sasl(amqp_connection_t *connection)
{
amqp_connection_failure_flag_set(connection, AMQP_CONNECTION_SASL_NEGOTIATION_REJECTED);
amqp_connection_trace(connection, "SASL negotiation failed");
+ break_two();
connection->state.connection.drain(connection);
}
static void reject_while_accepting_sasl(amqp_connection_t *connection, uint32_t version)
@@ -320,6 +320,7 @@ static void default_write_next(amqp_connection_t *connection)
static void default_write_stop(amqp_connection_t *connection, amqp_connection_action_f done_callback)
{
+ done_callback(connection);
}
static void default_write_abort(amqp_connection_t *connection)
@@ -14,8 +14,6 @@
limitations under the License.
*/
-
-
#include "Context/Context.h"
#include "Transport/LowLevel/Listener.h"
#include "Transport/LowLevel/Socket.h"
@@ -107,7 +105,7 @@ void accept_new_connection_handler(amqp_event_loop_t* loop, ev_io *io, const int
}
}
-static int bind_socket_to_any(amqp_context_t *context, int socket_fd, int port_number)
+static int bind_socket_to_any(amqp_context_t *context, amqp_socket_t socket_fd, int port_number)
{
struct sockaddr_in6 sin6;
bzero(&sin6, sizeof(sin6));
@@ -126,14 +124,20 @@ static int bind_socket_to_any(amqp_context_t *context, int socket_fd, int port_n
return true;
}
-static int start_listening_on_socket(amqp_context_t *context, int socket_fd, int port_number)
+static int start_listening_on_socket(amqp_context_t *context, amqp_socket_t socket_fd, int port_number)
{
if (amqp_set_socket_to_nonblocking(socket_fd) == -1)
{
amqp_io_error(context, "Cannot set non-blocking flag on socket");
return false;
}
+ if (amqp_set_socket_to_ignore_sigpipe(socket_fd) == -1)
+ {
+ amqp_io_error(context, "Cannot set socket to ignore SIGPIPE");
+ return false;
+ }
+
if (amqp_set_socket_option(socket_fd, SO_REUSEADDR, 1) == -1)
{
amqp_io_error(context, "Cannot set SO_REUSEADDR option on socket");
@@ -157,7 +161,7 @@ static int start_listening_on_socket(amqp_context_t *context, int socket_fd, int
amqp_io_event_watcher_t *amqp_listener_initialize(amqp_context_t *context, amqp_event_loop_t *loop, int port_number, amqp_accept_event_handle_t accept_handler, amqp_accept_handler_arguments_t *arguments)
{
- int socket_fd;
+ amqp_socket_t socket_fd;
amqp_io_event_watcher_t *result;
assert(context != 0);
@@ -67,7 +67,47 @@ int amqp_socket_address_port(struct sockaddr_storage *socket_address, socklen_t
}
}
-int amqp_socket_shutdown_output(int fd)
+int amqp_socket_shutdown_output(amqp_socket_t fd)
{
return shutdown(fd, SHUT_WR);
}
+
+int amqp_set_socket_to_nonblocking(amqp_socket_t fd)
+{
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1)
+ {
+ return -1;
+ }
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+int amqp_set_socket_to_blocking(amqp_socket_t fd)
+{
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1)
+ {
+ return -1;
+ }
+ return fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+}
+
+int amqp_socket_get_error(amqp_socket_t fd)
+{
+ int so_error = 0;
+ socklen_t so_error_length = sizeof(so_error);
+ int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_length);
+ return rc != -1 ? so_error : errno;
+}
+
+int amqp_set_socket_to_ignore_sigpipe(amqp_socket_t fd)
+{
+#ifndef AMQP_WINDOWS_PORT
+#ifdef SO_NOSIGPIPE
+ int value = 1;
+ return amqp_set_socket_option(fd, SO_NOSIGPIPE, value);
+#else
+#error "Ignore SIGPIPE"
+#endif
+#endif
+}
@@ -31,52 +31,30 @@ extern "C" {
#include <netinet/in.h>
#include <netdb.h>
-#ifndef _MSC_VER
+#ifndef AMQP_WINDOWS_PORT
+typedef int amqp_socket_t;
#include <unistd.h>
+#else
+typedef HANDLE amqp_socket_t;
#endif
extern void bzero(void *block, size_t n);
extern void amqp_socket_address_tos(char *buffer, size_t buffer_size, struct sockaddr_storage *client_address, socklen_t address_size);
extern int amqp_socket_address_port(struct sockaddr_storage *client_address, socklen_t address_size);
-extern int amqp_socket_shutdown_output(int fd);
+extern int amqp_socket_shutdown_output(amqp_socket_t fd);
inline static
-int amqp_set_socket_option(int fd, int option, int value)
+int amqp_set_socket_option(amqp_socket_t fd, int option, int value)
{
socklen_t value_length = sizeof(value);
return setsockopt(fd, SOL_SOCKET, option, &value, value_length);
}
-inline static
-int amqp_set_socket_to_nonblocking(int fd)
-{
- int flags = fcntl(fd, F_GETFL, 0);
- if (flags == -1)
- {
- return -1;
- }
- return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
-}
-
-inline static
-int amqp_set_socket_to_blocking(int fd)
-{
- int flags = fcntl(fd, F_GETFL, 0);
- if (flags == -1)
- {
- return -1;
- }
- return fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
-}
+extern int amqp_set_socket_to_nonblocking(amqp_socket_t fd);
+extern int amqp_set_socket_to_blocking(amqp_socket_t fd);
+extern int amqp_socket_get_error(amqp_socket_t fd);
-inline static
-int amqp_socket_get_error(int fd)
-{
- int so_error = 0;
- socklen_t so_error_length = sizeof(so_error);
- int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_length);
- return rc != -1 ? so_error : errno;
-}
+extern int amqp_set_socket_to_ignore_sigpipe(amqp_socket_t fd);
#ifdef __cplusplus
}
@@ -38,6 +38,7 @@ extern "C" {
#ifdef _MSC_VER
#include "libamqp_win32.h"
#define not_reached() abort(); return 0;
+#define AMQP_WINDOWS_PORT
#else
#include <stdint.h>
#define not_reached() abort();
View
@@ -31,7 +31,7 @@ INCLUDES :=
TEST_SRC :=
TEST_HARNESS_SRC :=
TEST_HARNESS_INCLUDES :=
-#TEST_RUNNER_ARGS := --exclude SlowRunning
+TEST_RUNNER_ARGS := --exclude SlowRunning
#TEST_RUNNER_ARGS := BasicDecode CodecDecode IntegerDecode NumericDecode VariableTypeDecoder CompoundTypeDecode BadDataDecoder CodecEncode CompoundEncoding VariableTypesEncoding
#TEST_RUNNER_ARGS := CodecDecode
JENKINS_BUILD_OPTIONS :=
@@ -51,7 +51,7 @@ BUILD_OPTIONS += -DLIBAMQP_TRACE_CONNECT_STATE
CPPFLAGS = $(patsubst %,-I%, $(INCLUDES)) $(patsubst %,-I%, $(TEST_HARNESS_INCLUDES)) $(BUILD_OPTIONS) $(JENKINS_BUILD_OPTIONS)
# CFLAGS += -g -O3 -Wall -Werror -std=c99 -pedantic -D_POSIX_C_SOURCE=200112L -fasm
-CFLAGS = -g -O0 -Wall -Werror -std=c99 -pedantic -D_POSIX_C_SOURCE=200112L -fasm $(COVERAGE_CFLAGS)
+CFLAGS = -g -O0 -Wall -Werror -std=c99 -pedantic -D_POSIX_C_SOURCE=200112L -D_DARWIN_C_SOURCE -fasm $(COVERAGE_CFLAGS)
CXXFLAGS = -g -Wall
LDFLAGS = -lev -lpthread $(COVERAGE_LDFLAGS)

0 comments on commit 120a4ba

Please sign in to comment.