Skip to content

Commit

Permalink
Fix sequence number bug when storage fails
Browse files Browse the repository at this point in the history
  • Loading branch information
letian0805 committed Feb 8, 2021
1 parent 8f22d9e commit 9d890e4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
24 changes: 12 additions & 12 deletions filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,34 +239,34 @@ func (store *fileStore) NextTargetMsgSeqNum() int {

// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent
func (store *fileStore) SetNextSenderMsgSeqNum(next int) error {
if err := store.cache.SetNextSenderMsgSeqNum(next); err != nil {
return errors.Wrap(err, "cache")
if err := store.setSeqNum(store.senderSeqNumsFile, next); err != nil {
return errors.Wrap(err, "file")
}
return store.setSeqNum(store.senderSeqNumsFile, next)
return store.cache.SetNextSenderMsgSeqNum(next)
}

// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received
func (store *fileStore) SetNextTargetMsgSeqNum(next int) error {
if err := store.cache.SetNextTargetMsgSeqNum(next); err != nil {
return errors.Wrap(err, "cache")
if err := store.setSeqNum(store.targetSeqNumsFile, next); err != nil {
return errors.Wrap(err, "file")
}
return store.setSeqNum(store.targetSeqNumsFile, next)
return store.cache.SetNextTargetMsgSeqNum(next)
}

// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent
func (store *fileStore) IncrNextSenderMsgSeqNum() error {
if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil {
return errors.Wrap(err, "cache")
if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil {
return errors.Wrap(err, "file")
}
return store.setSeqNum(store.senderSeqNumsFile, store.cache.NextSenderMsgSeqNum())
return nil
}

// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received
func (store *fileStore) IncrNextTargetMsgSeqNum() error {
if err := store.cache.IncrNextTargetMsgSeqNum(); err != nil {
return errors.Wrap(err, "cache")
if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil {
return errors.Wrap(err, "file")
}
return store.setSeqNum(store.targetSeqNumsFile, store.cache.NextTargetMsgSeqNum())
return nil
}

// CreationTime returns the creation time of the store
Expand Down
17 changes: 11 additions & 6 deletions mongostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,18 @@ func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error {

// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent
func (store *mongoStore) IncrNextSenderMsgSeqNum() error {
if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil {
return errors.Wrap(err, "cache incr")
if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil {
return errors.Wrap(err, "save sequence number")
}
return store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum())
return nil
}

// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received
func (store *mongoStore) IncrNextTargetMsgSeqNum() error {
if err := store.cache.IncrNextTargetMsgSeqNum(); err != nil {
return errors.Wrap(err, "cache incr")
if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil {
return errors.Wrap(err, "save sequence number")
}
return store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum())
return nil
}

// CreationTime returns the creation time of the store
Expand Down Expand Up @@ -266,6 +266,11 @@ func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte,
}

iter := store.db.DB(store.mongoDatabase).C(store.messagesCollection).Find(seqFilter).Sort("msgseq").Iter()
if iter.Err() != nil {
err = iter.Err()
_ = iter.Close()
return
}
for iter.Next(msgFilter) {
msgs = append(msgs, msgFilter.Message)
}
Expand Down
12 changes: 6 additions & 6 deletions sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,18 +237,18 @@ func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error {

// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent
func (store *sqlStore) IncrNextSenderMsgSeqNum() error {
if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil {
return errors.Wrap(err, "cache incr next")
if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil {
return errors.Wrap(err, "store next")
}
return store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum())
return nil
}

// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received
func (store *sqlStore) IncrNextTargetMsgSeqNum() error {
if err := store.cache.IncrNextTargetMsgSeqNum(); err != nil {
return errors.Wrap(err, "cache incr next")
if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil {
return errors.Wrap(err, "store next")
}
return store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum())
return nil
}

// CreationTime returns the creation time of the store
Expand Down

0 comments on commit 9d890e4

Please sign in to comment.