Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: support SQL show pump/drainer status #9456

Merged
merged 7 commits into from Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 47 additions & 0 deletions executor/show.go
Expand Up @@ -30,6 +30,10 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb-tools/pkg/etcd"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
Expand All @@ -44,6 +48,8 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
)

var etcdDialTimeout = 5 * time.Second

// ShowExec represents a show executor.
type ShowExec struct {
baseExecutor
Expand Down Expand Up @@ -115,6 +121,8 @@ func (e *ShowExec) fetchAll() error {
return e.fetchShowCreateDatabase()
case ast.ShowDatabases:
return e.fetchShowDatabases()
case ast.ShowDrainerStatus:
return e.fetchShowPumpOrDrainerStatus(node.DrainerNode)
case ast.ShowEngines:
return e.fetchShowEngines()
case ast.ShowGrants:
Expand All @@ -123,6 +131,8 @@ func (e *ShowExec) fetchAll() error {
return e.fetchShowIndex()
case ast.ShowProcedureStatus:
return e.fetchShowProcedureStatus()
case ast.ShowPumpStatus:
return e.fetchShowPumpOrDrainerStatus(node.PumpNode)
case ast.ShowStatus:
return e.fetchShowStatus()
case ast.ShowTables:
Expand Down Expand Up @@ -996,6 +1006,43 @@ func (e *ShowExec) fetchShowWarnings(errOnly bool) error {
return nil
}

// fetchShowPumpOrDrainerStatus gets status of all pumps or drainers and fill them into e.rows.
func (e *ShowExec) fetchShowPumpOrDrainerStatus(kind string) error {
registry, err := createRegistry(config.GetGlobalConfig().Path)
if err != nil {
return errors.Trace(err)
}

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
}
err = registry.Close()
if err != nil {
return errors.Trace(err)
}

for _, n := range nodes {
e.appendRow([]interface{}{n.NodeID, n.Addr, n.State, n.MaxCommitTS, utils.TSOToRoughTime(n.UpdateTS).Format("2006-01-02 15:04:05")})
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

// createRegistry returns an ectd registry
func createRegistry(urls string) (*node.EtcdRegistry, error) {
ectdEndpoints, err := utils.ParseHostPortAddr(urls)
if err != nil {
return nil, errors.Trace(err)
}
cli, err := etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
if err != nil {
return nil, errors.Trace(err)
}

return node.NewEtcdRegistry(cli, etcdDialTimeout), nil
}

func (e *ShowExec) getTable() (table.Table, error) {
if e.Table == nil {
return nil, errors.New("table not found")
Expand Down
6 changes: 6 additions & 0 deletions planner/core/planbuilder.go
Expand Up @@ -1755,6 +1755,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) {
names = []string{"View", "Create View", "character_set_client", "collation_connection"}
case ast.ShowCreateDatabase:
names = []string{"Database", "Create Database"}
case ast.ShowDrainerStatus:
names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar}
case ast.ShowGrants:
if s.User != nil {
names = []string{fmt.Sprintf("Grants for %s", s.User)}
Expand All @@ -1778,6 +1781,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) {
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
case ast.ShowPumpStatus:
names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar}
case ast.ShowStatsMeta:
names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong}
Expand Down