From b24941e6c65a1b5d0eaae1d9df57c0a7d90a25c3 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 4 Sep 2023 05:15:52 -0700 Subject: [PATCH] [FIXED] MQTT: more generic names for outgoing stream, etc. --- server/mqtt.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index 527436aea9..8cb8126823 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -124,11 +124,12 @@ const ( mqttQoS2IncomingMsgsStreamName = "$MQTT_qos2in" mqttQoS2IncomingMsgsStreamSubjectPrefix = "$MQTT.qos2.in." - // Stream name and subjects for outgoing MQTT QoS2 PUBREL messages - mqttQoS2PubRelStreamName = "$MQTT_qos2out" - mqttQoS2PubRelConsumerDurablePrefix = "$MQTT_PUBREL_" - mqttQoS2PubRelStoredSubjectPrefix = "$MQTT.qos2.out." - mqttQoS2PubRelDeliverySubjectPrefix = "$MQTT.qos2.delivery." + // Stream name and subjects for outgoing MQTT QoS (PUBREL) messages + mqttOutStreamName = "$MQTT_out" + mqttOutSubjectPrefix = "$MQTT.out." + mqttPubRelSubjectPrefix = "$MQTT.out.pubrel." + mqttPubRelDeliverySubjectPrefix = "$MQTT.deliver.pubrel." + mqttPubRelConsumerDurablePrefix = "$MQTT_PUBREL_" // As per spec, MQTT server may not redeliver QoS 1 and 2 messages to // clients, except after client reconnects. However, NATS Server will @@ -1274,15 +1275,15 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc } } - if si, err := lookupStream(mqttQoS2PubRelStreamName, "QoS2 outgoing PUBREL"); err != nil { + if si, err := lookupStream(mqttOutStreamName, "QoS2 outgoing PUBREL"); err != nil { return nil, err } else if si == nil { // Create the stream for the incoming QoS2 messages that have not been // PUBREL-ed by the sender. NATS messages are submitted as // "$MQTT.pubrel." cfg := &StreamConfig{ - Name: mqttQoS2PubRelStreamName, - Subjects: []string{mqttQoS2PubRelStoredSubjectPrefix + ">"}, + Name: mqttOutStreamName, + Subjects: []string{mqttOutSubjectPrefix + ">"}, Storage: FileStorage, Retention: InterestPolicy, Replicas: replicas, @@ -2577,9 +2578,9 @@ func mqttSessionCreate(jsa *mqttJSA, id, idHash string, seq uint64, opts *Option idHash: idHash, seq: seq, maxp: maxp, - pubRelSubject: mqttQoS2PubRelStoredSubjectPrefix + idHash, - pubRelDeliverySubject: mqttQoS2PubRelDeliverySubjectPrefix + idHash, - pubRelDeliverySubjectB: []byte(mqttQoS2PubRelDeliverySubjectPrefix + idHash), + pubRelSubject: mqttPubRelSubjectPrefix + idHash, + pubRelDeliverySubject: mqttPubRelDeliverySubjectPrefix + idHash, + pubRelDeliverySubjectB: []byte(mqttPubRelDeliverySubjectPrefix + idHash), } } @@ -2664,7 +2665,7 @@ func (sess *mqttSession) clear() error { } } if pubRelDur != "" { - _, err := sess.jsa.deleteConsumer(mqttQoS2PubRelStreamName, pubRelDur) + _, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur) if isErrorOtherThan(err, JSConsumerNotFoundErr) { return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err) } @@ -4463,10 +4464,10 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error { } ccr := &CreateConsumerRequest{ - Stream: mqttQoS2PubRelStreamName, + Stream: mqttOutStreamName, Config: ConsumerConfig{ DeliverSubject: sess.pubRelDeliverySubject, - Durable: mqttQoS2PubRelConsumerDurablePrefix + sess.idHash, + Durable: mqttPubRelConsumerDurablePrefix + sess.idHash, AckPolicy: AckExplicit, DeliverPolicy: DeliverNew, FilterSubject: sess.pubRelSubject,