Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ lint:
golint .

test:
go test -v . ./datadictionary
go test -v -cover . ./datadictionary

_build_all:
go build -v ./...
Expand Down
2 changes: 1 addition & 1 deletion acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (a *Acceptor) Stop() {
_ = recover() // suppress sending on closed channel error
}()
for _, channel := range a.quitChans {
channel <- true
close(channel)
}
}

Expand Down
85 changes: 34 additions & 51 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,43 @@ package quickfix

import (
"bufio"
"io"
"net"
"time"
)

//Picks up session from net.Conn Initiator
func handleInitiatorConnection(netConn net.Conn, log Log, sessID SessionID, quit chan bool) {
defer func() {
if err := recover(); err != nil {
log.OnEventf("Connection Terminated: %v", err)
}

netConn.Close()
}()

func handleInitiatorConnection(address string, log Log, sessID SessionID, quit chan bool) {
reconnectInterval := 30 * time.Second
session := activate(sessID)
if session == nil {
log.OnEventf("Session not found for SessionID: %v", sessID)
return
}
defer func() {
deactivate(sessID)
}()

var msgOut chan []byte
var err error
if msgOut, err = session.initiate(); err != nil {
log.OnEventf("Session cannot initiate: %v", err)
return
}
defer deactivate(sessID)

reader := bufio.NewReader(netConn)
parser := newParser(reader)
for {
msgIn := make(chan fixIn)
msgOut := make(chan []byte)

msgIn := make(chan fixIn)
go writeLoop(netConn, msgOut)
go func() {
readLoop(parser, msgIn)
}()
netConn, err := net.Dial("tcp", address)
if err != nil {
goto reconnect
}

session.run(msgIn, quit)
go readLoop(newParser(bufio.NewReader(netConn)), msgIn)
go func() {
writeLoop(netConn, msgOut)
netConn.Close()
}()
session.initiate(msgIn, msgOut, quit)

reconnect:
log.OnEventf("%v Reconnecting in %v", sessID, reconnectInterval)
time.Sleep(reconnectInterval)
continue
}
}

//Picks up session from net.Conn Acceptor
Expand Down Expand Up @@ -96,32 +94,25 @@ func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionI
deactivate(qualifiedSessID)
}()

var msgOut chan []byte
if msgOut, err = session.accept(); err != nil {
log.OnEventf("Session cannot accept: %v", err)
return
}

msgIn := make(chan fixIn)
go writeLoop(netConn, msgOut)
msgOut := make(chan []byte)

go func() {
msgIn <- fixIn{msgBytes, parser.lastRead}
readLoop(parser, msgIn)
}()

session.run(msgIn, quit)
go session.accept(msgIn, msgOut, quit)
writeLoop(netConn, msgOut)
}

func writeLoop(connection net.Conn, messageOut chan []byte) {
defer func() {
close(messageOut)
}()

var msg []byte
func writeLoop(connection io.Writer, messageOut chan []byte) {
for {
if msg = <-messageOut; msg == nil {
msg, ok := <-messageOut
if !ok {
return
}

connection.Write(msg)
}
}
Expand All @@ -133,17 +124,9 @@ func readLoop(parser *parser, msgIn chan fixIn) {

for {
msg, err := parser.ReadMessage()

if err != nil {
switch err.(type) {
//ignore message parser errors
case parseError:
continue
default:
return
}
} else {
msgIn <- fixIn{msg, parser.lastRead}
return
}
msgIn <- fixIn{msg, parser.lastRead}
}
}
58 changes: 58 additions & 0 deletions connection_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package quickfix

import (
"bytes"
"strings"
"testing"
)

func TestWriteLoop(t *testing.T) {
writer := bytes.NewBufferString("")
msgOut := make(chan []byte)

go func() {
msgOut <- []byte("test msg 1 ")
msgOut <- []byte("test msg 2 ")
msgOut <- []byte("test msg 3")
close(msgOut)
}()
writeLoop(writer, msgOut)

expected := "test msg 1 test msg 2 test msg 3"

if writer.String() != expected {
t.Errorf("expected %v got %v", expected, writer.String())
}
}

func TestReadLoop(t *testing.T) {
msgIn := make(chan fixIn)
stream := "hello8=FIX.4.09=5blah10=103garbage8=FIX.4.09=4foo10=103"

parser := newParser(strings.NewReader(stream))
go readLoop(parser, msgIn)

var tests = []struct {
expectedMsg string
channelClosed bool
}{
{expectedMsg: "8=FIX.4.09=5blah10=103"},
{expectedMsg: "8=FIX.4.09=4foo10=103"},
{channelClosed: true},
}

for _, test := range tests {
msg, ok := <-msgIn
switch {
case !ok && !test.channelClosed:
t.Error("Channel unexpectedly closed")
fallthrough
case !ok && test.channelClosed:
continue
}

if string(msg.bytes) != test.expectedMsg {
t.Errorf("Expected %v got %v", test.expectedMsg, string(msg.bytes))
}
}
}
11 changes: 3 additions & 8 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package quickfix
import (
"fmt"
"github.com/quickfixgo/quickfix/config"
"net"
)

//Initiator initiates connections and processes messages for all sessions.
Expand Down Expand Up @@ -31,13 +30,9 @@ func (i *Initiator) Start() error {
return fmt.Errorf("error on SocketConnectPort: %v", err)
}

conn, err := net.Dial("tcp", fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort))
if err != nil {
return err
}

i.quitChans[sessionID] = make(chan bool)
go handleInitiatorConnection(conn, i.globalLog, sessionID, i.quitChans[sessionID])
address := fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort)
go handleInitiatorConnection(address, i.globalLog, sessionID, i.quitChans[sessionID])
}

return nil
Expand All @@ -49,7 +44,7 @@ func (i *Initiator) Stop() {
_ = recover() // suppress sending on closed channel error
}()
for _, channel := range i.quitChans {
channel <- true
close(channel)
}
}

Expand Down
18 changes: 9 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,21 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin
return nil
}

func (s *Session) initiate() (chan []byte, error) {
//kicks off session as an initiator
func (s *Session) initiate(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
s.currentState = logonState{}
s.messageOut = make(chan []byte)
s.messageStash = make(map[int]Message)
s.initiateLogon = true

return s.messageOut, nil
s.run(msgIn, msgOut, quit)
}

func (s *Session) accept() (chan []byte, error) {
//kicks off session as an acceptor
func (s *Session) accept(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
s.currentState = logonState{}
s.messageOut = make(chan []byte)
s.messageStash = make(map[int]Message)

return s.messageOut, nil
s.run(msgIn, msgOut, quit)
}

func (s *Session) onDisconnect() {
Expand Down Expand Up @@ -467,10 +467,10 @@ type fixIn struct {
receiveTime time.Time
}

func (s *Session) run(msgIn chan fixIn, quit chan bool) {
func (s *Session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
s.messageOut = msgOut
defer func() {
close(quit)
s.messageOut <- nil
close(s.messageOut)
s.onDisconnect()
}()

Expand Down