Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topic reflection #692

Merged
merged 12 commits into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions include/nng/supplemental/nanolib/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ struct conf_websocket {
typedef struct conf_websocket conf_websocket;

typedef struct {
char *topic;
uint32_t topic_len;
char * remote_topic;
uint32_t remote_topic_len;
char * local_topic;
uint32_t local_topic_len;
uint8_t qos;
uint8_t nolocal;
uint8_t retain_as_published;
Expand Down Expand Up @@ -256,7 +258,7 @@ struct conf_bridge_node {
char *clientid;
char *username;
char *password;
char **forwards;
topics **forwards_list;
uint64_t parallel;
topics **sub_list;
conf_tls tls;
Expand Down
193 changes: 146 additions & 47 deletions src/supplemental/nanolib/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2268,29 +2268,42 @@ conf_bridge_node_parse_subs(
}

char key[128] = "";
char * topic = NULL;
uint8_t qos = 0;
uint8_t rap = 0; // only 1/0
uint8_t rhandling = 0; // only 0/1/2
size_t sub_index = 1;
bool get_topic = false;
bool get_qos = false;
bool get_rap = false;
bool get_rhandling = false;
char * line = NULL;
size_t sz = 0;
char * value = NULL;
char * remote_topic = NULL;
char * local_topic = NULL;
uint8_t qos = 0;
uint8_t rap = 0; // only 1/0
uint8_t rhandling = 0; // only 0/1/2
size_t sub_index = 1;
bool get_remote_topic = false;
bool get_local_topic = false;
bool get_qos = false;
bool get_rap = false;
bool get_rhandling = false;
char * line = NULL;
size_t sz = 0;
char * value = NULL;

node->sub_count = 0;
while (nano_getline(&line, &sz, fp) != -1) {
snprintf(key, 128,
"%s%s.subscription.%ld.topic", prefix, name,
"%s%s.subscription.%ld.remote_topic", prefix, name,
sub_index);
if (!get_topic &&
if (!get_remote_topic &&
(value = get_conf_value(line, sz, key)) != NULL) {
// a potential memleak here
topic = value;
get_topic = true;
remote_topic = value;
get_remote_topic = true;
goto check;
}

snprintf(key, 128,
"%s%s.subscription.%ld.local_topic", prefix, name,
sub_index);
if (!get_local_topic &&
(value = get_conf_value(line, sz, key)) != NULL) {
// a potential memleak here
local_topic = value;
get_local_topic = true;
goto check;
}

Expand Down Expand Up @@ -2328,13 +2341,15 @@ conf_bridge_node_parse_subs(

check:
if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
if (get_topic && get_qos && get_rap && get_rhandling) {
if (get_remote_topic && get_local_topic && get_qos && get_rap && get_rhandling) {
sub_index++;
node->sub_count++;
topics *s = NNI_ALLOC_STRUCT(s);
s->stream_id = 0;
s->topic = topic;
s->topic_len = strlen(topic);
s->remote_topic = remote_topic;
s->remote_topic_len = strlen(remote_topic);
s->local_topic = local_topic;
s->local_topic_len = strlen(local_topic);
s->qos = qos;
s->retain_as_published = rap;
s->retain_handling = rhandling;
Expand All @@ -2344,27 +2359,31 @@ conf_bridge_node_parse_subs(
s->stream_id = sub_index;
#endif
cvector_push_back(node->sub_list, s);
get_topic = false;
get_remote_topic = false;
get_local_topic = false;
get_qos = false;
get_rap = false;
get_rhandling = false;
}
} else {
if (get_topic && get_qos) {
if (get_remote_topic && get_local_topic && get_qos) {
sub_index++;
node->sub_count++;
topics *s = NNI_ALLOC_STRUCT(s);
s->stream_id = 0;
s->topic = topic;
s->topic_len = strlen(topic);
s->remote_topic = remote_topic;
s->local_topic = local_topic;
s->remote_topic_len = strlen(remote_topic);
s->local_topic_len = strlen(local_topic);
s->qos = qos;

#if defined(SUPP_QUIC)
if (node->stream_auto_genid)
s->stream_id = sub_index;
#endif
cvector_push_back(node->sub_list, s);
get_topic = false;
get_remote_topic = false;
get_local_topic = false;
get_qos = false;
}
}
Expand All @@ -2377,6 +2396,73 @@ conf_bridge_node_parse_subs(
fclose(fp);
}

static void
conf_bridge_node_parse_forwards(
conf_bridge_node *node, const char *path, const char *prefix, const char *name)
{
FILE *fp;
if ((fp = fopen(path, "r")) == NULL) {
log_error("File %s open failed", path);
return;
}

char key[128] = "";
char * remote_topic = NULL;
char * local_topic = NULL;
bool get_remote_topic = false;
bool get_local_topic = false;
char * line = NULL;
size_t sz = 0;
char * value = NULL;
size_t fwd_index = 1;

node->forwards_count = 0;
while (nano_getline(&line, &sz, fp) != -1) {
snprintf(key, 128,
"%s%s.forwards.%ld.remote_topic", prefix, name,
fwd_index);
if (!get_remote_topic &&
(value = get_conf_value(line, sz, key)) != NULL) {
remote_topic = value;
get_remote_topic = true;
goto check;
}

snprintf(key, 128,
"%s%s.forwards.%ld.local_topic", prefix, name,
fwd_index);
if (!get_local_topic &&
(value = get_conf_value(line, sz, key)) != NULL) {
local_topic = value;
get_local_topic = true;
goto check;
}

free(line);
line = NULL;

check:
if (get_remote_topic && get_local_topic) {
fwd_index++;
node->forwards_count++;
topics *s = NNI_ALLOC_STRUCT(s);
s->remote_topic = remote_topic;
s->local_topic = local_topic;
s->remote_topic_len = strlen(remote_topic);
s->local_topic_len = strlen(local_topic);

cvector_push_back(node->forwards_list, s);
get_remote_topic = false;
get_local_topic = false;
}
}

if (line) {
free(line);
}

fclose(fp);
}
void
conf_bridge_sub_properties_init(conf_bridge_sub_properties *prop)
{
Expand Down Expand Up @@ -2435,7 +2521,7 @@ conf_bridge_node_init(conf_bridge_node *node)
node->proto_ver = 4;
node->keepalive = 60;
node->forwards_count = 0;
node->forwards = NULL;
node->forwards_list = NULL;
node->sub_count = 0;
node->sub_list = NULL;

Expand Down Expand Up @@ -2676,15 +2762,6 @@ conf_bridge_node_parse_with_name(const char *path,const char *key_prefix, const
key_prefix, name, ".will.qos")) != NULL) {
node->will_qos = (uint8_t) atoi(value);
free(value);
} else if ((value = get_conf_value_with_prefix2(line, sz,
key_prefix, name, ".forwards")) != NULL) {
char *tk = strtok(value, ",");
while (tk != NULL) {
node->forwards_count++;
cvector_push_back(node->forwards, nng_strdup(tk));
tk = strtok(NULL, ",");
}
free(value);
}

free(line);
Expand All @@ -2710,6 +2787,7 @@ conf_bridge_node_parse_with_name(const char *path,const char *key_prefix, const
}

conf_bridge_node_parse_subs(node, path, key_prefix, name);
conf_bridge_node_parse_forwards(node, path, key_prefix, name);

sz = strlen(name) + 2;
char *prefix2 = nng_zalloc(sz);
Expand Down Expand Up @@ -2837,22 +2915,34 @@ conf_bridge_node_destroy(conf_bridge_node *node)
free(node->will_payload);
node->will_payload = NULL;
}
if (node->forwards_count > 0 && node->forwards) {
if (node->forwards_count > 0 && node->forwards_list) {
for (size_t i = 0; i < node->forwards_count; i++) {
if (node->forwards[i]) {
free(node->forwards[i]);
node->forwards[i] = NULL;
topics *s = node->forwards_list[i];
if (s->remote_topic) {
free(s->remote_topic);
s->remote_topic = NULL;
}
if (s->local_topic) {
free(s->local_topic);
s->local_topic = NULL;
}
NNI_FREE_STRUCT(s);

}
cvector_free(node->forwards);
node->forwards = NULL;
node->forwards_count = 0;
cvector_free(node->forwards_list);
node->forwards_list = NULL;
}
if (node->sub_count > 0 && node->sub_list) {
for (size_t i = 0; i < node->sub_count; i++) {
topics *s = node->sub_list[i];
if (s->topic) {
free(s->topic);
s->topic = NULL;
if (s->remote_topic) {
free(s->remote_topic);
s->remote_topic = NULL;
}
if (s->local_topic) {
free(s->local_topic);
s->local_topic = NULL;
}
NNI_FREE_STRUCT(s);
}
Expand Down Expand Up @@ -2978,14 +3068,23 @@ print_bridge_conf(conf_bridge *bridge, const char *prefix)

for (size_t j = 0; j < node->forwards_count; j++) {
log_info(
"\t[%ld] topic: %s", j, node->forwards[j]);
"\t[%ld] remote topic: %.*s", j,
node->forwards_list[j]->remote_topic_len,
node->forwards_list[j]->remote_topic);
log_info(
"\t[%ld] local topic: %.*s", j,
node->forwards_list[j]->local_topic_len,
node->forwards_list[j]->local_topic);
}
log_info(
"%sbridge.mqtt.%s.subscription: ", prefix, node->name);
for (size_t k = 0; k < node->sub_count; k++) {
log_info("\t[%ld] topic: %.*s", k + 1,
node->sub_list[k]->topic_len,
node->sub_list[k]->topic);
log_info("\t[%ld] remote topic: %.*s", k + 1,
node->sub_list[k]->remote_topic_len,
node->sub_list[k]->remote_topic);
log_info("\t[%ld] local topic: %.*s", k + 1,
node->sub_list[k]->local_topic_len,
node->sub_list[k]->local_topic);
log_info("\t[%ld] qos: %d", k + 1,
node->sub_list[k]->qos);
}
Expand Down
46 changes: 37 additions & 9 deletions src/supplemental/nanolib/conf_ver2.c
Original file line number Diff line number Diff line change
Expand Up @@ -887,24 +887,39 @@ conf_bridge_node_parse(
{
node->name = nng_strdup(obj->string);
conf_bridge_connector_parse_ver2(node, obj);
hocon_read_str_arr(node, forwards, obj);
node->forwards_count = cvector_size(node->forwards);
node->sqlite = bridge_sqlite;
node->enable = true;
#if defined(SUPP_QUIC)
conf_bridge_quic_parse_ver2(node, obj);
#endif

cJSON *forwards = hocon_get_obj("forwards", obj);

cJSON *forward = NULL;
cJSON_ArrayForEach(forward, forwards)
{
topics *s = NNI_ALLOC_STRUCT(s);
hocon_read_str(s, remote_topic, forward);
hocon_read_str(s, local_topic, forward);
s->remote_topic_len = strlen(s->remote_topic);
s->local_topic_len = strlen(s->local_topic);
cvector_push_back(node->forwards_list, s);
}
node->forwards_count = cvector_size(node->forwards_list);

cJSON *subscriptions = hocon_get_obj("subscription", obj);

cJSON * subscription = NULL;
cJSON *subscription = NULL;
cJSON_ArrayForEach(subscription, subscriptions)
{
topics *s = NNI_ALLOC_STRUCT(s);
hocon_read_str(s, topic, subscription);
hocon_read_str(s, remote_topic, subscription);
hocon_read_str(s, local_topic, subscription);
hocon_read_num(s, qos, subscription);
hocon_read_num(s, retain_as_published, subscription);
hocon_read_num(s, retain_handling, subscription);
s->topic_len = strlen(s->topic);
s->remote_topic_len = strlen(s->remote_topic);
s->local_topic_len = strlen(s->local_topic);
s->stream_id = 0;
hocon_read_num(s, stream_id, subscription);
cvector_push_back(node->sub_list, s);
Expand Down Expand Up @@ -988,8 +1003,19 @@ conf_aws_bridge_parse_ver2(conf *config, cJSON *jso)
}
}

hocon_read_str_arr(node, forwards, bridge_aws_node);
node->forwards_count = cvector_size(node->forwards);
cJSON *forwards = hocon_get_obj("forwards", bridge_aws_node);

cJSON *forward = NULL;
cJSON_ArrayForEach(forward, forwards)
{
topics *s = NNI_ALLOC_STRUCT(s);
hocon_read_str(s, remote_topic, forward);
hocon_read_str(s, local_topic, forward);
s->remote_topic_len = strlen(s->remote_topic);
s->local_topic_len = strlen(s->local_topic);
cvector_push_back(node->forwards_list, s);
}
node->forwards_count = cvector_size(node->forwards_list);

cJSON *subscriptions =
hocon_get_obj("subscription", bridge_aws_node);
Expand All @@ -998,9 +1024,11 @@ conf_aws_bridge_parse_ver2(conf *config, cJSON *jso)
cJSON_ArrayForEach(subscription, subscriptions)
{
topics *s = NNI_ALLOC_STRUCT(s);
hocon_read_str(s, topic, subscription);
hocon_read_str(s, remote_topic, subscription);
hocon_read_str(s, local_topic, subscription);
hocon_read_num(s, qos, subscription);
s->topic_len = strlen(s->topic);
s->remote_topic_len = strlen(s->remote_topic);
s->local_topic_len = strlen(s->local_topic);
s->stream_id = 0;
hocon_read_num(s, stream_id, subscription);
cvector_push_back(node->sub_list, s);
Expand Down
Loading