Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boost rest api & fix memleak #1455

Merged
merged 9 commits into from
Sep 21, 2023
55 changes: 42 additions & 13 deletions nanomq/rest_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1156,13 +1156,15 @@
const uint8_t *cid = conn_param_get_clientid(cp);
if (info->client_id != NULL) {
if (strcmp(info->client_id, (const char *) cid) != 0) {
conn_param_free(cp);
return;
}
}
const uint8_t *user_name = conn_param_get_username(cp);
if (info->username != NULL) {
if (user_name == NULL ||
strcmp(info->username, (const char *) user_name) != 0) {
conn_param_free(cp);
return;
}
}
Expand Down Expand Up @@ -1190,6 +1192,8 @@
// ctxt->recv_cnt != NULL ?
// nng_atomic_get64(ctxt->recv_cnt) : 0); #endif
cJSON_AddItemToArray(info->array, data_info_elem);

conn_param_free(cp);
}

static void
Expand All @@ -1205,6 +1209,8 @@
const uint8_t *cid = conn_param_get_clientid(cp);
if (!status) s->connections++;

conn_param_free(cp);

// #ifdef STATISTICS
// cJSON_AddNumberToObject(data_info_elem, "recv_msg",
// ctxt->recv_cnt != NULL ?
Expand Down Expand Up @@ -1532,12 +1538,13 @@
if (cp) {
cid = (const char *) conn_param_get_clientid(
cp);
if (client_id) {
if (strcmp(client_id, cid) != 0) {
conn_param_free(cp);
if (client_id) {
if (strcmp(client_id, cid) != 0) {
goto skip;
}
}
}
}
}

// topic_queue *tn = pt[i]->topic;
topic_queue *tq = dbhash_copy_topic_queue(pt[i]->pipe);
Expand Down Expand Up @@ -2377,7 +2384,9 @@

nng_pipe pipe = { .id = pid };
conn_param *cp = nng_pipe_cparam(pipe);
return (void *) conn_param_get_clientid(cp);
uint8_t *clientid = conn_param_get_clientid(cp);
conn_param_free(cp);
return (void *) clientid;
}

static http_msg
Expand Down Expand Up @@ -3394,8 +3403,9 @@
{
nng_mqtt_topic_qos *topics = nng_mqtt_topic_qos_array_create(count);
for (size_t i = 0; i < count; i++) {
nng_mqtt_topic_qos_array_set(
topics, i, list[i]->remote_topic, list[i]->qos, 1, 0, 0);
nng_mqtt_topic_qos_array_set(topics, i, list[i]->remote_topic,
list[i]->qos, 1, list[i]->retain_as_published,
list[i]->retain_handling);
}
return topics;
}
Expand Down Expand Up @@ -3445,17 +3455,34 @@

// Get topic list
for (size_t i = 0; i < array_size; i++) {
topics *tp = nng_zalloc(sizeof(topics));
cJSON * sub_item = cJSON_GetArrayItem(sub_array, i);
getNumberValue(sub_item, item, "qos", tp->qos, rv);
char *topic = NULL;
getStringValue(sub_item, item, "topic", topic, rv);
topics *tp = nng_zalloc(sizeof(topics));
// default value for qos, rap and rh.
uint8_t qos = 0;
uint8_t rap = 1;
uint8_t rh = 0;
char *remote_topic = NULL;
char *local_topic = NULL;
cJSON *sub_item = cJSON_GetArrayItem(sub_array, i);
getNumberValue(sub_item, item, "qos", qos, rv);
getNumberValue(sub_item, item, "retain_as_published", rap, rv);
getNumberValue(sub_item, item, "retain_handling", rh, rv);
getStringValue(sub_item, item, "remote_topic", remote_topic, rv);
if (rv == 0) {
tp->remote_topic = nng_strdup(topic);
tp->remote_topic = nng_strdup(remote_topic);
tp->remote_topic_len = strlen(tp->remote_topic);
} else {
continue;
}
getStringValue(sub_item, item, "local_topic", local_topic, rv);
if (rv == 0) {
tp->local_topic = nng_strdup(local_topic);
tp->local_topic_len = strlen(tp->local_topic);
} else {
continue;

Check warning on line 3481 in nanomq/rest_api.c

View check run for this annotation

Codecov / codecov/patch

nanomq/rest_api.c#L3481

Added line #L3481 was not covered by tests
}
tp->qos = qos;
tp->retain_as_published = rap;
tp->retain_handling = rh;
cvector_push_back(sub_topics, tp);
sub_count++;
}
Expand Down Expand Up @@ -3680,6 +3707,8 @@
node->sub_count--;
nng_free(sub_topic->remote_topic,
sub_topic->remote_topic_len);
nng_free(sub_topic->local_topic,
sub_topic->local_topic_len);
nng_free(sub_topic, sizeof(topics));
break;
}
Expand Down
43 changes: 23 additions & 20 deletions nanomq/tests/http_server_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ test_get_subscriptions_clientid()
{
char *cmd =
"curl -i --basic -u admin_test:pw_test -X GET "
"'http://localhost:8081/api/v4/subscriptions/client-id-test'";
"'http://localhost:8081/api/v4/subscriptions/clientid-test'";
FILE *fd = popen(cmd, "r");
bool rv = check_http_return(fd, STATUS_CODE_OK, SUCCEED);
pclose(fd);
Expand Down Expand Up @@ -281,8 +281,8 @@ test_put_bridges_sub()
"'http://localhost:8081/api/v4/bridges/sub/emqx' "
"--basic -u admin_test:pw_test -d '{"
"\"data\": {"
"\"subscription\": [{\"topic\": "
"\"cmd/topic4\"},{\"topic\": \"cmd/topic5\"}],"
"\"subscription\": [{\"remote_topic\": "
"\"cmd/topic4\",\"local_topic\": \"cmd_lo/topic4\"}],"
"\"sub_properties\": {\"user_properties\": [{\"key\": "
"\"key1\",\"value\": \"value1\"},{\"key\": "
"\"key2\",\"value\": \"value2\"}]}}}'";
Expand All @@ -299,8 +299,8 @@ test_put_bridges_unsub()
"'http://localhost:8081/api/v4/bridges/unsub/emqx' "
"--basic -u admin_test:pw_test -d '{"
"\"data\": {"
"\"unsubscription\": [{\"remote_topic\": \"cmd/topic1\", \"local_topic\": \"cmd_lo/topic1\"},"
"{\"remote_topic\": \"cmd/topic2\", \"local_topic\": \"cmd_lo/topic2\"}"
"\"unsubscription\": [{\"topic\": \"cmd/topic1\"},"
"{\"topic\": \"cmd/topic2\"}],"
"\"unsub_properties\": {\"user_properties\": [{\"key\": "
"\"key1\",\"value\": \"value1\"},{\"key\": "
"\"key2\",\"value\": \"value2\"}]}}}'";
Expand Down Expand Up @@ -583,21 +583,25 @@ test_misuse_of_method()
int
main()
{
char *cmd = "mosquitto_sub -h 127.0.0.1 -p 1881 -t topic-test -u "
"user-test -i clientid-test";
char *cmd2 = "mosquitto_sub -h 127.0.0.1 -p 1881 -t topic-test2 -u "
"user-test -i clientid-test";
char *cmd[] = { "mosquitto_sub", "-h", "127.0.0.1", "-p", "1881", "-t",
"topic-test", "-u", "user-test", "-i", "clientid-test",
NULL };
char *cmd2[] = { "mosquitto_sub", "-h", "127.0.0.1", "-p", "1881", "-t",
"topic-test2", "-u", "user-test2", "-i", "clientid-test2",
NULL };
nng_thread *nmq;
conf *conf;
FILE *fd;
FILE *fd2;
pid_t pid_sub;
pid_t pid_sub2;
int outfp;
int outfp2;

conf = get_test_conf(ALL_FEATURE_CONF);
assert(conf != NULL);
nng_thread_create(&nmq, (void *) broker_start_with_conf, (void *) conf);
// nng_msleep(100); // wait a while for broker to init
fd = popen(cmd, "r");
fd2 = popen(cmd2, "r");
nng_msleep(100); // wait a while for broker to init
pid_sub = popen_sub_with_cmd(&outfp, cmd);
pid_sub2 = popen_sub_with_cmd(&outfp2, cmd2);
nng_msleep(50); // wait a while after sub

// TODO: there is a potential connection refuse case & although they
Expand Down Expand Up @@ -635,10 +639,9 @@ main()

assert(test_get_bridges());
assert(test_get_bridge());
// TODO: rest api need change for topic reflection in bridge.
assert(test_put_bridges_sub()); // this is not 100% right due to the new topic reflection
// assert(test_put_bridges_unsub()); // bridge unsub has to change for topic reflection
assert(test_put_bridges());
assert(test_put_bridges_sub());
assert(test_put_bridges_unsub()); // the usage of unsub rest api may need further discussion.
// assert(test_put_bridges()); TODO: there is a potential bug here with will_flag.

assert(test_post_rules());
assert(test_get_rules());
Expand All @@ -664,8 +667,8 @@ main()
// // tested now.
// assert(test_restart());
// assert(test_stop());
pclose(fd);
pclose(fd2);
kill(pid_sub, SIGKILL);
kill(pid_sub2, SIGKILL);

nng_thread_destroy(nmq);
}
38 changes: 38 additions & 0 deletions nanomq/tests/nanomq_test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,44 @@ bridges.mqtt.emqx {
qos = 2
}
]
conn_properties = {
maximum_packet_size = 1024

receive_maximum = 65535

topic_alias_maximum = 0

request_problem_infomation = 1

request_response_infomation = 0

session_expiry_interval = 0

user_property = {
key1 = value1
key2 = value2
}
}

will {
topic = "will_topic"
qos = 1
retain = false
payload = "will_message"

properties = {
payload_format_indicator = 0
message_expiry_interval = 0
content_type = ""
response_topic = ""
correlation_data = ""
will_delay_interval = 0
user_property = {
key1 = value1
key2 = value2
}
}
}
max_parallel_processes = 2
max_send_queue_len = 1024
max_recv_queue_len = 1024
Expand Down