diff --git a/acceptor.go b/acceptor.go index f58ef01f7..83384c5b8 100644 --- a/acceptor.go +++ b/acceptor.go @@ -359,7 +359,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { } a.sessionAddr.Store(sessID, netConn.RemoteAddr()) - msgIn := make(chan fixIn) + msgIn := make(chan fixIn, session.InChanCapacity) msgOut := make(chan []byte) if err := session.connect(msgIn, msgOut); err != nil { diff --git a/config/configuration.go b/config/configuration.go index 4b9d322d4..6282bab1e 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -489,6 +489,16 @@ const ( // Valid Values: // - Any positive integer MaxLatency string = "MaxLatency" + + // InChanCapacity sets the maximum number of messages that can be buffered in the channel for incoming FIX messages. + // + // Required: No + // + // Default: 1 + // + // Valid Values: + // - A positive integer, or zero for an unbuffered channel + InChanCapacity string = "InChanCapacity" ) const ( diff --git a/initiator.go b/initiator.go index 18451477e..9af90cb2b 100644 --- a/initiator.go +++ b/initiator.go @@ -205,7 +205,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di netConn = tlsConn } - msgIn = make(chan fixIn) + msgIn = make(chan fixIn, session.InChanCapacity) msgOut = make(chan []byte) if err := session.connect(msgIn, msgOut); err != nil { session.log.OnEventf("Failed to initiate: %v", err) diff --git a/internal/session_settings.go b/internal/session_settings.go index ef7413ebd..c45134822 100644 --- a/internal/session_settings.go +++ b/internal/session_settings.go @@ -21,6 +21,7 @@ type SessionSettings struct { TimeZone *time.Location ResetSeqTime time.Time EnableResetSeqTime bool + InChanCapacity int // Required on logon for FIX.T.1 messages. DefaultApplVerID string diff --git a/session.go b/session.go index a6d296999..342988f64 100644 --- a/session.go +++ b/session.go @@ -757,6 +757,21 @@ func (s *session) checkBeginString(msg *Message) MessageRejectError { return nil } +func (s *session) drainMessageIn() { + s.log.OnEventf("Draining %d messages from inbound channel...", len(s.messageIn)) + for { + select { + case fixInc, ok := <-s.messageIn: + if !ok { + return + } + s.Incoming(s, fixInc) + default: + return + } + } +} + func (s *session) doReject(msg *Message, rej MessageRejectError) error { reply := msg.reverseRoute() @@ -824,6 +839,9 @@ func (s *session) onDisconnect() { s.messageOut = nil } + // s.messageIn is buffered so we need to drain it before disconnection + s.drainMessageIn() + s.messageIn = nil } diff --git a/session_factory.go b/session_factory.go index 613bb2069..1b79f5ee1 100644 --- a/session_factory.go +++ b/session_factory.go @@ -431,6 +431,18 @@ func (f sessionFactory) newSession( s.DisableMessagePersist = !persistMessages } + if settings.HasSetting(config.InChanCapacity) { + if s.InChanCapacity, err = settings.IntSetting(config.InChanCapacity); err != nil { + return + } else if s.InChanCapacity < 0 { + err = IncorrectFormatForSetting{Setting: config.InChanCapacity, Value: []byte(strconv.Itoa(s.InChanCapacity))} + return + } + } else { + // Default to 1 buffered message per channel + s.InChanCapacity = 1 + } + if f.BuildInitiators { if err = f.buildInitiatorSettings(s, settings); err != nil { return