Skip to content

Commit

Permalink
Refactor command registration logic (#161)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Jul 4, 2023
1 parent 4f7744c commit 7fa12fe
Show file tree
Hide file tree
Showing 21 changed files with 980 additions and 545 deletions.
9 changes: 2 additions & 7 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,17 @@ func (s *embedEtcdMockState) SetupCommands() {
// force-release
getForceReleaseCmd(s.client, rootPath),

// disconnect
getDisconnectCmd(s, s.config),

// for testing
etcd.RepairCommand(s.client, rootPath),

getPrintMetricsCmd(s),

getListMetricsNodeCmd(s),

// exit
getExitCmd(s),
)
cmd.AddCommand(getGlobalUtilCommands()...)
cmd.AddCommand(etcd.RawCommands(s.client)...)

s.mergeFunctionCommands(cmd, s)

s.cmdState.rootCmd = cmd
s.setupFn = s.SetupCommands
}
Expand Down
71 changes: 71 additions & 0 deletions states/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,77 @@ import (
"google.golang.org/grpc"
)

type GetConfigurationParam struct {
ParamBase `use:"show configurations" desc:"iterate all online components and inspect configuration"`
Format string `name:"format" default:"line" desc:"output format"`
}

func (s *instanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error {
sessions, err := common.ListSessions(s.client, s.basePath)
if err != nil {
return err
}

results := make(map[string]map[string]string)

for _, session := range sessions {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithTimeout(2 * time.Second),
}

conn, err := grpc.DialContext(ctx, session.Address, opts...)
if err != nil {
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
continue
}
var client configurationSource
switch strings.ToLower(session.ServerName) {
case "rootcoord":
client = rootcoordpbv2.NewRootCoordClient(conn)
case "datacoord":
client = datapbv2.NewDataCoordClient(conn)
case "indexcoord":
client = indexpbv2.NewIndexCoordClient(conn)
case "querycoord":
client = querypbv2.NewQueryCoordClient(conn)
case "datanode":
client = datapbv2.NewDataNodeClient(conn)
case "querynode":
client = querypbv2.NewQueryNodeClient(conn)
case "indexnode":
client = indexpbv2.NewIndexNodeClient(conn)
}
if client == nil {
continue
}

configurations, err := getConfiguration(context.Background(), client, session.ServerID)
if err != nil {
continue
}

results[fmt.Sprintf("%s-%d", session.ServerName, session.ServerID)] = common.KVListMap(configurations)
}

switch strings.ToLower(p.Format) {
case "json":
bs, _ := json.MarshalIndent(results, "", "\t")
fmt.Println(string(bs))
case "line":
fallthrough
default:
for comp, configs := range results {
fmt.Println("Component", comp)
for key, value := range configs {
fmt.Printf("%s: %s\n", key, value)
}
}
}
return nil
}

// GetConfigurationCommand returns command to iterate all online components and fetch configurations.
func GetConfigurationCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Expand Down
30 changes: 30 additions & 0 deletions states/current_version.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package states

import (
"context"
"fmt"

"github.com/cockroachdb/errors"
"github.com/milvus-io/birdwatcher/models"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/spf13/cobra"
Expand All @@ -19,6 +21,34 @@ func CurrentVersionCommand() *cobra.Command {
return cmd
}

type setCurrentVersionParam struct {
ParamBase `use:"set current-version" desc:"set current version for etcd meta parsing"`
newVersion string
}

func (p *setCurrentVersionParam) ParseArgs(args []string) error {
if len(args) != 1 {
return errors.New("invalid parameter number")
}
p.newVersion = args[0]
return nil
}

func (s *instanceState) SetCurrentVersionCommand(ctx context.Context, param setCurrentVersionParam) {
switch param.newVersion {
case models.LTEVersion2_1:
fallthrough
case "LTEVersion2_1":
etcdversion.SetVersion(models.LTEVersion2_1)
case models.GTEVersion2_2:
fallthrough
case "GTEVersion2_2":
etcdversion.SetVersion(models.GTEVersion2_2)
default:
fmt.Println("Invalid version string:", param.newVersion)
}
}

// SetCurrentVersionCommand returns command for set current-version.
func SetCurrentVersionCommand() *cobra.Command {
cmd := &cobra.Command{
Expand Down
24 changes: 24 additions & 0 deletions states/download_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/milvus-io/birdwatcher/proto/v2.0/indexpb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)
Expand Down Expand Up @@ -66,6 +67,29 @@ func getDownloadSegmentCmd(cli clientv3.KV, basePath string) *cobra.Command {
return cmd
}

func getMinioWithInfo(addr string, ak, sk string, bucketName string) (*minio.Client, string, error) {
cred := credentials.NewStaticV4(ak, sk, "")
minioClient, err := minio.New(addr, &minio.Options{
Creds: cred,
Secure: false,
})
if err != nil {
return nil, "", err
}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
}

return minioClient, bucketName, nil
}

func getMinioAccess() (*minio.Client, string, error) {
p := promptui.Prompt{
Label: "BucketName",
Expand Down
Loading

0 comments on commit 7fa12fe

Please sign in to comment.