Skip to content

Commit

Permalink
Revert "Revert changes since v1.21 in preparation for hotfix release."
Browse files Browse the repository at this point in the history
This reverts commit e287432.
  • Loading branch information
Ferroin committed Apr 13, 2020
1 parent 5a12b4a commit 983a26d
Show file tree
Hide file tree
Showing 56 changed files with 1,647 additions and 442 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Expand Up @@ -1112,6 +1112,8 @@ endif()
-Wl,--wrap=recv
-Wl,--wrap=send
-Wl,--wrap=connect_to_one_of
-Wl,--wrap=create_main_rusage_chart
-Wl,--wrap=send_main_rusage
${PROMETHEUS_REMOTE_WRITE_LINK_OPTIONS}
${KINESIS_LINK_OPTIONS}
${MONGODB_LINK_OPTIONS}
Expand Down
32 changes: 6 additions & 26 deletions Makefile.am
Expand Up @@ -573,6 +573,7 @@ NETDATA_FILES = \
$(LIBNETDATA_FILES) \
$(API_PLUGIN_FILES) \
$(BACKENDS_PLUGIN_FILES) \
$(EXPORTING_ENGINE_FILES) \
$(CHECKS_PLUGIN_FILES) \
$(HEALTH_PLUGIN_FILES) \
$(IDLEJITTER_PLUGIN_FILES) \
Expand Down Expand Up @@ -608,12 +609,6 @@ if LINUX

endif

if ENABLE_EXPORTING
NETDATA_FILES += \
$(EXPORTING_ENGINE_FILES) \
$(NULL)
endif

NETDATA_COMMON_LIBS = \
$(OPTIONAL_MATH_LIBS) \
$(OPTIONAL_ZLIB_LIBS) \
Expand Down Expand Up @@ -745,23 +740,13 @@ if ENABLE_PLUGIN_SLABINFO
$(NULL)
endif

if ENABLE_EXPORTING
if ENABLE_BACKEND_KINESIS
netdata_SOURCES += $(KINESIS_EXPORTING_FILES)
netdata_LDADD += $(OPTIONAL_KINESIS_LIBS)
endif
endif

if ENABLE_BACKEND_KINESIS
netdata_SOURCES += $(KINESIS_BACKEND_FILES)
netdata_SOURCES += $(KINESIS_BACKEND_FILES) $(KINESIS_EXPORTING_FILES)
netdata_LDADD += $(OPTIONAL_KINESIS_LIBS)
endif

if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE
if ENABLE_EXPORTING
netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES)
endif
netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES)
netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES)
netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS)
BUILT_SOURCES = \
exporting/prometheus/remote_write/remote_write.pb.cc \
Expand All @@ -775,15 +760,8 @@ exporting/prometheus/remote_write/remote_write.pb.h: exporting/prometheus/remote

endif

if ENABLE_EXPORTING
if ENABLE_BACKEND_MONGODB
netdata_SOURCES += $(MONGODB_EXPORTING_FILES)
netdata_LDADD += $(OPTIONAL_MONGOC_LIBS)
endif
endif

if ENABLE_BACKEND_MONGODB
netdata_SOURCES += $(MONGODB_BACKEND_FILES)
netdata_SOURCES += $(MONGODB_BACKEND_FILES) $(MONGODB_EXPORTING_FILES)
netdata_LDADD += $(OPTIONAL_MONGOC_LIBS)
endif

Expand Down Expand Up @@ -895,6 +873,8 @@ if ENABLE_UNITTESTS
-Wl,--wrap=recv \
-Wl,--wrap=send \
-Wl,--wrap=connect_to_one_of \
-Wl,--wrap=create_main_rusage_chart \
-Wl,--wrap=send_main_rusage \
$(TEST_LDFLAGS) \
$(NULL)
exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS)
Expand Down
3 changes: 2 additions & 1 deletion README.md
@@ -1,6 +1,7 @@
<!--
---
title: "Netdata"
date: 2020-04-06
custom_edit_url: https://github.com/netdata/netdata/edit/master/README.md
---
-->
Expand Down Expand Up @@ -33,7 +34,7 @@ granularity. Run this long-term storage autonomously, or integrate Netdata with
Netdata is **fast** and **efficient**, designed to permanently run on all systems (**physical** and **virtual** servers,
**containers**, **IoT** devices), without disrupting their core function.

Netdata is **free, open-source software** and it currently runs on **Linux**, **FreeBSD**, and **MacOS**, along with
Netdata is **free, open-source software** and it currently runs on **Linux**, **FreeBSD**, and **macOS**, along with
other systems derived from them, such as **Kubernetes** and **Docker**.

Netdata is not hosted by the CNCF but is the 3rd most starred open-source project in the [Cloud Native Computing
Expand Down
120 changes: 67 additions & 53 deletions aclk/agent_cloud_link.c
Expand Up @@ -23,6 +23,8 @@ static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer

static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -185,7 +187,7 @@ static int create_private_key()
* should be called with
*
* mode 0 to reset the delay
* mode 1 to sleep for the calculated amount of time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
* mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
*
*/
unsigned long int aclk_reconnect_delay(int mode)
Expand All @@ -208,8 +210,6 @@ unsigned long int aclk_reconnect_delay(int mode)
delay = (delay * 1000) + (random() % 1000);
}

// sleep_usec(USEC_PER_MS * delay);

return delay;
}

Expand Down Expand Up @@ -306,7 +306,7 @@ int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run
if (tmp_query->run_after == run_after) {
QUERY_UNLOCK;
QUERY_THREAD_WAKEUP;
return 1;
return 0;
}

if (last_query)
Expand Down Expand Up @@ -750,8 +750,8 @@ int aclk_execute_query(struct aclk_query *this_query)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;

aclk_create_header(local_buffer, "http", this_query->msg_id);

aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
char *encoded_response = aclk_encode_response(w->response.data);

buffer_sprintf(
Expand Down Expand Up @@ -821,11 +821,6 @@ int aclk_process_query()
aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
break;

case ACLK_CMD_ALARMS:
debug(D_ACLK, "EXECUTING an alarms update command");
aclk_send_alarm_metadata();
break;

case ACLK_CMD_CLOUD:
debug(D_ACLK, "EXECUTING a cloud command");
aclk_execute_query(this_query);
Expand Down Expand Up @@ -868,18 +863,22 @@ int aclk_process_queries()
static void aclk_query_thread_cleanup(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

info("cleaning up...");

COLLECTOR_LOCK;

_reset_collector_list();
freez(collector_list);

COLLECTOR_UNLOCK;
// Clean memory for pending queries if any
struct aclk_query *this_query;

static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
do {
this_query = aclk_queue_pop();
aclk_query_free(this_query);
} while (this_query);

freez(static_thread->thread);
freez(static_thread);
}

/**
Expand Down Expand Up @@ -916,7 +915,7 @@ void *aclk_query_main_thread(void *ptr)
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
errno = 0;
error("ACLK failed to queue on_connect command");
aclk_metadata_submitted = 0;
aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
}
}

Expand All @@ -939,7 +938,6 @@ void *aclk_query_main_thread(void *ptr)
// Thread cleanup
static void aclk_main_cleanup(void *ptr)
{
char payload[512];
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

Expand All @@ -952,24 +950,11 @@ static void aclk_main_cleanup(void *ptr)
// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
// Send a graceful disconnect message
char *msg_id = create_uuid();

usec_t time_created_offset_usec = now_realtime_usec();
time_t time_created = time_created_offset_usec / USEC_PER_SEC;
time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;

snprintfz(
payload, 511,
"{ \"type\": \"disconnect\","
" \"msg-id\": \"%s\","
" \"timestamp\": %ld,"
" \"timestamp-offset-usec\": %llu,"
" \"version\": %d,"
" \"payload\": \"graceful\" }",
msg_id, time_created, time_created_offset_usec, ACLK_VERSION);

aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id);
freez(msg_id);
BUFFER *b = buffer_create(512);
aclk_create_header(b, "disconnect", NULL, 0, 0);
buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
buffer_free(b);

event_loop_timeout = now_realtime_sec() + 5;
write_q = 1;
Expand All @@ -990,7 +975,6 @@ static void aclk_main_cleanup(void *ptr)
}
}

info("Disconnected");

static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
Expand Down Expand Up @@ -1295,7 +1279,6 @@ void *aclk_main(void *ptr)
{
struct netdata_static_thread *query_thread;

netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
if (!netdata_cloud_setting) {
info("Killing ACLK thread -> cloud functionality has been disabled");
return NULL;
Expand Down Expand Up @@ -1335,10 +1318,11 @@ void *aclk_main(void *ptr)
sleep_usec(USEC_PER_SEC * 60);
}
create_publish_base_topic();
create_private_key();

usec_t reconnect_expiry = 0; // In usecs

netdata_thread_disable_cancelability();

while (!netdata_exit) {
static int first_init = 0;
size_t write_q, write_q_bytes, read_q;
Expand Down Expand Up @@ -1392,7 +1376,8 @@ void *aclk_main(void *ptr)
}
} // forever
exited:
aclk_shutdown();
// Wakeup query thread to cleanup
QUERY_THREAD_WAKEUP;

freez(aclk_username);
freez(aclk_password);
Expand All @@ -1401,7 +1386,7 @@ void *aclk_main(void *ptr)
if (aclk_private_key != NULL)
RSA_free(aclk_private_key);

netdata_thread_cleanup_pop(1);
aclk_main_cleanup(ptr);
return NULL;
}

Expand Down Expand Up @@ -1514,7 +1499,7 @@ void aclk_shutdown()
info("Shutdown complete");
}

inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
{
uuid_t uuid;
char uuid_str[36 + 1];
Expand All @@ -1525,21 +1510,24 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
msg_id = uuid_str;
}

usec_t time_created_offset_usec = now_realtime_usec();
time_t time_created = time_created_offset_usec / USEC_PER_SEC;
time_created_offset_usec = time_created_offset_usec % USEC_PER_SEC;
if (ts_secs == 0) {
ts_us = now_realtime_usec();
ts_secs = ts_us / USEC_PER_SEC;
ts_us = ts_us % USEC_PER_SEC;
}

buffer_sprintf(
dest,
"\t{\"type\": \"%s\",\n"
"\t\"msg-id\": \"%s\",\n"
"\t\"timestamp\": %ld,\n"
"\t\"timestamp-offset-usec\": %llu,\n"
"\t\"version\": %d,\n"
"\t\"payload\": ",
type, msg_id, time_created, time_created_offset_usec, ACLK_VERSION);
"\t\"connect\": %ld,\n"
"\t\"connect-offset-usec\": %llu,\n"
"\t\"version\": %d",
type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);

debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created);
debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
}

/*
Expand Down Expand Up @@ -1599,7 +1587,15 @@ void aclk_send_alarm_metadata()

debug(D_ACLK, "Metadata alarms start");

aclk_create_header(local_buffer, "connect_alarms", msg_id);
// on_connect messages are sent on a health reload. If the on_connect message is real, we
// use the session time as the fake timestamp to indicate that it starts the session. If it
// is a fake on_connect message, we use the real timestamp to indicate that it is within the
// existing session.
if (aclk_metadata_submitted == ACLK_METADATA_SENT)
aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
else
aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");

buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
health_alarms2json(localhost, local_buffer, 1);
Expand Down Expand Up @@ -1635,7 +1631,16 @@ int aclk_send_info_metadata()
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;

aclk_create_header(local_buffer, "connect", msg_id);
// on_connect messages are sent on a health reload. If the on_connect message is real, we
// use the session time as the fake timestamp to indicate that it starts the session. If it
// is a fake on_connect message, we use the real timestamp to indicate that it is within the
// existing session.
if (aclk_metadata_submitted == ACLK_METADATA_SENT)
aclk_create_header(local_buffer, "connect", msg_id, 0, 0);
else
aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");

buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
Expand Down Expand Up @@ -1728,7 +1733,9 @@ int aclk_send_single_chart(char *hostname, char *chart)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;

aclk_create_header(local_buffer, "chart", msg_id);
aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");

rrdset2json(st, local_buffer, NULL, NULL, 1);
buffer_sprintf(local_buffer, "\t\n}");

Expand Down Expand Up @@ -1793,7 +1800,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
char *msg_id = create_uuid();

buffer_flush(local_buffer);
aclk_create_header(local_buffer, "status-change", msg_id);
aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");

netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
health_alarm_entry2json_nolock(local_buffer, ae, host);
Expand Down Expand Up @@ -1863,6 +1871,12 @@ int aclk_handle_cloud_request(char *payload)
return 1;
}

// Checked to be "http", not needed anymore
if (likely(cloud_to_agent.type_id)) {
freez(cloud_to_agent.type_id);
cloud_to_agent.type_id = NULL;
}

if (unlikely(aclk_submit_request(&cloud_to_agent)))
debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);

Expand Down

0 comments on commit 983a26d

Please sign in to comment.