Skip to content

Commit

Permalink
v2 volume: support backup and restore
Browse files Browse the repository at this point in the history
Longhorn 6138

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit authored and David Ko committed Jan 3, 2024
1 parent 2ab63be commit 82530a7
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 119 deletions.
33 changes: 31 additions & 2 deletions pkg/client/proxy_backup.go
Expand Up @@ -182,8 +182,37 @@ func (c *ProxyClient) BackupRestore(dataEngine, engineName, volumeName, serviceA
return nil
}

func (c *ProxyClient) BackupRestoreStatus(dataEngine, engineName, volumeName,
serviceAddress string) (status map[string]*BackupRestoreStatus, err error) {
func (c *ProxyClient) BackupRestoreFinish(dataEngine, engineName, volumeName, serviceAddress string) error {
input := map[string]string{
"engineName": engineName,
"volumeName": volumeName,
"serviceAddress": serviceAddress,
}
if err := validateProxyMethodParameters(input); err != nil {
return errors.Wrap(err, "failed to finishing backup restoration")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to finishing backup restoration: invalid data engine %v", dataEngine)
}

req := &rpc.EngineBackupRestoreFinishRequest{
ProxyEngineRequest: &rpc.ProxyEngineRequest{
Address: serviceAddress,
EngineName: engineName,
// nolint:all replaced with DataEngine
BackendStoreDriver: rpc.BackendStoreDriver(driver),
DataEngine: rpc.DataEngine(driver),
// This is the name we will use for validation when communicating with the restoring engine.
VolumeName: volumeName,
},
}
_, err := c.service.BackupRestoreFinish(getContextWithGRPCTimeout(c.ctx), req)
return err
}

func (c *ProxyClient) BackupRestoreStatus(dataEngine, engineName, volumeName, serviceAddress string) (status map[string]*BackupRestoreStatus, err error) {
input := map[string]string{
"engineName": engineName,
"volumeName": volumeName,
Expand Down
1 change: 0 additions & 1 deletion pkg/disk/disk.go
Expand Up @@ -248,7 +248,6 @@ func spdkDiskToDisk(disk *spdkrpc.Disk) *rpc.Disk {
func replicaToReplicaInstance(r *api.Replica) *rpc.ReplicaInstance {
return &rpc.ReplicaInstance{
Name: r.Name,
Uuid: r.UUID,
DiskName: r.LvsName,
DiskUuid: r.LvsUUID,
SpecSize: r.SpecSize,
Expand Down
255 changes: 187 additions & 68 deletions pkg/proxy/backup.go
Expand Up @@ -20,6 +20,9 @@ import (
esync "github.com/longhorn/longhorn-engine/pkg/sync"
etypes "github.com/longhorn/longhorn-engine/pkg/types"
eptypes "github.com/longhorn/longhorn-engine/proto/ptypes"
spdkclient "github.com/longhorn/longhorn-spdk-engine/pkg/client"

"github.com/longhorn/longhorn-instance-manager/pkg/util"

rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc"
)
Expand All @@ -40,39 +43,28 @@ func (p *Proxy) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBacku
})
log.Infof("Backing up snapshot %v to backup %v", req.SnapshotName, req.BackupName)

op, ok := p.ops[req.ProxyEngineRequest.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine)
}
return op.SnapshotBackup(ctx, req)
}

func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
for _, env := range req.Envs {
part := strings.SplitN(env, "=", 2)
if len(part) < 2 {
continue
}

if err := os.Setenv(part[0], part[1]); err != nil {
return nil, err
}
if err := setEnv(req.Envs); err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to set envs").Error())
}

credential, err := butil.GetBackupCredential(req.BackupTarget)
if err != nil {
return nil, err
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, errors.Wrapf(err, "failed to get backup credential").Error())
}

labels := []string{}
for k, v := range req.Labels {
labels = append(labels, fmt.Sprintf("%s=%s", k, v))
labels := getLabels(req.Labels)

op, ok := p.ops[req.ProxyEngineRequest.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine)
}
return op.SnapshotBackup(ctx, req, credential, labels)
}

task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName,
req.ProxyEngineRequest.EngineName)
func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest, credential map[string]string, labels []string) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName)
if err != nil {
return nil, err
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create task").Error())
}

recv, err := task.CreateBackup(
Expand All @@ -88,7 +80,7 @@ func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.Eng
credential,
)
if err != nil {
return nil, err
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create backup").Error())
}

return &rpc.EngineSnapshotBackupProxyResponse{
Expand All @@ -98,9 +90,41 @@ func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.Eng
}, nil
}

func (ops V2DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
// TODO: implement this
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented")
func (ops V2DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest, credential map[string]string, labels []string) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error())
}
defer c.Close()

snapshotName := req.SnapshotName
if snapshotName == "" {
snapshotName = util.UUID()
}

recv, err := c.EngineBackupCreate(&spdkclient.BackupCreateRequest{
BackupName: req.BackupName,
SnapshotName: snapshotName,
VolumeName: req.ProxyEngineRequest.VolumeName,
EngineName: req.ProxyEngineRequest.EngineName,
BackupTarget: req.BackupTarget,
StorageClassName: req.StorageClassName,
BackingImageName: req.BackingImageName,
BackingImageChecksum: req.BackingImageChecksum,
CompressionMethod: req.CompressionMethod,
ConcurrentLimit: req.ConcurrentLimit,
Labels: labels,
Credential: credential,
})
if err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create backup").Error())
}

return &rpc.EngineSnapshotBackupProxyResponse{
BackupId: recv.Backup,
Replica: recv.ReplicaAddress,
IsIncremental: recv.IsIncremental,
}, nil
}

func (p *Proxy) SnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) {
Expand All @@ -127,7 +151,7 @@ func (ops V1DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *r
}
defer c.Close()

replicas, err := ops.ReplicaList(ctx, req.ProxyEngineRequest, "")
replicas, err := ops.ReplicaList(ctx, req.ProxyEngineRequest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -170,7 +194,7 @@ func (ops V1DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *r
}
mode := eptypes.GRPCReplicaModeToReplicaMode(r.Mode)
if mode != etypes.RW {
return nil, errors.Errorf("failed to get %v backup status on unknown replica %s", req.BackupName, replicaAddress)
return nil, errors.Errorf("failed to get backup %v status on unknown replica %s", req.BackupName, replicaAddress)
}
}

Expand All @@ -197,60 +221,54 @@ func (ops V1DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *r
}

func (ops V2DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) {
// TODO: implement this
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented")
}

func (p *Proxy) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
log := logrus.WithFields(logrus.Fields{
"serviceURL": req.ProxyEngineRequest.Address,
"engineName": req.ProxyEngineRequest.EngineName,
"volumeName": req.ProxyEngineRequest.VolumeName,
"dataEngine": req.ProxyEngineRequest.DataEngine,
})
log.Infof("Restoring backup %v to %v", req.Url, req.VolumeName)
c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error())
}
defer c.Close()

op, ok := p.ops[req.ProxyEngineRequest.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine)
status, err := c.EngineBackupStatus(req.BackupName, req.ProxyEngineRequest.EngineName, req.ReplicaAddress)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get backup status").Error())
}
return op.BackupRestore(ctx, req)

return &rpc.EngineSnapshotBackupStatusProxyResponse{
BackupUrl: status.BackupUrl,
Error: status.Error,
Progress: int32(status.Progress),
SnapshotName: status.SnapshotName,
State: status.State,
ReplicaAddress: status.ReplicaAddress,
}, nil
}

func (ops V1DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
func (p *Proxy) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
log := logrus.WithFields(logrus.Fields{
"serviceURL": req.ProxyEngineRequest.Address,
"engineName": req.ProxyEngineRequest.EngineName,
"volumeName": req.ProxyEngineRequest.VolumeName,
"dataEngine": req.ProxyEngineRequest.DataEngine,
})
log.Infof("Restoring backup %v to %v", req.Url, req.VolumeName)

for _, env := range req.Envs {
part := strings.SplitN(env, "=", 2)
if len(part) < 2 {
continue
}

if err := os.Setenv(part[0], part[1]); err != nil {
return nil, err
}
if err := setEnv(req.Envs); err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to set envs").Error())
}

credential, err := butil.GetBackupCredential(req.Target)
if err != nil {
return nil, err
}

task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName,
req.ProxyEngineRequest.EngineName)
if err != nil {
return nil, err
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, errors.Wrapf(err, "failed to get backup credential").Error())
}

resp = &rpc.EngineBackupRestoreProxyResponse{
TaskError: []byte{},
}
err = task.RestoreBackup(req.Url, credential, int(req.ConcurrentLimit))

op, ok := p.ops[req.ProxyEngineRequest.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine)
}
err = op.BackupRestore(ctx, req, credential)
if err != nil {
errInfo, jsonErr := json.Marshal(err)
if jsonErr != nil {
Expand All @@ -267,10 +285,65 @@ func (ops V1DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.Engi
return resp, nil
}

func (ops V2DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
func (ops V1DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest, credential map[string]string) error {
task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName,
req.ProxyEngineRequest.EngineName)
if err != nil {
return err
}
return task.RestoreBackup(req.Url, credential, int(req.ConcurrentLimit))
}

func (ops V2DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest, credential map[string]string) error {
c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address)
if err != nil {
return grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error())
}
defer c.Close()

return c.EngineBackupRestore(&spdkclient.BackupRestoreRequest{
BackupUrl: req.Url,
EngineName: req.ProxyEngineRequest.EngineName,
Credential: credential,
ConcurrentLimit: req.ConcurrentLimit,
})
}

func (p *Proxy) BackupRestoreFinish(ctx context.Context, req *rpc.EngineBackupRestoreFinishRequest) (resp *emptypb.Empty, err error) {
log := logrus.WithFields(logrus.Fields{
"serviceURL": req.ProxyEngineRequest.Address,
"engineName": req.ProxyEngineRequest.EngineName,
"volumeName": req.ProxyEngineRequest.VolumeName,
"dataEngine": req.ProxyEngineRequest.DataEngine,
})
log.Info("Finishing backup restoration")

op, ok := p.ops[req.ProxyEngineRequest.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine)
}
return op.BackupRestoreFinish(ctx, req)
}

func (ops V1DataEngineProxyOps) BackupRestoreFinish(ctx context.Context, req *rpc.EngineBackupRestoreFinishRequest) (*emptypb.Empty, error) {
// TODO: implement this
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented")
}

func (ops V2DataEngineProxyOps) BackupRestoreFinish(ctx context.Context, req *rpc.EngineBackupRestoreFinishRequest) (*emptypb.Empty, error) {
c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error())
}
defer c.Close()

err = c.EngineBackupRestoreFinish(req.ProxyEngineRequest.EngineName)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to finish backup restoration").Error())
}
return &emptypb.Empty{}, nil
}

func (p *Proxy) BackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) {
log := logrus.WithFields(logrus.Fields{
"serviceURL": req.Address,
Expand Down Expand Up @@ -318,8 +391,54 @@ func (ops V1DataEngineProxyOps) BackupRestoreStatus(ctx context.Context, req *rp
}

func (ops V2DataEngineProxyOps) BackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) {
/* TODO: implement this */
return &rpc.EngineBackupRestoreStatusProxyResponse{
c, err := getSPDKClientFromEngineAddress(req.Address)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.Address).Error())
}
defer c.Close()

recv, err := c.EngineRestoreStatus(req.EngineName)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get restore status").Error())
}

resp = &rpc.EngineBackupRestoreStatusProxyResponse{
Status: map[string]*rpc.EngineBackupRestoreStatus{},
}, nil
}
for address, status := range recv.Status {
replicaURL := "tcp://" + address
resp.Status[replicaURL] = &rpc.EngineBackupRestoreStatus{
IsRestoring: status.IsRestoring,
LastRestored: status.LastRestored,
CurrentRestoringBackup: status.CurrentRestoringBackup,
Progress: int32(status.Progress),
Error: status.Error,
State: status.State,
BackupUrl: status.BackupUrl,
Filename: status.DestFileName,
}
}
return resp, nil
}

func setEnv(envs []string) error {
for _, env := range envs {
part := strings.SplitN(env, "=", 2)
if len(part) < 2 {
continue
}

if err := os.Setenv(part[0], part[1]); err != nil {
return err
}
}
return nil
}

func getLabels(labels map[string]string) []string {
kvs := []string{}
for k, v := range labels {
kvs = append(kvs, fmt.Sprintf("%s=%s", k, v))
}
return kvs
}

0 comments on commit 82530a7

Please sign in to comment.