From 6b4fb8cae4a6db430f7918d9e3880d5a624d5896 Mon Sep 17 00:00:00 2001 From: "Michael L. Wilner" Date: Thu, 1 Jul 2021 13:17:18 -0500 Subject: [PATCH 1/3] Acceptor can listen on different ports per session --- acceptor.go | 88 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 36 deletions(-) diff --git a/acceptor.go b/acceptor.go index ac0ab6e52..223aed1ae 100644 --- a/acceptor.go +++ b/acceptor.go @@ -10,7 +10,7 @@ import ( "strconv" "sync" - "github.com/armon/go-proxyproto" + proxyproto "github.com/armon/go-proxyproto" "github.com/quickfixgo/quickfix/config" ) @@ -23,13 +23,14 @@ type Acceptor struct { globalLog Log sessions map[SessionID]*session sessionGroup sync.WaitGroup - listener net.Listener listenerShutdown sync.WaitGroup dynamicSessions bool dynamicQualifier bool dynamicQualifierCount int dynamicSessionChan chan *session sessionAddr map[SessionID]net.Addr + sessionHostPort map[SessionID]int + listeners map[string]net.Listener connectionValidator ConnectionValidator sessionFactory } @@ -42,46 +43,49 @@ type ConnectionValidator interface { } //Start accepting connections. -func (a *Acceptor) Start() error { +func (a *Acceptor) Start() (err error) { socketAcceptHost := "" if a.settings.GlobalSettings().HasSetting(config.SocketAcceptHost) { - var err error if socketAcceptHost, err = a.settings.GlobalSettings().Setting(config.SocketAcceptHost); err != nil { - return err + return } } - socketAcceptPort, err := a.settings.GlobalSettings().IntSetting(config.SocketAcceptPort) - if err != nil { - return err + a.sessionHostPort = make(map[SessionID]int) + a.listeners = make(map[string]net.Listener) + for sessionID, sessionSettings := range a.settings.SessionSettings() { + if sessionSettings.HasSetting(config.SocketAcceptPort) { + if a.sessionHostPort[sessionID], err = sessionSettings.IntSetting(config.SocketAcceptPort); err != nil { + return + } + } else if a.sessionHostPort[sessionID], err = a.settings.GlobalSettings().IntSetting(config.SocketAcceptPort); err != nil { + return + } + address := net.JoinHostPort(socketAcceptHost, strconv.Itoa(a.sessionHostPort[sessionID])) + a.listeners[address] = nil } var tlsConfig *tls.Config if tlsConfig, err = loadTLSConfig(a.settings.GlobalSettings()); err != nil { - return err + return } var useTCPProxy bool if a.settings.GlobalSettings().HasSetting(config.UseTCPProxy) { if useTCPProxy, err = a.settings.GlobalSettings().BoolSetting(config.UseTCPProxy); err != nil { - return err + return } } - address := net.JoinHostPort(socketAcceptHost, strconv.Itoa(socketAcceptPort)) - if tlsConfig != nil { - if a.listener, err = tls.Listen("tcp", address, tlsConfig); err != nil { - return err - } - } else if useTCPProxy { - listener, err := net.Listen("tcp", address) - if err != nil { - return err - } - a.listener = &proxyproto.Listener{Listener: listener} - } else { - if a.listener, err = net.Listen("tcp", address); err != nil { - return err + for address := range a.listeners { + if tlsConfig != nil { + if a.listeners[address], err = tls.Listen("tcp", address, tlsConfig); err != nil { + return + } + } else if a.listeners[address], err = net.Listen("tcp", address); err != nil { + return + } else if useTCPProxy { + a.listeners[address] = &proxyproto.Listener{Listener: a.listeners[address]} } } @@ -101,9 +105,11 @@ func (a *Acceptor) Start() error { a.sessionGroup.Done() }() } - a.listenerShutdown.Add(1) - go a.listenForConnections() - return nil + a.listenerShutdown.Add(len(a.listeners)) + for _, listener := range a.listeners { + go a.listenForConnections(listener) + } + return } //Stop logs out existing sessions, close their connections, and stop accepting new connections. @@ -112,7 +118,9 @@ func (a *Acceptor) Stop() { _ = recover() // suppress sending on closed channel error }() - a.listener.Close() + for _, listener := range a.listeners { + listener.Close() + } a.listenerShutdown.Wait() if a.dynamicSessions { close(a.dynamicSessionChan) @@ -132,12 +140,14 @@ func (a *Acceptor) RemoteAddr(sessionID SessionID) (net.Addr, bool) { //NewAcceptor creates and initializes a new Acceptor. func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Settings, logFactory LogFactory) (a *Acceptor, err error) { a = &Acceptor{ - app: app, - storeFactory: storeFactory, - settings: settings, - logFactory: logFactory, - sessions: make(map[SessionID]*session), - sessionAddr: make(map[SessionID]net.Addr), + app: app, + storeFactory: storeFactory, + settings: settings, + logFactory: logFactory, + sessions: make(map[SessionID]*session), + sessionAddr: make(map[SessionID]net.Addr), + sessionHostPort: make(map[SessionID]int), + listeners: make(map[string]net.Listener), } if a.settings.GlobalSettings().HasSetting(config.DynamicSessions) { if a.dynamicSessions, err = settings.globalSettings.BoolSetting(config.DynamicSessions); err != nil { @@ -171,11 +181,11 @@ func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Se return } -func (a *Acceptor) listenForConnections() { +func (a *Acceptor) listenForConnections(listener net.Listener) { defer a.listenerShutdown.Done() for { - netConn, err := a.listener.Accept() + netConn, err := listener.Accept() if err != nil { return } @@ -276,6 +286,12 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { TargetCompID: string(senderCompID), TargetSubID: string(senderSubID), TargetLocationID: string(senderLocationID), } + localConnectionPort := netConn.LocalAddr().(*net.TCPAddr).Port + if expectedPort, ok := a.sessionHostPort[sessID]; !ok || expectedPort != localConnectionPort { + a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes) + return + } + // We have a session ID and a network connection. This seems to be a good place for any custom authentication logic. if a.connectionValidator != nil { if err := a.connectionValidator.Validate(netConn, sessID); err != nil { From b24fac80032f65288f43fd133b803aabc0d2ab62 Mon Sep 17 00:00:00 2001 From: "Michael L. Wilner" Date: Thu, 1 Jul 2021 13:17:56 -0500 Subject: [PATCH 2/3] Test for acceptors per session --- accepter_test.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/accepter_test.go b/accepter_test.go index 6924447fe..5e032cbb1 100644 --- a/accepter_test.go +++ b/accepter_test.go @@ -4,11 +4,18 @@ import ( "net" "testing" - "github.com/armon/go-proxyproto" + "github.com/quickfixgo/quickfix/config" + + proxyproto "github.com/armon/go-proxyproto" "github.com/stretchr/testify/assert" ) func TestAcceptor_Start(t *testing.T) { + sessionSettings := NewSessionSettings() + sessionSettings.Set(config.BeginString, BeginStringFIX42) + sessionSettings.Set(config.SenderCompID, "sender") + sessionSettings.Set(config.TargetCompID, "target") + settingsWithTCPProxy := NewSettings() settingsWithTCPProxy.GlobalSettings().Set("UseTCPProxy", "Y") @@ -23,8 +30,8 @@ func TestAcceptor_Start(t *testing.T) { ) acceptorStartTests := []struct { - name string - settings *Settings + name string + settings *Settings listenerType int }{ {"with TCP proxy set", settingsWithTCPProxy, ProxyListener}, @@ -35,22 +42,29 @@ func TestAcceptor_Start(t *testing.T) { for _, tt := range acceptorStartTests { t.Run(tt.name, func(t *testing.T) { tt.settings.GlobalSettings().Set("SocketAcceptPort", "5001") + if _, err := tt.settings.AddSession(sessionSettings); err != nil { + assert.Nil(t, err) + } acceptor := &Acceptor{settings: tt.settings} if err := acceptor.Start(); err != nil { assert.NotNil(t, err) } - if tt.listenerType == ProxyListener { - _, ok := acceptor.listener.(*proxyproto.Listener) - assert.True(t, ok) - } + assert.Len(t, acceptor.listeners, 1) + + for _, listener := range acceptor.listeners { + if tt.listenerType == ProxyListener { + _, ok := listener.(*proxyproto.Listener) + assert.True(t, ok) + } - if tt.listenerType == GenericListener { - _, ok := acceptor.listener.(*net.TCPListener) - assert.True(t, ok) + if tt.listenerType == GenericListener { + _, ok := listener.(*net.TCPListener) + assert.True(t, ok) + } } acceptor.Stop() }) } -} \ No newline at end of file +} From b3191eb28748c6579495dee34aeefd4ffe0e002d Mon Sep 17 00:00:00 2001 From: "Michael L. Wilner" Date: Thu, 1 Jul 2021 13:45:18 -0500 Subject: [PATCH 3/3] Update gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index c6137fd7a..ff2f9b241 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ *~ *.swp *.swo +.idea vendor _test/test _test/echo_server