diff --git a/nanomq/aws_bridge.c b/nanomq/aws_bridge.c index 314d2341f..5800bc8f0 100644 --- a/nanomq/aws_bridge.c +++ b/nanomq/aws_bridge.c @@ -412,9 +412,9 @@ subscribe_to_topic(MQTTContext_t *mqtt_ctx, conf_bridge_node *node) /* This example subscribes to only one topic and uses QOS1. */ for (size_t i = 0; i < node->sub_count; i++) { - sub_list[i].qos = node->sub_list[i].qos; - sub_list[i].pTopicFilter = node->sub_list[i].topic; - sub_list[i].topicFilterLength = node->sub_list[i].topic_len; + sub_list[i].qos = node->sub_list[i]->qos; + sub_list[i].pTopicFilter = node->sub_list[i]->topic; + sub_list[i].topicFilterLength = node->sub_list[i]->topic_len; } /* Generate packet identifier for the SUBSCRIBE packet. */ diff --git a/nanomq/bridge.c b/nanomq/bridge.c index 5e3af15ad..a3edc900e 100644 --- a/nanomq/bridge.c +++ b/nanomq/bridge.c @@ -195,11 +195,11 @@ bridge_connect_cb(nng_pipe p, nng_pipe_ev ev, void *arg) nng_mqtt_topic_qos_array_create(param->config->sub_count); for (size_t i = 0; i < param->config->sub_count; i++) { nng_mqtt_topic_qos_array_set(topic_qos, i, - param->config->sub_list[i].topic, - param->config->sub_list[i].qos, 1, 0, 0); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos, 1, 0, 0); log_info("Bridge client subscribed topic %s (qos %d).", - param->config->sub_list[i].topic, - param->config->sub_list[i].qos); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos); } nng_mqtt_client *client = param->client; @@ -593,13 +593,13 @@ quic_ack_cb(void *arg) nng_mqtt_topic_qos *topic_qos = nng_mqtt_topic_qos_array_create(1); nng_mqtt_topic_qos_array_set(topic_qos, 0, - param->config->sub_list[i].topic, - param->config->sub_list[i].qos, 1, 0, 0); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos, 1, 0, 0); log_info("Quic bridge client subscribe to " "topic (QoS " "%d)%s.", - param->config->sub_list[i].qos, - param->config->sub_list[i].topic); + param->config->sub_list[i]->qos, + param->config->sub_list[i]->topic); nng_mqtt_subscribe_async( client, topic_qos, 1, NULL); nng_mqtt_topic_qos_array_free(topic_qos, 1); @@ -610,12 +610,12 @@ quic_ack_cb(void *arg) param->config->sub_count); for (size_t i = 0; i < param->config->sub_count; i++) { nng_mqtt_topic_qos_array_set(topic_qos, i, - param->config->sub_list[i].topic, - param->config->sub_list[i].qos, 1, 0, 0); + param->config->sub_list[i]->topic, + param->config->sub_list[i]->qos, 1, 0, 0); log_info("Quic bridge client subscribed topic " "(q%d)%s.", - param->config->sub_list[i].qos, - param->config->sub_list[i].topic); + param->config->sub_list[i]->qos, + param->config->sub_list[i]->topic); } // TODO support MQTT V5 nng_mqtt_subscribe_async( diff --git a/nanomq/conf_api.c b/nanomq/conf_api.c index 45f290231..b6a6d44fa 100644 --- a/nanomq/conf_api.c +++ b/nanomq/conf_api.c @@ -366,11 +366,11 @@ get_bridge_config(conf_bridge *bridge, const char *node_name) cJSON *sub_infos = cJSON_CreateArray(); for (size_t j = 0; j < node->sub_count; j++) { - cJSON *sub_obj = cJSON_CreateObject(); - topics sub = node->sub_list[j]; + cJSON * sub_obj = cJSON_CreateObject(); + topics *sub = node->sub_list[j]; cJSON_AddStringOrNullToObject( - sub_obj, "topic", sub.topic); - cJSON_AddNumberToObject(sub_obj, "qos", sub.qos); + sub_obj, "topic", sub->topic); + cJSON_AddNumberToObject(sub_obj, "qos", sub->qos); cJSON_AddItemToArray(sub_infos, sub_obj); } diff --git a/nanomq/rest_api.c b/nanomq/rest_api.c index 8593d880a..b35a4304b 100644 --- a/nanomq/rest_api.c +++ b/nanomq/rest_api.c @@ -348,6 +348,8 @@ static http_msg post_mqtt_msg_batch( http_msg *msg, nng_socket *sock, handle_mqtt_msg_cb cb); static http_msg get_mqtt_bridge(http_msg *msg, const char *name); static http_msg put_mqtt_bridge(http_msg *msg, const char *name); +static http_msg post_mqtt_bridge_sub(http_msg *msg, const char *name); +static http_msg post_mqtt_bridge_unsub(http_msg *msg, const char *name); static int properties_parse(property **properties, cJSON *json); static int handle_publish_msg(cJSON *pub_obj, nng_socket *sock); @@ -849,13 +851,28 @@ process_request(http_msg *msg, conf_http_server *config, nng_socket *sock) strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && strcmp(uri_ct->sub_tree[2]->node, "publish") == 0) { ret = post_mqtt_msg(msg, sock, handle_publish_msg); + } else if (uri_ct->sub_count == 4 && + uri_ct->sub_tree[3]->end && + strcmp(uri_ct->sub_tree[1]->node, "bridges") == 0 && + strcmp(uri_ct->sub_tree[2]->node, "sub") == 0 + ) { + ret = post_mqtt_bridge_sub( + msg, uri_ct->sub_tree[3]->node); + } else if (uri_ct->sub_count == 4 && + uri_ct->sub_tree[3]->end && + strcmp(uri_ct->sub_tree[1]->node, "bridges") == 0 && + strcmp(uri_ct->sub_tree[2]->node, "unsub") == 0) { + ret = post_mqtt_bridge_unsub( + msg, uri_ct->sub_tree[3]->node); } else if (uri_ct->sub_count == 3 && uri_ct->sub_tree[2]->end && strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && strcmp(uri_ct->sub_tree[2]->node, "publish_batch") == 0) { ret = post_mqtt_msg_batch(msg, sock, handle_publish_msg); - } /* else if (uri_ct->sub_count == 3 && + } + + /* else if (uri_ct->sub_count == 3 && uri_ct->sub_tree[2]->end && strcmp(uri_ct->sub_tree[1]->node, "mqtt") == 0 && strcmp(uri_ct->sub_tree[2]->node, "subscribe") == 0) { @@ -2651,6 +2668,7 @@ handle_publish_msg(cJSON *pub_obj, nng_socket *sock) if (cJSON_IsObject(json_prop)) { rv = properties_parse(&props, json_prop); if (rv != 0) { + property_free(props); goto out; } } @@ -2931,4 +2949,367 @@ put_mqtt_bridge(http_msg *msg, const char *name) return error_response( msg, NNG_HTTP_STATUS_NOT_FOUND, REQ_PARAM_ERROR); } +} + +static void +free_user_property(conf_user_property **prop, size_t sz) +{ + if (sz > 0 && prop) { + for (size_t i = 0; i < sz; i++) { + if (prop[i]) { + if (prop[i]->key) { + free(prop[i]->key); + } + if (prop[i]->value) { + free(prop[i]->value); + } + free(prop[i]); + } + } + cvector_free(prop); + prop = NULL; + } +} + +static void +free_sub_property(conf_bridge_sub_properties *prop) +{ + if (prop) { + free_user_property( + prop->user_property, prop->user_property_size); + prop->user_property_size = 0; + prop->identifier = 0; + free(prop); + prop = NULL; + } +} + +static void +free_topic_list(topics **list, size_t count) +{ + if (list && count > 0) { + for (size_t i = 0; i < count; i++) { + if (list[i]->topic) { + nng_free(list[i]->topic, list[i]->topic_len); + list[i]->topic = NULL; + } + nng_free(list[i], sizeof(topics)); + } + cvector_free(list); + list = NULL; + } +} + +static void +free_string_list(char **list, size_t count) +{ + if (list && count > 0) { + for (size_t i = 0; i < count; i++) { + if (list[i]) { + free(list[i]); + list[i] = NULL; + } + } + cvector_free(list); + list = NULL; + } +} + +static nng_mqtt_topic_qos * +convert_topic_qos(topics **list, size_t count) +{ + nng_mqtt_topic_qos *topics = nng_mqtt_topic_qos_array_create(count); + for (size_t i = 0; i < count; i++) { + nng_mqtt_topic_qos_array_set( + topics, i, list[i]->topic, list[i]->qos, 1, 0, 0); + } + return topics; +} + +static nng_mqtt_topic * +convert_topic(char **list, size_t count) +{ + nng_mqtt_topic *topics = nng_mqtt_topic_array_create(count); + + for (size_t i = 0; i < count; i++) { + nng_mqtt_topic_array_set(topics, i, list[i]); + } + return topics; +} + +static http_msg +post_mqtt_bridge_sub(http_msg *msg, const char *name) +{ + // node, [topic, qos], property + http_msg res = { .status = NNG_HTTP_STATUS_OK }; + enum nng_http_status status = NNG_HTTP_STATUS_OK; + int code = SUCCEED; + + cJSON *req = cJSON_ParseWithLength(msg->data, msg->data_len); + if (!cJSON_IsObject(req)) { + goto out; + } + + cJSON *data_obj = cJSON_GetObjectItem(req, "data"); + if (!cJSON_IsObject(data_obj)) { + goto out; + } + + cJSON *sub_array = cJSON_GetObjectItem(data_obj, "subscription"); + + if (!cJSON_IsArray(sub_array)) { + goto out; + } + + size_t array_size = cJSON_GetArraySize(sub_array); + + topics **sub_topics = NULL; + size_t sub_count = 0; + + cJSON *item; + int rv = 0; + + // Get topic list + for (size_t i = 0; i < array_size; i++) { + topics *tp = nng_zalloc(sizeof(topics)); + cJSON * sub_item = cJSON_GetArrayItem(sub_array, i); + getNumberValue(sub_item, item, "qos", tp->qos, rv); + char *topic = NULL; + getStringValue(sub_item, item, "topic", topic, rv); + if (rv == 0) { + tp->topic = nng_strdup(topic); + tp->topic_len = strlen(tp->topic); + } else { + continue; + } + cvector_push_back(sub_topics, tp); + sub_count++; + } + + // Get properties + cJSON *json_prop = cJSON_GetObjectItem(data_obj, "sub_properties"); + conf_bridge_sub_properties *sub_props = NULL; + + if (cJSON_IsObject(json_prop)) { + sub_props = nng_zalloc(sizeof(conf_bridge_sub_properties)); + getNumberValue( + json_prop, item, "identifier", sub_props->identifier, rv); + cJSON *up_array = + cJSON_GetObjectItem(json_prop, "user_properties"); + size_t up_count = cJSON_GetArraySize(up_array); + + conf_user_property **conf_ups = NULL; + + for (size_t i = 0; i < up_count; i++) { + char *key = NULL; + char *value = NULL; + + getStringValue(json_prop, item, "key", key, rv); + if (rv == 0) { + getStringValue( + json_prop, item, "value", value, rv); + if (rv == 0) { + conf_user_property *up = nng_zalloc( + sizeof(conf_user_property)); + up->key = nng_strdup(key); + up->value = nng_strdup(value); + cvector_push_back(conf_ups, up); + } + } + } + sub_props->user_property = conf_ups; + sub_props->user_property_size = cvector_size(conf_ups); + } + + conf *config = get_global_conf(); + + bool found = false; + conf_bridge *bridge = &config->bridge; + for (size_t i = 0; i < bridge->count; i++) { + conf_bridge_node *node = bridge->nodes[i]; + if (name != NULL && strcmp(node->name, name) != 0) { + continue; + } + + // Decode properties to nng_mqtt_property + property *prop_list = NULL; + if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) { + if (cJSON_IsObject(json_prop)) { + properties_parse(&prop_list, json_prop); + } + } + + if (sub_count > 0) { + // TODO handle repeated topics + cvector_copy(sub_topics, node->sub_list); + node->sub_count += sub_count; + } + if (sub_props != NULL) { + free_sub_property(node->sub_properties); + node->sub_properties = sub_props; + } + cvector_free(sub_topics); + found = true; + + // convert sub_topics to nng_mqtt_topic_qos + // TODO uncomment the following line if you want to use nng_mqtt_topic_qos + // nng_mqtt_topic_qos *topic_list = convert_topic_qos(node->sub_list, node->sub_count); + + // TODO @Wangha handle subscribe + // TODO params: config, node, node->sock, topic_list, sub_count, prop_list + break; + } + + if (!found) { + status = NNG_HTTP_STATUS_NOT_FOUND; + code = REQ_PARAM_ERROR; + free_sub_property(sub_props); + free_topic_list(sub_topics, sub_count); + goto out; + } + + cJSON *res_obj = cJSON_CreateObject(); + cJSON_AddNumberToObject(res_obj, "code", SUCCEED); + char *dest = cJSON_PrintUnformatted(res_obj); + + put_http_msg( + &res, "application/json", NULL, NULL, NULL, dest, strlen(dest)); + + cJSON_Delete(req); + cJSON_free(dest); + cJSON_Delete(res_obj); + return res; + +out: + if (cJSON_IsObject(req)) { + cJSON_Delete(req); + } + + return error_response(msg, + status == NNG_HTTP_STATUS_NOT_FOUND ? status + : NNG_HTTP_STATUS_BAD_REQUEST, + (code == REQ_PARAM_ERROR ? code : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); +} + +static http_msg +post_mqtt_bridge_unsub(http_msg *msg, const char *name) +{ + http_msg res = { .status = NNG_HTTP_STATUS_OK }; + enum nng_http_status status = NNG_HTTP_STATUS_OK; + int code = SUCCEED; + + cJSON *req = cJSON_ParseWithLength(msg->data, msg->data_len); + if (!cJSON_IsObject(req)) { + goto out; + } + + cJSON *data_obj = cJSON_GetObjectItem(req, "data"); + if (!cJSON_IsObject(data_obj)) { + goto out; + } + + // Get unsub topic list + cJSON *unsub_array = cJSON_GetObjectItem(data_obj, "unsubscription"); + if (!cJSON_IsArray(unsub_array)) { + goto out; + } + + char ** unsub_topics = NULL; + size_t unsub_count = 0; + + cJSON *item; + int rv = 0; + + size_t array_size = cJSON_GetArraySize(unsub_array); + + for (size_t i = 0; i < array_size; i++) { + cJSON * unsub_item = cJSON_GetArrayItem(unsub_array, i); + char *topic = NULL; + getStringValue(unsub_item, item, "topic", topic, rv); + if (rv == 0) { + topic = nng_strdup(topic); + } else { + continue; + } + cvector_push_back(unsub_topics, topic); + unsub_count++; + } + + conf *config = get_global_conf(); + + bool found = false; + conf_bridge *bridge = &config->bridge; + for (size_t i = 0; i < bridge->count; i++) { + conf_bridge_node *node = bridge->nodes[i]; + if (name != NULL && strcmp(node->name, name) != 0) { + continue; + } + + // Get properties + property *prop_list = NULL; + if (node->proto_ver == MQTT_VERSION_V5) { + cJSON *json_prop = + cJSON_GetObjectItem(data_obj, "unsub_properties"); + + if (cJSON_IsObject(json_prop)) { + properties_parse(&prop_list, json_prop); + } + } + + for (size_t i = 0; i < unsub_count; i++) { + char *unsub_topic = unsub_topics[i]; + for (size_t j = 0; j < node->sub_count; j++) { + topics *sub_topic = node->sub_list[j]; + if (strcmp(unsub_topic, + sub_topic->topic) == 0) { + + cvector_erase(node->sub_list, j); + node->sub_count--; + nng_free(sub_topic->topic, + sub_topic->topic_len); + nng_free(sub_topic, sizeof(topics)); + break; + } + } + } + + found = true; + // convert unsub_topics to nng_mqtt_topic + // TODO uncomment the following line if you want to use nng_mqtt_topic + // nng_mqtt_topic *topic_list = convert_topic(unsub_topics, unsub_count); + + // TODO @Wangha handle unsubscribe + // TODO params: config, node, node->sock, topic_list, unsub_count, prop_list + break; + } + + free_string_list(unsub_topics, unsub_count); + + if (!found) { + status = NNG_HTTP_STATUS_NOT_FOUND; + code = REQ_PARAM_ERROR; + goto out; + } + + cJSON *res_obj = cJSON_CreateObject(); + cJSON_AddNumberToObject(res_obj, "code", SUCCEED); + char *dest = cJSON_PrintUnformatted(res_obj); + + put_http_msg( + &res, "application/json", NULL, NULL, NULL, dest, strlen(dest)); + + cJSON_Delete(req); + cJSON_free(dest); + cJSON_Delete(res_obj); + return res; + +out: + if (cJSON_IsObject(req)) { + cJSON_Delete(req); + } + + return error_response(msg, + status == NNG_HTTP_STATUS_NOT_FOUND ? status + : NNG_HTTP_STATUS_BAD_REQUEST, + (code == REQ_PARAM_ERROR ? code : REQ_PARAMS_JSON_FORMAT_ILLEGAL)); } \ No newline at end of file