Skip to content

Commit

Permalink
[FIXED] JetStream republished messages not being received by MQTT QoS…
Browse files Browse the repository at this point in the history
…0 subscribers (#4303)

Only discard messages from MQTT QoS0 from internal JetStream clients if
really a QoS1 JetStream publish, not just a JetStream client.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4291
  • Loading branch information
derekcollison committed Jul 13, 2023
2 parents 0c8552c + 1f39d74 commit 77189b0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
6 changes: 3 additions & 3 deletions server/mqtt.go
@@ -1,4 +1,4 @@
// Copyright 2020-2021 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -3407,8 +3407,8 @@ func mqttSubscribeTrace(pi uint16, filters []*mqttFilter) string {
// message and this is the callback for a QoS1 subscription because in
// that case, it will be handled by the other callback. This avoid getting
// duplicate deliveries.
func mqttDeliverMsgCbQos0(sub *subscription, pc *client, _ *Account, subject, _ string, rmsg []byte) {
if pc.kind == JETSTREAM {
func mqttDeliverMsgCbQos0(sub *subscription, pc *client, _ *Account, subject, reply string, rmsg []byte) {
if pc.kind == JETSTREAM && len(reply) > 0 && strings.HasPrefix(reply, jsAckPre) {
return
}

Expand Down
45 changes: 44 additions & 1 deletion server/mqtt_test.go
@@ -1,4 +1,4 @@
// Copyright 2020 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -6367,6 +6367,49 @@ func TestMQTTSubjectWildcardStart(t *testing.T) {
require_True(t, si.State.Msgs == 0)
}

// Issue https://github.com/nats-io/nats-server/issues/4291
func TestMQTTJetStreamRepublishAndQoS0Subscribers(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
server_name: mqtt
jetstream: enabled
mqtt {
listen: 127.0.0.1:-1
}
`))
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)

nc, js := jsClientConnect(t, s)
defer nc.Close()

// Setup stream with republish on it.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
RePublish: &nats.RePublish{
Source: "foo",
Destination: "mqtt.foo",
},
})
require_NoError(t, err)

// Create QoS0 subscriber to catch re-publishes.
mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "mqtt/foo", qos: 0}}, []byte{0})
testMQTTFlush(t, mc, nil, r)

msg := []byte("HELLO WORLD")
_, err = js.Publish("foo", msg)
require_NoError(t, err)

testMQTTCheckPubMsg(t, mc, r, "mqtt/foo", 0, msg)
testMQTTExpectNothing(t, r)
}

//////////////////////////////////////////////////////////////////////////
//
// Benchmarks
Expand Down

0 comments on commit 77189b0

Please sign in to comment.