diff --git a/nanomq/rest_api.c b/nanomq/rest_api.c index 34c934f0c..3ee593b8c 100644 --- a/nanomq/rest_api.c +++ b/nanomq/rest_api.c @@ -1156,6 +1156,7 @@ get_client_cb(void *key, void *value, void *json_obj) const uint8_t *cid = conn_param_get_clientid(cp); if (info->client_id != NULL) { if (strcmp(info->client_id, (const char *) cid) != 0) { + conn_param_free(cp); return; } } @@ -1163,6 +1164,7 @@ get_client_cb(void *key, void *value, void *json_obj) if (info->username != NULL) { if (user_name == NULL || strcmp(info->username, (const char *) user_name) != 0) { + conn_param_free(cp); return; } } @@ -1190,6 +1192,8 @@ get_client_cb(void *key, void *value, void *json_obj) // ctxt->recv_cnt != NULL ? // nng_atomic_get64(ctxt->recv_cnt) : 0); #endif cJSON_AddItemToArray(info->array, data_info_elem); + + conn_param_free(cp); } static void @@ -1205,6 +1209,8 @@ get_metric_cb(void *key, void *value, void *stats) const uint8_t *cid = conn_param_get_clientid(cp); if (!status) s->connections++; + conn_param_free(cp); + // #ifdef STATISTICS // cJSON_AddNumberToObject(data_info_elem, "recv_msg", // ctxt->recv_cnt != NULL ? @@ -1532,12 +1538,13 @@ get_subscriptions( if (cp) { cid = (const char *) conn_param_get_clientid( cp); - if (client_id) { - if (strcmp(client_id, cid) != 0) { + conn_param_free(cp); + if (client_id) { + if (strcmp(client_id, cid) != 0) { goto skip; } - } - } + } + } // topic_queue *tn = pt[i]->topic; topic_queue *tq = dbhash_copy_topic_queue(pt[i]->pipe); @@ -2377,7 +2384,9 @@ get_client_info_cb(uint32_t pid) nng_pipe pipe = { .id = pid }; conn_param *cp = nng_pipe_cparam(pipe); - return (void *) conn_param_get_clientid(cp); + uint8_t *clientid = conn_param_get_clientid(cp); + conn_param_free(cp); + return (void *) clientid; } static http_msg @@ -3394,8 +3403,9 @@ convert_topic_qos(topics **list, size_t count) { nng_mqtt_topic_qos *topics = nng_mqtt_topic_qos_array_create(count); for (size_t i = 0; i < count; i++) { - nng_mqtt_topic_qos_array_set( - topics, i, list[i]->remote_topic, list[i]->qos, 1, 0, 0); + nng_mqtt_topic_qos_array_set(topics, i, list[i]->remote_topic, + list[i]->qos, 1, list[i]->retain_as_published, + list[i]->retain_handling); } return topics; } @@ -3445,17 +3455,34 @@ post_mqtt_bridge_sub(http_msg *msg, const char *name) // Get topic list for (size_t i = 0; i < array_size; i++) { - topics *tp = nng_zalloc(sizeof(topics)); - cJSON * sub_item = cJSON_GetArrayItem(sub_array, i); - getNumberValue(sub_item, item, "qos", tp->qos, rv); - char *topic = NULL; - getStringValue(sub_item, item, "topic", topic, rv); + topics *tp = nng_zalloc(sizeof(topics)); + // default value for qos, rap and rh. + uint8_t qos = 0; + uint8_t rap = 1; + uint8_t rh = 0; + char *remote_topic = NULL; + char *local_topic = NULL; + cJSON *sub_item = cJSON_GetArrayItem(sub_array, i); + getNumberValue(sub_item, item, "qos", qos, rv); + getNumberValue(sub_item, item, "retain_as_published", rap, rv); + getNumberValue(sub_item, item, "retain_handling", rh, rv); + getStringValue(sub_item, item, "remote_topic", remote_topic, rv); if (rv == 0) { - tp->remote_topic = nng_strdup(topic); + tp->remote_topic = nng_strdup(remote_topic); tp->remote_topic_len = strlen(tp->remote_topic); } else { continue; } + getStringValue(sub_item, item, "local_topic", local_topic, rv); + if (rv == 0) { + tp->local_topic = nng_strdup(local_topic); + tp->local_topic_len = strlen(tp->local_topic); + } else { + continue; + } + tp->qos = qos; + tp->retain_as_published = rap; + tp->retain_handling = rh; cvector_push_back(sub_topics, tp); sub_count++; } @@ -3680,6 +3707,8 @@ post_mqtt_bridge_unsub(http_msg *msg, const char *name) node->sub_count--; nng_free(sub_topic->remote_topic, sub_topic->remote_topic_len); + nng_free(sub_topic->local_topic, + sub_topic->local_topic_len); nng_free(sub_topic, sizeof(topics)); break; } diff --git a/nanomq/tests/http_server_test.c b/nanomq/tests/http_server_test.c index 1e717d493..d6536ce4b 100644 --- a/nanomq/tests/http_server_test.c +++ b/nanomq/tests/http_server_test.c @@ -165,7 +165,7 @@ test_get_subscriptions_clientid() { char *cmd = "curl -i --basic -u admin_test:pw_test -X GET " - "'http://localhost:8081/api/v4/subscriptions/client-id-test'"; + "'http://localhost:8081/api/v4/subscriptions/clientid-test'"; FILE *fd = popen(cmd, "r"); bool rv = check_http_return(fd, STATUS_CODE_OK, SUCCEED); pclose(fd); @@ -281,8 +281,8 @@ test_put_bridges_sub() "'http://localhost:8081/api/v4/bridges/sub/emqx' " "--basic -u admin_test:pw_test -d '{" "\"data\": {" - "\"subscription\": [{\"topic\": " - "\"cmd/topic4\"},{\"topic\": \"cmd/topic5\"}]," + "\"subscription\": [{\"remote_topic\": " + "\"cmd/topic4\",\"local_topic\": \"cmd_lo/topic4\"}]," "\"sub_properties\": {\"user_properties\": [{\"key\": " "\"key1\",\"value\": \"value1\"},{\"key\": " "\"key2\",\"value\": \"value2\"}]}}}'"; @@ -299,8 +299,8 @@ test_put_bridges_unsub() "'http://localhost:8081/api/v4/bridges/unsub/emqx' " "--basic -u admin_test:pw_test -d '{" "\"data\": {" - "\"unsubscription\": [{\"remote_topic\": \"cmd/topic1\", \"local_topic\": \"cmd_lo/topic1\"}," - "{\"remote_topic\": \"cmd/topic2\", \"local_topic\": \"cmd_lo/topic2\"}" + "\"unsubscription\": [{\"topic\": \"cmd/topic1\"}," + "{\"topic\": \"cmd/topic2\"}]," "\"unsub_properties\": {\"user_properties\": [{\"key\": " "\"key1\",\"value\": \"value1\"},{\"key\": " "\"key2\",\"value\": \"value2\"}]}}}'"; @@ -583,21 +583,25 @@ test_misuse_of_method() int main() { - char *cmd = "mosquitto_sub -h 127.0.0.1 -p 1881 -t topic-test -u " - "user-test -i clientid-test"; - char *cmd2 = "mosquitto_sub -h 127.0.0.1 -p 1881 -t topic-test2 -u " - "user-test -i clientid-test"; + char *cmd[] = { "mosquitto_sub", "-h", "127.0.0.1", "-p", "1881", "-t", + "topic-test", "-u", "user-test", "-i", "clientid-test", + NULL }; + char *cmd2[] = { "mosquitto_sub", "-h", "127.0.0.1", "-p", "1881", "-t", + "topic-test2", "-u", "user-test2", "-i", "clientid-test2", + NULL }; nng_thread *nmq; conf *conf; - FILE *fd; - FILE *fd2; + pid_t pid_sub; + pid_t pid_sub2; + int outfp; + int outfp2; conf = get_test_conf(ALL_FEATURE_CONF); assert(conf != NULL); nng_thread_create(&nmq, (void *) broker_start_with_conf, (void *) conf); - // nng_msleep(100); // wait a while for broker to init - fd = popen(cmd, "r"); - fd2 = popen(cmd2, "r"); + nng_msleep(100); // wait a while for broker to init + pid_sub = popen_sub_with_cmd(&outfp, cmd); + pid_sub2 = popen_sub_with_cmd(&outfp2, cmd2); nng_msleep(50); // wait a while after sub // TODO: there is a potential connection refuse case & although they @@ -635,10 +639,9 @@ main() assert(test_get_bridges()); assert(test_get_bridge()); - // TODO: rest api need change for topic reflection in bridge. - assert(test_put_bridges_sub()); // this is not 100% right due to the new topic reflection - // assert(test_put_bridges_unsub()); // bridge unsub has to change for topic reflection - assert(test_put_bridges()); + assert(test_put_bridges_sub()); + assert(test_put_bridges_unsub()); // the usage of unsub rest api may need further discussion. + // assert(test_put_bridges()); TODO: there is a potential bug here with will_flag. assert(test_post_rules()); assert(test_get_rules()); @@ -664,8 +667,8 @@ main() // // tested now. // assert(test_restart()); // assert(test_stop()); - pclose(fd); - pclose(fd2); + kill(pid_sub, SIGKILL); + kill(pid_sub2, SIGKILL); nng_thread_destroy(nmq); } \ No newline at end of file diff --git a/nanomq/tests/nanomq_test.conf b/nanomq/tests/nanomq_test.conf index 142af3e98..4fa5f9cf1 100644 --- a/nanomq/tests/nanomq_test.conf +++ b/nanomq/tests/nanomq_test.conf @@ -245,6 +245,44 @@ bridges.mqtt.emqx { qos = 2 } ] + conn_properties = { + maximum_packet_size = 1024 + + receive_maximum = 65535 + + topic_alias_maximum = 0 + + request_problem_infomation = 1 + + request_response_infomation = 0 + + session_expiry_interval = 0 + + user_property = { + key1 = value1 + key2 = value2 + } + } + + will { + topic = "will_topic" + qos = 1 + retain = false + payload = "will_message" + + properties = { + payload_format_indicator = 0 + message_expiry_interval = 0 + content_type = "" + response_topic = "" + correlation_data = "" + will_delay_interval = 0 + user_property = { + key1 = value1 + key2 = value2 + } + } + } max_parallel_processes = 2 max_send_queue_len = 1024 max_recv_queue_len = 1024