81 changes: 54 additions & 27 deletions src/nxt_port_socket.c
Expand Up @@ -5,6 +5,7 @@
*/

#include <nxt_main.h>
#include <nxt_socket_msg.h>
#include <nxt_port_queue.h>
#include <nxt_port_memory_int.h>

Expand All @@ -22,6 +23,7 @@ static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg);
nxt_inline void nxt_port_close_fds(nxt_fd_t *fd);
static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
Expand Down Expand Up @@ -593,16 +595,21 @@ nxt_port_msg_close_fd(nxt_port_send_msg_t *msg)
return;
}

if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd[0]);
nxt_port_close_fds(msg->fd);
}

msg->fd[0] = -1;
}

if (msg->fd[1] != -1) {
nxt_fd_close(msg->fd[1]);
nxt_inline void
nxt_port_close_fds(nxt_fd_t *fd)
{
if (fd[0] != -1) {
nxt_fd_close(fd[0]);
fd[0] = -1;
}

msg->fd[1] = -1;
if (fd[1] != -1) {
nxt_fd_close(fd[1]);
fd[1] = -1;
}
}

Expand Down Expand Up @@ -725,16 +732,17 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_buf_t *b;
nxt_int_t ret;
nxt_port_t *port;
struct iovec iov[2];
nxt_recv_oob_t oob;
nxt_port_recv_msg_t msg;
struct iovec iov[2];

port = msg.port = nxt_container_of(obj, nxt_port_t, socket);

nxt_assert(port->engine == task->thread->engine);

for ( ;; ) {

b = nxt_port_buf_alloc(port);

if (nxt_slow_path(b == NULL)) {
Expand All @@ -747,9 +755,22 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size;

n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);
n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);

if (n > 0) {
msg.fd[0] = -1;
msg.fd[1] = -1;

ret = nxt_socket_msg_oob_get(&oob, msg.fd,
nxt_recv_msg_cmsg_pid_ref(&msg));
if (nxt_slow_path(ret != NXT_OK)) {
nxt_alert(task, "failed to get oob data from %d",
port->socket.fd);

nxt_port_close_fds(msg.fd);

goto fail;
}

msg.buf = b;
msg.size = n;
Expand Down Expand Up @@ -778,8 +799,8 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
return;
}

/* n == 0 || n == NXT_ERROR */

fail:
/* n == 0 || error */
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_port_error_handler, task, &port->socket, NULL);
return;
Expand All @@ -792,8 +813,10 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_buf_t *b;
nxt_int_t ret;
nxt_port_t *port;
struct iovec iov[2];
nxt_recv_oob_t oob;
nxt_port_queue_t *queue;
nxt_port_recv_msg_t msg, *smsg;
uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
Expand Down Expand Up @@ -884,7 +907,23 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size;

n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);
n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);

if (n > 0) {
msg.fd[0] = -1;
msg.fd[1] = -1;

ret = nxt_socket_msg_oob_get(&oob, msg.fd,
nxt_recv_msg_cmsg_pid_ref(&msg));
if (nxt_slow_path(ret != NXT_OK)) {
nxt_alert(task, "failed to get oob data from %d",
port->socket.fd);

nxt_port_close_fds(msg.fd);

return;
}
}

if (n == (ssize_t) sizeof(nxt_port_msg_t)
&& msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
Expand Down Expand Up @@ -1139,13 +1178,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_alert(task, "port %d: too small message:%uz",
port->socket.fd, msg->size);

if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd[0]);
}

if (msg->fd[1] != -1) {
nxt_fd_close(msg->fd[1]);
}
nxt_port_close_fds(msg->fd);

return;
}
Expand Down Expand Up @@ -1225,13 +1258,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
b = NULL;

} else {
if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd[0]);
}

if (msg->fd[1] != -1) {
nxt_fd_close(msg->fd[1]);
}
nxt_port_close_fds(msg->fd);
}
} else {
if (nxt_fast_path(msg->cancelled == 0)) {
Expand Down
4 changes: 2 additions & 2 deletions src/nxt_socket.h
Expand Up @@ -114,8 +114,8 @@ NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task,
NXT_EXPORT void nxt_socketpair_close(nxt_task_t *task, nxt_socket_t *pair);
NXT_EXPORT ssize_t nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t *fd,
nxt_iobuf_t *iob, nxt_uint_t niob);
NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd,
nxt_iobuf_t *iob, nxt_uint_t niob);
NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev,
nxt_iobuf_t *iob, nxt_uint_t niob, void *oob);


#define \
Expand Down
57 changes: 57 additions & 0 deletions src/nxt_socket_msg.c
@@ -0,0 +1,57 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/

#include <nxt_main.h>
#include <nxt_socket_msg.h>


/*
 * Thin wrapper around sendmsg().
 *
 * s     socket to write to
 * iob   I/O vector holding the message payload
 * niob  number of elements in iob
 * oob   optional control data; NULL or a zero size means that no
 *       control data is attached to the message
 *
 * Returns the sendmsg() result unchanged.
 */
ssize_t
nxt_sendmsg(nxt_socket_t s, nxt_iobuf_t *iob, nxt_uint_t niob,
    const nxt_send_oob_t *oob)
{
    struct msghdr  hdr;

    /* Start without control data and attach it only if there is any. */
    hdr.msg_control = NULL;
    hdr.msg_controllen = 0;

    if (oob != NULL && oob->size != 0) {
        hdr.msg_control = (void *) oob->buf;
        hdr.msg_controllen = oob->size;
    }

    hdr.msg_name = NULL;
    hdr.msg_namelen = 0;
    hdr.msg_iov = iob;
    hdr.msg_iovlen = niob;
    /* Flags are cleared just to suppress valgrind warning. */
    hdr.msg_flags = 0;

    return sendmsg(s, &hdr, 0);
}


/*
 * Thin wrapper around recvmsg() that collects control data in oob.
 *
 * s     socket to read from
 * iob   I/O vector that receives the message payload
 * niob  number of elements in iob
 * oob   control data buffer; oob->size is always defined on return:
 *       the number of control bytes received, or 0 if recvmsg() failed
 *
 * Returns the recvmsg() result unchanged.
 */
ssize_t
nxt_recvmsg(nxt_socket_t s, nxt_iobuf_t *iob, nxt_uint_t niob,
    nxt_recv_oob_t *oob)
{
    ssize_t        n;
    struct msghdr  msg;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iob;
    msg.msg_iovlen = niob;
    msg.msg_control = oob->buf;
    msg.msg_controllen = sizeof(oob->buf);
    /* Flags are cleared just to suppress valgrind warning. */
    msg.msg_flags = 0;

    n = recvmsg(s, &msg, 0);

    if (nxt_fast_path(n != -1)) {
        oob->size = msg.msg_controllen;

    } else {
        /*
         * Callers may read oob->size even after a failure (for example
         * to log it), so never leave it uninitialized.
         */
        oob->size = 0;
    }

    return n;
}
220 changes: 220 additions & 0 deletions src/nxt_socket_msg.h
@@ -0,0 +1,220 @@
/*
* Copyright (C) NGINX, Inc.
*/

#ifndef _NXT_SOCKET_MSG_H_INCLUDED_
#define _NXT_SOCKET_MSG_H_INCLUDED_

#if (NXT_HAVE_UCRED)
#include <sys/un.h>
#endif


/*
 * Credential passing configuration.
 *
 * NXT_CRED_USECMSG   is defined when peer credentials are carried in a
 *                    control message.
 * NXT_CRED_CMSGTYPE  is the cmsg_type of that control message.
 * NXT_CRED_GETPID(u) extracts the sender pid from a pointer to the
 *                    credentials structure; the argument is parenthesized
 *                    so that any pointer expression can be passed.
 */
#if (NXT_HAVE_UCRED)
#define NXT_CRED_USECMSG    1
#define NXT_CRED_CMSGTYPE   SCM_CREDENTIALS
#define NXT_CRED_GETPID(u)  ((u)->pid)

typedef struct ucred        nxt_socket_cred_t;

#elif (NXT_HAVE_MSGHDR_CMSGCRED)
#define NXT_CRED_USECMSG    1
#define NXT_CRED_CMSGTYPE   SCM_CREDS
#define NXT_CRED_GETPID(u)  ((u)->cmcred_pid)

typedef struct cmsgcred     nxt_socket_cred_t;
#endif

/*
 * Control buffer capacities.  Both buffers have room for one SCM_RIGHTS
 * message carrying two descriptors; room for a credentials message is
 * added on the receive side when NXT_CRED_USECMSG is set and on the send
 * side when NXT_HAVE_MSGHDR_CMSGCRED is set.
 */
#if !(NXT_CRED_USECMSG)
#define NXT_OOB_RECV_SIZE                                                     \
    (CMSG_SPACE(sizeof(int) * 2))
#else
#define NXT_OOB_RECV_SIZE                                                     \
    (CMSG_SPACE(sizeof(int) * 2) + CMSG_SPACE(sizeof(nxt_socket_cred_t)))
#endif

#if !(NXT_HAVE_MSGHDR_CMSGCRED)
#define NXT_OOB_SEND_SIZE                                                     \
    (CMSG_SPACE(sizeof(int) * 2))
#else
#define NXT_OOB_SEND_SIZE                                                     \
    (CMSG_SPACE(sizeof(int) * 2) + CMSG_SPACE(sizeof(nxt_socket_cred_t)))
#endif


/*
 * Receive-side control data buffer.  nxt_recvmsg() passes buf to recvmsg()
 * as the control buffer and stores the received control length in size.
 */
typedef struct {
/* Number of bytes of buf that hold control messages. */
size_t size;
/* Raw control messages. */
u_char buf[NXT_OOB_RECV_SIZE];
} nxt_recv_oob_t;


/*
 * Send-side control data buffer.  It is filled by nxt_socket_msg_oob_init()
 * and handed to sendmsg() by nxt_sendmsg().
 */
typedef struct {
/* Number of bytes of buf that hold control messages; 0 means none. */
size_t size;
/* Raw control messages. */
u_char buf[NXT_OOB_SEND_SIZE];
} nxt_send_oob_t;


/**
 * The nxt_sendmsg is a wrapper for sendmsg().
 * The oob struct must be initialized using nxt_socket_msg_oob_init();
 * if oob is NULL or its size is 0, the message is sent without control data.
 * The sendmsg() result is returned unchanged.
 */
NXT_EXPORT ssize_t nxt_sendmsg(nxt_socket_t s, nxt_iobuf_t *iob,
nxt_uint_t niob, const nxt_send_oob_t *oob);

/**
 * The nxt_recvmsg is a wrapper for recvmsg().
 * When recvmsg() does not fail, the length of the received control data is
 * stored in oob->size.
 * The oob buffer must be consumed by using nxt_socket_msg_oob_get().
 * The recvmsg() result is returned unchanged.
 */
NXT_EXPORT ssize_t nxt_recvmsg(nxt_socket_t s,
nxt_iobuf_t *iob, nxt_uint_t niob, nxt_recv_oob_t *oob);


/*
 * Prepares the control data for nxt_sendmsg().
 *
 * oob  buffer to fill; oob->size is set to the number of bytes used
 * fds  array of two descriptors, -1 marks an unused slot
 *
 * With NXT_HAVE_MSGHDR_CMSGCRED a credentials message header is placed
 * first.  Then, if at least one descriptor is valid, an SCM_RIGHTS message
 * with the valid descriptors follows.  Only valid descriptors are packed,
 * so -1 is never passed to the kernel; if just fds[1] is valid, the peer
 * receives it as its first descriptor.
 */
nxt_inline void
nxt_socket_msg_oob_init(nxt_send_oob_t *oob, int *fds)
{
    int             nfds;
    int             valid[2];
    struct cmsghdr  *cmsg;

#if (NXT_HAVE_MSGHDR_CMSGCRED)
    cmsg = (struct cmsghdr *) (oob->buf);
    /*
     * Fill all padding fields with 0.
     * Code in Go 1.11 validates cmsghdr using the padding field as part
     * of len.  See the Cmsghdr definition and the
     * socketControlMessageHeaderAndData function.
     */
    nxt_memzero(cmsg, sizeof(struct cmsghdr));

    cmsg->cmsg_len = CMSG_LEN(sizeof(nxt_socket_cred_t));
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = NXT_CRED_CMSGTYPE;

    oob->size = CMSG_SPACE(sizeof(nxt_socket_cred_t));

#else
    oob->size = 0;
#endif

    /* Collect the valid descriptors, preserving their order. */
    nfds = 0;

    if (fds[0] != -1) {
        valid[nfds++] = fds[0];
    }

    if (fds[1] != -1) {
        valid[nfds++] = fds[1];
    }

    if (nfds == 0) {
        return;
    }

    cmsg = (struct cmsghdr *) (oob->buf + oob->size);

    nxt_memzero(cmsg, sizeof(struct cmsghdr));

    cmsg->cmsg_len = CMSG_LEN(nfds * sizeof(int));
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;

    /*
     * nxt_memcpy() is used instead of storing through an (int *) cast of
     * CMSG_DATA(), because GCC 4.4 with -O2/3/s optimization may issue
     * a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * into the same simple assignment.
     */
    nxt_memcpy(CMSG_DATA(cmsg), valid, nfds * sizeof(int));

    oob->size += CMSG_SPACE(nfds * sizeof(int));
}


/*
 * Extracts descriptors from the first SCM_RIGHTS control message in oob.
 *
 * oob  received control data
 * fd   destination for one or two descriptors; it is not modified when
 *      no SCM_RIGHTS message is present
 *
 * Returns NXT_ERROR if the SCM_RIGHTS payload is neither one nor two
 * ints long, NXT_OK otherwise.
 */
nxt_inline nxt_int_t
nxt_socket_msg_oob_get_fds(nxt_recv_oob_t *oob, nxt_fd_t *fd)
{
    size_t          len;
    struct msghdr   hdr;
    struct cmsghdr  *cm;

    hdr.msg_control = oob->buf;
    hdr.msg_controllen = oob->size;

    cm = CMSG_FIRSTHDR(&hdr);

    while (cm != NULL) {

        if (cm->cmsg_level != SOL_SOCKET || cm->cmsg_type != SCM_RIGHTS) {
            /* Not a descriptor message, look at the next one. */
            cm = CMSG_NXTHDR(&hdr, cm);
            continue;
        }

        /* Payload length without the cmsghdr itself. */
        len = cm->cmsg_len - CMSG_LEN(0);

        if (nxt_slow_path(len != sizeof(int) && len != 2 * sizeof(int))) {
            return NXT_ERROR;
        }

        nxt_memcpy(fd, CMSG_DATA(cm), len);

        return NXT_OK;
    }

    return NXT_OK;
}


/*
 * Consumes the control data received by nxt_recvmsg().
 *
 * oob  received control data
 * fd   destination for one or two descriptors from an SCM_RIGHTS message
 * pid  destination for the sender pid taken from a credentials message;
 *      it is used only when NXT_CRED_USECMSG is set
 *
 * If oob is empty, NXT_OK is returned and neither fd nor pid is touched.
 * Otherwise NXT_ERROR is returned for a control message of unexpected
 * size and, when NXT_CRED_USECMSG is set, for control data that carries
 * no credentials message.
 */
nxt_inline nxt_int_t
nxt_socket_msg_oob_get(nxt_recv_oob_t *oob, nxt_fd_t *fd, nxt_pid_t *pid)
{
    size_t          len;
    struct msghdr   hdr;
    struct cmsghdr  *cm;

    if (oob->size == 0) {
        return NXT_OK;
    }

#if (NXT_CRED_USECMSG)
    /* Sentinel, replaced once a credentials message is found. */
    *pid = -1;
#endif

    hdr.msg_control = oob->buf;
    hdr.msg_controllen = oob->size;

    cm = CMSG_FIRSTHDR(&hdr);

    while (cm != NULL) {
        /* Payload length without the cmsghdr itself. */
        len = cm->cmsg_len - CMSG_LEN(0);

        if (cm->cmsg_level == SOL_SOCKET) {

            if (cm->cmsg_type == SCM_RIGHTS) {
                if (nxt_slow_path(len != sizeof(int)
                                  && len != 2 * sizeof(int)))
                {
                    return NXT_ERROR;
                }

                nxt_memcpy(fd, CMSG_DATA(cm), len);

#if (!NXT_CRED_USECMSG)
                /* Nothing else is expected without credentials. */
                break;
#endif
            }

#if (NXT_CRED_USECMSG)
            else if (cm->cmsg_type == NXT_CRED_CMSGTYPE) {
                nxt_socket_cred_t  *creds;

                if (nxt_slow_path(len != sizeof(nxt_socket_cred_t))) {
                    return NXT_ERROR;
                }

                creds = (nxt_socket_cred_t *) CMSG_DATA(cm);
                *pid = NXT_CRED_GETPID(creds);
            }
#endif
        }

        cm = CMSG_NXTHDR(&hdr, cm);
    }

#if (NXT_CRED_USECMSG)
    /* For platforms supporting credential passing, it's enforced */
    if (nxt_slow_path(*pid == -1)) {
        return NXT_ERROR;
    }
#endif

    return NXT_OK;
}


#endif /* _NXT_SOCKET_MSG_H_INCLUDED_ */
201 changes: 32 additions & 169 deletions src/nxt_socketpair.c
Expand Up @@ -5,7 +5,7 @@
*/

#include <nxt_main.h>

#include <nxt_socket_msg.h>

/*
* SOCK_SEQPACKET protocol is supported for AF_UNIX in Solaris 8 X/Open
Expand All @@ -20,12 +20,6 @@
#endif


static ssize_t nxt_sendmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob,
nxt_uint_t niob);
static ssize_t nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob,
nxt_uint_t niob);


nxt_int_t
nxt_socketpair_create(nxt_task_t *task, nxt_socket_t *pair)
{
Expand All @@ -52,6 +46,24 @@ nxt_socketpair_create(nxt_task_t *task, nxt_socket_t *pair)
goto fail;
}

#if NXT_HAVE_SOCKOPT_SO_PASSCRED
int enable_creds = 1;

if (nxt_slow_path(setsockopt(pair[0], SOL_SOCKET, SO_PASSCRED,
&enable_creds, sizeof(enable_creds)) == -1))
{
nxt_alert(task, "failed to set SO_PASSCRED %E", nxt_errno);
goto fail;
}

if (nxt_slow_path(setsockopt(pair[1], SOL_SOCKET, SO_PASSCRED,
&enable_creds, sizeof(enable_creds)) == -1))
{
nxt_alert(task, "failed to set SO_PASSCRED %E", nxt_errno);
goto fail;
}
#endif

return NXT_OK;

fail:
Expand All @@ -74,11 +86,14 @@ ssize_t
nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob,
nxt_uint_t niob)
{
ssize_t n;
nxt_err_t err;
ssize_t n;
nxt_err_t err;
nxt_send_oob_t oob;

nxt_socket_msg_oob_init(&oob, fd);

for ( ;; ) {
n = nxt_sendmsg(ev->fd, fd, iob, niob);
n = nxt_sendmsg(ev->fd, iob, niob, &oob);

err = (n == -1) ? nxt_socket_errno : 0;

Expand Down Expand Up @@ -123,19 +138,19 @@ nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob,


ssize_t
nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob,
nxt_uint_t niob)
nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_iobuf_t *iob, nxt_uint_t niob,
void *oob)
{
ssize_t n;
nxt_err_t err;

for ( ;; ) {
n = nxt_recvmsg(ev->fd, fd, iob, niob);
n = nxt_recvmsg(ev->fd, iob, niob, oob);

err = (n == -1) ? nxt_socket_errno : 0;

nxt_debug(ev->task, "recvmsg(%d, %FD, %FD, %ui): %z", ev->fd, fd[0],
fd[1], niob, n);
nxt_debug(ev->task, "recvmsg(%d, %ui, %uz): %z",
ev->fd, niob, ((nxt_recv_oob_t *) oob)->size, n);

if (n > 0) {
return n;
Expand Down Expand Up @@ -163,162 +178,10 @@ nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob,
continue;

default:
nxt_alert(ev->task, "recvmsg(%d, %p, %ui) failed %E",
ev->fd, fd, niob, err);
nxt_alert(ev->task, "recvmsg(%d, %ui) failed %E",
ev->fd, niob, err);

return NXT_ERROR;
}
}
}


#if (NXT_HAVE_MSGHDR_MSG_CONTROL)

/*
* Linux, FreeBSD, Solaris X/Open sockets,
* MacOSX, NetBSD, AIX, HP-UX X/Open sockets.
*/

/*
 * Sends the iob vector on socket s, passing up to two descriptors in an
 * SCM_RIGHTS control message.  No control message is attached if fd[0]
 * is -1; fd[1] is passed as well if it is not -1.
 *
 * Returns the sendmsg() result unchanged.
 */
static ssize_t
nxt_sendmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob)
{
    size_t         len;
    struct msghdr  hdr;
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int) * 2)];
    } ctl;

    hdr.msg_name = NULL;
    hdr.msg_namelen = 0;
    hdr.msg_iov = iob;
    hdr.msg_iovlen = niob;
    /* Flags are cleared just to suppress valgrind warning. */
    hdr.msg_flags = 0;

    if (fd[0] == -1) {
        /* Nothing to pass. */
        hdr.msg_control = NULL;
        hdr.msg_controllen = 0;

        return sendmsg(s, &hdr, 0);
    }

    len = sizeof(int);

    if (fd[1] != -1) {
        len += sizeof(int);
    }

    hdr.msg_control = (caddr_t) &ctl;
    hdr.msg_controllen = CMSG_SPACE(len);

#if (NXT_VALGRIND)
    nxt_memzero(&ctl, sizeof(ctl));
#endif

    ctl.cm.cmsg_len = CMSG_LEN(len);
    ctl.cm.cmsg_level = SOL_SOCKET;
    ctl.cm.cmsg_type = SCM_RIGHTS;

    /*
     * nxt_memcpy() is used instead of storing through an (int *) cast of
     * CMSG_DATA(), because GCC 4.4 with -O2/3/s optimization may issue
     * a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * into the same simple assignment.
     */
    nxt_memcpy(CMSG_DATA(&ctl.cm), fd, len);

    return sendmsg(s, &hdr, 0);
}


/*
 * Receives a message into the iob vector from socket s and extracts up
 * to two descriptors passed in an SCM_RIGHTS control message.
 *
 * fd[0] and fd[1] are always reset to -1 first; they are filled only if
 * data was received together with an SCM_RIGHTS message carrying exactly
 * one or two descriptors.
 *
 * Returns the recvmsg() result unchanged.
 */
static ssize_t
nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob)
{
    ssize_t         n;
    struct msghdr   msg;
    struct cmsghdr  *cm;
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int) * 2)];
    } cmsg;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iob;
    msg.msg_iovlen = niob;
    msg.msg_control = (caddr_t) &cmsg;
    msg.msg_controllen = sizeof(cmsg);

    fd[0] = -1;
    fd[1] = -1;

#if (NXT_VALGRIND)
    nxt_memzero(&cmsg, sizeof(cmsg));
#endif

    n = recvmsg(s, &msg, 0);

    if (n <= 0) {
        return n;
    }

    /*
     * The control buffer is examined only if the kernel actually stored
     * a control message there: CMSG_FIRSTHDR() returns NULL otherwise,
     * so uninitialized stack data is never interpreted as a header.
     */
    cm = CMSG_FIRSTHDR(&msg);

    if (cm == NULL
        || cm->cmsg_level != SOL_SOCKET
        || cm->cmsg_type != SCM_RIGHTS)
    {
        return n;
    }

    if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
        nxt_memcpy(fd, CMSG_DATA(cm), sizeof(int));

    } else if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
        nxt_memcpy(fd, CMSG_DATA(cm), sizeof(int) * 2);
    }

    return n;
}

#else

/* Solaris 4.3BSD sockets. */

/*
 * Sends the iob vector on socket s, passing up to two descriptors via
 * the msg_accrights field.  Nothing is passed if fd[0] is -1; fd[1] is
 * passed as well if it is not -1.
 *
 * Returns the sendmsg() result unchanged.
 */
static ssize_t
nxt_sendmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob)
{
    struct msghdr  hdr;

    hdr.msg_name = NULL;
    hdr.msg_namelen = 0;
    hdr.msg_iov = iob;
    hdr.msg_iovlen = niob;

    if (fd[0] == -1) {
        hdr.msg_accrights = NULL;
        hdr.msg_accrightslen = 0;

        return sendmsg(s, &hdr, 0);
    }

    hdr.msg_accrights = (caddr_t) fd;

    if (fd[1] == -1) {
        hdr.msg_accrightslen = sizeof(int);

    } else {
        hdr.msg_accrightslen = sizeof(int) + sizeof(int);
    }

    return sendmsg(s, &hdr, 0);
}


/*
 * Receives a message into the iob vector from socket s; descriptors
 * passed via msg_accrights are stored directly in the fd array, which
 * is preset to -1 in both slots.
 *
 * Returns the recvmsg() result unchanged.
 */
static ssize_t
nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob)
{
    struct msghdr  hdr;

    hdr.msg_name = NULL;
    hdr.msg_namelen = 0;
    hdr.msg_iov = iob;
    hdr.msg_iovlen = niob;

    /* Slots stay -1 unless access rights are received into them. */
    fd[0] = -1;
    fd[1] = -1;

    hdr.msg_accrights = (caddr_t) fd;
    hdr.msg_accrightslen = sizeof(int) * 2;

    return recvmsg(s, &hdr, 0);
}

#endif
212 changes: 74 additions & 138 deletions src/nxt_unit.c

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/nxt_unit.h
Expand Up @@ -155,7 +155,7 @@ struct nxt_unit_callbacks_s {

/* Receive data on port id. Optional. */
ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size);
void *buf, size_t buf_size, void *oob, size_t *oob_size);

int (*ready_handler)(nxt_unit_ctx_t *);
};
Expand Down