diff --git a/Makefile b/Makefile index fa2dfff5f..124f489f7 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ lint: golint . test: - go test -v . ./datadictionary + go test -v -cover . ./datadictionary _build_all: go build -v ./... diff --git a/acceptor.go b/acceptor.go index d2e7a0c94..9cd4750e8 100644 --- a/acceptor.go +++ b/acceptor.go @@ -56,7 +56,7 @@ func (a *Acceptor) Stop() { _ = recover() // suppress sending on closed channel error }() for _, channel := range a.quitChans { - channel <- true + close(channel) } } diff --git a/connection.go b/connection.go index 8164fcab6..c601b1fe0 100644 --- a/connection.go +++ b/connection.go @@ -2,45 +2,43 @@ package quickfix import ( "bufio" + "io" "net" + "time" ) //Picks up session from net.Conn Initiator -func handleInitiatorConnection(netConn net.Conn, log Log, sessID SessionID, quit chan bool) { - defer func() { - if err := recover(); err != nil { - log.OnEventf("Connection Terminated: %v", err) - } - - netConn.Close() - }() - +func handleInitiatorConnection(address string, log Log, sessID SessionID, quit chan bool) { + reconnectInterval := 30 * time.Second session := activate(sessID) if session == nil { log.OnEventf("Session not found for SessionID: %v", sessID) return } - defer func() { - deactivate(sessID) - }() - var msgOut chan []byte - var err error - if msgOut, err = session.initiate(); err != nil { - log.OnEventf("Session cannot initiate: %v", err) - return - } + defer deactivate(sessID) - reader := bufio.NewReader(netConn) - parser := newParser(reader) + for { + msgIn := make(chan fixIn) + msgOut := make(chan []byte) - msgIn := make(chan fixIn) - go writeLoop(netConn, msgOut) - go func() { - readLoop(parser, msgIn) - }() + netConn, err := net.Dial("tcp", address) + if err != nil { + goto reconnect + } - session.run(msgIn, quit) + go readLoop(newParser(bufio.NewReader(netConn)), msgIn) + go func() { + writeLoop(netConn, msgOut) + netConn.Close() + }() + session.initiate(msgIn, msgOut, quit) + + reconnect: + log.OnEventf("%v Reconnecting in %v", sessID, reconnectInterval) + time.Sleep(reconnectInterval) + continue + } } //Picks up session from net.Conn Acceptor @@ -96,32 +94,25 @@ func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionI deactivate(qualifiedSessID) }() - var msgOut chan []byte - if msgOut, err = session.accept(); err != nil { - log.OnEventf("Session cannot accept: %v", err) - return - } - msgIn := make(chan fixIn) - go writeLoop(netConn, msgOut) + msgOut := make(chan []byte) + go func() { msgIn <- fixIn{msgBytes, parser.lastRead} readLoop(parser, msgIn) }() - session.run(msgIn, quit) + go session.accept(msgIn, msgOut, quit) + writeLoop(netConn, msgOut) } -func writeLoop(connection net.Conn, messageOut chan []byte) { - defer func() { - close(messageOut) - }() - - var msg []byte +func writeLoop(connection io.Writer, messageOut chan []byte) { for { - if msg = <-messageOut; msg == nil { + msg, ok := <-messageOut + if !ok { return } + connection.Write(msg) } } @@ -133,17 +124,9 @@ func readLoop(parser *parser, msgIn chan fixIn) { for { msg, err := parser.ReadMessage() - if err != nil { - switch err.(type) { - //ignore message parser errors - case parseError: - continue - default: - return - } - } else { - msgIn <- fixIn{msg, parser.lastRead} + return } + msgIn <- fixIn{msg, parser.lastRead} } } diff --git a/connection_internal_test.go b/connection_internal_test.go new file mode 100644 index 000000000..ee3757d81 --- /dev/null +++ b/connection_internal_test.go @@ -0,0 +1,58 @@ +package quickfix + +import ( + "bytes" + "strings" + "testing" +) + +func TestWriteLoop(t *testing.T) { + writer := bytes.NewBufferString("") + msgOut := make(chan []byte) + + go func() { + msgOut <- []byte("test msg 1 ") + msgOut <- []byte("test msg 2 ") + msgOut <- []byte("test msg 3") + close(msgOut) + }() + writeLoop(writer, msgOut) + + expected := "test msg 1 test msg 2 test msg 3" + + if writer.String() != expected { + t.Errorf("expected %v got %v", expected, writer.String()) + } +} + +func TestReadLoop(t *testing.T) { + msgIn := make(chan fixIn) + stream := "hello8=FIX.4.09=5blah10=103garbage8=FIX.4.09=4foo10=103" + + parser := newParser(strings.NewReader(stream)) + go readLoop(parser, msgIn) + + var tests = []struct { + expectedMsg string + channelClosed bool + }{ + {expectedMsg: "8=FIX.4.09=5blah10=103"}, + {expectedMsg: "8=FIX.4.09=4foo10=103"}, + {channelClosed: true}, + } + + for _, test := range tests { + msg, ok := <-msgIn + switch { + case !ok && !test.channelClosed: + t.Error("Channel unexpectedly closed") + fallthrough + case !ok && test.channelClosed: + continue + } + + if string(msg.bytes) != test.expectedMsg { + t.Errorf("Expected %v got %v", test.expectedMsg, string(msg.bytes)) + } + } +} diff --git a/initiator.go b/initiator.go index 0f10795ba..dc3bc5598 100644 --- a/initiator.go +++ b/initiator.go @@ -3,7 +3,6 @@ package quickfix import ( "fmt" "github.com/quickfixgo/quickfix/config" - "net" ) //Initiator initiates connections and processes messages for all sessions. @@ -31,13 +30,9 @@ func (i *Initiator) Start() error { return fmt.Errorf("error on SocketConnectPort: %v", err) } - conn, err := net.Dial("tcp", fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort)) - if err != nil { - return err - } - i.quitChans[sessionID] = make(chan bool) - go handleInitiatorConnection(conn, i.globalLog, sessionID, i.quitChans[sessionID]) + address := fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort) + go handleInitiatorConnection(address, i.globalLog, sessionID, i.quitChans[sessionID]) } return nil @@ -49,7 +44,7 @@ func (i *Initiator) Stop() { _ = recover() // suppress sending on closed channel error }() for _, channel := range i.quitChans { - channel <- true + close(channel) } } diff --git a/session.go b/session.go index eb818f613..b9e9407f6 100644 --- a/session.go +++ b/session.go @@ -107,21 +107,21 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin return nil } -func (s *Session) initiate() (chan []byte, error) { +//kicks off session as an initiator +func (s *Session) initiate(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.currentState = logonState{} - s.messageOut = make(chan []byte) s.messageStash = make(map[int]Message) s.initiateLogon = true - return s.messageOut, nil + s.run(msgIn, msgOut, quit) } -func (s *Session) accept() (chan []byte, error) { +//kicks off session as an acceptor +func (s *Session) accept(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { s.currentState = logonState{} - s.messageOut = make(chan []byte) s.messageStash = make(map[int]Message) - return s.messageOut, nil + s.run(msgIn, msgOut, quit) } func (s *Session) onDisconnect() { @@ -467,10 +467,10 @@ type fixIn struct { receiveTime time.Time } -func (s *Session) run(msgIn chan fixIn, quit chan bool) { +func (s *Session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) { + s.messageOut = msgOut defer func() { - close(quit) - s.messageOut <- nil + close(s.messageOut) s.onDisconnect() }()