Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config advertise-addr for drainer #634

Merged
merged 9 commits into from Jun 18, 2019
@@ -2,7 +2,10 @@

# addr (i.e. 'host:port') to listen on for drainer connections
# will register this addr into etcd
# addr = "127.0.0.1:8249"
addr = "127.0.0.1:8249"

# addr(i.e. 'host:port') to advertise to the public
advertise-addr = ""

# the interval time (in seconds) of detect pumps' status
detect-interval = 10
@@ -79,6 +79,7 @@ type Config struct {
*flag.FlagSet `json:"-"`
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
DataDir string `toml:"data-dir" json:"data-dir"`
DetectInterval int `toml:"detect-interval" json:"detect-interval"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
@@ -109,6 +110,7 @@ func NewConfig() *Config {
fs.PrintDefaults()
}
fs.StringVar(&cfg.ListenAddr, "addr", util.DefaultListenAddr(8249), "addr (i.e. 'host:port') to listen on for drainer connections")
fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", "addr(i.e. 'host:port') to advertise to the public, default to be the same value as -addr")
fs.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "drainer data directory path (default data.drainer)")
fs.IntVar(&cfg.DetectInterval, "detect-interval", defaultDetectInterval, "the interval time (in seconds) of detect pumps' status")
fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
@@ -225,18 +227,12 @@ func (cfg *Config) configFromFile(path string) error {
// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
// check ListenAddr
urllis, err := url.Parse(cfg.ListenAddr)
if err != nil {
return errors.Errorf("parse ListenAddr error: %s, %v", cfg.ListenAddr, err)
strictCheck := !util.IsInTestMode()
if err := validateAddr(cfg.ListenAddr, strictCheck); err != nil {
return errors.Annotate(err, "invalid addr")
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Errorf("bad ListenAddr host format: %s, %v", urllis.Host, err)
}

if !util.IsValidateListenHost(host) {
log.Warn("pump may not be able to access drainer using this listen addr config", zap.String("listen addr", host))
if err := validateAddr(cfg.AdvertiseAddr, strictCheck); err != nil {
return errors.Annotate(err, "invalid advertise-addr")
}

// check EtcdEndpoints
@@ -264,7 +260,9 @@ func (cfg *Config) validate() error {
func (cfg *Config) adjustConfig() error {
// adjust configuration
util.AdjustString(&cfg.ListenAddr, util.DefaultListenAddr(8249))
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
util.AdjustString(&cfg.AdvertiseAddr, cfg.ListenAddr)
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
cfg.AdvertiseAddr = "http://" + cfg.AdvertiseAddr // add 'http:' scheme to facilitate parsing
util.AdjustString(&cfg.DataDir, defaultDataDir)
util.AdjustInt(&cfg.DetectInterval, defaultDetectInterval)

@@ -349,3 +347,26 @@ func (cfg *Config) adjustConfig() error {

return nil
}

func validateAddr(addr string, strict bool) error {
urllis, err := url.Parse(addr)
if err != nil {
return errors.Annotatef(err, "failed to parse addr %v", addr)
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Annotatef(err, "invalid host %v", urllis.Host)
}

if !util.IsValidateListenHost(host) {
err := errors.Errorf("pump may not be able to access drainer using this addr %s", addr)
if strict {
return err
}
if host != "127.0.0.1" && host != "localhost" {
return err
}
}
return nil
}
@@ -19,6 +19,7 @@ import (
"github.com/coreos/etcd/integration"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
)

var testEtcdCluster *integration.ClusterV3
@@ -36,17 +37,19 @@ type testDrainerSuite struct{}

func (t *testDrainerSuite) TestConfig(c *C) {
args := []string{
"-metrics-addr", "127.0.0.1:9091",
"-metrics-addr", "192.168.15.10:9091",
"-txn-batch", "1",
"-data-dir", "data.drainer",
"-dest-db-type", "mysql",
"-config", "../cmd/drainer/drainer.toml",
"-addr", "192.168.15.10:8257",
"-advertise-addr", "192.168.15.10:8257",
}

cfg := NewConfig()
err := cfg.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg.MetricsAddr, Equals, "127.0.0.1:9091")
c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091")
c.Assert(cfg.DataDir, Equals, "data.drainer")
c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql")
@@ -61,9 +64,13 @@ func (t *testDrainerSuite) TestValidate(c *C) {

cfg.ListenAddr = "http://123:9091"
err := cfg.validate()
c.Assert(err, ErrorMatches, ".*ListenAddr.*")
c.Assert(err, ErrorMatches, ".*invalid addr.*")

cfg.ListenAddr = "http://192.168.10.12:9091"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*invalid advertise-addr.*")

cfg.AdvertiseAddr = "http://192.168.10.12:9091"
cfg.EtcdURLs = "127.0.0.1:2379,127.0.0.1:2380"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*EtcdURLs.*")
@@ -96,4 +103,29 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue)

cfg = NewConfig()
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249))
c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr)

cfg = NewConfig()
cfg.ListenAddr = "0.0.0.0:8257"
cfg.AdvertiseAddr = "192.168.15.12:8257"
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257")
c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257")
}

type validateAddrSuite struct{}

var _ = Suite(&validateAddrSuite{})

func (s *validateAddrSuite) TestStrictOrNot(c *C) {
err := validateAddr("http://127.0.0.1:9090", true)
c.Assert(err, NotNil)
err = validateAddr("http://127.0.0.1:9090", false)
c.Assert(err, IsNil)
}
@@ -143,9 +143,9 @@ func NewServer(cfg *Config) (*Server, error) {
)
}

advURL, err := url.Parse(cfg.ListenAddr)
advURL, err := url.Parse(cfg.AdvertiseAddr)
if err != nil {
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.ListenAddr)
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.AdvertiseAddr)
}

status := node.NewStatus(ID, advURL.Host, node.Online, 0, syncer.GetLatestCommitTS(), util.GetApproachTS(latestTS, latestTime))
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"net"
"os"
"time"

"github.com/pingcap/errors"
@@ -237,3 +238,9 @@ func AdjustDuration(v *time.Duration, defValue time.Duration) {
*v = defValue
}
}

// IsInTestMode returns true if the BINLOG_TEST environment
// variable is set.
func IsInTestMode() bool {
return os.Getenv("BINLOG_TEST") == "1"
}
@@ -17,6 +17,7 @@ import (
"context"
"errors"
"net"
"os"
"testing"
"time"

@@ -48,6 +49,13 @@ func (s dummyStore) CurrentVersion() (kv.Version, error) {
return s.ver, s.err
}

func (s *utilSuite) TestIsInTestMode(c *C) {
c.Assert(IsInTestMode(), IsFalse)
os.Setenv("BINLOG_TEST", "1")
defer os.Unsetenv("BINLOG_TEST")
c.Assert(IsInTestMode(), IsTrue)
}

func (s *utilSuite) TestQueryLatestTsFromPD(c *C) {
ds := dummyStore{err: errors.New("test")}
ver, err := QueryLatestTsFromPD(ds)
@@ -206,8 +206,7 @@ func (cfg *Config) validate() error {
return errors.Errorf("bad AdvertiseAddr host format: %s, %v", urladv.Host, err)
}
if !util.IsValidateListenHost(host) || host == "0.0.0.0" {
isTestEnv := os.Getenv("BINLOG_TEST") == "1"
if !(isTestEnv && host == "127.0.0.1") {
if !(util.IsInTestMode() && host == "127.0.0.1") {
return errors.Errorf("invalid advertiseAddr host: %v", host)
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.