Skip to content

Commit

Permalink
Merge pull request #200 from cbusbey/session_time
Browse files Browse the repository at this point in the history
Session time
  • Loading branch information
cbusbey committed Aug 9, 2016
2 parents 4d04457 + 5f8baf6 commit e319b76
Show file tree
Hide file tree
Showing 20 changed files with 307 additions and 214 deletions.
19 changes: 10 additions & 9 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ func (a *Acceptor) Start() error {

connections := a.listenForConnections(server)

a.quitChan = make(chan bool)
go func() {
for cxn := range connections {
go handleAcceptorConnection(cxn, a.qualifiedSessionIDs, a.globalLog, a.quitChan)
go handleAcceptorConnection(cxn, a.qualifiedSessionIDs, a.globalLog)
}
}()

Expand All @@ -66,12 +65,14 @@ func (a *Acceptor) Stop() {

//NewAcceptor creates and initializes a new Acceptor.
func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Settings, logFactory LogFactory) (*Acceptor, error) {
a := new(Acceptor)
a.app = app
a.storeFactory = storeFactory
a.settings = settings
a.logFactory = logFactory
a.qualifiedSessionIDs = make(map[SessionID]SessionID)
a := &Acceptor{
quitChan: make(chan bool),
app: app,
storeFactory: storeFactory,
settings: settings,
logFactory: logFactory,
qualifiedSessionIDs: make(map[SessionID]SessionID),
}

var err error
a.globalLog, err = logFactory.Create()
Expand All @@ -91,7 +92,7 @@ func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Se
}
a.qualifiedSessionIDs[unqualifiedSessionID] = sessionID

if err = createSession(sessionID, storeFactory, sessionSettings, logFactory, app); err != nil {
if err = createSession(sessionID, storeFactory, sessionSettings, logFactory, app, a.quitChan); err != nil {
return nil, err
}
}
Expand Down
14 changes: 6 additions & 8 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

//Picks up session from net.Conn Initiator
func handleInitiatorConnection(address string, log Log, sessID SessionID, quit chan bool, reconnectInterval time.Duration, tlsConfig *tls.Config) {
func handleInitiatorConnection(address string, log Log, sessID SessionID, reconnectInterval time.Duration, tlsConfig *tls.Config) {
session := activate(sessID)
if session == nil {
log.OnEventf("Session not found for SessionID: %v", sessID)
Expand Down Expand Up @@ -46,11 +46,9 @@ func handleInitiatorConnection(address string, log Log, sessID SessionID, quit c
}

go readLoop(newParser(bufio.NewReader(netConn)), msgIn)
go func() {
writeLoop(netConn, msgOut)
netConn.Close()
}()
session.initiate(msgIn, msgOut, quit)
session.initiate(msgIn, msgOut)
writeLoop(netConn, msgOut)
netConn.Close()

reconnect:
log.OnEventf("%v Reconnecting in %v", sessID, reconnectInterval)
Expand All @@ -60,7 +58,7 @@ func handleInitiatorConnection(address string, log Log, sessID SessionID, quit c
}

//Picks up session from net.Conn Acceptor
func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionID]SessionID, log Log, quit chan bool) {
func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionID]SessionID, log Log) {
defer func() {
if err := recover(); err != nil {
log.OnEventf("Connection Terminated: %v", err)
Expand Down Expand Up @@ -119,7 +117,7 @@ func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionI
readLoop(parser, msgIn)
}()

go session.accept(msgIn, msgOut, quit)
session.accept(msgIn, msgOut)
writeLoop(netConn, msgOut)
}

Expand Down
11 changes: 3 additions & 8 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"github.com/quickfixgo/quickfix/internal"
)

type inSession struct {
}
type inSession struct{ loggedOn }

func (state inSession) String() string { return "In Session" }
func (state inSession) IsLoggedOn() bool { return true }
func (state inSession) String() string { return "In Session" }

func (state inSession) FixMsgIn(session *session, msg Message) sessionState {
var msgType FIXString
Expand Down Expand Up @@ -68,9 +66,6 @@ func (state inSession) Timeout(session *session, event internal.Event) (nextStat
session.log.OnEvent("Sent test request TEST")
session.peerTimer.Reset(time.Duration(int64(1.2 * float64(session.heartBeatTimeout))))
return pendingTimeout{state}
case internal.SessionExpire:
session.sendLogoutAndReset()
return handleDisconnectState(session)
}

return state
Expand Down Expand Up @@ -102,7 +97,7 @@ func (state inSession) handleLogout(session *session, msg Message) (nextState se
}
}

return handleDisconnectState(session)
return latentState{}
}

func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) {
Expand Down
19 changes: 10 additions & 9 deletions in_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ func (s *InSessionTestSuite) TestIsLoggedOn() {
s.True(s.session.IsLoggedOn())
}

func (s *InSessionTestSuite) TestIsConnected() {
s.True(s.session.IsConnected())
}

func (s *InSessionTestSuite) TestIsSessionTime() {
s.True(s.session.IsSessionTime())
}

func (s *InSessionTestSuite) TestLogout() {
s.mockApp.On("FromAdmin").Return(nil)
s.mockApp.On("ToAdmin")
Expand Down Expand Up @@ -92,24 +100,17 @@ func (s *InSessionTestSuite) TestDisconnected() {
s.State(latentState{})
}

func (s *InSessionTestSuite) TestTimeoutSessionExpire() {
func (s *InSessionTestSuite) TestShutdownNow() {
s.mockApp.On("FromApp").Return(nil)
s.FixMsgIn(s.session, s.NewOrderSingle())
s.mockApp.AssertExpectations(s.T())
s.session.store.IncrNextSenderMsgSeqNum()

s.mockApp.On("ToAdmin").Return(nil)
s.mockApp.On("OnLogout").Return(nil)
s.Timeout(s.session, internal.SessionExpire)
s.session.State.ShutdownNow(s.session)

s.mockApp.AssertExpectations(s.T())
s.LastToAdminMessageSent()
s.MessageType("5", s.mockApp.lastToAdmin)
s.FieldEquals(tagMsgSeqNum, 2, s.mockApp.lastToAdmin.Header)

s.State(latentState{})
s.NextTargetMsgSeqNum(1)
s.NextSenderMsgSeqNum(1)
s.NoMessageSent()
s.NoMessageQueued()
}
20 changes: 10 additions & 10 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ type Initiator struct {
//Start Initiator.
func (i *Initiator) Start() error {

i.quitChan = make(chan bool)

for sessionID, s := range i.sessionSettings {
socketConnectHost, err := s.Setting(config.SocketConnectHost)
if err != nil {
Expand All @@ -49,7 +47,7 @@ func (i *Initiator) Start() error {
return err
}
address := fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort)
go handleInitiatorConnection(address, i.globalLog, sessionID, i.quitChan, time.Duration(reconnectInterval)*time.Second, tlsConfig)
go handleInitiatorConnection(address, i.globalLog, sessionID, time.Duration(reconnectInterval)*time.Second, tlsConfig)
}

return nil
Expand All @@ -65,12 +63,14 @@ func (i *Initiator) Stop() {

//NewInitiator creates and initializes a new Initiator.
func NewInitiator(app Application, storeFactory MessageStoreFactory, appSettings *Settings, logFactory LogFactory) (*Initiator, error) {
i := new(Initiator)
i.app = app
i.storeFactory = storeFactory
i.settings = appSettings
i.sessionSettings = appSettings.SessionSettings()
i.logFactory = logFactory
i := &Initiator{
quitChan: make(chan bool),
app: app,
storeFactory: storeFactory,
settings: appSettings,
sessionSettings: appSettings.SessionSettings(),
logFactory: logFactory,
}

var err error
i.globalLog, err = logFactory.Create()
Expand All @@ -93,7 +93,7 @@ func NewInitiator(app Application, storeFactory MessageStoreFactory, appSettings
return nil, requiredConfigurationMissing(config.HeartBtInt)
}

err = createSession(sessionID, storeFactory, s, logFactory, app)
err = createSession(sessionID, storeFactory, s, logFactory, app, i.quitChan)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions internal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,4 @@ const (
LogonTimeout
//LogoutTimeout indicates the peer has not sent a logout request
LogoutTimeout
//SessionExpire indicates that the session has expired
SessionExpire
)
9 changes: 6 additions & 3 deletions latent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package quickfix

import "github.com/quickfixgo/quickfix/internal"

type latentState struct{}
type latentState struct{ inSessionTime }

func (state latentState) String() string { return "Latent State" }
func (state latentState) IsLoggedOn() bool { return false }
func (state latentState) String() string { return "Latent State" }
func (state latentState) IsLoggedOn() bool { return false }
func (state latentState) IsConnected() bool { return false }

func (state latentState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg)
Expand All @@ -15,3 +16,5 @@ func (state latentState) FixMsgIn(session *session, msg Message) (nextState sess
func (state latentState) Timeout(*session, internal.Event) (nextState sessionState) {
return state
}

func (state latentState) ShutdownNow(*session) {}
32 changes: 32 additions & 0 deletions latent_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package quickfix

import (
"testing"

"github.com/stretchr/testify/suite"
)

type LatentStateTestSuite struct {
SessionSuite
}

func TestLatentStateTestSuite(t *testing.T) {
suite.Run(t, new(LatentStateTestSuite))
}

func (s *LatentStateTestSuite) SetupTest() {
s.Init()
s.session.State = latentState{}
}

func (s *LatentStateTestSuite) TestIsLoggedOn() {
s.False(s.session.IsLoggedOn())
}

func (s *LatentStateTestSuite) TestIsConnected() {
s.False(s.session.IsConnected())
}

func (s *LatentStateTestSuite) TestIsSessionTime() {
s.True(s.session.IsSessionTime())
}
16 changes: 5 additions & 11 deletions logon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"github.com/quickfixgo/quickfix/internal"
)

type logonState struct{}
type logonState struct{ connectedNotLoggedOn }

func (s logonState) String() string { return "Logon State" }

func (s logonState) String() string { return "Logon State" }
func (s logonState) IsLoggedOn() bool { return false }
func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
var msgType FIXString
if err := msg.Header.GetField(tagMsgType, &msgType); err != nil {
Expand All @@ -28,16 +28,10 @@ func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionSt

func (s logonState) Timeout(session *session, e internal.Event) (nextState sessionState) {
switch e {
default:
return s

case internal.LogonTimeout:
session.log.OnEvent("Timed out waiting for logon response")
case internal.SessionExpire:
if err := session.dropAndReset(); err != nil {
session.logError(err)
}
return latentState{}
}
return s

return handleDisconnectState(session)
}
37 changes: 10 additions & 27 deletions logon_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func (s *LogonStateTestSuite) TestIsLoggedOn() {
s.False(s.session.IsLoggedOn())
}

func (s *LogonStateTestSuite) TestIsConnected() {
s.True(s.session.IsConnected())
}

func (s *LogonStateTestSuite) TestIsSessionTime() {
s.True(s.session.IsSessionTime())
}

func (s *LogonStateTestSuite) TestTimeoutLogonTimeout() {
s.Timeout(s.session, internal.LogonTimeout)
s.State(latentState{})
Expand All @@ -50,40 +58,15 @@ func (s *LogonStateTestSuite) TestTimeoutNotLogonTimeout() {
}
}

func (s *LogonStateTestSuite) TestTimeoutSessionExpire() {
s.session.store.IncrNextTargetMsgSeqNum()
s.session.store.IncrNextSenderMsgSeqNum()

s.Timeout(s.session, internal.SessionExpire)

s.State(latentState{})
s.NextTargetMsgSeqNum(1)
s.NextSenderMsgSeqNum(1)
s.NoMessageSent()
s.NoMessageQueued()
func (s *LogonStateTestSuite) TestShutdownNow() {
s.session.State.ShutdownNow(s.session)
}

func (s *LogonStateTestSuite) TestDisconnected() {
s.session.Disconnected(s.session)
s.State(latentState{})
}

func (s *LogonStateTestSuite) TestTimeoutSessionExpireInitiateLogout() {
s.session.initiateLogon = true
s.session.store.IncrNextTargetMsgSeqNum()
s.session.store.IncrNextSenderMsgSeqNum()

s.mockApp.On("OnLogout")
s.Timeout(s.session, internal.SessionExpire)

s.mockApp.AssertExpectations(s.T())
s.State(latentState{})
s.NextTargetMsgSeqNum(1)
s.NextSenderMsgSeqNum(1)
s.NoMessageSent()
s.NoMessageQueued()
}

func (s *LogonStateTestSuite) TestFixMsgInNotLogon() {
s.FixMsgIn(s.session, s.NewOrderSingle())

Expand Down
15 changes: 4 additions & 11 deletions logout_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package quickfix

import "github.com/quickfixgo/quickfix/internal"

type logoutState struct{}
type logoutState struct{ connectedNotLoggedOn }

func (state logoutState) String() string { return "Logout State" }
func (state logoutState) IsLoggedOn() bool { return false }
func (state logoutState) String() string { return "Logout State" }

func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
nextState = inSession{}.FixMsgIn(session, msg)
Expand All @@ -18,16 +17,10 @@ func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sess

func (state logoutState) Timeout(session *session, event internal.Event) (nextState sessionState) {
switch event {
default:
return state

case internal.LogoutTimeout:
session.log.OnEvent("Timed out waiting for logout response")
case internal.SessionExpire:
if err := session.dropAndReset(); err != nil {
session.logError(err)
}
return latentState{}
}

return handleDisconnectState(session)
return state
}

0 comments on commit e319b76

Please sign in to comment.