Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions file_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ type fileLog struct {
messageLogger *log.Logger
}

func (l fileLog) OnIncoming(msg string) {
func (l fileLog) OnIncoming(msg []byte) {
l.messageLogger.Print(msg)
}

func (l fileLog) OnOutgoing(msg string) {
func (l fileLog) OnOutgoing(msg []byte) {
l.messageLogger.Print(msg)
}

Expand Down
4 changes: 2 additions & 2 deletions file_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestFileLog_Append(t *testing.T) {
messageScanner := bufio.NewScanner(messageLogFile)
eventScanner := bufio.NewScanner(eventLogFile)

helper.Log.OnIncoming("incoming")
helper.Log.OnIncoming([]byte("incoming"))
if !messageScanner.Scan() {
t.Error("Unexpected EOF")
}
Expand All @@ -118,7 +118,7 @@ func TestFileLog_Append(t *testing.T) {
}

newHelper := newFileLogHelper(t)
newHelper.Log.OnIncoming("incoming")
newHelper.Log.OnIncoming([]byte("incoming"))
if !messageScanner.Scan() {
t.Error("Unexpected EOF")
}
Expand Down
11 changes: 3 additions & 8 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,8 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
}

session.log.OnEventf("Resending Message: %v", sentMessageSeqNum)
if _, err := msg.Build(); err != nil {
return err
}
session.sendBytes(msg.rawMessage)
msgBytes = msg.build()
session.sendBytes(msgBytes)

seqNum = sentMessageSeqNum + 1
nextSeqNum = seqNum
Expand Down Expand Up @@ -374,10 +372,7 @@ func (state *inSession) generateSequenceReset(session *session, beginSeqNo int,

session.application.ToAdmin(sequenceReset, session.sessionID)

msgBytes, err := sequenceReset.Build()
if err != nil {
return
}
msgBytes := sequenceReset.build()

session.sendBytes(msgBytes)
session.log.OnEventf("Sent SequenceReset TO: %v", endSeqNo)
Expand Down
8 changes: 4 additions & 4 deletions in_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func (s *InSessionTestSuite) TestFIXMsgInTargetTooHigh() {
stashedMsg, ok := resendState.messageStash[6]
s.True(ok)

rawMsg, _ := msgSeqNumTooHigh.Build()
stashedRawMsg, _ := stashedMsg.Build()
rawMsg := msgSeqNumTooHigh.build()
stashedRawMsg := stashedMsg.build()
s.Equal(string(rawMsg), string(stashedRawMsg))
}
func (s *InSessionTestSuite) TestFIXMsgInTargetTooHighResendRequestChunkSize() {
Expand Down Expand Up @@ -198,8 +198,8 @@ func (s *InSessionTestSuite) TestFIXMsgInTargetTooHighResendRequestChunkSize() {
stashedMsg, ok := resendState.messageStash[6]
s.True(ok)

rawMsg, _ := msgSeqNumTooHigh.Build()
stashedRawMsg, _ := stashedMsg.Build()
rawMsg := msgSeqNumTooHigh.build()
stashedRawMsg := stashedMsg.build()
s.Equal(string(rawMsg), string(stashedRawMsg))
}
}
Expand Down
4 changes: 2 additions & 2 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package quickfix
//Log is a generic interface for logging FIX messages and events.
type Log interface {
//log incoming fix message
OnIncoming(string)
OnIncoming([]byte)

//log outgoing fix message
OnOutgoing(string)
OnOutgoing([]byte)

//log fix event
OnEvent(string)
Expand Down
11 changes: 4 additions & 7 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,26 +283,23 @@ func extractField(parsedFieldBytes *TagValue, buffer []byte) (remBytes []byte, e
return buffer[(endIndex + 1):], err
}

func (m *Message) String() string {
return string(m.rawMessage)
func (m Message) String() string {
return string(m.build())
}

func newCheckSum(value int) FIXString {
return FIXString(fmt.Sprintf("%03d", value))
}

//Build constructs a []byte from a Message instance
func (m *Message) Build() ([]byte, error) {
func (m Message) build() []byte {
m.cook()

var b bytes.Buffer
m.Header.write(&b)
m.Body.write(&b)
m.Trailer.write(&b)

m.rawMessage = b.Bytes()

return m.rawMessage, nil
return b.Bytes()
}

func (m Message) cook() {
Expand Down
15 changes: 6 additions & 9 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package quickfix

import (
"bytes"
"github.com/quickfixgo/quickfix/enum"
"testing"

"github.com/quickfixgo/quickfix/enum"
)

var msgResult Message
Expand Down Expand Up @@ -82,11 +83,7 @@ func TestMessage_Build(t *testing.T) {
builder.Body.SetField(Tag(554), FIXString("secret"))

expectedBytes := []byte("8=FIX.4.49=4935=A52=20140615-19:49:56553=my_user554=secret10=072")
result, err := builder.Build()
if err != nil {
t.Error("Unexpected error", err)
}

result := builder.build()
if !bytes.Equal(expectedBytes, result) {
t.Error("Unexpected bytes, got ", string(result))
}
Expand All @@ -100,12 +97,12 @@ func TestMessage_ReBuild(t *testing.T) {
msg.Header.SetField(tagOrigSendingTime, FIXString("20140515-19:49:56.659"))
msg.Header.SetField(tagSendingTime, FIXString("20140615-19:49:56"))

msg.Build()
rebuildBytes := msg.build()

expectedBytes := []byte("8=FIX.4.29=12635=D34=249=TW52=20140615-19:49:5656=ISLD122=20140515-19:49:56.65911=10021=140=154=155=TSLA60=00010101-00:00:00.00010=128")

if !bytes.Equal(expectedBytes, msg.rawMessage) {
t.Errorf("Unexpected bytes,\n +%s\n-%s", msg.rawMessage, expectedBytes)
if !bytes.Equal(expectedBytes, rebuildBytes) {
t.Errorf("Unexpected bytes,\n +%s\n-%s", rebuildBytes, expectedBytes)
}

expectedBodyBytes := []byte("11=10021=140=154=155=TSLA60=00010101-00:00:00.000")
Expand Down
6 changes: 3 additions & 3 deletions null_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package quickfix

type nullLog struct{}

func (l nullLog) OnIncoming(s string) {}
func (l nullLog) OnOutgoing(s string) {}
func (l nullLog) OnEvent(s string) {}
func (l nullLog) OnIncoming([]byte) {}
func (l nullLog) OnOutgoing([]byte) {}
func (l nullLog) OnEvent(string) {}
func (l nullLog) OnEventf(format string, a ...interface{}) {}

type nullLogFactory struct{}
Expand Down
7 changes: 3 additions & 4 deletions quickfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ func (s *QuickFIXSuite) FieldEquals(tag Tag, expectedValue interface{}, fieldMap
}
}

func (s *QuickFIXSuite) MessageEqualsBytes(msgBytes []byte, msg Message) {
_, err := msg.Build()
s.Require().Nil(err)
s.Equal(string(msg.rawMessage), string(msgBytes))
func (s *QuickFIXSuite) MessageEqualsBytes(expectedBytes []byte, msg Message) {
actualBytes := msg.build()
s.Equal(string(actualBytes), string(expectedBytes))
}

//MockStore wraps a memory store and mocks Refresh for convenience
Expand Down
4 changes: 2 additions & 2 deletions screen_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ type screenLog struct {
prefix string
}

func (l screenLog) OnIncoming(s string) {
func (l screenLog) OnIncoming(s []byte) {
logTime := time.Now().UTC()
fmt.Printf("<%v, %s, incoming>\n (%s)\n", logTime, l.prefix, s)
}

func (l screenLog) OnOutgoing(s string) {
func (l screenLog) OnOutgoing(s []byte) {
logTime := time.Now().UTC()
fmt.Printf("<%v, %s, outgoing>\n (%s)\n", logTime, l.prefix, s)
}
Expand Down
52 changes: 26 additions & 26 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type session struct {
messageIn <-chan fixIn

//application messages are queued up for send here
toSend []Message
toSend [][]byte

//mutex for access to toSend
sendMutex sync.Mutex
Expand Down Expand Up @@ -196,11 +196,12 @@ func (s *session) queueForSend(msg Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()

if err := s.prepMessageForSend(&msg, nil); err != nil {
msgBytes, err := s.prepMessageForSend(msg, nil)
if err != nil {
return err
}

s.toSend = append(s.toSend, msg)
s.toSend = append(s.toSend, msgBytes)

select {
case s.messageEvent <- true:
Expand All @@ -222,11 +223,12 @@ func (s *session) sendInReplyTo(msg Message, inReplyTo *Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()

if err := s.prepMessageForSend(&msg, inReplyTo); err != nil {
msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
if err != nil {
return err
}

s.toSend = append(s.toSend, msg)
s.toSend = append(s.toSend, msgBytes)
s.sendQueued()

return nil
Expand Down Expand Up @@ -255,42 +257,42 @@ func (s *session) dropAndSendInReplyTo(msg Message, resetStore bool, inReplyTo *
}
}

if err := s.prepMessageForSend(&msg, inReplyTo); err != nil {
msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
if err != nil {
return err
}

s.dropQueued()
s.toSend = append(s.toSend, msg)
s.toSend = append(s.toSend, msgBytes)
s.sendQueued()

return nil
}

func (s *session) prepMessageForSend(msg, inReplyTo *Message) error {
s.fillDefaultHeader(*msg, inReplyTo)
func (s *session) prepMessageForSend(msg Message, inReplyTo *Message) (msgBytes []byte, err error) {
s.fillDefaultHeader(msg, inReplyTo)
seqNum := s.store.NextSenderMsgSeqNum()
msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum))

var err error
msgType, err := msg.MsgType()
if err != nil {
return err
return
}

if isAdminMessageType(string(msgType)) {
s.application.ToAdmin(*msg, s.sessionID)
s.application.ToAdmin(msg, s.sessionID)

if msgType == enum.MsgType_LOGON {
var resetSeqNumFlag FIXBoolean
if msg.Body.Has(tagResetSeqNumFlag) {
if err := msg.Body.GetField(tagResetSeqNumFlag, &resetSeqNumFlag); err != nil {
return err
if err = msg.Body.GetField(tagResetSeqNumFlag, &resetSeqNumFlag); err != nil {
return
}
}

if resetSeqNumFlag.Bool() {
if err := s.store.Reset(); err != nil {
return err
if err = s.store.Reset(); err != nil {
return
}

s.sentReset = true
Expand All @@ -300,17 +302,15 @@ func (s *session) prepMessageForSend(msg, inReplyTo *Message) error {

}
} else {
if err := s.application.ToApp(*msg, s.sessionID); err != nil {
return err
if err = s.application.ToApp(msg, s.sessionID); err != nil {
return
}
}

msgBytes, err := msg.Build()
if err == nil {
err = s.persist(seqNum, msgBytes)
}
msgBytes = msg.build()
err = s.persist(seqNum, msgBytes)

return err
return
}

func (s *session) persist(seqNum int, msgBytes []byte) error {
Expand All @@ -322,8 +322,8 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
}

func (s *session) sendQueued() {
for _, msg := range s.toSend {
s.sendBytes(msg.rawMessage)
for _, msgBytes := range s.toSend {
s.sendBytes(msgBytes)
}

s.dropQueued()
Expand All @@ -334,7 +334,7 @@ func (s *session) dropQueued() {
}

func (s *session) sendBytes(msg []byte) {
s.log.OnOutgoing(string(msg))
s.log.OnOutgoing(msg)
s.messageOut <- msg
s.stateTimer.Reset(s.HeartBtInt)
}
Expand Down
2 changes: 1 addition & 1 deletion session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (sm *stateMachine) Incoming(session *session, m fixIn) {
return
}

session.log.OnIncoming(string(m.bytes))
session.log.OnIncoming(m.bytes)
if msg, err := ParseMessage(m.bytes); err != nil {
session.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), m.bytes)
} else {
Expand Down
4 changes: 2 additions & 2 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,9 @@ func (s *SessionSuite) TestIncomingNotInSessionTime() {
}

msg := s.NewOrderSingle()
msg.Build()
msgBytes := msg.build()

s.session.Incoming(s.session, fixIn{bytes: msg.rawMessage})
s.session.Incoming(s.session, fixIn{bytes: msgBytes})
s.MockApp.AssertExpectations(s.T())
s.State(notSessionTime{})
}
Expand Down
Loading