From 359401580fa443d182a333e763ca2f8741db5a40 Mon Sep 17 00:00:00 2001 From: Alexandros Kyriakakis Date: Wed, 31 Jan 2024 18:35:00 +0200 Subject: [PATCH 1/2] TRD-1749: Escape send queued when blocked on connection side (#18) --- session.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/session.go b/session.go index b359245e2..db5198e04 100644 --- a/session.go +++ b/session.go @@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error { s.toSend = append(s.toSend, msgBytes) + s.notifyMessageOut() + + return nil +} + +func (s *session) notifyMessageOut() { select { case s.messageEvent <- true: default: } - - return nil } // send will validate, persist, queue the message. If the session is logged on, send all messages in the queue. @@ -347,8 +351,12 @@ func (s *session) persist(seqNum int, msgBytes []byte) error { } func (s *session) sendQueued() { - for _, msgBytes := range s.toSend { - s.sendBytes(msgBytes) + for i, msgBytes := range s.toSend { + if !s.sendBytes(msgBytes) { + s.toSend = s.toSend[i:] + s.notifyMessageOut() + return + } } s.dropQueued() @@ -366,15 +374,20 @@ func (s *session) EnqueueBytesAndSend(msg []byte) { s.sendQueued() } -func (s *session) sendBytes(msg []byte) { +func (s *session) sendBytes(msg []byte) bool { if s.messageOut == nil { s.log.OnEventf("Failed to send: disconnected") - return + return false } - s.log.OnOutgoing(msg) - s.messageOut <- msg - s.stateTimer.Reset(s.HeartBtInt) + select { + case s.messageOut <- msg: + s.log.OnOutgoing(msg) + s.stateTimer.Reset(s.HeartBtInt) + return true + default: + return false + } } func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) { From d7f296817876225b1063f0e4e3ac5ed6800cc7fa Mon Sep 17 00:00:00 2001 From: AlexandrosKyriakakis Date: Tue, 30 Jan 2024 19:08:45 +0200 Subject: [PATCH 2/2] Escape send queued when blocked on connection side use default instead of a timer refactor send queued In case of Log On messages block until sent --- logon_state_test.go | 4 ++-- session.go | 46 ++++++++++++++++++++++++++++++++------------- session_state.go | 2 +- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/logon_state_test.go b/logon_state_test.go index 96afc2e90..ee47a2be6 100644 --- a/logon_state_test.go +++ b/logon_state_test.go @@ -333,7 +333,7 @@ func (s *LogonStateTestSuite) TestFixMsgInLogonSeqNumTooHigh() { s.Require().Nil(err) s.MessageType(string(msgTypeLogon), sentMessage) - s.session.sendQueued() + s.session.sendQueued(true) s.MessageType(string(msgTypeResendRequest), s.MockApp.lastToAdmin) s.FieldEquals(tagBeginSeqNo, 1, s.MockApp.lastToAdmin.Body) @@ -373,7 +373,7 @@ func (s *LogonStateTestSuite) TestFixMsgInLogonSeqNumTooLow() { s.Require().Nil(err) s.MessageType(string(msgTypeLogout), sentMessage) - s.session.sendQueued() + s.session.sendQueued(true) s.MessageType(string(msgTypeLogout), s.MockApp.lastToAdmin) s.FieldEquals(tagText, "MsgSeqNum too low, expecting 2 but received 1", s.MockApp.lastToAdmin.Body) } diff --git a/session.go b/session.go index b359245e2..1bf60b122 100644 --- a/session.go +++ b/session.go @@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error { s.toSend = append(s.toSend, msgBytes) + s.notifyMessageOut() + + return nil +} + +func (s *session) notifyMessageOut() { select { case s.messageEvent <- true: default: } - - return nil } // send will validate, persist, queue the message. If the session is logged on, send all messages in the queue. @@ -261,7 +265,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { } s.toSend = append(s.toSend, msgBytes) - s.sendQueued() + s.sendQueued(true) return nil } @@ -290,7 +294,7 @@ func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error { s.dropQueued() s.toSend = append(s.toSend, msgBytes) - s.sendQueued() + s.sendQueued(true) return nil } @@ -346,9 +350,13 @@ func (s *session) persist(seqNum int, msgBytes []byte) error { return s.store.IncrNextSenderMsgSeqNum() } -func (s *session) sendQueued() { - for _, msgBytes := range s.toSend { - s.sendBytes(msgBytes) +func (s *session) sendQueued(blockUntilSent bool) { + for i, msgBytes := range s.toSend { + if !s.sendBytes(msgBytes, blockUntilSent) { + s.toSend = s.toSend[i:] + s.notifyMessageOut() + return + } } s.dropQueued() @@ -363,18 +371,30 @@ func (s *session) EnqueueBytesAndSend(msg []byte) { defer s.sendMutex.Unlock() s.toSend = append(s.toSend, msg) - s.sendQueued() + s.sendQueued(true) } -func (s *session) sendBytes(msg []byte) { +func (s *session) sendBytes(msg []byte, blockUntilSent bool) bool { if s.messageOut == nil { s.log.OnEventf("Failed to send: disconnected") - return + return false + } + + if blockUntilSent { + s.messageOut <- msg + s.log.OnOutgoing(msg) + s.stateTimer.Reset(s.HeartBtInt) + return true } - s.log.OnOutgoing(msg) - s.messageOut <- msg - s.stateTimer.Reset(s.HeartBtInt) + select { + case s.messageOut <- msg: + s.log.OnOutgoing(msg) + s.stateTimer.Reset(s.HeartBtInt) + return true + default: + return false + } } func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) { diff --git a/session_state.go b/session_state.go index 230ac8613..527556209 100644 --- a/session_state.go +++ b/session_state.go @@ -105,7 +105,7 @@ func (sm *stateMachine) SendAppMessages(session *session) { defer session.sendMutex.Unlock() if session.IsLoggedOn() { - session.sendQueued() + session.sendQueued(false) } else { session.dropQueued() }