diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index ac8601f824..e779c24322 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -312,6 +312,10 @@ ErrMasterOperRespNotSuccess,[code=38032:class=dm-master:scope=internal:level=hig ErrMasterOperRequestTimeout,[code=38033:class=dm-master:scope=internal:level=high],"request is timeout, but request may be successful, please execute `query-status` to check status" ErrMasterHandleHTTPApis,[code=38034:class=dm-master:scope=internal:level=high],"serve http apis to grpc" ErrMasterHostPortNotValid,[code=38035:class=dm-master:scope=internal:level=high],"host:port '%s' not valid" +ErrMasterGetHostnameFail,[code=38036:class=dm-master:scope=internal:level=high],"get hostname fail" +ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level=high],"generate config item %s for embed etcd fail" +ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"start embed etcd fail" +ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"parse URL %s fail" ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set" ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag" ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file" diff --git a/cmd/dm-master/main.go b/cmd/dm-master/main.go index 601e25bc46..66d6e3ff6d 100644 --- a/cmd/dm-master/main.go +++ b/cmd/dm-master/main.go @@ -14,6 +14,7 @@ package main import ( + "context" "flag" "fmt" "os" @@ -30,6 +31,7 @@ import ( ) func main() { + // 1. parse config cfg := master.NewConfig() err := cfg.Parse(os.Args[1:]) switch errors.Cause(err) { @@ -41,6 +43,7 @@ func main() { os.Exit(2) } + // 2. init logger err = log.InitLogger(&log.Config{ File: cfg.LogFile, Level: strings.ToLower(cfg.LogLevel), @@ -50,39 +53,42 @@ func main() { os.Exit(2) } + // 3. print process version information utils.PrintInfo("dm-master", func() { log.L().Info("", zap.Stringer("dm-master config", cfg)) }) + // 4. start the server + ctx, cancel := context.WithCancel(context.Background()) + server := master.NewServer(cfg) + err = server.Start(ctx) + if err != nil { + log.L().Error("fail to start dm-master", zap.Error(err)) + os.Exit(2) + } + + // 5. wait for stopping the process sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - - server := master.NewServer(cfg) - go func() { sig := <-sc log.L().Info("got signal to exit", zap.Stringer("signal", sig)) - server.Close() + cancel() }() + <-ctx.Done() - err = server.Start() - if err != nil { - log.L().Error("fail to start dm-master", zap.Error(err)) - } + // 6. close the server server.Close() - log.L().Info("dm-master exit") + // 7. flush log syncErr := log.L().Sync() if syncErr != nil { fmt.Fprintln(os.Stderr, "sync log failed", syncErr) - } - - if err != nil || syncErr != nil { os.Exit(1) } } diff --git a/dm/master/config.go b/dm/master/config.go index c082076ca4..c895f540d2 100644 --- a/dm/master/config.go +++ b/dm/master/config.go @@ -19,15 +19,26 @@ import ( "flag" "fmt" "io/ioutil" + "net" + "net/url" + "os" "strings" "time" + "github.com/BurntSushi/toml" + "go.etcd.io/etcd/embed" + "go.uber.org/zap" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" +) - "github.com/BurntSushi/toml" - "go.uber.org/zap" +const ( + defaultRPCTimeout = "30s" + defaultNamePrefix = "dm-master" + defaultDataDirPrefix = "default" + defaultPeerUrls = "http://127.0.0.1:8291" ) // SampleConfigFile is sample config file of dm-master @@ -35,8 +46,6 @@ import ( // and assign it to SampleConfigFile while we build dm-master var SampleConfigFile string -var defaultRPCTimeout = "30s" - // NewConfig creates a config for dm-master func NewConfig() *Config { cfg := &Config{} @@ -51,6 +60,12 @@ func NewConfig() *Config { fs.StringVar(&cfg.LogFile, "log-file", "", "log file path") //fs.StringVar(&cfg.LogRotate, "log-rotate", "day", "log file rotate type, hour/day") + fs.StringVar(&cfg.Name, "name", "", "human-readable name for this DM-master member") + fs.StringVar(&cfg.DataDir, "data-dir", "", `path to the data directory (default "default.${name}")`) + fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", fmt.Sprintf("initial cluster configuration for bootstrapping, e,g. dm-master=%s", defaultPeerUrls)) + fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic") + fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`) + return cfg } @@ -90,6 +105,15 @@ type Config struct { ConfigFile string `json:"config-file"` + // etcd relative config items + // NOTE: we use `MasterAddr` to generate `ClientUrls` and `AdvertiseClientUrls` + // NOTE: more items will be add when adding leader election + Name string `toml:"name" json:"name"` + DataDir string `toml:"data-dir" json:"data-dir"` + PeerUrls string `toml:"peer-urls" json:"peer-urls"` + AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"` + InitialCluster string `toml:"initial-cluster" json:"initial-cluster"` + printVersion bool printSampleConfig bool } @@ -169,9 +193,15 @@ func (c *Config) configFromFile(path string) error { // adjust adjusts configs func (c *Config) adjust() error { + // MasterAddr's format may be "host:port" or ":port" + _, _, err := net.SplitHostPort(c.MasterAddr) + if err != nil { + return terror.ErrMasterHostPortNotValid.Delegate(err, c.MasterAddr) + } + c.DeployMap = make(map[string]string) for _, item := range c.Deploy { - if err := item.Verify(); err != nil { + if err = item.Verify(); err != nil { return err } @@ -202,7 +232,37 @@ func (c *Config) adjust() error { c.RPCRateBurst = DefaultBurst } - return nil + if c.Name == "" { + var hostname string + hostname, err = os.Hostname() + if err != nil { + return terror.ErrMasterGetHostnameFail.Delegate(err) + } + c.Name = fmt.Sprintf("%s-%s", defaultNamePrefix, hostname) + } + + if c.DataDir == "" { + c.DataDir = fmt.Sprintf("%s.%s", defaultDataDirPrefix, c.Name) + } + + if c.PeerUrls == "" { + c.PeerUrls = defaultPeerUrls + } + + if c.AdvertisePeerUrls == "" { + c.AdvertisePeerUrls = defaultPeerUrls + } + + if c.InitialCluster == "" { + items := strings.Split(c.AdvertisePeerUrls, ",") + for i, item := range items { + items[i] = fmt.Sprintf("%s=%s", c.Name, item) + } + c.InitialCluster = strings.Join(items, ",") + } + + _, err = c.genEmbedEtcdConfig() // verify embed etcd config + return err } // UpdateConfigFile write config to local file @@ -228,3 +288,62 @@ func (c *Config) Reload() error { return c.adjust() } + +// genEmbedEtcdConfig generates the configuration needed by embed etcd. +func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) { + cfg := embed.NewConfig() + cfg.Name = c.Name + cfg.Dir = c.DataDir + + // reuse the previous master-addr as the client listening URL. + cURL, err := parseURLs(c.MasterAddr) + if err != nil { + return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid master-addr") + } + cfg.LCUrls = cURL + cfg.ACUrls = cURL + + cfg.LPUrls, err = parseURLs(c.PeerUrls) + if err != nil { + return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid peer-urls") + } + + cfg.APUrls, err = parseURLs(c.AdvertisePeerUrls) + if err != nil { + return nil, terror.ErrMasterGenEmbedEtcdConfigFail.Delegate(err, "invalid advertise-peer-urls") + } + + cfg.InitialCluster = c.InitialCluster + + return cfg, nil +} + +// parseURLs parse a string into multiple urls. +// if the URL in the string without protocol scheme, use `http` as the default. +// if no IP exists in the address, `0.0.0.0` is used. +func parseURLs(s string) ([]url.URL, error) { + if s == "" { + return nil, nil + } + + items := strings.Split(s, ",") + urls := make([]url.URL, 0, len(items)) + for _, item := range items { + u, err := url.Parse(item) + // tolerate valid `master-addr`, but invalid URL format, like: + // `:8261`: missing protocol scheme + // `127.0.0.1:8261`: first path segment in URL cannot contain colon + if err != nil && (strings.Contains(err.Error(), "missing protocol scheme") || + strings.Contains(err.Error(), "first path segment in URL cannot contain colon")) { + u, err = url.Parse("http://" + item) + } + if err != nil { + return nil, terror.ErrMasterParseURLFail.Delegate(err, item) + } + if strings.Index(u.Host, ":") == 0 { + u.Host = "0.0.0.0" + u.Host + } + urls = append(urls, *u) + } + return urls, nil +} diff --git a/dm/master/config_test.go b/dm/master/config_test.go index 543a74335c..553c400fb6 100644 --- a/dm/master/config_test.go +++ b/dm/master/config_test.go @@ -18,18 +18,26 @@ import ( "flag" "fmt" "io/ioutil" + "net/url" + "os" "path" "strings" capturer "github.com/kami-zh/go-capturer" "github.com/pingcap/check" + + "github.com/pingcap/dm/pkg/terror" ) var ( defaultConfigFile = "./dm-master.toml" + _ = check.Suite(&testConfigSuite{}) ) -func (t *testMaster) TestPrintSampleConfig(c *check.C) { +type testConfigSuite struct { +} + +func (t *testConfigSuite) TestPrintSampleConfig(c *check.C) { var ( buf []byte err error @@ -65,12 +73,17 @@ func (t *testMaster) TestPrintSampleConfig(c *check.C) { c.Assert(strings.TrimSpace(out), check.Matches, "base64 decode config error:.*") } -func (t *testMaster) TestConfig(c *check.C) { +func (t *testConfigSuite) TestConfig(c *check.C) { var ( - err error - cfg = &Config{} - masterAddr = ":8261" - deployMap = map[string]string{ + err error + cfg = &Config{} + masterAddr = ":8261" + name = "dm-master" + dataDir = "default.dm-master" + peerURLs = "http://127.0.0.1:8291" + advertisePeerURLs = "http://127.0.0.1:8291" + initialCluster = "dm-master=http://127.0.0.1:8291" + deployMap = map[string]string{ "mysql-replica-01": "172.16.10.72:8262", "mysql-replica-02": "172.16.10.73:8262", } @@ -115,13 +128,18 @@ func (t *testMaster) TestConfig(c *check.C) { c.Assert(err, check.ErrorMatches, tc.errorReg) } else { c.Assert(cfg.MasterAddr, check.Equals, masterAddr) + c.Assert(cfg.Name, check.Equals, name) + c.Assert(cfg.DataDir, check.Equals, dataDir) + c.Assert(cfg.PeerUrls, check.Equals, peerURLs) + c.Assert(cfg.AdvertisePeerUrls, check.Equals, advertisePeerURLs) + c.Assert(cfg.InitialCluster, check.Equals, initialCluster) c.Assert(cfg.DeployMap, check.DeepEquals, deployMap) c.Assert(cfg.String(), check.Matches, fmt.Sprintf("{.*master-addr\":\"%s\".*}", masterAddr)) } } } -func (t *testMaster) TestUpdateConfigFile(c *check.C) { +func (t *testConfigSuite) TestUpdateConfigFile(c *check.C) { var ( err error content []byte @@ -153,7 +171,7 @@ func (t *testMaster) TestUpdateConfigFile(c *check.C) { c.Assert(newContent, check.DeepEquals, content) } -func (t *testMaster) TestInvalidConfig(c *check.C) { +func (t *testConfigSuite) TestInvalidConfig(c *check.C) { var ( err error cfg = NewConfig() @@ -194,4 +212,120 @@ dm-worker = "172.16.10.72:8262"`) err = cfg.configFromFile(filepath2) c.Assert(err, check.NotNil) c.Assert(err, check.ErrorMatches, "*master config contained unknown configuration options: aaa*") + + // invalid `master-addr` + filepath3 := path.Join(c.MkDir(), "test_invalid_config.toml") + configContent3 := []byte(`master-addr = ""`) + err = ioutil.WriteFile(filepath3, configContent3, 0644) + err = cfg.configFromFile(filepath3) + c.Assert(err, check.IsNil) + c.Assert(terror.ErrMasterHostPortNotValid.Equal(cfg.adjust()), check.IsTrue) +} + +func (t *testConfigSuite) TestGenEmbedEtcdConfig(c *check.C) { + hostname, err := os.Hostname() + c.Assert(err, check.IsNil) + + cfg1 := NewConfig() + cfg1.MasterAddr = ":8261" + c.Assert(cfg1.adjust(), check.IsNil) + etcdCfg, err := cfg1.genEmbedEtcdConfig() + c.Assert(err, check.IsNil) + c.Assert(etcdCfg.Name, check.Equals, fmt.Sprintf("dm-master-%s", hostname)) + c.Assert(etcdCfg.Dir, check.Equals, fmt.Sprintf("default.%s", etcdCfg.Name)) + c.Assert(etcdCfg.LCUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "0.0.0.0:8261"}}) + c.Assert(etcdCfg.ACUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "0.0.0.0:8261"}}) + c.Assert(etcdCfg.LPUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}) + c.Assert(etcdCfg.APUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}) + c.Assert(etcdCfg.InitialCluster, check.DeepEquals, fmt.Sprintf("dm-master-%s=http://127.0.0.1:8291", hostname)) + + cfg2 := *cfg1 + cfg2.MasterAddr = "127.0.0.1\n:8261" + _, err = cfg2.genEmbedEtcdConfig() + c.Assert(terror.ErrMasterGenEmbedEtcdConfigFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, "(?m).*invalid master-addr.*") + cfg2.MasterAddr = "172.100.8.8:8261" + etcdCfg, err = cfg2.genEmbedEtcdConfig() + c.Assert(err, check.IsNil) + c.Assert(etcdCfg.LCUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "172.100.8.8:8261"}}) + c.Assert(etcdCfg.ACUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "172.100.8.8:8261"}}) + + cfg3 := *cfg1 + cfg3.PeerUrls = "127.0.0.1:\n8291" + _, err = cfg3.genEmbedEtcdConfig() + c.Assert(terror.ErrMasterGenEmbedEtcdConfigFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, "(?m).*invalid peer-urls.*") + cfg3.PeerUrls = "http://172.100.8.8:8291" + etcdCfg, err = cfg3.genEmbedEtcdConfig() + c.Assert(err, check.IsNil) + c.Assert(etcdCfg.LPUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "172.100.8.8:8291"}}) + + cfg4 := *cfg1 + cfg4.AdvertisePeerUrls = "127.0.0.1:\n8291" + _, err = cfg4.genEmbedEtcdConfig() + c.Assert(terror.ErrMasterGenEmbedEtcdConfigFail.Equal(err), check.IsTrue) + c.Assert(err, check.ErrorMatches, "(?m).*invalid advertise-peer-urls.*") + cfg4.AdvertisePeerUrls = "http://172.100.8.8:8291" + etcdCfg, err = cfg4.genEmbedEtcdConfig() + c.Assert(err, check.IsNil) + c.Assert(etcdCfg.APUrls, check.DeepEquals, []url.URL{{Scheme: "http", Host: "172.100.8.8:8291"}}) +} + +func (t *testConfigSuite) TestParseURLs(c *check.C) { + cases := []struct { + str string + urls []url.URL + hasErr bool + }{ + {}, // empty str + { + str: "http://127.0.0.1:8291", + urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}, + }, + { + str: "http://127.0.0.1:8291,http://127.0.0.1:18291", + urls: []url.URL{ + {Scheme: "http", Host: "127.0.0.1:8291"}, + {Scheme: "http", Host: "127.0.0.1:18291"}, + }, + }, + { + str: "127.0.0.1:8291", // no scheme + urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}}, + }, + { + str: "http://:8291", // no IP + urls: []url.URL{{Scheme: "http", Host: "0.0.0.0:8291"}}, + }, + { + str: ":8291", // no scheme, no IP + urls: []url.URL{{Scheme: "http", Host: "0.0.0.0:8291"}}, + }, + { + str: "http://", // no IP, no port + urls: []url.URL{{Scheme: "http", Host: ""}}, + }, + { + str: "http://\n127.0.0.1:8291", // invalid char in URL + hasErr: true, + }, + { + str: ":8291,http://127.0.0.1:18291", + urls: []url.URL{ + {Scheme: "http", Host: "0.0.0.0:8291"}, + {Scheme: "http", Host: "127.0.0.1:18291"}, + }, + }, + } + + for _, cs := range cases { + c.Logf("raw string %s", cs.str) + urls, err := parseURLs(cs.str) + if cs.hasErr { + c.Assert(terror.ErrMasterParseURLFail.Equal(err), check.IsTrue) + } else { + c.Assert(err, check.IsNil) + c.Assert(urls, check.DeepEquals, cs.urls) + } + } } diff --git a/dm/master/dm-master.toml b/dm/master/dm-master.toml index e63ea190b6..bcd08b20a3 100644 --- a/dm/master/dm-master.toml +++ b/dm/master/dm-master.toml @@ -1,5 +1,27 @@ # Master Configuration. +# log configuration +log-level = "info" +log-file = "dm-master.log" + +# dm-master listen address +master-addr = ":8261" + +# human-readable name for this DM-master member +name = "dm-master" + +# path to the data directory (default 'default.${name}') +data-dir = "default.dm-master" + +# URLs for peer traffic +peer-urls = "http://127.0.0.1:8291" + +# advertise URLs for peer traffic (default '${peer-urls}') +advertise-peer-urls = "http://127.0.0.1:8291" + +# initial cluster configuration for bootstrapping, e,g. dm-master=http://127.0.0.1:8291 +initial-cluster = "dm-master=http://127.0.0.1:8291" + # rpc configuration # # rpc timeout is a positive number plus time unit. we use golang standard time @@ -14,13 +36,6 @@ rpc-timeout = "30s" rpc-rate-limit = 10.0 rpc-rate-burst = 40 -#log configuration -log-level = "info" -log-file = "dm-master.log" - -#dm-master listen address -master-addr = ":8261" - # replication group <-> dm-Worker deployment, we'll refine it when new deployment function is available [[deploy]] source-id = "mysql-replica-01" diff --git a/dm/master/etcd.go b/dm/master/etcd.go new file mode 100644 index 0000000000..398b3be1cc --- /dev/null +++ b/dm/master/etcd.go @@ -0,0 +1,61 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package master + +import ( + "net/http" + "time" + + "go.etcd.io/etcd/embed" + "google.golang.org/grpc" + + "github.com/pingcap/dm/pkg/terror" +) + +const ( + // time waiting for etcd to be started + etcdStartTimeout = time.Minute +) + +// startEtcd starts an embedded etcd server. +func startEtcd(masterCfg *Config, + gRPCSvr func(*grpc.Server), + httpHandles map[string]http.Handler) (*embed.Etcd, error) { + cfg, err := masterCfg.genEmbedEtcdConfig() + if err != nil { + return nil, err + } + + // attach extra gRPC and HTTP server + if gRPCSvr != nil { + cfg.ServiceRegister = gRPCSvr + } + if httpHandles != nil { + cfg.UserHandlers = httpHandles + } + + e, err := embed.StartEtcd(cfg) + if err != nil { + return nil, terror.ErrMasterStartEmbedEtcdFail.Delegate(err) + } + + select { + case <-e.Server.ReadyNotify(): + case <-time.After(etcdStartTimeout): + e.Server.Stop() + e.Close() + return nil, terror.ErrMasterStartEmbedEtcdFail.Generatef("start embed etcd timeout %v", etcdStartTimeout) + } + return e, nil +} diff --git a/dm/master/http_handler.go b/dm/master/http_handler.go new file mode 100644 index 0000000000..ffb1ddddfc --- /dev/null +++ b/dm/master/http_handler.go @@ -0,0 +1,74 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package master + +import ( + "context" + "net/http" + "net/http/pprof" + + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "google.golang.org/grpc" + + "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" +) + +// statusHandler handles process status. +type statusHandler struct { +} + +func (h *statusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "text/plain") + text := utils.GetRawInfo() + if _, err := w.Write([]byte(text)); err != nil { + log.L().Error("write status response", log.ShortError(err)) + } +} + +// getStatusHandle returns a HTTP handler to handle process status. +func getStatusHandle() http.Handler { + return &statusHandler{} +} + +// getHTTPAPIHandler returns a HTTP handler to handle DM-master APIs. +func getHTTPAPIHandler(ctx context.Context, addr string) (http.Handler, error) { + // dial the real API server in non-blocking mode, it may not started yet. + opts := []grpc.DialOption{grpc.WithInsecure()} + // NOTE: should we need to replace `host` in `addr` to `127.0.0.1`? + conn, err := grpc.DialContext(ctx, addr, opts...) + if err != nil { + return nil, terror.ErrMasterHandleHTTPApis.Delegate(err) + } + + gwmux := runtime.NewServeMux() + err = pb.RegisterMasterHandler(ctx, gwmux, conn) + if err != nil { + return nil, terror.ErrMasterHandleHTTPApis.Delegate(err) + } + return gwmux, nil +} + +// getDebugHandler returns a HTTP handler to handle debug information. +func getDebugHandler() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + return mux +} diff --git a/dm/master/server.go b/dm/master/server.go index d928c9f298..58ef63a518 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -17,22 +17,19 @@ import ( "context" "fmt" "io" - "net" "net/http" "sort" "strings" "sync" "time" - "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/pingcap/errors" "github.com/siddontang/go/sync2" - "github.com/soheilhy/cmux" + "go.etcd.io/etcd/embed" "go.uber.org/zap" "google.golang.org/grpc" "github.com/pingcap/dm/checker" - "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/master/sql-operator" "github.com/pingcap/dm/dm/master/workerrpc" @@ -45,7 +42,6 @@ import ( var ( fetchDDLInfoRetryTimeout = 5 * time.Second - cmuxReadTimeout = 10 * time.Second ) // Server handles RPC requests for dm-master @@ -54,8 +50,11 @@ type Server struct { cfg *Config - rootLis net.Listener - svr *grpc.Server + // the embed etcd server, and the gRPC/HTTP API server also attached to it. + etcd *embed.Etcd + + // WaitGroup for background functions. + bgFunWg sync.WaitGroup // dm-worker-ID(host:ip) -> dm-worker client management workerClients map[string]workerrpc.Client @@ -95,43 +94,55 @@ func NewServer(cfg *Config) *Server { } // Start starts to serving -func (s *Server) Start() error { - var err error +func (s *Server) Start(ctx context.Context) (err error) { + // create clients to DM-workers + for _, workerAddr := range s.cfg.DeployMap { + s.workerClients[workerAddr], err = workerrpc.NewGRPCClient(workerAddr) + if err != nil { + return + } + } - _, _, err = s.splitHostPort() + // get an HTTP to gRPC API handler. + apiHandler, err := getHTTPAPIHandler(ctx, s.cfg.MasterAddr) if err != nil { - return err + return } - s.rootLis, err = net.Listen("tcp", s.cfg.MasterAddr) - if err != nil { - return terror.ErrMasterStartService.Delegate(err) + // HTTP handlers on etcd's client IP:port + // no `metrics` for DM-master now, add it later. + // NOTE: after received any HTTP request from chrome browser, + // the server may be blocked when closing sometime. + // And any request to etcd's builtin handler has the same problem. + // And curl or safari browser does trigger this problem. + // But I haven't figured it out. + // (maybe more requests are sent from chrome or its extensions). + userHandles := map[string]http.Handler{ + "/apis/": apiHandler, + "/status": getStatusHandle(), + "/debug/": getDebugHandler(), } - for _, workerAddr := range s.cfg.DeployMap { - s.workerClients[workerAddr], err = workerrpc.NewGRPCClient(workerAddr) - if err != nil { - return err - } + // gRPC API server + gRPCSvr := func(gs *grpc.Server) { pb.RegisterMasterServer(gs, s) } + + // start embed etcd server, gRPC API server and HTTP (API, status and debug) server. + s.etcd, err = startEtcd(s.cfg, gRPCSvr, userHandles) + if err != nil { + return } - s.closed.Set(false) - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - defer func() { - cancel() - wg.Wait() - }() + s.closed.Set(false) // the server started now. - wg.Add(1) + s.bgFunWg.Add(1) go func() { - defer wg.Done() + defer s.bgFunWg.Done() s.ap.Start(ctx) }() - wg.Add(1) + s.bgFunWg.Add(1) go func() { - defer wg.Done() + defer s.bgFunWg.Done() select { case <-ctx.Done(): return @@ -141,59 +152,15 @@ func (s *Server) Start() error { } }() - wg.Add(1) + s.bgFunWg.Add(1) go func() { - defer wg.Done() + defer s.bgFunWg.Done() // fetch DDL info from dm-workers to sync sharding DDL s.fetchWorkerDDLInfo(ctx) }() - // create a cmux - m := cmux.New(s.rootLis) - m.SetReadTimeout(cmuxReadTimeout) // set a timeout, ref: https://github.com/pingcap/tidb-binlog/pull/352 - - // match connections in order: first gRPC, then HTTP - grpcL := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) - httpL := m.Match(cmux.HTTP1Fast()) - - s.svr = grpc.NewServer() - pb.RegisterMasterServer(s.svr, s) - go func() { - err2 := s.svr.Serve(grpcL) - if err2 != nil && !common.IsErrNetClosing(err2) && err2 != cmux.ErrListenerClosed { - log.L().Error("fail to start gRPC server", zap.Error(err2)) - } - }() - - httpmux := http.NewServeMux() - HandleStatus(httpmux) // serve status - - err = s.HandleHTTPApis(ctx, httpmux) // server http api - if err != nil { - return err - } - - wg.Add(1) - go func() { - defer wg.Done() - - httpS := &http.Server{ - Handler: httpmux, - } - - err3 := httpS.Serve(httpL) - if err3 != nil && !common.IsErrNetClosing(err3) && err3 != http.ErrServerClosed { - log.L().Error("run http server", log.ShortError(err3)) - } - }() - log.L().Info("listening gRPC API and status request", zap.String("address", s.cfg.MasterAddr)) - err = m.Serve() // start serving, block - if err != nil && common.IsErrNetClosing(err) { - err = nil - } - - return err + return } // Close close the RPC server, this function can be called multiple times @@ -203,14 +170,14 @@ func (s *Server) Close() { if s.closed.Get() { return } - if s.rootLis != nil { - err := s.rootLis.Close() - if err != nil && !common.IsErrNetClosing(err) { - log.L().Error("close net listener", zap.Error(err)) - } - } - if s.svr != nil { - s.svr.GracefulStop() + log.L().Info("closing server") + + // wait for background functions returned + s.bgFunWg.Wait() + + // close the etcd and other attached servers + if s.etcd != nil { + s.etcd.Close() } s.closed.Set(true) } @@ -1999,36 +1966,3 @@ func (s *Server) workerArgsExtractor(args ...interface{}) (workerrpc.Client, str return cli, worker, nil } - -// HandleHTTPApis handles http apis and translate to grpc request -func (s *Server) HandleHTTPApis(ctx context.Context, mux *http.ServeMux) error { - // MasterAddr's format may be "host:port" or "":port" - _, port, err := s.splitHostPort() - if err != nil { - return err - } - - opts := []grpc.DialOption{grpc.WithInsecure()} - conn, err := grpc.DialContext(ctx, "127.0.0.1:"+port, opts...) - if err != nil { - return terror.ErrMasterHandleHTTPApis.Delegate(err) - } - - gwmux := runtime.NewServeMux() - err = pb.RegisterMasterHandler(ctx, gwmux, conn) - if err != nil { - return terror.ErrMasterHandleHTTPApis.Delegate(err) - } - mux.Handle("/apis/", gwmux) - - return nil -} - -func (s *Server) splitHostPort() (host, port string, err error) { - // MasterAddr's format may be "host:port" or ":port" - host, port, err = net.SplitHostPort(s.cfg.MasterAddr) - if err != nil { - err = terror.ErrMasterHostPortNotValid.Delegate(err, s.cfg.MasterAddr) - } - return -} diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 3c335001c1..7e4cedcd69 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -14,6 +14,7 @@ package master import ( + "bytes" "context" "io/ioutil" "net/http" @@ -153,6 +154,7 @@ func testDefaultMasterServer(c *check.C) *Server { cfg := NewConfig() err := cfg.Parse([]string{"-config=./dm-master.toml"}) c.Assert(err, check.IsNil) + cfg.DataDir = c.MkDir() server := NewServer(cfg) go server.ap.Start(context.Background()) @@ -1486,33 +1488,26 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) { func (t *testMaster) TestServer(c *check.C) { cfg := NewConfig() c.Assert(cfg.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) + cfg.DataDir = c.MkDir() + cfg.MasterAddr = "127.0.0.1:18261" // use a different port s := NewServer(cfg) - masterAddr := cfg.MasterAddr - s.cfg.MasterAddr = "" - err := s.Start() - c.Assert(terror.ErrMasterHostPortNotValid.Equal(err), check.IsTrue) - s.Close() - s.cfg.MasterAddr = masterAddr - - go func() { - err1 := s.Start() - c.Assert(err1, check.IsNil) - }() - - c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return !s.closed.Get() - }), check.IsTrue) + ctx, cancel := context.WithCancel(context.Background()) + err1 := s.Start(ctx) + c.Assert(err1, check.IsNil) - t.testHTTPInterface(c, "status") + t.testHTTPInterface(c, fmt.Sprintf("http://%s/status", cfg.MasterAddr), []byte(utils.GetRawInfo())) + t.testHTTPInterface(c, fmt.Sprintf("http://%s/debug/pprof/", cfg.MasterAddr), []byte("Types of profiles available")) + t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.MasterAddr), []byte("task test-task has no workers or not exist")) dupServer := NewServer(cfg) - err = dupServer.Start() - c.Assert(terror.ErrMasterStartService.Equal(err), check.IsTrue) + err := dupServer.Start(ctx) + c.Assert(terror.ErrMasterStartEmbedEtcdFail.Equal(err), check.IsTrue) c.Assert(err.Error(), check.Matches, ".*bind: address already in use") // close + cancel() s.Close() c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool { @@ -1520,11 +1515,13 @@ func (t *testMaster) TestServer(c *check.C) { }), check.IsTrue) } -func (t *testMaster) testHTTPInterface(c *check.C, uri string) { - resp, err := http.Get("http://127.0.0.1:8261/" + uri) +func (t *testMaster) testHTTPInterface(c *check.C, url string, contain []byte) { + resp, err := http.Get(url) c.Assert(err, check.IsNil) defer resp.Body.Close() c.Assert(resp.StatusCode, check.Equals, 200) - _, err = ioutil.ReadAll(resp.Body) + + body, err := ioutil.ReadAll(resp.Body) c.Assert(err, check.IsNil) + c.Assert(bytes.Contains(body, contain), check.IsTrue) } diff --git a/dm/master/status.go b/dm/master/status.go deleted file mode 100644 index 24a144bc81..0000000000 --- a/dm/master/status.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package master - -import ( - "net/http" - "net/http/pprof" - - "github.com/pingcap/dm/dm/common" - "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/utils" -) - -type statusHandler struct { -} - -func (h *statusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Content-Type", "text/plain") - text := utils.GetRawInfo() - _, err := w.Write([]byte(text)) - if err != nil && !common.IsErrNetClosing(err) { - log.L().Error("write status response", log.ShortError(err)) - } -} - -// HandleStatus handles functions for getting status by HTTP request -func HandleStatus(mux *http.ServeMux) { - mux.Handle("/status", &statusHandler{}) - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) -} diff --git a/dm/master/status_test.go b/dm/master/status_test.go deleted file mode 100644 index b3b5300b78..0000000000 --- a/dm/master/status_test.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package master - -import ( - "fmt" - "io/ioutil" - "net/http" - "time" - - "github.com/pingcap/check" - "github.com/pingcap/errors" -) - -var _ = check.Suite(&testHTTPServer{}) - -type testHTTPServer struct { - server *Server - cfg *Config -} - -func (t *testHTTPServer) startServer(c *check.C) { - t.cfg = NewConfig() - t.cfg.MasterAddr = ":8261" - t.cfg.RPCRateLimit = DefaultRate - t.cfg.RPCRateBurst = DefaultBurst - - t.server = NewServer(t.cfg) - go func() { - err := t.server.Start() - c.Assert(err, check.IsNil) - }() - - err := t.waitUntilServerOnline() - c.Assert(err, check.IsNil) -} - -func (t *testHTTPServer) stopServer(c *check.C) { - if t.server != nil { - t.server.Close() - } -} - -const retryTime = 100 - -func (t *testHTTPServer) waitUntilServerOnline() error { - statusURL := fmt.Sprintf("http://127.0.0.1%s/status", t.cfg.MasterAddr) - for i := 0; i < retryTime; i++ { - resp, err := http.Get(statusURL) - if err == nil && resp.StatusCode == http.StatusOK { - ioutil.ReadAll(resp.Body) - resp.Body.Close() - return nil - } - time.Sleep(time.Millisecond * 10) - } - return errors.Errorf("failed to connect http status for %d retries in every 10ms", retryTime) -} - -func (t *testHTTPServer) TestStatus(c *check.C) { - t.startServer(c) - defer t.stopServer(c) - - statusURL := fmt.Sprintf("http://127.0.0.1%s/status", t.cfg.MasterAddr) - resp, err := http.Get(statusURL) - c.Assert(err, check.IsNil) - c.Assert(resp.StatusCode, check.Equals, http.StatusOK) - buf, err2 := ioutil.ReadAll(resp.Body) - c.Assert(err2, check.IsNil) - status := string(buf) - c.Assert(status, check.Matches, "Release Version:.*\nGit Commit Hash:.*\nGit Branch:.*\nUTC Build Time:.*\nGo Version:.*\n") -} diff --git a/go.mod b/go.mod index f837bbff3b..d734b235fa 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect github.com/uber/jaeger-lib v2.0.0+incompatible // indirect github.com/unrolled/render v1.0.1 // indirect - go.etcd.io/etcd v3.3.15+incompatible // indirect + go.etcd.io/etcd v3.3.15+incompatible go.uber.org/atomic v1.4.0 // indirect go.uber.org/zap v1.10.0 golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc // indirect diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index c8317f3515..bbc29d3396 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -381,6 +381,10 @@ const ( codeMasterOperRequestTimeout codeMasterHandleHTTPApis codeMasterHostPortNotValid + codeMasterGetHostnameFail + codeMasterGenEmbedEtcdConfigFail + codeMasterStartEmbedEtcdFail + codeMasterParseURLFail ) // DM-worker error code @@ -783,41 +787,45 @@ var ( ErrSyncerUnitExecWithNoBlockingDDL = New(codeSyncerUnitExecWithNoBlockingDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "process unit not waiting for sharding DDL to sync") // DM-master error - ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid") - ErrMasterSQLOpNotSupport = New(codeMasterSQLOpNotSupport, ClassDMMaster, ScopeInternal, LevelMedium, "op %s not supported") - ErrMasterSQLOpWithoutSharding = New(codeMasterSQLOpWithoutSharding, ClassDMMaster, ScopeInternal, LevelMedium, "operate request without --sharding specified not valid") - ErrMasterGRPCCreateConn = New(codeMasterGRPCCreateConn, ClassDMMaster, ScopeInternal, LevelHigh, "create grpc connection") - ErrMasterGRPCSendOnCloseConn = New(codeMasterGRPCSendOnCloseConn, ClassDMMaster, ScopeInternal, LevelHigh, "send request on a closed client") - ErrMasterGRPCClientClose = New(codeMasterGRPCClientClose, ClassDMMaster, ScopeInternal, LevelHigh, "close rpc client") - ErrMasterGRPCInvalidReqType = New(codeMasterGRPCInvalidReqType, ClassDMMaster, ScopeInternal, LevelHigh, "invalid request type: %v") - ErrMasterGRPCRequestError = New(codeMasterGRPCRequestError, ClassDMMaster, ScopeInternal, LevelHigh, "grpc request error") - ErrMasterDeployMapperVerify = New(codeMasterDeployMapperVerify, ClassDMMaster, ScopeInternal, LevelHigh, "user should specify valid relation between source(mysql/mariadb) and dm-worker, config %+v not valid") - ErrMasterConfigParseFlagSet = New(codeMasterConfigParseFlagSet, ClassDMMaster, ScopeInternal, LevelMedium, "parse config flag set") - ErrMasterConfigUnknownItem = New(codeMasterConfigUnknownItem, ClassDMMaster, ScopeInternal, LevelMedium, "master config contained unknown configuration options: %s") - ErrMasterConfigInvalidFlag = New(codeMasterConfigInvalidFlag, ClassDMMaster, ScopeInternal, LevelMedium, "'%s' is an invalid flag") - ErrMasterConfigTomlTransform = New(codeMasterConfigTomlTransform, ClassDMMaster, ScopeInternal, LevelMedium, "config toml transform") - ErrMasterConfigTimeoutParse = New(codeMasterConfigTimeoutParse, ClassDMMaster, ScopeInternal, LevelMedium, "parse rpc timeout str") - ErrMasterConfigUpdateCfgFile = New(codeMasterConfigUpdateCfgFile, ClassDMMaster, ScopeInternal, LevelHigh, "update config file") - ErrMasterShardingDDLDiff = New(codeMasterShardingDDLDiff, ClassDMMaster, ScopeInternal, LevelHigh, "sharding ddls in ddl lock %s is different with %s") - ErrMasterStartService = New(codeMasterStartService, ClassDMMaster, ScopeInternal, LevelHigh, "start server") - ErrMasterNoEmitToken = New(codeMasterNoEmitToken, ClassDMMaster, ScopeInternal, LevelHigh, "fail to get emit opportunity for worker %s") - ErrMasterLockNotFound = New(codeMasterLockNotFound, ClassDMMaster, ScopeInternal, LevelHigh, "lock with ID %s not found") - ErrMasterLockIsResolving = New(codeMasterLockIsResolving, ClassDMMaster, ScopeInternal, LevelHigh, "lock %s is resolving") - ErrMasterWorkerCliNotFound = New(codeMasterWorkerCliNotFound, ClassDMMaster, ScopeInternal, LevelHigh, "worker %s relevant worker-client not found") - ErrMasterWorkerNotWaitLock = New(codeMasterWorkerNotWaitLock, ClassDMMaster, ScopeInternal, LevelHigh, "worker %s not waiting for DDL lock %s") - ErrMasterHandleSQLReqFail = New(codeMasterHandleSQLReqFail, ClassDMMaster, ScopeInternal, LevelHigh, "request DDL lock %s owner %s handle SQLs request %s fail %s") - ErrMasterOwnerExecDDL = New(codeMasterOwnerExecDDL, ClassDMMaster, ScopeInternal, LevelHigh, "owner %s ExecuteDDL fail") - ErrMasterPartWorkerExecDDLFail = New(codeMasterPartWorkerExecDDLFail, ClassDMMaster, ScopeInternal, LevelHigh, "DDL lock %s owner ExecuteDDL successfully, so DDL lock removed. but some dm-workers ExecuteDDL fail, you should to handle dm-worker directly") - ErrMasterWorkerExistDDLLock = New(codeMasterWorkerExistDDLLock, ClassDMMaster, ScopeInternal, LevelHigh, "worker %s exist ddl lock, please unlock ddl lock first") - ErrMasterGetWorkerCfgExtractor = New(codeMasterGetWorkerCfgExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "") - ErrMasterTaskConfigExtractor = New(codeMasterTaskConfigExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "") - ErrMasterWorkerArgsExtractor = New(codeMasterWorkerArgsExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "") - ErrMasterQueryWorkerConfig = New(codeMasterQueryWorkerConfig, ClassDMMaster, ScopeInternal, LevelHigh, "") - ErrMasterOperNotFound = New(codeMasterOperNotFound, ClassDMMaster, ScopeInternal, LevelHigh, "operation %d of task %s not found, please execute `query-status` to check status") - ErrMasterOperRespNotSuccess = New(codeMasterOperRespNotSuccess, ClassDMMaster, ScopeInternal, LevelHigh, "operation not success: %s") - ErrMasterOperRequestTimeout = New(codeMasterOperRequestTimeout, ClassDMMaster, ScopeInternal, LevelHigh, "request is timeout, but request may be successful, please execute `query-status` to check status") - ErrMasterHandleHTTPApis = New(codeMasterHandleHTTPApis, ClassDMMaster, ScopeInternal, LevelHigh, "serve http apis to grpc") - ErrMasterHostPortNotValid = New(codeMasterHostPortNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "host:port '%s' not valid") + ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid") + ErrMasterSQLOpNotSupport = New(codeMasterSQLOpNotSupport, ClassDMMaster, ScopeInternal, LevelMedium, "op %s not supported") + ErrMasterSQLOpWithoutSharding = New(codeMasterSQLOpWithoutSharding, ClassDMMaster, ScopeInternal, LevelMedium, "operate request without --sharding specified not valid") + ErrMasterGRPCCreateConn = New(codeMasterGRPCCreateConn, ClassDMMaster, ScopeInternal, LevelHigh, "create grpc connection") + ErrMasterGRPCSendOnCloseConn = New(codeMasterGRPCSendOnCloseConn, ClassDMMaster, ScopeInternal, LevelHigh, "send request on a closed client") + ErrMasterGRPCClientClose = New(codeMasterGRPCClientClose, ClassDMMaster, ScopeInternal, LevelHigh, "close rpc client") + ErrMasterGRPCInvalidReqType = New(codeMasterGRPCInvalidReqType, ClassDMMaster, ScopeInternal, LevelHigh, "invalid request type: %v") + ErrMasterGRPCRequestError = New(codeMasterGRPCRequestError, ClassDMMaster, ScopeInternal, LevelHigh, "grpc request error") + ErrMasterDeployMapperVerify = New(codeMasterDeployMapperVerify, ClassDMMaster, ScopeInternal, LevelHigh, "user should specify valid relation between source(mysql/mariadb) and dm-worker, config %+v not valid") + ErrMasterConfigParseFlagSet = New(codeMasterConfigParseFlagSet, ClassDMMaster, ScopeInternal, LevelMedium, "parse config flag set") + ErrMasterConfigUnknownItem = New(codeMasterConfigUnknownItem, ClassDMMaster, ScopeInternal, LevelMedium, "master config contained unknown configuration options: %s") + ErrMasterConfigInvalidFlag = New(codeMasterConfigInvalidFlag, ClassDMMaster, ScopeInternal, LevelMedium, "'%s' is an invalid flag") + ErrMasterConfigTomlTransform = New(codeMasterConfigTomlTransform, ClassDMMaster, ScopeInternal, LevelMedium, "config toml transform") + ErrMasterConfigTimeoutParse = New(codeMasterConfigTimeoutParse, ClassDMMaster, ScopeInternal, LevelMedium, "parse rpc timeout str") + ErrMasterConfigUpdateCfgFile = New(codeMasterConfigUpdateCfgFile, ClassDMMaster, ScopeInternal, LevelHigh, "update config file") + ErrMasterShardingDDLDiff = New(codeMasterShardingDDLDiff, ClassDMMaster, ScopeInternal, LevelHigh, "sharding ddls in ddl lock %s is different with %s") + ErrMasterStartService = New(codeMasterStartService, ClassDMMaster, ScopeInternal, LevelHigh, "start server") + ErrMasterNoEmitToken = New(codeMasterNoEmitToken, ClassDMMaster, ScopeInternal, LevelHigh, "fail to get emit opportunity for worker %s") + ErrMasterLockNotFound = New(codeMasterLockNotFound, ClassDMMaster, ScopeInternal, LevelHigh, "lock with ID %s not found") + ErrMasterLockIsResolving = New(codeMasterLockIsResolving, ClassDMMaster, ScopeInternal, LevelHigh, "lock %s is resolving") + ErrMasterWorkerCliNotFound = New(codeMasterWorkerCliNotFound, ClassDMMaster, ScopeInternal, LevelHigh, "worker %s relevant worker-client not found") + ErrMasterWorkerNotWaitLock = New(codeMasterWorkerNotWaitLock, ClassDMMaster, ScopeInternal, LevelHigh, "worker %s not waiting for DDL lock %s") + ErrMasterHandleSQLReqFail = New(codeMasterHandleSQLReqFail, ClassDMMaster, ScopeInternal, LevelHigh, "request DDL lock %s owner %s handle SQLs request %s fail %s") + ErrMasterOwnerExecDDL = New(codeMasterOwnerExecDDL, ClassDMMaster, ScopeInternal, LevelHigh, "owner %s ExecuteDDL fail") + ErrMasterPartWorkerExecDDLFail = New(codeMasterPartWorkerExecDDLFail, ClassDMMaster, ScopeInternal, LevelHigh, "DDL lock %s owner ExecuteDDL successfully, so DDL lock removed. but some dm-workers ExecuteDDL fail, you should to handle dm-worker directly") + ErrMasterWorkerExistDDLLock = New(codeMasterWorkerExistDDLLock, ClassDMMaster, ScopeInternal, LevelHigh, "worker %s exist ddl lock, please unlock ddl lock first") + ErrMasterGetWorkerCfgExtractor = New(codeMasterGetWorkerCfgExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "") + ErrMasterTaskConfigExtractor = New(codeMasterTaskConfigExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "") + ErrMasterWorkerArgsExtractor = New(codeMasterWorkerArgsExtractor, ClassDMMaster, ScopeInternal, LevelHigh, "") + ErrMasterQueryWorkerConfig = New(codeMasterQueryWorkerConfig, ClassDMMaster, ScopeInternal, LevelHigh, "") + ErrMasterOperNotFound = New(codeMasterOperNotFound, ClassDMMaster, ScopeInternal, LevelHigh, "operation %d of task %s not found, please execute `query-status` to check status") + ErrMasterOperRespNotSuccess = New(codeMasterOperRespNotSuccess, ClassDMMaster, ScopeInternal, LevelHigh, "operation not success: %s") + ErrMasterOperRequestTimeout = New(codeMasterOperRequestTimeout, ClassDMMaster, ScopeInternal, LevelHigh, "request is timeout, but request may be successful, please execute `query-status` to check status") + ErrMasterHandleHTTPApis = New(codeMasterHandleHTTPApis, ClassDMMaster, ScopeInternal, LevelHigh, "serve http apis to grpc") + ErrMasterHostPortNotValid = New(codeMasterHostPortNotValid, ClassDMMaster, ScopeInternal, LevelHigh, "host:port '%s' not valid") + ErrMasterGetHostnameFail = New(codeMasterGetHostnameFail, ClassDMMaster, ScopeInternal, LevelHigh, "get hostname fail") + ErrMasterGenEmbedEtcdConfigFail = New(codeMasterGenEmbedEtcdConfigFail, ClassDMMaster, ScopeInternal, LevelHigh, "generate config item %s for embed etcd fail") + ErrMasterStartEmbedEtcdFail = New(codeMasterStartEmbedEtcdFail, ClassDMMaster, ScopeInternal, LevelHigh, "start embed etcd fail") + ErrMasterParseURLFail = New(codeMasterParseURLFail, ClassDMMaster, ScopeInternal, LevelHigh, "parse URL %s fail") // DM-worker error ErrWorkerParseFlagSet = New(codeWorkerParseFlagSet, ClassDMWorker, ScopeInternal, LevelMedium, "parse dm-worker config flag set") diff --git a/tests/dmctl_basic/conf/dm-master.toml b/tests/dmctl_basic/conf/dm-master.toml index 334e0de993..b08f4d77d5 100644 --- a/tests/dmctl_basic/conf/dm-master.toml +++ b/tests/dmctl_basic/conf/dm-master.toml @@ -1,5 +1,7 @@ # Master Configuration. +master-addr = ":8261" + [[deploy]] source-id = "mysql-replica-01" dm-worker = "127.0.0.1:8262"