From d38ffc055d7c1f54eb975f29bf600821cc44105a Mon Sep 17 00:00:00 2001 From: Chris Busbey Date: Mon, 25 Jul 2016 11:38:35 -0500 Subject: [PATCH 1/2] normalized some error logging in session code --- in_session.go | 37 +++++++++++++++++++++++++++---------- initiator.go | 2 +- session.go | 21 +++++++++++++++++---- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/in_session.go b/in_session.go index e9cd5dc29..d690065a9 100644 --- a/in_session.go +++ b/in_session.go @@ -40,8 +40,9 @@ func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessio if err := msg.Header.GetField(tagMsgType, &msgType); err == nil { switch string(msgType) { case enum.MsgType_LOGON: - session.handleLogon(msg) - return + if err := session.handleLogon(msg); err != nil { + return session.handleError(err) + } case enum.MsgType_LOGOUT: session.log.OnEvent("Received logout request") session.log.OnEvent("Sending logout response") @@ -56,7 +57,9 @@ func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessio } } - session.store.IncrNextTargetMsgSeqNum() + if err := session.store.IncrNextTargetMsgSeqNum(); err != nil { + return session.handleError(err) + } return } @@ -79,12 +82,16 @@ func (state inSession) Timeout(session *session, event event) (nextState session case needHeartbeat: heartBt := NewMessage() heartBt.Header.SetField(tagMsgType, FIXString("0")) - session.send(heartBt) + if err := session.send(heartBt); err != nil { + return session.handleError(err) + } case peerTimeout: testReq := NewMessage() testReq.Header.SetField(tagMsgType, FIXString("1")) testReq.Body.SetField(tagTestReqID, FIXString("TEST")) - session.send(testReq) + if err := session.send(testReq); err != nil { + return session.handleError(err) + } session.log.OnEvent("Sent test request TEST") session.peerTimer.Reset(time.Duration(int64(1.2 * float64(session.heartBeatTimeout)))) return pendingTimeout{} @@ -100,10 +107,14 @@ func (state inSession) handleTestRequest(session *session, msg Message) (nextSta heartBt := NewMessage() heartBt.Header.SetField(tagMsgType, FIXString("0")) heartBt.Body.SetField(tagTestReqID, testReq) - session.send(heartBt) + if err := session.send(heartBt); err != nil { + return session.handleError(err) + } } - session.store.IncrNextTargetMsgSeqNum() + if err := session.store.IncrNextTargetMsgSeqNum(); err != nil { + return session.handleError(err) + } return state } @@ -115,7 +126,9 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS switch { case newSeqNo > expectedSeqNum: - session.store.SetNextTargetMsgSeqNum(int(newSeqNo)) + if err := session.store.SetNextTargetMsgSeqNum(int(newSeqNo)); err != nil { + return session.handleError(err) + } case newSeqNo < expectedSeqNum: //FIXME: to be compliant with legacy tests, do not include tag in reftagid? (11c_NewSeqNoLess) session.doReject(msg, valueIsIncorrectNoTag()) @@ -150,7 +163,9 @@ func (state inSession) handleResendRequest(session *session, msg Message) (nextS } state.resendMessages(session, int(beginSeqNo), endSeqNo) - session.store.IncrNextTargetMsgSeqNum() + if err := session.store.IncrNextTargetMsgSeqNum(); err != nil { + return session.handleError(err) + } return state } @@ -217,7 +232,9 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR return logoutState{} default: session.doReject(msg, rej) - session.store.IncrNextTargetMsgSeqNum() + if err := session.store.IncrNextTargetMsgSeqNum(); err != nil { + return session.handleError(err) + } return state } } diff --git a/initiator.go b/initiator.go index f15f4063e..29bc9820f 100644 --- a/initiator.go +++ b/initiator.go @@ -34,7 +34,7 @@ func (i *Initiator) Start() error { return fmt.Errorf("error on SocketConnectPort: %v", err) } - var reconnectInterval int = 30 // Default configuration (in seconds) + reconnectInterval := 30 // Default configuration (in seconds) if s.HasSetting(config.ReconnectInterval) { if reconnectInterval, err = s.IntSetting(config.ReconnectInterval); err != nil { return fmt.Errorf("error on ReconnectInterval: %v", err) diff --git a/session.go b/session.go index 588458965..0345e9f4a 100644 --- a/session.go +++ b/session.go @@ -39,6 +39,15 @@ type session struct { targetDefaultApplVerID string } +func (s *session) logError(err error) { + s.log.OnEvent(err.Error()) +} + +func (s *session) handleError(err error) sessionState { + s.logError(err) + return latentState{} +} + //TargetDefaultApplicationVersionID returns the default application version ID for messages received by this version. //Applicable for For FIX.T.1 sessions. func (s *session) TargetDefaultApplicationVersionID() string { @@ -307,7 +316,9 @@ func (s *session) handleLogon(msg Message) error { } s.log.OnEvent("Responding to logon request") - s.send(reply) + if err := s.send(reply); err != nil { + return err + } } else { s.log.OnEvent("Received logon response") } @@ -323,8 +334,7 @@ func (s *session) handleLogon(msg Message) error { } } - s.store.IncrNextTargetMsgSeqNum() - return nil + return s.store.IncrNextTargetMsgSeqNum() } func (s *session) initiateLogout(reason string) { @@ -561,7 +571,10 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { } s.log.OnEvent("Sending logon request") - s.send(logon) + if err := s.send(logon); err != nil { + s.logError(err) + return + } } fixMsgIn := func(msg Message) { From 38d031bde880303eb8ebae463752b41267310973 Mon Sep 17 00:00:00 2001 From: Chris Busbey Date: Mon, 25 Jul 2016 12:38:37 -0500 Subject: [PATCH 2/2] more error handling in session code --- in_session.go | 77 +++++++++++++++++++++++++++++++++++++++------------ session.go | 47 +++++++++++++++++++++---------- 2 files changed, 92 insertions(+), 32 deletions(-) diff --git a/in_session.go b/in_session.go index d690065a9..130c26b9c 100644 --- a/in_session.go +++ b/in_session.go @@ -43,10 +43,14 @@ func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessio if err := session.handleLogon(msg); err != nil { return session.handleError(err) } + + return case enum.MsgType_LOGOUT: session.log.OnEvent("Received logout request") session.log.OnEvent("Sending logout response") - session.sendLogout("") + if err := session.sendLogout(""); err != nil { + return session.handleError(err) + } nextState = latentState{} case enum.MsgType_TEST_REQUEST: return state.handleTestRequest(session, msg) @@ -68,7 +72,9 @@ func (state inSession) FixMsgInRej(session *session, msg Message, rej MessageRej if err := msg.Header.GetField(tagMsgType, &msgType); err == nil { switch string(msgType) { case enum.MsgType_LOGON: - session.initiateLogout("") + if err := session.initiateLogout(""); err != nil { + return session.handleError(err) + } return logoutState{} case enum.MsgType_LOGOUT: return latentState{} @@ -131,7 +137,9 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS } case newSeqNo < expectedSeqNum: //FIXME: to be compliant with legacy tests, do not include tag in reftagid? (11c_NewSeqNoLess) - session.doReject(msg, valueIsIncorrectNoTag()) + if err := session.doReject(msg, valueIsIncorrectNoTag()); err != nil { + return session.handleError(err) + } } } return state @@ -162,18 +170,21 @@ func (state inSession) handleResendRequest(session *session, msg Message) (nextS endSeqNo = expectedSeqNum - 1 } - state.resendMessages(session, int(beginSeqNo), endSeqNo) + if err := state.resendMessages(session, int(beginSeqNo), endSeqNo); err != nil { + return session.handleError(err) + } + if err := session.store.IncrNextTargetMsgSeqNum(); err != nil { return session.handleError(err) } return state } -func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int) { +func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int) (err error) { msgs, err := session.store.GetMessages(beginSeqNo, endSeqNo) if err != nil { session.log.OnEventf("error retrieving messages from store: %s", err.Error()) - panic(err) + return } seqNum := beginSeqNo @@ -192,8 +203,10 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int state.generateSequenceReset(session, seqNum, sentMessageSeqNum) } - session.resend(msg) session.log.OnEventf("Resending Message: %v", sentMessageSeqNum) + if err = session.resend(msg); err != nil { + return + } seqNum = sentMessageSeqNum + 1 nextSeqNum = seqNum @@ -202,6 +215,8 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int if seqNum != nextSeqNum { // gapfill for catch-up state.generateSequenceReset(session, seqNum, nextSeqNum) } + + return } func (state inSession) processReject(session *session, msg Message, rej MessageRejectError) (nextState sessionState) { @@ -210,7 +225,9 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR switch session.sessionState.(type) { default: - session.doTargetTooHigh(TypedError) + if err := session.doTargetTooHigh(TypedError); err != nil { + return session.handleError(err) + } case resendState: //assumes target too high reject already sent } @@ -221,17 +238,27 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR case targetTooLow: return state.doTargetTooLow(session, msg, TypedError) case incorrectBeginString: - session.initiateLogout(rej.Error()) + if err := session.initiateLogout(rej.Error()); err != nil { + session.handleError(err) + } return logoutState{} } switch rej.RejectReason() { case rejectReasonCompIDProblem, rejectReasonSendingTimeAccuracyProblem: - session.doReject(msg, rej) - session.initiateLogout("") + if err := session.doReject(msg, rej); err != nil { + return session.handleError(err) + } + + if err := session.initiateLogout(""); err != nil { + return session.handleError(err) + } return logoutState{} default: - session.doReject(msg, rej) + if err := session.doReject(msg, rej); err != nil { + return session.handleError(err) + } + if err := session.store.IncrNextTargetMsgSeqNum(); err != nil { return session.handleError(err) } @@ -245,7 +272,9 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT origSendingTime := new(FIXUTCTimestamp) if err = msg.Header.GetField(tagOrigSendingTime, origSendingTime); err != nil { - session.doReject(msg, RequiredTagMissing(tagOrigSendingTime)) + if rejErr := session.doReject(msg, RequiredTagMissing(tagOrigSendingTime)); rejErr != nil { + return session.handleError(rejErr) + } return state } @@ -253,18 +282,30 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT msg.Header.GetField(tagSendingTime, sendingTime) if sendingTime.Before(origSendingTime.Time) { - session.doReject(msg, sendingTimeAccuracyProblem()) - session.initiateLogout("") + if err := session.doReject(msg, sendingTimeAccuracyProblem()); err != nil { + return session.handleError(err) + } + + if err := session.initiateLogout(""); err != nil { + return session.handleError(err) + } return logoutState{} } if appReject := session.fromCallback(msg); appReject != nil { - session.doReject(msg, appReject) - session.initiateLogout("") + if err := session.doReject(msg, appReject); err != nil { + return session.handleError(err) + } + + if err := session.initiateLogout(""); err != nil { + return session.handleError(err) + } return logoutState{} } } else { - session.initiateLogout(rej.Error()) + if err := session.initiateLogout(rej.Error()); err != nil { + return session.handleError(err) + } return logoutState{} } diff --git a/session.go b/session.go index 0345e9f4a..e07c1bac6 100644 --- a/session.go +++ b/session.go @@ -182,7 +182,7 @@ func (s *session) fillDefaultHeader(msg Message) { s.insertSendingTime(msg.Header) } -func (s *session) sendLogout(reason string) { +func (s *session) sendLogout(reason string) error { logout := NewMessage() logout.Header.SetField(tagMsgType, FIXString("5")) logout.Header.SetField(tagBeginString, FIXString(s.sessionID.BeginString)) @@ -191,10 +191,10 @@ func (s *session) sendLogout(reason string) { if reason != "" { logout.Body.SetField(tagText, FIXString(reason)) } - s.send(logout) + return s.send(logout) } -func (s *session) resend(msg Message) { +func (s *session) resend(msg Message) error { msg.Header.SetField(tagPossDupFlag, FIXBoolean(true)) var origSendingTime FIXString @@ -204,8 +204,12 @@ func (s *session) resend(msg Message) { s.insertSendingTime(msg.Header) - msg.Build() + if _, err := msg.Build(); err != nil { + return err + } s.sendBytes(msg.rawMessage) + + return nil } //send should NOT be called outside of the run loop @@ -241,7 +245,7 @@ func (s *session) sendBytes(msg []byte) { s.stateTimer.Reset(time.Duration(s.heartBeatTimeout)) } -func (s *session) doTargetTooHigh(reject targetTooHigh) { +func (s *session) doTargetTooHigh(reject targetTooHigh) error { s.log.OnEventf("MsgSeqNum too high, expecting %v but received %v", reject.ExpectedTarget, reject.ReceivedTarget) resend := NewMessage() @@ -254,9 +258,13 @@ func (s *session) doTargetTooHigh(reject targetTooHigh) { } resend.Body.SetField(tagEndSeqNo, FIXInt(endSeqNum)) - s.send(resend) + if err := s.send(resend); err != nil { + return err + } s.log.OnEventf("Sent ResendRequest FROM: %v TO: %v", reject.ExpectedTarget, endSeqNum) + + return nil } func (s *session) verifyLogon(msg Message) MessageRejectError { @@ -287,6 +295,7 @@ func (s *session) verifyLogon(msg Message) MessageRejectError { return s.verifyIgnoreSeqNumTooHigh(msg) } + return nil } @@ -329,18 +338,22 @@ func (s *session) handleLogon(msg Message) error { if err := s.checkTargetTooHigh(msg); err != nil { switch TypedError := err.(type) { case targetTooHigh: - s.doTargetTooHigh(TypedError) - return nil + return s.doTargetTooHigh(TypedError) } } return s.store.IncrNextTargetMsgSeqNum() } -func (s *session) initiateLogout(reason string) { +func (s *session) initiateLogout(reason string) (err error) { + if err = s.sendLogout(reason); err != nil { + s.logError(err) + return + } s.log.OnEvent("Inititated logout request") - s.sendLogout(reason) time.AfterFunc(time.Duration(2)*time.Second, func() { s.sessionEvent <- logoutTimeout }) + + return } func (s *session) verify(msg Message) MessageRejectError { @@ -474,7 +487,7 @@ func (s *session) checkBeginString(msg Message) MessageRejectError { return nil } -func (s *session) doReject(msg Message, rej MessageRejectError) { +func (s *session) doReject(msg Message, rej MessageRejectError) error { reply := msg.reverseRoute() if s.sessionID.BeginString >= enum.BeginStringFIX42 { @@ -516,8 +529,8 @@ func (s *session) doReject(msg Message, rej MessageRejectError) { reply.Body.SetField(tagRefSeqNum, seqNum) } - s.send(reply) s.log.OnEventf("Message Rejected: %v", rej.Error()) + return s.send(reply) } type fixIn struct { @@ -553,7 +566,10 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { if s.initiateLogon { if s.resetOnLogon { - s.store.Reset() + if err := s.store.Reset(); err != nil { + s.logError(err) + return + } } logon := NewMessage() @@ -632,7 +648,10 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { case <-quit: quit = nil // prevent infinitly receiving on a closed channel if s.IsLoggedOn() { - s.initiateLogout("") + if err := s.initiateLogout(""); err != nil { + s.logError(err) + return + } s.sessionState = logoutState{} } else { return