diff --git a/.gitignore b/.gitignore index 7d214f0..dcf056b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ bin/ .admin-cli-history pegasus.log +.idea shell.log diff --git a/cmd/disk_balance.go b/cmd/disk_balance.go new file mode 100644 index 0000000..df260e2 --- /dev/null +++ b/cmd/disk_balance.go @@ -0,0 +1,43 @@ +package cmd + +import ( + "admin-cli/executor" + "admin-cli/shell" + + "github.com/desertbit/grumble" +) + +func init() { + shell.AddCommand(&grumble.Command{ + Name: "disk-migrate", + Help: "migrate replica between the two disks within a specified ReplicaServer node", + Flags: func(f *grumble.Flags) { + /*define the flags*/ + f.String("g", "gpid", "", "gpid, for example, '2.1'") + f.String("n", "node", "", "target node, for example, 127.0.0.1:34801") + f.String("f", "from", "", "origin disk tag, for example, ssd1") + f.String("t", "to", "", "target disk tag, for example, ssd2") + }, + Run: func(c *grumble.Context) error { + return executor.DiskMigrate( + pegasusClient, + c.Flags.String("node"), + c.Flags.String("gpid"), + c.Flags.String("from"), + c.Flags.String("to")) + }, + }) + + // TODO(jiashuo1) need generate migrate strategy(step) depends the disk-info result to run + shell.AddCommand(&grumble.Command{ + Name: "disk-balance", + Help: "migrate replica automatically to let the disk capacity balance within a specified ReplicaServer node", + Flags: func(f *grumble.Flags) { + /*define the flags*/ + }, + Run: func(c *grumble.Context) error { + return executor.DiskBalance() + }, + }) + +} diff --git a/cmd/disk_info.go b/cmd/disk_info.go new file mode 100644 index 0000000..5bb92b4 --- /dev/null +++ b/cmd/disk_info.go @@ -0,0 +1,51 @@ +package cmd + +import ( + "admin-cli/executor" + "admin-cli/shell" + + "github.com/desertbit/grumble" +) + +func init() { + shell.AddCommand(&grumble.Command{ + Name: "disk-capacity", + Help: "query disk capacity info ", + Flags: func(f *grumble.Flags) { + /*define the flags*/ + f.Bool("r", "resolve", false, "resolve input or output address, default false") + f.Bool("j", "json", false, "use JSON as the format of the output results. By default tabular format is used.") + f.String("n", "node", "", "node address(ip:port), for example, 127.0.0.1:34801") + f.String("d", "disk", "", "disk tag, for example, ssd1") + f.String("a", "app", "", "app name, for example, temp") + }, + Run: func(c *grumble.Context) error { + return executor.QueryDiskInfo( + pegasusClient, + executor.CapacitySize, + c.Flags.String("node"), + c.Flags.String("app"), + c.Flags.String("disk")) + }, + }) + + shell.AddCommand(&grumble.Command{ + Name: "disk-replica", + Help: "query disk replica count info", + Flags: func(f *grumble.Flags) { + /*define the flags*/ + f.Bool("r", "resolve", false, "resolve input or output address, default false") + f.Bool("j", "json", false, "use JSON as the format of the output results. By default tabular format is used.") + f.String("n", "node", "", "node address(ip:port), for example, 127.0.0.1:34801") + f.String("a", "app", "", "app name, for example, temp") + }, + Run: func(c *grumble.Context) error { + return executor.QueryDiskInfo( + pegasusClient, + executor.ReplicaCount, + c.Flags.String("node"), + c.Flags.String("app"), + "") + }, + }) +} diff --git a/cmd/disk_migrate.go b/cmd/disk_migrate.go deleted file mode 100644 index 0c860f7..0000000 --- a/cmd/disk_migrate.go +++ /dev/null @@ -1,25 +0,0 @@ -package cmd - -import ( - "admin-cli/shell" - - "github.com/desertbit/grumble" -) - -func init() { - shell.AddCommand(&grumble.Command{ - Name: "disk-migrate", - Help: "migrate replica between the two disks within a specified ReplicaServer node", - Flags: func(f *grumble.Flags) { - /*define the flags*/ - }, - Run: func(c *grumble.Context) error { - return diskMigrate(c) - }, - }) -} - -func diskMigrate(c *grumble.Context) error { - /*c.Flags["flag1"]*/ - return nil -} diff --git a/cmd/list_nodes.go b/cmd/list_nodes.go new file mode 100644 index 0000000..00166ac --- /dev/null +++ b/cmd/list_nodes.go @@ -0,0 +1,24 @@ +package cmd + +import ( + "admin-cli/executor" + "admin-cli/shell" + "github.com/desertbit/grumble" +) + +func init() { + shell.AddCommand(&grumble.Command{ + Name: "list-nodes", + Aliases: []string{"nodes"}, + Help: "list all nodes in the cluster", + Flags: func(f *grumble.Flags) { + f.Bool("d", "detail", false, "show detail replica count in all node") + }, + Run: func(c *grumble.Context) error { + return executor.ListNodes( + pegasusClient, + c.Flags.Bool("detail"), + ) + }, + }) +} diff --git a/cmd/remote_command.go b/cmd/remote_command.go new file mode 100644 index 0000000..6862f03 --- /dev/null +++ b/cmd/remote_command.go @@ -0,0 +1,38 @@ +package cmd + +import ( + "admin-cli/executor" + "github.com/desertbit/grumble" +) + +func init() { + rootCmd := &grumble.Command{ + Name: "remote-command", + Help: "send remote command, for example, remote-command meta or replica", + } + + rootCmd.AddCommand(&grumble.Command{ + Name: "meta", + Help: "send remote command to meta server", + Flags: initFlag, + Run: func(c *grumble.Context) error { + return executor.RemoteCommand(pegasusClient, "meta", c.Flags.String("node"), c.Flags.String("command"), c.Flags.String("arguments")) + }, + }) + + rootCmd.AddCommand(&grumble.Command{ + Name: "replica", + Help: "send remote command to replica server", + Flags: initFlag, + Run: func(c *grumble.Context) error { + return executor.RemoteCommand(pegasusClient, "replica", c.Flags.String("node"), c.Flags.String("command"), c.Flags.String("arguments")) + }, + }) +} + +func initFlag(f *grumble.Flags) { + /*define the flags*/ + f.String("n", "node", "", "specify server node address, such as 127.0.0.1:34801, empty mean all node") + f.String("c", "command", "help", "remote command name, you can -c help to see support command") + f.String("a", "arguments", "", "if empty means query the command argument value, otherwise mean set update value") +} diff --git a/cmd/server_info.go b/cmd/server_info.go new file mode 100644 index 0000000..2d7668b --- /dev/null +++ b/cmd/server_info.go @@ -0,0 +1,19 @@ +package cmd + +import ( + "admin-cli/executor" + "admin-cli/shell" + + "github.com/desertbit/grumble" +) + +func init() { + shell.AddCommand(&grumble.Command{ + Name: "server-info", + Help: "displays the overall server information", + Run: func(c *grumble.Context) error { + return executor.ServerInfo(pegasusClient) + }, + }) + +} diff --git a/executor/client.go b/executor/client.go index d0e36e7..d239a58 100644 --- a/executor/client.go +++ b/executor/client.go @@ -1,9 +1,15 @@ package executor import ( + "context" + "fmt" "io" + "time" + adminClient "github.com/XiaoMi/pegasus-go-client/admin" + "github.com/XiaoMi/pegasus-go-client/idl/admin" "github.com/XiaoMi/pegasus-go-client/session" + "github.com/pegasus-kv/collector/aggregate" ) // Client can access both Pegasus ReplicaServer and MetaServer. @@ -15,14 +21,86 @@ type Client struct { ReplicaPool *session.ReplicaManager MetaAddresses []string + + ReplicaAddresses []string } // NewClient creates a client for accessing Pegasus cluster for use of admin-cli. func NewClient(writer io.Writer, metaAddrs []string) *Client { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + meta := session.NewMetaManager(metaAddrs, session.NewNodeSession) + replicaPool := session.NewReplicaManager(session.NewNodeSession) + + resp, err := meta.ListNodes(ctx, &admin.ListNodesRequest{ + Status: admin.NodeStatus_NS_INVALID, + }) + if err != nil { + return nil + } + + var nodes []string + for _, node := range resp.Infos { + nodes = append(nodes, node.Address.GetAddress()) + } + return &Client{ - Writer: writer, - Meta: session.NewMetaManager(metaAddrs, session.NewNodeSession), - ReplicaPool: session.NewReplicaManager(session.NewNodeSession), - MetaAddresses: metaAddrs, + Writer: writer, + Meta: meta, + ReplicaPool: replicaPool, + MetaAddresses: metaAddrs, + ReplicaAddresses: nodes, + } +} + +func (client *Client) GetReplicaClient(addr string) (*session.ReplicaSession, error) { + err := client.validateReplicaAddress(addr) + if err != nil { + return nil, err + } + return client.ReplicaPool.GetReplica(addr), nil +} + +func (client *Client) GetRemoteCommandClient(addr string, nodeType session.NodeType) (*adminClient.RemoteCmdClient, error) { + switch nodeType { + case session.NodeTypeMeta: + err := client.validateMetaAddress(addr) + if err != nil { + return nil, err + } + case session.NodeTypeReplica: + err := client.validateReplicaAddress(addr) + if err != nil { + return nil, err + } + } + return adminClient.NewRemoteCmdClient(addr, nodeType), nil +} + +func (client *Client) GetPerfCounterClient(addr string) (*aggregate.PerfSession, error) { + err := client.validateReplicaAddress(addr) + if err != nil { + return nil, err + } + return aggregate.NewPerfSession(addr), nil +} + +func (client *Client) validateReplicaAddress(addr string) error { + for _, node := range client.ReplicaAddresses { + if node == addr { + return nil + } + } + return fmt.Errorf("The cluster doesn't exist the replica server node [%s]", addr) +} + +// used for remote_command -t meta +func (client *Client) validateMetaAddress(addr string) error { + for _, meta := range client.MetaAddresses { + if addr == meta { + return nil + } } + return fmt.Errorf("The cluster doesn't exist the meta server node [%s]", addr) } diff --git a/executor/disk_balance.go b/executor/disk_balance.go new file mode 100644 index 0000000..567906f --- /dev/null +++ b/executor/disk_balance.go @@ -0,0 +1,51 @@ +package executor + +import ( + "admin-cli/helper" + "context" + "fmt" + "time" + + "github.com/XiaoMi/pegasus-go-client/idl/radmin" +) + +func DiskMigrate(client *Client, replicaServer string, pidStr string, from string, to string) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + var addr, err = helper.Resolve(replicaServer, helper.Host2Addr) + if err == nil { + replicaServer = addr + } + + pid, err := helper.Str2Gpid(pidStr) + if err != nil { + return err + } + + replicaClient, err := client.GetReplicaClient(replicaServer) + if err != nil { + return err + } + + resp, err := replicaClient.DiskMigrate(ctx, &radmin.ReplicaDiskMigrateRequest{ + Pid: pid, + OriginDisk: from, + TargetDisk: to, + }) + + if err != nil { + if resp != nil && resp.Hint != nil { + return fmt.Errorf("Internal server error [%s:%s]", err, *resp.Hint) + } + return err + } + + return nil +} + +// TODO(jiashuo1) need generate migrate strategy(step) depends the disk-info result to run +func DiskBalance() error { + fmt.Println("Wait support") + return nil +} diff --git a/executor/disk_info.go b/executor/disk_info.go new file mode 100644 index 0000000..8ee3dbe --- /dev/null +++ b/executor/disk_info.go @@ -0,0 +1,176 @@ +package executor + +import ( + "admin-cli/helper" + "context" + "fmt" + "strconv" + "time" + + "github.com/XiaoMi/pegasus-go-client/idl/base" + "github.com/XiaoMi/pegasus-go-client/idl/radmin" + "github.com/olekukonko/tablewriter" +) + +type DiskInfoType int32 + +const ( + CapacitySize DiskInfoType = 0 + ReplicaCount DiskInfoType = 1 +) + +// QueryDiskInfo command +func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, tableName string, diskTag string) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + var addr, err = helper.Resolve(replicaServer, helper.Host2Addr) + if err == nil { + replicaServer = addr + } + + replicaClient, err := client.GetReplicaClient(replicaServer) + if err != nil { + return err + } + resp, err := replicaClient.QueryDiskInfo(ctx, &radmin.QueryDiskInfoRequest{ + Node: &base.RPCAddress{}, //TODO(jiashuo1) this thrift variable is useless, it need be deleted on client/server + AppName: tableName, + }) + if err != nil { + return err + } + + switch infoType { + case CapacitySize: + _ = queryDiskCapacity(client, replicaServer, resp, diskTag) + case ReplicaCount: + queryDiskReplicaCount(client, resp) + default: + break + } + return nil +} + +func queryDiskCapacity(client *Client, replicaServer string, resp *radmin.QueryDiskInfoResponse, diskTag string) error { + + type NodeCapacityStruct struct { + Disk string + Capacity int64 + Available int64 + Ratio int64 + } + + type ReplicaCapacityStruct struct { + Replica string + Status string + Capacity float64 + } + + var nodeCapacityInfos []NodeCapacityStruct + var replicaCapacityInfos []ReplicaCapacityStruct + + perfClient, err := client.GetPerfCounterClient(replicaServer) + if err != nil { + return err + } + + for _, diskInfo := range resp.DiskInfos { + // pass disk tag means query one disk detail capacity of replica + if len(diskTag) != 0 && diskInfo.Tag == diskTag { + appendReplicaCapacityInfo := func(replicasWithAppId map[int32][]*base.Gpid, replicaStatus string) { + for _, replicas := range replicasWithAppId { + for _, replica := range replicas { + var gpidStr = fmt.Sprintf("%d.%d", replica.Appid, replica.PartitionIndex) + replicaCapacityInfos = append(replicaCapacityInfos, ReplicaCapacityStruct{ + Replica: gpidStr, + Status: replicaStatus, + Capacity: float64(helper.GetReplicaCounterValue(perfClient, "disk.storage.sst(MB)", gpidStr)), + }) + } + } + } + appendReplicaCapacityInfo(diskInfo.HoldingPrimaryReplicas, "primary") + appendReplicaCapacityInfo(diskInfo.HoldingSecondaryReplicas, "secondary") + + // formats into tabular + tabular := tablewriter.NewWriter(client) + tabular.SetHeader([]string{"Replica", "Status", "Capacity"}) + tabular.SetAutoFormatHeaders(false) + tabular.SetAlignment(tablewriter.ALIGN_CENTER) + for _, replicaCapacityInfo := range replicaCapacityInfos { + tabular.Append([]string{ + replicaCapacityInfo.Replica, + replicaCapacityInfo.Status, + strconv.FormatFloat(replicaCapacityInfo.Capacity, 'f', -1, 64)}) + } + tabular.Render() + return nil + } + + nodeCapacityInfos = append(nodeCapacityInfos, NodeCapacityStruct{ + Disk: diskInfo.Tag, + Capacity: diskInfo.DiskCapacityMb, + Available: diskInfo.DiskAvailableMb, + Ratio: diskInfo.DiskAvailableMb * 100.0 / diskInfo.DiskCapacityMb, + }) + } + + // formats into tabular + tabular := tablewriter.NewWriter(client) + tabular.SetAlignment(tablewriter.ALIGN_CENTER) + tabular.SetHeader([]string{"Disk", "Capacity", "Available", "Ratio"}) + for _, nodeCapacityInfo := range nodeCapacityInfos { + tabular.Append([]string{ + nodeCapacityInfo.Disk, + strconv.FormatInt(nodeCapacityInfo.Capacity, 10), + strconv.FormatInt(nodeCapacityInfo.Available, 10), + strconv.FormatInt(nodeCapacityInfo.Ratio, 10)}) + } + tabular.Render() + return nil +} + +func queryDiskReplicaCount(client *Client, resp *radmin.QueryDiskInfoResponse) { + type ReplicaCountStruct struct { + Disk string + Primary int + Secondary int + Total int + } + + computeReplicaCount := func(replicasWithAppId map[int32][]*base.Gpid) int { + var replicaCount = 0 + for _, replicas := range replicasWithAppId { + for range replicas { + replicaCount++ + } + } + return replicaCount + } + + var replicaCountInfos []ReplicaCountStruct + for _, diskInfo := range resp.DiskInfos { + var primaryCount = computeReplicaCount(diskInfo.HoldingPrimaryReplicas) + var secondaryCount = computeReplicaCount(diskInfo.HoldingSecondaryReplicas) + replicaCountInfos = append(replicaCountInfos, ReplicaCountStruct{ + Disk: diskInfo.Tag, + Primary: primaryCount, + Secondary: secondaryCount, + Total: primaryCount + secondaryCount, + }) + } + + // formats into tabular + tabular := tablewriter.NewWriter(client) + tabular.SetHeader([]string{"Disk", "Primary", "Secondary", "Total"}) + tabular.SetAlignment(tablewriter.ALIGN_CENTER) + for _, replicaCountInfo := range replicaCountInfos { + tabular.Append([]string{ + replicaCountInfo.Disk, + strconv.Itoa(replicaCountInfo.Primary), + strconv.Itoa(replicaCountInfo.Secondary), + strconv.Itoa(replicaCountInfo.Total)}) + } + tabular.Render() +} diff --git a/executor/list_nodes.go b/executor/list_nodes.go new file mode 100644 index 0000000..c7da8f5 --- /dev/null +++ b/executor/list_nodes.go @@ -0,0 +1,217 @@ +package executor + +import ( + "admin-cli/helper" + "context" + "fmt" + "strconv" + "time" + + "github.com/XiaoMi/pegasus-go-client/idl/admin" + "github.com/olekukonko/tablewriter" +) + +type NodeDetailInfo struct { + // nodes + Node string + Status string + ReplicaCount int + PrimaryCount int + SecondaryCount int + // nodes -d + DiskTotalMB int64 + DiskUsedMb int64 + DiskRatio int64 + MemUsedMB int64 + BlockCacheMB int64 + MemTableMB int64 + MemIdxMB int64 + // nodes -q + GetQPS int64 + MgetQPS int64 + PutQPS int64 + MputQPS int64 + GetBytes int64 + MgetBytes int64 + PutBytes int64 + MputBytes int64 +} + +func ListNodes(client *Client, detail bool) error { + + nodes, err := queryNodeDetailInfo(client) + if err != nil { + return err + } + + var rowDatas [][]string + for _, node := range nodes { + rowDatas = append(rowDatas, []string{ + node.Node, + node.Status, + strconv.Itoa(node.ReplicaCount), + strconv.Itoa(node.PrimaryCount), + strconv.Itoa(node.SecondaryCount), + strconv.FormatInt(node.DiskTotalMB, 10), + strconv.FormatInt(node.DiskUsedMb, 10), + strconv.FormatInt(node.DiskRatio, 10), + strconv.FormatInt(node.MemUsedMB, 10), + strconv.FormatInt(node.BlockCacheMB, 10), + strconv.FormatInt(node.MemTableMB, 10), + strconv.FormatInt(node.MemIdxMB, 10), + strconv.FormatInt(node.GetQPS, 10), + strconv.FormatInt(node.MgetQPS, 10), + strconv.FormatInt(node.PutQPS, 10), + strconv.FormatInt(node.MputQPS, 10), + strconv.FormatInt(node.GetBytes, 10), + strconv.FormatInt(node.MgetBytes, 10), + strconv.FormatInt(node.PutBytes, 10), + strconv.FormatInt(node.MputBytes, 10)}) + } + + var baseHeader = []string{"Node", "Status"} + var replicaCountHeader = []string{"Replica", "Primary", "Secondary"} + var usageHeader = []string{"DiskTotalMB", "DiskUsedMb", "DiskRatio", "MemUsedMB", "BlockCacheMB", "MemTableMB", "MemIdxMB"} + var requestHeader = []string{"GetQPS", "MgetQPS", "PutQPS", "MputQPS", "GetBytes", "MgetBytes", "PutBytes", "MputBytes"} + var headers = [][]string{replicaCountHeader, usageHeader, requestHeader} + + var headerIndex = 2 + for _, header := range headers { + tabular := tablewriter.NewWriter(client) + tabular.SetAlignment(tablewriter.ALIGN_CENTER) + tabular.SetHeader(append(baseHeader, header...)) + tabular.SetAutoFormatHeaders(false) + + for _, rowData := range rowDatas { + tabular.Append(append([]string{rowData[0], rowData[1]}, rowData[headerIndex:headerIndex+len(header)]...)) + } + headerIndex += len(header) + + if !detail { + tabular.Render() + return nil + } + + tabular.Render() + } + + return nil +} + +func queryNodeDetailInfo(client *Client) ([]*NodeDetailInfo, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + listNodeResp, err := client.Meta.ListNodes(ctx, &admin.ListNodesRequest{ + Status: admin.NodeStatus_NS_INVALID, + }) + if err != nil { + return nil, err + } + + var nodes []*NodeDetailInfo + for _, info := range listNodeResp.Infos { + var node = NodeDetailInfo{} + statusErr := node.setStatus(info) + if statusErr != nil { + return nil, statusErr + } + usageErr := node.setUsageInfo(client, info.Address.GetAddress()) + if usageErr != nil { + return nil, usageErr + } + replicaCountErr := node.setReplicaCount(client, info.Address.GetAddress()) + if replicaCountErr != nil { + return nil, replicaCountErr + } + QPSErr := node.setQPSInfo(client, info.Address.GetAddress()) + if QPSErr != nil { + return nil, QPSErr + } + nodes = append(nodes, &node) + } + + return nodes, nil +} + +func (node *NodeDetailInfo) setStatus(info *admin.NodeInfo) error { + host, err := helper.Resolve(info.Address.GetAddress(), helper.Addr2Host) + if err != nil { + return err + } + node.Node = fmt.Sprintf("%s[%s]", host, info.Address.GetAddress()) + + node.Status = info.Status.String() + return nil +} + +func (node *NodeDetailInfo) setReplicaCount(client *Client, addr string) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + var apps []string + listAppsResp, err := client.Meta.ListApps(ctx, &admin.ListAppsRequest{ + Status: admin.AppStatus_AS_AVAILABLE, + }) + if err != nil { + return err + } + for _, app := range listAppsResp.Infos { + apps = append(apps, app.AppName) + } + + for _, app := range apps { + partitionInfoResp, err := client.Meta.QueryConfig(ctx, app) + if err != nil { + return err + } + + for _, partition := range partitionInfoResp.Partitions { + if partition.Primary.GetAddress() == addr { + node.PrimaryCount++ + } + + for _, secondary := range partition.Secondaries { + if secondary.GetAddress() == addr { + node.SecondaryCount++ + } + } + } + } + + node.ReplicaCount = node.PrimaryCount + node.SecondaryCount + return nil +} + +func (node *NodeDetailInfo) setUsageInfo(client *Client, addr string) error { + perfClient, err := client.GetPerfCounterClient(addr) + if err != nil { + return err + } + node.DiskTotalMB = helper.GetNodeCounterValue(perfClient, "disk.capacity.total") + node.DiskUsedMb = node.DiskTotalMB - helper.GetNodeCounterValue(perfClient, "disk.available.total(MB)") + node.DiskRatio = 100 - helper.GetNodeCounterValue(perfClient, "disk.available.total.ratio") + node.MemUsedMB = helper.GetNodeCounterValue(perfClient, "memused.res") + node.BlockCacheMB = helper.GetNodeCounterValue(perfClient, "rdb.block_cache.memory_usage") + node.MemTableMB = helper.GetNodeAggregateCounterValue(perfClient, "rdb.memtable.memory_usage") >> 20 + node.MemIdxMB = helper.GetNodeAggregateCounterValue(perfClient, "rdb.index_and_filter_blocks.memory_usage") >> 20 + return nil +} + +func (node *NodeDetailInfo) setQPSInfo(client *Client, addr string) error { + perfClient, err := client.GetPerfCounterClient(addr) + if err != nil { + return err + } + + node.GetQPS = helper.GetNodeAggregateCounterValue(perfClient, "get_qps") + node.PutQPS = helper.GetNodeAggregateCounterValue(perfClient, "put_qps") + node.MgetQPS = helper.GetNodeAggregateCounterValue(perfClient, "multi_get_qps") + node.MputQPS = helper.GetNodeAggregateCounterValue(perfClient, "multi_put_qps") + + node.GetBytes = helper.GetNodeAggregateCounterValue(perfClient, "get_bytes") + node.PutBytes = helper.GetNodeAggregateCounterValue(perfClient, "put_bytes") + node.MgetBytes = helper.GetNodeAggregateCounterValue(perfClient, "multi_get_bytes") + node.MputBytes = helper.GetNodeAggregateCounterValue(perfClient, "multi_put_bytes") + + return nil +} diff --git a/executor/list_tables.go b/executor/list_tables.go index c059d31..40701ab 100644 --- a/executor/list_tables.go +++ b/executor/list_tables.go @@ -33,7 +33,7 @@ func ListTables(client *Client, useJSON bool) error { tbList = append(tbList, tableStruct{ AppID: tb.AppID, Name: tb.AppName, - PartitionCount: *&tb.PartitionCount, + PartitionCount: tb.PartitionCount, CreateTime: time.Unix(tb.CreateSecond, 0).Format("2006-01-02"), Envs: tb.Envs, }) diff --git a/executor/remote_command.go b/executor/remote_command.go new file mode 100644 index 0000000..84dd63e --- /dev/null +++ b/executor/remote_command.go @@ -0,0 +1,94 @@ +package executor + +import ( + "admin-cli/helper" + "context" + "fmt" + "strings" + "time" + + "github.com/XiaoMi/pegasus-go-client/session" +) + +type commandResult struct { + NodeType session.NodeType + Address string + Command string + Result string +} + +func RemoteCommand(client *Client, nodeType session.NodeType, node string, cmd string, args string) error { + + if len(node) != 0 { + var addr, err = helper.Resolve(node, helper.Host2Addr) + if err == nil { + node = addr + } + } + + arguments := strings.Split(args, " ") + if arguments[0] == "" { + arguments = nil + } + + var results []*commandResult + + if len(node) == 0 { + if nodeType == session.NodeTypeMeta { + resp, err := sendRemoteCommand(client, nodeType, client.MetaAddresses, cmd, arguments) + if err != nil { + return err + } + results = resp + } else if nodeType == session.NodeTypeReplica { + resp, err := sendRemoteCommand(client, nodeType, client.ReplicaAddresses, cmd, arguments) + if err != nil { + return err + } + results = resp + } + } else { + resp, err := sendRemoteCommand(client, nodeType, []string{node}, cmd, arguments) + if err != nil { + return err + } + results = resp + } + + for _, result := range results { + fmt.Printf("[%s]%s[%s] : %s\n", result.NodeType, result.Address, result.Command, result.Result) + } + return nil +} + +func sendRemoteCommand(client *Client, nodeType session.NodeType, nodes []string, cmd string, args []string) ([]*commandResult, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + var results []*commandResult + for _, addr := range nodes { + remoteClient, errGet := client.GetRemoteCommandClient(addr, nodeType) + if errGet != nil { + return nil, errGet + } + resp, errCall := remoteClient.Call(ctx, cmd, args) + + var host, errResolve = helper.Resolve(addr, helper.Addr2Host) + if errResolve == nil { + addr = fmt.Sprintf("%s[%s]", host, addr) + } + if errCall != nil { + fmt.Printf("Node[%s] send remote command error[%s]\n", addr, errCall) + continue + } + + results = append(results, &commandResult{ + NodeType: nodeType, + Address: addr, + Command: cmd, + Result: resp, + }) + } + + return results, nil +} diff --git a/executor/server_info.go b/executor/server_info.go new file mode 100644 index 0000000..e0ec870 --- /dev/null +++ b/executor/server_info.go @@ -0,0 +1,35 @@ +package executor + +import ( + "github.com/XiaoMi/pegasus-go-client/session" + "github.com/olekukonko/tablewriter" +) + +// ServerInfo command +func ServerInfo(client *Client) error { + + var results []*commandResult + respMetas, errMeta := sendRemoteCommand(client, session.NodeTypeMeta, client.MetaAddresses, "server-info", []string{""}) + if errMeta != nil { + return errMeta + } + respReplicas, errReplica := sendRemoteCommand(client, session.NodeTypeReplica, client.ReplicaAddresses, "server-info", []string{""}) + if errReplica != nil { + return errReplica + } + results = append(results, respMetas...) + results = append(results, respReplicas...) + + tabular := tablewriter.NewWriter(client) + tabular.SetAutoFormatHeaders(false) + tabular.SetAlignment(tablewriter.ALIGN_CENTER) + tabular.SetColWidth(150) + tabular.SetHeader([]string{"Server", "Node", "Version"}) + + for _, result := range results { + tabular.Append([]string{string(result.NodeType), result.Address, result.Result}) + } + tabular.Render() + + return nil +} diff --git a/go.mod b/go.mod index 9a14316..e1c7394 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module admin-cli go 1.14 require ( - github.com/XiaoMi/pegasus-go-client v0.0.0-20201119112224-45f30cd560c7 + github.com/XiaoMi/pegasus-go-client v0.0.0-20201123074735-60f98674a317 github.com/cheggaaa/pb/v3 v3.0.5 github.com/desertbit/grumble v1.0.8 github.com/dustin/go-humanize v1.0.0 @@ -11,7 +11,6 @@ require ( github.com/olekukonko/tablewriter v0.0.4 github.com/pegasus-kv/collector v0.0.0-20201118145415-3b855d1b1b8f github.com/sirupsen/logrus v1.7.0 - github.com/stretchr/testify v1.6.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index d85e1a6..df087da 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmx github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/XiaoMi/pegasus-go-client v0.0.0-20201119112224-45f30cd560c7 h1:v3DH4plTH/5f2RTfbNrp5rg8Fw3Bdk7i9Cf+Mu/j8Zo= github.com/XiaoMi/pegasus-go-client v0.0.0-20201119112224-45f30cd560c7/go.mod h1:H83dStz3HvHBFC0n7QK+qLHAjqsIghFOFx2TCvq6vzI= +github.com/XiaoMi/pegasus-go-client v0.0.0-20201123074735-60f98674a317 h1:3tg5aADbDBZslnjU040ubg1aNvJ1uAmwbhNupa0zb64= +github.com/XiaoMi/pegasus-go-client v0.0.0-20201123074735-60f98674a317/go.mod h1:H83dStz3HvHBFC0n7QK+qLHAjqsIghFOFx2TCvq6vzI= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/helper/common_utils.go b/helper/common_utils.go new file mode 100644 index 0000000..3e5fc7b --- /dev/null +++ b/helper/common_utils.go @@ -0,0 +1,74 @@ +package helper + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/XiaoMi/pegasus-go-client/idl/base" +) + +type ResolveType int32 + +const ( + Host2Addr ResolveType = 0 + Addr2Host ResolveType = 1 +) + +// node's format, for example, pegasus.onebox.com:34801 or 127.0.0.1:34801 +func Resolve(node string, resolveType ResolveType) (string, error) { + splitResult := strings.Split(node, ":") + if len(splitResult) < 2 { + return node, fmt.Errorf("Invalid pegasus server node[%s]", node) + } + var ip = splitResult[0] + var port = splitResult[1] + + var nodes []string + switch resolveType { + case Host2Addr: + result, err := net.LookupHost(ip) + if err != nil { + return node, err + } + nodes = result + case Addr2Host: + result, err := net.LookupAddr(ip) + if err != nil { + return node, err + } + nodes = result + } + + if len(nodes) == 0 || len(nodes) > 1 { + return node, fmt.Errorf("Invalid pegasus server node(node resolve results = 0 or >1) [%s]", node) + } + ip = nodes[0] + + // Addr2Host result has suffix `.`, for example, `pegasus.onebox.com.` we need delete the suffix + if resolveType == Addr2Host { + ip = strings.TrimSuffix(nodes[0], ".") + } + return fmt.Sprintf("%s:%s", ip, port), nil +} + +func Str2Gpid(gpid string) (*base.Gpid, error) { + splitResult := strings.Split(gpid, ".") + if len(splitResult) < 2 { + return &base.Gpid{}, fmt.Errorf("Invalid gpid format [%s]", gpid) + } + + appId, err := strconv.ParseInt(splitResult[0], 10, 32) + + if err != nil { + return &base.Gpid{}, fmt.Errorf("Invalid gpid format [%s]", gpid) + } + + partitionId, err := strconv.ParseInt(splitResult[1], 10, 32) + if err != nil { + return &base.Gpid{}, fmt.Errorf("Invalid gpid format [%s]", gpid) + } + + return &base.Gpid{Appid: int32(appId), PartitionIndex: int32(partitionId)}, nil +} diff --git a/helper/perf_counter.go b/helper/perf_counter.go new file mode 100644 index 0000000..3f96d50 --- /dev/null +++ b/helper/perf_counter.go @@ -0,0 +1,44 @@ +package helper + +import ( + "fmt" + + "github.com/pegasus-kv/collector/aggregate" +) + +func GetReplicaCounterValue(perfClient *aggregate.PerfSession, counter string, gpid string) int64 { + counters, err := perfClient.GetPerfCounters(fmt.Sprintf("%s@%s", counter, gpid)) + if err != nil { + return -1 + } + + if len(counters) == 1 { + return int64(counters[0].Value) + } + return -1 +} + +func GetNodeCounterValue(perfClient *aggregate.PerfSession, counter string) int64 { + counters, err := perfClient.GetPerfCounters(counter) + if err != nil { + return -1 + } + + if len(counters) == 1 { + return int64(counters[0].Value) + } + return -1 +} + +func GetNodeAggregateCounterValue(perfClient *aggregate.PerfSession, counter string) int64 { + counters, err := perfClient.GetPerfCounters(counter) + if err != nil { + return -1 + } + + var value float64 + for _, counter := range counters { + value += counter.Value + } + return int64(value) +} diff --git a/tabular/perf.go b/tabular/perf.go index 8e37c13..e4adaa7 100644 --- a/tabular/perf.go +++ b/tabular/perf.go @@ -55,7 +55,7 @@ type statFormatter func(float64) string // according to the predefined template. func PrintTableStatsTabular(writer io.Writer, tables map[int32]*aggregate.TableStats) { var sections map[string]interface{} - yaml.Unmarshal([]byte(tableStatsTemplate), §ions) + _ = yaml.Unmarshal([]byte(tableStatsTemplate), §ions) for sect, columns := range sections { // print section @@ -65,7 +65,7 @@ func PrintTableStatsTabular(writer io.Writer, tables map[int32]*aggregate.TableS table.SetHeader([]string{sect}) table.Render() - header := append([]string{"AppID", "Name", "Partitions"}) + header := []string{"AppID", "Name", "Partitions"} var formatters []statFormatter for columnName, attrs := range columns.(map[interface{}]interface{}) { header = append(header, columnName.(string))