Skip to content

Commit

Permalink
* MDF [sub_handler] Move acl_http check and optimize the sub_handler …
Browse files Browse the repository at this point in the history
…logic

Signed-off-by: Moi Ran <maoyi.ran@emqx.io>
  • Loading branch information
RanMaoyi committed Apr 14, 2024
1 parent 28fdd8e commit ab8a8d0
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 91 deletions.
24 changes: 0 additions & 24 deletions nanomq/apps/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,30 +354,6 @@ server_cb(void *arg)
smsg = work->msg;
work->msg_ret = NULL;

if (work->config->auth_http.enable) {
topic_queue *tq = NULL;
rv = nmq_subtopic_decode(smsg, work->proto_ver, &tq);
if (rv <= 0 || tq == NULL) {
log_error("Sub topic decode failed!");
work->code = rv;
work->state = CLOSE;
nng_aio_finish(work->aio, 0);
break;
}

rv = nmq_auth_http_sub_pub(work->cparam, true, tq, &work->config->auth_http);
if (rv != 0) {
log_error("Auth failed! subscribe packet!");
topic_queue_release(tq);
conn_param_free(work->cparam);
work->code = rv;
work->state = CLOSE;
nng_aio_finish(work->aio, 0);
break;
}
topic_queue_release(tq);
}

if ((work->sub_pkt = nng_alloc(
sizeof(packet_subscribe))) == NULL)
log_error("nng_alloc");
Expand Down
186 changes: 119 additions & 67 deletions nanomq/sub_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,99 +27,122 @@
int
decode_sub_msg(nano_work *work)
{
uint8_t *variable_ptr, *payload_ptr;
int vpos = 0; // pos in variable
int bpos = 0; // pos in payload
size_t len_of_varint = 0, len_of_property = 0, len_of_properties = 0;
int len_of_str = 0, len_of_topic = 0;
uint8_t property_id;
size_t bpos = 0; // pos in msg_body
size_t ppos = 0; // pos in payload
uint8_t *payload_ptr = NULL;

topic_node * tn, *_tn;
topic_node *tn = NULL;
topic_node *newtn = NULL;

nng_msg * msg = work->msg;
size_t remaining_len = nng_msg_remaining_len(msg);
const uint8_t proto_ver = work->proto_ver;
size_t remaining_len = 0;
packet_subscribe *sub_pkt = NULL;

// handle variable header
variable_ptr = nng_msg_body(msg);
if (work->msg == NULL || work->sub_pkt == NULL) {
return PROTOCOL_ERROR;
}

remaining_len = nng_msg_remaining_len(work->msg);

packet_subscribe *sub_pkt = work->sub_pkt;
sub_pkt = work->sub_pkt;
sub_pkt->node = NULL;
NNI_GET16(variable_ptr + vpos, sub_pkt->packet_id);
if (sub_pkt->packet_id == 0)
sub_pkt->prop_len = 0;
sub_pkt->properties = NULL;
NNI_GET16((uint8_t *)(nng_msg_body(work->msg)), sub_pkt->packet_id);
if (sub_pkt->packet_id == 0) {
return PROTOCOL_ERROR; // packetid should be non-zero
}
// TODO packetid should be checked if it's unused
vpos += 2;
bpos += 2;

sub_pkt->properties = NULL;
sub_pkt->prop_len = 0;
// Only Mqtt_v5 include property.
if (MQTT_PROTOCOL_VERSION_v5 == proto_ver) {
sub_pkt->properties =
decode_properties(msg, (uint32_t *)&vpos, &sub_pkt->prop_len, true);
if (check_properties(sub_pkt->properties) != SUCCESS) {
if (work->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
sub_pkt->properties = decode_properties(work->msg,
(uint32_t *)&bpos,
&sub_pkt->prop_len,
true);
if (sub_pkt->properties == NULL ||
check_properties(sub_pkt->properties) != SUCCESS) {
return PROTOCOL_ERROR;
}
}

log_debug("remainLen: [%ld] packetid : [%d]", remaining_len,
sub_pkt->packet_id);
// handle payload
payload_ptr = nng_msg_payload_ptr(msg);
payload_ptr = nng_msg_payload_ptr(work->msg);
if (payload_ptr == NULL) {
log_error("payload_ptr is NULL");
return PROTOCOL_ERROR;
}

if ((tn = nng_zalloc(sizeof(topic_node))) == NULL) {
tn = nng_zalloc(sizeof(topic_node));
if (tn == NULL) {
log_error("nng_zalloc");
return NNG_ENOMEM;
}
tn->next = NULL;
sub_pkt->node = tn;
sub_pkt->node = tn;

while (1) {
_tn = tn;

tn->reason_code = GRANTED_QOS_2; // default
tn->next = NULL;
tn->topic.len = 0;
tn->reason_code = GRANTED_QOS_2; // default

// TODO Decoding topic has potential buffer overflow
tn->topic.body = (char *) copyn_utf8_str(payload_ptr,
(uint32_t *) &bpos, &len_of_topic, remaining_len);
tn->topic.len = len_of_topic;
log_info("topic: [%s] len: [%d] pid [%d]", tn->topic.body, len_of_topic, sub_pkt->packet_id);
len_of_topic = 0;
tn->topic.body = (char *)copyn_utf8_str(payload_ptr,
(uint32_t *)&ppos, &tn->topic.len, remaining_len);
if (tn->topic.body == NULL) {
log_error("tn->topic.body is NULL");
} else {
log_info("topic: [%s] len: [%d] pid [%d]",
tn->topic.body, tn->topic.len, sub_pkt->packet_id);
}

if (tn->topic.len < 1 || tn->topic.body == NULL) {
log_error("NOT utf8-encoded string OR null string.");
tn->reason_code = UNSPECIFIED_ERROR;
if (MQTT_PROTOCOL_VERSION_v5 == proto_ver)
if (work->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
tn->reason_code = TOPIC_FILTER_INVALID;
bpos += 1; // ignore option
goto next;
}
ppos += 1; // ignore option
if (ppos < remaining_len - bpos) {
newtn = nng_zalloc(sizeof(topic_node));
if (newtn == NULL) {
log_error("nng_zalloc");
return NNG_ENOMEM;
}
tn->next = newtn;
tn = newtn;
continue;
} else {
break;
}
}

tn->rap = 1; // Default Setting
memcpy(tn, payload_ptr + bpos, 1);
memcpy(tn, payload_ptr + ppos, 1);
if (tn->retain_handling > 2) {
log_error("error in retain_handling");
tn->reason_code = UNSPECIFIED_ERROR;
return PROTOCOL_ERROR;
}
bpos ++;
ppos++;

// Setting no_local on shared subscription is invalid
if (MQTT_VERSION_V5 == proto_ver &&
if (work->proto_ver == MQTT_PROTOCOL_VERSION_v5 &&
strncmp(tn->topic.body, "$share/", strlen("$share/")) == 0 &&
tn->no_local == 1) {
tn->reason_code = UNSPECIFIED_ERROR;
return PROTOCOL_ERROR;
}

next:
if (bpos < (int) (remaining_len - vpos)) {
if (NULL == (tn = nng_zalloc(sizeof(topic_node)))) {
if (ppos < remaining_len - bpos) {
newtn = nng_zalloc(sizeof(topic_node));
if (newtn == NULL) {
log_error("nng_zalloc");
return NNG_ENOMEM;
}
tn->next = NULL;
_tn->next = tn;
tn->next = newtn;
tn = newtn;
} else {
break;
}
Expand Down Expand Up @@ -185,7 +208,6 @@ encode_suback_msg(nng_msg *msg, nano_work *work)
return PROTOCOL_ERROR;
}
tn = tn->next;
log_debug("reason_code: [%x]", reason_code);
}

// If NOT find any reason codes
Expand Down Expand Up @@ -226,23 +248,57 @@ encode_suback_msg(nng_msg *msg, nano_work *work)
int
sub_ctx_handle(nano_work *work)
{
int topic_len = 0;
int topic_exist = 0;
char *topic_str = NULL;
bool auth_http_reject = false;
topic_node *tn = NULL;

if (!work->sub_pkt || !work->sub_pkt->node) {
return -1;
}
topic_node *tn = work->sub_pkt->node;

char *topic_str = NULL;
int topic_len = 0, topic_exist = 0;

if (work->sub_pkt->packet_id == 0) {
return -2;
}

tn = work->sub_pkt->node;
if (work->config->auth_http.enable) {
topic_queue *tq = NULL;
tn = work->sub_pkt->node;
tq = init_topic_queue_with_topic_node(tn);
if (tq == NULL) {
log_error("topic_queue is NULL");
} else {
int rv = nmq_auth_http_sub_pub(work->cparam, true, tq, &work->config->auth_http);
if (rv != 0) {
log_error("Auth failed! subscribe packet!");
/*
* Currently, we support bulk upload of topics,
* but there is only one return code, so we don't
* know which topic failed to authenticate, and
* the topics uploaded together should be set to NMQ_AUTH_SUB_ERROR
*/
auth_http_reject = true;
tn = work->sub_pkt->node;
while (tn != NULL) {
tn->reason_code = NMQ_AUTH_SUB_ERROR;
log_warn("topic: [%s] HTTP AUTH fail, set SUBACK reason_code: [%d]", tn->topic.body, tn->reason_code);
tn = tn->next;
}
} else {
log_info("Auth success! subscribe packet!");
}
topic_queue_release(tq);
}
}

#ifdef STATISTICS
// TODO
#endif
nng_msg **retain = work->msg_ret;
while (tn) {
tn = work->sub_pkt->node;
while (tn != NULL && auth_http_reject == false) {
topic_len = tn->topic.len;
topic_str = tn->topic.body;
log_debug("topicLen: [%d] body: [%s]", topic_len, topic_str);
Expand Down Expand Up @@ -292,17 +348,11 @@ sub_ctx_handle(nano_work *work)
#if defined(NNG_SUPP_SQLITE)
if (work->config->sqlite.enable && work->sqlite_db != NULL) {
if (rh == 0 || (rh == 1 && !topic_exist)) {
nng_msg **msg_vec =
nng_mqtt_qos_db_find_retain(
work->sqlite_db, topic_str);

nng_msg **msg_vec = nng_mqtt_qos_db_find_retain(work->sqlite_db, topic_str);
if (msg_vec != NULL) {
for (size_t i = 0;
i < cvector_size(msg_vec); i++) {
for (size_t i = 0; i < cvector_size(msg_vec); i++) {
if (msg_vec[i] != NULL) {
cvector_push_back(
work->msg_ret,
msg_vec[i]);
cvector_push_back(work->msg_ret, msg_vec[i]);
}
}
cvector_free(msg_vec);
Expand All @@ -311,25 +361,27 @@ sub_ctx_handle(nano_work *work)
goto next;
}
#endif
if (rh == 0 || (rh == 1 && !topic_exist))
if (rh == 0 || (rh == 1 && !topic_exist)) {
retain = dbtree_find_retain(work->db_ret, topic_str);
}
work->msg_ret = (work->msg_ret == NULL) ? retain : work->msg_ret;

for (size_t i = 0; retain != NULL &&
i < cvector_size(retain) && work->msg_ret != retain;
i++) {
if (!retain[i])
i < cvector_size(retain) &&
work->msg_ret != retain;
i++) {
if (!retain[i]) {
continue;
}
cvector_push_back(work->msg_ret, retain[i]);
}
if (retain != work->msg_ret) {
cvector_free(retain);
retain = NULL;
}


if (!work->msg_ret)
if (!work->msg_ret) {
goto next;
}

next:
tn = tn->next;
Expand Down

0 comments on commit ab8a8d0

Please sign in to comment.