Skip to content

Commit

Permalink
Migrate old retained messages to new subjects
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jun 1, 2023
1 parent 7469038 commit 007565f
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ const (
mqttJSAIdTokenPos = 3
mqttJSATokenPos = 4
mqttJSAStreamCreate = "SC"
mqttJSAStreamUpdate = "SU"
mqttJSAStreamLookup = "SL"
mqttJSAStreamDel = "SD"
mqttJSAConsumerCreate = "CC"
Expand Down Expand Up @@ -1169,7 +1170,17 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
if err != nil {
return nil, err
}
as.transferRetainedToPerKeySubjectStream(s)
}
} else {
wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">"
if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj {
si.Config.Subjects = []string{wantedSubj}
if _, err := jsa.updateStream(&si.Config); err != nil {
return nil, fmt.Errorf("failed to update stream config: %w", err)
}
}
as.transferRetainedToPerKeySubjectStream(s)
}

var lastSeq uint64
Expand Down Expand Up @@ -1354,6 +1365,19 @@ func (jsa *mqttJSA) createStream(cfg *StreamConfig) (*StreamInfo, bool, error) {
return scr.StreamInfo, scr.DidCreate, scr.ToError()
}

func (jsa *mqttJSA) updateStream(cfg *StreamConfig) (*StreamInfo, error) {
cfgb, err := json.Marshal(cfg)
if err != nil {
return nil, err
}
scri, err := jsa.newRequest(mqttJSAStreamUpdate, fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), 0, cfgb)
if err != nil {
return nil, err
}
scr := scri.(*JSApiStreamUpdateResponse)
return scr.StreamInfo, scr.ToError()
}

func (jsa *mqttJSA) lookupStream(name string) (*StreamInfo, error) {
slri, err := jsa.newRequest(mqttJSAStreamLookup, fmt.Sprintf(JSApiStreamInfoT, name), 0, nil)
if err != nil {
Expand Down Expand Up @@ -1386,6 +1410,20 @@ func (jsa *mqttJSA) loadLastMsgFor(streamName string, subject string) (*StoredMs
return lmr.Message, lmr.ToError()
}

func (jsa *mqttJSA) loadNextMsgFor(streamName string, subject string) (*StoredMsg, error) {
mreq := &JSApiMsgGetRequest{NextFor: subject}
req, err := json.Marshal(mreq)
if err != nil {
return nil, err
}
lmri, err := jsa.newRequest(mqttJSAMsgLoad, fmt.Sprintf(JSApiMsgGetT, streamName), 0, req)
if err != nil {
return nil, err
}
lmr := lmri.(*JSApiMsgGetResponse)
return lmr.Message, lmr.ToError()
}

func (jsa *mqttJSA) loadMsg(streamName string, seq uint64) (*StoredMsg, error) {
mreq := &JSApiMsgGetRequest{Seq: seq}
req, err := json.Marshal(mreq)
Expand Down Expand Up @@ -1465,6 +1503,12 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl
resp.Error = NewJSInvalidJSONError()
}
ch <- resp
case mqttJSAStreamUpdate:
var resp = &JSApiStreamUpdateResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
}
ch <- resp
case mqttJSAStreamLookup:
var resp = &JSApiStreamInfoResponse{}
if err := json.Unmarshal(msg, &resp); err != nil {
Expand Down Expand Up @@ -2261,6 +2305,57 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
retry = false
}

func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) {
jsa := &as.jsa
var count, errors int

// Set retry to true, will be set to false on success.
defer func() {
if errors > 0 {
next := mqttDefaultTransferRetry
log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next)
time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) })
} else if count > 0 {
log.Noticef("Transfer of %d MQTT retained messages done!", count)
}
}()

for {
// Try and look up messages on the original undivided "$MQTT.rmsgs" subject.
// If nothing is returned here, we assume to have migrated all old messages.
smsg, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs")
if err != nil {
if IsNatsErr(err, JSNoMessageFoundErr) {
// We've ran out of messages to transfer so give up.
break
}
log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err)
return
}
// Unmarshal the message so that we can obtain the subject name.
var rmsg mqttRetainedMsg
if err := json.Unmarshal(smsg.Data, &rmsg); err != nil {
log.Warnf(" Unable to unmarshal retained message with sequence %d, skipping", smsg.Sequence)
errors++
continue
}
// Store the message again, this time with the new per-key subject.
subject := mqttRetainedMsgsStreamSubject + as.domainTk + rmsg.Subject
if _, err := jsa.storeMsgWithKind(mqttJSASessPersist, subject, 0, smsg.Data); err != nil {
log.Errorf(" Unable to transfer the retained message with sequence %d: %v", smsg.Sequence, err)
errors++
continue
}
// Delete the original message.
if err := jsa.deleteMsg(mqttRetainedMsgsStreamName, smsg.Sequence, true); err != nil {
log.Errorf(" Unable to clean up the retained message with sequence %d: %v", smsg.Sequence, err)
errors++
continue
}
count++
}
}

//////////////////////////////////////////////////////////////////////////////
//
// MQTT session related functions
Expand Down

0 comments on commit 007565f

Please sign in to comment.