Skip to content

Commit

Permalink
[FIXED] MQTT: more generic names for outgoing stream, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Sep 4, 2023
1 parent e11ddb8 commit b24941e
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,12 @@ const (
mqttQoS2IncomingMsgsStreamName = "$MQTT_qos2in"
mqttQoS2IncomingMsgsStreamSubjectPrefix = "$MQTT.qos2.in."

// Stream name and subjects for outgoing MQTT QoS2 PUBREL messages
mqttQoS2PubRelStreamName = "$MQTT_qos2out"
mqttQoS2PubRelConsumerDurablePrefix = "$MQTT_PUBREL_"
mqttQoS2PubRelStoredSubjectPrefix = "$MQTT.qos2.out."
mqttQoS2PubRelDeliverySubjectPrefix = "$MQTT.qos2.delivery."
// Stream name and subjects for outgoing MQTT QoS (PUBREL) messages
mqttOutStreamName = "$MQTT_out"
mqttOutSubjectPrefix = "$MQTT.out."
mqttPubRelSubjectPrefix = "$MQTT.out.pubrel."
mqttPubRelDeliverySubjectPrefix = "$MQTT.deliver.pubrel."
mqttPubRelConsumerDurablePrefix = "$MQTT_PUBREL_"

// As per spec, MQTT server may not redeliver QoS 1 and 2 messages to
// clients, except after client reconnects. However, NATS Server will
Expand Down Expand Up @@ -1274,15 +1275,15 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
}

if si, err := lookupStream(mqttQoS2PubRelStreamName, "QoS2 outgoing PUBREL"); err != nil {
if si, err := lookupStream(mqttOutStreamName, "QoS2 outgoing PUBREL"); err != nil {
return nil, err
} else if si == nil {
// Create the stream for the incoming QoS2 messages that have not been
// PUBREL-ed by the sender. NATS messages are submitted as
// "$MQTT.pubrel.<session hash>"
cfg := &StreamConfig{
Name: mqttQoS2PubRelStreamName,
Subjects: []string{mqttQoS2PubRelStoredSubjectPrefix + ">"},
Name: mqttOutStreamName,
Subjects: []string{mqttOutSubjectPrefix + ">"},
Storage: FileStorage,
Retention: InterestPolicy,
Replicas: replicas,
Expand Down Expand Up @@ -2577,9 +2578,9 @@ func mqttSessionCreate(jsa *mqttJSA, id, idHash string, seq uint64, opts *Option
idHash: idHash,
seq: seq,
maxp: maxp,
pubRelSubject: mqttQoS2PubRelStoredSubjectPrefix + idHash,
pubRelDeliverySubject: mqttQoS2PubRelDeliverySubjectPrefix + idHash,
pubRelDeliverySubjectB: []byte(mqttQoS2PubRelDeliverySubjectPrefix + idHash),
pubRelSubject: mqttPubRelSubjectPrefix + idHash,
pubRelDeliverySubject: mqttPubRelDeliverySubjectPrefix + idHash,
pubRelDeliverySubjectB: []byte(mqttPubRelDeliverySubjectPrefix + idHash),
}
}

Expand Down Expand Up @@ -2664,7 +2665,7 @@ func (sess *mqttSession) clear() error {
}
}
if pubRelDur != "" {
_, err := sess.jsa.deleteConsumer(mqttQoS2PubRelStreamName, pubRelDur)
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
}
Expand Down Expand Up @@ -4463,10 +4464,10 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
}

ccr := &CreateConsumerRequest{
Stream: mqttQoS2PubRelStreamName,
Stream: mqttOutStreamName,
Config: ConsumerConfig{
DeliverSubject: sess.pubRelDeliverySubject,
Durable: mqttQoS2PubRelConsumerDurablePrefix + sess.idHash,
Durable: mqttPubRelConsumerDurablePrefix + sess.idHash,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: sess.pubRelSubject,
Expand Down

0 comments on commit b24941e

Please sign in to comment.