Skip to content

Commit

Permalink
Optimize performance
Browse files Browse the repository at this point in the history
  • Loading branch information
letian0805 committed Apr 12, 2021
1 parent 9d890e4 commit abcd695
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ _test/echo_server
_test/tmp
_vendor*
gen
.idea
16 changes: 11 additions & 5 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"strconv"
"sync"

"github.com/armon/go-proxyproto"
proxyproto "github.com/armon/go-proxyproto"

"github.com/quickfixgo/quickfix/config"
)

//Acceptor accepts connections from FIX clients and manages the associated sessions.
type Acceptor struct {
sync.RWMutex
app Application
settings *Settings
logFactory LogFactory
Expand Down Expand Up @@ -84,7 +86,6 @@ func (a *Acceptor) Start() error {
return err
}
}

for sessionID := range a.sessions {
session := a.sessions[sessionID]
a.sessionGroup.Add(1)
Expand Down Expand Up @@ -125,7 +126,9 @@ func (a *Acceptor) Stop() {

//Get remote IP address for a given session.
func (a *Acceptor) RemoteAddr(sessionID SessionID) (net.Addr, bool) {
a.RLock()
addr, ok := a.sessionAddr[sessionID]
a.RUnlock()
return addr, ok
}

Expand Down Expand Up @@ -303,10 +306,11 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
session = dynamicSession
defer session.stop()
}

a.Lock()
a.sessionAddr[sessID] = netConn.RemoteAddr()
msgIn := make(chan fixIn)
msgOut := make(chan []byte)
a.Unlock()
msgIn := make(chan fixIn, session.ReceiveQueueLength)
msgOut := make(chan []byte, session.SendQueueLength)

if err := session.connect(msgIn, msgOut); err != nil {
a.globalLog.OnEventf("Unable to accept %v", err.Error())
Expand Down Expand Up @@ -351,7 +355,9 @@ LOOP:
case id := <-complete:
session, ok := sessions[id]
if ok {
a.Lock()
delete(a.sessionAddr, session.sessionID)
a.Unlock()
delete(sessions, id)
} else {
a.globalLog.OnEventf("Missing dynamic session %v!", id)
Expand Down
3 changes: 3 additions & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ const (
RejectInvalidMessage string = "RejectInvalidMessage"
DynamicSessions string = "DynamicSessions"
DynamicQualifier string = "DynamicQualifier"
SendBufferSize string = "SendBufferSize"
SendQueueLength string = "SendQueueLength"
ReceiveQueueLength string = "ReceiveQueueLength"
)
2 changes: 2 additions & 0 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int

session.log.OnEventf("Resending Message: %v", sentMessageSeqNum)
msgBytes = msg.build()
session.log.OnOutgoing(msgBytes)
session.sendBytes(msgBytes)

seqNum = sentMessageSeqNum + 1
Expand Down Expand Up @@ -382,6 +383,7 @@ func (state *inSession) generateSequenceReset(session *session, beginSeqNo int,

msgBytes := sequenceReset.build()

session.log.OnOutgoing(msgBytes)
session.sendBytes(msgBytes)
session.log.OnEventf("Sent SequenceReset TO: %v", endSeqNo)

Expand Down
4 changes: 2 additions & 2 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
netConn = tlsConn
}

msgIn = make(chan fixIn)
msgOut = make(chan []byte)
msgIn = make(chan fixIn, session.ReceiveQueueLength)
msgOut = make(chan []byte, session.SendQueueLength)
if err := session.connect(msgIn, msgOut); err != nil {
session.log.OnEventf("Failed to initiate: %v", err)
goto reconnect
Expand Down
3 changes: 3 additions & 0 deletions internal/session_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type SessionSettings struct {
SkipCheckLatency bool
MaxLatency time.Duration
DisableMessagePersist bool
SendBufferSize int
SendQueueLength int
ReceiveQueueLength int

//required on logon for FIX.T.1 messages
DefaultApplVerID string
Expand Down
15 changes: 13 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,20 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
}

func (s *session) sendQueued() {
var queueBuffer = make([]byte, 0, s.SendBufferSize)
for _, msgBytes := range s.toSend {
s.sendBytes(msgBytes)
s.log.OnOutgoing(msgBytes)
if len(queueBuffer)+len(msgBytes) < s.SendBufferSize {
queueBuffer = append(queueBuffer, msgBytes...)
} else {
s.sendBytes(queueBuffer)
queueBuffer = make([]byte, 0, s.SendBufferSize)
queueBuffer = append(queueBuffer, msgBytes...)
}
}

if len(queueBuffer) > 0 {
s.sendBytes(queueBuffer)
}

s.dropQueued()
Expand All @@ -343,7 +355,6 @@ func (s *session) dropQueued() {
}

func (s *session) sendBytes(msg []byte) {
s.log.OnOutgoing(msg)
s.messageOut <- msg
s.stateTimer.Reset(s.HeartBtInt)
}
Expand Down
27 changes: 27 additions & 0 deletions session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,33 @@ func (f sessionFactory) newSession(
s.DisableMessagePersist = !persistMessages
}

s.SendBufferSize = 1000
if settings.HasSetting(config.SendBufferSize) {
var sendBufferSize int
if sendBufferSize, err = settings.IntSetting(config.SendBufferSize); err != nil {
return
}
s.SendBufferSize = sendBufferSize
}

s.SendQueueLength = 1
if settings.HasSetting(config.SendQueueLength) {
var sendQueueLength int
if sendQueueLength, err = settings.IntSetting(config.SendQueueLength); err != nil {
return
}
s.SendQueueLength = sendQueueLength
}

s.SendQueueLength = 1
if settings.HasSetting(config.ReceiveQueueLength) {
var receiveQueueLength int
if receiveQueueLength, err = settings.IntSetting(config.ReceiveQueueLength); err != nil {
return
}
s.ReceiveQueueLength = receiveQueueLength
}

if f.BuildInitiators {
if err = f.buildInitiatorSettings(s, settings); err != nil {
return
Expand Down

0 comments on commit abcd695

Please sign in to comment.