From bdae8465b37259d0cc3e36fab565bdf1b34694d6 Mon Sep 17 00:00:00 2001 From: Chris Busbey Date: Thu, 28 Jul 2016 11:32:34 -0500 Subject: [PATCH 1/2] allow sends outside of session being logged on --- pending_timeout_test.go | 18 ++-- registry.go | 2 - resend_state_test.go | 23 ++--- session.go | 38 ++++++--- session_test.go | 183 ++++++++++++++++++++++++++++++++-------- 5 files changed, 200 insertions(+), 64 deletions(-) diff --git a/pending_timeout_test.go b/pending_timeout_test.go index b263fda2a..9d2e10bff 100644 --- a/pending_timeout_test.go +++ b/pending_timeout_test.go @@ -7,26 +7,23 @@ import ( ) func TestPendingTimeout_SessionTimeout(t *testing.T) { - session := &session{ - log: nullLog{}, - } - tests := []pendingTimeout{ pendingTimeout{inSession{}}, pendingTimeout{resendState{}}, } for _, state := range tests { + session := &session{ + log: nullLog{}, + sessionState: state, + } + nextState := state.Timeout(session, peerTimeout) assert.IsType(t, latentState{}, nextState) } } func TestPendingTimeout_TimeoutUnchangedState(t *testing.T) { - session := &session{ - log: nullLog{}, - } - tests := []pendingTimeout{ pendingTimeout{inSession{}}, pendingTimeout{resendState{}}, @@ -35,6 +32,11 @@ func TestPendingTimeout_TimeoutUnchangedState(t *testing.T) { testEvents := []event{needHeartbeat, logonTimeout, logoutTimeout} for _, state := range tests { + session := &session{ + log: nullLog{}, + sessionState: state, + } + for _, event := range testEvents { nextState := state.Timeout(session, event) assert.Equal(t, state, nextState) diff --git a/registry.go b/registry.go index 6f45e1593..ab7a85c4c 100644 --- a/registry.go +++ b/registry.go @@ -39,8 +39,6 @@ func SendToTarget(m Messagable, sessionID SessionID) error { session, err := lookupSession(sessionID) if err != nil { return err - } else if session.toSend == nil { - return fmt.Errorf("Not logged on") } request := sendRequest{msg, make(chan error)} diff --git a/resend_state_test.go b/resend_state_test.go index d8da1cb25..d14510e05 100644 --- a/resend_state_test.go +++ b/resend_state_test.go @@ -16,13 +16,15 @@ func TestResendState_TimeoutPeerTimeout(t *testing.T) { <-otherEnd }() + state := resendState{} session := &session{ - store: new(memoryStore), - application: new(TestClient), - messageOut: otherEnd, - log: nullLog{}, + store: new(memoryStore), + application: new(TestClient), + messageOut: otherEnd, + log: nullLog{}, + sessionState: state, } - state := resendState{} + nextState := state.Timeout(session, peerTimeout) assert.Equal(t, pendingTimeout{state}, nextState) } @@ -33,13 +35,14 @@ func TestResendState_TimeoutUnchanged(t *testing.T) { <-otherEnd }() + state := resendState{} session := &session{ - store: new(memoryStore), - application: new(TestClient), - messageOut: otherEnd, - log: nullLog{}, + store: new(memoryStore), + application: new(TestClient), + messageOut: otherEnd, + log: nullLog{}, + sessionState: state, } - state := resendState{} tests := []event{needHeartbeat, logonTimeout, logoutTimeout} diff --git a/session.go b/session.go index e07c1bac6..62400cfe7 100644 --- a/session.go +++ b/session.go @@ -130,6 +130,7 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin return err } + session.toSend = make(chan sendRequest) session.sessionEvent = make(chan event) session.messageEvent = make(chan bool) session.application = application @@ -212,6 +213,14 @@ func (s *session) resend(msg Message) error { return nil } +func (s *session) persist(seqNum int, msgBytes []byte) error { + if err := s.store.SaveMessage(seqNum, msgBytes); err != nil { + return err + } + + return s.store.IncrNextSenderMsgSeqNum() +} + //send should NOT be called outside of the run loop func (s *session) send(msg Message) (err error) { s.fillDefaultHeader(msg) @@ -220,7 +229,11 @@ func (s *session) send(msg Message) (err error) { msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum)) var msgType FIXString - if msg.Header.GetField(tagMsgType, &msgType); isAdminMessageType(string(msgType)) { + if err = msg.Header.GetField(tagMsgType, &msgType); err != nil { + return err + } + + if isAdminMessageType(string(msgType)) { s.application.ToAdmin(msg, s.sessionID) } else { s.application.ToApp(msg, s.sessionID) @@ -231,9 +244,19 @@ func (s *session) send(msg Message) (err error) { return } - if err = s.store.SaveMessage(seqNum, msgBytes); err == nil { + if err = s.persist(seqNum, msgBytes); err != nil { + return + } + + if s.IsLoggedOn() { s.sendBytes(msgBytes) - err = s.store.IncrNextSenderMsgSeqNum() + } else { + switch msgType { + case enum.MsgType_LOGON: + fallthrough + case enum.MsgType_RESEND_REQUEST: + s.sendBytes(msgBytes) + } } return @@ -546,7 +569,6 @@ type sendRequest struct { func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.messageIn = msgIn s.messageOut = msgOut - s.toSend = make(chan sendRequest) s.resendIn = make(chan Message, 1) type fromCallback struct { @@ -557,8 +579,6 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { defer func() { close(s.messageOut) - close(s.toSend) - s.toSend = nil s.stateTimer.Stop() s.peerTimer.Stop() s.onDisconnect() @@ -615,11 +635,7 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { select { case request := <-s.toSend: - if s.IsLoggedOn() { - request.err <- s.send(request.msg) - } else { - request.err <- fmt.Errorf("Not logged on") - } + request.err <- s.send(request.msg) case fixIn, ok := <-msgIn: if ok { s.log.OnIncoming(string(fixIn.bytes)) diff --git a/session_test.go b/session_test.go index 0132155a5..317af6ce6 100644 --- a/session_test.go +++ b/session_test.go @@ -5,6 +5,8 @@ import ( "time" "github.com/quickfixgo/quickfix/enum" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) func buildMessage() Message { @@ -279,50 +281,165 @@ func (e *TestClient) FromApp(msg Message, sessionID SessionID) (reject MessageRe return nil } -func TestSession_CheckToAdminCalled(t *testing.T) { - app := new(TestClient) - otherEnd := make(chan []byte) - go func() { - <-otherEnd - }() +type SessionSendTestSuite struct { + suite.Suite - session := session{ + *TestClient + session + sendChannel chan []byte +} + +func (suite *SessionSendTestSuite) SetupTest() { + suite.sendChannel = make(chan []byte, 1) + suite.TestClient = new(TestClient) + suite.session = session{ store: new(memoryStore), - application: app, - messageOut: otherEnd, + application: suite, log: nullLog{}, + messageOut: suite.sendChannel, } - builder := buildMessage() - builder.Header.SetField(tagMsgType, FIXString("A")) - session.send(builder) +} + +func (suite *SessionSendTestSuite) SentMessage() (msg []byte) { + select { + case msg = <-suite.sendChannel: + default: + } + return +} + +func TestSessionSendTestSuite(t *testing.T) { + suite.Run(t, new(SessionSendTestSuite)) +} + +func (suite *SessionSendTestSuite) SendNewOrderSingle() { + msg := buildMessage() + msg.Header.SetField(tagMsgType, FIXString("D")) + require.Nil(suite.T(), suite.send(msg)) +} + +func (suite *SessionSendTestSuite) SendHeartbeat() { + msg := buildMessage() + msg.Header.SetField(tagMsgType, FIXString("0")) + require.Nil(suite.T(), suite.send(msg)) +} + +func (suite *SessionSendTestSuite) SendLogon() { + msg := buildMessage() + msg.Header.SetField(tagMsgType, FIXString("A")) + require.Nil(suite.T(), suite.send(msg)) +} - if app.adminCalled != 1 { - t.Error("ToAdmin should have been called exactly once, instead was called", app.adminCalled, "times") +func (suite *SessionSendTestSuite) MessagePersisted() { + suite.Equal(2, suite.store.NextSenderMsgSeqNum(), "The next sender sequence number should be incremented") + persistedMessages, err := suite.store.GetMessages(1, 1) + suite.Nil(err) + suite.Len(persistedMessages, 1, "The message should be persisted") +} + +func (suite *SessionSendTestSuite) TestSendAppMessageLoggedOn() { + var tests = []struct { + sessionState + }{ + {inSession{}}, + {resendState{}}, + {pendingTimeout{inSession{}}}, + {pendingTimeout{resendState{}}}, } - if app.appCalled != 0 { - t.Error("ToApp should not have been called, instead was called", app.appCalled, "times") + + for _, test := range tests { + suite.SetupTest() + suite.session.sessionState = test.sessionState + + suite.SendNewOrderSingle() + + suite.Equal(1, suite.appCalled, "ToApp should be called") + suite.MessagePersisted() + suite.NotNil(suite.SentMessage(), "The message should have been sent") } } -func TestSession_CheckToAppCalled(t *testing.T) { - app := new(TestClient) - otherEnd := make(chan []byte) - go func() { - <-otherEnd - }() +func (suite *SessionSendTestSuite) TestSendAdminMessageLoggedOn() { + var tests = []struct { + sessionState + }{ + {inSession{}}, + {resendState{}}, + {pendingTimeout{inSession{}}}, + {pendingTimeout{resendState{}}}, + } - session := session{ - store: new(memoryStore), - application: app, - messageOut: otherEnd, - log: nullLog{}} - builder := buildMessage() - session.send(builder) + for _, test := range tests { + suite.SetupTest() + suite.session.sessionState = test.sessionState + + suite.SendHeartbeat() - if app.appCalled != 1 { - t.Error("Toapp should have been called exactly once, instead was called", app.appCalled, "times") + suite.Equal(1, suite.adminCalled, "ToAdmin should be called") + suite.MessagePersisted() + suite.NotNil(suite.SentMessage(), "The message should have been sent") } - if app.adminCalled != 0 { - t.Error("Toadmin should not have been called, instead was called", app.adminCalled, "times") +} + +func (suite *SessionSendTestSuite) TestSendAppMessageNotLoggedOn() { + var tests = []struct { + sessionState + }{ + {logonState{}}, + {logoutState{}}, + {latentState{}}, + } + + for _, test := range tests { + suite.SetupTest() + suite.session.sessionState = test.sessionState + + suite.SendNewOrderSingle() + + suite.Equal(1, suite.appCalled, "ToApp should be called even if the message is not sent") + suite.MessagePersisted() + suite.Nil(suite.SentMessage(), "The message should not be sent") + } +} + +func (suite *SessionSendTestSuite) TestSendAdminMessageNotLoggedOnNotSent() { + var tests = []struct { + sessionState + }{ + {logonState{}}, + {logoutState{}}, + {latentState{}}, + } + + for _, test := range tests { + suite.SetupTest() + suite.session.sessionState = test.sessionState + + suite.SendHeartbeat() + + suite.Equal(1, suite.adminCalled, "ToAdmin should be called even if the message is not sent") + suite.MessagePersisted() + suite.Nil(suite.SentMessage(), "The message should not be sent") + } +} + +func (suite *SessionSendTestSuite) TestSendAdminMessageNotLoggedOnIsSent() { + var tests = []struct { + sessionState + }{ + {logonState{}}, + {logoutState{}}, + {latentState{}}, + } + + for _, test := range tests { + suite.SetupTest() + suite.session.sessionState = test.sessionState + + suite.SendLogon() + + suite.Equal(1, suite.adminCalled, "ToAdmin should be called") + suite.MessagePersisted() + suite.NotNil(suite.SentMessage(), "The message should be sent") } } From b42495ca0a95a55007878d9df59991571ec74c01 Mon Sep 17 00:00:00 2001 From: Chris Busbey Date: Thu, 28 Jul 2016 15:38:10 -0500 Subject: [PATCH 2/2] reverting to send queue for out of session sending --- registry.go | 4 +- session.go | 117 +++++++++++++++++++-------- session_test.go | 205 +++++++++++++++++++++++++----------------------- 3 files changed, 196 insertions(+), 130 deletions(-) diff --git a/registry.go b/registry.go index ab7a85c4c..7e0c909d7 100644 --- a/registry.go +++ b/registry.go @@ -41,9 +41,7 @@ func SendToTarget(m Messagable, sessionID SessionID) error { return err } - request := sendRequest{msg, make(chan error)} - session.toSend <- request - return <-request.err + return session.queueForSend(msg) } type sessionActivate struct { diff --git a/session.go b/session.go index 62400cfe7..f278c4102 100644 --- a/session.go +++ b/session.go @@ -2,6 +2,7 @@ package quickfix import ( "fmt" + "sync" "time" "github.com/quickfixgo/quickfix/config" @@ -18,9 +19,14 @@ type session struct { messageOut chan []byte messageIn chan fixIn - toSend chan sendRequest resendIn chan Message + //application messages are queued up for send here + toSend []Message + + //mutex for access to toSend + sendMutex sync.Mutex + sessionEvent chan event messageEvent chan bool application Application @@ -130,7 +136,6 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin return err } - session.toSend = make(chan sendRequest) session.sessionEvent = make(chan event) session.messageEvent = make(chan bool) session.application = application @@ -213,18 +218,58 @@ func (s *session) resend(msg Message) error { return nil } -func (s *session) persist(seqNum int, msgBytes []byte) error { - if err := s.store.SaveMessage(seqNum, msgBytes); err != nil { - return err +//queueForSend will validate, persist, and queue the message for send +func (s *session) queueForSend(msg Message) (err error) { + s.sendMutex.Lock() + defer s.sendMutex.Unlock() + + if err = s.prepMessageForSend(&msg); err != nil { + return } - return s.store.IncrNextSenderMsgSeqNum() + s.toSend = append(s.toSend, msg) + + select { + case s.messageEvent <- true: + default: + } + + return } -//send should NOT be called outside of the run loop +//send will validate, persist, queue the message and send all messages in the queue func (s *session) send(msg Message) (err error) { - s.fillDefaultHeader(msg) + s.sendMutex.Lock() + defer s.sendMutex.Unlock() + if err = s.prepMessageForSend(&msg); err != nil { + return + } + + s.toSend = append(s.toSend, msg) + s.sendQueued() + + return +} + +//dropAndSend will validate and persist the message, then drops the send queue and sends the message +func (s *session) dropAndSend(msg Message) (err error) { + + s.sendMutex.Lock() + defer s.sendMutex.Unlock() + if err = s.prepMessageForSend(&msg); err != nil { + return + } + + s.dropQueued() + s.toSend = append(s.toSend, msg) + s.sendQueued() + + return +} + +func (s *session) prepMessageForSend(msg *Message) (err error) { + s.fillDefaultHeader(*msg) seqNum := s.store.NextSenderMsgSeqNum() msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum)) @@ -234,9 +279,9 @@ func (s *session) send(msg Message) (err error) { } if isAdminMessageType(string(msgType)) { - s.application.ToAdmin(msg, s.sessionID) + s.application.ToAdmin(*msg, s.sessionID) } else { - s.application.ToApp(msg, s.sessionID) + s.application.ToApp(*msg, s.sessionID) } var msgBytes []byte @@ -244,22 +289,38 @@ func (s *session) send(msg Message) (err error) { return } - if err = s.persist(seqNum, msgBytes); err != nil { - return + return s.persist(seqNum, msgBytes) +} + +func (s *session) persist(seqNum int, msgBytes []byte) error { + if err := s.store.SaveMessage(seqNum, msgBytes); err != nil { + return err + } + + return s.store.IncrNextSenderMsgSeqNum() +} + +func (s *session) sendQueued() { + for _, msg := range s.toSend { + s.sendBytes(msg.rawMessage) } + s.dropQueued() +} + +func (s *session) dropQueued() { + s.toSend = s.toSend[:0] +} + +func (s *session) sendOrDropAppMessages() { + s.sendMutex.Lock() + defer s.sendMutex.Unlock() + if s.IsLoggedOn() { - s.sendBytes(msgBytes) + s.sendQueued() } else { - switch msgType { - case enum.MsgType_LOGON: - fallthrough - case enum.MsgType_RESEND_REQUEST: - s.sendBytes(msgBytes) - } + s.dropQueued() } - - return } func (s *session) sendBytes(msg []byte) { @@ -348,7 +409,7 @@ func (s *session) handleLogon(msg Message) error { } s.log.OnEvent("Responding to logon request") - if err := s.send(reply); err != nil { + if err := s.dropAndSend(reply); err != nil { return err } } else { @@ -561,11 +622,6 @@ type fixIn struct { receiveTime time.Time } -type sendRequest struct { - msg Message - err chan error -} - func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.messageIn = msgIn s.messageOut = msgOut @@ -607,7 +663,7 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { } s.log.OnEvent("Sending logon request") - if err := s.send(logon); err != nil { + if err := s.dropAndSend(logon); err != nil { s.logError(err) return } @@ -627,15 +683,12 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { } for { - switch s.sessionState.(type) { case latentState: return } select { - case request := <-s.toSend: - request.err <- s.send(request.msg) case fixIn, ok := <-msgIn: if ok { s.log.OnIncoming(string(fixIn.bytes)) @@ -674,6 +727,8 @@ func (s *session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { } case evt := <-s.sessionEvent: s.sessionState = s.Timeout(s, evt) + case <-s.messageEvent: + s.sendOrDropAppMessages() } } } diff --git a/session_test.go b/session_test.go index 317af6ce6..85fe7a374 100644 --- a/session_test.go +++ b/session_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/quickfixgo/quickfix/enum" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -251,8 +252,8 @@ func TestSession_CheckTargetTooLow(t *testing.T) { } type TestClient struct { - adminCalled int - appCalled int + adminMessages []Message + appMessages []Message } func (e *TestClient) OnCreate(sessionID SessionID) { @@ -269,11 +270,11 @@ func (e *TestClient) FromAdmin(msg Message, sessionID SessionID) (reject Message } func (e *TestClient) ToAdmin(msg Message, sessionID SessionID) { - e.adminCalled = e.adminCalled + 1 + e.adminMessages = append(e.adminMessages, msg) } func (e *TestClient) ToApp(msg Message, sessionID SessionID) (err error) { - e.appCalled = e.appCalled + 1 + e.appMessages = append(e.appMessages, msg) return nil } @@ -285,14 +286,14 @@ type SessionSendTestSuite struct { suite.Suite *TestClient - session + *session sendChannel chan []byte } func (suite *SessionSendTestSuite) SetupTest() { - suite.sendChannel = make(chan []byte, 1) + suite.sendChannel = make(chan []byte, 10) suite.TestClient = new(TestClient) - suite.session = session{ + suite.session = &session{ store: new(memoryStore), application: suite, log: nullLog{}, @@ -300,7 +301,7 @@ func (suite *SessionSendTestSuite) SetupTest() { } } -func (suite *SessionSendTestSuite) SentMessage() (msg []byte) { +func (suite *SessionSendTestSuite) sentMessage() (msg []byte) { select { case msg = <-suite.sendChannel: default: @@ -312,134 +313,146 @@ func TestSessionSendTestSuite(t *testing.T) { suite.Run(t, new(SessionSendTestSuite)) } -func (suite *SessionSendTestSuite) SendNewOrderSingle() { +func (suite *SessionSendTestSuite) NewOrderSingle() Message { msg := buildMessage() msg.Header.SetField(tagMsgType, FIXString("D")) - require.Nil(suite.T(), suite.send(msg)) + return msg } -func (suite *SessionSendTestSuite) SendHeartbeat() { +func (suite *SessionSendTestSuite) Heartbeat() Message { msg := buildMessage() msg.Header.SetField(tagMsgType, FIXString("0")) - require.Nil(suite.T(), suite.send(msg)) + return msg } -func (suite *SessionSendTestSuite) SendLogon() { +func (suite *SessionSendTestSuite) Logon() Message { msg := buildMessage() msg.Header.SetField(tagMsgType, FIXString("A")) - require.Nil(suite.T(), suite.send(msg)) + return msg } -func (suite *SessionSendTestSuite) MessagePersisted() { +func (suite *SessionSendTestSuite) shouldPersistMessage() { suite.Equal(2, suite.store.NextSenderMsgSeqNum(), "The next sender sequence number should be incremented") persistedMessages, err := suite.store.GetMessages(1, 1) suite.Nil(err) suite.Len(persistedMessages, 1, "The message should be persisted") } -func (suite *SessionSendTestSuite) TestSendAppMessageLoggedOn() { - var tests = []struct { - sessionState - }{ - {inSession{}}, - {resendState{}}, - {pendingTimeout{inSession{}}}, - {pendingTimeout{resendState{}}}, - } +func (suite *SessionSendTestSuite) shouldPersistMessageWithSequenceNum(expectedSeqNum int) { + suite.Equal(expectedSeqNum+1, suite.store.NextSenderMsgSeqNum(), "The next sender sequence number should be incremented") + persistedMessages, err := suite.store.GetMessages(expectedSeqNum, expectedSeqNum) + suite.Nil(err) + suite.Len(persistedMessages, 1, "The message should be persisted") +} - for _, test := range tests { - suite.SetupTest() - suite.session.sessionState = test.sessionState +func (suite *SessionSendTestSuite) shouldCallToApp() { + suite.Len(suite.appMessages, 1, "ToApp should be called") + suite.appMessages = []Message{} +} - suite.SendNewOrderSingle() +func (suite *SessionSendTestSuite) shouldCallToAdmin() { + suite.Len(suite.adminMessages, 1, "ToAdmin should be called") + suite.adminMessages = []Message{} +} - suite.Equal(1, suite.appCalled, "ToApp should be called") - suite.MessagePersisted() - suite.NotNil(suite.SentMessage(), "The message should have been sent") - } +func (suite *SessionSendTestSuite) shouldBeType(msg Message, expectedMsgType string) { + actual, err := msg.Header.GetString(tagMsgType) + suite.Nil(err, "Message doesn't have message type") + suite.Equal(expectedMsgType, actual) } -func (suite *SessionSendTestSuite) TestSendAdminMessageLoggedOn() { - var tests = []struct { - sessionState - }{ - {inSession{}}, - {resendState{}}, - {pendingTimeout{inSession{}}}, - {pendingTimeout{resendState{}}}, - } +func (suite *SessionSendTestSuite) lastToApp() Message { + suite.NotEmpty(suite.appMessages, "ToApp should be called") + return suite.appMessages[len(suite.appMessages)-1] +} - for _, test := range tests { - suite.SetupTest() - suite.session.sessionState = test.sessionState +func (suite *SessionSendTestSuite) lastToAdmin() Message { + suite.NotEmpty(suite.appMessages, "ToAdmin should be called") + return suite.adminMessages[len(suite.adminMessages)-1] +} - suite.SendHeartbeat() +func (suite *SessionSendTestSuite) shouldSendMessage() { + suite.NotNil(suite.sentMessage(), "The message should have been sent") +} +func (suite *SessionSendTestSuite) shouldNotSendMessage() { + suite.Nil(suite.sentMessage(), "The message should not have been sent") +} - suite.Equal(1, suite.adminCalled, "ToAdmin should be called") - suite.MessagePersisted() - suite.NotNil(suite.SentMessage(), "The message should have been sent") +func (suite *SessionSendTestSuite) shouldSendMessages(cnt int) { + for i := 0; i < cnt; i++ { + suite.shouldSendMessage() } + + suite.shouldNotSendMessage() } -func (suite *SessionSendTestSuite) TestSendAppMessageNotLoggedOn() { - var tests = []struct { - sessionState - }{ - {logonState{}}, - {logoutState{}}, - {latentState{}}, - } +func (suite *SessionSendTestSuite) sentMessageShouldBe(sentMsg []byte, msg Message) { + expectedBytes, _ := msg.Build() + suite.Equal(string(expectedBytes), string(sentMsg)) +} - for _, test := range tests { - suite.SetupTest() - suite.session.sessionState = test.sessionState +func (suite *SessionSendTestSuite) TestQueueForSendAppMessage() { + require.Nil(suite.T(), suite.queueForSend(suite.NewOrderSingle())) - suite.SendNewOrderSingle() + suite.shouldCallToApp() + suite.shouldPersistMessage() + suite.shouldNotSendMessage() +} - suite.Equal(1, suite.appCalled, "ToApp should be called even if the message is not sent") - suite.MessagePersisted() - suite.Nil(suite.SentMessage(), "The message should not be sent") - } +func (suite *SessionSendTestSuite) TestQueueForSendAdminMessage() { + require.Nil(suite.T(), suite.queueForSend(suite.Heartbeat())) + + suite.shouldCallToAdmin() + suite.shouldPersistMessage() + suite.shouldNotSendMessage() } -func (suite *SessionSendTestSuite) TestSendAdminMessageNotLoggedOnNotSent() { - var tests = []struct { - sessionState - }{ - {logonState{}}, - {logoutState{}}, - {latentState{}}, - } +func (suite *SessionSendTestSuite) TestSendAppMessage() { + require.Nil(suite.T(), suite.send(suite.NewOrderSingle())) - for _, test := range tests { - suite.SetupTest() - suite.session.sessionState = test.sessionState + suite.shouldCallToApp() + suite.shouldPersistMessage() + suite.shouldSendMessage() +} - suite.SendHeartbeat() +func (suite *SessionSendTestSuite) TestSendAdminMessage() { + require.Nil(suite.T(), suite.send(suite.Heartbeat())) - suite.Equal(1, suite.adminCalled, "ToAdmin should be called even if the message is not sent") - suite.MessagePersisted() - suite.Nil(suite.SentMessage(), "The message should not be sent") - } + suite.shouldCallToAdmin() + suite.shouldPersistMessage() + suite.shouldSendMessage() } -func (suite *SessionSendTestSuite) TestSendAdminMessageNotLoggedOnIsSent() { - var tests = []struct { - sessionState - }{ - {logonState{}}, - {logoutState{}}, - {latentState{}}, - } +func (suite *SessionSendTestSuite) TestSendFlushesQueue() { + require.Nil(suite.T(), suite.queueForSend(suite.NewOrderSingle())) + require.Nil(suite.T(), suite.queueForSend(suite.Heartbeat())) + suite.shouldNotSendMessage() - for _, test := range tests { - suite.SetupTest() - suite.session.sessionState = test.sessionState + require.Nil(suite.T(), suite.send(suite.NewOrderSingle())) + suite.shouldSendMessages(3) +} - suite.SendLogon() +func (suite *SessionSendTestSuite) TestDropAndSendAdminMessage() { + require.Nil(suite.T(), suite.dropAndSend(suite.Heartbeat())) - suite.Equal(1, suite.adminCalled, "ToAdmin should be called") - suite.MessagePersisted() - suite.NotNil(suite.SentMessage(), "The message should be sent") - } + suite.shouldCallToAdmin() + suite.shouldPersistMessage() + suite.shouldSendMessage() +} + +func (suite *SessionSendTestSuite) TestDropAndSendDropsQueue() { + require.Nil(suite.T(), suite.queueForSend(suite.NewOrderSingle())) + require.Nil(suite.T(), suite.queueForSend(suite.Heartbeat())) + suite.shouldNotSendMessage() + + require.Nil(suite.T(), suite.dropAndSend(suite.Logon())) + msg := suite.lastToAdmin() + suite.shouldBeType(msg, enum.MsgType_LOGON) + + //only one message sent + sentMsgBytes := suite.sentMessage() + suite.NotNil(sentMsgBytes) + suite.shouldNotSendMessage() + + suite.sentMessageShouldBe(sentMsgBytes, msg) }