From 10a278d39ce9a09eed0b1050ba471fc1b061b547 Mon Sep 17 00:00:00 2001 From: alvin1221 Date: Thu, 4 May 2023 15:49:19 +0800 Subject: [PATCH 1/5] * MDF [bridge] adapt to changes from NNG --- nanomq/aws_bridge.c | 6 +++--- nanomq/bridge.c | 24 ++++++++++++------------ nanomq/conf_api.c | 8 ++++---- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/nanomq/aws_bridge.c b/nanomq/aws_bridge.c index 314d2341f..5800bc8f0 100644 --- a/nanomq/aws_bridge.c +++ b/nanomq/aws_bridge.c @@ -412,9 +412,9 @@ subscribe_to_topic(MQTTContext_t *mqtt_ctx, conf_bridge_node *node) /* This example subscribes to only one topic and uses QOS1. */ for (size_t i = 0; i < node->sub_count; i++) { - sub_list[i].qos = node->sub_list[i].qos; - sub_list[i].pTopicFilter = node->sub_list[i].topic; - sub_list[i].topicFilterLength = node->sub_list[i].topic_len; + sub_list[i].qos = node->sub_list[i]->qos; + sub_list[i].pTopicFilter = node->sub_list[i]->topic; + sub_list[i].topicFilterLength = node->sub_list[i]->topic_len; } /* Generate packet identifier for the SUBSCRIBE packet. */ diff --git a/nanomq/bridge.c b/nanomq/bridge.c index 5e3af15ad..a3edc900e 100644 --- a/nanomq/bridge.c +++ b/nanomq/bridge.c @@ -195,11 +195,11 @@ bridge_connect_cb(nng_pipe p, nng_pipe_ev ev, void *arg) nng_mqtt_topic_qos_array_create(param->config->sub_count); for (size_t i = 0; i < param->config->sub_count; i++) { nng_mqtt_topic_qos_array_set(topic_qos, i, - param->config->sub_list[i].topic, - param->config->sub_list[i].qos, 1, 0, 0); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos, 1, 0, 0); log_info("Bridge client subscribed topic %s (qos %d).", - param->config->sub_list[i].topic, - param->config->sub_list[i].qos); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos); } nng_mqtt_client *client = param->client; @@ -593,13 +593,13 @@ quic_ack_cb(void *arg) nng_mqtt_topic_qos *topic_qos = nng_mqtt_topic_qos_array_create(1); nng_mqtt_topic_qos_array_set(topic_qos, 0, - param->config->sub_list[i].topic, - param->config->sub_list[i].qos, 1, 0, 0); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos, 1, 0, 0); log_info("Quic bridge client subscribe to " "topic (QoS " "%d)%s.", - param->config->sub_list[i].qos, - param->config->sub_list[i].topic); + param->config->sub_list[i]->qos, + param->config->sub_list[i]->topic); nng_mqtt_subscribe_async( client, topic_qos, 1, NULL); nng_mqtt_topic_qos_array_free(topic_qos, 1); @@ -610,12 +610,12 @@ quic_ack_cb(void *arg) param->config->sub_count); for (size_t i = 0; i < param->config->sub_count; i++) { nng_mqtt_topic_qos_array_set(topic_qos, i, - param->config->sub_list[i].topic, - param->config->sub_list[i].qos, 1, 0, 0); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos, 1, 0, 0); log_info("Quic bridge client subscribed topic " "(q%d)%s.", - param->config->sub_list[i].qos, - param->config->sub_list[i].topic); + param->config->sub_list[i]->qos, + param->config->sub_list[i]->topic); } // TODO support MQTT V5 nng_mqtt_subscribe_async( diff --git a/nanomq/conf_api.c b/nanomq/conf_api.c index 45f290231..b6a6d44fa 100644 --- a/nanomq/conf_api.c +++ b/nanomq/conf_api.c @@ -366,11 +366,11 @@ get_bridge_config(conf_bridge *bridge, const char *node_name) cJSON *sub_infos = cJSON_CreateArray(); for (size_t j = 0; j < node->sub_count; j++) { - cJSON *sub_obj = cJSON_CreateObject(); - topics sub = node->sub_list[j]; + cJSON * sub_obj = cJSON_CreateObject(); + topics *sub = node->sub_list[j]; cJSON_AddStringOrNullToObject( - sub_obj, "topic", sub.topic); - cJSON_AddNumberToObject(sub_obj, "qos", sub.qos); + sub_obj, "topic", sub->topic); + cJSON_AddNumberToObject(sub_obj, "qos", sub->qos); cJSON_AddItemToArray(sub_infos, sub_obj); } From 8f63162e667b393fc8f74423b40cd9b672d80302 Mon Sep 17 00:00:00 2001 From: alvin1221 Date: Fri, 5 May 2023 10:23:00 +0800 Subject: [PATCH 2/5] * NEW [rest] add subscribe API for bridge --- nanomq/rest_api.c | 203 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 202 insertions(+), 1 deletion(-) diff --git a/nanomq/rest_api.c b/nanomq/rest_api.c index 8593d880a..1277e8fd8 100644 --- a/nanomq/rest_api.c +++ b/nanomq/rest_api.c @@ -348,6 +348,7 @@ static http_msg post_mqtt_msg_batch( http_msg *msg, nng_socket *sock, handle_mqtt_msg_cb cb); static http_msg get_mqtt_bridge(http_msg *msg, const char *name); static http_msg put_mqtt_bridge(http_msg *msg, const char *name); +static http_msg post_mqtt_bridge_sub(http_msg *msg, const char *name); static int properties_parse(property **properties, cJSON *json); static int handle_publish_msg(cJSON *pub_obj, nng_socket *sock); @@ -849,13 +850,22 @@ process_request(http_msg *msg, conf_http_server *config, nng_socket *sock) strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && strcmp(uri_ct->sub_tree[2]->node, "publish") == 0) { ret = post_mqtt_msg(msg, sock, handle_publish_msg); + } else if (uri_ct->sub_count == 4 && + uri_ct->sub_tree[3]->end && + strcmp(uri_ct->sub_tree[1]->node, "bridges") == 0 && + strcmp(uri_ct->sub_tree[2]->node, "sub") == 0 + ) { + ret = post_mqtt_bridge_sub( + msg, uri_ct->sub_tree[3]->node); } else if (uri_ct->sub_count == 3 && uri_ct->sub_tree[2]->end && strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && strcmp(uri_ct->sub_tree[2]->node, "publish_batch") == 0) { ret = post_mqtt_msg_batch(msg, sock, handle_publish_msg); - } /* else if (uri_ct->sub_count == 3 && + } + + /* else if (uri_ct->sub_count == 3 && uri_ct->sub_tree[2]->end && strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && strcmp(uri_ct->sub_tree[2]->node, "subscribe") == 0) { @@ -2931,4 +2941,195 @@ put_mqtt_bridge(http_msg *msg, const char *name) return error_response( msg, NNG_HTTP_STATUS_NOT_FOUND, REQ_PARAM_ERROR); } +} + +static void +free_user_property(conf_user_property **prop, size_t sz) +{ + if (sz > 0 && prop) { + for (size_t i = 0; i < sz; i++) { + if (prop[i]) { + if (prop[i]->key) { + free(prop[i]->key); + } + if (prop[i]->value) { + free(prop[i]->value); + } + free(prop[i]); + } + } + cvector_free(prop); + prop = NULL; + } +} + +static void +free_sub_property(conf_bridge_sub_properties *prop) +{ + if (prop) { + free_user_property( + prop->user_property, prop->user_property_size); + prop->user_property_size = 0; + prop->identifier = 0; + free(prop); + prop = NULL; + } +} + +static void +free_topic_list(topics **list, size_t count) +{ + if (list && count > 0) { + for (size_t i = 0; i < count; i++) { + if (list[i]->topic) { + free(list[i]->topic); + list[i]->topic = NULL; + } + } + cvector_free(list); + list = NULL; + } +} + +static http_msg +post_mqtt_bridge_sub(http_msg *msg, const char *name) +{ + // node, [topic, qos], property + http_msg res = { .status = NNG_HTTP_STATUS_OK }; + enum nng_http_status status = NNG_HTTP_STATUS_OK; + int code = SUCCEED; + + cJSON *req = cJSON_ParseWithLength(msg->data, msg->data_len); + if (!cJSON_IsObject(req)) { + goto out; + } + + cJSON *data_obj = cJSON_GetObjectItem(req, "data"); + if (!cJSON_IsObject(data_obj)) { + goto out; + } + + cJSON *sub_array = cJSON_GetObjectItem(data_obj, "subscription"); + + if (!cJSON_IsArray(sub_array)) { + goto out; + } + + size_t array_size = cJSON_GetArraySize(sub_array); + topics **sub_topics = NULL; + size_t sub_count = 0; + + cJSON *item; + int rv = 0; + + for (size_t i = 0; i < array_size; i++) { + topics *tp = nng_zalloc(sizeof(topics)); + cJSON * sub_item = cJSON_GetArrayItem(sub_array, i); + getNumberValue(sub_item, item, "qos", tp->qos, rv); + char *topic = NULL; + getStringValue(sub_item, item, "topic", topic, rv); + if (rv == 0) { + tp->topic = nng_strdup(topic); + tp->topic_len = strlen(tp->topic); + } else { + continue; + } + cvector_push_back(sub_topics, tp); + sub_count++; + } + + // properties + cJSON *json_prop = cJSON_GetObjectItem(data_obj, "sub_properties"); + conf_bridge_sub_properties *sub_props = NULL; + if (cJSON_IsObject(json_prop)) { + sub_props = nng_zalloc(sizeof(conf_bridge_sub_properties)); + getNumberValue( + json_prop, item, "identifier", sub_props->identifier, rv); + cJSON *up_array = + cJSON_GetObjectItem(json_prop, "user_properties"); + size_t up_count = cJSON_GetArraySize(up_array); + + conf_user_property **conf_ups = NULL; + + for (size_t i = 0; i < up_count; i++) { + char *key = NULL; + char *value = NULL; + + getStringValue(json_prop, item, "key", key, rv); + if (rv == 0) { + getStringValue( + json_prop, item, "value", value, rv); + if (rv == 0) { + conf_user_property *up = nng_zalloc( + sizeof(conf_user_property)); + up->key = nng_strdup(key); + up->value = nng_strdup(value); + cvector_push_back(conf_ups, up); + } + } + } + sub_props->user_property = conf_ups; + sub_props->user_property_size = cvector_size(conf_ups); + } + + conf *config = get_global_conf(); + + bool found = false; + conf_bridge *bridge = &config->bridge; + for (size_t i = 0; i < bridge->count; i++) { + conf_bridge_node *node = bridge->nodes[i]; + if (name != NULL && strcmp(node->name, name) != 0) { + continue; + } + if (sub_count > 0) { + cvector_copy(sub_topics, node->sub_list); + node->sub_count += sub_count; + } + if (sub_props != NULL) { + free_sub_property(node->sub_properties); + node->sub_properties = sub_props; + } + found = true; + // TODO @Wangha handle subscribe + // TODO params: config, node, node->sock, sub_topics, sub_count + break; + } + + if (!found) { + status = NNG_HTTP_STATUS_NOT_FOUND; + code = REQ_PARAM_ERROR; + free_topic_list(sub_topics, sub_count); + free_sub_property(sub_props); + goto out; + } + + cJSON_Delete(req); + + cJSON *res_obj = cJSON_CreateObject(); + cJSON_AddNumberToObject(res_obj, "code", SUCCEED); + char *dest = cJSON_PrintUnformatted(res_obj); + + put_http_msg( + &res, "application/json", NULL, NULL, NULL, dest, strlen(dest)); + + cJSON_free(dest); + return res; + +out: + if (cJSON_IsObject(req)) { + cJSON_Delete(req); + } + + return error_response(msg, + code == NNG_HTTP_STATUS_NOT_FOUND ? code + : NNG_HTTP_STATUS_BAD_REQUEST, + (uint16_t) (status == REQ_PARAM_ERROR + ? status + : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); +} + +static http_msg +post_mqtt_bridge_unsub(http_msg *msg, const char *name) +{ + } \ No newline at end of file From afa52ffc42499015c1219627c5ce8cff5429560a Mon Sep 17 00:00:00 2001 From: alvin1221 Date: Fri, 5 May 2023 12:08:04 +0800 Subject: [PATCH 3/5] * NEW [rest] add unsubscribe API for bridge --- nanomq/rest_api.c | 131 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 129 insertions(+), 2 deletions(-) diff --git a/nanomq/rest_api.c b/nanomq/rest_api.c index 1277e8fd8..d84fe7e65 100644 --- a/nanomq/rest_api.c +++ b/nanomq/rest_api.c @@ -349,6 +349,7 @@ static http_msg post_mqtt_msg_batch( static http_msg get_mqtt_bridge(http_msg *msg, const char *name); static http_msg put_mqtt_bridge(http_msg *msg, const char *name); static http_msg post_mqtt_bridge_sub(http_msg *msg, const char *name); +static http_msg post_mqtt_bridge_unsub(http_msg *msg, const char *name); static int properties_parse(property **properties, cJSON *json); static int handle_publish_msg(cJSON *pub_obj, nng_socket *sock); @@ -857,6 +858,12 @@ process_request(http_msg *msg, conf_http_server *config, nng_socket *sock) ) { ret = post_mqtt_bridge_sub( msg, uri_ct->sub_tree[3]->node); + } else if (uri_ct->sub_count == 4 && + uri_ct->sub_tree[3]->end && + strcmp(uri_ct->sub_tree[1]->node, "bridges") == 0 && + strcmp(uri_ct->sub_tree[2]->node, "unsub") == 0) { + ret = post_mqtt_bridge_unsub( + msg, uri_ct->sub_tree[3]->node); } else if (uri_ct->sub_count == 3 && uri_ct->sub_tree[2]->end && strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && @@ -864,7 +871,7 @@ process_request(http_msg *msg, conf_http_server *config, nng_socket *sock) ret = post_mqtt_msg_batch(msg, sock, handle_publish_msg); } - + /* else if (uri_ct->sub_count == 3 && uri_ct->sub_tree[2]->end && strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && @@ -3041,7 +3048,10 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) // properties cJSON *json_prop = cJSON_GetObjectItem(data_obj, "sub_properties"); conf_bridge_sub_properties *sub_props = NULL; + property *prop_list = NULL; if (cJSON_IsObject(json_prop)) { + properties_parse(&prop_list, json_prop); + sub_props = nng_zalloc(sizeof(conf_bridge_sub_properties)); getNumberValue( json_prop, item, "identifier", sub_props->identifier, rv); @@ -3082,6 +3092,7 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) continue; } if (sub_count > 0) { + // TODO handle repeated topics cvector_copy(sub_topics, node->sub_list); node->sub_count += sub_count; } @@ -3090,8 +3101,9 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) node->sub_properties = sub_props; } found = true; + // TODO convert sub_topics to nng_mqtt_topic_qos // TODO @Wangha handle subscribe - // TODO params: config, node, node->sock, sub_topics, sub_count + // TODO params: config, node, node->sock, nng_mqtt_topic_qos, sub_count, prop_list break; } @@ -3100,6 +3112,7 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) code = REQ_PARAM_ERROR; free_topic_list(sub_topics, sub_count); free_sub_property(sub_props); + property_free(prop_list); goto out; } @@ -3131,5 +3144,119 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) static http_msg post_mqtt_bridge_unsub(http_msg *msg, const char *name) { + http_msg res = { .status = NNG_HTTP_STATUS_OK }; + enum nng_http_status status = NNG_HTTP_STATUS_OK; + int code = SUCCEED; + + cJSON *req = cJSON_ParseWithLength(msg->data, msg->data_len); + if (!cJSON_IsObject(req)) { + goto out; + } + + cJSON *data_obj = cJSON_GetObjectItem(req, "data"); + if (!cJSON_IsObject(data_obj)) { + goto out; + } + + cJSON *unsub_array = cJSON_GetObjectItem(data_obj, "unsubscription"); + + if (!cJSON_IsArray(unsub_array)) { + goto out; + } + + topics **unsub_topics = NULL; + size_t unsub_count = 0; + + cJSON *item; + int rv = 0; + + size_t array_size = cJSON_GetArraySize(unsub_array); + for (size_t i = 0; i < array_size; i++) { + topics *tp = nng_zalloc(sizeof(topics)); + cJSON * unsub_item = cJSON_GetArrayItem(unsub_array, i); + char *topic = NULL; + getStringValue(unsub_item, item, "topic", topic, rv); + if (rv == 0) { + tp->topic = nng_strdup(topic); + tp->topic_len = strlen(tp->topic); + } else { + continue; + } + cvector_push_back(unsub_topics, tp); + unsub_count++; + } + + // properties + cJSON *json_prop = cJSON_GetObjectItem(data_obj, "unsub_properties"); + property *prop_list = NULL; + if (cJSON_IsObject(json_prop)) { + properties_parse(&prop_list, json_prop); + } + + conf *config = get_global_conf(); + + bool found = false; + conf_bridge *bridge = &config->bridge; + for (size_t i = 0; i < bridge->count; i++) { + conf_bridge_node *node = bridge->nodes[i]; + if (name != NULL && strcmp(node->name, name) != 0) { + continue; + } + + for (size_t i = 0; i < unsub_count; i++) { + topics *unsub_topic = unsub_topics[i]; + for (size_t j = 0; j < node->sub_count; j++) + { + topics *sub_topic = node->sub_list[j]; + if (strcmp(unsub_topic->topic, sub_topic->topic) == 0) { + + cvector_erase(node->sub_list, j); + node->sub_count--; + nng_free(sub_topic->topic, sub_topic->topic_len); + nng_free(sub_topic, sizeof(topics)); + break; + } + } + } + + found = true; + // TODO convert unsub_topics to nng_mqtt_topic + + // TODO @Wangha handle unsubscribe + // TODO params: config, node, node->sock, nng_mqtt_topic, unsub_count + break; + } + + if (!found) { + status = NNG_HTTP_STATUS_NOT_FOUND; + code = REQ_PARAM_ERROR; + free_topic_list(unsub_topics, unsub_count); + property_free(prop_list); + goto out; + } + + cJSON_Delete(req); + + cJSON *res_obj = cJSON_CreateObject(); + cJSON_AddNumberToObject(res_obj, "code", SUCCEED); + char *dest = cJSON_PrintUnformatted(res_obj); + + put_http_msg( + &res, "application/json", NULL, NULL, NULL, dest, strlen(dest)); + + cJSON_free(dest); + return res; + +out: + if (cJSON_IsObject(req)) { + cJSON_Delete(req); + } + + return error_response(msg, + code == NNG_HTTP_STATUS_NOT_FOUND ? code + : NNG_HTTP_STATUS_BAD_REQUEST, + (uint16_t) (status == REQ_PARAM_ERROR + ? status + : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); } \ No newline at end of file From 454c9aed797a40a33f959696797846439e47626a Mon Sep 17 00:00:00 2001 From: alvin1221 Date: Fri, 5 May 2023 15:24:40 +0800 Subject: [PATCH 4/5] * FIX [rest] fix incorrect status code & memleaks --- nanomq/rest_api.c | 110 ++++++++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 42 deletions(-) diff --git a/nanomq/rest_api.c b/nanomq/rest_api.c index d84fe7e65..0fdf2cd97 100644 --- a/nanomq/rest_api.c +++ b/nanomq/rest_api.c @@ -2668,6 +2668,7 @@ handle_publish_msg(cJSON *pub_obj, nng_socket *sock) if (cJSON_IsObject(json_prop)) { rv = properties_parse(&props, json_prop); if (rv != 0) { + property_free(props); goto out; } } @@ -2989,9 +2990,25 @@ free_topic_list(topics **list, size_t count) if (list && count > 0) { for (size_t i = 0; i < count; i++) { if (list[i]->topic) { - free(list[i]->topic); + nng_free(list[i]->topic, list[i]->topic_len); list[i]->topic = NULL; } + nng_free(list[i], sizeof(topics)); + } + cvector_free(list); + list = NULL; + } +} + +static void +free_string_list(char **list, size_t count) +{ + if (list && count > 0) { + for (size_t i = 0; i < count; i++) { + if (list[i]) { + free(list[i]); + list[i] = NULL; + } } cvector_free(list); list = NULL; @@ -3023,12 +3040,14 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) } size_t array_size = cJSON_GetArraySize(sub_array); + topics **sub_topics = NULL; size_t sub_count = 0; cJSON *item; int rv = 0; + // Get topic list for (size_t i = 0; i < array_size; i++) { topics *tp = nng_zalloc(sizeof(topics)); cJSON * sub_item = cJSON_GetArrayItem(sub_array, i); @@ -3045,13 +3064,11 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) sub_count++; } - // properties + // Get properties cJSON *json_prop = cJSON_GetObjectItem(data_obj, "sub_properties"); conf_bridge_sub_properties *sub_props = NULL; - property *prop_list = NULL; + if (cJSON_IsObject(json_prop)) { - properties_parse(&prop_list, json_prop); - sub_props = nng_zalloc(sizeof(conf_bridge_sub_properties)); getNumberValue( json_prop, item, "identifier", sub_props->identifier, rv); @@ -3091,6 +3108,15 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) if (name != NULL && strcmp(node->name, name) != 0) { continue; } + + // Decode properties to nng_mqtt_property + property *prop_list = NULL; + if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) { + if (cJSON_IsObject(json_prop)) { + properties_parse(&prop_list, json_prop); + } + } + if (sub_count > 0) { // TODO handle repeated topics cvector_copy(sub_topics, node->sub_list); @@ -3100,7 +3126,9 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) free_sub_property(node->sub_properties); node->sub_properties = sub_props; } + cvector_free(sub_topics); found = true; + // TODO convert sub_topics to nng_mqtt_topic_qos // TODO @Wangha handle subscribe // TODO params: config, node, node->sock, nng_mqtt_topic_qos, sub_count, prop_list @@ -3110,14 +3138,11 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) if (!found) { status = NNG_HTTP_STATUS_NOT_FOUND; code = REQ_PARAM_ERROR; - free_topic_list(sub_topics, sub_count); free_sub_property(sub_props); - property_free(prop_list); + free_topic_list(sub_topics, sub_count); goto out; } - cJSON_Delete(req); - cJSON *res_obj = cJSON_CreateObject(); cJSON_AddNumberToObject(res_obj, "code", SUCCEED); char *dest = cJSON_PrintUnformatted(res_obj); @@ -3125,7 +3150,9 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) put_http_msg( &res, "application/json", NULL, NULL, NULL, dest, strlen(dest)); + cJSON_Delete(req); cJSON_free(dest); + cJSON_Delete(res_obj); return res; out: @@ -3134,11 +3161,9 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) } return error_response(msg, - code == NNG_HTTP_STATUS_NOT_FOUND ? code - : NNG_HTTP_STATUS_BAD_REQUEST, - (uint16_t) (status == REQ_PARAM_ERROR - ? status - : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); + status == NNG_HTTP_STATUS_NOT_FOUND ? status + : NNG_HTTP_STATUS_BAD_REQUEST, + (code == REQ_PARAM_ERROR ? code : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); } static http_msg @@ -3158,13 +3183,13 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) goto out; } + // Get unsub topic list cJSON *unsub_array = cJSON_GetObjectItem(data_obj, "unsubscription"); - if (!cJSON_IsArray(unsub_array)) { goto out; } - topics **unsub_topics = NULL; + char ** unsub_topics = NULL; size_t unsub_count = 0; cJSON *item; @@ -3173,27 +3198,18 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) size_t array_size = cJSON_GetArraySize(unsub_array); for (size_t i = 0; i < array_size; i++) { - topics *tp = nng_zalloc(sizeof(topics)); cJSON * unsub_item = cJSON_GetArrayItem(unsub_array, i); char *topic = NULL; getStringValue(unsub_item, item, "topic", topic, rv); if (rv == 0) { - tp->topic = nng_strdup(topic); - tp->topic_len = strlen(tp->topic); + topic = nng_strdup(topic); } else { continue; } - cvector_push_back(unsub_topics, tp); + cvector_push_back(unsub_topics, topic); unsub_count++; } - // properties - cJSON *json_prop = cJSON_GetObjectItem(data_obj, "unsub_properties"); - property *prop_list = NULL; - if (cJSON_IsObject(json_prop)) { - properties_parse(&prop_list, json_prop); - } - conf *config = get_global_conf(); bool found = false; @@ -3204,16 +3220,28 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) continue; } + // Get properties + property *prop_list = NULL; + if (node->proto_ver == MQTT_VERSION_V5) { + cJSON *json_prop = + cJSON_GetObjectItem(data_obj, "unsub_properties"); + + if (cJSON_IsObject(json_prop)) { + properties_parse(&prop_list, json_prop); + } + } + for (size_t i = 0; i < unsub_count; i++) { - topics *unsub_topic = unsub_topics[i]; - for (size_t j = 0; j < node->sub_count; j++) - { + char *unsub_topic = unsub_topics[i]; + for (size_t j = 0; j < node->sub_count; j++) { topics *sub_topic = node->sub_list[j]; - if (strcmp(unsub_topic->topic, sub_topic->topic) == 0) { - + if (strcmp(unsub_topic, + sub_topic->topic) == 0) { + cvector_erase(node->sub_list, j); node->sub_count--; - nng_free(sub_topic->topic, sub_topic->topic_len); + nng_free(sub_topic->topic, + sub_topic->topic_len); nng_free(sub_topic, sizeof(topics)); break; } @@ -3228,16 +3256,14 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) break; } + free_string_list(unsub_topics, unsub_count); + if (!found) { status = NNG_HTTP_STATUS_NOT_FOUND; code = REQ_PARAM_ERROR; - free_topic_list(unsub_topics, unsub_count); - property_free(prop_list); goto out; } - cJSON_Delete(req); - cJSON *res_obj = cJSON_CreateObject(); cJSON_AddNumberToObject(res_obj, "code", SUCCEED); char *dest = cJSON_PrintUnformatted(res_obj); @@ -3245,7 +3271,9 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) put_http_msg( &res, "application/json", NULL, NULL, NULL, dest, strlen(dest)); + cJSON_Delete(req); cJSON_free(dest); + cJSON_Delete(res_obj); return res; out: @@ -3254,9 +3282,7 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) } return error_response(msg, - code == NNG_HTTP_STATUS_NOT_FOUND ? code - : NNG_HTTP_STATUS_BAD_REQUEST, - (uint16_t) (status == REQ_PARAM_ERROR - ? status - : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); + status == NNG_HTTP_STATUS_NOT_FOUND ? status + : NNG_HTTP_STATUS_BAD_REQUEST, + (code == REQ_PARAM_ERROR ? code : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); } \ No newline at end of file From 60d12e260a1002a1e75e239e1fd8e6039a200065 Mon Sep 17 00:00:00 2001 From: alvin1221 Date: Fri, 5 May 2023 15:43:11 +0800 Subject: [PATCH 5/5] * NEW [rest] add conversion function --- nanomq/rest_api.c | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/nanomq/rest_api.c b/nanomq/rest_api.c index 0fdf2cd97..b35a4304b 100644 --- a/nanomq/rest_api.c +++ b/nanomq/rest_api.c @@ -3015,6 +3015,28 @@ free_string_list(char **list, size_t count) } } +static nng_mqtt_topic_qos * +convert_topic_qos(topics **list, size_t count) +{ + nng_mqtt_topic_qos *topics = nng_mqtt_topic_qos_array_create(count); + for (size_t i = 0; i < count; i++) { + nng_mqtt_topic_qos_array_set( + topics, i, list[i]->topic, list[i]->qos, 1, 0, 0); + } + return topics; +} + +static nng_mqtt_topic * +convert_topic(char **list, size_t count) +{ + nng_mqtt_topic *topics = nng_mqtt_topic_array_create(count); + + for (size_t i = 0; i < count; i++) { + nng_mqtt_topic_array_set(topics, i, list[i]); + } + return topics; +} + static http_msg post_mqtt_bridge_sub(http_msg *msg, const char *name) { @@ -3129,9 +3151,12 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) cvector_free(sub_topics); found = true; - // TODO convert sub_topics to nng_mqtt_topic_qos + // convert sub_topics to nng_mqtt_topic_qos + // TODO uncomment the following line if you want to use nng_mqtt_topic_qos + // nng_mqtt_topic_qos *topic_list = convert_topic_qos(node->sub_list, node->sub_count); + // TODO @Wangha handle subscribe - // TODO params: config, node, node->sock, nng_mqtt_topic_qos, sub_count, prop_list + // TODO params: config, node, node->sock, topic_list, sub_count, prop_list break; } @@ -3249,10 +3274,12 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) } found = true; - // TODO convert unsub_topics to nng_mqtt_topic + // convert unsub_topics to nng_mqtt_topic + // TODO uncomment the following line if you want to use nng_mqtt_topic + // nng_mqtt_topic *topic_list = convert_topic(unsub_topics, unsub_count); // TODO @Wangha handle unsubscribe - // TODO params: config, node, node->sock, nng_mqtt_topic, unsub_count + // TODO params: config, node, node->sock, topic_list, unsub_count, prop_list break; }