Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZSTD and GZIP/DEFLATE streaming support #16268

Merged
merged 46 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
f6c8747
move compression header to compression.h
ktsaou Oct 24, 2023
339dc18
prototype with zstd compression
ktsaou Oct 24, 2023
33eacec
updated capabilities
ktsaou Oct 24, 2023
ae954cd
no need for resetting compression
ktsaou Oct 24, 2023
a83628d
left-over reset function
ktsaou Oct 24, 2023
00bf15c
use ZSTD_compressStream() instead of ZSTD_compressStream2() for backw…
ktsaou Oct 24, 2023
b60ef0c
remove call to LZ4_decoderRingBufferSize()
ktsaou Oct 24, 2023
94f9a90
debug signature failures
ktsaou Oct 24, 2023
5982933
fix the buffers of lz4
ktsaou Oct 24, 2023
3109847
fix decoding of zstd
ktsaou Oct 24, 2023
ee7f014
detect compression based on initialization; prefer ZSTD over LZ4
ktsaou Oct 24, 2023
f4855c4
allow both lz4 and zstd
ktsaou Oct 24, 2023
37d23e1
initialize zstd streams
ktsaou Oct 24, 2023
ecc64c9
define missing ZSTD_CLEVEL_DEFAULT
ktsaou Oct 24, 2023
4b09e4d
log zero compressed size
ktsaou Oct 24, 2023
c0e25ef
debug log
ktsaou Oct 24, 2023
3bbb899
flush compression buffer
ktsaou Oct 24, 2023
7067bc8
add sender compression statistics
ktsaou Oct 24, 2023
20fdd6d
removed debugging messages
ktsaou Oct 25, 2023
d5db93a
do not fail if zstd is not available
ktsaou Oct 25, 2023
9fddaf7
cleanup and buildinfo
ktsaou Oct 25, 2023
4d67fa7
fix max message size, use zstd level 1, add compressio ratio reporting
ktsaou Oct 25, 2023
81fedce
use compression level 1
ktsaou Oct 25, 2023
5e6bbda
fix ratio title
ktsaou Oct 25, 2023
84d11de
better compression error logs
ktsaou Oct 25, 2023
031ed09
for backwards compatibility use buffers of COMPRESSION_MAX_CHUNK
ktsaou Oct 25, 2023
d75be4b
switch to default compression level
ktsaou Oct 25, 2023
0867586
additional streaming error conditions detection
ktsaou Oct 25, 2023
767c375
do not expose compression stats when compression is not enabled
ktsaou Oct 25, 2023
c9617da
test for the right lz4 functions
ktsaou Oct 25, 2023
f466017
moved lz4 and zstd to their own files
ktsaou Oct 25, 2023
fd1c057
add gzip streaming compression
ktsaou Oct 25, 2023
e4bc89c
gzip error handling
ktsaou Oct 25, 2023
1531454
added unittest for streaming compression
ktsaou Oct 25, 2023
e993608
eliminate a copy of the uncompressed data during zstd compression
ktsaou Oct 25, 2023
96f5669
eliminate not needed zstd allocations
ktsaou Oct 25, 2023
0f20469
cleanup
ktsaou Oct 25, 2023
7ecb709
decode gzip with Z_SYNC_FLUSH
ktsaou Oct 25, 2023
62cfd63
set the decoding gzip algorithm
ktsaou Oct 25, 2023
21bb837
user configuration for compression levels and compression algorithms …
ktsaou Oct 26, 2023
45fe5b4
fix exclusion of not preferred compressions
ktsaou Oct 26, 2023
7039d7b
remove now obsolete compression define, since gzip is always available
ktsaou Oct 26, 2023
6d4ec73
rename compression algorithms order in stream.conf
ktsaou Oct 26, 2023
ed2dfd3
move common checks in compression.c
ktsaou Oct 26, 2023
a1af178
cleanup
ktsaou Oct 26, 2023
cddea85
backwards compatible error checking
ktsaou Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,13 @@ API_PLUGIN_FILES = \
STREAMING_PLUGIN_FILES = \
streaming/rrdpush.c \
streaming/compression.c \
streaming/compression.h \
streaming/compression_gzip.c \
streaming/compression_gzip.h \
streaming/compression_lz4.c \
streaming/compression_lz4.h \
streaming/compression_zstd.c \
streaming/compression_zstd.h \
streaming/sender.c \
streaming/receiver.c \
streaming/replication.h \
Expand Down Expand Up @@ -1143,6 +1150,7 @@ NETDATA_COMMON_LIBS = \
$(OPTIONAL_MQTT_LIBS) \
$(OPTIONAL_UV_LIBS) \
$(OPTIONAL_LZ4_LIBS) \
$(OPTIONAL_ZSTD_LIBS) \
$(OPTIONAL_DATACHANNEL_LIBS) \
libjudy.a \
$(OPTIONAL_SSL_LIBS) \
Expand Down
19 changes: 16 additions & 3 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -555,16 +555,28 @@ OPTIONAL_UV_LIBS="${UV_LIBS}"

AC_CHECK_LIB(
[lz4],
[LZ4_initStream],
[LZ4_createStream],
[LZ4_LIBS_FAST="-llz4"]
)

AC_CHECK_LIB(
[lz4],
[LZ4_compress_default],
[LZ4_compress_fast_continue],
[LZ4_LIBS="-llz4"]
)

# -----------------------------------------------------------------------------
# zstd

AC_CHECK_LIB([zstd], [ZSTD_createCStream, ZSTD_compressStream, ZSTD_decompressStream, ZSTD_createDStream],
[LIBZSTD_FOUND=yes],
[LIBZSTD_FOUND=no])

if test "x$LIBZSTD_FOUND" = "xyes"; then
AC_DEFINE([ENABLE_ZSTD], [1], [libzstd usability])
OPTIONAL_ZSTD_LIBS="-lzstd"
fi

# -----------------------------------------------------------------------------
# zlib

Expand Down Expand Up @@ -702,7 +714,7 @@ if test "${enable_lz4}" != "no"; then
AC_TRY_LINK(
[ #include <lz4.h> ],
[
LZ4_stream_t* stream = LZ4_initStream(NULL, 0);
LZ4_stream_t* stream = LZ4_createStream();
],
[ enable_lz4="yes"],
[ enable_lz4="no" ]
Expand Down Expand Up @@ -1900,6 +1912,7 @@ AC_SUBST([OPTIONAL_MATH_LIBS])
AC_SUBST([OPTIONAL_DATACHANNEL_LIBS])
AC_SUBST([OPTIONAL_UV_LIBS])
AC_SUBST([OPTIONAL_LZ4_LIBS])
AC_SUBST([OPTIONAL_ZSTD_LIBS])
AC_SUBST([OPTIONAL_SSL_LIBS])
AC_SUBST([OPTIONAL_JSONC_LIBS])
AC_SUBST([OPTIONAL_YAML_LIBS])
Expand Down
50 changes: 46 additions & 4 deletions daemon/buildinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ typedef enum __attribute__((packed)) {
BIB_FEATURE_CLOUD,
BIB_FEATURE_HEALTH,
BIB_FEATURE_STREAMING,
BIB_FEATURE_BACKFILLING,
BIB_FEATURE_REPLICATION,
BIB_FEATURE_STREAMING_COMPRESSION,
BIB_FEATURE_CONTEXTS,
Expand All @@ -66,6 +67,7 @@ typedef enum __attribute__((packed)) {
BIB_CONNECTIVITY_NATIVE_HTTPS,
BIB_CONNECTIVITY_TLS_HOST_VERIFY,
BIB_LIB_LZ4,
BIB_LIB_ZSTD,
BIB_LIB_ZLIB,
BIB_LIB_JUDY,
BIB_LIB_DLIB,
Expand Down Expand Up @@ -484,6 +486,14 @@ static struct {
.json = "streaming",
.value = NULL,
},
[BIB_FEATURE_BACKFILLING] = {
.category = BIC_FEATURE,
.type = BIT_BOOLEAN,
.analytics = NULL,
.print = "Back-filling (of higher database tiers)",
.json = "back-filling",
.value = NULL,
},
[BIB_FEATURE_REPLICATION] = {
.category = BIC_FEATURE,
.type = BIT_BOOLEAN,
Expand All @@ -498,7 +508,7 @@ static struct {
.analytics = "Stream Compression",
.print = "Streaming and Replication Compression",
.json = "stream-compression",
.value = "none",
.value = NULL,
},
[BIB_FEATURE_CONTEXTS] = {
.category = BIC_FEATURE,
Expand Down Expand Up @@ -628,6 +638,14 @@ static struct {
.json = "lz4",
.value = NULL,
},
[BIB_LIB_ZSTD] = {
.category = BIC_LIBS,
.type = BIT_BOOLEAN,
.analytics = NULL,
.print = "ZSTD (fast, lossless compression algorithm)",
.json = "zstd",
.value = NULL,
},
[BIB_LIB_ZLIB] = {
.category = BIC_LIBS,
.type = BIT_BOOLEAN,
Expand Down Expand Up @@ -1029,6 +1047,23 @@ static void build_info_set_value(BUILD_INFO_SLOT slot, const char *value) {
BUILD_INFO[slot].value = value;
}

static void build_info_append_value(BUILD_INFO_SLOT slot, const char *value) {
size_t size = BUILD_INFO[slot].value ? strlen(BUILD_INFO[slot].value) + 1 : 0;
size += strlen(value);
char buf[size + 1];

if(BUILD_INFO[slot].value) {
strcpy(buf, BUILD_INFO[slot].value);
strcat(buf, " ");
strcat(buf, value);
}
else
strcpy(buf, value);

freez((void *)BUILD_INFO[slot].value);
BUILD_INFO[slot].value = strdupz(buf);
}

static void build_info_set_value_strdupz(BUILD_INFO_SLOT slot, const char *value) {
if(!value) value = "";
build_info_set_value(slot, strdupz(value));
Expand Down Expand Up @@ -1075,14 +1110,18 @@ __attribute__((constructor)) void initialize_build_info(void) {

build_info_set_status(BIB_FEATURE_HEALTH, true);
build_info_set_status(BIB_FEATURE_STREAMING, true);
build_info_set_status(BIB_FEATURE_BACKFILLING, true);
build_info_set_status(BIB_FEATURE_REPLICATION, true);

#ifdef ENABLE_RRDPUSH_COMPRESSION
build_info_set_status(BIB_FEATURE_STREAMING_COMPRESSION, true);
#ifdef ENABLE_LZ4
build_info_set_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4");

#ifdef ENABLE_ZSTD
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "zstd");
#endif
#ifdef ENABLE_LZ4
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4");
#endif
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "gzip");

build_info_set_status(BIB_FEATURE_CONTEXTS, true);
build_info_set_status(BIB_FEATURE_TIERING, true);
Expand Down Expand Up @@ -1117,6 +1156,9 @@ __attribute__((constructor)) void initialize_build_info(void) {
#ifdef ENABLE_LZ4
build_info_set_status(BIB_LIB_LZ4, true);
#endif
#ifdef ENABLE_ZSTD
build_info_set_status(BIB_LIB_ZSTD, true);
#endif

build_info_set_status(BIB_LIB_ZLIB, true);

Expand Down
7 changes: 7 additions & 0 deletions daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ int julytest(void);
int pluginsd_parser_unittest(void);
void replication_initialize(void);
void bearer_tokens_init(void);
int unittest_rrdpush_compressions(void);

int main(int argc, char **argv) {
// initialize the system clocks
Expand Down Expand Up @@ -1550,6 +1551,10 @@ int main(int argc, char **argv) {
unittest_running = true;
return pluginsd_parser_unittest();
}
else if(strcmp(optarg, "rrdpush_compressions_test") == 0) {
unittest_running = true;
return unittest_rrdpush_compressions();
}
else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) {
optarg += strlen(createdataset_string);
unsigned history_seconds = strtoul(optarg, NULL, 0);
Expand Down Expand Up @@ -1901,6 +1906,8 @@ int main(int argc, char **argv) {
netdata_log_info("Netdata agent version \""VERSION"\" is starting");

ieee754_doubles = is_system_ieee754_double();
if(!ieee754_doubles)
globally_disabled_capabilities |= STREAM_CAP_IEEE754;

aral_judy_init();

Expand Down
15 changes: 4 additions & 11 deletions database/rrdhost.c
Original file line number Diff line number Diff line change
Expand Up @@ -1145,13 +1145,10 @@ static void rrdhost_streaming_sender_structures_init(RRDHOST *host)
host->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
host->sender->rrdpush_sender_socket = -1;
host->sender->disabled_capabilities = STREAM_CAP_NONE;

#ifdef ENABLE_RRDPUSH_COMPRESSION
if(default_rrdpush_compression_enabled)
host->sender->flags |= SENDER_FLAG_COMPRESSION;
else
host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
if(!default_rrdpush_compression_enabled)
host->sender->disabled_capabilities |= STREAM_CAP_COMPRESSIONS_AVAILABLE;

spinlock_init(&host->sender->spinlock);
replication_init_sender(host->sender);
Expand All @@ -1167,9 +1164,7 @@ static void rrdhost_streaming_sender_structures_free(RRDHOST *host)
rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); // stop a possibly running thread
cbuffer_free(host->sender->buffer);

#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_compressor_destroy(&host->sender->compressor);
#endif

replication_cleanup_sender(host->sender);

Expand Down Expand Up @@ -1885,9 +1880,7 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
else
s->stream.status = RRDHOST_STREAM_STATUS_ONLINE;

#ifdef ENABLE_RRDPUSH_COMPRESSION
s->stream.compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor.initialized);
#endif
s->stream.compression = host->sender->compressor.initialized;
}
else {
s->stream.status = RRDHOST_STREAM_STATUS_OFFLINE;
Expand Down
11 changes: 4 additions & 7 deletions libnetdata/libnetdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ extern "C" {
#include <config.h>
#endif

#ifdef ENABLE_LZ4
#define ENABLE_RRDPUSH_COMPRESSION 1
#endif

#ifdef ENABLE_OPENSSL
#define ENABLE_HTTPS 1
#endif
Expand Down Expand Up @@ -681,9 +677,10 @@ static inline BITMAPX *bitmapX_create(uint32_t bits) {
#define bitmap1024_get_bit(ptr, idx) bitmapX_get_bit((BITMAPX *)ptr, idx)
#define bitmap1024_set_bit(ptr, idx, value) bitmapX_set_bit((BITMAPX *)ptr, idx, value)


#define COMPRESSION_MAX_MSG_SIZE 0x4000
#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 1024)
#define COMPRESSION_MAX_CHUNK 0x4000
#define COMPRESSION_MAX_OVERHEAD 128
#define COMPRESSION_MAX_MSG_SIZE (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD - 1)
#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 768)
int pluginsd_isspace(char c);
int config_isspace(char c);
int group_by_label_isspace(char c);
Expand Down