From 1b0172555f4f9b916a32547774a41545da3a48f1 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Fri, 9 Dec 2022 14:43:16 +0800
Subject: [PATCH] Implement backup v2 to fetch extra meta for milvus (#72)

Signed-off-by: Congqi Xia
Signed-off-by: Congqi Xia
---
 models/backup_header.go       |  46 ++++-
 states/backup_mock_connect.go | 107 +++++++++-
 states/etcd_backup.go         | 357 ++++++++++++++++++++++++++++++----
 states/etcd_restore.go        | 247 ++++++++++++++++++-----
 states/force_release.go       |  17 +-
 states/instance.go            |   2 +
 states/metrics.go             |  70 +++++++
 7 files changed, 737 insertions(+), 109 deletions(-)
 create mode 100644 states/metrics.go

diff --git a/models/backup_header.go b/models/backup_header.go
index 34ccbc5..78ee070 100644
--- a/models/backup_header.go
+++ b/models/backup_header.go
@@ -2,7 +2,7 @@ package models
 
 import "github.com/golang/protobuf/proto"
 
-// BackupHeader stores etcd backup header information
+// BackupHeader stores birdwatcher backup header information.
 type BackupHeader struct {
 	// Version number for backup format
 	Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
@@ -30,3 +30,47 @@ func (v *BackupHeader) String() string {
 
 // String implements protoiface.MessageV1
 func (v *BackupHeader) ProtoMessage() {}
+
+// PartType is the enum type for PartHeader.PartType.
+type PartType int32
+
+const (
+	// EtcdBackup part stores the etcd KV backup.
+	EtcdBackup PartType = 1
+	// MetricsBackup part stores metrics fetched from the /metrics endpoint.
+	MetricsBackup PartType = 2
+	// MetricsDefaultBackup part stores metrics fetched from the /metrics_default endpoint.
+	MetricsDefaultBackup PartType = 3
+	// Configurations part stores configurations fetched from the milvus servers.
+	Configurations PartType = 4
+	// AppMetrics part stores metrics fetched via the grpc metrics API.
+	AppMetrics PartType = 5
+	// LoadedSegments part stores segment info fetched from querynodes (Milvus 2.2+).
+	LoadedSegments PartType = 6
+)
+
+// PartHeader stores backup part information.
+// Added since backup version 2.
+type PartHeader struct {
+	// PartType represents the type of the next part.
+	PartType int32 `protobuf:"varint,1,opt,name=part_type,proto3" json:"part_type,omitempty"`
+	// PartLen is the part length in bytes.
+	// -1 means the length is unknown.
+	// Used to fast-skip a whole part.
+	PartLen int64 `protobuf:"varint,2,opt,name=part_len,proto3" json:"part_len,omitempty"`
+	// Extra used for extra info storage.
+	Extra []byte `protobuf:"bytes,3,opt,name=extra,proto3" json:"-"`
+}
+
+// Reset implements protoiface.MessageV1
+func (v *PartHeader) Reset() {
+	*v = PartHeader{}
+}
+
+// String implements protoiface.MessageV1
+func (v *PartHeader) String() string {
+	return proto.CompactTextString(v)
+}
+
+// ProtoMessage implements protoiface.MessageV1
+func (v *PartHeader) ProtoMessage() {}
diff --git a/states/backup_mock_connect.go b/states/backup_mock_connect.go
index 38276ba..67ec121 100644
--- a/states/backup_mock_connect.go
+++ b/states/backup_mock_connect.go
@@ -1,13 +1,19 @@
 package states
 
 import (
+	"bufio"
+	"compress/gzip"
+	"encoding/binary"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net/url"
 	"os"
 	"path"
 	"strings"
 
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/birdwatcher/models"
 	"github.com/mitchellh/go-homedir"
 	"github.com/spf13/cobra"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -25,7 +31,11 @@ type embedEtcdMockState struct {
 // Close implements State.
 // Clean up embed etcd folder content.
func (s *embedEtcdMockState) Close() { + if s.client != nil { + s.client.Close() + } if s.server != nil { + s.server.Close() os.RemoveAll(s.server.Config().Dir) } } @@ -46,9 +56,17 @@ func (s *embedEtcdMockState) SetupCommands() { cleanEmptySegments(s.client, path.Join(s.instanceName, metaPath)), // remove-segment-by-id removeSegmentByID(s.client, path.Join(s.instanceName, metaPath)), + // force-release + getForceReleaseCmd(s.client, path.Join(s.instanceName, metaPath)), + // disconnect getDisconnectCmd(s), + // repair-segment + getRepairSegmentCmd(s.client, path.Join(s.instanceName, metaPath)), + // repair-channel + getRepairChannelCmd(s.client, path.Join(s.instanceName, metaPath)), + // raw get getEtcdRawCmd(s.client), @@ -61,6 +79,10 @@ func (s *embedEtcdMockState) SetupCommands() { s.setupFn = s.SetupCommands } +func (s *embedEtcdMockState) SetInstance(instanceName string) { + s.cmdState.label = fmt.Sprintf("Backup(%s)", instanceName) +} + func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName string) State { state := &embedEtcdMockState{ @@ -77,6 +99,19 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName return state } +func getEmbedEtcdInstanceV2(server *embed.Etcd) *embedEtcdMockState { + + client := v3client.New(server.Server) + state := &embedEtcdMockState{ + cmdState: cmdState{}, + server: server, + client: client, + } + + state.SetupCommands() + return state +} + func getLoadBackupCmd(state State) *cobra.Command { cmd := &cobra.Command{ Use: "load-backup [file]", @@ -106,29 +141,85 @@ func getLoadBackupCmd(state State) *cobra.Command { return } + f, err := os.Open(arg) + if err != nil { + fmt.Printf("failed to open backup file %s, err: %s\n", arg, err.Error()) + return + } + r, err := gzip.NewReader(f) + if err != nil { + fmt.Println("failed to open gzip reader, err:", err.Error()) + return + } + defer r.Close() + + rd := bufio.NewReader(r) + var header models.BackupHeader + err = readFixLengthHeader(rd, &header) + if err != nil { + fmt.Println("failed to load backup header", err.Error()) + return + } + server, err := startEmbedEtcdServer() if err != nil { fmt.Println("failed to start embed etcd server:", err.Error()) return } fmt.Println("using data dir:", server.Config().Dir) - - var rootPath string - client := v3client.New(server.Server) - rootPath, _, err = restoreEtcd(client, arg) - if err != nil { - fmt.Printf("failed to restore file: %s, error: %s", arg, err.Error()) - server.Close() + nextState := getEmbedEtcdInstanceV2(server) + switch header.Version { + case 1: + err = restoreFromV1File(nextState.client, rd, &header) + if err != nil { + fmt.Println("failed to restore v1 backup file", err.Error()) + nextState.Close() + return + } + nextState.SetInstance(header.Instance) + case 2: + err = restoreV2File(rd, nextState) + if err != nil { + fmt.Println("failed to restore v2 backup file", err.Error()) + nextState.Close() + return + } + default: + fmt.Printf("backup version %d not supported\n", header.Version) + nextState.Close() return } - state.SetNext(getEmbedEtcdInstance(server, client, rootPath)) + state.SetNext(nextState) }, } return cmd } +func readFixLengthHeader[T proto.Message](rd *bufio.Reader, header T) error { + lb := make([]byte, 8) + lenRead, err := rd.Read(lb) + if err == io.EOF || lenRead < 8 { + return fmt.Errorf("File does not contains valid header") + } + + nextBytes := binary.LittleEndian.Uint64(lb) + headerBs := make([]byte, nextBytes) + lenRead, err = io.ReadFull(rd, headerBs) + if err != nil { + 
+		return fmt.Errorf("failed to read header bytes, %w", err)
+	}
+	if lenRead != int(nextBytes) {
+		return fmt.Errorf("not enough bytes for header")
+	}
+	err = proto.Unmarshal(headerBs, header)
+	if err != nil {
+		return fmt.Errorf("failed to unmarshal header, err: %w", err)
+	}
+	return nil
+}
+
 // testFile check file path exists and has access
 func testFile(file string) error {
 	fi, err := os.Stat(file)
diff --git a/states/etcd_backup.go b/states/etcd_backup.go
index d7ee1d9..22b18b0 100644
--- a/states/etcd_backup.go
+++ b/states/etcd_backup.go
@@ -5,8 +5,10 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/binary"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	"path"
 	"strings"
@@ -16,8 +18,18 @@ import (
 	"github.com/gosuri/uilive"
 	"github.com/milvus-io/birdwatcher/models"
 	"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
+	"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
+	"github.com/milvus-io/birdwatcher/proto/v2.0/indexpb"
+	"github.com/milvus-io/birdwatcher/proto/v2.0/querypb"
+	"github.com/milvus-io/birdwatcher/proto/v2.0/rootcoordpb"
+	datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
+	indexpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/indexpb"
+	internalpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/internalpb"
+	querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
+	rootcoordpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/rootcoordpb"
 	"github.com/spf13/cobra"
 	clientv3 "go.etcd.io/etcd/client/v3"
+	"google.golang.org/grpc"
 )
 
 type milvusComponent string
@@ -62,11 +74,12 @@ func getBackupEtcdCmd(cli *clientv3.Client, basePath string) *cobra.Command {
 	cmd := &cobra.Command{
 		Use:   "backup",
 		Short: "backup etcd key-values",
-		RunE: func(cmd *cobra.Command, args []string) error {
+		Run: func(cmd *cobra.Command, args []string) {
 
 			ignoreRevision, err := cmd.Flags().GetBool("ignoreRevision")
 			if err != nil {
-				return err
+				fmt.Println(err.Error())
+				return
 			}
 
 			prefix := ""
@@ -76,26 +89,66 @@ func getBackupEtcdCmd(cli *clientv3.Client, basePath string) *cobra.Command {
 			case compQueryCoord:
 				prefix = `queryCoord-`
 			default:
-				return fmt.Errorf("component %s not support yet", component)
+				fmt.Printf("component %s not supported for separate backup, use ALL instead\n", component)
+				return
 			}
 
-			now := time.Now()
-			err = backupEtcd(cli, basePath, prefix, component.String(), fmt.Sprintf("bw_etcd_%s.%s.bak.gz", component, now.Format("060102-150405")), ignoreRevision)
+			f, err := getBackupFile(component.String())
+			if err != nil {
+				fmt.Println("failed to open backup file:", err.Error())
+				return
+			}
+			defer f.Close()
+
+			gw := gzip.NewWriter(f)
+			defer gw.Close()
+			w := bufio.NewWriter(gw)
+
+			// write backup header
+			// version 2 used for now
+			err = writeBackupHeader(w, 2)
+			if err != nil {
+				fmt.Println("failed to write backup header:", err.Error())
+				return
+			}
+
+			err = backupEtcdV2(cli, basePath, prefix, w, ignoreRevision)
 			if err != nil {
 				fmt.Printf("backup etcd failed, error: %v\n", err)
 			}
-			return nil
+			backupMetrics(cli, basePath, w)
+			backupConfiguration(cli, basePath, w)
+			backupAppMetrics(cli, basePath, w)
+			fmt.Printf("backup for prefix done, stored in file: %s\n", f.Name())
 		},
 	}
 
 	cmd.Flags().Var(&component, "component", "component to backup")
 	cmd.Flags().Bool("ignoreRevision", false, "backup ignore revision change, ONLY shall works with no nodes online")
 	return cmd
 }
 
-// backupEtcd backup all key-values with prefix provided into local file.
-// implements gzip compression for now.
-func backupEtcd(cli *clientv3.Client, base, prefix string, component string, filePath string, ignoreRevision bool) error {
+func getBackupFile(component string) (*os.File, error) {
+	now := time.Now()
+	filePath := fmt.Sprintf("bw_etcd_%s.%s.bak.gz", component, now.Format("060102-150405"))
+	f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+	if err != nil {
+		return nil, err
+	}
+	return f, nil
+}
+
+func writeBackupHeader(w io.Writer, version int32) error {
+	lb := make([]byte, 8)
+	header := &models.BackupHeader{Version: version}
+	bs, err := proto.Marshal(header)
+	if err != nil {
+		fmt.Println("failed to marshal backup header,", err.Error())
+		return err
+	}
+	binary.LittleEndian.PutUint64(lb, uint64(len(bs)))
+	w.Write(lb)
+	w.Write(bs)
+	return nil
+}
+
+func backupEtcdV2(cli *clientv3.Client, base, prefix string, w *bufio.Writer, ignoreRevision bool) error {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
 	defer cancel()
 	resp, err := cli.Get(ctx, path.Join(base, prefix), clientv3.WithCountOnly(), clientv3.WithPrefix())
@@ -107,41 +160,36 @@ func backupEtcd(cli *clientv3.Client, base, prefix string, component string, fil
 		fmt.Println("WARNING!!! doing backup ignore revision! please make sure no instanc of milvus is online!")
 	}
 
+	// meta stored in extra
+	meta := make(map[string]string)
+
 	cnt := resp.Count
 	rev := resp.Header.Revision
-
-	fmt.Printf("found %d keys, at revision %d, starting backup...\n", cnt, rev)
-
-	f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-
-	gw := gzip.NewWriter(f)
-	defer gw.Close()
-	w := bufio.NewWriter(gw)
-
-	var instance, meta string
+	meta["cnt"] = fmt.Sprintf("%d", cnt)
+	meta["rev"] = fmt.Sprintf("%d", rev)
+	var instance, metaPath string
 	parts := strings.Split(base, "/")
 	if len(parts) > 1 {
-		meta = parts[len(parts)-1]
+		metaPath = parts[len(parts)-1]
 		instance = path.Join(parts[:len(parts)-1]...)
 	} else {
 		instance = base
 	}
-
-	lb := make([]byte, 8)
-	header := &models.BackupHeader{Version: 1, Instance: instance, MetaPath: meta, Entries: cnt}
-	bs, err := proto.Marshal(header)
+	meta["instance"] = instance
+	meta["metaPath"] = metaPath
+
+	bs, _ := json.Marshal(meta)
+	ph := models.PartHeader{
+		PartType: int32(models.EtcdBackup),
+		PartLen:  -1, // not sure for length
+		Extra:    bs,
+	}
+	bs, err = proto.Marshal(&ph)
 	if err != nil {
-		fmt.Println("failed to marshal backup header,", err.Error())
+		fmt.Println("failed to marshal part header for etcd backup", err.Error())
 		return err
 	}
-	binary.LittleEndian.PutUint64(lb, uint64(len(bs)))
-	fmt.Println("header length:", len(bs))
-	w.Write(lb)
-	w.Write(bs)
+	writeBackupBytes(w, bs)
 
 	progressDisplay := uilive.New()
 	progressFmt := "Backing up ... 
%d%%(%d/%d)\n" @@ -170,9 +218,8 @@ func backupEtcd(cli *clientv3.Client, base, prefix string, component string, fil return err } - binary.LittleEndian.PutUint64(lb, uint64(len(bs))) - w.Write(lb) - w.Write(bs) + writeBackupBytes(w, bs) + currentKey = string(append(kvs.Key, 0)) } @@ -182,7 +229,241 @@ func backupEtcd(cli *clientv3.Client, base, prefix string, component string, fil w.Flush() progressDisplay.Stop() - fmt.Printf("backup etcd for prefix %s done, stored in file: %s\n", prefix, filePath) + // write stopper + writeBackupBytes(w, nil) + + w.Flush() + + fmt.Printf("backup etcd for prefix %s done\n", prefix) + return nil +} + +func backupMetrics(cli *clientv3.Client, basePath string, w *bufio.Writer) error { + sessions, err := listSessions(cli, basePath) + if err != nil { + return err + } + + ph := models.PartHeader{ + PartType: int32(models.MetricsBackup), + PartLen: -1, // not sure for length + } + // write stopper + bs, err := proto.Marshal(&ph) + if err != nil { + fmt.Println("failed to marshal part header for etcd backup", err.Error()) + return err + } + writeBackupBytes(w, bs) + defer writeBackupBytes(w, nil) + + for _, session := range sessions { + mbs, dmbs, err := fetchInstanceMetrics(session) + if err != nil { + fmt.Printf("failed to fetch metrics for %s(%d), %s\n", session.ServerName, session.ServerID, err.Error()) + continue + } + + bs, err := json.Marshal(session) + if err != nil { + continue + } + // [session info] + // [metrics] + // [default metrics] + writeBackupBytes(w, bs) + writeBackupBytes(w, mbs) + writeBackupBytes(w, dmbs) + } + + return nil +} + +func backupAppMetrics(cli *clientv3.Client, basePath string, w *bufio.Writer) error { + sessions, err := listSessions(cli, basePath) + if err != nil { + return err + } + + ph := models.PartHeader{ + PartType: int32(models.AppMetrics), + PartLen: -1, // not sure for length + } + // write stopper + bs, err := proto.Marshal(&ph) + if err != nil { + fmt.Println("failed to marshal part header for etcd backup", err.Error()) + return err + } + writeBackupBytes(w, bs) + defer writeBackupBytes(w, nil) + + for _, session := range sessions { + opts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithTimeout(2 * time.Second), + } + + conn, err := grpc.DialContext(context.Background(), session.Address, opts...) 
+ if err != nil { + fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) + continue + } + + var client metricsSource + switch strings.ToLower(session.ServerName) { + case "rootcoord": + client = rootcoordpb.NewRootCoordClient(conn) + case "datacoord": + client = datapb.NewDataCoordClient(conn) + case "indexcoord": + client = indexpb.NewIndexCoordClient(conn) + case "querycoord": + client = querypb.NewQueryCoordClient(conn) + case "datanode": + client = datapb.NewDataNodeClient(conn) + case "querynode": + client = querypb.NewQueryNodeClient(conn) + case "indexnode": + client = indexpb.NewIndexNodeClient(conn) + } + if client == nil { + continue + } + data, err := getMetrics(context.Background(), client) + if err != nil { + continue + } + + labelBs, err := json.Marshal(session) + if err != nil { + continue + } + + writeBackupBytes(w, labelBs) + writeBackupBytes(w, []byte(data)) + } + + return nil + +} + +func backupConfiguration(cli *clientv3.Client, basePath string, w *bufio.Writer) error { + sessions, err := listSessions(cli, basePath) + if err != nil { + return err + } + + ph := models.PartHeader{ + PartType: int32(models.Configurations), + PartLen: -1, // not sure for length + } + // write stopper + bs, err := proto.Marshal(&ph) + if err != nil { + fmt.Println("failed to marshal part header for etcd backup", err.Error()) + return err + } + writeBackupBytes(w, bs) + defer writeBackupBytes(w, nil) + + for _, session := range sessions { + opts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithTimeout(2 * time.Second), + } + + conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + if err != nil { + fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) + continue + } + + var client configurationSource + switch strings.ToLower(session.ServerName) { + case "rootcoord": + client = rootcoordpbv2.NewRootCoordClient(conn) + case "datacoord": + client = datapbv2.NewDataCoordClient(conn) + case "indexcoord": + client = indexpbv2.NewIndexCoordClient(conn) + case "querycoord": + client = querypbv2.NewQueryCoordClient(conn) + case "datanode": + client = datapbv2.NewDataNodeClient(conn) + case "querynode": + client = querypbv2.NewQueryNodeClient(conn) + case "indexnode": + client = indexpbv2.NewIndexNodeClient(conn) + } + if client == nil { + continue + } + configurations, err := getConfiguration(context.Background(), client, session.ServerID) + if err != nil { + continue + } + + labelBs, err := json.Marshal(session) + if err != nil { + continue + } + + // wrap with response + model := &internalpbv2.ShowConfigurationsResponse{ + Configuations: configurations, + } + bs, err := proto.Marshal(model) + if err != nil { + continue + } + + writeBackupBytes(w, labelBs) + writeBackupBytes(w, bs) + } return nil } + +func writeBackupBytes(w *bufio.Writer, data []byte) { + lb := make([]byte, 8) + binary.LittleEndian.PutUint64(lb, uint64(len(data))) + w.Write(lb) + if len(data) > 0 { + w.Write(data) + } +} + +func readBackupBytes(rd io.Reader) ([]byte, uint64, error) { + lb := make([]byte, 8) + var nextBytes uint64 + bsRead, err := io.ReadFull(rd, lb) //rd.Read(lb) + // all file read + if err == io.EOF { + return nil, nextBytes, err + } + if err != nil { + fmt.Println("failed to read file:", err.Error()) + return nil, nextBytes, err + } + if bsRead < 8 { + fmt.Printf("fail to read next length %d instead of 8 read\n", bsRead) + return nil, nextBytes, errors.New("invalid file 
format") + } + + nextBytes = binary.LittleEndian.Uint64(lb) + + data := make([]byte, nextBytes) + // cannot use rd.Read(bs), since proto marshal may generate a stopper + bsRead, err = io.ReadFull(rd, data) + if err != nil { + return nil, nextBytes, err + } + if uint64(bsRead) != nextBytes { + fmt.Printf("bytesRead(%d)is not equal to nextBytes(%d)\n", bsRead, nextBytes) + return nil, nextBytes, err + } + return data, nextBytes, nil +} diff --git a/states/etcd_restore.go b/states/etcd_restore.go index 277ab60..dc0ee4e 100644 --- a/states/etcd_restore.go +++ b/states/etcd_restore.go @@ -2,13 +2,13 @@ package states import ( "bufio" - "compress/gzip" "context" "encoding/binary" + "encoding/json" "errors" "fmt" "io" - "os" + "strconv" "time" "github.com/golang/protobuf/proto" @@ -18,61 +18,112 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -// restoreEtcd write back backup file content to etcd -// need backup first before calling this function -// basePath is needed for key indication -func restoreEtcd(cli *clientv3.Client, filePath string) (string, string, error) { - f, err := os.Open(filePath) - if err != nil { - return "", "", err - } - defer f.Close() - - r, err := gzip.NewReader(f) - if err != nil { - return "", "", err - } - defer r.Close() +func restoreFromV1File(cli *clientv3.Client, rd io.Reader, header *models.BackupHeader) error { + var nextBytes uint64 + var bs []byte lb := make([]byte, 8) - rd := bufio.NewReader(r) - lenRead, err := rd.Read(lb) - if err == io.EOF || lenRead < 8 { - fmt.Println("File does not contains valid header") - } + i := 0 + progressDisplay := uilive.New() + progressFmt := "Restoring backup ... %d%%(%d/%d)\n" - nextBytes := binary.LittleEndian.Uint64(lb) - headerBs := make([]byte, nextBytes) + progressDisplay.Start() + fmt.Fprintf(progressDisplay, progressFmt, 0, 0, header.Entries) + defer progressDisplay.Stop() + + for { + + bsRead, err := io.ReadFull(rd, lb) //rd.Read(lb) + // all file read + if err == io.EOF { + return nil + } + if err != nil { + fmt.Println("failed to read file:", err.Error()) + return err + } + if bsRead < 8 { + fmt.Printf("fail to read next length %d instead of 8 read\n", bsRead) + return errors.New("invalid file format") + } + + nextBytes = binary.LittleEndian.Uint64(lb) + bs = make([]byte, nextBytes) + + // cannot use rd.Read(bs), since proto marshal may generate a stopper + bsRead, err = io.ReadFull(rd, bs) + if err != nil { + fmt.Println("failed to read next kv data", err.Error()) + return err + } + if uint64(bsRead) != nextBytes { + fmt.Printf("bytesRead(%d)is not equal to nextBytes(%d)\n", bsRead, nextBytes) + return errors.New("bad file format") + } + + entry := &commonpb.KeyDataPair{} + err = proto.Unmarshal(bs, entry) + if err != nil { + //Skip for now + fmt.Printf("fail to parse line: %s, skip for now\n", err.Error()) + continue + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + _, err = cli.Put(ctx, entry.Key, string(entry.Data)) + if err != nil { + fmt.Println("failed save kv into etcd, ", err.Error()) + return err + } + i++ + progress := i * 100 / int(header.Entries) + + fmt.Fprintf(progressDisplay, progressFmt, progress, i, header.Entries) - lenRead, err = rd.Read(headerBs) - if err != nil { - fmt.Println("failed to read header", err.Error()) - return "", "", nil } +} - if lenRead != int(nextBytes) { - fmt.Println("not enough bytes for header") - return "", "", nil +func restoreV2File(rd *bufio.Reader, state *embedEtcdMockState) error { + var err error + for { + var ph 
models.PartHeader + err = readFixLengthHeader(rd, &ph) + if err != nil { + //TODO check EOF + return nil + } + + switch ph.PartType { + case int32(models.EtcdBackup): + instance, err := restoreEtcdFromBackV2(state.client, rd, ph) + if err != nil { + fmt.Println("failed to restore etcd from backup file", err.Error()) + return err + } + state.SetInstance(instance) + case int32(models.MetricsBackup): + testRestoreMetrics(rd, ph) + case int32(models.Configurations): + testRestoreConfigurations(rd, ph) + case int32(models.AppMetrics): + testRestoreConfigurations(rd, ph) + } } +} - header := &models.BackupHeader{} - err = proto.Unmarshal(headerBs, header) +func restoreEtcdFromBackV2(cli *clientv3.Client, rd io.Reader, ph models.PartHeader) (string, error) { + meta := make(map[string]string) + err := json.Unmarshal(ph.Extra, &meta) if err != nil { - fmt.Println("cannot parse backup header", err.Error()) - return "", "", err + return "", err } - fmt.Printf("header: %#v\n", header) - - switch header.Version { - case 1: - return header.Instance, header.MetaPath, restoreFromV1File(cli, rd, header) - default: - fmt.Printf("backup version %d not supported\n", header.Version) - return "", "", fmt.Errorf("backup version %d not supported", header.Version) + + cnt, err := strconv.ParseInt(meta["cnt"], 10, 64) + if err != nil { + return "", err } -} -func restoreFromV1File(cli *clientv3.Client, rd io.Reader, header *models.BackupHeader) error { var nextBytes uint64 var bs []byte @@ -82,7 +133,7 @@ func restoreFromV1File(cli *clientv3.Client, rd io.Reader, header *models.Backup progressFmt := "Restoring backup ... %d%%(%d/%d)\n" progressDisplay.Start() - fmt.Fprintf(progressDisplay, progressFmt, 0, 0, header.Entries) + fmt.Fprintf(progressDisplay, progressFmt, 0, 0, cnt) defer progressDisplay.Stop() for { @@ -90,29 +141,33 @@ func restoreFromV1File(cli *clientv3.Client, rd io.Reader, header *models.Backup bsRead, err := io.ReadFull(rd, lb) //rd.Read(lb) // all file read if err == io.EOF { - return nil + return meta["instance"], nil } if err != nil { fmt.Println("failed to read file:", err.Error()) - return err + return "", err } if bsRead < 8 { fmt.Printf("fail to read next length %d instead of 8 read\n", bsRead) - return errors.New("invalid file format") + return "", errors.New("invalid file format") } nextBytes = binary.LittleEndian.Uint64(lb) + // stopper found + if nextBytes == 0 { + return meta["instance"], nil + } bs = make([]byte, nextBytes) // cannot use rd.Read(bs), since proto marshal may generate a stopper bsRead, err = io.ReadFull(rd, bs) if err != nil { fmt.Println("failed to read next kv data", err.Error()) - return err + return "", err } if uint64(bsRead) != nextBytes { fmt.Printf("bytesRead(%d)is not equal to nextBytes(%d)\n", bsRead, nextBytes) - return errors.New("bad file format") + return "", errors.New("bad file format") } entry := &commonpb.KeyDataPair{} @@ -128,12 +183,96 @@ func restoreFromV1File(cli *clientv3.Client, rd io.Reader, header *models.Backup _, err = cli.Put(ctx, entry.Key, string(entry.Data)) if err != nil { fmt.Println("failed save kv into etcd, ", err.Error()) - return err + return "", err } i++ - progress := i * 100 / int(header.Entries) + progress := i * 100 / int(cnt) - fmt.Fprintf(progressDisplay, progressFmt, progress, i, header.Entries) + fmt.Fprintf(progressDisplay, progressFmt, progress, i, cnt) + } + +} + +func testRestoreMetrics(rd io.Reader, ph models.PartHeader) error { + for { + bs, nb, err := readBackupBytes(rd) + if err != nil { + if err == io.EOF { + 
return nil + } + return err + } + // stopper + if nb == 0 { + return nil + } + + // session + fmt.Println(string(bs)) + + bs, _, err = readBackupBytes(rd) + if err != nil { + return err + } + + fmt.Println("metrics len:", len(bs)) + bs, _, err = readBackupBytes(rd) + if err != nil { + return err + } + fmt.Println("default metrics len:", len(bs)) + } +} + +func testRestoreConfigurations(rd io.Reader, ph models.PartHeader) error { + for { + bs, nb, err := readBackupBytes(rd) + if err != nil { + if err == io.EOF { + return nil + } + return err + } + // stopper + if nb == 0 { + return nil + } + + // session + fmt.Println(string(bs)) + + bs, _, err = readBackupBytes(rd) + if err != nil { + return err + } + + fmt.Println("configuration len:", len(bs)) + + } +} + +func testRestoreAppMetrics(rd io.Reader, ph models.PartHeader) error { + for { + bs, nb, err := readBackupBytes(rd) + if err != nil { + if err == io.EOF { + return nil + } + return err + } + // stopper + if nb == 0 { + return nil + } + + // session + fmt.Println(string(bs)) + + bs, _, err = readBackupBytes(rd) + if err != nil { + return err + } + fmt.Println("app metrics len:", len(bs)) } } diff --git a/states/force_release.go b/states/force_release.go index e3cc4c7..ecac844 100644 --- a/states/force_release.go +++ b/states/force_release.go @@ -18,18 +18,19 @@ func getForceReleaseCmd(cli *clientv3.Client, basePath string) *cobra.Command { Use: "force-release", Short: "Force release the collections from QueryCoord", Run: func(cmd *cobra.Command, args []string) { - // basePath = 'by-dev/meta/' - // queryCoord prefix = 'queryCoord-' - now := time.Now() - err := backupEtcd(cli, basePath, "queryCoord-", string(compQueryCoord), fmt.Sprintf("bw_etcd_querycoord.%s.bak.gz", now.Format("060102-150405")), false) - if err != nil { - fmt.Printf("backup etcd failed, error: %v, stop doing force-release\n", err) - } + /* + // basePath = 'by-dev/meta/' + // queryCoord prefix = 'queryCoord-' + now := time.Now() + err := backupEtcd(cli, basePath, "queryCoord-", string(compQueryCoord), fmt.Sprintf("bw_etcd_querycoord.%s.bak.gz", now.Format("060102-150405")), false) + if err != nil { + fmt.Printf("backup etcd failed, error: %v, stop doing force-release\n", err) + }*/ // remove all keys start with [basePath]/queryCoord- ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - _, err = cli.Delete(ctx, path.Join(basePath, "queryCoord-"), clientv3.WithPrefix()) + _, err := cli.Delete(ctx, path.Join(basePath, "queryCoord-"), clientv3.WithPrefix()) if err != nil { fmt.Printf("failed to remove queryCoord etcd kv, err: %v\n", err) } diff --git a/states/instance.go b/states/instance.go index 47b44c2..a6a5206 100644 --- a/states/instance.go +++ b/states/instance.go @@ -58,6 +58,8 @@ func (s *instanceState) SetupCommands() { getRepairChannelCmd(cli, path.Join(instanceName, metaPath)), // reset-checkpoint getResetCheckpointCmd(cli, path.Join(instanceName, metaPath)), + // fetch-metrics + getFetchMetricsCmd(cli, path.Join(instanceName, metaPath)), // dry-mode getDryModeCmd(cli, s, s.etcdState), // disconnect diff --git a/states/metrics.go b/states/metrics.go new file mode 100644 index 0000000..d35da0c --- /dev/null +++ b/states/metrics.go @@ -0,0 +1,70 @@ +package states + +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/milvus-io/birdwatcher/models" + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func getFetchMetricsCmd(cli *clientv3.Client, basePath string) *cobra.Command { + cmd := 
&cobra.Command{
+		Use:   "fetch-metrics",
+		Short: "fetch metrics from milvus instances",
+		Run: func(cmd *cobra.Command, args []string) {
+			sessions, err := listSessions(cli, basePath)
+			if err != nil {
+				fmt.Println("failed to list sessions", err.Error())
+				return
+			}
+
+			for _, session := range sessions {
+				metrics, defaultMetrics, _ := fetchInstanceMetrics(session)
+				fmt.Println(session)
+				//TODO parse metrics
+				fmt.Println(metrics)
+				fmt.Println(defaultMetrics)
+			}
+		},
+	}
+
+	return cmd
+}
+
+func fetchInstanceMetrics(session *models.Session) ([]byte, []byte, error) {
+	addr := session.Address
+	if strings.Contains(session.Address, ":") {
+		addr = strings.Split(addr, ":")[0]
+	}
+
+	url := fmt.Sprintf("http://%s:%d/metrics", addr, 9091)
+	fmt.Println(url)
+
+	// #nosec
+	resp, err := http.Get(url)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer resp.Body.Close()
+
+	metricsBs, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	url = fmt.Sprintf("http://%s:%d/metrics_default", addr, 9091)
+	// #nosec
+	resp, err = http.Get(url)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer resp.Body.Close()
+
+	defaultMetricsBs, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, nil, err
+	}
+	return metricsBs, defaultMetricsBs, nil
+}
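
For reference, the v2 backup file written by the code above is a gzip stream of length-prefixed records: an 8-byte little-endian length followed by that many bytes, where a zero length acts as the stopper that terminates a part. The snippet below is a minimal, self-contained sketch of that framing only; the writeRecord/readRecord helpers are illustrative names rather than code from this patch, and the real implementation additionally proto-encodes BackupHeader, PartHeader and the per-part payloads.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
)

// writeRecord mirrors writeBackupBytes: 8-byte little-endian length, then payload.
// An empty payload writes a zero length, which readers treat as a stopper.
func writeRecord(w io.Writer, data []byte) error {
	lb := make([]byte, 8)
	binary.LittleEndian.PutUint64(lb, uint64(len(data)))
	if _, err := w.Write(lb); err != nil {
		return err
	}
	if len(data) > 0 {
		if _, err := w.Write(data); err != nil {
			return err
		}
	}
	return nil
}

// readRecord mirrors readBackupBytes: returns the payload and its declared length.
// A zero length means the current part is finished.
func readRecord(r io.Reader) ([]byte, uint64, error) {
	lb := make([]byte, 8)
	if _, err := io.ReadFull(r, lb); err != nil {
		return nil, 0, err
	}
	n := binary.LittleEndian.Uint64(lb)
	if n == 0 {
		return nil, 0, nil
	}
	data := make([]byte, n)
	if _, err := io.ReadFull(r, data); err != nil {
		return nil, n, err
	}
	return data, n, nil
}

func main() {
	// Write two records and a stopper into a gzip stream, like the real backup does.
	var buf bytes.Buffer
	gw := gzip.NewWriter(&buf)
	writeRecord(gw, []byte("record-1"))
	writeRecord(gw, []byte("record-2"))
	writeRecord(gw, nil) // stopper
	gw.Close()

	// Read records back until the stopper.
	gr, err := gzip.NewReader(&buf)
	if err != nil {
		panic(err)
	}
	for {
		data, n, err := readRecord(gr)
		if err != nil || n == 0 {
			break
		}
		fmt.Println(string(data))
	}
}

In the full format, the stream starts with a length-prefixed BackupHeader (Version 2), and every subsequent part is a length-prefixed PartHeader (EtcdBackup, MetricsBackup, Configurations, AppMetrics) followed by that part's records and a stopper.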
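
Building on the same framing, a reader can consume a whole v2 file by decoding the BackupHeader first and then iterating PartHeader records, draining each part's records until the stopper. The sketch below assumes a version-2 file produced by the backup command in this patch; the file name and the readProtoRecord helper are hypothetical, and only the models types added above are relied on.

package main

import (
	"bufio"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
	"os"

	"github.com/golang/protobuf/proto"
	"github.com/milvus-io/birdwatcher/models"
)

// readProtoRecord reads one length-prefixed record and unmarshals it into msg.
func readProtoRecord(rd io.Reader, msg proto.Message) error {
	lb := make([]byte, 8)
	if _, err := io.ReadFull(rd, lb); err != nil {
		return err
	}
	body := make([]byte, binary.LittleEndian.Uint64(lb))
	if _, err := io.ReadFull(rd, body); err != nil {
		return err
	}
	return proto.Unmarshal(body, msg)
}

func main() {
	// Hypothetical file name; any bw_etcd_*.bak.gz produced by the new backup command fits.
	f, err := os.Open("bw_etcd_ALL.221209-144316.bak.gz")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	gr, err := gzip.NewReader(f)
	if err != nil {
		panic(err)
	}
	rd := bufio.NewReader(gr)

	var header models.BackupHeader
	if err := readProtoRecord(rd, &header); err != nil {
		panic(err)
	}
	fmt.Println("backup version:", header.Version) // expect 2 for files written by this patch

	for {
		var ph models.PartHeader
		if err := readProtoRecord(rd, &ph); err != nil {
			break // io.EOF: no more parts
		}
		fmt.Printf("part type %d, extra: %s\n", ph.PartType, string(ph.Extra))

		// Drain this part: records until the zero-length stopper.
		lb := make([]byte, 8)
		for {
			if _, err := io.ReadFull(rd, lb); err != nil {
				return
			}
			n := binary.LittleEndian.Uint64(lb)
			if n == 0 {
				break
			}
			if _, err := io.CopyN(io.Discard, rd, int64(n)); err != nil {
				return
			}
		}
	}
}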