Skip to content

Commit

Permalink
Send ML feature information with UpdateNodeInfo. (#11913)
Browse files Browse the repository at this point in the history
* Send ML feature information with UpdateNodeInfo.

We achieve this by adding the `ml_capable` and `ml_enabled` fields to
`system_info`. When streaming, these fields tell a parent whether the
child is ML-capable and whether it runs ML for itself.

The UpdateNodeInfo includes this information about a child, plus a
boolean that is set to true when the parent runs ML for the child.

* Fix unit test and building with --disable-ml.

* Refactor to use the new MachineLearningInfo message

* Update aclk-schemas repository to include latest ML info message.
  • Loading branch information
vkalintiris committed Dec 22, 2021
1 parent e167d2d commit df8930d
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 2 deletions.
2 changes: 1 addition & 1 deletion aclk/aclk-schemas
8 changes: 8 additions & 0 deletions aclk/schema-wrappers/node_info.cc
Expand Up @@ -62,6 +62,10 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl
if (data->machine_guid)
info->set_machine_guid(data->machine_guid);

nodeinstance::info::v1::MachineLearningInfo *ml_info = info->mutable_ml_info();
ml_info->set_ml_capable(data->ml_info.ml_capable);
ml_info->set_ml_enabled(data->ml_info.ml_enabled);

map = info->mutable_host_labels();
label = data->host_labels_head;
while (label) {
Expand All @@ -86,6 +90,10 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in
msg.set_machine_guid(info->machine_guid);
msg.set_child(info->child);

nodeinstance::info::v1::MachineLearningInfo *ml_info = msg.mutable_ml_info();
ml_info->set_ml_capable(info->ml_info.ml_capable);
ml_info->set_ml_enabled(info->ml_info.ml_enabled);

*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)malloc(*len);
if (bin)
Expand Down
9 changes: 9 additions & 0 deletions aclk/schema-wrappers/node_info.h
Expand Up @@ -11,6 +11,11 @@
extern "C" {
#endif

/*
 * Machine-learning status flags for a node.
 *
 * Embedded as `ml_info` in both struct aclk_node_info and
 * struct update_node_info; the schema wrappers copy the two flags into the
 * MachineLearningInfo protobuf message.
 */
struct machine_learning_info {
/* true when ML support is available (callers fill this from
 * rrdhost_system_info.ml_capable). */
bool ml_capable;
/* true when ML is actually running for the host (callers fill this from
 * rrdhost_system_info.ml_enabled or from whether the host has an ML host
 * object). */
bool ml_enabled;
};

struct aclk_node_info {
char *name;

Expand Down Expand Up @@ -49,6 +54,8 @@ struct aclk_node_info {
char *machine_guid;

struct label *host_labels_head;

struct machine_learning_info ml_info;
};

struct update_node_info {
Expand All @@ -58,6 +65,8 @@ struct update_node_info {
struct timeval updated_at;
char *machine_guid;
int child;

struct machine_learning_info ml_info;
};

char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
Expand Down
2 changes: 2 additions & 0 deletions database/rrd.h
Expand Up @@ -754,6 +754,8 @@ struct rrdhost_system_info {
char *container_detection;
char *is_k8s_node;
uint16_t hops;
bool ml_capable;
bool ml_enabled;
};

struct rrdhost {
Expand Down
12 changes: 12 additions & 0 deletions database/rrdhost.c
Expand Up @@ -382,7 +382,19 @@ RRDHOST *rrdhost_create(const char *hostname,
else localhost = host;
}

// ------------------------------------------------------------------------
// init new ML host and update system_info to let upstreams know
// about ML functionality

ml_new_host(host);
if (is_localhost && host->system_info) {
#ifndef ENABLE_ML
host->system_info->ml_capable = 0;
#else
host->system_info->ml_capable = 1;
#endif
host->system_info->ml_enabled = host->ml_host != NULL;
}

info("Host '%s' (at registry as '%s') with guid '%s' initialized"
", os '%s'"
Expand Down
4 changes: 4 additions & 0 deletions database/sqlite/sqlite_aclk_node.c
Expand Up @@ -22,6 +22,8 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.claim_id = is_agent_claimed();
node_info.machine_guid = wc->host_guid;
node_info.child = (wc->host != localhost);
node_info.ml_info.ml_capable = localhost->system_info->ml_capable;
node_info.ml_info.ml_enabled = wc->host->ml_host != NULL;
now_realtime_timeval(&node_info.updated_at);

RRDHOST *host = wc->host;
Expand All @@ -46,6 +48,8 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.data.services = NULL; // char **
node_info.data.service_count = 0;
node_info.data.machine_guid = wc->host_guid;
node_info.data.ml_info.ml_capable = host->system_info->ml_capable;
node_info.data.ml_info.ml_enabled = host->system_info->ml_enabled;

struct label_index *labels = &host->labels;
netdata_rwlock_wrlock(&labels->labels_rwlock);
Expand Down
4 changes: 4 additions & 0 deletions streaming/rrdpush.c
Expand Up @@ -522,6 +522,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
utc_offset = (int32_t)strtol(value, NULL, 0);
else if(!strcmp(name, "hops"))
system_info->hops = (uint16_t) strtoul(value, NULL, 0);
else if(!strcmp(name, "ml_capable"))
system_info->ml_capable = strtoul(value, NULL, 0);
else if(!strcmp(name, "ml_enabled"))
system_info->ml_enabled = strtoul(value, NULL, 0);
else if(!strcmp(name, "tags"))
tags = value;
else if(!strcmp(name, "ver"))
Expand Down
18 changes: 17 additions & 1 deletion streaming/sender.c
Expand Up @@ -214,7 +214,21 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po

char http[HTTP_HEADER_SIZE + 1];
int eol = snprintfz(http, HTTP_HEADER_SIZE,
"STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&abbrev_timezone=%s&utc_offset=%d&hops=%d&tags=%s&ver=%u"
"STREAM "
"key=%s"
"&hostname=%s"
"&registry_hostname=%s"
"&machine_guid=%s"
"&update_every=%d"
"&os=%s"
"&timezone=%s"
"&abbrev_timezone=%s"
"&utc_offset=%d"
"&hops=%d"
"&ml_capable=%d"
"&ml_enabled=%d"
"&tags=%s"
"&ver=%u"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
Expand Down Expand Up @@ -253,6 +267,8 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, host->abbrev_timezone
, host->utc_offset
, host->system_info->hops + 1
, host->system_info->ml_capable
, host->system_info->ml_enabled
, (host->tags) ? host->tags : ""
, STREAMING_PROTOCOL_CURRENT_VERSION
, se.os_name
Expand Down

0 comments on commit df8930d

Please sign in to comment.