diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index 762f0e12e..cff1cbb16 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -379,11 +379,14 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) if (m_aio && nni_mqtt_msg_get_packet_type(tmsg) != NNG_MQTT_PUBLISH) { nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR); + } else { + log_error("NULL Aio found in qos msg %ld", packet_id); } nni_msg_free(tmsg); nni_id_remove(&p->sent_unack, packet_id); } nni_msg_clone(msg); + nni_msg_set_timestamp(msg, nni_clock()); if (0 != nni_id_set(&p->sent_unack, packet_id, msg)) { nni_println("Warning! Cache QoS msg failed"); nni_msg_free(msg); @@ -393,14 +396,32 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) default: return NNG_EPROTO; } - if (s->qos_first) + + if (s->qos_first) { + if (ptype == NNG_MQTT_SUBSCRIBE || + ptype == NNG_MQTT_UNSUBSCRIBE) { + nni_mqtt_msg_encode(msg); + log_info("Sending Sub/UnSub msg in high priority"); + quic_aio_send(p->qpipe, aio); + return -1; + } if (qos > 0 && ptype == NNG_MQTT_PUBLISH) { nni_mqtt_msg_encode(msg); + uint32_t topic_len; + uint32_t plen; + char *topic; + uint8_t *payload; + topic = + nni_mqtt_msg_get_publish_topic(msg, &topic_len); + payload = nni_mqtt_msg_get_publish_payload(msg, &plen); + log_info("Pub high priority QoS %d msg to %.*s in " + "parallel %ld payload %.*s", + qos, topic_len, topic, nni_clock(), plen, payload); nni_aio_set_msg(aio, msg); quic_aio_send(p->qpipe, aio); - log_debug("sending highpriority QoS msg in parallel"); return -1; } + } if (!p->busy) { nni_aio_set_msg(&p->send_aio, msg); p->busy = true; @@ -1042,18 +1063,25 @@ mqtt_quic_recv_cb(void *arg) // we have received a PUBLISH qos = nni_mqtt_msg_get_publish_qos(msg); nng_msg_set_cmd_type(msg, CMD_PUBLISH); + uint32_t topic_len; + char *topic; + uint32_t plen; + uint8_t *payload; + topic = nni_mqtt_msg_get_publish_topic(msg, &topic_len); + payload = nni_mqtt_msg_get_publish_payload(msg, &plen); + log_debug("Recv QoS %d Msg from %.*s time %ld " + "payload %.*s", + qos, topic_len, topic, nni_clock(), plen, payload); if (2 > qos) { if (qos == 1) { // QoS 1 return PUBACK nni_mqtt_msg_alloc(&ack, 0); - /* - uint8_t *payload; - uint32_t payload_len; - payload = nng_mqtt_msg_get_publish_payload(msg, &payload_len); - */ - packet_id = nni_mqtt_msg_get_publish_packet_id(msg); - nni_mqtt_msg_set_packet_type(ack, NNG_MQTT_PUBACK); - nni_mqtt_msg_set_puback_packet_id(ack, packet_id); + packet_id = + nni_mqtt_msg_get_publish_packet_id(msg); + nni_mqtt_msg_set_packet_type( + ack, NNG_MQTT_PUBACK); + nni_mqtt_msg_set_puback_packet_id( + ack, packet_id); nni_mqtt_msg_encode(ack); // ignore result of this send ? mqtt_send_msg(NULL, ack, s); @@ -1899,7 +1927,7 @@ mqtt_quic_ctx_recv(void *arg, nni_aio *aio) } else { nni_mtx_unlock(&s->mtx); nni_aio_set_msg(aio, NULL); - nni_println("ERROR! former aio not finished!"); + log_error("ERROR! former aio not finished!"); nni_aio_finish_error(aio, NNG_EBUSY); } return; diff --git a/src/mqtt/protocol/mqtt/mqttv5_quic.c b/src/mqtt/protocol/mqtt/mqttv5_quic.c index 12ea88fbb..bc36a3c87 100644 --- a/src/mqtt/protocol/mqtt/mqttv5_quic.c +++ b/src/mqtt/protocol/mqtt/mqttv5_quic.c @@ -388,6 +388,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) nni_id_remove(&p->sent_unack, packet_id); } nni_msg_clone(msg); + nni_msg_set_timestamp(msg, nni_clock()); if (0 != nni_id_set(&p->sent_unack, packet_id, msg)) { nni_println("Warning! Cache QoS msg failed"); nni_msg_free(msg); @@ -397,14 +398,31 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) default: return NNG_EPROTO; } - if (s->qos_first) + if (s->qos_first) { + if (ptype == NNG_MQTT_SUBSCRIBE || + ptype == NNG_MQTT_UNSUBSCRIBE) { + nni_mqttv5_msg_encode(msg); + log_info("Sending Sub/UnSub msg in high priority"); + quic_aio_send(p->qpipe, aio); + return -1; + } if (qos > 0 && ptype == NNG_MQTT_PUBLISH) { nni_mqttv5_msg_encode(msg); + uint32_t topic_len; + uint32_t plen; + char *topic; + uint8_t *payload; + topic = + nni_mqtt_msg_get_publish_topic(msg, &topic_len); + payload = nni_mqtt_msg_get_publish_payload(msg, &plen); + log_info("Pub high priority QoS %d msg to %.*s in " + "parallel %ld payload %.*s", + qos, topic_len, topic, nni_clock(), plen, payload); nni_aio_set_msg(aio, msg); quic_aio_send(p->qpipe, aio); - log_debug("sending highpriority QoS msg in parallel"); return -1; } + } if (!p->busy) { nni_aio_set_msg(&p->send_aio, msg); p->busy = true; @@ -1044,15 +1062,19 @@ mqtt_quic_recv_cb(void *arg) // we have received a PUBLISH qos = nni_mqtt_msg_get_publish_qos(msg); nng_msg_set_cmd_type(msg, CMD_PUBLISH); + uint32_t topic_len; + char *topic; + uint32_t plen; + uint8_t *payload; + topic = nni_mqtt_msg_get_publish_topic(msg, &topic_len); + payload = nni_mqtt_msg_get_publish_payload(msg, &plen); + log_debug("Recv QoS %d Msg from %.*s time %ld " + "payload %.*s", + qos, topic_len, topic, nni_clock(), plen, payload); if (2 > qos) { if (qos == 1) { // QoS 1 return PUBACK nni_mqtt_msg_alloc(&ack, 0); - /* - uint8_t *payload; - uint32_t payload_len; - payload = nng_mqtt_msg_get_publish_payload(msg, &payload_len); - */ packet_id = nni_mqtt_msg_get_publish_packet_id(msg); nni_mqtt_msg_set_packet_type(ack, NNG_MQTT_PUBACK); nni_mqtt_msg_set_puback_packet_id(ack, packet_id);