diff --git a/TODO b/TODO index f434a3376..e20b9f3f8 100644 --- a/TODO +++ b/TODO @@ -4,11 +4,6 @@ distro/packaging: https://bugzilla.redhat.com/show_bug.cgi?id=893015 link/host level: - - (issue) fix 0 byte read from local socket case handling - use flag to determine the behavior: - 0 byte -> process/send - 0 byte -> do callback - 0 byte -> ignore - (issue) simplify handling of DATA and HOSTINFO code paths in send/recv code - (issue) need bind to interface for dynamic ip local interfaces vs src ip address or find a way to autodetect the new ip on that interface diff --git a/kronosnetd/vty_cli_cmds.c b/kronosnetd/vty_cli_cmds.c index 670a563d2..ab979b428 100644 --- a/kronosnetd/vty_cli_cmds.c +++ b/kronosnetd/vty_cli_cmds.c @@ -1557,6 +1557,14 @@ static int knet_cmd_no_interface(struct knet_vty *vty) return err; } +static void sock_notify_fn(void *private_data, int datafd, int8_t chan, uint8_t tx_rx, int error, int errorno) +{ + struct knet_vty *vty = (struct knet_vty *)private_data; + + knet_vty_write(vty, "Error: received sock notify, datafd: %d channel: %d direction: %u error: %d errno: %d (%s)%s", + datafd, chan, tx_rx, error, errorno, strerror(errorno), telnet_newline); +} + static int knet_cmd_interface(struct knet_vty *vty) { int err = 0, paramlen = 0, paramoffset = 0, found = 0, requested_id, tapfd; @@ -1622,6 +1630,13 @@ static int knet_cmd_interface(struct knet_vty *vty) goto out_clean; } + if (knet_handle_enable_sock_notify(knet_iface->cfg_ring.knet_h, &vty, sock_notify_fn)) { + knet_vty_write(vty, "Error: Unable to add sock notify callback to to knet_handle %s%s", + strerror(errno), telnet_newline); + err = -1; + goto out_clean; + } + if (knet_handle_add_datafd(knet_iface->cfg_ring.knet_h, &tapfd, &channel) < 0) { knet_vty_write(vty, "Error: Unable to add tapfd to knet_handle %s%s", strerror(errno), telnet_newline); diff --git a/libknet/handle.c b/libknet/handle.c index 314c95adb..c4a1f61a3 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -691,6 +691,45 @@ int knet_handle_free(knet_handle_t knet_h) return 0; } +int knet_handle_enable_sock_notify(knet_handle_t knet_h, + void *sock_notify_fn_private_data, + void (*sock_notify_fn) ( + void *private_data, + int datafd, + int8_t channel, + uint8_t tx_rx, + int error, + int errorno)) +{ + int savederrno = 0, err = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!sock_notify_fn) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_wrlock(&knet_h->list_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data; + knet_h->sock_notify_fn = sock_notify_fn; + log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled"); + + pthread_rwlock_unlock(&knet_h->list_rwlock); + + return err; +} + int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) { int err = 0, savederrno = 0; @@ -725,6 +764,13 @@ int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) return -1; } + if (!knet_h->sock_notify_fn) { + log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!"); + savederrno = EINVAL; + err = -1; + goto out_unlock; + } + if (*datafd > 0) { for (i = 0; i < KNET_DATAFD_MAX; i++) { if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) { @@ -761,6 +807,7 @@ int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel) knet_h->sockfd[*channel].is_created = 0; knet_h->sockfd[*channel].is_socket = 0; + knet_h->sockfd[*channel].has_error = 0; if (*datafd > 0) { int sockopt; @@ -847,15 +894,17 @@ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd) goto out_unlock; } - memset(&ev, 0, sizeof(struct epoll_event)); + if (!knet_h->sockfd[channel].has_error) { + memset(&ev, 0, sizeof(struct epoll_event)); - if (epoll_ctl(knet_h->send_to_links_epollfd, - EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[i].is_created], &ev)) { - savederrno = errno; - err = -1; - log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s", - knet_h->sockfd[channel].sockfd[knet_h->sockfd[i].is_created], strerror(savederrno)); - goto out_unlock; + if (epoll_ctl(knet_h->send_to_links_epollfd, + EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { + savederrno = errno; + err = -1; + log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s", + knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); + goto out_unlock; + } } if (knet_h->sockfd[channel].is_created) { @@ -870,43 +919,6 @@ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd) return err; } -int knet_handle_enable_sock_notify(knet_handle_t knet_h, - void *sock_notify_fn_private_data, - void (*sock_notify_fn) ( - void *private_data, - int datafd, - int8_t channel, - int error, - int errorno)) -{ - int savederrno = 0; - - if (!knet_h) { - errno = EINVAL; - return -1; - } - - savederrno = pthread_rwlock_wrlock(&knet_h->list_rwlock); - if (savederrno) { - log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", - strerror(savederrno)); - errno = savederrno; - return -1; - } - - knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data; - knet_h->sock_notify_fn = sock_notify_fn; - if (knet_h->sock_notify_fn) { - log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled"); - } else { - log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn disabled"); - } - - pthread_rwlock_unlock(&knet_h->list_rwlock); - - return 0; -} - int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd) { int err = 0, savederrno = 0; @@ -1241,7 +1253,7 @@ ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const return -1; } - if ((buff == NULL) || (buff_len == 0)) { + if ((buff == NULL) || (buff_len <= 0)) { errno = EINVAL; return -1; } @@ -1269,7 +1281,7 @@ ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, if ((!knet_h) || (buff == NULL) || - (buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE)) { + (buff_len <= 0) || (buff_len > KNET_MAX_PACKET_SIZE)) { errno = EINVAL; return -1; } diff --git a/libknet/internals.h b/libknet/internals.h index 879c4a19d..c177e9307 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -107,6 +107,8 @@ struct knet_sock { int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ + int has_error; /* set to 1 if there were errors reading from the sock + * and socket has been removed from epoll */ }; struct knet_handle { @@ -190,6 +192,7 @@ struct knet_handle { void *private_data, int datafd, int8_t channel, + uint8_t tx_rx, int error, int errorno); int fini_in_progress; diff --git a/libknet/libknet.h b/libknet/libknet.h index 8bfae1808..5a2c77d78 100644 --- a/libknet/libknet.h +++ b/libknet/libknet.h @@ -44,6 +44,13 @@ #define KNET_MAX_HOST_LEN 64 #define KNET_MAX_PORT_LEN 6 +/* + * Some notifications can be generated either on TX or RX + */ + +#define KNET_NOTIFY_TX 0 +#define KNET_NOTIFY_RX 1 + typedef struct knet_handle *knet_handle_t; /* @@ -95,9 +102,56 @@ knet_handle_t knet_handle_new(uint16_t host_id, int knet_handle_free(knet_handle_t knet_h); +/* + * knet_handle_enable_sock_notify + * + * knet_h - pointer to knet_handle_t + * + * sock_notify_fn_private_data + * void pointer to data that can be used to identify + * the callback. + * + * sock_notify_fn + * A callback function that is invoked every time + * a socket in the datafd pool will report an error (-1) + * or an end of read (0) (see socket.7). + * This function MUST NEVER block or add substantial delays. + * The callback is invoked in an internal unlocked area + * to allow calls to knet_handle_add_datafd/knet_handle_remove_datafd + * to swap/replace the bad fd. + * if both err and errno are 0, it means that the socket + * has received a 0 byte packet (EOF?). + * The callback function must either remove the fd from knet + * (by calling knet_handle_remove_fd()) or dup a new fd in its place. + * Failure to do this can cause problems. + * + * knet_handle_enable_sock_notify returns: + * + * 0 on success + * -1 on error and errno is set. + */ + +int knet_handle_enable_sock_notify(knet_handle_t knet_h, + void *sock_notify_fn_private_data, + void (*sock_notify_fn) ( + void *private_data, + int datafd, + int8_t channel, + uint8_t tx_rx, + int error, + int errorno)); /* sorry! can't call it errno ;) */ + /* * knet_handle_add_datafd * + * IMPORTANT: In order to add datafd to knet, knet_handle_enable_sock_notify + * _MUST_ be set and be able to handle both errors (-1) and + * 0 bytes read / write from the provided datafd. + * On read error (< 0) from datafd, the socket is automatically + * removed from polling to avoid spinning on dead sockets. + * It is safe to call knet_handle_remove_datafd even on sockets + * that have been removed. + * * knet_h - pointer to knet_handle_t * * *datafd - read/write file descriptor. @@ -201,33 +255,6 @@ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd); * -1 on error and errno is set. */ -int knet_handle_enable_sock_notify(knet_handle_t knet_h, - void *sock_notify_fn_private_data, - void (*sock_notify_fn) ( - void *private_data, - int datafd, - int8_t channel, - int error, - int errorno)); /* sorry! can't call it errno ;) */ - -/* - * knet_handle_get_channel - * - * knet_h - pointer to knet_handle_t - * - * datafd - file descriptor to search - * - * *channel - will contain the result - * - * knet_handle_get_channel returns: - * - * 0 on success - * and *channel will contain the results - * - * -1 on error and errno is set. - * and *channel content is meaningless - */ - int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel); /* @@ -328,9 +355,6 @@ ssize_t knet_send(knet_handle_t knet_h, * -1 on error and errno is set. */ -#define KNET_DST_HOST_FILTER_TX 0 -#define KNET_DST_HOST_FILTER_RX 1 - int knet_handle_enable_filter(knet_handle_t knet_h, void *dst_host_filter_fn_private_data, int (*dst_host_filter_fn) ( diff --git a/libknet/ping_test.c b/libknet/ping_test.c index 8048090f8..079fd915c 100644 --- a/libknet/ping_test.c +++ b/libknet/ping_test.c @@ -297,10 +297,13 @@ static void host_notify(void *private_data, uint16_t host_id, uint8_t reachable, return; } -static void sock_notify(void *private_data, int datafd, int8_t chan, int error, int errorno) +static void sock_notify(void *private_data, int datafd, int8_t chan, uint8_t tx_rx, int error, int errorno) { - printf("Received sock notify, datafd: %d channel: %d error: %d errno: %d (%s)\n", - datafd, chan, error, errorno, strerror(errorno)); + printf("Received sock notify, datafd: %d channel: %d direction: %u error: %d errno: %d (%s)\n", + datafd, chan, tx_rx, error, errorno, strerror(errorno)); + + printf("Something went wrong with our sockets!\n"); + exit(EXIT_FAILURE); } static void recv_data(knet_handle_t khandle, int inchannel, int has_crypto) diff --git a/libknet/threads_send_recv.c b/libknet/threads_send_recv.c index fe8ff147e..8ac2c7d42 100644 --- a/libknet/threads_send_recv.c +++ b/libknet/threads_send_recv.c @@ -117,7 +117,7 @@ static void _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inl knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, - KNET_DST_HOST_FILTER_TX, + KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, @@ -331,7 +331,7 @@ static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t chann ssize_t inlen = 0; struct iovec iov_in; int msg_recv, i; - int savederrno, docallback = 0, err = 0; + int savederrno, docallback = 0; if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) { log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get read lock"); @@ -345,22 +345,9 @@ static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t chann inlen = readv(sockfd, &iov_in, 1); - if (inlen < 0) { - err = inlen; + if (inlen <= 0) { savederrno = errno; docallback = 1; - log_debug(knet_h, KNET_SUB_SEND_T, "Unrecoverable error: %s", strerror(errno)); - goto out_unlock; - } - - /* - * readv and later recvmmsg can receive 0 bytes packets. - * Stop processing them and generate a callback. - */ - if (inlen == 0) { - err = 0; - savederrno = 0; - docallback = 1; goto out_unlock; } @@ -370,32 +357,56 @@ static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t chann } else { msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT, NULL); if (msg_recv < 0) { - err = msg_recv; + inlen = msg_recv; savederrno = errno; docallback = 1; - log_err(knet_h, KNET_SUB_SEND_T, "No message received from recvmmsg: %s", strerror(errno)); goto out_unlock; } for (i = 0; i < msg_recv; i++) { - if (msg[i].msg_len == 0) { - err = 0; + inlen = msg[i].msg_len; + if (inlen == 0) { savederrno = 0; docallback = 1; goto out_unlock; + break; } knet_h->recv_from_sock_buf[i]->kh_type = type; - _parse_recv_from_sock(knet_h, i, msg[i].msg_len, channel); + _parse_recv_from_sock(knet_h, i, inlen, channel); } } out_unlock: pthread_rwlock_unlock(&knet_h->list_rwlock); - if ((docallback) && - (knet_h->sock_notify_fn)) { + + if (inlen < 0) { + struct epoll_event ev; + + if (pthread_rwlock_wrlock(&knet_h->list_rwlock) != 0) { + log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get read lock"); + goto callback; + } + + memset(&ev, 0, sizeof(struct epoll_event)); + + if (epoll_ctl(knet_h->send_to_links_epollfd, + EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { + log_err(knet_h, KNET_SUB_SEND_T, "Unable to del datafd %d from linkfd epoll pool: %s", + knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); + } else { + knet_h->sockfd[channel].has_error = 1; + } + + pthread_rwlock_unlock(&knet_h->list_rwlock); + } + +callback: + + if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, - err, + KNET_NOTIFY_TX, + inlen, savederrno); } } @@ -760,7 +771,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, struct sockaddr_storage knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, - KNET_DST_HOST_FILTER_RX, + KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, @@ -795,7 +806,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, struct sockaddr_storage if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_LINK_T, - "recieved packet fo channel %d but there is no local sock connected", + "received packet for channel %d but there is no local sock connected", channel); return; } @@ -804,10 +815,18 @@ static void _parse_recv_from_links(knet_handle_t knet_h, struct sockaddr_storage iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; - if (writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1) == iov_out[0].iov_len) { + outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); + if (outlen <= 0) { + knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, + knet_h->sockfd[channel].sockfd[0], + channel, + KNET_NOTIFY_RX, + outlen, + errno); + return; + } + if (outlen == iov_out[0].iov_len) { _seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0); - } else { - log_debug(knet_h, KNET_SUB_LINK_T, "Packet has not been delivered"); } } else { /* HOSTINFO */ _seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0);