From 4b300b1ffe2c85924d68c7cbb3ecbc1cde8b8107 Mon Sep 17 00:00:00 2001 From: caohe Date: Tue, 26 Feb 2019 01:08:08 +0800 Subject: [PATCH 1/4] planner, executor: support SQL `show pump/drainer status` --- executor/show.go | 62 +++++++++++++++++++++++++++++++++++++ planner/core/planbuilder.go | 6 ++++ 2 files changed, 68 insertions(+) diff --git a/executor/show.go b/executor/show.go index 38a505b4d7e13..5efcf19ee464f 100644 --- a/executor/show.go +++ b/executor/show.go @@ -30,6 +30,10 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb-tools/pkg/etcd" + "github.com/pingcap/tidb-tools/pkg/utils" + "github.com/pingcap/tidb-tools/tidb-binlog/node" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/infoschema" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/plugin" @@ -44,6 +48,8 @@ import ( "github.com/pingcap/tidb/util/sqlexec" ) +var etcdDialTimeout = 5 * time.Second + // ShowExec represents a show executor. type ShowExec struct { baseExecutor @@ -115,6 +121,8 @@ func (e *ShowExec) fetchAll() error { return e.fetchShowCreateDatabase() case ast.ShowDatabases: return e.fetchShowDatabases() + case ast.ShowDrainerStatus: + return e.fetchShowDrainerStatus() case ast.ShowEngines: return e.fetchShowEngines() case ast.ShowGrants: @@ -123,6 +131,8 @@ func (e *ShowExec) fetchAll() error { return e.fetchShowIndex() case ast.ShowProcedureStatus: return e.fetchShowProcedureStatus() + case ast.ShowPumpStatus: + return e.fetchShowPumpStatus() case ast.ShowStatus: return e.fetchShowStatus() case ast.ShowTables: @@ -996,6 +1006,58 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { return nil } +// fetchShowPumpStatus gets status of all pumps and fill them into e.rows. +func (e *ShowExec) fetchShowPumpStatus() error { + registry, err := createRegistry(config.GetGlobalConfig().Path) + if err != nil { + return errors.Trace(err) + } + + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[node.PumpNode]) + if err != nil { + return errors.Trace(err) + } + + for _, n := range nodes { + e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS)}) + } + + return nil +} + +// fetchShowDrainerStatus gets status of all drainers and fill them into e.rows. +func (e *ShowExec) fetchShowDrainerStatus() error { + registry, err := createRegistry(config.GetGlobalConfig().Path) + if err != nil { + return errors.Trace(err) + } + + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[node.DrainerNode]) + if err != nil { + return errors.Trace(err) + } + + for _, n := range nodes { + e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS)}) + } + + return nil +} + +// createRegistry returns an ectd registry +func createRegistry(urls string) (*node.EtcdRegistry, error) { + ectdEndpoints, err := utils.ParseHostPortAddr(urls) + if err != nil { + return nil, errors.Trace(err) + } + cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil) + if err != nil { + return nil, errors.Trace(err) + } + + return node.NewEtcdRegistry(cli, etcdDialTimeout), nil +} + func (e *ShowExec) getTable() (table.Table, error) { if e.Table == nil { return nil, errors.New("table not found") diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 7683bd33f1701..81830f19b1aa5 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1755,6 +1755,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) { names = []string{"View", "Create View", "character_set_client", "collation_connection"} case ast.ShowCreateDatabase: names = []string{"Database", "Create Database"} + case ast.ShowDrainerStatus: + names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"} + ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeDatetime} case ast.ShowGrants: if s.User != nil { names = []string{fmt.Sprintf("Grants for %s", s.User)} @@ -1778,6 +1781,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) { names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"} ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString} + case ast.ShowPumpStatus: + names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"} + ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeDatetime} case ast.ShowStatsMeta: names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong} From 4b29ef94867de375b19e74367cc8eed6fe15704c Mon Sep 17 00:00:00 2001 From: caohe Date: Tue, 26 Feb 2019 21:41:35 +0800 Subject: [PATCH 2/4] planner, executor: fix comments --- executor/show.go | 32 +++++++------------------------- planner/core/planbuilder.go | 4 ++-- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/executor/show.go b/executor/show.go index 5efcf19ee464f..6a65ce7cd4cdc 100644 --- a/executor/show.go +++ b/executor/show.go @@ -122,7 +122,7 @@ func (e *ShowExec) fetchAll() error { case ast.ShowDatabases: return e.fetchShowDatabases() case ast.ShowDrainerStatus: - return e.fetchShowDrainerStatus() + return e.fetchShowPumpOrDrainerStatus(node.DrainerNode) case ast.ShowEngines: return e.fetchShowEngines() case ast.ShowGrants: @@ -132,7 +132,7 @@ func (e *ShowExec) fetchAll() error { case ast.ShowProcedureStatus: return e.fetchShowProcedureStatus() case ast.ShowPumpStatus: - return e.fetchShowPumpStatus() + return e.fetchShowPumpOrDrainerStatus(node.PumpNode) case ast.ShowStatus: return e.fetchShowStatus() case ast.ShowTables: @@ -1006,39 +1006,21 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { return nil } -// fetchShowPumpStatus gets status of all pumps and fill them into e.rows. -func (e *ShowExec) fetchShowPumpStatus() error { +// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. +func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { registry, err := createRegistry(config.GetGlobalConfig().Path) if err != nil { return errors.Trace(err) } + defer registry.Close() - nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[node.PumpNode]) + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) if err != nil { return errors.Trace(err) } for _, n := range nodes { - e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS)}) - } - - return nil -} - -// fetchShowDrainerStatus gets status of all drainers and fill them into e.rows. -func (e *ShowExec) fetchShowDrainerStatus() error { - registry, err := createRegistry(config.GetGlobalConfig().Path) - if err != nil { - return errors.Trace(err) - } - - nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[node.DrainerNode]) - if err != nil { - return errors.Trace(err) - } - - for _, n := range nodes { - e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS)}) + e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS).Format("2006-01-02 15:04:05")}) } return nil diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 81830f19b1aa5..3c80b5481b83f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1757,7 +1757,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) { names = []string{"Database", "Create Database"} case ast.ShowDrainerStatus: names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"} - ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeDatetime} + ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar} case ast.ShowGrants: if s.User != nil { names = []string{fmt.Sprintf("Grants for %s", s.User)} @@ -1783,7 +1783,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) { mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString} case ast.ShowPumpStatus: names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"} - ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeDatetime} + ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar} case ast.ShowStatsMeta: names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong} From 3f0ff8fc55ce0d18856f102773ffd2c6c3688769 Mon Sep 17 00:00:00 2001 From: caohe Date: Tue, 26 Feb 2019 22:03:33 +0800 Subject: [PATCH 3/4] planner, executor: fix errcheck --- executor/show.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executor/show.go b/executor/show.go index 6a65ce7cd4cdc..4ace658f37226 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1012,12 +1012,15 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { if err != nil { return errors.Trace(err) } - defer registry.Close() nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) if err != nil { return errors.Trace(err) } + err = registry.Close() + if err != nil { + return errors.Trace(err) + } for _, n := range nodes { e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS).Format("2006-01-02 15:04:05")}) From 378e59dd4e30cdedcf983f03599b5f51230ee9df Mon Sep 17 00:00:00 2001 From: caohe Date: Wed, 27 Feb 2019 12:52:21 +0800 Subject: [PATCH 4/4] planner, executor: use types.TimeFormat instead of its value --- executor/show.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/show.go b/executor/show.go index 4ace658f37226..2a053f19dddfb 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1023,7 +1023,7 @@ func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { } for _, n := range nodes { - e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS).Format("2006-01-02 15:04:05")}) + e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS).Format(types.TimeFormat)}) } return nil