Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,31 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
continue
}

if !session.resend(msg) {
nextSeqNum = sentMessageSeqNum + 1
continue
}

if seqNum != sentMessageSeqNum {
state.generateSequenceReset(session, seqNum, sentMessageSeqNum)
if err = state.generateSequenceReset(session, seqNum, sentMessageSeqNum); err != nil {
return err
}
}

session.log.OnEventf("Resending Message: %v", sentMessageSeqNum)
if err = session.resend(msg); err != nil {
return
if _, err := msg.Build(); err != nil {
return err
}
session.sendBytes(msg.rawMessage)

seqNum = sentMessageSeqNum + 1
nextSeqNum = seqNum
}

if seqNum != nextSeqNum { // gapfill for catch-up
state.generateSequenceReset(session, seqNum, nextSeqNum)
if err = state.generateSequenceReset(session, seqNum, nextSeqNum); err != nil {
return err
}
}

return
Expand Down Expand Up @@ -335,7 +345,7 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT
return state
}

func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, endSeqNo int) {
func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, endSeqNo int) (err error) {
sequenceReset := NewMessage()
session.fillDefaultHeader(sequenceReset)

Expand All @@ -350,9 +360,15 @@ func (state *inSession) generateSequenceReset(session *session, beginSeqNo int,
sequenceReset.Header.SetField(tagOrigSendingTime, origSendingTime)
}

//FIXME error check?
msgBytes, _ := sequenceReset.Build()
session.sendBytes(msgBytes)
session.application.ToAdmin(sequenceReset, session.sessionID)

msgBytes, err := sequenceReset.Build()
if err != nil {
return
}

session.sendBytes(msgBytes)
session.log.OnEventf("Sent SequenceReset TO: %v", endSeqNo)

return
}
109 changes: 109 additions & 0 deletions in_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,112 @@ func (s *InSessionTestSuite) TestFIXMsgInTargetTooHigh() {
stashedRawMsg, _ := stashedMsg.Build()
s.Equal(string(rawMsg), string(stashedRawMsg))
}

func (s *InSessionTestSuite) TestFIXMsgInResendRequestAllAdminExpectGapFill() {
s.MockApp.On("ToAdmin")
s.session.Timeout(s.session, internal.NeedHeartbeat)
s.LastToAdminMessageSent()

s.session.Timeout(s.session, internal.NeedHeartbeat)
s.LastToAdminMessageSent()

s.session.Timeout(s.session, internal.NeedHeartbeat)
s.LastToAdminMessageSent()

s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 3)
s.NextSenderMsgSeqNum(4)

s.MockApp.On("FromAdmin").Return(nil)
s.MockApp.On("ToAdmin")
s.fixMsgIn(s.session, s.ResendRequest(1))

s.MockApp.AssertExpectations(s.T())
s.LastToAdminMessageSent()
s.MessageType(enum.MsgType_SEQUENCE_RESET, s.MockApp.lastToAdmin)
s.FieldEquals(tagMsgSeqNum, 1, s.MockApp.lastToAdmin.Header)
s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToAdmin.Header)
s.FieldEquals(tagNewSeqNo, 4, s.MockApp.lastToAdmin.Body)
s.FieldEquals(tagGapFillFlag, true, s.MockApp.lastToAdmin.Body)

s.NextSenderMsgSeqNum(4)
s.State(inSession{})
}

func (s *InSessionTestSuite) TestFIXMsgInResendRequestAllAdminThenApp() {
s.MockApp.On("ToAdmin")
s.session.Timeout(s.session, internal.NeedHeartbeat)
s.LastToAdminMessageSent()

s.session.Timeout(s.session, internal.NeedHeartbeat)
s.LastToAdminMessageSent()

s.MockApp.On("ToApp").Return(nil)
s.session.send(s.NewOrderSingle())
s.LastToAppMessageSent()

s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 2)
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
s.NextSenderMsgSeqNum(4)

s.MockApp.On("FromAdmin").Return(nil)
s.MockApp.On("ToAdmin")
s.MockApp.On("ToApp").Return(nil)
s.fixMsgIn(s.session, s.ResendRequest(1))

s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 3)
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 2)

s.LastToAdminMessageSent()
s.MessageType(enum.MsgType_SEQUENCE_RESET, s.MockApp.lastToAdmin)
s.FieldEquals(tagMsgSeqNum, 1, s.MockApp.lastToAdmin.Header)
s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToAdmin.Header)
s.FieldEquals(tagNewSeqNo, 3, s.MockApp.lastToAdmin.Body)
s.FieldEquals(tagGapFillFlag, true, s.MockApp.lastToAdmin.Body)

s.LastToAppMessageSent()
s.MessageType(enum.MsgType_ORDER_SINGLE, s.MockApp.lastToApp)
s.FieldEquals(tagMsgSeqNum, 3, s.MockApp.lastToApp.Header)
s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToApp.Header)

s.NextSenderMsgSeqNum(4)
s.State(inSession{})
}

func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() {
s.MockApp.On("ToAdmin")
s.session.Timeout(s.session, internal.NeedHeartbeat)
s.LastToAdminMessageSent()

s.MockApp.On("ToApp").Return(nil)
s.session.send(s.NewOrderSingle())
s.LastToAppMessageSent()

s.session.Timeout(s.session, internal.NeedHeartbeat)
s.LastToAdminMessageSent()

s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 2)
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)
s.NextSenderMsgSeqNum(4)

//NOTE: a cheat here, need to reset mock
s.MockApp = MockApp{}
s.MockApp.On("FromAdmin").Return(nil)
s.MockApp.On("ToApp").Return(ErrDoNotSend)
s.MockApp.On("ToAdmin")
s.fixMsgIn(s.session, s.ResendRequest(1))

s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 1)
s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1)

s.LastToAdminMessageSent()
s.MessageType(enum.MsgType_SEQUENCE_RESET, s.MockApp.lastToAdmin)
s.FieldEquals(tagMsgSeqNum, 1, s.MockApp.lastToAdmin.Header)
s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToAdmin.Header)
s.FieldEquals(tagNewSeqNo, 4, s.MockApp.lastToAdmin.Body)
s.FieldEquals(tagGapFillFlag, true, s.MockApp.lastToAdmin.Body)

s.NoMessageSent()

s.NextSenderMsgSeqNum(4)
s.State(inSession{})
}
31 changes: 26 additions & 5 deletions quickfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type KnowsFieldMap interface {
Has(Tag) bool
GetString(Tag) (string, MessageRejectError)
GetInt(Tag) (int, MessageRejectError)
GetField(Tag, FieldValueReader) MessageRejectError
}

func (s *QuickFIXSuite) MessageType(msgType string, msg Message) {
Expand All @@ -35,6 +36,11 @@ func (s *QuickFIXSuite) FieldEquals(tag Tag, expectedValue interface{}, fieldMap
val, err := fieldMap.GetInt(tag)
s.Nil(err)
s.Equal(expected, val)
case bool:
var val FIXBoolean
err := fieldMap.GetField(tag, &val)
s.Nil(err)
s.Equal(expected, val.Bool())
default:
s.FailNow("Field type not handled")
}
Expand Down Expand Up @@ -122,19 +128,34 @@ func (m *MessageFactory) buildMessage(msgType string) Message {
}

func (m *MessageFactory) Logout() Message {
return m.buildMessage("5")
return m.buildMessage(enum.MsgType_LOGOUT)
}

func (m *MessageFactory) NewOrderSingle() Message {
return m.buildMessage("D")
return m.buildMessage(enum.MsgType_ORDER_SINGLE)
}

func (m *MessageFactory) Heartbeat() Message {
return m.buildMessage("0")
return m.buildMessage(enum.MsgType_HEARTBEAT)
}

func (m *MessageFactory) Logon() Message {
return m.buildMessage("A")
return m.buildMessage(enum.MsgType_LOGON)
}

func (m *MessageFactory) ResendRequest(beginSeqNo int) Message {
msg := m.buildMessage(enum.MsgType_RESEND_REQUEST)
msg.Body.SetField(tagBeginSeqNo, FIXInt(beginSeqNo))
msg.Body.SetField(tagEndSeqNo, FIXInt(0))

return msg
}

func (m *MessageFactory) SequenceReset(seqNo int) Message {
msg := m.buildMessage(enum.MsgType_SEQUENCE_RESET)
msg.Body.SetField(tagNewSeqNo, FIXInt(seqNo))

return msg
}

type MockSessionReceiver struct {
Expand Down Expand Up @@ -216,7 +237,7 @@ func (s *SessionSuiteRig) Disconnected() {

func (s *SessionSuiteRig) NoMessageSent() {
msg, _ := s.Receiver.LastMessage()
s.Nil(msg, "no message should be sent")
s.Nil(msg, "no message should be sent but got %s", msg)
}

func (s *SessionSuiteRig) NoMessageQueued() {
Expand Down
31 changes: 31 additions & 0 deletions resend_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,34 @@ func (s *resendStateTestSuite) TestFixMsgIn() {
s.State(inSession{})
s.NextTargetMsgSeqNum(5)
}

func (s *resendStateTestSuite) TestFixMsgInSequenceReset() {
s.session.State = inSession{}

//in session expects seq number 1, send too high
s.MessageFactory.SetNextSeqNum(3)
s.MockApp.On("ToAdmin")

msgSeqNum2 := s.NewOrderSingle()
s.fixMsgIn(s.session, msgSeqNum2)

s.MockApp.AssertExpectations(s.T())
s.State(resendState{})
s.LastToAdminMessageSent()
s.MessageType(enum.MsgType_RESEND_REQUEST, s.MockApp.lastToAdmin)
s.FieldEquals(tagBeginSeqNo, 1, s.MockApp.lastToAdmin.Body)
s.NextTargetMsgSeqNum(1)

s.MessageFactory.SetNextSeqNum(1)
s.MockApp.On("FromAdmin").Return(nil)
s.fixMsgIn(s.session, s.SequenceReset(2))
s.NextTargetMsgSeqNum(2)
s.State(resendState{})

s.MockApp.On("FromApp").Return(nil)
s.fixMsgIn(s.session, s.NewOrderSingle())

s.MockApp.AssertNumberOfCalls(s.T(), "FromApp", 2)
s.NextTargetMsgSeqNum(4)
s.State(inSession{})
}
9 changes: 2 additions & 7 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *session) sendLogout(reason string) error {
return s.send(logout)
}

func (s *session) resend(msg Message) error {
func (s *session) resend(msg Message) bool {
msg.Header.SetField(tagPossDupFlag, FIXBoolean(true))

var origSendingTime FIXString
Expand All @@ -156,12 +156,7 @@ func (s *session) resend(msg Message) error {

s.insertSendingTime(msg.Header)

if _, err := msg.Build(); err != nil {
return err
}
s.sendBytes(msg.rawMessage)

return nil
return s.application.ToApp(msg, s.sessionID) == nil
}

//queueForSend will validate, persist, and queue the message for send
Expand Down