Skip to content

Commit

Permalink
Implement backup v2 to fetch extra meta for milvus (#72)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Dec 9, 2022
1 parent 2b1f3c1 commit 1b01725
Show file tree
Hide file tree
Showing 7 changed files with 737 additions and 109 deletions.
46 changes: 45 additions & 1 deletion models/backup_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package models

import "github.com/golang/protobuf/proto"

// BackupHeader stores etcd backup header information
// BackupHeader stores birdwatcher backup header information.
type BackupHeader struct {
// Version number for backup format
Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
Expand Down Expand Up @@ -30,3 +30,47 @@ func (v *BackupHeader) String() string {

// String implements protoiface.MessageV1
func (v *BackupHeader) ProtoMessage() {}

// PartType enum type for PartHeader.ParType.
type PartType int32

const (
// EtcdBackup part stores Etcd KV backup.
EtcdBackup PartType = 1
// MetricsBackup metrics from /metrics.
MetricsBackup PartType = 2
// MetricsDefaultBackup metrics from /metrics_default.
MetricsDefaultBackup PartType = 3
// Configurations configuration fetched from milvus server.
Configurations PartType = 4
// AppMetrics is metrics fetched via grpc metrics api.
AppMetrics PartType = 5
// LoadedSegments is segment info fetched from querynode(Milvus2.2+).
LoadedSegments PartType = 6
)

// PartHeader stores backup part information.
// Added since backup version 2.
type PartHeader struct {
// PartType represent next part type.
PartType int32 `protobuf:"varint,1,opt,name=part_type,proto3" json:"version,omitempty"`
// PartLen stands for part length in bytes.
// -1 for not sure.
// used for fast skipping one part.
PartLen int64 `protobuf:"varint,2,opt,name=part_len,proto3" json:"entries,omitempty"`
// Extra used for extra info storage.
Extra []byte `protobuf:"bytes,3,opt,name=extra,proto3" json:"-"`
}

// Reset implements protoiface.MessageV1
func (v *PartHeader) Reset() {
*v = PartHeader{}
}

// String implements protoiface.MessageV1
func (v *PartHeader) String() string {
return proto.CompactTextString(v)
}

// String implements protoiface.MessageV1
func (v *PartHeader) ProtoMessage() {}
107 changes: 99 additions & 8 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package states

import (
"bufio"
"compress/gzip"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path"
"strings"

"github.com/golang/protobuf/proto"
"github.com/milvus-io/birdwatcher/models"
"github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
Expand All @@ -25,7 +31,11 @@ type embedEtcdMockState struct {
// Close implements State.
// Clean up embed etcd folder content.
func (s *embedEtcdMockState) Close() {
if s.client != nil {
s.client.Close()
}
if s.server != nil {
s.server.Close()
os.RemoveAll(s.server.Config().Dir)
}
}
Expand All @@ -46,9 +56,17 @@ func (s *embedEtcdMockState) SetupCommands() {
cleanEmptySegments(s.client, path.Join(s.instanceName, metaPath)),
// remove-segment-by-id
removeSegmentByID(s.client, path.Join(s.instanceName, metaPath)),
// force-release
getForceReleaseCmd(s.client, path.Join(s.instanceName, metaPath)),

// disconnect
getDisconnectCmd(s),

// repair-segment
getRepairSegmentCmd(s.client, path.Join(s.instanceName, metaPath)),
// repair-channel
getRepairChannelCmd(s.client, path.Join(s.instanceName, metaPath)),

// raw get
getEtcdRawCmd(s.client),

Expand All @@ -61,6 +79,10 @@ func (s *embedEtcdMockState) SetupCommands() {
s.setupFn = s.SetupCommands
}

func (s *embedEtcdMockState) SetInstance(instanceName string) {
s.cmdState.label = fmt.Sprintf("Backup(%s)", instanceName)
}

func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName string) State {

state := &embedEtcdMockState{
Expand All @@ -77,6 +99,19 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName
return state
}

func getEmbedEtcdInstanceV2(server *embed.Etcd) *embedEtcdMockState {

client := v3client.New(server.Server)
state := &embedEtcdMockState{
cmdState: cmdState{},
server: server,
client: client,
}

state.SetupCommands()
return state
}

func getLoadBackupCmd(state State) *cobra.Command {
cmd := &cobra.Command{
Use: "load-backup [file]",
Expand Down Expand Up @@ -106,29 +141,85 @@ func getLoadBackupCmd(state State) *cobra.Command {
return
}

f, err := os.Open(arg)
if err != nil {
fmt.Printf("failed to open backup file %s, err: %s\n", arg, err.Error())
return
}
r, err := gzip.NewReader(f)
if err != nil {
fmt.Println("failed to open gzip reader, err:", err.Error())
return
}
defer r.Close()

rd := bufio.NewReader(r)
var header models.BackupHeader
err = readFixLengthHeader(rd, &header)
if err != nil {
fmt.Println("failed to load backup header", err.Error())
return
}

server, err := startEmbedEtcdServer()
if err != nil {
fmt.Println("failed to start embed etcd server:", err.Error())
return
}
fmt.Println("using data dir:", server.Config().Dir)

var rootPath string
client := v3client.New(server.Server)
rootPath, _, err = restoreEtcd(client, arg)
if err != nil {
fmt.Printf("failed to restore file: %s, error: %s", arg, err.Error())
server.Close()
nextState := getEmbedEtcdInstanceV2(server)
switch header.Version {
case 1:
err = restoreFromV1File(nextState.client, rd, &header)
if err != nil {
fmt.Println("failed to restore v1 backup file", err.Error())
nextState.Close()
return
}
nextState.SetInstance(header.Instance)
case 2:
err = restoreV2File(rd, nextState)
if err != nil {
fmt.Println("failed to restore v2 backup file", err.Error())
nextState.Close()
return
}
default:
fmt.Printf("backup version %d not supported\n", header.Version)
nextState.Close()
return
}

state.SetNext(getEmbedEtcdInstance(server, client, rootPath))
state.SetNext(nextState)
},
}

return cmd
}

func readFixLengthHeader[T proto.Message](rd *bufio.Reader, header T) error {
lb := make([]byte, 8)
lenRead, err := rd.Read(lb)
if err == io.EOF || lenRead < 8 {
return fmt.Errorf("File does not contains valid header")
}

nextBytes := binary.LittleEndian.Uint64(lb)
headerBs := make([]byte, nextBytes)
lenRead, err = io.ReadFull(rd, headerBs)
if err != nil {
return fmt.Errorf("failed to read header bytes, %w", err)
}
if lenRead != int(nextBytes) {
return fmt.Errorf("not enough bytes for header")
}
err = proto.Unmarshal(headerBs, header)
if err != nil {
return fmt.Errorf("failed to unmarshal header, err: %w", err)
}
return nil
}

// testFile check file path exists and has access
func testFile(file string) error {
fi, err := os.Stat(file)
Expand Down
Loading

0 comments on commit 1b01725

Please sign in to comment.