From 2052faab4376107fe99e8a3f955bbcbfa426f648 Mon Sep 17 00:00:00 2001 From: "Fabio M. Di Nitto" Date: Sun, 6 Aug 2017 06:47:40 +0200 Subject: [PATCH] [compress] add userdata compression support - implement generic plugin for compression - add zlib compression support Signed-off-by: Fabio M. Di Nitto --- configure.ac | 5 + libknet/Makefile.am | 9 +- libknet/compress.c | 123 ++++++++++++++++++ libknet/compress.h | 49 ++++++++ libknet/compress_zlib.c | 128 +++++++++++++++++++ libknet/compress_zlib.h | 32 +++++ libknet/handle.c | 57 +++++++++ libknet/internals.h | 9 ++ libknet/libknet.h | 48 ++++++- libknet/logging.c | 2 + libknet/onwire.h | 3 +- libknet/tests/api-check.mk | 4 + libknet/tests/api_knet_handle_compress.c | 151 +++++++++++++++++++++++ libknet/tests/knet_bench.c | 38 +++++- libknet/threads_rx.c | 24 ++++ libknet/threads_tx.c | 28 +++++ 16 files changed, 700 insertions(+), 10 deletions(-) create mode 100644 libknet/compress.c create mode 100644 libknet/compress.h create mode 100644 libknet/compress_zlib.c create mode 100644 libknet/compress_zlib.h create mode 100644 libknet/tests/api_knet_handle_compress.c diff --git a/configure.ac b/configure.ac index 732d082a4..f96161f4b 100644 --- a/configure.ac +++ b/configure.ac @@ -134,8 +134,13 @@ AC_CHECK_LIB([pthread], [pthread_create]) AC_CHECK_LIB([m], [ceil]) AC_CHECK_LIB([rt], [clock_gettime]) +# crypto libraries checks PKG_CHECK_MODULES([nss],[nss]) +# compress libraries checks +# zlib is cheap because it's pulled in by nss +PKG_CHECK_MODULES([zlib], [zlib]) + # Checks for header files. AC_CHECK_HEADERS([fcntl.h]) AC_CHECK_HEADERS([stdlib.h]) diff --git a/libknet/Makefile.am b/libknet/Makefile.am index 29719ec75..c8067f083 100644 --- a/libknet/Makefile.am +++ b/libknet/Makefile.am @@ -25,6 +25,8 @@ LIBS = sources = \ common.c \ compat.c \ + compress.c \ + compress_zlib.c \ crypto.c \ handle.c \ host.c \ @@ -52,6 +54,8 @@ pkgconfig_DATA = libknet.pc noinst_HEADERS = \ common.h \ compat.h \ + compress.h \ + compress_zlib.h \ crypto.h \ host.h \ internals.h \ @@ -72,7 +76,7 @@ lib_LTLIBRARIES = libknet.la libknet_la_SOURCES = $(sources) -libknet_la_CFLAGS = $(nss_CFLAGS) +libknet_la_CFLAGS = $(nss_CFLAGS) $(zlib_CFLAGS) EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE) @@ -80,4 +84,5 @@ libknet_la_LDFLAGS = -Wl,--version-script=$(srcdir)/$(SYMFILE) \ --export-dynamic \ -version-number $(libversion) -libknet_la_LIBADD = $(nss_LIBS) -lrt -lpthread -lm +libknet_la_LIBADD = $(nss_LIBS) $(zlib_LIBS) \ + -lrt -lpthread -lm diff --git a/libknet/compress.c b/libknet/compress.c new file mode 100644 index 000000000..a12f9392a --- /dev/null +++ b/libknet/compress.c @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. + * + * Author: Fabio M. Di Nitto + * + * This software licensed under GPL-2.0+, LGPL-2.0+ + */ + +#include "config.h" + +#include +#include +#include + +#include "internals.h" +#include "compress.h" +#include "logging.h" +#include "compress_zlib.h" + +/* + * internal module switch data + */ + +/* + * DO NOT CHANGE ORDER HERE OR ONWIRE COMPATIBILITY + * WILL BREAK! + * + * add after zlib and before NULL/NULL/NULL. + */ + +compress_model_t compress_modules_cmds[] = { + { "none", NULL, NULL, NULL }, + { "zlib", zlib_val_level, zlib_compress, zlib_decompress }, + { NULL, NULL, NULL, NULL }, +}; + +static int get_model(const char *model) +{ + int idx = 0; + + while (compress_modules_cmds[idx].model_name != NULL) { + if (!strcmp(compress_modules_cmds[idx].model_name, model)) + return idx; + idx++; + } + return -1; +} + +static int get_max_model(void) +{ + int idx = 0; + while (compress_modules_cmds[idx].model_name != NULL) { + idx++; + } + return idx - 1; +} + +static int val_level( + knet_handle_t knet_h, + int compress_model, + int compress_level) +{ + return compress_modules_cmds[compress_model].val_level(knet_h, compress_level); +} + +int compress_init( + knet_handle_t knet_h, + struct knet_handle_compress_cfg *knet_handle_compress_cfg) +{ + int cmp_model; + + knet_h->compress_max_model = get_max_model(); + if (!knet_handle_compress_cfg) { + return 0; + } + + log_debug(knet_h, KNET_SUB_COMPRESS, + "Initizializing compress module [%s/%d]", + knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level); + + cmp_model = get_model(knet_handle_compress_cfg->compress_model); + if (cmp_model < 0) { + log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model); + errno = EINVAL; + return -1; + } + + if (cmp_model > 0) { + if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) { + log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s", + knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model); + errno = EINVAL; + return -1; + } + + } + + knet_h->compress_model = cmp_model; + knet_h->compress_level = knet_handle_compress_cfg->compress_level; + + return 0; +} + +int compress( + knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len) +{ + return compress_modules_cmds[knet_h->compress_model].compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len); +} + +int decompress( + knet_handle_t knet_h, + int compress_model, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len) +{ + return compress_modules_cmds[compress_model].decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len); +} diff --git a/libknet/compress.h b/libknet/compress.h new file mode 100644 index 000000000..cb835396b --- /dev/null +++ b/libknet/compress.h @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. + * + * Author: Fabio M. Di Nitto + * + * This software licensed under GPL-2.0+, LGPL-2.0+ + */ + +#ifndef __KNET_COMPRESS_H__ +#define __KNET_COMPRESS_H__ + +#include "internals.h" + +typedef struct { + const char *model_name; + int (*val_level)(knet_handle_t knet_h, + int compress_level); + int (*compress) (knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len); + int (*decompress)(knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len); +} compress_model_t; + +int compress_init( + knet_handle_t knet_h, + struct knet_handle_compress_cfg *knet_handle_compress_cfg); + +int compress( + knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len); + +int decompress( + knet_handle_t knet_h, + int compress_model, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len); + +#endif diff --git a/libknet/compress_zlib.c b/libknet/compress_zlib.c new file mode 100644 index 000000000..0256e081b --- /dev/null +++ b/libknet/compress_zlib.c @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. + * + * Author: Fabio M. Di Nitto + * + * This software licensed under GPL-2.0+, LGPL-2.0+ + */ + +#include "config.h" + +#include +#include +#include +#include + +#include "internals.h" +#include "compress_zlib.h" +#include "logging.h" + +int zlib_val_level( + knet_handle_t knet_h, + int compress_level) +{ + if (compress_level < 0) { + log_err(knet_h, KNET_SUB_ZLIBCOMP, "zlib does not support negative compression level %d", + compress_level); + return -1; + } + if (compress_level > 9) { + log_err(knet_h, KNET_SUB_ZLIBCOMP, "zlib does not support compression level higher than 9"); + return -1; + } + if (compress_level == 0) { + log_warn(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress level 0 does NOT perform any compression"); + } + return 0; +} + +int zlib_compress( + knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len) +{ + int zerr = 0, err = 0; + int savederrno = 0; + uLongf destLen = *buf_out_len; + + zerr = compress2(buf_out, &destLen, + buf_in, buf_in_len, + knet_h->compress_level); + + *buf_out_len = destLen; + + switch(zerr) { + case Z_OK: + err = 0; + savederrno = 0; + break; + case Z_MEM_ERROR: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress mem error"); + err = -1; + savederrno = ENOMEM; + break; + case Z_BUF_ERROR: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress buf error"); + err = -1; + savederrno = ENOBUFS; + break; + case Z_STREAM_ERROR: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress stream error"); + err = -1; + savederrno = EINVAL; + break; + default: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress unknown error"); + break; + } + + errno = savederrno; + return err; +} + +int zlib_decompress( + knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len) +{ + int zerr = 0, err = 0; + int savederrno = 0; + uLongf destLen = *buf_out_len; + + zerr = uncompress(buf_out, &destLen, + buf_in, buf_in_len); + + *buf_out_len = destLen; + + switch(zerr) { + case Z_OK: + err = 0; + savederrno = 0; + break; + case Z_MEM_ERROR: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib decompress mem error"); + err = -1; + savederrno = ENOMEM; + break; + case Z_BUF_ERROR: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib decompress buf error"); + err = -1; + savederrno = ENOBUFS; + break; + case Z_DATA_ERROR: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib decompress data error"); + err = -1; + savederrno = EINVAL; + break; + default: + log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib unknown error"); + break; + } + + errno = savederrno; + return err; +} diff --git a/libknet/compress_zlib.h b/libknet/compress_zlib.h new file mode 100644 index 000000000..4bd210175 --- /dev/null +++ b/libknet/compress_zlib.h @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. + * + * Author: Fabio M. Di Nitto + * + * This software licensed under GPL-2.0+, LGPL-2.0+ + */ + +#ifndef __KNET_COMPRESS_ZLIB_H__ +#define __KNET_COMPRESS_ZLIB H__ + +#include "internals.h" + +int zlib_val_level( + knet_handle_t knet_h, + int compress_level); + +int zlib_compress( + knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len); + +int zlib_decompress( + knet_handle_t knet_h, + const unsigned char *buf_in, + const ssize_t buf_in_len, + unsigned char *buf_out, + ssize_t *buf_out_len); + +#endif diff --git a/libknet/handle.c b/libknet/handle.c index e538b8357..4a84a77c9 100644 --- a/libknet/handle.c +++ b/libknet/handle.c @@ -21,6 +21,7 @@ #include "internals.h" #include "crypto.h" +#include "compress.h" #include "compat.h" #include "common.h" #include "threads_common.h" @@ -235,6 +236,24 @@ static int _init_buffers(knet_handle_t knet_h) } memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT); + knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS); + if (!knet_h->recv_from_links_buf_decompress) { + savederrno = errno; + log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s", + strerror(savederrno)); + goto exit_fail; + } + memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS); + + knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS); + if (!knet_h->send_to_links_buf_compress) { + savederrno = errno; + log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s", + strerror(savederrno)); + goto exit_fail; + } + memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS); + memset(knet_h->knet_transport_fd_tracker, KNET_MAX_TRANSPORTS, sizeof(knet_h->knet_transport_fd_tracker)); return 0; @@ -257,6 +276,8 @@ static void _destroy_buffers(knet_handle_t knet_h) free(knet_h->recv_from_links_buf[i]); } + free(knet_h->recv_from_links_buf_decompress); + free(knet_h->send_to_links_buf_compress); free(knet_h->recv_from_sock_buf); free(knet_h->recv_from_links_buf_decrypt); free(knet_h->recv_from_links_buf_crypt); @@ -606,6 +627,11 @@ knet_handle_t knet_handle_new(knet_node_id_t host_id, goto exit_fail; } + if (compress_init(knet_h, NULL)) { + savederrno = EINVAL; + goto exit_fail; + } + /* * create epoll fds */ @@ -1300,6 +1326,37 @@ int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet return err; } +int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg) +{ + int savederrno = 0; + int err = 0; + + if (!knet_h) { + errno = EINVAL; + return -1; + } + + if (!knet_handle_compress_cfg) { + errno = EINVAL; + return -1; + } + + savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); + if (savederrno) { + log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s", + strerror(savederrno)); + errno = savederrno; + return -1; + } + + err = compress_init(knet_h, knet_handle_compress_cfg); + savederrno = errno; + + pthread_rwlock_unlock(&knet_h->global_rwlock); + errno = savederrno; + return err; +} + ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; diff --git a/libknet/internals.h b/libknet/internals.h index 8f330dde5..129c39a3c 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -19,9 +19,13 @@ #include "compat.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE + #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD +#define KNET_DATABUFSIZE_COMPRESS_PAD 1024 +#define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD + #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX @@ -178,6 +182,11 @@ struct knet_handle { unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; + int compress_model; + int compress_max_model; + int compress_level; + unsigned char *recv_from_links_buf_decompress; + unsigned char *send_to_links_buf_compress; seq_num_t tx_seq_num; pthread_mutex_t tx_seq_num_mutex; uint8_t has_loop_link; diff --git a/libknet/libknet.h b/libknet/libknet.h index 9b5a8973b..a5842a105 100644 --- a/libknet/libknet.h +++ b/libknet/libknet.h @@ -593,6 +593,48 @@ struct knet_handle_crypto_cfg { int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg); +/* + * knet_handle_compress + * + * knet_h - pointer to knet_handle_t + * + * knet_handle_compress_cfg - + * pointer to a knet_handle_compress_cfg structure + * + * compress_model should contain the mode name. + * Currently only "zlib" is supported. + * Setting to "none" will disable compress. + * + * compress_level many compression libraries adopted + * the standard values ranging from 0 for + * fast compression to 9 for high compression. + * Please refere to the library man pages + * on how to be set this value, as it is passed + * unmodified to the compression algorithm. + * + * Implementation notes: + * - it is possible to enable/disable compression at any time. + * - nodes can be using different compression algorithm at any time. + * - knet does NOT implement compression algorithm directly. it relies + * on external libraries for this functionality. Please read + * the libraries man pages to figure out which algorithm/compression + * level is best for the data you are planning to transmit. + * + * knet_handle_compress returns: + * + * 0 on success + * -1 on error and errno is set. EINVAL means that either the model or the + * level are not supported. + */ + +struct knet_handle_compress_cfg { + char compress_model[16]; + int compress_level; +}; + +int knet_handle_compress(knet_handle_t knet_h, + struct knet_handle_compress_cfg *knet_handle_compress_cfg); + /* * host structs/API calls */ @@ -864,8 +906,6 @@ int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id, * host_id 1 link_id 1 _must_ connect IP4 to IP3. * We might be able to lift this restriction in future, by using * other data to determine src/dst link_id, but for now, deal with it. - * - * - */ /* @@ -906,7 +946,6 @@ int knet_strtoaddr(const char *host, const char *port, * (recommended size: KNET_MAX_PORT_LEN) * * knet_strtoaddr returns same error codes as getnameinfo - * */ int knet_addrtostr(const struct sockaddr_storage *ss, socklen_t sslen, @@ -1460,6 +1499,7 @@ int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t l #define KNET_SUB_LINK 4 /* link add/del/modify */ #define KNET_SUB_TRANSPORT 5 /* Transport common */ #define KNET_SUB_CRYPTO 6 /* crypto.c config generic layer */ +#define KNET_SUB_COMPRESS 7 /* compress.c config generic layer */ #define KNET_SUB_FILTER 19 /* allocated for users to log from dst_filter */ @@ -1476,6 +1516,8 @@ int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t l #define KNET_SUB_NSSCRYPTO 60 /* nsscrypto.c */ +#define KNET_SUB_ZLIBCOMP 70 /* zlibcompress.c */ + #define KNET_SUB_UNKNOWN 254 #define KNET_MAX_SUBSYSTEMS KNET_SUB_UNKNOWN + 1 diff --git a/libknet/logging.c b/libknet/logging.c index 1fb6d3461..7d08926fc 100644 --- a/libknet/logging.c +++ b/libknet/logging.c @@ -33,6 +33,7 @@ static struct pretty_names subsystem_names[] = { "link", KNET_SUB_LINK }, { "transport", KNET_SUB_TRANSPORT }, { "crypto", KNET_SUB_CRYPTO }, + { "compress", KNET_SUB_COMPRESS }, { "filter", KNET_SUB_FILTER }, { "dstcache", KNET_SUB_DSTCACHE }, { "heartbeat", KNET_SUB_HEARTBEAT }, @@ -43,6 +44,7 @@ static struct pretty_names subsystem_names[] = { "udp", KNET_SUB_TRANSP_UDP }, { "sctp", KNET_SUB_TRANSP_SCTP }, { "nsscrypto", KNET_SUB_NSSCRYPTO }, + { "zlibcomp", KNET_SUB_ZLIBCOMP }, { "unknown", KNET_SUB_UNKNOWN } /* unknown MUST always be last in this array */ }; diff --git a/libknet/onwire.h b/libknet/onwire.h index 5d51701c3..50de9af9b 100644 --- a/libknet/onwire.h +++ b/libknet/onwire.h @@ -100,8 +100,8 @@ typedef uint16_t seq_num_t; struct knet_header_payload_data { seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pkcts */ + uint8_t khp_data_compress; /* identify if user data are compressed */ uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */ - uint8_t khp_data_pad2; uint8_t khp_data_bcast; /* data destination bcast/ucast */ uint8_t khp_data_frag_num; /* number of fragments of this pckt. 1 is not fragmented */ uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */ @@ -178,6 +178,7 @@ struct knet_header { #define khp_data_userdata kh_payload.khp_data.khp_data_userdata #define khp_data_bcast kh_payload.khp_data.khp_data_bcast #define khp_data_channel kh_payload.khp_data.khp_data_channel +#define khp_data_compress kh_payload.khp_data.khp_data_compress #define khp_ping_link kh_payload.khp_ping.khp_ping_link #define khp_ping_time kh_payload.khp_ping.khp_ping_time diff --git a/libknet/tests/api-check.mk b/libknet/tests/api-check.mk index 72ff8a2eb..f1cc8ff53 100644 --- a/libknet/tests/api-check.mk +++ b/libknet/tests/api-check.mk @@ -9,6 +9,7 @@ api_checks = \ api_knet_handle_new_test \ api_knet_handle_free_test \ + api_knet_handle_compress_test \ api_knet_handle_crypto_test \ api_knet_handle_setfwd_test \ api_knet_handle_enable_filter_test \ @@ -68,6 +69,9 @@ api_knet_handle_new_test_SOURCES = api_knet_handle_new.c \ api_knet_handle_free_test_SOURCES = api_knet_handle_free.c \ test-common.c +api_knet_handle_compress_test_SOURCES = api_knet_handle_compress.c \ + test-common.c + api_knet_handle_crypto_test_SOURCES = api_knet_handle_crypto.c \ test-common.c diff --git a/libknet/tests/api_knet_handle_compress.c b/libknet/tests/api_knet_handle_compress.c new file mode 100644 index 000000000..0a6e90f72 --- /dev/null +++ b/libknet/tests/api_knet_handle_compress.c @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2016 Red Hat, Inc. All rights reserved. + * + * Authors: Fabio M. Di Nitto + * + * This software licensed under GPL-2.0+, LGPL-2.0+ + */ + +#include "config.h" + +#include +#include +#include +#include +#include + +#include "libknet.h" + +#include "internals.h" +#include "test-common.h" + +static void test(void) +{ + knet_handle_t knet_h; + int logfds[2]; + struct knet_handle_compress_cfg knet_handle_compress_cfg; + + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + + printf("Test knet_handle_compress incorrect knet_h\n"); + + if ((!knet_handle_compress(NULL, &knet_handle_compress_cfg)) || (errno != EINVAL)) { + printf("knet_handle_compress accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); + exit(FAIL); + } + + setup_logpipes(logfds); + + knet_h = knet_handle_new(1, logfds[1], KNET_LOG_DEBUG); + + if (!knet_h) { + printf("knet_handle_new failed: %s\n", strerror(errno)); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + printf("Test knet_handle_compress with invalid cfg\n"); + + if ((!knet_handle_compress(knet_h, NULL)) || (errno != EINVAL)) { + printf("knet_handle_compress accepted invalid cfg or returned incorrect error: %s\n", strerror(errno)); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + printf("Test knet_handle_compress with un-initialized cfg\n"); + + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + + if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) { + printf("knet_handle_compress accepted invalid un-initialized cfg\n"); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + printf("Test knet_handle_compress with none compress model (disable compress)\n"); + + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + strncpy(knet_handle_compress_cfg.compress_model, "none", sizeof(knet_handle_compress_cfg.compress_model) - 1); + + if (knet_handle_compress(knet_h, &knet_handle_compress_cfg) != 0) { + printf("knet_handle_compress did not accept none compress mode cfg\n"); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + printf("Test knet_handle_compress with zlib compress and negative level\n"); + + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1); + knet_handle_compress_cfg.compress_level = -1; + + if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) { + printf("knet_handle_compress accepted invalid (-1) compress level for zlib\n"); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + printf("Test knet_handle_compress with zlib compress and excessive compress level\n"); + + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1); + knet_handle_compress_cfg.compress_level = 10; + + if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) { + printf("knet_handle_compress accepted invalid (10) compress level for zlib\n"); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + printf("Test knet_handle_compress with zlib compress model normal compress level)\n"); + + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1); + knet_handle_compress_cfg.compress_level = 1; + + if (knet_handle_compress(knet_h, &knet_handle_compress_cfg) != 0) { + printf("knet_handle_compress did not accept zlib compress mode with compress level 1 cfg\n"); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + + flush_logs(logfds[0], stdout); + + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); +} + +int main(int argc, char *argv[]) +{ + need_root(); + + test(); + + return PASS; +} diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index e6ca21c1e..d2406914a 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -42,6 +42,7 @@ static int broadcast_test = 1; static pthread_t rx_thread = (pthread_t)NULL; static char *rx_buf[PCKT_FRAG_MAX]; static int wait_for_perf_rx = 0; +static char *compresscfg = NULL; static int bench_shutdown_in_progress = 0; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -75,6 +76,8 @@ static void print_help(void) printf(" -d enable debug logs (default INFO)\n"); printf(" -c [implementation]:[crypto]:[hashing] crypto configuration. (default disabled)\n"); printf(" Example: -c nss:aes128:sha1\n"); + printf(" -z [implementation]:[level] compress configuration. (default disabled)\n"); + printf(" Example: -z zlib:5\n"); printf(" -p [active|passive|rr] (default: passive)\n"); printf(" -P [udp|sctp] (default: udp) protocol (transport) to use\n"); printf(" -t [nodeid] This nodeid (required)\n"); @@ -213,10 +216,11 @@ static void setup_knet(int argc, char *argv[]) int wait = 1; struct knet_handle_crypto_cfg knet_handle_crypto_cfg; char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL; + struct knet_handle_compress_cfg knet_handle_compress_cfg; memset(nodes, 0, sizeof(nodes)); - while ((rv = getopt(argc, argv, "CT:S:s:ldowb:t:n:c:p:P:h")) != EOF) { + while ((rv = getopt(argc, argv, "CT:S:s:ldowb:t:n:c:p:P:z:h")) != EOF) { switch(rv) { case 'h': print_help(); @@ -353,6 +357,13 @@ static void setup_knet(int argc, char *argv[]) case 'C': continous = 1; break; + case 'z': + if (compresscfg) { + printf("Error: -c can only be specified once\n"); + exit(FAIL); + } + compresscfg = optarg; + break; default: break; } @@ -421,6 +432,16 @@ static void setup_knet(int argc, char *argv[]) } } + if (compresscfg) { + memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg)); + snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":")); + knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":")); + if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) { + printf("Unable to configure compress\n"); + exit(FAIL); + } + } + if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) { printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno)); knet_handle_free(knet_h); @@ -707,8 +728,17 @@ static void stop_rx_thread(void) static void send_ping_data(void) { - const char *buf = "Hello world!\x0"; - ssize_t len = strlen(buf); + char buf[65535]; + ssize_t len; + + memset(&buf, 0, sizeof(buf)); + snprintf(buf, sizeof(buf), "Hello world!"); + + if (compresscfg) { + len = sizeof(buf); + } else { + len = strlen(buf); + } if (knet_send(knet_h, buf, len, channel) != len) { printf("Error sending hello world: %s\n", strerror(errno)); @@ -850,7 +880,7 @@ static void send_perf_data_by_time(void) char ctrl_message[16]; int sent_msgs; int i; - uint32_t packetsize = 64; + uint32_t packetsize = 65536; struct timespec clock_start, clock_end; unsigned long long time_diff = 0; diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 0a3ea700f..353002451 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -16,6 +16,7 @@ #include #include "compat.h" +#include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" @@ -355,6 +356,29 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc len = len + KNET_HEADER_DATA_SIZE; } + if (inbuf->khp_data_compress) { + ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; + + if (inbuf->khp_data_compress > knet_h->compress_max_model) { + log_err(knet_h, KNET_SUB_COMPRESS, "Received packet with unsupported compression method. Dropping"); + return; + } + + err = decompress(knet_h, inbuf->khp_data_compress, + (const unsigned char *)inbuf->khp_data_userdata, + len - KNET_HEADER_DATA_SIZE, + knet_h->recv_from_links_buf_decompress, + &decmp_outlen); + if (!err) { + memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); + len = decmp_outlen + KNET_HEADER_DATA_SIZE; + } else { + log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", + err, strerror(errno)); + return; + } + } + if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (knet_h->enabled != 1) /* data forward is disabled */ break; diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 2913727b9..41724d4be 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -17,6 +17,7 @@ #include #include "compat.h" +#include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" @@ -148,6 +149,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, ssize_t inlen, int8_t cha int msgs_to_send, msg_idx; unsigned int i; int send_local = 0; + int data_compressed = 0; inbuf = knet_h->recv_from_sock_buf; @@ -320,6 +322,26 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, ssize_t inlen, int8_t cha temp_data_mtu = knet_h->data_mtu; } + /* + * compress data + */ + if (knet_h->compress_model > 0) { + ssize_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; + + err = compress(knet_h, + (const unsigned char *)inbuf->khp_data_userdata, inlen, + knet_h->send_to_links_buf_compress, &cmp_outlen); + if (err < 0) { + log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(errno)); + } else { + if (cmp_outlen < inlen) { + memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); + inlen = cmp_outlen; + data_compressed = 1; + } + } + } + /* * prepare the outgoing buffers */ @@ -330,6 +352,11 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, ssize_t inlen, int8_t cha inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; + if (data_compressed) { + inbuf->khp_data_compress = knet_h->compress_model; + } else { + inbuf->khp_data_compress = 0; + } if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); @@ -392,6 +419,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, ssize_t inlen, int8_t cha knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; + knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress; frag_len = frag_len - temp_data_mtu; frag_idx++;