Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACLK: Implemented Last Will and Testament #8410

Merged
merged 14 commits into from Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions aclk/aclk_lws_wss_client.c
Expand Up @@ -6,6 +6,8 @@
#include "../daemon/common.h"
#include "aclk_common.h"

extern int aclk_shutting_down;

static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);

struct aclk_lws_wss_perconnect_data {
Expand Down Expand Up @@ -320,6 +322,8 @@ static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
return "LWS_CALLBACK_CLIENT_ESTABLISHED";
case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION";
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
return "LWS_CALLBACK_EVENT_WAIT_CANCELLED";
default:
// Not using an internal buffer here for thread-safety with unknown calling context.
error("Unknown LWS callback %u", reason);
Expand All @@ -331,6 +335,13 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
UNUSED(user);
struct lws_wss_packet_buffer *data;
int retval = 0;
static int lws_shutting_down = 0;

if (unlikely(aclk_shutting_down && !lws_shutting_down)) {
lws_shutting_down = 1;
retval = -1;
engine_instance->upstream_reconnect_request = 0;
}

// Callback servicing is forced when we are closed from above.
if (engine_instance->upstream_reconnect_request) {
Expand Down
53 changes: 48 additions & 5 deletions aclk/agent_cloud_link.c
Expand Up @@ -5,6 +5,7 @@
#include "aclk_lws_https_client.h"
#include "aclk_common.h"

int aclk_shutting_down = 0;
// State-machine for the on-connect metadata transmission.
// TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
// agent startup phase.
Expand Down Expand Up @@ -43,6 +44,8 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)

void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);

/*
* Maintain a list of collectors and chart count
* If all the charts of a collector are deleted
Expand Down Expand Up @@ -936,13 +939,54 @@ void *aclk_query_main_thread(void *ptr)
// Thread cleanup
static void aclk_main_cleanup(void *ptr)
{
char payload[512];
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

info("cleaning up...");

// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
if (is_agent_claimed() && aclk_connected) {
size_t write_q, write_q_bytes, read_q;
time_t event_loop_timeout;

// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
// Send a graceful disconnect message
time_t time_created = now_realtime_sec();
char *msg_id = create_uuid();

snprintfz(
payload, 511,
"{ \"type\": \"disconnect\","
" \"msg-id\": \"%s\","
" \"timestamp\": %ld,"
" \"version\": %d,"
" \"payload\": \"graceful\" }",
msg_id, time_created, ACLK_VERSION);

aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id);
freez(msg_id);

event_loop_timeout = now_realtime_sec() + 5;
write_q = 1;
while (write_q && event_loop_timeout > now_realtime_sec()) {
_link_event_loop();
lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
}

aclk_shutting_down = 1;
_link_shutdown();
aclk_lws_wss_mqtt_layer_disconect_notif();

write_q = 1;
event_loop_timeout = now_realtime_sec() + 5;
while (write_q && event_loop_timeout > now_realtime_sec()) {
_link_event_loop();
lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
}
}

info("Disconnected");

static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
Expand Down Expand Up @@ -1243,7 +1287,6 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num)
*
* @return It always returns NULL
*/
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void *aclk_main(void *ptr)
{
struct netdata_static_thread *query_thread;
Expand Down Expand Up @@ -1295,8 +1338,8 @@ void *aclk_main(void *ptr)
size_t write_q, write_q_bytes, read_q;
lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
//info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
// first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
if (unlikely(!aclk_connected)) {
// first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
if (unlikely(!netdata_exit && !aclk_connected)) {
if (unlikely(!first_init)) {
aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
first_init = 1;
Expand Down
1 change: 1 addition & 0 deletions aclk/agent_cloud_link.h
Expand Up @@ -78,6 +78,7 @@ extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
//char *get_base_topic();

extern char *is_agent_claimed(void);
extern void aclk_lws_wss_mqtt_layer_disconect_notif();
char *create_uuid();

// callbacks for agent cloud link
Expand Down
43 changes: 40 additions & 3 deletions aclk/mqtt.c
Expand Up @@ -131,6 +131,8 @@ static int _mqtt_create_connection(char *username, char *password)
return MOSQ_ERR_UNKNOWN;
}

_link_set_lwt("outbound/meta", 2);

mosquitto_connect_callback_set(mosq, connect_callback);
mosquitto_disconnect_callback_set(mosq, disconnect_callback);
mosquitto_publish_callback_set(mosq, publish_callback);
Expand Down Expand Up @@ -174,6 +176,10 @@ static inline void _link_mosquitto_write()
{
int rc;

if (unlikely(!mosq)) {
return;
}

rc = mosquitto_loop_misc(mosq);
if (unlikely(rc != MOSQ_ERR_SUCCESS))
debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
Expand Down Expand Up @@ -234,6 +240,9 @@ void _link_shutdown()
{
int rc;

if (likely(!mosq))
return;

rc = mosquitto_disconnect(mosq);
switch (rc) {
case MOSQ_ERR_SUCCESS:
Expand All @@ -243,11 +252,39 @@ void _link_shutdown()
info("MQTT invalid structure");
break;
};
}


mosquitto_destroy(mosq);
mosq = NULL;
int _link_set_lwt(char *sub_topic, int qos)
{
int rc;
char topic[ACLK_MAX_TOPIC + 1];
char payload[512];
char *final_topic;

aclk_lws_wss_client_destroy();
final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
if (unlikely(!final_topic)) {
errno = 0;
error("Unable to build outgoing topic; truncated?");
return 1;
}

time_t time_created = now_realtime_sec();
char *msg_id = create_uuid();

snprintfz(
payload, 511,
"{ \"type\": \"disconnect\","
" \"msg-id\": \"%s\","
" \"timestamp\": %ld,"
" \"version\": %d,"
" \"payload\": \"unexpected\" }",
msg_id, time_created, ACLK_VERSION);

freez(msg_id);

rc = mosquitto_will_set(mosq, topic, strlen(payload), (const void *) payload, qos, 0);
return rc;
}

int _link_subscribe(char *topic, int qos)
Expand Down
3 changes: 3 additions & 0 deletions aclk/mqtt.h
Expand Up @@ -16,7 +16,10 @@ int _mqtt_lib_init();
int _link_subscribe(char *topic, int qos);
int _link_send_message(char *topic, unsigned char *message, int *mid);
const char *_link_strerror(int rc);
int _link_set_lwt(char *topic, int qos);


int aclk_handle_cloud_request(char *);
extern char *get_topic(char *sub_topic, char *final_topic, int max_size);

#endif //NETDATA_MQTT_H