Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session time #200

Merged
merged 2 commits into from
Aug 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 10 additions & 9 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ func (a *Acceptor) Start() error {

connections := a.listenForConnections(server)

a.quitChan = make(chan bool)
go func() {
for cxn := range connections {
go handleAcceptorConnection(cxn, a.qualifiedSessionIDs, a.globalLog, a.quitChan)
go handleAcceptorConnection(cxn, a.qualifiedSessionIDs, a.globalLog)
}
}()

Expand All @@ -66,12 +65,14 @@ func (a *Acceptor) Stop() {

//NewAcceptor creates and initializes a new Acceptor.
func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Settings, logFactory LogFactory) (*Acceptor, error) {
a := new(Acceptor)
a.app = app
a.storeFactory = storeFactory
a.settings = settings
a.logFactory = logFactory
a.qualifiedSessionIDs = make(map[SessionID]SessionID)
a := &Acceptor{
quitChan: make(chan bool),
app: app,
storeFactory: storeFactory,
settings: settings,
logFactory: logFactory,
qualifiedSessionIDs: make(map[SessionID]SessionID),
}

var err error
a.globalLog, err = logFactory.Create()
Expand All @@ -91,7 +92,7 @@ func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Se
}
a.qualifiedSessionIDs[unqualifiedSessionID] = sessionID

if err = createSession(sessionID, storeFactory, sessionSettings, logFactory, app); err != nil {
if err = createSession(sessionID, storeFactory, sessionSettings, logFactory, app, a.quitChan); err != nil {
return nil, err
}
}
Expand Down
14 changes: 6 additions & 8 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

//Picks up session from net.Conn Initiator
func handleInitiatorConnection(address string, log Log, sessID SessionID, quit chan bool, reconnectInterval time.Duration, tlsConfig *tls.Config) {
func handleInitiatorConnection(address string, log Log, sessID SessionID, reconnectInterval time.Duration, tlsConfig *tls.Config) {
session := activate(sessID)
if session == nil {
log.OnEventf("Session not found for SessionID: %v", sessID)
Expand Down Expand Up @@ -46,11 +46,9 @@ func handleInitiatorConnection(address string, log Log, sessID SessionID, quit c
}

go readLoop(newParser(bufio.NewReader(netConn)), msgIn)
go func() {
writeLoop(netConn, msgOut)
netConn.Close()
}()
session.initiate(msgIn, msgOut, quit)
session.initiate(msgIn, msgOut)
writeLoop(netConn, msgOut)
netConn.Close()

reconnect:
log.OnEventf("%v Reconnecting in %v", sessID, reconnectInterval)
Expand All @@ -60,7 +58,7 @@ func handleInitiatorConnection(address string, log Log, sessID SessionID, quit c
}

//Picks up session from net.Conn Acceptor
func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionID]SessionID, log Log, quit chan bool) {
func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionID]SessionID, log Log) {
defer func() {
if err := recover(); err != nil {
log.OnEventf("Connection Terminated: %v", err)
Expand Down Expand Up @@ -119,7 +117,7 @@ func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionI
readLoop(parser, msgIn)
}()

go session.accept(msgIn, msgOut, quit)
session.accept(msgIn, msgOut)
writeLoop(netConn, msgOut)
}

Expand Down
11 changes: 3 additions & 8 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"github.com/quickfixgo/quickfix/internal"
)

type inSession struct {
}
type inSession struct{ loggedOn }

func (state inSession) String() string { return "In Session" }
func (state inSession) IsLoggedOn() bool { return true }
func (state inSession) String() string { return "In Session" }

func (state inSession) FixMsgIn(session *session, msg Message) sessionState {
var msgType FIXString
Expand Down Expand Up @@ -68,9 +66,6 @@ func (state inSession) Timeout(session *session, event internal.Event) (nextStat
session.log.OnEvent("Sent test request TEST")
session.peerTimer.Reset(time.Duration(int64(1.2 * float64(session.heartBeatTimeout))))
return pendingTimeout{state}
case internal.SessionExpire:
session.sendLogoutAndReset()
return handleDisconnectState(session)
}

return state
Expand Down Expand Up @@ -102,7 +97,7 @@ func (state inSession) handleLogout(session *session, msg Message) (nextState se
}
}

return handleDisconnectState(session)
return latentState{}
}

func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) {
Expand Down
19 changes: 10 additions & 9 deletions in_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ func (s *InSessionTestSuite) TestIsLoggedOn() {
s.True(s.session.IsLoggedOn())
}

func (s *InSessionTestSuite) TestIsConnected() {
s.True(s.session.IsConnected())
}

func (s *InSessionTestSuite) TestIsSessionTime() {
s.True(s.session.IsSessionTime())
}

func (s *InSessionTestSuite) TestLogout() {
s.mockApp.On("FromAdmin").Return(nil)
s.mockApp.On("ToAdmin")
Expand Down Expand Up @@ -92,24 +100,17 @@ func (s *InSessionTestSuite) TestDisconnected() {
s.State(latentState{})
}

func (s *InSessionTestSuite) TestTimeoutSessionExpire() {
func (s *InSessionTestSuite) TestShutdownNow() {
s.mockApp.On("FromApp").Return(nil)
s.FixMsgIn(s.session, s.NewOrderSingle())
s.mockApp.AssertExpectations(s.T())
s.session.store.IncrNextSenderMsgSeqNum()

s.mockApp.On("ToAdmin").Return(nil)
s.mockApp.On("OnLogout").Return(nil)
s.Timeout(s.session, internal.SessionExpire)
s.session.State.ShutdownNow(s.session)

s.mockApp.AssertExpectations(s.T())
s.LastToAdminMessageSent()
s.MessageType("5", s.mockApp.lastToAdmin)
s.FieldEquals(tagMsgSeqNum, 2, s.mockApp.lastToAdmin.Header)

s.State(latentState{})
s.NextTargetMsgSeqNum(1)
s.NextSenderMsgSeqNum(1)
s.NoMessageSent()
s.NoMessageQueued()
}
20 changes: 10 additions & 10 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ type Initiator struct {
//Start Initiator.
func (i *Initiator) Start() error {

i.quitChan = make(chan bool)

for sessionID, s := range i.sessionSettings {
socketConnectHost, err := s.Setting(config.SocketConnectHost)
if err != nil {
Expand All @@ -49,7 +47,7 @@ func (i *Initiator) Start() error {
return err
}
address := fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort)
go handleInitiatorConnection(address, i.globalLog, sessionID, i.quitChan, time.Duration(reconnectInterval)*time.Second, tlsConfig)
go handleInitiatorConnection(address, i.globalLog, sessionID, time.Duration(reconnectInterval)*time.Second, tlsConfig)
}

return nil
Expand All @@ -65,12 +63,14 @@ func (i *Initiator) Stop() {

//NewInitiator creates and initializes a new Initiator.
func NewInitiator(app Application, storeFactory MessageStoreFactory, appSettings *Settings, logFactory LogFactory) (*Initiator, error) {
i := new(Initiator)
i.app = app
i.storeFactory = storeFactory
i.settings = appSettings
i.sessionSettings = appSettings.SessionSettings()
i.logFactory = logFactory
i := &Initiator{
quitChan: make(chan bool),
app: app,
storeFactory: storeFactory,
settings: appSettings,
sessionSettings: appSettings.SessionSettings(),
logFactory: logFactory,
}

var err error
i.globalLog, err = logFactory.Create()
Expand All @@ -93,7 +93,7 @@ func NewInitiator(app Application, storeFactory MessageStoreFactory, appSettings
return nil, requiredConfigurationMissing(config.HeartBtInt)
}

err = createSession(sessionID, storeFactory, s, logFactory, app)
err = createSession(sessionID, storeFactory, s, logFactory, app, i.quitChan)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions internal/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,4 @@ const (
LogonTimeout
//LogoutTimeout indicates the peer has not sent a logout request
LogoutTimeout
//SessionExpire indicates that the session has expired
SessionExpire
)
9 changes: 6 additions & 3 deletions latent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package quickfix

import "github.com/quickfixgo/quickfix/internal"

type latentState struct{}
type latentState struct{ inSessionTime }

func (state latentState) String() string { return "Latent State" }
func (state latentState) IsLoggedOn() bool { return false }
func (state latentState) String() string { return "Latent State" }
func (state latentState) IsLoggedOn() bool { return false }
func (state latentState) IsConnected() bool { return false }

func (state latentState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg)
Expand All @@ -15,3 +16,5 @@ func (state latentState) FixMsgIn(session *session, msg Message) (nextState sess
func (state latentState) Timeout(*session, internal.Event) (nextState sessionState) {
return state
}

func (state latentState) ShutdownNow(*session) {}
32 changes: 32 additions & 0 deletions latent_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package quickfix

import (
"testing"

"github.com/stretchr/testify/suite"
)

type LatentStateTestSuite struct {
SessionSuite
}

func TestLatentStateTestSuite(t *testing.T) {
suite.Run(t, new(LatentStateTestSuite))
}

func (s *LatentStateTestSuite) SetupTest() {
s.Init()
s.session.State = latentState{}
}

func (s *LatentStateTestSuite) TestIsLoggedOn() {
s.False(s.session.IsLoggedOn())
}

func (s *LatentStateTestSuite) TestIsConnected() {
s.False(s.session.IsConnected())
}

func (s *LatentStateTestSuite) TestIsSessionTime() {
s.True(s.session.IsSessionTime())
}
16 changes: 5 additions & 11 deletions logon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"github.com/quickfixgo/quickfix/internal"
)

type logonState struct{}
type logonState struct{ connectedNotLoggedOn }

func (s logonState) String() string { return "Logon State" }

func (s logonState) String() string { return "Logon State" }
func (s logonState) IsLoggedOn() bool { return false }
func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
var msgType FIXString
if err := msg.Header.GetField(tagMsgType, &msgType); err != nil {
Expand All @@ -28,16 +28,10 @@ func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionSt

func (s logonState) Timeout(session *session, e internal.Event) (nextState sessionState) {
switch e {
default:
return s

case internal.LogonTimeout:
session.log.OnEvent("Timed out waiting for logon response")
case internal.SessionExpire:
if err := session.dropAndReset(); err != nil {
session.logError(err)
}
return latentState{}
}
return s

return handleDisconnectState(session)
}
37 changes: 10 additions & 27 deletions logon_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func (s *LogonStateTestSuite) TestIsLoggedOn() {
s.False(s.session.IsLoggedOn())
}

func (s *LogonStateTestSuite) TestIsConnected() {
s.True(s.session.IsConnected())
}

func (s *LogonStateTestSuite) TestIsSessionTime() {
s.True(s.session.IsSessionTime())
}

func (s *LogonStateTestSuite) TestTimeoutLogonTimeout() {
s.Timeout(s.session, internal.LogonTimeout)
s.State(latentState{})
Expand All @@ -50,40 +58,15 @@ func (s *LogonStateTestSuite) TestTimeoutNotLogonTimeout() {
}
}

func (s *LogonStateTestSuite) TestTimeoutSessionExpire() {
s.session.store.IncrNextTargetMsgSeqNum()
s.session.store.IncrNextSenderMsgSeqNum()

s.Timeout(s.session, internal.SessionExpire)

s.State(latentState{})
s.NextTargetMsgSeqNum(1)
s.NextSenderMsgSeqNum(1)
s.NoMessageSent()
s.NoMessageQueued()
func (s *LogonStateTestSuite) TestShutdownNow() {
s.session.State.ShutdownNow(s.session)
}

func (s *LogonStateTestSuite) TestDisconnected() {
s.session.Disconnected(s.session)
s.State(latentState{})
}

func (s *LogonStateTestSuite) TestTimeoutSessionExpireInitiateLogout() {
s.session.initiateLogon = true
s.session.store.IncrNextTargetMsgSeqNum()
s.session.store.IncrNextSenderMsgSeqNum()

s.mockApp.On("OnLogout")
s.Timeout(s.session, internal.SessionExpire)

s.mockApp.AssertExpectations(s.T())
s.State(latentState{})
s.NextTargetMsgSeqNum(1)
s.NextSenderMsgSeqNum(1)
s.NoMessageSent()
s.NoMessageQueued()
}

func (s *LogonStateTestSuite) TestFixMsgInNotLogon() {
s.FixMsgIn(s.session, s.NewOrderSingle())

Expand Down
15 changes: 4 additions & 11 deletions logout_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package quickfix

import "github.com/quickfixgo/quickfix/internal"

type logoutState struct{}
type logoutState struct{ connectedNotLoggedOn }

func (state logoutState) String() string { return "Logout State" }
func (state logoutState) IsLoggedOn() bool { return false }
func (state logoutState) String() string { return "Logout State" }

func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
nextState = inSession{}.FixMsgIn(session, msg)
Expand All @@ -18,16 +17,10 @@ func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sess

func (state logoutState) Timeout(session *session, event internal.Event) (nextState sessionState) {
switch event {
default:
return state

case internal.LogoutTimeout:
session.log.OnEvent("Timed out waiting for logout response")
case internal.SessionExpire:
if err := session.dropAndReset(); err != nil {
session.logError(err)
}
return latentState{}
}

return handleDisconnectState(session)
return state
}