diff --git a/filestore.go b/filestore.go index 6d010f897..0e63ab346 100644 --- a/filestore.go +++ b/filestore.go @@ -239,34 +239,34 @@ func (store *fileStore) NextTargetMsgSeqNum() int { // SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent func (store *fileStore) SetNextSenderMsgSeqNum(next int) error { - if err := store.cache.SetNextSenderMsgSeqNum(next); err != nil { - return errors.Wrap(err, "cache") + if err := store.setSeqNum(store.senderSeqNumsFile, next); err != nil { + return errors.Wrap(err, "file") } - return store.setSeqNum(store.senderSeqNumsFile, next) + return store.cache.SetNextSenderMsgSeqNum(next) } // SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received func (store *fileStore) SetNextTargetMsgSeqNum(next int) error { - if err := store.cache.SetNextTargetMsgSeqNum(next); err != nil { - return errors.Wrap(err, "cache") + if err := store.setSeqNum(store.targetSeqNumsFile, next); err != nil { + return errors.Wrap(err, "file") } - return store.setSeqNum(store.targetSeqNumsFile, next) + return store.cache.SetNextTargetMsgSeqNum(next) } // IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent func (store *fileStore) IncrNextSenderMsgSeqNum() error { - if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil { - return errors.Wrap(err, "cache") + if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil { + return errors.Wrap(err, "file") } - return store.setSeqNum(store.senderSeqNumsFile, store.cache.NextSenderMsgSeqNum()) + return nil } // IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received func (store *fileStore) IncrNextTargetMsgSeqNum() error { - if err := store.cache.IncrNextTargetMsgSeqNum(); err != nil { - return errors.Wrap(err, "cache") + if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil { + return errors.Wrap(err, "file") } - return store.setSeqNum(store.targetSeqNumsFile, store.cache.NextTargetMsgSeqNum()) + return nil } // CreationTime returns the creation time of the store diff --git a/mongostore.go b/mongostore.go index e50af2c16..0933e65a4 100644 --- a/mongostore.go +++ b/mongostore.go @@ -220,18 +220,18 @@ func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error { // IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent func (store *mongoStore) IncrNextSenderMsgSeqNum() error { - if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil { - return errors.Wrap(err, "cache incr") + if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil { + return errors.Wrap(err, "save sequence number") } - return store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum()) + return nil } // IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received func (store *mongoStore) IncrNextTargetMsgSeqNum() error { - if err := store.cache.IncrNextTargetMsgSeqNum(); err != nil { - return errors.Wrap(err, "cache incr") + if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil { + return errors.Wrap(err, "save sequence number") } - return store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum()) + return nil } // CreationTime returns the creation time of the store @@ -266,6 +266,11 @@ func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, } iter := store.db.DB(store.mongoDatabase).C(store.messagesCollection).Find(seqFilter).Sort("msgseq").Iter() + if iter.Err() != nil { + err = iter.Err() + _ = iter.Close() + return + } for iter.Next(msgFilter) { msgs = append(msgs, msgFilter.Message) } diff --git a/sqlstore.go b/sqlstore.go index 3569359af..662b3cffa 100644 --- a/sqlstore.go +++ b/sqlstore.go @@ -237,18 +237,18 @@ func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error { // IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent func (store *sqlStore) IncrNextSenderMsgSeqNum() error { - if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil { - return errors.Wrap(err, "cache incr next") + if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil { + return errors.Wrap(err, "store next") } - return store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum()) + return nil } // IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received func (store *sqlStore) IncrNextTargetMsgSeqNum() error { - if err := store.cache.IncrNextTargetMsgSeqNum(); err != nil { - return errors.Wrap(err, "cache incr next") + if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil { + return errors.Wrap(err, "store next") } - return store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum()) + return nil } // CreationTime returns the creation time of the store