Prometheus remote write backend (#6062)
* Add Prometheus remote write backend prototype

* Fix autotools issues

* Send HTTP POST request

* Add parameters to HTTP header

* Discard HTTP response 200

* Update CMake build configuration

* Fix Codacy issue

* Check for C++ binary

* Fix compilation without remote write backend

* Add options to the installer script

* Fix configure script warning

* Fix make dist

* Downgrade to ByteSize for better compatibility

* Integrate remote write more tightly into the existing backends code

* Cleanup

* Fix build error

* Parse host tags

* Fix Codacy issue

* Fix counters for buffered data

* Rename preprocessor symbol

* Better error handling

* Cleanup

* Update the documentation
vlvkobal authored and cakrit committed Jun 7, 2019
1 parent 56c502b commit 77781d0
Showing 20 changed files with 732 additions and 34 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -59,6 +59,10 @@ xenstat.plugin
cgroup-network
!cgroup-network/

# protoc generated files
*.pb.cc
*.pb.h

# installation artifacts
packaging/installer/.environment.sh
*.tar.*
51 changes: 50 additions & 1 deletion CMakeLists.txt
@@ -241,6 +241,27 @@ find_library(HAVE_KINESIS aws-cpp-sdk-kinesis)
# later we use:
# ${HAVE_KINESIS}


# -----------------------------------------------------------------------------
# Detect libprotobuf

pkg_check_modules(PROTOBUF protobuf)
# later we use:
# ${PROTOBUF_LIBRARIES}
# ${PROTOBUF_CFLAGS_OTHER}
# ${PROTOBUF_INCLUDE_DIRS}


# -----------------------------------------------------------------------------
# Detect libsnappy

pkg_check_modules(SNAPPY snappy)
# later we use:
# ${SNAPPY_LIBRARIES}
# ${SNAPPY_CFLAGS_OTHER}
# ${SNAPPY_INCLUDE_DIRS}


# -----------------------------------------------------------------------------
# netdata files

@@ -547,6 +568,11 @@ set(KINESIS_BACKEND_FILES
backends/aws_kinesis/aws_kinesis_put_record.h
)

set(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES
backends/prometheus/remote_write/remote_write.cc
backends/prometheus/remote_write/remote_write.h
)

set(DAEMON_FILES
daemon/common.c
daemon/common.h
@@ -611,6 +637,29 @@ ELSE()
message(STATUS "kinesis backend: disabled (requires AWS SDK for C++)")
ENDIF()

# -----------------------------------------------------------------------------
# prometheus remote write backend

IF(PROTOBUF_LIBRARIES AND SNAPPY_LIBRARIES)
SET(ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE True)
ELSE()
SET(ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE False)
ENDIF()

IF(ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE)
message(STATUS "prometheus remote write backend: enabled")

find_package(Protobuf REQUIRED)
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS backends/prometheus/remote_write/remote_write.proto)

list(APPEND NETDATA_FILES ${PROMETHEUS_REMOTE_WRITE_BACKEND_FILES} ${PROTO_SRCS} ${PROTO_HDRS})
list(APPEND NETDATA_COMMON_LIBRARIES ${PROTOBUF_LIBRARIES} ${SNAPPY_LIBRARIES})
list(APPEND NETDATA_COMMON_INCLUDE_DIRS ${PROTOBUF_INCLUDE_DIRS} ${SNAPPY_INCLUDE_DIRS} ${CMAKE_CURRENT_BINARY_DIR})
list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER} ${SNAPPY_CFLAGS_OTHER})
ELSE()
message(STATUS "prometheus remote write backend: disabled (requires protobuf and snappy libraries)")
ENDIF()

# -----------------------------------------------------------------------------
# netdata

@@ -648,7 +697,7 @@ ELSEIF(MACOS)

ENDIF()

IF(ENABLE_BACKEND_KINESIS)
IF(ENABLE_BACKEND_KINESIS OR ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE)
set_property(TARGET netdata PROPERTY CXX_STANDARD 11)
set_property(TARGET netdata PROPERTY CMAKE_CXX_STANDARD_REQUIRED ON)
ENDIF()
24 changes: 22 additions & 2 deletions Makefile.am
@@ -441,6 +441,12 @@ KINESIS_BACKEND_FILES = \
backends/aws_kinesis/aws_kinesis_put_record.h \
$(NULL)

PROMETHEUS_REMOTE_WRITE_BACKEND_FILES = \
backends/prometheus/remote_write/remote_write.cc \
backends/prometheus/remote_write/remote_write.h \
backends/prometheus/remote_write/remote_write.proto \
$(NULL)

DAEMON_FILES = \
daemon/common.c \
daemon/common.h \
@@ -505,14 +511,13 @@ NETDATA_COMMON_LIBS = \
$(OPTIONAL_JUDY_LIBS) \
$(OPTIONAL_SSL_LIBS) \
$(NULL)
# TODO: Find more graceful way to add libs for AWS Kinesis

sbin_PROGRAMS += netdata
netdata_SOURCES = $(NETDATA_FILES)
netdata_LDADD = \
$(NETDATA_COMMON_LIBS) \
$(NULL)
if ENABLE_BACKEND_KINESIS
if ENABLE_CXX_LINKER
netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@
else
netdata_LINK = $(CCLD) $(CFLAGS) $(LDFLAGS) -o $@
@@ -575,3 +580,18 @@ if ENABLE_BACKEND_KINESIS
netdata_SOURCES += $(KINESIS_BACKEND_FILES)
netdata_LDADD += $(OPTIONAL_KINESIS_LIBS)
endif

if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE
netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES)
netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS)
BUILT_SOURCES = \
backends/prometheus/remote_write/remote_write.pb.cc \
backends/prometheus/remote_write/remote_write.pb.h \
$(NULL)
nodist_netdata_SOURCES = $(BUILT_SOURCES)

backends/prometheus/remote_write/remote_write.pb.cc \
backends/prometheus/remote_write/remote_write.pb.h: backends/prometheus/remote_write/remote_write.proto
$(PROTOC) --proto_path=$(srcdir) --cpp_out=$(builddir) $^

endif
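The rule above runs `protoc` to generate `remote_write.pb.cc` / `remote_write.pb.h` from `remote_write.proto`. For orientation, the wire format the backend targets is the upstream Prometheus remote-write request (the `prompb` message definitions); the sketch below assumes the committed `.proto` mirrors those upstream types, and the file shipped in this commit is the authoritative definition.

```protobuf
// Sketch of the upstream Prometheus remote-write request types (prompb).
// Shown only to illustrate what the backend serializes and snappy-compresses;
// the remote_write.proto added by this commit is authoritative.
syntax = "proto3";

message WriteRequest {
  repeated TimeSeries timeseries = 1;
}

message TimeSeries {
  repeated Label labels   = 1;  // "__name__" plus host/chart/dimension labels
  repeated Sample samples = 2;  // collected or stored values
}

message Label {
  string name  = 1;
  string value = 2;
}

message Sample {
  double value    = 1;
  int64 timestamp = 2;          // milliseconds since the Unix epoch
}
```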
8 changes: 7 additions & 1 deletion backends/README.md
@@ -32,6 +32,12 @@ X seconds (though, it can send them per second if you need it to).

- **prometheus** is described at [prometheus page](prometheus/) since it pulls data from netdata.

- **prometheus remote write** (a binary, snappy-compressed protocol buffer encoding over HTTP, used by
many [storage providers](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage))

metrics are labeled in the same format that Netdata uses for the [plaintext prometheus protocol](prometheus/).
Notes on using the remote write backend are [here](prometheus/remote_write/); an example configuration is sketched below.
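
A minimal `netdata.conf` sketch for this backend (the destination host and port are illustrative; `remote write URL path` defaults to `/receive` in the code added by this commit):

```
[backend]
    enabled = yes
    type = prometheus_remote_write
    destination = receiver.example.com:9201
    remote write URL path = /receive
```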

- **AWS Kinesis Data Streams**

metrics are sent to the service in `JSON` format.
@@ -70,7 +76,7 @@ of `netdata.conf` from your netdata):
```
[backend]
enabled = yes | no
type = graphite | opentsdb | json | kinesis
type = graphite | opentsdb | json | prometheus_remote_write | kinesis
host tags = list of TAG=VALUE
destination = space separated list of [PROTOCOL:]HOST[:PORT] - the first working will be used, or a region for kinesis
data source = average | sum | as collected
144 changes: 123 additions & 21 deletions backends/backends.c
@@ -260,6 +260,12 @@ void *backends_main(void *ptr) {
char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
#endif

#if ENABLE_PROMETHEUS_REMOTE_WRITE
int do_prometheus_remote_write = 0;
BUFFER *http_request_header = buffer_create(1);
#endif


// ------------------------------------------------------------------------
// collect configuration options

@@ -285,6 +291,10 @@ void *backends_main(void *ptr) {
charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);

#if ENABLE_PROMETHEUS_REMOTE_WRITE
const char *remote_write_path = config_get(CONFIG_SECTION_BACKEND, "remote write URL path", "/receive");
#endif

// ------------------------------------------------------------------------
// validate configuration options
// and prepare for sending data to our backend
@@ -337,9 +347,8 @@ void *backends_main(void *ptr) {
backend_request_formatter = format_dimension_stored_json_plaintext;

}
#if HAVE_KINESIS
else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {

#if HAVE_KINESIS
do_kinesis = 1;

if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
Expand All @@ -354,15 +363,31 @@ void *backends_main(void *ptr) {
backend_request_formatter = format_dimension_collected_json_plaintext;
else
backend_request_formatter = format_dimension_stored_json_plaintext;
#else
error("AWS Kinesis support isn't compiled");
#endif /* HAVE_KINESIS */
}
else if (!strcmp(type, "prometheus_remote_write")) {
#if ENABLE_PROMETHEUS_REMOTE_WRITE
do_prometheus_remote_write = 1;

backend_response_checker = process_prometheus_remote_write_response;

init_write_request();
#else
error("Prometheus remote write support isn't compiled");
#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
}
#endif /* HAVE_KINESIS */
else {
error("BACKEND: Unknown backend type '%s'", type);
goto cleanup;
}

#if ENABLE_PROMETHEUS_REMOTE_WRITE
if((backend_request_formatter == NULL && !do_prometheus_remote_write) || backend_response_checker == NULL) {
#else
if(backend_request_formatter == NULL || backend_response_checker == NULL) {
#endif
error("BACKEND: backend is misconfigured - disabling it.");
goto cleanup;
}
@@ -451,6 +476,9 @@ void *backends_main(void *ptr) {
size_t count_charts_total = 0;
size_t count_dims_total = 0;

#if ENABLE_PROMETHEUS_REMOTE_WRITE
clear_write_request();
#endif
rrd_rdlock();
RRDHOST *host;
rrdhost_foreach_read(host) {
@@ -478,26 +506,45 @@ void *backends_main(void *ptr) {

const char *__hostname = (host == localhost)?hostname:host->hostname;

RRDSET *st;
rrdset_foreach_read(st, host) {
if(likely(backends_can_send_rrdset(global_backend_options, st))) {
rrdset_rdlock(st);

count_charts++;

RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if (likely(rd->last_collected_time.tv_sec >= after)) {
chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
count_dims++;
}
else {
debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
count_dims_skipped++;
#if ENABLE_PROMETHEUS_REMOTE_WRITE
if(do_prometheus_remote_write) {
rrd_stats_remote_write_allmetrics_prometheus(
host
, __hostname
, global_backend_prefix
, global_backend_options
, after
, before
, &count_charts
, &count_dims
, &count_dims_skipped
);
chart_buffered_metrics += count_dims;
}
else
#endif
{
RRDSET *st;
rrdset_foreach_read(st, host) {
if(likely(backends_can_send_rrdset(global_backend_options, st))) {
rrdset_rdlock(st);

count_charts++;

RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if (likely(rd->last_collected_time.tv_sec >= after)) {
chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
count_dims++;
}
else {
debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
count_dims_skipped++;
}
}
}

rrdset_unlock(st);
rrdset_unlock(st);
}
}
}

@@ -672,6 +719,43 @@ void *backends_main(void *ptr) {
flags += MSG_NOSIGNAL;
#endif

#if ENABLE_PROMETHEUS_REMOTE_WRITE
if(do_prometheus_remote_write) {
size_t data_size = get_write_request_size();

if(unlikely(!data_size)) {
error("BACKEND: write request size is out of range");
continue;
}

buffer_flush(b);
buffer_need_bytes(b, data_size);
if(unlikely(pack_write_request(b->buffer, &data_size))) {
error("BACKEND: cannot pack write request");
continue;
}
b->len = data_size;
chart_buffered_bytes = (collected_number)buffer_strlen(b);

buffer_flush(http_request_header);
buffer_sprintf(http_request_header,
"POST %s HTTP/1.1\r\n"
"Host: %s\r\n"
"Accept: */*\r\n"
"Content-Length: %zu\r\n"
"Content-Type: application/x-www-form-urlencoded\r\n\r\n",
remote_write_path,
hostname,
data_size
);

len = buffer_strlen(http_request_header);
send(sock, buffer_tostring(http_request_header), len, flags);

len = data_size;
}
#endif

ssize_t written = send(sock, buffer_tostring(b), len, flags);
// chart_backend_latency += now_monotonic_usec() - start_ut;
if(written != -1 && (size_t)written == len) {
@@ -711,6 +795,16 @@ void *backends_main(void *ptr) {
}
}


#if ENABLE_PROMETHEUS_REMOTE_WRITE
if(failures) {
(void) buffer_on_failures;
failures = 0;
chart_lost_bytes = chart_buffered_bytes = get_write_request_size(); // estimated write request size
chart_data_lost_events++;
chart_lost_metrics = chart_buffered_metrics;
}
#else
if(failures > buffer_on_failures) {
// too bad! we are going to lose data
chart_lost_bytes += buffer_strlen(b);
@@ -720,6 +814,7 @@ void *backends_main(void *ptr) {
chart_data_lost_events++;
chart_lost_metrics = chart_buffered_metrics;
}
#endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */

if(unlikely(netdata_exit)) break;

@@ -775,6 +870,13 @@ void *backends_main(void *ptr) {
}
#endif

#if ENABLE_PROMETHEUS_REMOTE_WRITE
if(do_prometheus_remote_write) {
buffer_free(http_request_header);
protocol_buffers_shutdown();
}
#endif

if(sock != -1)
close(sock);

4 changes: 4 additions & 0 deletions backends/backends.h
@@ -53,4 +53,8 @@ extern int discard_response(BUFFER *b, const char *backend)
#include "backends/aws_kinesis/aws_kinesis.h"
#endif

#if ENABLE_PROMETHEUS_REMOTE_WRITE
#include "backends/prometheus/remote_write/remote_write.h"
#endif

#endif /* NETDATA_BACKENDS_H */
