From 5ce491140b0143e7ac60b9cb67745eebe0b710bd Mon Sep 17 00:00:00 2001 From: bhaan Date: Mon, 11 Jul 2016 15:03:51 +0000 Subject: [PATCH 1/3] synchronize msg sends with session loop (wip) --- registry.go | 8 +++---- session.go | 55 +++++++++++++++---------------------------------- session_test.go | 53 ----------------------------------------------- 3 files changed, 20 insertions(+), 96 deletions(-) diff --git a/registry.go b/registry.go index eb05ef7a4..ab7a85c4c 100644 --- a/registry.go +++ b/registry.go @@ -41,11 +41,9 @@ func SendToTarget(m Messagable, sessionID SessionID) error { return err } - //NOTE: must queue for send here. otherwise, if not executed in same goroutine as session run loop, - //message may be sent on closed channel or sent outside of valid state - session.queueForSend(msg) - - return nil + request := sendRequest{msg, make(chan error)} + session.toSend <- request + return <-request.err } type sessionActivate struct { diff --git a/session.go b/session.go index 485af1949..3023a8eea 100644 --- a/session.go +++ b/session.go @@ -2,7 +2,6 @@ package quickfix import ( "fmt" - "sync" "time" "github.com/quickfixgo/quickfix/config" @@ -18,11 +17,8 @@ type session struct { sessionID SessionID messageOut chan []byte - - //application messages are queued up to be sent during the run loop. - toSend []Message - //mutex for access to toSend - sendMutex sync.Mutex + messageIn chan fixIn + toSend chan sendRequest sessionEvent chan event messageEvent chan bool @@ -189,35 +185,8 @@ func (s *session) resend(msg Message) { s.sendBytes(msg.rawMessage) } -//queueForSend will queue up a message to be sent by the session during the next iteration of the run loop -func (s *session) queueForSend(msg Message) { - s.sendMutex.Lock() - defer s.sendMutex.Unlock() - s.toSend = append(s.toSend, msg) - select { - case s.messageEvent <- true: - default: - } -} - -//sends queued messages if session is logged on -func (s *session) sendQueued() { - if !s.IsLoggedOn() { - return - } - - s.sendMutex.Lock() - defer s.sendMutex.Unlock() - - for _, msg := range s.toSend { - s.send(msg) - } - - s.toSend = s.toSend[:0] -} - //send should NOT be called outside of the run loop -func (s *session) send(msg Message) { +func (s *session) send(msg Message) error { s.fillDefaultHeader(msg) seqNum := s.store.NextSenderMsgSeqNum() @@ -230,12 +199,13 @@ func (s *session) send(msg Message) { s.application.ToApp(msg, s.sessionID) } if msgBytes, err := msg.Build(); err != nil { - panic(err) + return err } else { s.store.SaveMessage(seqNum, msgBytes) s.sendBytes(msgBytes) s.store.IncrNextSenderMsgSeqNum() } + return nil } func (s *session) sendBytes(msg []byte) { @@ -511,7 +481,13 @@ type fixIn struct { receiveTime time.Time } +type sendRequest struct { + msg Message + err chan error +} + func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { + s.messageIn = msgIn s.messageOut = msgOut defer func() { @@ -549,9 +525,13 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { return } - s.sendQueued() - select { + case request := <-s.toSend: + if s.IsLoggedOn() { + request.err <- s.send(request.msg) + } else { + request.err <- fmt.Errorf("Not logged on") + } case fixIn, ok := <-msgIn: if ok { s.log.OnIncoming(string(fixIn.bytes)) @@ -575,7 +555,6 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { } case evt := <-s.sessionEvent: s.sessionState = s.Timeout(s, evt) - case <-s.messageEvent: } } } diff --git a/session_test.go b/session_test.go index 824770a63..0132155a5 100644 --- a/session_test.go +++ b/session_test.go @@ -326,56 +326,3 @@ func TestSession_CheckToAppCalled(t *testing.T) { t.Error("Toadmin should not have been called, instead was called", app.adminCalled, "times") } } - -func TestSession_sendQueued(t *testing.T) { - app := new(TestClient) - otherEnd := make(chan []byte) - go func() { - for { - _, ok := <-otherEnd - if !ok { - return - } - } - }() - session := session{ - store: new(memoryStore), - application: app, - messageOut: otherEnd, - log: nullLog{}} - session.queueForSend(buildMessage()) - session.queueForSend(buildMessage()) - session.queueForSend(buildMessage()) - - if len(session.toSend) != 3 { - t.Errorf("Expected %v queued messages, got %v", 3, len(session.toSend)) - } - - var tests = []struct { - sessionState - }{ - {logonState{}}, - {logoutState{}}, - } - - for _, test := range tests { - session.sessionState = test.sessionState - session.sendQueued() - - if app.appCalled != 0 { - t.Fatalf("session state %v should not allow send but sent %v times", session.sessionState, app.appCalled) - } - } - - session.sessionState = inSession{} - - session.sendQueued() - - if app.appCalled != 3 { - t.Errorf("Toapp should have been called %v times, instead was called %v times", 3, app.appCalled) - } - - if len(session.toSend) != 0 { - t.Errorf("Expected no queued messages, got %v", len(session.toSend)) - } -} From 71268149c99bc012f10ff07f1dbfe4818c651428 Mon Sep 17 00:00:00 2001 From: bhaan Date: Mon, 11 Jul 2016 15:40:41 +0000 Subject: [PATCH 2/3] differentiate "user" and "session" space - avoids potential deadlocks (wip) --- in_session.go | 116 +++++++++++++++++++++++------------------------ latent_state.go | 8 ++++ logon_state.go | 30 +++++++++--- logout_state.go | 14 ++++-- resend_state.go | 27 +++++++---- session.go | 46 ++++++++++++++++--- session_state.go | 9 +++- 7 files changed, 163 insertions(+), 87 deletions(-) diff --git a/in_session.go b/in_session.go index 325b8f713..3cdac27a8 100644 --- a/in_session.go +++ b/in_session.go @@ -12,37 +12,65 @@ type inSession struct { func (state inSession) String() string { return "In Session" } func (state inSession) IsLoggedOn() bool { return true } +func (state inSession) VerifyMsgIn(session *session, msg Message) (err MessageRejectError) { + var msgType FIXString + if err := msg.Header.GetField(tagMsgType, &msgType); err == nil { + switch string(msgType) { + case enum.MsgType_LOGON: + return session.verifyLogon(msg) + case enum.MsgType_LOGOUT: + return nil + case enum.MsgType_RESEND_REQUEST: + return session.verifyIgnoreSeqNumTooHighOrLow(msg) + case enum.MsgType_SEQUENCE_RESET: + var gapFillFlag FIXBoolean + msg.Body.GetField(tagGapFillFlag, &gapFillFlag) + return session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag)) + default: + return session.verify(msg) + } + } + return nil +} + func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessionState) { var msgType FIXString if err := msg.Header.GetField(tagMsgType, &msgType); err == nil { switch string(msgType) { - //logon - case "A": - return state.handleLogon(session, msg) - //logout - case "5": - return state.handleLogout(session, msg) - //test request - case "1": + case enum.MsgType_LOGON: + session.handleLogon(msg) + return state + case enum.MsgType_LOGOUT: + session.log.OnEvent("Received logout request") + session.log.OnEvent("Sending logout response") + state.generateLogout(session) + return latentState{} + case enum.MsgType_TEST_REQUEST: return state.handleTestRequest(session, msg) - //resend request - case "2": + case enum.MsgType_RESEND_REQUEST: return state.handleResendRequest(session, msg) - //sequence reset - case "4": + case enum.MsgType_SEQUENCE_RESET: return state.handleSequenceReset(session, msg) - default: - if err := session.verify(msg); err != nil { - return state.processReject(session, msg, err) - } } } session.store.IncrNextTargetMsgSeqNum() - return state } +func (state inSession) FixMsgInRej(session *session, msg Message, rej MessageRejectError) (nextState sessionState) { + var msgType FIXString + if err := msg.Header.GetField(tagMsgType, &msgType); err == nil { + switch string(msgType) { + case enum.MsgType_LOGON: + return state.initiateLogout(session, "") + case enum.MsgType_LOGOUT: + return latentState{} + } + } + return state.processReject(session, msg, rej) +} + func (state inSession) Timeout(session *session, event event) (nextState sessionState) { switch event { case needHeartbeat: @@ -60,30 +88,22 @@ func (state inSession) Timeout(session *session, event event) (nextState session return state } -func (state inSession) handleLogon(session *session, msg Message) (nextState sessionState) { - if err := session.handleLogon(msg); err != nil { - return state.initiateLogout(session, "") +func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) { + var testReq FIXString + if err := msg.Body.GetField(tagTestReqID, &testReq); err != nil { + session.log.OnEvent("Test Request with no testRequestID") + } else { + heartBt := NewMessage() + heartBt.Header.SetField(tagMsgType, FIXString("0")) + heartBt.Body.SetField(tagTestReqID, testReq) + session.send(heartBt) } + session.store.IncrNextTargetMsgSeqNum() return state } -func (state inSession) handleLogout(session *session, msg Message) (nextState sessionState) { - session.log.OnEvent("Received logout request") - session.log.OnEvent("Sending logout response") - - state.generateLogout(session) - return latentState{} -} - func (state inSession) handleSequenceReset(session *session, msg Message) (nextState sessionState) { - var gapFillFlag FIXBoolean - msg.Body.GetField(tagGapFillFlag, &gapFillFlag) - - if err := session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag)); err != nil { - return state.processReject(session, msg, err) - } - var newSeqNo FIXInt if err := msg.Body.GetField(tagNewSeqNo, &newSeqNo); err == nil { expectedSeqNum := FIXInt(session.store.NextTargetMsgSeqNum()) @@ -97,15 +117,10 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS session.doReject(msg, valueIsIncorrectNoTag()) } } - return state } func (state inSession) handleResendRequest(session *session, msg Message) (nextState sessionState) { - if err := session.verifyIgnoreSeqNumTooHighOrLow(msg); err != nil { - return state.processReject(session, msg, err) - } - var err error var beginSeqNoField FIXInt if err = msg.Body.GetField(tagBeginSeqNo, &beginSeqNoField); err != nil { @@ -167,26 +182,6 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int } } -func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) { - if err := session.verify(msg); err != nil { - return state.processReject(session, msg, err) - } - - var testReq FIXString - if err := msg.Body.GetField(tagTestReqID, &testReq); err != nil { - session.log.OnEvent("Test Request with no testRequestID") - } else { - heartBt := NewMessage() - heartBt.Header.SetField(tagMsgType, FIXString("0")) - heartBt.Body.SetField(tagTestReqID, testReq) - session.send(heartBt) - } - - session.store.IncrNextTargetMsgSeqNum() - - return state -} - func (state inSession) processReject(session *session, msg Message, rej MessageRejectError) (nextState sessionState) { switch TypedError := rej.(type) { case targetTooHigh: @@ -197,6 +192,7 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR case resendState: //assumes target too high reject already sent } + session.messageStash[TypedError.ReceivedTarget] = msg return resendState{} diff --git a/latent_state.go b/latent_state.go index c9023efea..c4274a43c 100644 --- a/latent_state.go +++ b/latent_state.go @@ -5,11 +5,19 @@ type latentState struct{} func (state latentState) String() string { return "Latent State" } func (state latentState) IsLoggedOn() bool { return false } +func (state latentState) VerifyMsgIn(session *session, msg Message) MessageRejectError { + return InvalidMessageType() +} + func (state latentState) FixMsgIn(session *session, msg Message) (nextState sessionState) { session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg) return state } +func (state latentState) FixMsgInRej(session *session, msg Message, err MessageRejectError) (nextState sessionState) { + return state.FixMsgIn(session, msg) +} + func (state latentState) Timeout(*session, event) (nextState sessionState) { return state } diff --git a/logon_state.go b/logon_state.go index 8675feb08..208ebea04 100644 --- a/logon_state.go +++ b/logon_state.go @@ -1,22 +1,40 @@ package quickfix +import "github.com/quickfixgo/quickfix/enum" + type logonState struct{} func (state logonState) String() string { return "Logon State" } func (s logonState) IsLoggedOn() bool { return false } -func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) { +func (s logonState) VerifyMsgIn(session *session, msg Message) MessageRejectError { var msgType FIXString - if err := msg.Header.GetField(tagMsgType, &msgType); err == nil && string(msgType) == "A" { - if err := session.handleLogon(msg); err != nil { + if err := msg.Header.GetField(tagMsgType, &msgType); err != nil { + return RequiredTagMissing(tagMsgType) + } + + switch string(msgType) { + case enum.MsgType_LOGON: + err := session.verifyLogon(msg) + if err != nil { session.log.OnEvent(err.Error()) - return latentState{} } + return err + default: + session.log.OnEventf("Invalid Session State: Received Msg %v while waiting for Logon", msg) + return InvalidMessageType() + } +} - return inSession{} +func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) { + if err := session.handleLogon(msg); err != nil { + session.log.OnEvent(err.Error()) + return latentState{} } + return inSession{} +} - session.log.OnEventf("Invalid Session State: Received Msg %v while waiting for Logon", msg) +func (s logonState) FixMsgInRej(session *session, msg Message, err MessageRejectError) sessionState { return latentState{} } diff --git a/logout_state.go b/logout_state.go index d4811676c..35c58e161 100644 --- a/logout_state.go +++ b/logout_state.go @@ -1,11 +1,14 @@ package quickfix -type logoutState struct { -} +import "github.com/quickfixgo/quickfix/enum" + +type logoutState struct{} func (state logoutState) String() string { return "Logout State" } func (s logoutState) IsLoggedOn() bool { return false } +func (state logoutState) VerifyMsgIn(session *session, msg Message) MessageRejectError { return nil } + func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sessionState) { var msgType FIXString if err := msg.Header.GetField(tagMsgType, &msgType); err != nil { @@ -13,8 +16,7 @@ func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sess } switch string(msgType) { - //logout - case "5": + case enum.MsgType_LOGOUT: session.log.OnEvent("Received logout response") return latentState{} default: @@ -22,6 +24,10 @@ func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sess } } +func (state logoutState) FixMsgInRej(session *session, msg Message, err MessageRejectError) sessionState { + return state.FixMsgIn(session, msg) +} + func (state logoutState) Timeout(session *session, event event) (nextState sessionState) { switch event { case logoutTimeout: diff --git a/resend_state.go b/resend_state.go index 37a8425a1..d622d23b2 100644 --- a/resend_state.go +++ b/resend_state.go @@ -4,20 +4,29 @@ type resendState struct { inSession } +func (state resendState) String() string { return "Resend" } + func (state resendState) FixMsgIn(session *session, msg Message) (nextState sessionState) { - for ok := true; ok; { - nextState = state.inSession.FixMsgIn(session, msg) + return state.handleNextState(session, state.inSession.FixMsgIn(session, msg)) +} - if !nextState.IsLoggedOn() { - return - } +func (state resendState) FixMsgInRej(session *session, msg Message, rej MessageRejectError) (nextState sessionState) { + return state.handleNextState(session, state.inSession.FixMsgInRej(session, msg, rej)) +} - msg, ok = session.messageStash[session.store.NextTargetMsgSeqNum()] +func (state resendState) handleNextState(session *session, nextState sessionState) sessionState { + if !nextState.IsLoggedOn() || len(session.messageStash) == 0 { + return nextState } - if len(session.messageStash) != 0 { - nextState = resendState{} + targetSeqNum := session.store.NextTargetMsgSeqNum() + if msg, ok := session.messageStash[targetSeqNum]; ok { + delete(session.messageStash, targetSeqNum) + + // FIXME add a "resend" channel to the session loop to differentiate between + // new incoming fix messages, and stashed messages from the resend state + go func() { session.messageIn <- fixIn{msg.rawMessage, msg.ReceiveTime} }() } - return + return resendState{} } diff --git a/session.go b/session.go index 3023a8eea..54dbba085 100644 --- a/session.go +++ b/session.go @@ -228,13 +228,13 @@ func (s *session) doTargetTooHigh(reject targetTooHigh) { s.send(resend) } -func (s *session) handleLogon(msg Message) error { +func (s *session) verifyLogon(msg Message) MessageRejectError { //Grab default app ver id from fixt.1.1 logon if s.sessionID.BeginString == enum.BeginStringFIXT11 { var targetApplVerID FIXString if err := msg.Body.GetField(tagDefaultApplVerID, &targetApplVerID); err != nil { - return err + return RequiredTagMissing(tagDefaultApplVerID) } s.targetDefaultApplVerID = string(targetApplVerID) @@ -254,10 +254,13 @@ func (s *session) handleLogon(msg Message) error { } } - if err := s.verifyIgnoreSeqNumTooHigh(msg); err != nil { - return err - } + return s.verifyIgnoreSeqNumTooHigh(msg) + } + return nil +} +func (s *session) handleLogon(msg Message) error { + if !s.initiateLogon { reply := NewMessage() reply.Header.SetField(tagMsgType, FIXString("A")) reply.Header.SetField(tagBeginString, FIXString(s.sessionID.BeginString)) @@ -271,6 +274,8 @@ func (s *session) handleLogon(msg Message) error { reply.Body.SetField(tagHeartBtInt, heartBtInt) } + var resetSeqNumFlag FIXBoolean + msg.Body.GetField(tagResetSeqNumFlag, &resetSeqNumFlag) if resetSeqNumFlag { reply.Body.SetField(tagResetSeqNumFlag, resetSeqNumFlag) } @@ -342,7 +347,7 @@ func (s *session) verifySelect(msg Message, checkTooHigh bool, checkTooLow bool) } } - return s.fromCallback(msg) + return nil } func (s *session) fromCallback(msg Message) MessageRejectError { @@ -489,9 +494,18 @@ type sendRequest struct { func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.messageIn = msgIn s.messageOut = msgOut + s.toSend = make(chan sendRequest) + + type fromCallback struct { + msg Message + rej MessageRejectError + } + fromCallbackCh := make(chan fromCallback) defer func() { close(s.messageOut) + close(s.toSend) + close(fromCallbackCh) s.onDisconnect() }() @@ -539,13 +553,31 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), fixIn.bytes) } else { msg.ReceiveTime = fixIn.receiveTime - s.sessionState = s.FixMsgIn(s, msg) + if rej := s.sessionState.VerifyMsgIn(s, msg); rej != nil { + s.sessionState = s.sessionState.FixMsgInRej(s, msg, rej) + } else { + // "turn off" incoming fix messages until the call + // to FromAdmin/App returns + msgIn = nil + go func() { + fromCallbackCh <- fromCallback{msg, s.fromCallback(msg)} + }() + } } } else { return } s.peerTimer.Reset(time.Duration(int64(1.2 * float64(s.heartBeatTimeout)))) + case callback := <-fromCallbackCh: + // "turn on" incoming fix message now that + // FromAdmin/App has completed + msgIn = s.messageIn + if callback.rej == nil { + s.sessionState = s.sessionState.FixMsgIn(s, callback.msg) + } else { + s.sessionState = s.sessionState.FixMsgInRej(s, callback.msg, callback.rej) + } case <-quit: quit = nil // prevent infinitly receiving on a closed channel if state, ok := s.sessionState.(inSession); ok { diff --git a/session_state.go b/session_state.go index 71572ac00..4dad5a288 100644 --- a/session_state.go +++ b/session_state.go @@ -5,10 +5,17 @@ import "fmt" //sessionState is the current state of the session state machine. The session state determines how the session responds to //incoming messages, timeouts, and requests to send application messages. type sessionState interface { - //FixMsgIn is called by the session on incoming messages from the counter party. The return type is the next session state + //VerifyFixMsgIn is called by the session on incoming messages from the counter party. The return type is a MessageRejectError resulting from session level validation. It returns nil if the message is valid. + VerifyMsgIn(*session, Message) MessageRejectError + + //FixMsgIn is called by the session on validated incoming messages from the counter party. The return type is the next session state //following message processing FixMsgIn(*session, Message) (nextState sessionState) + //FixMsgInRej is called by the session on rejected incoming messages from the counter party. The return type is the next session state + //following message processing + FixMsgInRej(*session, Message, MessageRejectError) (nextState sessionState) + //Timeout is called by the session on a timeout event. Timeout(*session, event) (nextState sessionState) From 7850dca6f29e4067ee0597d9ef7abae35469b31e Mon Sep 17 00:00:00 2001 From: bhaan Date: Mon, 11 Jul 2016 18:57:34 +0000 Subject: [PATCH 3/3] dedicated resend channel during resend state --- connection.go | 4 +--- resend_state.go | 5 +---- session.go | 29 ++++++++++++++++++----------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/connection.go b/connection.go index 7a5a939cf..c27e15e56 100644 --- a/connection.go +++ b/connection.go @@ -116,9 +116,7 @@ func writeLoop(connection io.Writer, messageOut chan []byte) { } func readLoop(parser *parser, msgIn chan fixIn) { - defer func() { - close(msgIn) - }() + defer close(msgIn) for { msg, err := parser.ReadMessage() diff --git a/resend_state.go b/resend_state.go index d622d23b2..d27e855f3 100644 --- a/resend_state.go +++ b/resend_state.go @@ -22,10 +22,7 @@ func (state resendState) handleNextState(session *session, nextState sessionStat targetSeqNum := session.store.NextTargetMsgSeqNum() if msg, ok := session.messageStash[targetSeqNum]; ok { delete(session.messageStash, targetSeqNum) - - // FIXME add a "resend" channel to the session loop to differentiate between - // new incoming fix messages, and stashed messages from the resend state - go func() { session.messageIn <- fixIn{msg.rawMessage, msg.ReceiveTime} }() + session.resendIn <- msg } return resendState{} diff --git a/session.go b/session.go index 54dbba085..0b1ad83fe 100644 --- a/session.go +++ b/session.go @@ -19,6 +19,7 @@ type session struct { messageOut chan []byte messageIn chan fixIn toSend chan sendRequest + resendIn chan Message sessionEvent chan event messageEvent chan bool @@ -495,6 +496,7 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.messageIn = msgIn s.messageOut = msgOut s.toSend = make(chan sendRequest) + s.resendIn = make(chan Message, 1) type fromCallback struct { msg Message @@ -505,7 +507,6 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { defer func() { close(s.messageOut) close(s.toSend) - close(fromCallbackCh) s.onDisconnect() }() @@ -532,6 +533,19 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.send(logon) } + fixMsgIn := func(msg Message) { + if rej := s.sessionState.VerifyMsgIn(s, msg); rej != nil { + s.sessionState = s.sessionState.FixMsgInRej(s, msg, rej) + } else { + // "turn off" incoming fix messages until the call + // to FromAdmin/App returns + msgIn = nil + go func() { + fromCallbackCh <- fromCallback{msg, s.fromCallback(msg)} + }() + } + } + for { switch s.sessionState.(type) { @@ -553,21 +567,14 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), fixIn.bytes) } else { msg.ReceiveTime = fixIn.receiveTime - if rej := s.sessionState.VerifyMsgIn(s, msg); rej != nil { - s.sessionState = s.sessionState.FixMsgInRej(s, msg, rej) - } else { - // "turn off" incoming fix messages until the call - // to FromAdmin/App returns - msgIn = nil - go func() { - fromCallbackCh <- fromCallback{msg, s.fromCallback(msg)} - }() - } + fixMsgIn(msg) } } else { return } s.peerTimer.Reset(time.Duration(int64(1.2 * float64(s.heartBeatTimeout)))) + case msg := <-s.resendIn: + fixMsgIn(msg) case callback := <-fromCallbackCh: // "turn on" incoming fix message now that // FromAdmin/App has completed