Skip to content

Commit

Permalink
server: support PROXY protocol fallback-able options (#41393)
Browse files Browse the repository at this point in the history
close #41409
  • Loading branch information
blacktear23 committed Feb 16, 2023
1 parent bacb08b commit 0519e7e
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 31 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ def go_deps():
name = "com_github_blacktear23_go_proxyprotocol",
build_file_proto_mode = "disable_global",
importpath = "github.com/blacktear23/go-proxyprotocol",
sum = "h1:zR7PZeoU0wAkElcIXenFiy3R56WB6A+UEVi4c6RH8wo=",
version = "v1.0.2",
sum = "h1:moi4x1lJlrQj2uYUJdEyCxqj9UNmaSKZwaGZIXnbAis=",
version = "v1.0.5",
)
go_repository(
name = "com_github_blizzy78_varnamelen",
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,9 @@ type ProxyProtocol struct {
Networks string `toml:"networks" json:"networks"`
// PROXY protocol header read timeout, Unit is second.
HeaderTimeout uint `toml:"header-timeout" json:"header-timeout"`
// PROXY protocol header process fallback-able.
// If set to true and not send PROXY protocol header, connection will return connection's client IP.
Fallbackable bool `toml:"fallbackable" json:"fallbackable"`
}

// Binlog is the config for binlog.
Expand Down Expand Up @@ -979,6 +982,7 @@ var defaultConf = Config{
ProxyProtocol: ProxyProtocol{
Networks: "",
HeaderTimeout: 5,
Fallbackable: false,
},
PreparedPlanCache: PreparedPlanCache{
Enabled: true,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/apache/skywalking-eyes v0.4.0
github.com/ashanbrown/makezero v1.1.1
github.com/aws/aws-sdk-go v1.44.48
github.com/blacktear23/go-proxyprotocol v1.0.2
github.com/blacktear23/go-proxyprotocol v1.0.5
github.com/carlmjohnson/flagext v0.21.0
github.com/charithe/durationcheck v0.0.9
github.com/cheggaaa/pb/v3 v3.0.8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/biogo/store v0.0.0-20160505134755-913427a1d5e8/go.mod h1:Iev9Q3MErcn+w3UOJD/DkEzllvugfdx7bGcMOFhvr/4=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blacktear23/go-proxyprotocol v1.0.2 h1:zR7PZeoU0wAkElcIXenFiy3R56WB6A+UEVi4c6RH8wo=
github.com/blacktear23/go-proxyprotocol v1.0.2/go.mod h1:FSCbgnRZrQXazBLL5snfBbrcFSMtcmUDhSRb9OfFA1o=
github.com/blacktear23/go-proxyprotocol v1.0.5 h1:moi4x1lJlrQj2uYUJdEyCxqj9UNmaSKZwaGZIXnbAis=
github.com/blacktear23/go-proxyprotocol v1.0.5/go.mod h1:FSCbgnRZrQXazBLL5snfBbrcFSMtcmUDhSRb9OfFA1o=
github.com/bmatcuk/doublestar/v2 v2.0.4 h1:6I6oUiT/sU27eE2OFcWqBhL1SwjyvQuOssxT4a1yidI=
github.com/bmatcuk/doublestar/v2 v2.0.4/go.mod h1:QMmcs3H2AUQICWhfzLXz+IYln8lRQmTZRptLie8RgRw=
github.com/carlmjohnson/flagext v0.21.0 h1:/c4uK3ie786Z7caXLcIMvePNSSiH3bQVGDvmGLMme60=
Expand Down
37 changes: 33 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func newClientConn(s *Server) *clientConn {
status: connStatusDispatching,
lastActive: time.Now(),
authPlugin: mysql.AuthNativePassword,
ppEnabled: s.cfg.ProxyProtocol.Networks != "",
}
}

Expand Down Expand Up @@ -214,6 +215,9 @@ type clientConn struct {
cancelFunc context.CancelFunc
}
extensions *extension.SessionExtensions

// Proxy Protocol Enabled
ppEnabled bool
}

func (cc *clientConn) getCtx() *TiDBContext {
Expand Down Expand Up @@ -621,6 +625,21 @@ func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse(ctx context.Con
return err
}

// After read packets we should update the client's host and port to grab
// real client's IP and port from PROXY Protocol header if PROXY Protocol is enabled.
_, _, err = cc.PeerHost("", true)
if err != nil {
terror.Log(err)
return err
}
// If enable proxy protocol check audit plugins after update real IP
if cc.ppEnabled {
err = cc.server.checkAuditPlugin(cc)
if err != nil {
return err
}
}

if resp.Capability&mysql.ClientSSL > 0 {
tlsConfig := (*tls.Config)(atomic.LoadPointer(&cc.server.tlsConfig))
if tlsConfig != nil {
Expand Down Expand Up @@ -838,7 +857,8 @@ func (cc *clientConn) openSessionAndDoAuth(authData []byte, authPlugin string) e
if len(authData) == 0 {
hasPassword = "NO"
}
host, port, err := cc.PeerHost(hasPassword)

host, port, err := cc.PeerHost(hasPassword, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -881,7 +901,8 @@ func (cc *clientConn) checkAuthPlugin(ctx context.Context, resp *handshakeRespon
if len(authData) == 0 {
hasPassword = "NO"
}
host, _, err := cc.PeerHost(hasPassword)

host, _, err := cc.PeerHost(hasPassword, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -953,9 +974,17 @@ func (cc *clientConn) checkAuthPlugin(ctx context.Context, resp *handshakeRespon
return nil, nil
}

func (cc *clientConn) PeerHost(hasPassword string) (host, port string, err error) {
func (cc *clientConn) PeerHost(hasPassword string, update bool) (host, port string, err error) {
// already get peer host
if len(cc.peerHost) > 0 {
return cc.peerHost, cc.peerPort, nil
// Proxy protocol enabled and not update
if cc.ppEnabled && !update {
return cc.peerHost, cc.peerPort, nil
}
// Proxy protocol not enabled
if !cc.ppEnabled {
return cc.peerHost, cc.peerPort, nil
}
}
host = variable.DefHostname
if cc.isUnixSocket {
Expand Down
65 changes: 43 additions & 22 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
_ "net/http/pprof" // #nosec G108
"os"
"os/user"
"reflect"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -287,7 +288,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
proxyTarget = s.socket
}
ppListener, err := proxyprotocol.NewLazyListener(proxyTarget, s.cfg.ProxyProtocol.Networks,
int(s.cfg.ProxyProtocol.HeaderTimeout))
int(s.cfg.ProxyProtocol.HeaderTimeout), s.cfg.ProxyProtocol.Fallbackable)
if err != nil {
logutil.BgLogger().Error("ProxyProtocol networks parameter invalid")
return nil, errors.Trace(err)
Expand Down Expand Up @@ -435,7 +436,19 @@ func (s *Server) startNetworkListener(listener net.Listener, isUnixSocket bool,

clientConn := s.newConn(conn)
if isUnixSocket {
uc, ok := conn.(*net.UnixConn)
var (
uc *net.UnixConn
ok bool
)
if clientConn.ppEnabled {
// Using reflect to get Raw Conn object from proxy protocol wrapper connection object
ppv := reflect.ValueOf(conn)
vconn := ppv.Elem().FieldByName("Conn")
rconn := vconn.Interface()
uc, ok = rconn.(*net.UnixConn)
} else {
uc, ok = conn.(*net.UnixConn)
}
if !ok {
logutil.BgLogger().Error("Expected UNIX socket, but got something else")
return
Expand All @@ -450,25 +463,11 @@ func (s *Server) startNetworkListener(listener net.Listener, isUnixSocket bool,
}
}

err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
authPlugin := plugin.DeclareAuditManifest(p.Manifest)
if authPlugin.OnConnectionEvent == nil {
return nil
}
host, _, err := clientConn.PeerHost("")
if err != nil {
logutil.BgLogger().Error("get peer host failed", zap.Error(err))
terror.Log(clientConn.Close())
return errors.Trace(err)
}
if err = authPlugin.OnConnectionEvent(context.Background(), plugin.PreAuth,
&variable.ConnectionInfo{Host: host}); err != nil {
logutil.BgLogger().Info("do connection event failed", zap.Error(err))
terror.Log(clientConn.Close())
return errors.Trace(err)
}
return nil
})
err = nil
if !clientConn.ppEnabled {
// Check audit plugins when ProxyProtocol not enabled
err = s.checkAuditPlugin(clientConn)
}
if err != nil {
continue
}
Expand All @@ -483,6 +482,28 @@ func (s *Server) startNetworkListener(listener net.Listener, isUnixSocket bool,
}
}

func (s *Server) checkAuditPlugin(clientConn *clientConn) error {
return plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
authPlugin := plugin.DeclareAuditManifest(p.Manifest)
if authPlugin.OnConnectionEvent == nil {
return nil
}
host, _, err := clientConn.PeerHost("", false)
if err != nil {
logutil.BgLogger().Error("get peer host failed", zap.Error(err))
terror.Log(clientConn.Close())
return errors.Trace(err)
}
if err = authPlugin.OnConnectionEvent(context.Background(), plugin.PreAuth,
&variable.ConnectionInfo{Host: host}); err != nil {
logutil.BgLogger().Info("do connection event failed", zap.Error(err))
terror.Log(clientConn.Close())
return errors.Trace(err)
}
return nil
})
}

func (s *Server) startShutdown() {
s.rwlock.RLock()
logutil.BgLogger().Info("setting tidb-server to report unhealthy (shutting-down)")
Expand Down Expand Up @@ -535,7 +556,7 @@ func (s *Server) Close() {
// onConn runs in its own goroutine, handles queries from this connection.
func (s *Server) onConn(conn *clientConn) {
// init the connInfo
_, _, err := conn.PeerHost("")
_, _, err := conn.PeerHost("", false)
if err != nil {
logutil.BgLogger().With(zap.Uint64("conn", conn.connectionID)).
Error("get peer host failed", zap.Error(err))
Expand Down

0 comments on commit 0519e7e

Please sign in to comment.