From 6976a54c3528f8f5f693ddd12e9c909ad8da4f3a Mon Sep 17 00:00:00 2001 From: Sami Date: Wed, 23 Nov 2022 13:18:13 +0100 Subject: [PATCH 1/4] fix persist add store transaction --- filestore.go | 8 ++++++++ mongostore.go | 9 +++++++++ session.go | 8 +++----- sqlstore.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ store.go | 9 +++++++++ 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/filestore.go b/filestore.go index 976273754..46a83e6b4 100644 --- a/filestore.go +++ b/filestore.go @@ -301,6 +301,14 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error { return nil } +func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { + err := store.SaveMessage(seqNum, msg) + if err != nil { + return err + } + return store.IncrNextSenderMsgSeqNum() +} + func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err error) { msgInfo, found := store.offsets[seqNum] if !found { diff --git a/mongostore.go b/mongostore.go index 15c419347..e07cc5c7b 100644 --- a/mongostore.go +++ b/mongostore.go @@ -248,6 +248,15 @@ func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) { return } +func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { + // TODO add transaction + err := store.SaveMessage(seqNum, msg) + if err != nil { + return err + } + return store.IncrNextSenderMsgSeqNum() +} + func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, err error) { msgFilter := generateMessageFilter(&store.sessionID) //Marshal into database form diff --git a/session.go b/session.go index 6ef143f2a..5ba2ee29d 100644 --- a/session.go +++ b/session.go @@ -321,12 +321,10 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes func (s *session) persist(seqNum int, msgBytes []byte) error { if !s.DisableMessagePersist { - if err := s.store.SaveMessage(seqNum, msgBytes); err != nil { - return err - } + return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes) + } else { + return s.store.IncrNextSenderMsgSeqNum() } - - return s.store.IncrNextSenderMsgSeqNum() } func (s *session) sendQueued() { diff --git a/sqlstore.go b/sqlstore.go index a57281dd8..5aef634f3 100644 --- a/sqlstore.go +++ b/sqlstore.go @@ -274,6 +274,53 @@ func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error { return err } +func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { + s := store.sessionID + + tx, err := store.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + _, err = tx.Exec(sqlString(`INSERT INTO messages ( + msgseqnum, message, + beginstring, session_qualifier, + sendercompid, sendersubid, senderlocid, + targetcompid, targetsubid, targetlocid) + VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder), + seqNum, string(msg), + s.BeginString, s.Qualifier, + s.SenderCompID, s.SenderSubID, s.SenderLocationID, + s.TargetCompID, s.TargetSubID, s.TargetLocationID) + if err != nil { + return err + } + + if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil { + return errors.Wrap(err, "cache incr next") + } + + next := store.cache.NextSenderMsgSeqNum() + _, err = store.db.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? + WHERE beginstring=? AND session_qualifier=? + AND sendercompid=? AND sendersubid=? AND senderlocid=? + AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), + next, s.BeginString, s.Qualifier, + s.SenderCompID, s.SenderSubID, s.SenderLocationID, + s.TargetCompID, s.TargetSubID, s.TargetLocationID) + if err != nil { + return err + } + + err = store.cache.SetNextSenderMsgSeqNum(next) + if err != nil { + return err + } + + return tx.Commit() +} + func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { s := store.sessionID var msgs [][]byte diff --git a/store.go b/store.go index 41b6bc0c0..b743a46cc 100644 --- a/store.go +++ b/store.go @@ -20,6 +20,7 @@ type MessageStore interface { CreationTime() time.Time SaveMessage(seqNum int, msg []byte) error + SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) Refresh() error @@ -97,6 +98,14 @@ func (store *memoryStore) SaveMessage(seqNum int, msg []byte) error { return nil } +func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { + err := store.SaveMessage(seqNum, msg) + if err != nil { + return err + } + return store.IncrNextSenderMsgSeqNum() +} + func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { var msgs [][]byte for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ { From 588c4c2b3b1eaa7e94c3612189a556254bb9491b Mon Sep 17 00:00:00 2001 From: Sami Date: Wed, 23 Nov 2022 13:26:02 +0100 Subject: [PATCH 2/4] add missing tx --- sqlstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlstore.go b/sqlstore.go index 5aef634f3..d34cad8b7 100644 --- a/sqlstore.go +++ b/sqlstore.go @@ -302,7 +302,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b } next := store.cache.NextSenderMsgSeqNum() - _, err = store.db.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? + _, err = tx.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? WHERE beginstring=? AND session_qualifier=? AND sendercompid=? AND sendersubid=? AND senderlocid=? AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), From f5da9feaa7c1d39ce0c9f1a2a86dd697de010bb9 Mon Sep 17 00:00:00 2001 From: Sami Date: Wed, 23 Nov 2022 13:41:37 +0100 Subject: [PATCH 3/4] fix lint --- session.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/session.go b/session.go index 5ba2ee29d..73557cf45 100644 --- a/session.go +++ b/session.go @@ -322,9 +322,9 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes func (s *session) persist(seqNum int, msgBytes []byte) error { if !s.DisableMessagePersist { return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes) - } else { - return s.store.IncrNextSenderMsgSeqNum() } + + return s.store.IncrNextSenderMsgSeqNum() } func (s *session) sendQueued() { From dfc17b07ff6e78ab83bceaa3e968ef4630d3fd96 Mon Sep 17 00:00:00 2001 From: Sami Date: Wed, 23 Nov 2022 17:14:24 +0100 Subject: [PATCH 4/4] move increment after tx commit --- sqlstore.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sqlstore.go b/sqlstore.go index d34cad8b7..a176b7509 100644 --- a/sqlstore.go +++ b/sqlstore.go @@ -297,11 +297,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b return err } - if err := store.cache.IncrNextSenderMsgSeqNum(); err != nil { - return errors.Wrap(err, "cache incr next") - } - - next := store.cache.NextSenderMsgSeqNum() + next := store.cache.NextSenderMsgSeqNum() + 1 _, err = tx.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? WHERE beginstring=? AND session_qualifier=? AND sendercompid=? AND sendersubid=? AND senderlocid=? @@ -313,12 +309,17 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b return err } + err = tx.Commit() + if err != nil { + return err + } + err = store.cache.SetNextSenderMsgSeqNum(next) if err != nil { return err } - return tx.Commit() + return nil } func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {