Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http acl support #1724

Merged
merged 7 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions nanomq/pub_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "include/nanomq.h"
#include "nng/nng.h"
#include "nng/mqtt/packet.h"
#include "nng/supplemental/nanolib/hash_table.h"
#include "nng/supplemental/nanolib/mqtt_db.h"
#include "nng/supplemental/nanolib/cJSON.h"
#include "include/nanomq_rule.h"
Expand Down Expand Up @@ -1103,6 +1104,22 @@ handle_pub(nano_work *work, struct pipe_content *pipe_ct, uint8_t proto,
topic = work->pub_packet->var_header.publish.topic_name.body;
uint32_t len = work->pub_packet->var_header.publish.topic_name.len;

if (work->config != NULL && work->config->auth_http.enable) {
struct topic_queue *tq = topic_queue_init(topic, len);
if (tq == NULL) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only one topic exsits for pub msg

log_error("topic_queue_init failed!");
} else {
int rv = nmq_auth_http_sub_pub(work->cparam, false, tq, &work->config->auth_http);
if (rv != 0) {
log_error("Auth failed! publish packet!");
topic_queue_release(tq);
return NOT_AUTHORIZED;
}
}

topic_queue_release(tq);
}

// deal with topic alias
if (proto == MQTT_PROTOCOL_VERSION_v5) {
property_data *pdata = property_get_value(
Expand Down
183 changes: 117 additions & 66 deletions nanomq/sub_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,39 @@
int
decode_sub_msg(nano_work *work)
{
uint8_t *variable_ptr, *payload_ptr;
int vpos = 0; // pos in variable
int bpos = 0; // pos in payload
size_t len_of_varint = 0, len_of_property = 0, len_of_properties = 0;
int len_of_str = 0, len_of_topic = 0;
uint8_t property_id;
size_t bpos = 0; // pos in msg_body
size_t ppos = 0; // pos in payload
uint8_t *payload_ptr = NULL;

topic_node * tn, *_tn;
topic_node *tn = NULL;
topic_node *newtn = NULL;

nng_msg * msg = work->msg;
size_t remaining_len = nng_msg_remaining_len(msg);
const uint8_t proto_ver = work->proto_ver;
size_t remaining_len = 0;
packet_subscribe *sub_pkt = NULL;

// handle variable header
variable_ptr = nng_msg_body(msg);
if (work->msg == NULL || work->sub_pkt == NULL) {
return PROTOCOL_ERROR;
}

remaining_len = nng_msg_remaining_len(work->msg);

packet_subscribe *sub_pkt = work->sub_pkt;
sub_pkt = work->sub_pkt;
sub_pkt->node = NULL;
NNI_GET16(variable_ptr + vpos, sub_pkt->packet_id);
if (sub_pkt->packet_id == 0)
sub_pkt->prop_len = 0;
sub_pkt->properties = NULL;
NNI_GET16((uint8_t *)(nng_msg_body(work->msg)), sub_pkt->packet_id);
if (sub_pkt->packet_id == 0) {
return PROTOCOL_ERROR; // packetid should be non-zero
}
// TODO packetid should be checked if it's unused
vpos += 2;
bpos += 2;

sub_pkt->properties = NULL;
sub_pkt->prop_len = 0;
// Only Mqtt_v5 include property.
if (MQTT_PROTOCOL_VERSION_v5 == proto_ver) {
sub_pkt->properties =
decode_properties(msg, (uint32_t *)&vpos, &sub_pkt->prop_len, true);
if (work->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
sub_pkt->properties = decode_properties(work->msg,
(uint32_t *)&bpos,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird format??

&sub_pkt->prop_len,
true);
if (check_properties(sub_pkt->properties) != SUCCESS) {
return PROTOCOL_ERROR;
}
Expand All @@ -65,61 +68,80 @@ decode_sub_msg(nano_work *work)
log_debug("remainLen: [%ld] packetid : [%d]", remaining_len,
sub_pkt->packet_id);
// handle payload
payload_ptr = nng_msg_payload_ptr(msg);
payload_ptr = nng_msg_payload_ptr(work->msg);
if (payload_ptr == NULL) {
log_error("payload_ptr is NULL");
return PROTOCOL_ERROR;
}

if ((tn = nng_zalloc(sizeof(topic_node))) == NULL) {
tn = nng_zalloc(sizeof(topic_node));
if (tn == NULL) {
log_error("nng_zalloc");
return NNG_ENOMEM;
}
tn->next = NULL;
sub_pkt->node = tn;
sub_pkt->node = tn;

while (1) {
_tn = tn;

tn->reason_code = GRANTED_QOS_2; // default
tn->next = NULL;
tn->topic.len = 0;
tn->reason_code = GRANTED_QOS_2; // default

// TODO Decoding topic has potential buffer overflow
tn->topic.body = (char *) copyn_utf8_str(payload_ptr,
(uint32_t *) &bpos, &len_of_topic, remaining_len);
tn->topic.len = len_of_topic;
log_info("topic: [%s] len: [%d] pid [%d]", tn->topic.body, len_of_topic, sub_pkt->packet_id);
len_of_topic = 0;
tn->topic.body = (char *)copyn_utf8_str(payload_ptr,
(uint32_t *)&ppos, &tn->topic.len, remaining_len);
if (tn->topic.body == NULL) {
log_error("tn->topic.body is NULL");
} else {
log_info("topic: [%s] len: [%d] pid [%d]",
tn->topic.body, tn->topic.len, sub_pkt->packet_id);
}

if (tn->topic.len < 1 || tn->topic.body == NULL) {
log_error("NOT utf8-encoded string OR null string.");
tn->reason_code = UNSPECIFIED_ERROR;
if (MQTT_PROTOCOL_VERSION_v5 == proto_ver)
if (work->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
tn->reason_code = TOPIC_FILTER_INVALID;
bpos += 1; // ignore option
goto next;
}
ppos += 1; // ignore option
if (ppos < remaining_len - bpos) {
newtn = nng_zalloc(sizeof(topic_node));
if (newtn == NULL) {
log_error("nng_zalloc");
return NNG_ENOMEM;
}
tn->next = newtn;
tn = newtn;
continue;
} else {
break;
}
}

tn->rap = 1; // Default Setting
memcpy(tn, payload_ptr + bpos, 1);
memcpy(tn, payload_ptr + ppos, 1);
if (tn->retain_handling > 2) {
log_error("error in retain_handling");
tn->reason_code = UNSPECIFIED_ERROR;
return PROTOCOL_ERROR;
}
bpos ++;
ppos++;

// Setting no_local on shared subscription is invalid
if (MQTT_VERSION_V5 == proto_ver &&
if (work->proto_ver == MQTT_PROTOCOL_VERSION_v5 &&
strncmp(tn->topic.body, "$share/", strlen("$share/")) == 0 &&
tn->no_local == 1) {
tn->reason_code = UNSPECIFIED_ERROR;
return PROTOCOL_ERROR;
}

next:
if (bpos < (int) (remaining_len - vpos)) {
if (NULL == (tn = nng_zalloc(sizeof(topic_node)))) {
if (ppos < remaining_len - bpos) {
newtn = nng_zalloc(sizeof(topic_node));
if (newtn == NULL) {
log_error("nng_zalloc");
return NNG_ENOMEM;
}
tn->next = NULL;
_tn->next = tn;
tn->next = newtn;
tn = newtn;
} else {
break;
}
Expand Down Expand Up @@ -185,7 +207,6 @@ encode_suback_msg(nng_msg *msg, nano_work *work)
return PROTOCOL_ERROR;
}
tn = tn->next;
log_debug("reason_code: [%x]", reason_code);
}

// If NOT find any reason codes
Expand Down Expand Up @@ -226,23 +247,57 @@ encode_suback_msg(nng_msg *msg, nano_work *work)
int
sub_ctx_handle(nano_work *work)
{
int topic_len = 0;
int topic_exist = 0;
char *topic_str = NULL;
bool auth_http_reject = false;
topic_node *tn = NULL;

if (!work->sub_pkt || !work->sub_pkt->node) {
return -1;
}
topic_node *tn = work->sub_pkt->node;

char *topic_str = NULL;
int topic_len = 0, topic_exist = 0;

if (work->sub_pkt->packet_id == 0) {
return -2;
}

tn = work->sub_pkt->node;
if (work->config->auth_http.enable) {
topic_queue *tq = NULL;
tn = work->sub_pkt->node;
tq = init_topic_queue_with_topic_node(tn);
if (tq == NULL) {
log_error("topic_queue is NULL");
} else {
int rv = nmq_auth_http_sub_pub(work->cparam, true, tq, &work->config->auth_http);
if (rv != 0) {
log_error("Auth failed! subscribe packet!");
/*
* Currently, we support bulk upload of topics,
* but there is only one return code, so we don't
* know which topic failed to authenticate, and
* the topics uploaded together should be set to NMQ_AUTH_SUB_ERROR
*/
auth_http_reject = true;
tn = work->sub_pkt->node;
while (tn != NULL) {
tn->reason_code = NMQ_AUTH_SUB_ERROR;
log_warn("topic: [%s] HTTP AUTH fail, set SUBACK reason_code: [%d]", tn->topic.body, tn->reason_code);
tn = tn->next;
}
} else {
log_info("Auth success! subscribe packet!");
}
topic_queue_release(tq);
}
}

#ifdef STATISTICS
// TODO
#endif
nng_msg **retain = work->msg_ret;
while (tn) {
tn = work->sub_pkt->node;
while (tn != NULL && auth_http_reject == false) {
topic_len = tn->topic.len;
topic_str = tn->topic.body;
log_debug("topicLen: [%d] body: [%s]", topic_len, topic_str);
Expand Down Expand Up @@ -292,17 +347,11 @@ sub_ctx_handle(nano_work *work)
#if defined(NNG_SUPP_SQLITE)
if (work->config->sqlite.enable && work->sqlite_db != NULL) {
if (rh == 0 || (rh == 1 && !topic_exist)) {
nng_msg **msg_vec =
nng_mqtt_qos_db_find_retain(
work->sqlite_db, topic_str);

nng_msg **msg_vec = nng_mqtt_qos_db_find_retain(work->sqlite_db, topic_str);
if (msg_vec != NULL) {
for (size_t i = 0;
i < cvector_size(msg_vec); i++) {
for (size_t i = 0; i < cvector_size(msg_vec); i++) {
if (msg_vec[i] != NULL) {
cvector_push_back(
work->msg_ret,
msg_vec[i]);
cvector_push_back(work->msg_ret, msg_vec[i]);
}
}
cvector_free(msg_vec);
Expand All @@ -311,25 +360,27 @@ sub_ctx_handle(nano_work *work)
goto next;
}
#endif
if (rh == 0 || (rh == 1 && !topic_exist))
if (rh == 0 || (rh == 1 && !topic_exist)) {
retain = dbtree_find_retain(work->db_ret, topic_str);
}
work->msg_ret = (work->msg_ret == NULL) ? retain : work->msg_ret;

for (size_t i = 0; retain != NULL &&
i < cvector_size(retain) && work->msg_ret != retain;
i++) {
if (!retain[i])
i < cvector_size(retain) &&
work->msg_ret != retain;
i++) {
if (!retain[i]) {
continue;
}
cvector_push_back(work->msg_ret, retain[i]);
}
if (retain != work->msg_ret) {
cvector_free(retain);
retain = NULL;
}


if (!work->msg_ret)
if (!work->msg_ret) {
goto next;
}

next:
tn = tn->next;
Expand Down
2 changes: 1 addition & 1 deletion nng
Submodule nng updated 41 files
+50 −0 include/nng/iceoryx_shm/iceoryx_shm.h
+4 −0 include/nng/nng.h
+4 −0 include/nng/protocol/mqtt/mqtt_parser.h
+2 −2 include/nng/supplemental/nanolib/conf.h
+7 −0 include/nng/supplemental/nanolib/hash_table.h
+6 −3 src/core/platform.h
+0 −11 src/core/taskq.c
+1 −0 src/mqtt/protocol/CMakeLists.txt
+1 −0 src/mqtt/protocol/exchange/exchange_server.c
+18 −0 src/mqtt/protocol/iceoryx_shm/CMakeLists.txt
+623 −0 src/mqtt/protocol/iceoryx_shm/iceoryx_shm.c
+0 −1 src/mqtt/protocol/mqtt/CMakeLists.txt
+146 −37 src/mqtt/protocol/mqtt/mqtt_client.c
+10 −1 src/mqtt/transport/tcp/mqtt_tcp.c
+13 −3 src/mqtt/transport/tls/mqtt_tls.c
+14 −0 src/nng.c
+1 −0 src/platform/posix/CMakeLists.txt
+50 −17 src/platform/posix/posix_clock.c
+2 −1 src/platform/posix/posix_rand_getrandom.c
+16 −1 src/platform/windows/win_clock.c
+2 −1 src/sp/protocol/mqtt/CMakeLists.txt
+84 −92 src/sp/protocol/mqtt/auth_http.c
+96 −0 src/sp/protocol/mqtt/auth_http_test.c
+131 −6 src/sp/protocol/mqtt/mqtt_parser.c
+2 −1 src/sp/protocol/mqtt/nmq_mqtt.c
+7 −6 src/sp/transport/mqtt/broker_tcp.c
+19 −15 src/sp/transport/mqttws/nmq_websocket.c
+1 −0 src/supplemental/CMakeLists.txt
+12 −0 src/supplemental/iceoryx/CMakeLists.txt
+269 −0 src/supplemental/iceoryx/iceoryx_api.c
+51 −0 src/supplemental/iceoryx/iceoryx_api.h
+169 −0 src/supplemental/iceoryx/iceoryx_api_test.c
+1 −0 src/supplemental/mqtt/mqtt_public.c
+3 −3 src/supplemental/nanolib/conf.c
+1 −0 src/supplemental/nanolib/conf_ver2.c
+82 −0 src/supplemental/nanolib/hash_table.c
+32 −0 src/supplemental/nanolib/hocon.c
+1 −2 src/supplemental/nanolib/mqtt_db.c
+13 −7 src/supplemental/nanolib/parser.c
+7 −0 src/supplemental/tls/tls_common.c
+1 −1 src/testing/nuts.h
Loading