Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ func writeLoop(connection io.Writer, messageOut chan []byte) {
}

func readLoop(parser *parser, msgIn chan fixIn) {
defer func() {
close(msgIn)
}()
defer close(msgIn)

for {
msg, err := parser.ReadMessage()
Expand Down
116 changes: 56 additions & 60 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,65 @@ type inSession struct {
func (state inSession) String() string { return "In Session" }
func (state inSession) IsLoggedOn() bool { return true }

func (state inSession) VerifyMsgIn(session *session, msg Message) (err MessageRejectError) {
var msgType FIXString
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
switch string(msgType) {
case enum.MsgType_LOGON:
return session.verifyLogon(msg)
case enum.MsgType_LOGOUT:
return nil
case enum.MsgType_RESEND_REQUEST:
return session.verifyIgnoreSeqNumTooHighOrLow(msg)
case enum.MsgType_SEQUENCE_RESET:
var gapFillFlag FIXBoolean
msg.Body.GetField(tagGapFillFlag, &gapFillFlag)
return session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag))
default:
return session.verify(msg)
}
}
return nil
}

func (state inSession) FixMsgIn(session *session, msg Message) (nextState sessionState) {
var msgType FIXString
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
switch string(msgType) {
//logon
case "A":
return state.handleLogon(session, msg)
//logout
case "5":
return state.handleLogout(session, msg)
//test request
case "1":
case enum.MsgType_LOGON:
session.handleLogon(msg)
return state
case enum.MsgType_LOGOUT:
session.log.OnEvent("Received logout request")
session.log.OnEvent("Sending logout response")
state.generateLogout(session)
return latentState{}
case enum.MsgType_TEST_REQUEST:
return state.handleTestRequest(session, msg)
//resend request
case "2":
case enum.MsgType_RESEND_REQUEST:
return state.handleResendRequest(session, msg)
//sequence reset
case "4":
case enum.MsgType_SEQUENCE_RESET:
return state.handleSequenceReset(session, msg)
default:
if err := session.verify(msg); err != nil {
return state.processReject(session, msg, err)
}
}
}

session.store.IncrNextTargetMsgSeqNum()

return state
}

func (state inSession) FixMsgInRej(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
var msgType FIXString
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil {
switch string(msgType) {
case enum.MsgType_LOGON:
return state.initiateLogout(session, "")
case enum.MsgType_LOGOUT:
return latentState{}
}
}
return state.processReject(session, msg, rej)
}

func (state inSession) Timeout(session *session, event event) (nextState sessionState) {
switch event {
case needHeartbeat:
Expand All @@ -60,30 +88,22 @@ func (state inSession) Timeout(session *session, event event) (nextState session
return state
}

func (state inSession) handleLogon(session *session, msg Message) (nextState sessionState) {
if err := session.handleLogon(msg); err != nil {
return state.initiateLogout(session, "")
func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) {
var testReq FIXString
if err := msg.Body.GetField(tagTestReqID, &testReq); err != nil {
session.log.OnEvent("Test Request with no testRequestID")
} else {
heartBt := NewMessage()
heartBt.Header.SetField(tagMsgType, FIXString("0"))
heartBt.Body.SetField(tagTestReqID, testReq)
session.send(heartBt)
}

session.store.IncrNextTargetMsgSeqNum()
return state
}

func (state inSession) handleLogout(session *session, msg Message) (nextState sessionState) {
session.log.OnEvent("Received logout request")
session.log.OnEvent("Sending logout response")

state.generateLogout(session)
return latentState{}
}

func (state inSession) handleSequenceReset(session *session, msg Message) (nextState sessionState) {
var gapFillFlag FIXBoolean
msg.Body.GetField(tagGapFillFlag, &gapFillFlag)

if err := session.verifySelect(msg, bool(gapFillFlag), bool(gapFillFlag)); err != nil {
return state.processReject(session, msg, err)
}

var newSeqNo FIXInt
if err := msg.Body.GetField(tagNewSeqNo, &newSeqNo); err == nil {
expectedSeqNum := FIXInt(session.store.NextTargetMsgSeqNum())
Expand All @@ -97,15 +117,10 @@ func (state inSession) handleSequenceReset(session *session, msg Message) (nextS
session.doReject(msg, valueIsIncorrectNoTag())
}
}

return state
}

func (state inSession) handleResendRequest(session *session, msg Message) (nextState sessionState) {
if err := session.verifyIgnoreSeqNumTooHighOrLow(msg); err != nil {
return state.processReject(session, msg, err)
}

var err error
var beginSeqNoField FIXInt
if err = msg.Body.GetField(tagBeginSeqNo, &beginSeqNoField); err != nil {
Expand Down Expand Up @@ -167,26 +182,6 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
}
}

func (state inSession) handleTestRequest(session *session, msg Message) (nextState sessionState) {
if err := session.verify(msg); err != nil {
return state.processReject(session, msg, err)
}

var testReq FIXString
if err := msg.Body.GetField(tagTestReqID, &testReq); err != nil {
session.log.OnEvent("Test Request with no testRequestID")
} else {
heartBt := NewMessage()
heartBt.Header.SetField(tagMsgType, FIXString("0"))
heartBt.Body.SetField(tagTestReqID, testReq)
session.send(heartBt)
}

session.store.IncrNextTargetMsgSeqNum()

return state
}

func (state inSession) processReject(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
switch TypedError := rej.(type) {
case targetTooHigh:
Expand All @@ -197,6 +192,7 @@ func (state inSession) processReject(session *session, msg Message, rej MessageR
case resendState:
//assumes target too high reject already sent
}

session.messageStash[TypedError.ReceivedTarget] = msg
return resendState{}

Expand Down
8 changes: 8 additions & 0 deletions latent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@ type latentState struct{}
func (state latentState) String() string { return "Latent State" }
func (state latentState) IsLoggedOn() bool { return false }

func (state latentState) VerifyMsgIn(session *session, msg Message) MessageRejectError {
return InvalidMessageType()
}

func (state latentState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
session.log.OnEventf("Invalid Session State: Unexpected Msg %v while in Latent state", msg)
return state
}

func (state latentState) FixMsgInRej(session *session, msg Message, err MessageRejectError) (nextState sessionState) {
return state.FixMsgIn(session, msg)
}

func (state latentState) Timeout(*session, event) (nextState sessionState) {
return state
}
30 changes: 24 additions & 6 deletions logon_state.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,40 @@
package quickfix

import "github.com/quickfixgo/quickfix/enum"

type logonState struct{}

func (state logonState) String() string { return "Logon State" }
func (s logonState) IsLoggedOn() bool { return false }

func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
func (s logonState) VerifyMsgIn(session *session, msg Message) MessageRejectError {
var msgType FIXString
if err := msg.Header.GetField(tagMsgType, &msgType); err == nil && string(msgType) == "A" {
if err := session.handleLogon(msg); err != nil {
if err := msg.Header.GetField(tagMsgType, &msgType); err != nil {
return RequiredTagMissing(tagMsgType)
}

switch string(msgType) {
case enum.MsgType_LOGON:
err := session.verifyLogon(msg)
if err != nil {
session.log.OnEvent(err.Error())
return latentState{}
}
return err
default:
session.log.OnEventf("Invalid Session State: Received Msg %v while waiting for Logon", msg)
return InvalidMessageType()
}
}

return inSession{}
func (s logonState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
if err := session.handleLogon(msg); err != nil {
session.log.OnEvent(err.Error())
return latentState{}
}
return inSession{}
}

session.log.OnEventf("Invalid Session State: Received Msg %v while waiting for Logon", msg)
func (s logonState) FixMsgInRej(session *session, msg Message, err MessageRejectError) sessionState {
return latentState{}
}

Expand Down
14 changes: 10 additions & 4 deletions logout_state.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
package quickfix

type logoutState struct {
}
import "github.com/quickfixgo/quickfix/enum"

type logoutState struct{}

func (state logoutState) String() string { return "Logout State" }
func (s logoutState) IsLoggedOn() bool { return false }

func (state logoutState) VerifyMsgIn(session *session, msg Message) MessageRejectError { return nil }

func (state logoutState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
var msgType FIXString
if err := msg.Header.GetField(tagMsgType, &msgType); err != nil {
return latentState{}
}

switch string(msgType) {
//logout
case "5":
case enum.MsgType_LOGOUT:
session.log.OnEvent("Received logout response")
return latentState{}
default:
return state
}
}

func (state logoutState) FixMsgInRej(session *session, msg Message, err MessageRejectError) sessionState {
return state.FixMsgIn(session, msg)
}

func (state logoutState) Timeout(session *session, event event) (nextState sessionState) {
switch event {
case logoutTimeout:
Expand Down
8 changes: 3 additions & 5 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
return err
}

//NOTE: must queue for send here. otherwise, if not executed in same goroutine as session run loop,
//message may be sent on closed channel or sent outside of valid state
session.queueForSend(msg)

return nil
request := sendRequest{msg, make(chan error)}
session.toSend <- request
return <-request.err
}

type sessionActivate struct {
Expand Down
24 changes: 15 additions & 9 deletions resend_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@ type resendState struct {
inSession
}

func (state resendState) String() string { return "Resend" }

func (state resendState) FixMsgIn(session *session, msg Message) (nextState sessionState) {
for ok := true; ok; {
nextState = state.inSession.FixMsgIn(session, msg)
return state.handleNextState(session, state.inSession.FixMsgIn(session, msg))
}

if !nextState.IsLoggedOn() {
return
}
func (state resendState) FixMsgInRej(session *session, msg Message, rej MessageRejectError) (nextState sessionState) {
return state.handleNextState(session, state.inSession.FixMsgInRej(session, msg, rej))
}

msg, ok = session.messageStash[session.store.NextTargetMsgSeqNum()]
func (state resendState) handleNextState(session *session, nextState sessionState) sessionState {
if !nextState.IsLoggedOn() || len(session.messageStash) == 0 {
return nextState
}

if len(session.messageStash) != 0 {
nextState = resendState{}
targetSeqNum := session.store.NextTargetMsgSeqNum()
if msg, ok := session.messageStash[targetSeqNum]; ok {
delete(session.messageStash, targetSeqNum)
session.resendIn <- msg
}

return
return resendState{}
}
Loading