diff --git a/pkg/proxy/backup.go b/pkg/proxy/backup.go index 2dc42013d..55cb4ab49 100644 --- a/pkg/proxy/backup.go +++ b/pkg/proxy/backup.go @@ -40,17 +40,11 @@ func (p *Proxy) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBacku }) log.Infof("Backing up snapshot %v to backup %v", req.SnapshotName, req.BackupName) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotBackup(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotBackup(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotBackup, req.ProxyEngineRequest.DataEngine, req) + return v.(*rpc.EngineSnapshotBackupProxyResponse), err } -func (p *Proxy) snapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { +func snapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { for _, env := range req.Envs { part := strings.SplitN(env, "=", 2) if len(part) < 2 { @@ -101,7 +95,8 @@ func (p *Proxy) snapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBacku }, nil } -func (p *Proxy) spdkSnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { +func spdkSnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) { + // TODO: implement this return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -114,17 +109,11 @@ func (p *Proxy) SnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapsho }) log.Tracef("Getting %v backup status from replica %v", req.BackupName, req.ReplicaAddress) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotBackupStatus(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotBackupStatus(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotBackupStatus, req.ProxyEngineRequest.DataEngine, req) + return v.(*rpc.EngineSnapshotBackupStatusProxyResponse), err } -func (p *Proxy) snapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) { +func snapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -132,7 +121,7 @@ func (p *Proxy) snapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapsho } defer c.Close() - replicas, err := p.ReplicaList(ctx, req.ProxyEngineRequest) + replicas, err := replicaList(ctx, req.ProxyEngineRequest) if err != nil { return nil, err } @@ -201,7 +190,8 @@ func (p *Proxy) snapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapsho }, nil } -func (p *Proxy) spdkSnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) { +func spdkSnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) { + // TODO: implement this return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -214,17 +204,11 @@ func (p *Proxy) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreR }) log.Infof("Restoring backup %v to %v", req.Url, req.VolumeName) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.backupRestore(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkBackupRestore(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsBackupRestore, req.ProxyEngineRequest.DataEngine, req) + return v.(*rpc.EngineBackupRestoreProxyResponse), err } -func (p *Proxy) backupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { +func backupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { log := logrus.WithFields(logrus.Fields{ "serviceURL": req.ProxyEngineRequest.Address, "engineName": req.ProxyEngineRequest.EngineName, @@ -274,7 +258,7 @@ func (p *Proxy) backupRestore(ctx context.Context, req *rpc.EngineBackupRestoreR return resp, nil } -func (p *Proxy) spdkBackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { +func spdkBackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -287,17 +271,11 @@ func (p *Proxy) BackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineReq }) log.Trace("Getting backup restore status") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.backupRestoreStatus(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkBackupRestoreStatus(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsBackupRestoreStatus, req.DataEngine, req) + return v.(*rpc.EngineBackupRestoreStatusProxyResponse), err } -func (p *Proxy) backupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) { +func backupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) { task, err := esync.NewTask(ctx, req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -327,7 +305,7 @@ func (p *Proxy) backupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineReq return resp, nil } -func (p *Proxy) spdkBackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) { +func spdkBackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) { /* TODO: implement this */ return &rpc.EngineBackupRestoreStatusProxyResponse{ Status: map[string]*rpc.EngineBackupRestoreStatus{}, diff --git a/pkg/proxy/operations.go b/pkg/proxy/operations.go new file mode 100644 index 000000000..1a59f9bd7 --- /dev/null +++ b/pkg/proxy/operations.go @@ -0,0 +1,262 @@ +package proxy + +import ( + "golang.org/x/net/context" + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + + rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" +) + +type ProxyOps int + +const ( + // Volume operations + ProxyOpsVolumeGet ProxyOps = iota + ProxyOpsVolumeExpand + ProxyOpsVolumeFrontendStart + ProxyOpsVolumeFrontendShutdown + ProxyOpsVolumeUnmapMarkSnapChainRemovedSet + + // Replica operations + ProxyOpsReplicaAdd + ProxyOpsReplicaList + ProxyOpsReplicaRebuildingStatus + ProxyOpsReplicaRemove + ProxyOpsReplicaVerifyRebuild + ProxyOpsReplicaModeUpdate + + // Snapshot operations + ProxyOpsVolumeSnapshot + ProxyOpsSnapshotList + ProxyOpsSnapshotClone + ProxyOpsSnapshotCloneStatus + ProxyOpsSnapshotRevert + ProxyOpsSnapshotPurge + ProxyOpsSnapshotPurgeStatus + ProxyOpsSnapshotRemove + ProxyOpsSnapshotHash + ProxyOpsSnapshotHashStatus + + // Backup operations + ProxyOpsSnapshotBackup + ProxyOpsSnapshotBackupStatus + ProxyOpsBackupRestore + ProxyOpsBackupRestoreStatus +) + +// Type definitions for the functions that execute the operations +type volumeGetFunc func(context.Context, *rpc.ProxyEngineRequest, ...string) (*rpc.EngineVolumeGetProxyResponse, error) +type volumeExpandFunc func(context.Context, *rpc.EngineVolumeExpandRequest) (*emptypb.Empty, error) +type volumeFrontendStartFunc func(context.Context, *rpc.EngineVolumeFrontendStartRequest) (*emptypb.Empty, error) +type volumeFrontendShutdownFunc func(context.Context, *rpc.ProxyEngineRequest) (*emptypb.Empty, error) +type volumeUnmapMarkSnapChainRemovedSetFunc func(context.Context, *rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest) (*emptypb.Empty, error) + +type replicaAddFunc func(context.Context, *rpc.EngineReplicaAddRequest, ...string) (*emptypb.Empty, error) +type replicaListFunc func(context.Context, *rpc.ProxyEngineRequest, ...string) (*rpc.EngineReplicaListProxyResponse, error) +type replicaRebuildingStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineReplicaRebuildStatusProxyResponse, error) +type replicaRemoveFunc func(context.Context, *rpc.EngineReplicaRemoveRequest, ...string) (*emptypb.Empty, error) +type replicaVerifyRebuildFunc func(context.Context, *rpc.EngineReplicaVerifyRebuildRequest) (*emptypb.Empty, error) +type replicaModeUpdateFunc func(context.Context, *rpc.EngineReplicaModeUpdateRequest) (*emptypb.Empty, error) + +type volumeSnapshotFunc func(context.Context, *rpc.EngineVolumeSnapshotRequest) (*rpc.EngineVolumeSnapshotProxyResponse, error) +type snapshotListFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineSnapshotListProxyResponse, error) +type snapshotCloneFunc func(context.Context, *rpc.EngineSnapshotCloneRequest) (*emptypb.Empty, error) +type snapshotCloneStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineSnapshotCloneStatusProxyResponse, error) +type snapshotRevertFunc func(context.Context, *rpc.EngineSnapshotRevertRequest) (*emptypb.Empty, error) +type snapshotPurgeFunc func(context.Context, *rpc.EngineSnapshotPurgeRequest) (*emptypb.Empty, error) +type snapshotPurgeStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineSnapshotPurgeStatusProxyResponse, error) +type snapshotRemoveFunc func(context.Context, *rpc.EngineSnapshotRemoveRequest) (*emptypb.Empty, error) +type snapshotHashFunc func(context.Context, *rpc.EngineSnapshotHashRequest) (*emptypb.Empty, error) +type snapshotHashStatusFunc func(context.Context, *rpc.EngineSnapshotHashStatusRequest) (*rpc.EngineSnapshotHashStatusProxyResponse, error) + +type snapshotBackupFunc func(context.Context, *rpc.EngineSnapshotBackupRequest) (*rpc.EngineSnapshotBackupProxyResponse, error) +type snapshotBackupStatusFunc func(context.Context, *rpc.EngineSnapshotBackupStatusRequest) (*rpc.EngineSnapshotBackupStatusProxyResponse, error) +type backupRestoreFunc func(context.Context, *rpc.EngineBackupRestoreRequest) (*rpc.EngineBackupRestoreProxyResponse, error) +type backupRestoreStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineBackupRestoreStatusProxyResponse, error) + +// ProxyOpsFuncs is a map of ProxyOps to a map of DataEngine to the function that executes the operation +var ( + ProxyOpsFuncs = map[ProxyOps]map[rpc.DataEngine]interface{}{ + // Volume operations + ProxyOpsVolumeGet: { + rpc.DataEngine_DATA_ENGINE_V1: volumeGet, + rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeGet, + }, + ProxyOpsVolumeExpand: { + rpc.DataEngine_DATA_ENGINE_V1: volumeExpand, + rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeExpand, + }, + ProxyOpsVolumeFrontendStart: { + rpc.DataEngine_DATA_ENGINE_V1: volumeFrontendStart, + rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeFrontendStart, + }, + ProxyOpsVolumeFrontendShutdown: { + rpc.DataEngine_DATA_ENGINE_V1: volumeFrontendShutdown, + rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeFrontendShutdown, + }, + ProxyOpsVolumeUnmapMarkSnapChainRemovedSet: { + rpc.DataEngine_DATA_ENGINE_V1: volumeUnmapMarkSnapChainRemovedSet, + rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeUnmapMarkSnapChainRemovedSet, + }, + + // Replica operations + ProxyOpsReplicaAdd: { + rpc.DataEngine_DATA_ENGINE_V1: replicaAdd, + rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaAdd, + }, + ProxyOpsReplicaList: { + rpc.DataEngine_DATA_ENGINE_V1: replicaList, + rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaList, + }, + ProxyOpsReplicaRebuildingStatus: { + rpc.DataEngine_DATA_ENGINE_V1: replicaRebuildingStatus, + rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaRebuildingStatus, + }, + ProxyOpsReplicaRemove: { + rpc.DataEngine_DATA_ENGINE_V1: replicaRemove, + rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaRemove, + }, + ProxyOpsReplicaVerifyRebuild: { + rpc.DataEngine_DATA_ENGINE_V1: replicaVerifyRebuild, + rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaVerifyRebuild, + }, + ProxyOpsReplicaModeUpdate: { + rpc.DataEngine_DATA_ENGINE_V1: replicaModeUpdate, + rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaModeUpdate, + }, + + // Snapshot operations + ProxyOpsVolumeSnapshot: { + rpc.DataEngine_DATA_ENGINE_V1: volumeSnapshot, + rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeSnapshot, + }, + ProxyOpsSnapshotList: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotList, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotList, + }, + ProxyOpsSnapshotClone: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotClone, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotClone, + }, + ProxyOpsSnapshotCloneStatus: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotCloneStatus, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotCloneStatus, + }, + ProxyOpsSnapshotRevert: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotRevert, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotRevert, + }, + ProxyOpsSnapshotPurge: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotPurge, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotPurge, + }, + ProxyOpsSnapshotPurgeStatus: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotPurgeStatus, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotPurgeStatus, + }, + ProxyOpsSnapshotRemove: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotRemove, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotRemove, + }, + ProxyOpsSnapshotHash: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotHash, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotHash, + }, + ProxyOpsSnapshotHashStatus: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotHashStatus, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotHashStatus, + }, + + // Backup operations + ProxyOpsSnapshotBackup: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotBackup, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotBackup, + }, + ProxyOpsSnapshotBackupStatus: { + rpc.DataEngine_DATA_ENGINE_V1: snapshotBackupStatus, + rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotBackupStatus, + }, + ProxyOpsBackupRestore: { + rpc.DataEngine_DATA_ENGINE_V1: backupRestore, + rpc.DataEngine_DATA_ENGINE_V2: spdkBackupRestore, + }, + ProxyOpsBackupRestoreStatus: { + rpc.DataEngine_DATA_ENGINE_V1: backupRestoreStatus, + rpc.DataEngine_DATA_ENGINE_V2: spdkBackupRestoreStatus, + }, + } +) + +func executeProxyOp(ctx context.Context, op ProxyOps, dataEngine rpc.DataEngine, req interface{}) (interface{}, error) { + opFuncs, ok := ProxyOpsFuncs[op] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown proxy operation %v", op) + } + + fn, ok := opFuncs[dataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", dataEngine) + } + + switch op { + // Volume operations + case ProxyOpsVolumeGet: + return fn.(volumeGetFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + case ProxyOpsVolumeExpand: + return fn.(volumeExpandFunc)(ctx, req.(*rpc.EngineVolumeExpandRequest)) + case ProxyOpsVolumeFrontendStart: + return fn.(volumeFrontendStartFunc)(ctx, req.(*rpc.EngineVolumeFrontendStartRequest)) + case ProxyOpsVolumeFrontendShutdown: + return fn.(volumeFrontendShutdownFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + case ProxyOpsVolumeUnmapMarkSnapChainRemovedSet: + return fn.(volumeUnmapMarkSnapChainRemovedSetFunc)(ctx, req.(*rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest)) + // Replica operations + case ProxyOpsReplicaAdd: + return fn.(replicaAddFunc)(ctx, req.(*rpc.EngineReplicaAddRequest)) + case ProxyOpsReplicaList: + return fn.(replicaListFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + case ProxyOpsReplicaRebuildingStatus: + return fn.(replicaRebuildingStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + case ProxyOpsReplicaRemove: + return fn.(replicaRemoveFunc)(ctx, req.(*rpc.EngineReplicaRemoveRequest)) + case ProxyOpsReplicaVerifyRebuild: + return fn.(replicaVerifyRebuildFunc)(ctx, req.(*rpc.EngineReplicaVerifyRebuildRequest)) + case ProxyOpsReplicaModeUpdate: + return fn.(replicaModeUpdateFunc)(ctx, req.(*rpc.EngineReplicaModeUpdateRequest)) + // Snapshot operations + case ProxyOpsVolumeSnapshot: + return fn.(volumeSnapshotFunc)(ctx, req.(*rpc.EngineVolumeSnapshotRequest)) + case ProxyOpsSnapshotList: + return fn.(snapshotListFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + case ProxyOpsSnapshotClone: + return fn.(snapshotCloneFunc)(ctx, req.(*rpc.EngineSnapshotCloneRequest)) + case ProxyOpsSnapshotCloneStatus: + return fn.(snapshotCloneStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + case ProxyOpsSnapshotRevert: + return fn.(snapshotRevertFunc)(ctx, req.(*rpc.EngineSnapshotRevertRequest)) + case ProxyOpsSnapshotPurge: + return fn.(snapshotPurgeFunc)(ctx, req.(*rpc.EngineSnapshotPurgeRequest)) + case ProxyOpsSnapshotPurgeStatus: + return fn.(snapshotPurgeStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + case ProxyOpsSnapshotRemove: + return fn.(snapshotRemoveFunc)(ctx, req.(*rpc.EngineSnapshotRemoveRequest)) + case ProxyOpsSnapshotHash: + return fn.(snapshotHashFunc)(ctx, req.(*rpc.EngineSnapshotHashRequest)) + case ProxyOpsSnapshotHashStatus: + return fn.(snapshotHashStatusFunc)(ctx, req.(*rpc.EngineSnapshotHashStatusRequest)) + // Backup operations + case ProxyOpsSnapshotBackup: + return fn.(snapshotBackupFunc)(ctx, req.(*rpc.EngineSnapshotBackupRequest)) + case ProxyOpsSnapshotBackupStatus: + return fn.(snapshotBackupStatusFunc)(ctx, req.(*rpc.EngineSnapshotBackupStatusRequest)) + case ProxyOpsBackupRestore: + return fn.(backupRestoreFunc)(ctx, req.(*rpc.EngineBackupRestoreRequest)) + case ProxyOpsBackupRestoreStatus: + return fn.(backupRestoreStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest)) + + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown proxy operation") + } +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 540e411c4..a5017408c 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -4,11 +4,12 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/net/context" - rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" - "github.com/longhorn/longhorn-instance-manager/pkg/types" - eclient "github.com/longhorn/longhorn-engine/pkg/controller/client" eptypes "github.com/longhorn/longhorn-engine/proto/ptypes" + + "github.com/longhorn/longhorn-instance-manager/pkg/types" + + rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc" ) type Proxy struct { diff --git a/pkg/proxy/replica.go b/pkg/proxy/replica.go index 1f7788a69..218f80bfb 100644 --- a/pkg/proxy/replica.go +++ b/pkg/proxy/replica.go @@ -32,17 +32,11 @@ func (p *Proxy) ReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest }) log.Info("Adding replica") - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.replicaAdd(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkReplicaAdd(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsReplicaAdd, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) replicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest) (resp *emptypb.Empty, err error) { +func replicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest) (resp *emptypb.Empty, err error) { task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -62,8 +56,8 @@ func (p *Proxy) replicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest return &emptypb.Empty{}, nil } -func (p *Proxy) spdkReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest) (resp *emptypb.Empty, err error) { - c, err := spdkclient.NewSPDKClient(p.spdkServiceAddress) +func spdkReplicaAdd(ctx context.Context, req *rpc.EngineReplicaAddRequest, spdkServiceAddress string) (resp *emptypb.Empty, err error) { + c, err := spdkclient.NewSPDKClient(spdkServiceAddress) if err != nil { return nil, err } @@ -88,17 +82,11 @@ func (p *Proxy) ReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest) (r }) log.Trace("Listing replicas") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.replicaList(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkReplicaList(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsReplicaList, req.DataEngine, req) + return v.(*rpc.EngineReplicaListProxyResponse), err } -func (p *Proxy) replicaList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaListProxyResponse, err error) { +func replicaList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaListProxyResponse, err error) { c, err := eclient.NewControllerClient(req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -140,8 +128,8 @@ func replicaModeToGRPCReplicaMode(mode spdktypes.Mode) eptypes.ReplicaMode { return eptypes.ReplicaMode_ERR } -func (p *Proxy) spdkReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaListProxyResponse, err error) { - c, err := spdkclient.NewSPDKClient(p.spdkServiceAddress) +func spdkReplicaList(ctx context.Context, req *rpc.ProxyEngineRequest, spdkServiceAddress string) (resp *rpc.EngineReplicaListProxyResponse, err error) { + c, err := spdkclient.NewSPDKClient(spdkServiceAddress) if err != nil { return nil, err } @@ -183,17 +171,11 @@ func (p *Proxy) ReplicaRebuildingStatus(ctx context.Context, req *rpc.ProxyEngin }) log.Trace("Getting replica rebuilding status") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.replicaRebuildingStatus(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkReplicaRebuildingStatus(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsReplicaRebuildingStatus, req.DataEngine, req) + return v.(*rpc.EngineReplicaRebuildStatusProxyResponse), err } -func (p *Proxy) replicaRebuildingStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaRebuildStatusProxyResponse, err error) { +func replicaRebuildingStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaRebuildStatusProxyResponse, err error) { task, err := esync.NewTask(ctx, req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -220,7 +202,7 @@ func (p *Proxy) replicaRebuildingStatus(ctx context.Context, req *rpc.ProxyEngin return resp, nil } -func (p *Proxy) spdkReplicaRebuildingStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaRebuildStatusProxyResponse, err error) { +func spdkReplicaRebuildingStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineReplicaRebuildStatusProxyResponse, err error) { /* TODO: implement this */ return &rpc.EngineReplicaRebuildStatusProxyResponse{ Status: make(map[string]*eptypes.ReplicaRebuildStatusResponse), @@ -228,9 +210,16 @@ func (p *Proxy) spdkReplicaRebuildingStatus(ctx context.Context, req *rpc.ProxyE } func (p *Proxy) ReplicaVerifyRebuild(ctx context.Context, req *rpc.EngineReplicaVerifyRebuildRequest) (resp *emptypb.Empty, err error) { - log := logrus.WithFields(logrus.Fields{"serviceURL": req.ProxyEngineRequest.Address}) + log := logrus.WithFields(logrus.Fields{ + "serviceURL": req.ProxyEngineRequest.Address, + }) log.Infof("Verifying replica %v rebuild", req.ReplicaAddress) + v, err := executeProxyOp(ctx, ProxyOpsReplicaVerifyRebuild, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err +} + +func replicaVerifyRebuild(ctx context.Context, req *rpc.EngineReplicaVerifyRebuildRequest) (resp *emptypb.Empty, err error) { task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -245,6 +234,11 @@ func (p *Proxy) ReplicaVerifyRebuild(ctx context.Context, req *rpc.EngineReplica return &emptypb.Empty{}, nil } +func spdkReplicaVerifyRebuild(ctx context.Context, req *rpc.EngineReplicaVerifyRebuildRequest) (resp *emptypb.Empty, err error) { + /* TODO: implement this */ + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") +} + func (p *Proxy) ReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveRequest) (resp *emptypb.Empty, err error) { log := logrus.WithFields(logrus.Fields{ "serviceURL": req.ProxyEngineRequest.Address, @@ -255,44 +249,42 @@ func (p *Proxy) ReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveR }) log.Info("Removing replica") - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - if err := p.replicaDelete(ctx, req); err != nil { - return nil, err - } - case rpc.DataEngine_DATA_ENGINE_V2: - if err := p.spdkReplicaDelete(ctx, req); err != nil { - return nil, err - } - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } - - return &emptypb.Empty{}, nil + v, err := executeProxyOp(ctx, ProxyOpsReplicaRemove, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) replicaDelete(ctx context.Context, req *rpc.EngineReplicaRemoveRequest) error { +func replicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveRequest) (*emptypb.Empty, error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { - return err + return nil, err } defer c.Close() - return c.ReplicaDelete(req.ReplicaAddress) + return nil, c.ReplicaDelete(req.ReplicaAddress) } -func (p *Proxy) spdkReplicaDelete(ctx context.Context, req *rpc.EngineReplicaRemoveRequest) error { - c, err := spdkclient.NewSPDKClient(p.spdkServiceAddress) +func spdkReplicaRemove(ctx context.Context, req *rpc.EngineReplicaRemoveRequest, spdkServiceAddress string) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(spdkServiceAddress) if err != nil { - return err + return nil, err } defer c.Close() - return c.EngineReplicaDelete(req.ProxyEngineRequest.EngineName, req.ReplicaName, req.ReplicaAddress) + return nil, c.EngineReplicaDelete(req.ProxyEngineRequest.EngineName, req.ReplicaName, req.ReplicaAddress) } func (p *Proxy) ReplicaModeUpdate(ctx context.Context, req *rpc.EngineReplicaModeUpdateRequest) (resp *emptypb.Empty, err error) { + log := logrus.WithFields(logrus.Fields{ + "serviceURL": req.ProxyEngineRequest.Address, + }) + log.Infof("Updating replica mode to %v", req.Mode) + + v, err := executeProxyOp(ctx, ProxyOpsReplicaModeUpdate, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err +} + +func replicaModeUpdate(ctx context.Context, req *rpc.EngineReplicaModeUpdateRequest) (resp *emptypb.Empty, err error) { log := logrus.WithFields(logrus.Fields{"serviceURL": req.ProxyEngineRequest.Address}) log.Infof("Updating replica mode to %v", req.Mode) @@ -309,3 +301,8 @@ func (p *Proxy) ReplicaModeUpdate(ctx context.Context, req *rpc.EngineReplicaMod return &emptypb.Empty{}, nil } + +func spdkReplicaModeUpdate(ctx context.Context, req *rpc.EngineReplicaModeUpdateRequest) (resp *emptypb.Empty, err error) { + /* TODO: implement this */ + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") +} diff --git a/pkg/proxy/snapshot.go b/pkg/proxy/snapshot.go index a58e20a15..396d1fa53 100644 --- a/pkg/proxy/snapshot.go +++ b/pkg/proxy/snapshot.go @@ -23,17 +23,11 @@ func (p *Proxy) VolumeSnapshot(ctx context.Context, req *rpc.EngineVolumeSnapsho }) log.Infof("Snapshotting volume: snapshot %v", req.SnapshotVolume.Name) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.volumeSnapshot(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkVolumeSnapshot(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsVolumeSnapshot, req.ProxyEngineRequest.DataEngine, req) + return v.(*rpc.EngineVolumeSnapshotProxyResponse), err } -func (p *Proxy) volumeSnapshot(ctx context.Context, req *rpc.EngineVolumeSnapshotRequest) (resp *rpc.EngineVolumeSnapshotProxyResponse, err error) { +func volumeSnapshot(ctx context.Context, req *rpc.EngineVolumeSnapshotRequest) (resp *rpc.EngineVolumeSnapshotProxyResponse, err error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -53,7 +47,7 @@ func (p *Proxy) volumeSnapshot(ctx context.Context, req *rpc.EngineVolumeSnapsho }, nil } -func (p *Proxy) spdkVolumeSnapshot(ctx context.Context, req *rpc.EngineVolumeSnapshotRequest) (resp *rpc.EngineVolumeSnapshotProxyResponse, err error) { +func spdkVolumeSnapshot(ctx context.Context, req *rpc.EngineVolumeSnapshotRequest) (resp *rpc.EngineVolumeSnapshotProxyResponse, err error) { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -66,17 +60,12 @@ func (p *Proxy) SnapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) ( }) log.Trace("Listing snapshots") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotList(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotList(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotList, req.DataEngine, req) + return v.(*rpc.EngineSnapshotListProxyResponse), err + } -func (p *Proxy) snapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotListProxyResponse, err error) { +func snapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotListProxyResponse, err error) { c, err := eclient.NewControllerClient(req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -112,7 +101,7 @@ func (p *Proxy) snapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) ( return resp, nil } -func (p *Proxy) spdkSnapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotListProxyResponse, err error) { +func spdkSnapshotList(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotListProxyResponse, err error) { /* TODO: implement this */ return &rpc.EngineSnapshotListProxyResponse{ Disks: map[string]*rpc.EngineSnapshotDiskInfo{}, @@ -128,17 +117,11 @@ func (p *Proxy) SnapshotClone(ctx context.Context, req *rpc.EngineSnapshotCloneR }) log.Infof("Cloning snapshot from %v to %v", req.FromEngineAddress, req.ProxyEngineRequest.Address) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotClone(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotClone(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotClone, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) snapshotClone(ctx context.Context, req *rpc.EngineSnapshotCloneRequest) (resp *emptypb.Empty, err error) { +func snapshotClone(ctx context.Context, req *rpc.EngineSnapshotCloneRequest) (resp *emptypb.Empty, err error) { cFrom, err := eclient.NewControllerClient(req.FromEngineAddress, req.FromVolumeName, req.FromEngineName) if err != nil { return nil, err @@ -161,7 +144,7 @@ func (p *Proxy) snapshotClone(ctx context.Context, req *rpc.EngineSnapshotCloneR return &emptypb.Empty{}, nil } -func (p *Proxy) spdkSnapshotClone(ctx context.Context, req *rpc.EngineSnapshotCloneRequest) (resp *emptypb.Empty, err error) { +func spdkSnapshotClone(ctx context.Context, req *rpc.EngineSnapshotCloneRequest) (resp *emptypb.Empty, err error) { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -174,17 +157,11 @@ func (p *Proxy) SnapshotCloneStatus(ctx context.Context, req *rpc.ProxyEngineReq }) log.Trace("Getting snapshot clone status") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotCloneStatus(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotCloneStatus(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotCloneStatus, req.DataEngine, req) + return v.(*rpc.EngineSnapshotCloneStatusProxyResponse), err } -func (p *Proxy) snapshotCloneStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotCloneStatusProxyResponse, err error) { +func snapshotCloneStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotCloneStatusProxyResponse, err error) { c, err := eclient.NewControllerClient(req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -213,7 +190,7 @@ func (p *Proxy) snapshotCloneStatus(ctx context.Context, req *rpc.ProxyEngineReq return resp, nil } -func (p *Proxy) spdkSnapshotCloneStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotCloneStatusProxyResponse, err error) { +func spdkSnapshotCloneStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotCloneStatusProxyResponse, err error) { /* TODO: implement this */ return &rpc.EngineSnapshotCloneStatusProxyResponse{ Status: map[string]*eptypes.SnapshotCloneStatusResponse{}, @@ -229,17 +206,11 @@ func (p *Proxy) SnapshotRevert(ctx context.Context, req *rpc.EngineSnapshotRever }) log.Infof("Reverting snapshot %v", req.Name) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotRevert(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotRevert(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotRevert, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) snapshotRevert(ctx context.Context, req *rpc.EngineSnapshotRevertRequest) (resp *emptypb.Empty, err error) { +func snapshotRevert(ctx context.Context, req *rpc.EngineSnapshotRevertRequest) (resp *emptypb.Empty, err error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -254,7 +225,7 @@ func (p *Proxy) snapshotRevert(ctx context.Context, req *rpc.EngineSnapshotRever return &emptypb.Empty{}, nil } -func (p *Proxy) spdkSnapshotRevert(ctx context.Context, req *rpc.EngineSnapshotRevertRequest) (resp *emptypb.Empty, err error) { +func spdkSnapshotRevert(ctx context.Context, req *rpc.EngineSnapshotRevertRequest) (resp *emptypb.Empty, err error) { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -267,17 +238,11 @@ func (p *Proxy) SnapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeR }) log.Info("Purging snapshots") - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotPurge(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotPurge(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotPurge, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) snapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeRequest) (resp *emptypb.Empty, err error) { +func snapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeRequest) (resp *emptypb.Empty, err error) { task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -291,7 +256,7 @@ func (p *Proxy) snapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeR return &emptypb.Empty{}, nil } -func (p *Proxy) spdkSnapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeRequest) (resp *emptypb.Empty, err error) { +func spdkSnapshotPurge(ctx context.Context, req *rpc.EngineSnapshotPurgeRequest) (resp *emptypb.Empty, err error) { /* TODO: implement this */ return &emptypb.Empty{}, nil } @@ -305,17 +270,11 @@ func (p *Proxy) SnapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineReq }) log.Trace("Getting snapshot purge status") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotPurgeStatus(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotPurgeStatus(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotPurgeStatus, req.DataEngine, req) + return v.(*rpc.EngineSnapshotPurgeStatusProxyResponse), err } -func (p *Proxy) snapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotPurgeStatusProxyResponse, err error) { +func snapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotPurgeStatusProxyResponse, err error) { task, err := esync.NewTask(ctx, req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -341,7 +300,7 @@ func (p *Proxy) snapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineReq return resp, nil } -func (p *Proxy) spdkSnapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotPurgeStatusProxyResponse, err error) { +func spdkSnapshotPurgeStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineSnapshotPurgeStatusProxyResponse, err error) { /* TODO: implement this */ return &rpc.EngineSnapshotPurgeStatusProxyResponse{ Status: map[string]*eptypes.SnapshotPurgeStatusResponse{}, @@ -357,17 +316,11 @@ func (p *Proxy) SnapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemov }) log.Infof("Removing snapshots %v", req.Names) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotRemove(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotRemove(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotRemove, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) snapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemoveRequest) (resp *emptypb.Empty, err error) { +func snapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemoveRequest) (resp *emptypb.Empty, err error) { task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -385,7 +338,8 @@ func (p *Proxy) snapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemov return &emptypb.Empty{}, lastErr } -func (p *Proxy) spdkSnapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemoveRequest) (resp *emptypb.Empty, err error) { +func spdkSnapshotRemove(ctx context.Context, req *rpc.EngineSnapshotRemoveRequest) (resp *emptypb.Empty, err error) { + /* TODO: implement this */ return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -398,17 +352,11 @@ func (p *Proxy) SnapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashReq }) log.Infof("Hashing snapshot %v with rehash %v", req.SnapshotName, req.Rehash) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotHash(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotHash(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotHash, req.ProxyEngineRequest.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) snapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashRequest) (resp *emptypb.Empty, err error) { +func snapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashRequest) (resp *emptypb.Empty, err error) { task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -422,7 +370,8 @@ func (p *Proxy) snapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashReq return &emptypb.Empty{}, nil } -func (p *Proxy) spdkSnapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashRequest) (resp *emptypb.Empty, err error) { +func spdkSnapshotHash(ctx context.Context, req *rpc.EngineSnapshotHashRequest) (resp *emptypb.Empty, err error) { + /* TODO: implement this */ return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -435,17 +384,11 @@ func (p *Proxy) SnapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotH }) log.Trace("Getting snapshot hash status") - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.snapshotHashStatus(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkSnapshotHashStatus(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsSnapshotHashStatus, req.ProxyEngineRequest.DataEngine, req) + return v.(*rpc.EngineSnapshotHashStatusProxyResponse), err } -func (p *Proxy) snapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotHashStatusRequest) (resp *rpc.EngineSnapshotHashStatusProxyResponse, err error) { +func snapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotHashStatusRequest) (resp *rpc.EngineSnapshotHashStatusProxyResponse, err error) { task, err := esync.NewTask(ctx, req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -472,6 +415,7 @@ func (p *Proxy) snapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotH return resp, nil } -func (p *Proxy) spdkSnapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotHashStatusRequest) (resp *rpc.EngineSnapshotHashStatusProxyResponse, err error) { +func spdkSnapshotHashStatus(ctx context.Context, req *rpc.EngineSnapshotHashStatusRequest) (resp *rpc.EngineSnapshotHashStatusProxyResponse, err error) { + /* TODO: implement this */ return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } diff --git a/pkg/proxy/volume.go b/pkg/proxy/volume.go index b08ad0004..73cc9d642 100644 --- a/pkg/proxy/volume.go +++ b/pkg/proxy/volume.go @@ -23,17 +23,11 @@ func (p *Proxy) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (res }) log.Trace("Getting volume") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.volumeGet(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkVolumeGet(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsVolumeGet, req.DataEngine, req) + return v.(*rpc.EngineVolumeGetProxyResponse), err } -func (p *Proxy) volumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineVolumeGetProxyResponse, err error) { +func volumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineVolumeGetProxyResponse, err error) { c, err := eclient.NewControllerClient(req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -63,8 +57,8 @@ func (p *Proxy) volumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (res }, nil } -func (p *Proxy) spdkVolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineVolumeGetProxyResponse, err error) { - c, err := spdkclient.NewSPDKClient(p.spdkServiceAddress) +func spdkVolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest, spdkServiceAddress string) (resp *rpc.EngineVolumeGetProxyResponse, err error) { + c, err := spdkclient.NewSPDKClient(spdkServiceAddress) if err != nil { return nil, err } @@ -100,17 +94,11 @@ func (p *Proxy) VolumeExpand(ctx context.Context, req *rpc.EngineVolumeExpandReq }) log.Infof("Expanding volume to size %v", req.Expand.Size) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.volumeExpand(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkVolumeExpand(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsVolumeExpand, req.ProxyEngineRequest.DataEngine, req.ProxyEngineRequest) + return v.(*emptypb.Empty), err } -func (p *Proxy) volumeExpand(ctx context.Context, req *rpc.EngineVolumeExpandRequest) (resp *emptypb.Empty, err error) { +func volumeExpand(ctx context.Context, req *rpc.EngineVolumeExpandRequest) (resp *emptypb.Empty, err error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -126,7 +114,7 @@ func (p *Proxy) volumeExpand(ctx context.Context, req *rpc.EngineVolumeExpandReq return &emptypb.Empty{}, nil } -func (p *Proxy) spdkVolumeExpand(ctx context.Context, req *rpc.EngineVolumeExpandRequest) (resp *emptypb.Empty, err error) { +func spdkVolumeExpand(ctx context.Context, req *rpc.EngineVolumeExpandRequest) (resp *emptypb.Empty, err error) { return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented") } @@ -139,17 +127,11 @@ func (p *Proxy) VolumeFrontendStart(ctx context.Context, req *rpc.EngineVolumeFr }) log.Infof("Starting volume frontend %v", req.FrontendStart.Frontend) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.volumeFrontendStart(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkVolumeFrontendStart(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsVolumeFrontendStart, req.ProxyEngineRequest.DataEngine, req.ProxyEngineRequest) + return v.(*emptypb.Empty), err } -func (p *Proxy) volumeFrontendStart(ctx context.Context, req *rpc.EngineVolumeFrontendStartRequest) (resp *emptypb.Empty, err error) { +func volumeFrontendStart(ctx context.Context, req *rpc.EngineVolumeFrontendStartRequest) (resp *emptypb.Empty, err error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -165,7 +147,7 @@ func (p *Proxy) volumeFrontendStart(ctx context.Context, req *rpc.EngineVolumeFr return &emptypb.Empty{}, nil } -func (p *Proxy) spdkVolumeFrontendStart(ctx context.Context, req *rpc.EngineVolumeFrontendStartRequest) (resp *emptypb.Empty, err error) { +func spdkVolumeFrontendStart(ctx context.Context, req *rpc.EngineVolumeFrontendStartRequest) (resp *emptypb.Empty, err error) { /* Not implemented */ return &emptypb.Empty{}, nil } @@ -179,17 +161,11 @@ func (p *Proxy) VolumeFrontendShutdown(ctx context.Context, req *rpc.ProxyEngine }) log.Info("Shutting down volume frontend") - switch req.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.volumeFrontendShutdown(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkVolumeFrontendShutdown(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine) - } + v, err := executeProxyOp(ctx, ProxyOpsVolumeFrontendShutdown, req.DataEngine, req) + return v.(*emptypb.Empty), err } -func (p *Proxy) volumeFrontendShutdown(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *emptypb.Empty, err error) { +func volumeFrontendShutdown(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *emptypb.Empty, err error) { c, err := eclient.NewControllerClient(req.Address, req.VolumeName, req.EngineName) if err != nil { return nil, err @@ -204,7 +180,7 @@ func (p *Proxy) volumeFrontendShutdown(ctx context.Context, req *rpc.ProxyEngine return &emptypb.Empty{}, nil } -func (p *Proxy) spdkVolumeFrontendShutdown(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *emptypb.Empty, err error) { +func spdkVolumeFrontendShutdown(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *emptypb.Empty, err error) { /* Not implemented */ return &emptypb.Empty{}, nil } @@ -218,18 +194,11 @@ func (p *Proxy) VolumeUnmapMarkSnapChainRemovedSet(ctx context.Context, req *rpc }) log.Infof("Setting volume flag UnmapMarkSnapChainRemoved to %v", req.UnmapMarkSnap.Enabled) - switch req.ProxyEngineRequest.DataEngine { - case rpc.DataEngine_DATA_ENGINE_V1: - return p.volumeUnmapMarkSnapChainRemovedSet(ctx, req) - case rpc.DataEngine_DATA_ENGINE_V2: - return p.spdkVolumeUnmapMarkSnapChainRemovedSet(ctx, req) - default: - return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine) - } - + v, err := executeProxyOp(ctx, ProxyOpsVolumeUnmapMarkSnapChainRemovedSet, req.ProxyEngineRequest.DataEngine, req.ProxyEngineRequest) + return v.(*emptypb.Empty), err } -func (p *Proxy) volumeUnmapMarkSnapChainRemovedSet(ctx context.Context, req *rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest) (resp *emptypb.Empty, err error) { +func volumeUnmapMarkSnapChainRemovedSet(ctx context.Context, req *rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest) (resp *emptypb.Empty, err error) { c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName, req.ProxyEngineRequest.EngineName) if err != nil { @@ -245,7 +214,7 @@ func (p *Proxy) volumeUnmapMarkSnapChainRemovedSet(ctx context.Context, req *rpc return &emptypb.Empty{}, nil } -func (p *Proxy) spdkVolumeUnmapMarkSnapChainRemovedSet(ctx context.Context, req *rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest) (resp *emptypb.Empty, err error) { +func spdkVolumeUnmapMarkSnapChainRemovedSet(ctx context.Context, req *rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest) (resp *emptypb.Empty, err error) { /* Not implemented */ return &emptypb.Empty{}, nil }