Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement backup v2 to fetch extra meta for milvus #72

Merged
merged 1 commit into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion models/backup_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package models

import "github.com/golang/protobuf/proto"

// BackupHeader stores etcd backup header information
// BackupHeader stores birdwatcher backup header information.
type BackupHeader struct {
// Version number for backup format
Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
Expand Down Expand Up @@ -30,3 +30,47 @@ func (v *BackupHeader) String() string {

// ProtoMessage implements protoiface.MessageV1.
// It has an empty body and only exists so the type provides the method.
func (v *BackupHeader) ProtoMessage() {}

// PartType is the enum type used for PartHeader.PartType.
type PartType int32

// Known part kinds. Each constant carries an explicitly assigned number.
// NOTE(review): these numbers presumably end up in serialized part headers,
// so existing values should not be renumbered — confirm against the writer.
const (
	// EtcdBackup marks a part that stores the Etcd KV backup.
	EtcdBackup = PartType(1)
	// MetricsBackup marks a part with metrics from /metrics.
	MetricsBackup = PartType(2)
	// MetricsDefaultBackup marks a part with metrics from /metrics_default.
	MetricsDefaultBackup = PartType(3)
	// Configurations marks a part with configuration fetched from milvus server.
	Configurations = PartType(4)
	// AppMetrics marks a part with metrics fetched via grpc metrics api.
	AppMetrics = PartType(5)
	// LoadedSegments marks a part with segment info fetched from
	// querynode (Milvus2.2+).
	LoadedSegments = PartType(6)
)

// PartHeader stores backup part information.
// Added since backup version 2.
//
// The json tags mirror the protobuf field names so that encoding/json output
// labels each field with its own name.
type PartHeader struct {
	// PartType represents the type of the next part.
	PartType int32 `protobuf:"varint,1,opt,name=part_type,proto3" json:"part_type,omitempty"`
	// PartLen stands for part length in bytes.
	// -1 for not sure.
	// Used for fast skipping one part.
	PartLen int64 `protobuf:"varint,2,opt,name=part_len,proto3" json:"part_len,omitempty"`
	// Extra is used for extra info storage. It is excluded from JSON output.
	Extra []byte `protobuf:"bytes,3,opt,name=extra,proto3" json:"-"`
}

// Reset implements protoiface.MessageV1.
// It overwrites the receiver with a zero-value PartHeader.
func (v *PartHeader) Reset() {
	var zero PartHeader
	*v = zero
}

// String implements protoiface.MessageV1.
// It returns whatever proto.CompactTextString produces for the receiver.
func (v *PartHeader) String() string {
	text := proto.CompactTextString(v)
	return text
}

// ProtoMessage implements protoiface.MessageV1.
// It has an empty body and only exists so the type provides the method.
func (v *PartHeader) ProtoMessage() {}
107 changes: 99 additions & 8 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package states

import (
"bufio"
"compress/gzip"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path"
"strings"

"github.com/golang/protobuf/proto"
"github.com/milvus-io/birdwatcher/models"
"github.com/mitchellh/go-homedir"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
Expand All @@ -25,7 +31,11 @@ type embedEtcdMockState struct {
// Close implements State.
// It closes the client when one is set, then — when a server is set — closes
// the server and removes the directory named by the server's Config().Dir.
// Results of the close/remove calls are not inspected.
func (s *embedEtcdMockState) Close() {
	if cli := s.client; cli != nil {
		cli.Close()
	}

	srv := s.server
	if srv == nil {
		return
	}
	srv.Close()
	os.RemoveAll(srv.Config().Dir)
}
Expand All @@ -46,9 +56,17 @@ func (s *embedEtcdMockState) SetupCommands() {
cleanEmptySegments(s.client, path.Join(s.instanceName, metaPath)),
// remove-segment-by-id
removeSegmentByID(s.client, path.Join(s.instanceName, metaPath)),
// force-release
getForceReleaseCmd(s.client, path.Join(s.instanceName, metaPath)),

// disconnect
getDisconnectCmd(s),

// repair-segment
getRepairSegmentCmd(s.client, path.Join(s.instanceName, metaPath)),
// repair-channel
getRepairChannelCmd(s.client, path.Join(s.instanceName, metaPath)),

// raw get
getEtcdRawCmd(s.client),

Expand All @@ -61,6 +79,10 @@ func (s *embedEtcdMockState) SetupCommands() {
s.setupFn = s.SetupCommands
}

// SetInstance sets the command state label to "Backup(<instanceName>)".
func (s *embedEtcdMockState) SetInstance(instanceName string) {
	label := fmt.Sprintf("Backup(%s)", instanceName)
	s.cmdState.label = label
}

func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName string) State {

state := &embedEtcdMockState{
Expand All @@ -77,6 +99,19 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName
return state
}

// getEmbedEtcdInstanceV2 builds an embedEtcdMockState around the given embed
// etcd server, using a client obtained from v3client.New(server.Server).
// The cmdState is left empty (no label is set here); SetupCommands is invoked
// before the state is returned.
func getEmbedEtcdInstanceV2(server *embed.Etcd) *embedEtcdMockState {
	mock := &embedEtcdMockState{
		cmdState: cmdState{},
		server:   server,
		client:   v3client.New(server.Server),
	}
	mock.SetupCommands()

	return mock
}

func getLoadBackupCmd(state State) *cobra.Command {
cmd := &cobra.Command{
Use: "load-backup [file]",
Expand Down Expand Up @@ -106,29 +141,85 @@ func getLoadBackupCmd(state State) *cobra.Command {
return
}

f, err := os.Open(arg)
if err != nil {
fmt.Printf("failed to open backup file %s, err: %s\n", arg, err.Error())
return
}
r, err := gzip.NewReader(f)
if err != nil {
fmt.Println("failed to open gzip reader, err:", err.Error())
return
}
defer r.Close()

rd := bufio.NewReader(r)
var header models.BackupHeader
err = readFixLengthHeader(rd, &header)
if err != nil {
fmt.Println("failed to load backup header", err.Error())
return
}

server, err := startEmbedEtcdServer()
if err != nil {
fmt.Println("failed to start embed etcd server:", err.Error())
return
}
fmt.Println("using data dir:", server.Config().Dir)

var rootPath string
client := v3client.New(server.Server)
rootPath, _, err = restoreEtcd(client, arg)
if err != nil {
fmt.Printf("failed to restore file: %s, error: %s", arg, err.Error())
server.Close()
nextState := getEmbedEtcdInstanceV2(server)
switch header.Version {
case 1:
err = restoreFromV1File(nextState.client, rd, &header)
if err != nil {
fmt.Println("failed to restore v1 backup file", err.Error())
nextState.Close()
return
}
nextState.SetInstance(header.Instance)
case 2:
err = restoreV2File(rd, nextState)
if err != nil {
fmt.Println("failed to restore v2 backup file", err.Error())
nextState.Close()
return
}
default:
fmt.Printf("backup version %d not supported\n", header.Version)
nextState.Close()
return
}

state.SetNext(getEmbedEtcdInstance(server, client, rootPath))
state.SetNext(nextState)
},
}

return cmd
}

// readFixLengthHeader reads one length-prefixed protobuf message from rd and
// unmarshals it into header.
//
// The expected layout is an 8-byte little-endian unsigned length followed by
// exactly that many bytes of protobuf-encoded payload.
//
// It returns an error when the length prefix cannot be read in full, when the
// declared length is implausibly large, when the payload is truncated, or
// when the payload fails to unmarshal.
func readFixLengthHeader[T proto.Message](rd *bufio.Reader, header T) error {
	// maxHeaderLen bounds the declared payload size. A protobuf message cannot
	// exceed 2 GiB, and the bound also keeps the value inside int range so a
	// corrupt prefix yields an error instead of a makeslice panic.
	const maxHeaderLen = 1<<31 - 1

	// io.ReadFull is required here: a single Read may legally return fewer
	// than 8 bytes without reaching EOF (e.g. on a decompressing reader).
	var lb [8]byte
	if _, err := io.ReadFull(rd, lb[:]); err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return fmt.Errorf("File does not contains valid header")
		}
		return fmt.Errorf("failed to read header length, %w", err)
	}

	nextBytes := binary.LittleEndian.Uint64(lb[:])
	if nextBytes > maxHeaderLen {
		return fmt.Errorf("header length %d exceeds limit %d", nextBytes, uint64(maxHeaderLen))
	}

	// io.ReadFull reports an error whenever fewer than len(headerBs) bytes are
	// available, so no separate length comparison is needed afterwards.
	headerBs := make([]byte, nextBytes)
	if _, err := io.ReadFull(rd, headerBs); err != nil {
		return fmt.Errorf("failed to read header bytes, %w", err)
	}

	if err := proto.Unmarshal(headerBs, header); err != nil {
		return fmt.Errorf("failed to unmarshal header, err: %w", err)
	}
	return nil
}

// testFile check file path exists and has access
func testFile(file string) error {
fi, err := os.Stat(file)
Expand Down
Loading