diff --git a/executor/show.go b/executor/show.go index 38a505b4d7e13..2a053f19dddfb 100644 --- a/executor/show.go +++ b/executor/show.go @@ -30,6 +30,10 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb-tools/pkg/etcd" + "github.com/pingcap/tidb-tools/pkg/utils" + "github.com/pingcap/tidb-tools/tidb-binlog/node" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/infoschema" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/plugin" @@ -44,6 +48,8 @@ import ( "github.com/pingcap/tidb/util/sqlexec" ) +var etcdDialTimeout = 5 * time.Second + // ShowExec represents a show executor. type ShowExec struct { baseExecutor @@ -115,6 +121,8 @@ func (e *ShowExec) fetchAll() error { return e.fetchShowCreateDatabase() case ast.ShowDatabases: return e.fetchShowDatabases() + case ast.ShowDrainerStatus: + return e.fetchShowPumpOrDrainerStatus(node.DrainerNode) case ast.ShowEngines: return e.fetchShowEngines() case ast.ShowGrants: @@ -123,6 +131,8 @@ func (e *ShowExec) fetchAll() error { return e.fetchShowIndex() case ast.ShowProcedureStatus: return e.fetchShowProcedureStatus() + case ast.ShowPumpStatus: + return e.fetchShowPumpOrDrainerStatus(node.PumpNode) case ast.ShowStatus: return e.fetchShowStatus() case ast.ShowTables: @@ -996,6 +1006,43 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error { return nil } +// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows. +func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error { + registry, err := createRegistry(config.GetGlobalConfig().Path) + if err != nil { + return errors.Trace(err) + } + + nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind]) + if err != nil { + return errors.Trace(err) + } + err = registry.Close() + if err != nil { + return errors.Trace(err) + } + + for _, n := range nodes { + e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS).Format(types.TimeFormat)}) + } + + return nil +} + +// createRegistry returns an ectd registry +func createRegistry(urls string) (*node.EtcdRegistry, error) { + ectdEndpoints, err := utils.ParseHostPortAddr(urls) + if err != nil { + return nil, errors.Trace(err) + } + cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil) + if err != nil { + return nil, errors.Trace(err) + } + + return node.NewEtcdRegistry(cli, etcdDialTimeout), nil +} + func (e *ShowExec) getTable() (table.Table, error) { if e.Table == nil { return nil, errors.New("table not found") diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 346d853c6e0fe..2bcff254afcd1 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1750,6 +1750,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) { names = []string{"View", "Create View", "character_set_client", "collation_connection"} case ast.ShowCreateDatabase: names = []string{"Database", "Create Database"} + case ast.ShowDrainerStatus: + names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"} + ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar} case ast.ShowGrants: if s.User != nil { names = []string{fmt.Sprintf("Grants for %s", s.User)} @@ -1773,6 +1776,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) { names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"} ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString} + case ast.ShowPumpStatus: + names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"} + ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar} case ast.ShowStatsMeta: names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong}