Skip to content

Commit

Permalink
MQTT: Fixed issue that could cause time out storing messages
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Mar 30, 2023
1 parent 5565008 commit 460bfdc
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
7 changes: 7 additions & 0 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3047,6 +3047,13 @@ func (s *Server) mqttProcessPub(c *client, pp *mqttPublish) error {
// Unless we have a publish permission error, if the message is QoS1, then we
// need to store the message (and deliver it to JS durable consumers).
if _, permIssue := c.processInboundClientMsg(msgToSend); !permIssue && mqttGetQoS(pp.flags) > 0 {
// We need to call flushClients now since this we may have called c.addToPCD
// with destination clients (possibly a route). Without calling flushClients
// the following call may then be stuck waiting for a reply that may never
// come because the destination is not flushed (due to c.out.fsp > 0,
// see addToPCD and writeLoop for details).
c.flushClients(0)
// Store this QoS1 message.
_, err = c.mqtt.sess.jsa.storeMsg(mqttStreamSubjectPrefix+string(c.pa.subject), c.pa.hdr, msgToSend)
}
c.pa.subject, c.pa.mapped, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.reply = nil, nil, -1, 0, nil, nil
Expand Down
2 changes: 1 addition & 1 deletion server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2791,7 +2791,7 @@ func TestMQTTCluster(t *testing.T) {
}
})
}
if topTest.restart {
if !t.Failed() && topTest.restart {
cl.stopAll()
cl.restartAll()

Expand Down

0 comments on commit 460bfdc

Please sign in to comment.