From 82530a79722f27899e6b39eb863f27f80476d361 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Sun, 31 Dec 2023 15:20:24 +0000 Subject: [PATCH] v2 volume: support backup and restore Longhorn 6138 Signed-off-by: Derek Su --- pkg/client/proxy_backup.go | 33 ++++- pkg/disk/disk.go | 1 - pkg/proxy/backup.go | 255 +++++++++++++++++++++++++++---------- pkg/proxy/proxy.go | 51 +++++--- pkg/proxy/replica.go | 36 +++--- pkg/proxy/snapshot.go | 110 +++++++++++++++- pkg/proxy/volume.go | 14 +- pkg/types/types.go | 8 ++ 8 files changed, 389 insertions(+), 119 deletions(-) diff --git a/pkg/client/proxy_backup.go b/pkg/client/proxy_backup.go index 17127d11b..79b6a4138 100644 --- a/pkg/client/proxy_backup.go +++ b/pkg/client/proxy_backup.go @@ -182,8 +182,37 @@ func (c *ProxyClient) BackupRestore(dataEngine, engineName, volumeName, serviceA return nil } -func (c *ProxyClient) BackupRestoreStatus(dataEngine, engineName, volumeName, - serviceAddress string) (status map[string]*BackupRestoreStatus, err error) { +func (c *ProxyClient) BackupRestoreFinish(dataEngine, engineName, volumeName, serviceAddress string) error { + input := map[string]string{ + "engineName": engineName, + "volumeName": volumeName, + "serviceAddress": serviceAddress, + } + if err := validateProxyMethodParameters(input); err != nil { + return errors.Wrap(err, "failed to finishing backup restoration") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to finishing backup restoration: invalid data engine %v", dataEngine) + } + + req := &rpc.EngineBackupRestoreFinishRequest{ + ProxyEngineRequest: &rpc.ProxyEngineRequest{ + Address: serviceAddress, + EngineName: engineName, + // nolint:all replaced with DataEngine + BackendStoreDriver: rpc.BackendStoreDriver(driver), + DataEngine: rpc.DataEngine(driver), + // This is the name we will use for validation when communicating with the restoring engine. + VolumeName: volumeName, + }, + } + _, err := c.service.BackupRestoreFinish(getContextWithGRPCTimeout(c.ctx), req) + return err +} + +func (c *ProxyClient) BackupRestoreStatus(dataEngine, engineName, volumeName, serviceAddress string) (status map[string]*BackupRestoreStatus, err error) { input := map[string]string{ "engineName": engineName, "volumeName": volumeName, diff --git a/pkg/disk/disk.go b/pkg/disk/disk.go index b8d53751a..5138faf50 100644 --- a/pkg/disk/disk.go +++ b/pkg/disk/disk.go @@ -248,7 +248,6 @@ func spdkDiskToDisk(disk *spdkrpc.Disk) *rpc.Disk { func replicaToReplicaInstance(r *api.Replica) *rpc.ReplicaInstance { return &rpc.ReplicaInstance{ Name: r.Name, - Uuid: r.UUID, DiskName: r.LvsName, DiskUuid: r.LvsUUID, SpecSize: r.SpecSize, diff --git a/pkg/proxy/backup.go b/pkg/proxy/backup.go index 5f94440a4..6d8f2b6e3 100644 --- a/pkg/proxy/backup.go +++ b/pkg/proxy/backup.go @@ -20,6 +20,9 @@ import ( esync "github.com/longhorn/longhorn-engine/pkg/sync" etypes "github.com/longhorn/longhorn-engine/pkg/types" eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" + spdkclient "github.com/longhorn/longhorn-spdk-engine/pkg/client" + + "github.com/longhorn/longhorn-instance-manager/pkg/util" rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" ) @@ -40,39 +43,28 @@ func (p *Proxy) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBacku }) log.Infof("Backing up snapshot %v to backup %v", req.SnapshotName, req.BackupName) - op, ok := p.ops[req.ProxyEngineRequest.DataEngine] - if !ok { - return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine) - } - return op.SnapshotBackup(ctx, req) -} - -func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { - for _, env := range req.Envs { - part := strings.SplitN(env, "=", 2) - if len(part) < 2 { - continue - } - - if err := os.Setenv(part[0], part[1]); err != nil { - return nil, err - } + if err := setEnv(req.Envs); err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to set envs").Error()) } credential, err := butil.GetBackupCredential(req.BackupTarget) if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, errors.Wrapf(err, "failed to get backup credential").Error()) } - labels := []string{} - for k, v := range req.Labels { - labels = append(labels, fmt.Sprintf("%s=%s", k, v)) + labels := getLabels(req.Labels) + + op, ok := p.ops[req.ProxyEngineRequest.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine) } + return op.SnapshotBackup(ctx, req, credential, labels) +} - task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, - req.ProxyEngineRequest.EngineName) +func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest, credential map[string]string, labels []string) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { + task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { - return nil, err + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create task").Error()) } recv, err := task.CreateBackup( @@ -88,7 +80,7 @@ func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.Eng credential, ) if err != nil { - return nil, err + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create backup").Error()) } return &rpc.EngineSnapshotBackupProxyResponse{ @@ -98,9 +90,41 @@ func (ops V1DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.Eng }, nil } -func (ops V2DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { - // TODO: implement this - return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") +func (ops V2DataEngineProxyOps) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest, credential map[string]string, labels []string) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { + c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error()) + } + defer c.Close() + + snapshotName := req.SnapshotName + if snapshotName == "" { + snapshotName = util.UUID() + } + + recv, err := c.EngineBackupCreate(&spdkclient.BackupCreateRequest{ + BackupName: req.BackupName, + SnapshotName: snapshotName, + VolumeName: req.ProxyEngineRequest.VolumeName, + EngineName: req.ProxyEngineRequest.EngineName, + BackupTarget: req.BackupTarget, + StorageClassName: req.StorageClassName, + BackingImageName: req.BackingImageName, + BackingImageChecksum: req.BackingImageChecksum, + CompressionMethod: req.CompressionMethod, + ConcurrentLimit: req.ConcurrentLimit, + Labels: labels, + Credential: credential, + }) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create backup").Error()) + } + + return &rpc.EngineSnapshotBackupProxyResponse{ + BackupId: recv.Backup, + Replica: recv.ReplicaAddress, + IsIncremental: recv.IsIncremental, + }, nil } func (p *Proxy) SnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) { @@ -127,7 +151,7 @@ func (ops V1DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *r } defer c.Close() - replicas, err := ops.ReplicaList(ctx, req.ProxyEngineRequest, "") + replicas, err := ops.ReplicaList(ctx, req.ProxyEngineRequest) if err != nil { return nil, err } @@ -170,7 +194,7 @@ func (ops V1DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *r } mode := eptypes.GRPCReplicaModeToReplicaMode(r.Mode) if mode != etypes.RW { - return nil, errors.Errorf("failed to get %v backup status on unknown replica %s", req.BackupName, replicaAddress) + return nil, errors.Errorf("failed to get backup %v status on unknown replica %s", req.BackupName, replicaAddress) } } @@ -197,60 +221,54 @@ func (ops V1DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *r } func (ops V2DataEngineProxyOps) SnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) { - // TODO: implement this - return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") -} - -func (p *Proxy) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { - log := logrus.WithFields(logrus.Fields{ - "serviceURL": req.ProxyEngineRequest.Address, - "engineName": req.ProxyEngineRequest.EngineName, - "volumeName": req.ProxyEngineRequest.VolumeName, - "dataEngine": req.ProxyEngineRequest.DataEngine, - }) - log.Infof("Restoring backup %v to %v", req.Url, req.VolumeName) + c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error()) + } + defer c.Close() - op, ok := p.ops[req.ProxyEngineRequest.DataEngine] - if !ok { - return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine) + status, err := c.EngineBackupStatus(req.BackupName, req.ProxyEngineRequest.EngineName, req.ReplicaAddress) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get backup status").Error()) } - return op.BackupRestore(ctx, req) + + return &rpc.EngineSnapshotBackupStatusProxyResponse{ + BackupUrl: status.BackupUrl, + Error: status.Error, + Progress: int32(status.Progress), + SnapshotName: status.SnapshotName, + State: status.State, + ReplicaAddress: status.ReplicaAddress, + }, nil } -func (ops V1DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { +func (p *Proxy) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { log := logrus.WithFields(logrus.Fields{ "serviceURL": req.ProxyEngineRequest.Address, "engineName": req.ProxyEngineRequest.EngineName, "volumeName": req.ProxyEngineRequest.VolumeName, "dataEngine": req.ProxyEngineRequest.DataEngine, }) + log.Infof("Restoring backup %v to %v", req.Url, req.VolumeName) - for _, env := range req.Envs { - part := strings.SplitN(env, "=", 2) - if len(part) < 2 { - continue - } - - if err := os.Setenv(part[0], part[1]); err != nil { - return nil, err - } + if err := setEnv(req.Envs); err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to set envs").Error()) } credential, err := butil.GetBackupCredential(req.Target) if err != nil { - return nil, err - } - - task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, - req.ProxyEngineRequest.EngineName) - if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, errors.Wrapf(err, "failed to get backup credential").Error()) } resp = &rpc.EngineBackupRestoreProxyResponse{ TaskError: []byte{}, } - err = task.RestoreBackup(req.Url, credential, int(req.ConcurrentLimit)) + + op, ok := p.ops[req.ProxyEngineRequest.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine) + } + err = op.BackupRestore(ctx, req, credential) if err != nil { errInfo, jsonErr := json.Marshal(err) if jsonErr != nil { @@ -267,10 +285,65 @@ func (ops V1DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.Engi return resp, nil } -func (ops V2DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { +func (ops V1DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest, credential map[string]string) error { + task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, + req.ProxyEngineRequest.EngineName) + if err != nil { + return err + } + return task.RestoreBackup(req.Url, credential, int(req.ConcurrentLimit)) +} + +func (ops V2DataEngineProxyOps) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest, credential map[string]string) error { + c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address) + if err != nil { + return grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error()) + } + defer c.Close() + + return c.EngineBackupRestore(&spdkclient.BackupRestoreRequest{ + BackupUrl: req.Url, + EngineName: req.ProxyEngineRequest.EngineName, + Credential: credential, + ConcurrentLimit: req.ConcurrentLimit, + }) +} + +func (p *Proxy) BackupRestoreFinish(ctx context.Context, req *rpc.EngineBackupRestoreFinishRequest) (resp *emptypb.Empty, err error) { + log := logrus.WithFields(logrus.Fields{ + "serviceURL": req.ProxyEngineRequest.Address, + "engineName": req.ProxyEngineRequest.EngineName, + "volumeName": req.ProxyEngineRequest.VolumeName, + "dataEngine": req.ProxyEngineRequest.DataEngine, + }) + log.Info("Finishing backup restoration") + + op, ok := p.ops[req.ProxyEngineRequest.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine) + } + return op.BackupRestoreFinish(ctx, req) +} + +func (ops V1DataEngineProxyOps) BackupRestoreFinish(ctx context.Context, req *rpc.EngineBackupRestoreFinishRequest) (*emptypb.Empty, error) { + // TODO: implement this return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } +func (ops V2DataEngineProxyOps) BackupRestoreFinish(ctx context.Context, req *rpc.EngineBackupRestoreFinishRequest) (*emptypb.Empty, error) { + c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error()) + } + defer c.Close() + + err = c.EngineBackupRestoreFinish(req.ProxyEngineRequest.EngineName) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to finish backup restoration").Error()) + } + return &emptypb.Empty{}, nil +} + func (p *Proxy) BackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) { log := logrus.WithFields(logrus.Fields{ "serviceURL": req.Address, @@ -318,8 +391,54 @@ func (ops V1DataEngineProxyOps) BackupRestoreStatus(ctx context.Context, req *rp } func (ops V2DataEngineProxyOps) BackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) { - /* TODO: implement this */ - return &rpc.EngineBackupRestoreStatusProxyResponse{ + c, err := getSPDKClientFromEngineAddress(req.Address) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.Address).Error()) + } + defer c.Close() + + recv, err := c.EngineRestoreStatus(req.EngineName) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get restore status").Error()) + } + + resp = &rpc.EngineBackupRestoreStatusProxyResponse{ Status: map[string]*rpc.EngineBackupRestoreStatus{}, - }, nil + } + for address, status := range recv.Status { + replicaURL := "tcp://" + address + resp.Status[replicaURL] = &rpc.EngineBackupRestoreStatus{ + IsRestoring: status.IsRestoring, + LastRestored: status.LastRestored, + CurrentRestoringBackup: status.CurrentRestoringBackup, + Progress: int32(status.Progress), + Error: status.Error, + State: status.State, + BackupUrl: status.BackupUrl, + Filename: status.DestFileName, + } + } + return resp, nil +} + +func setEnv(envs []string) error { + for _, env := range envs { + part := strings.SplitN(env, "=", 2) + if len(part) < 2 { + continue + } + + if err := os.Setenv(part[0], part[1]); err != nil { + return err + } + } + return nil +} + +func getLabels(labels map[string]string) []string { + kvs := []string{} + for k, v := range labels { + kvs = append(kvs, fmt.Sprintf("%s=%s", k, v)) + } + return kvs } diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 39d2dbd49..913613cdd 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -1,12 +1,16 @@ package proxy import ( + "net" + "strconv" + + eclient "github.com/longhorn/longhorn-engine/pkg/controller/client" + eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/protobuf/types/known/emptypb" - eclient "github.com/longhorn/longhorn-engine/pkg/controller/client" - eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" + spdkclient "github.com/longhorn/longhorn-spdk-engine/pkg/client" "github.com/longhorn/longhorn-instance-manager/pkg/types" @@ -14,16 +18,16 @@ import ( ) type ProxyOps interface { - VolumeGet(context.Context, *rpc.ProxyEngineRequest, string) (*rpc.EngineVolumeGetProxyResponse, error) + VolumeGet(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineVolumeGetProxyResponse, error) VolumeExpand(context.Context, *rpc.EngineVolumeExpandRequest) (*emptypb.Empty, error) VolumeFrontendStart(context.Context, *rpc.EngineVolumeFrontendStartRequest) (*emptypb.Empty, error) VolumeFrontendShutdown(context.Context, *rpc.ProxyEngineRequest) (*emptypb.Empty, error) VolumeUnmapMarkSnapChainRemovedSet(context.Context, *rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest) (*emptypb.Empty, error) - ReplicaAdd(context.Context, *rpc.EngineReplicaAddRequest, string) (*emptypb.Empty, error) - ReplicaList(context.Context, *rpc.ProxyEngineRequest, string) (*rpc.EngineReplicaListProxyResponse, error) + ReplicaAdd(context.Context, *rpc.EngineReplicaAddRequest) (*emptypb.Empty, error) + ReplicaList(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineReplicaListProxyResponse, error) ReplicaRebuildingStatus(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineReplicaRebuildStatusProxyResponse, error) - ReplicaRemove(context.Context, *rpc.EngineReplicaRemoveRequest, string) (*emptypb.Empty, error) + ReplicaRemove(context.Context, *rpc.EngineReplicaRemoveRequest) (*emptypb.Empty, error) ReplicaVerifyRebuild(context.Context, *rpc.EngineReplicaVerifyRebuildRequest) (*emptypb.Empty, error) ReplicaModeUpdate(context.Context, *rpc.EngineReplicaModeUpdateRequest) (*emptypb.Empty, error) @@ -38,9 +42,10 @@ type ProxyOps interface { SnapshotHash(context.Context, *rpc.EngineSnapshotHashRequest) (*emptypb.Empty, error) SnapshotHashStatus(context.Context, *rpc.EngineSnapshotHashStatusRequest) (*rpc.EngineSnapshotHashStatusProxyResponse, error) - SnapshotBackup(context.Context, *rpc.EngineSnapshotBackupRequest) (*rpc.EngineSnapshotBackupProxyResponse, error) + SnapshotBackup(context.Context, *rpc.EngineSnapshotBackupRequest, map[string]string, []string) (*rpc.EngineSnapshotBackupProxyResponse, error) SnapshotBackupStatus(context.Context, *rpc.EngineSnapshotBackupStatusRequest) (*rpc.EngineSnapshotBackupStatusProxyResponse, error) - BackupRestore(context.Context, *rpc.EngineBackupRestoreRequest) (*rpc.EngineBackupRestoreProxyResponse, error) + BackupRestore(context.Context, *rpc.EngineBackupRestoreRequest, map[string]string) error + BackupRestoreFinish(context.Context, *rpc.EngineBackupRestoreFinishRequest) (*emptypb.Empty, error) BackupRestoreStatus(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineBackupRestoreStatusProxyResponse, error) } @@ -52,11 +57,7 @@ type Proxy struct { logsDir string shutdownCh chan error HealthChecker HealthChecker - - diskServiceAddress string - spdkServiceAddress string - - ops map[rpc.DataEngine]ProxyOps + ops map[rpc.DataEngine]ProxyOps } func NewProxy(ctx context.Context, logsDir, diskServiceAddress, spdkServiceAddress string) (*Proxy, error) { @@ -65,12 +66,10 @@ func NewProxy(ctx context.Context, logsDir, diskServiceAddress, spdkServiceAddre rpc.DataEngine_DATA_ENGINE_V2: V2DataEngineProxyOps{}, } p := &Proxy{ - ctx: ctx, - logsDir: logsDir, - HealthChecker: &GRPCHealthChecker{}, - diskServiceAddress: diskServiceAddress, - spdkServiceAddress: spdkServiceAddress, - ops: ops, + ctx: ctx, + logsDir: logsDir, + HealthChecker: &GRPCHealthChecker{}, + ops: ops, } go p.startMonitoring() @@ -121,3 +120,17 @@ func (p *Proxy) ServerVersionGet(ctx context.Context, req *rpc.ProxyEngineReques }, }, nil } + +func getSPDKClientFromEngineAddress(address string) (*spdkclient.SPDKClient, error) { + host, _, err := net.SplitHostPort(address) + if err != nil { + return nil, err + } + + spdkServiceAddress := net.JoinHostPort(host, strconv.Itoa(types.InstanceManagerSpdkServiceDefaultPort)) + if err != nil { + return nil, err + } + + return spdkclient.NewSPDKClient(spdkServiceAddress) +} diff --git a/pkg/proxy/replica.go b/pkg/proxy/replica.go index 5f71073d6..5de1fad27 100644 --- a/pkg/proxy/replica.go +++ b/pkg/proxy/replica.go @@ -3,6 +3,7 @@ package proxy import ( "strings" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" grpccodes "google.golang.org/grpc/codes" @@ -12,7 +13,6 @@ import ( eclient "github.com/longhorn/longhorn-engine/pkg/controller/client" esync "github.com/longhorn/longhorn-engine/pkg/sync" eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" - spdkclient "github.com/longhorn/longhorn-spdk-engine/pkg/client" spdktypes "github.com/longhorn/longhorn-spdk-engine/pkg/types" rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" @@ -36,10 +36,10 @@ func (p *Proxy) ReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest if !ok { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine) } - return op.ReplicaAdd(ctx, req, p.spdkServiceAddress) + return op.ReplicaAdd(ctx, req) } -func (ops V1DataEngineProxyOps) ReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest, unused string) (resp *emptypb.Empty, err error) { +func (ops V1DataEngineProxyOps) ReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest) (resp *emptypb.Empty, err error) { task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -59,10 +59,10 @@ func (ops V1DataEngineProxyOps) ReplicaAdd(ctx context.Context, req *rpc.EngineR return &emptypb.Empty{}, nil } -func (ops V2DataEngineProxyOps) ReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest, spdkServiceAddress string) (resp *emptypb.Empty, err error) { - c, err := spdkclient.NewSPDKClient(spdkServiceAddress) +func (ops V2DataEngineProxyOps) ReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest) (resp *emptypb.Empty, err error) { + c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address) if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error()) } defer c.Close() @@ -70,7 +70,7 @@ func (ops V2DataEngineProxyOps) ReplicaAdd(ctx context.Context, req *rpc.EngineR err = c.EngineReplicaAdd(req.ProxyEngineRequest.EngineName, req.ReplicaName, replicaAddress) if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to add replica %v", replicaAddress).Error()) } return &emptypb.Empty{}, nil } @@ -88,10 +88,10 @@ func (p *Proxy) ReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest) (r if !ok { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) } - return op.ReplicaList(ctx, req, p.spdkServiceAddress) + return op.ReplicaList(ctx, req) } -func (ops V1DataEngineProxyOps) ReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest, ununsed string) (resp *rpc.EngineReplicaListProxyResponse, err error) { +func (ops V1DataEngineProxyOps) ReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaListProxyResponse, err error) { c, err := eclient.NewControllerClient(req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -133,16 +133,16 @@ func replicaModeToGRPCReplicaMode(mode spdktypes.Mode) eptypes.ReplicaMode { return eptypes.ReplicaMode_ERR } -func (ops V2DataEngineProxyOps) ReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest, spdkServiceAddress string) (resp *rpc.EngineReplicaListProxyResponse, err error) { - c, err := spdkclient.NewSPDKClient(spdkServiceAddress) +func (ops V2DataEngineProxyOps) ReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaListProxyResponse, err error) { + c, err := getSPDKClientFromEngineAddress(req.Address) if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.Address).Error()) } defer c.Close() recv, err := c.EngineGet(req.EngineName) if err != nil { - return nil, err + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to get engine %v", req.EngineName).Error()) } replicas := []*eptypes.ControllerReplica{} @@ -264,10 +264,10 @@ func (p *Proxy) ReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveR if !ok { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.ProxyEngineRequest.DataEngine) } - return op.ReplicaRemove(ctx, req, p.spdkServiceAddress) + return op.ReplicaRemove(ctx, req) } -func (ops V1DataEngineProxyOps) ReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveRequest, unused string) (*emptypb.Empty, error) { +func (ops V1DataEngineProxyOps) ReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveRequest) (*emptypb.Empty, error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -278,10 +278,10 @@ func (ops V1DataEngineProxyOps) ReplicaRemove(ctx context.Context, req *rpc.Engi return nil, c.ReplicaDelete(req.ReplicaAddress) } -func (ops V2DataEngineProxyOps) ReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveRequest, spdkServiceAddress string) (*emptypb.Empty, error) { - c, err := spdkclient.NewSPDKClient(spdkServiceAddress) +func (ops V2DataEngineProxyOps) ReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveRequest) (*emptypb.Empty, error) { + c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address) if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error()) } defer c.Close() diff --git a/pkg/proxy/snapshot.go b/pkg/proxy/snapshot.go index 601e30f6b..9cc7198c4 100644 --- a/pkg/proxy/snapshot.go +++ b/pkg/proxy/snapshot.go @@ -1,6 +1,9 @@ package proxy import ( + "strconv" + + "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" grpccodes "google.golang.org/grpc/codes" @@ -10,6 +13,9 @@ import ( eclient "github.com/longhorn/longhorn-engine/pkg/controller/client" esync "github.com/longhorn/longhorn-engine/pkg/sync" eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" + "github.com/longhorn/longhorn-spdk-engine/pkg/types" + + "github.com/longhorn/longhorn-instance-manager/pkg/util" rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" ) @@ -51,7 +57,26 @@ func (ops V1DataEngineProxyOps) VolumeSnapshot(ctx context.Context, req *rpc.Eng } func (ops V2DataEngineProxyOps) VolumeSnapshot(ctx context.Context, req *rpc.EngineVolumeSnapshotRequest) (resp *rpc.EngineVolumeSnapshotProxyResponse, err error) { - return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") + c, err := getSPDKClientFromEngineAddress(req.ProxyEngineRequest.Address) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.ProxyEngineRequest.Address).Error()) + } + defer c.Close() + + snapshotName := req.SnapshotVolume.Name + if snapshotName == "" { + snapshotName = util.UUID() + } + + _, err = c.EngineSnapshotCreate(req.ProxyEngineRequest.EngineName, snapshotName) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to create snapshot %v", snapshotName).Error()) + } + return &rpc.EngineVolumeSnapshotProxyResponse{ + Snapshot: &eptypes.VolumeSnapshotReply{ + Name: snapshotName, + }, + }, nil } func (p *Proxy) SnapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotListProxyResponse, err error) { @@ -107,10 +132,87 @@ func (ops V1DataEngineProxyOps) SnapshotList(ctx context.Context, req *rpc.Proxy } func (ops V2DataEngineProxyOps) SnapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotListProxyResponse, err error) { - /* TODO: implement this */ - return &rpc.EngineSnapshotListProxyResponse{ + disks, err := getSpdkSnapshotsInfo(req.EngineName, req.Address) + if err != nil { + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get snapshots info for engine %v", req.EngineName).Error()) + } + + resp = &rpc.EngineSnapshotListProxyResponse{ Disks: map[string]*rpc.EngineSnapshotDiskInfo{}, - }, nil + } + for k, v := range disks { + resp.Disks[k] = &rpc.EngineSnapshotDiskInfo{ + Name: v.Name, + Parent: v.Parent, + Children: v.Children, + Removed: v.Removed, + UserCreated: v.UserCreated, + Created: v.Created, + Size: v.Size, + Labels: v.Labels, + } + } + return resp, nil +} + +func getSpdkSnapshotsInfo(engineName, address string) (map[string]*rpc.EngineSnapshotDiskInfo, error) { + c, err := getSPDKClientFromEngineAddress(address) + if err != nil { + return nil, errors.Wrapf(err, "failed to get SPDK client from engine address %v", address) + } + defer c.Close() + + engine, err := c.EngineGet(engineName) + if err != nil { + return nil, errors.Wrapf(err, "failed to get engine %v", engineName) + } + + recv, err := c.EngineReplicaList(engineName) + if err != nil { + return nil, errors.Wrapf(err, "failed to get replica list from engine %v", engineName) + } + replicas := recv.Replicas + + disks := map[string]*rpc.EngineSnapshotDiskInfo{} + for replicaName, replicaMode := range engine.ReplicaModeMap { + if replicaMode != types.ModeRW { + continue + } + replica, ok := replicas[replicaName] + if !ok { + continue + } + + newDisks := map[string]*rpc.EngineSnapshotDiskInfo{} + + for name, snapshot := range replica.Snapshots { + children := map[string]bool{} + + for child, value := range snapshot.Children { + children[child] = value + } + + newDisks[name] = &rpc.EngineSnapshotDiskInfo{ + Name: name, + Parent: snapshot.Parent, + Children: children, + Removed: false, + UserCreated: true, + Created: snapshot.CreationTime, + Size: strconv.FormatUint(snapshot.SpecSize, 10), + Labels: map[string]string{}, + } + } + + // we treat the healthy replica with the most snapshots as the + // source of the truth, since that means something are still in + // progress and haven't completed yet. + if len(newDisks) > len(disks) { + disks = newDisks + } + } + + return disks, nil } func (p *Proxy) SnapshotClone(ctx context.Context, req *rpc.EngineSnapshotCloneRequest) (resp *emptypb.Empty, err error) { diff --git a/pkg/proxy/volume.go b/pkg/proxy/volume.go index 4de96e09b..918b717f1 100644 --- a/pkg/proxy/volume.go +++ b/pkg/proxy/volume.go @@ -1,6 +1,7 @@ package proxy import ( + "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" grpccodes "google.golang.org/grpc/codes" @@ -9,7 +10,6 @@ import ( eclient "github.com/longhorn/longhorn-engine/pkg/controller/client" eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" - spdkclient "github.com/longhorn/longhorn-spdk-engine/pkg/client" rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" ) @@ -27,10 +27,10 @@ func (p *Proxy) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (res if !ok { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) } - return op.VolumeGet(ctx, req, p.spdkServiceAddress) + return op.VolumeGet(ctx, req) } -func (ops V1DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest, ununsed string) (resp *rpc.EngineVolumeGetProxyResponse, err error) { +func (ops V1DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineVolumeGetProxyResponse, err error) { c, err := eclient.NewControllerClient(req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -60,16 +60,16 @@ func (ops V1DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEng }, nil } -func (ops V2DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest, spdkServiceAddress string) (resp *rpc.EngineVolumeGetProxyResponse, err error) { - c, err := spdkclient.NewSPDKClient(spdkServiceAddress) +func (ops V2DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineVolumeGetProxyResponse, err error) { + c, err := getSPDKClientFromEngineAddress(req.Address) if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get SPDK client from engine address %v", req.Address).Error()) } defer c.Close() recv, err := c.EngineGet(req.EngineName) if err != nil { - return nil, err + return nil, grpcstatus.Errorf(grpccodes.Internal, errors.Wrapf(err, "failed to get engine %v", req.EngineName).Error()) } return &rpc.EngineVolumeGetProxyResponse{ diff --git a/pkg/types/types.go b/pkg/types/types.go index e875d4e9f..493b4f5ab 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -20,6 +20,14 @@ const ( ProxyGRPCService = "proxy gRPC server" ) +const ( + InstanceManagerProcessManagerServiceDefaultPort = 8500 + InstanceManagerProxyServiceDefaultPort = InstanceManagerProcessManagerServiceDefaultPort + 1 // 8501 + InstanceManagerDiskServiceDefaultPort = InstanceManagerProcessManagerServiceDefaultPort + 2 // 8502 + InstanceManagerInstanceServiceDefaultPort = InstanceManagerProcessManagerServiceDefaultPort + 3 // 8503 + InstanceManagerSpdkServiceDefaultPort = InstanceManagerProcessManagerServiceDefaultPort + 4 // 8504 +) + var ( WaitInterval = 100 * time.Millisecond WaitCount = 600