Skip to content

Commit

Permalink
proxy: remove redundant codes
Browse files Browse the repository at this point in the history
Longhorn 6138

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit authored and David Ko committed Jan 3, 2024
1 parent e09a1a8 commit 0c01fe3
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 250 deletions.
60 changes: 19 additions & 41 deletions pkg/proxy/backup.go
Expand Up @@ -40,17 +40,11 @@ func (p *Proxy) SnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBacku
})
log.Infof("Backing up snapshot %v to backup %v", req.SnapshotName, req.BackupName)

switch req.ProxyEngineRequest.DataEngine {
case rpc.DataEngine_DATA_ENGINE_V1:
return p.snapshotBackup(ctx, req)
case rpc.DataEngine_DATA_ENGINE_V2:
return p.spdkSnapshotBackup(ctx, req)
default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine)
}
v, err := executeProxyOp(ctx, ProxyOpsSnapshotBackup, req.ProxyEngineRequest.DataEngine, req)
return v.(*rpc.EngineSnapshotBackupProxyResponse), err
}

func (p *Proxy) snapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
func snapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
for _, env := range req.Envs {
part := strings.SplitN(env, "=", 2)
if len(part) < 2 {
Expand Down Expand Up @@ -101,7 +95,8 @@ func (p *Proxy) snapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBacku
}, nil
}

func (p *Proxy) spdkSnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
func spdkSnapshotBackup(ctx context.Context, req *rpc.EngineSnapshotBackupRequest) (resp *rpc.EngineSnapshotBackupProxyResponse, err error) {
// TODO: implement this
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented")
}

Expand All @@ -114,25 +109,19 @@ func (p *Proxy) SnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapsho
})
log.Tracef("Getting %v backup status from replica %v", req.BackupName, req.ReplicaAddress)

switch req.ProxyEngineRequest.DataEngine {
case rpc.DataEngine_DATA_ENGINE_V1:
return p.snapshotBackupStatus(ctx, req)
case rpc.DataEngine_DATA_ENGINE_V2:
return p.spdkSnapshotBackupStatus(ctx, req)
default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine)
}
v, err := executeProxyOp(ctx, ProxyOpsSnapshotBackupStatus, req.ProxyEngineRequest.DataEngine, req)
return v.(*rpc.EngineSnapshotBackupStatusProxyResponse), err
}

func (p *Proxy) snapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) {
func snapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) {
c, err := eclient.NewControllerClient(req.ProxyEngineRequest.Address, req.ProxyEngineRequest.VolumeName,
req.ProxyEngineRequest.EngineName)
if err != nil {
return nil, err
}
defer c.Close()

replicas, err := p.ReplicaList(ctx, req.ProxyEngineRequest)
replicas, err := replicaList(ctx, req.ProxyEngineRequest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +190,8 @@ func (p *Proxy) snapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapsho
}, nil
}

func (p *Proxy) spdkSnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) {
func spdkSnapshotBackupStatus(ctx context.Context, req *rpc.EngineSnapshotBackupStatusRequest) (resp *rpc.EngineSnapshotBackupStatusProxyResponse, err error) {
// TODO: implement this
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented")
}

Expand All @@ -214,17 +204,11 @@ func (p *Proxy) BackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreR
})
log.Infof("Restoring backup %v to %v", req.Url, req.VolumeName)

switch req.ProxyEngineRequest.DataEngine {
case rpc.DataEngine_DATA_ENGINE_V1:
return p.backupRestore(ctx, req)
case rpc.DataEngine_DATA_ENGINE_V2:
return p.spdkBackupRestore(ctx, req)
default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.ProxyEngineRequest.DataEngine)
}
v, err := executeProxyOp(ctx, ProxyOpsBackupRestore, req.ProxyEngineRequest.DataEngine, req)
return v.(*rpc.EngineBackupRestoreProxyResponse), err
}

func (p *Proxy) backupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
func backupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
log := logrus.WithFields(logrus.Fields{
"serviceURL": req.ProxyEngineRequest.Address,
"engineName": req.ProxyEngineRequest.EngineName,
Expand Down Expand Up @@ -274,7 +258,7 @@ func (p *Proxy) backupRestore(ctx context.Context, req *rpc.EngineBackupRestoreR
return resp, nil
}

func (p *Proxy) spdkBackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
func spdkBackupRestore(ctx context.Context, req *rpc.EngineBackupRestoreRequest) (resp *rpc.EngineBackupRestoreProxyResponse, err error) {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "not implemented")
}

Expand All @@ -287,17 +271,11 @@ func (p *Proxy) BackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineReq
})
log.Trace("Getting backup restore status")

switch req.DataEngine {
case rpc.DataEngine_DATA_ENGINE_V1:
return p.backupRestoreStatus(ctx, req)
case rpc.DataEngine_DATA_ENGINE_V2:
return p.spdkBackupRestoreStatus(ctx, req)
default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", req.DataEngine)
}
v, err := executeProxyOp(ctx, ProxyOpsBackupRestoreStatus, req.DataEngine, req)
return v.(*rpc.EngineBackupRestoreStatusProxyResponse), err
}

func (p *Proxy) backupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) {
func backupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) {
task, err := esync.NewTask(ctx, req.Address, req.VolumeName, req.EngineName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -327,7 +305,7 @@ func (p *Proxy) backupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineReq
return resp, nil
}

func (p *Proxy) spdkBackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) {
func spdkBackupRestoreStatus(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineBackupRestoreStatusProxyResponse, err error) {
/* TODO: implement this */
return &rpc.EngineBackupRestoreStatusProxyResponse{
Status: map[string]*rpc.EngineBackupRestoreStatus{},
Expand Down
262 changes: 262 additions & 0 deletions pkg/proxy/operations.go
@@ -0,0 +1,262 @@
package proxy

import (
"golang.org/x/net/context"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

rpc "github.com/longhorn/longhorn-instance-manager/pkg/imrpc"
)

type ProxyOps int

const (
// Volume operations
ProxyOpsVolumeGet ProxyOps = iota
ProxyOpsVolumeExpand
ProxyOpsVolumeFrontendStart
ProxyOpsVolumeFrontendShutdown
ProxyOpsVolumeUnmapMarkSnapChainRemovedSet

// Replica operations
ProxyOpsReplicaAdd
ProxyOpsReplicaList
ProxyOpsReplicaRebuildingStatus
ProxyOpsReplicaRemove
ProxyOpsReplicaVerifyRebuild
ProxyOpsReplicaModeUpdate

// Snapshot operations
ProxyOpsVolumeSnapshot
ProxyOpsSnapshotList
ProxyOpsSnapshotClone
ProxyOpsSnapshotCloneStatus
ProxyOpsSnapshotRevert
ProxyOpsSnapshotPurge
ProxyOpsSnapshotPurgeStatus
ProxyOpsSnapshotRemove
ProxyOpsSnapshotHash
ProxyOpsSnapshotHashStatus

// Backup operations
ProxyOpsSnapshotBackup
ProxyOpsSnapshotBackupStatus
ProxyOpsBackupRestore
ProxyOpsBackupRestoreStatus
)

// Type definitions for the functions that execute the operations
type volumeGetFunc func(context.Context, *rpc.ProxyEngineRequest, ...string) (*rpc.EngineVolumeGetProxyResponse, error)
type volumeExpandFunc func(context.Context, *rpc.EngineVolumeExpandRequest) (*emptypb.Empty, error)
type volumeFrontendStartFunc func(context.Context, *rpc.EngineVolumeFrontendStartRequest) (*emptypb.Empty, error)
type volumeFrontendShutdownFunc func(context.Context, *rpc.ProxyEngineRequest) (*emptypb.Empty, error)
type volumeUnmapMarkSnapChainRemovedSetFunc func(context.Context, *rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest) (*emptypb.Empty, error)

type replicaAddFunc func(context.Context, *rpc.EngineReplicaAddRequest, ...string) (*emptypb.Empty, error)
type replicaListFunc func(context.Context, *rpc.ProxyEngineRequest, ...string) (*rpc.EngineReplicaListProxyResponse, error)
type replicaRebuildingStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineReplicaRebuildStatusProxyResponse, error)
type replicaRemoveFunc func(context.Context, *rpc.EngineReplicaRemoveRequest, ...string) (*emptypb.Empty, error)
type replicaVerifyRebuildFunc func(context.Context, *rpc.EngineReplicaVerifyRebuildRequest) (*emptypb.Empty, error)
type replicaModeUpdateFunc func(context.Context, *rpc.EngineReplicaModeUpdateRequest) (*emptypb.Empty, error)

type volumeSnapshotFunc func(context.Context, *rpc.EngineVolumeSnapshotRequest) (*rpc.EngineVolumeSnapshotProxyResponse, error)
type snapshotListFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineSnapshotListProxyResponse, error)
type snapshotCloneFunc func(context.Context, *rpc.EngineSnapshotCloneRequest) (*emptypb.Empty, error)
type snapshotCloneStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineSnapshotCloneStatusProxyResponse, error)
type snapshotRevertFunc func(context.Context, *rpc.EngineSnapshotRevertRequest) (*emptypb.Empty, error)
type snapshotPurgeFunc func(context.Context, *rpc.EngineSnapshotPurgeRequest) (*emptypb.Empty, error)
type snapshotPurgeStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineSnapshotPurgeStatusProxyResponse, error)
type snapshotRemoveFunc func(context.Context, *rpc.EngineSnapshotRemoveRequest) (*emptypb.Empty, error)
type snapshotHashFunc func(context.Context, *rpc.EngineSnapshotHashRequest) (*emptypb.Empty, error)
type snapshotHashStatusFunc func(context.Context, *rpc.EngineSnapshotHashStatusRequest) (*rpc.EngineSnapshotHashStatusProxyResponse, error)

type snapshotBackupFunc func(context.Context, *rpc.EngineSnapshotBackupRequest) (*rpc.EngineSnapshotBackupProxyResponse, error)
type snapshotBackupStatusFunc func(context.Context, *rpc.EngineSnapshotBackupStatusRequest) (*rpc.EngineSnapshotBackupStatusProxyResponse, error)
type backupRestoreFunc func(context.Context, *rpc.EngineBackupRestoreRequest) (*rpc.EngineBackupRestoreProxyResponse, error)
type backupRestoreStatusFunc func(context.Context, *rpc.ProxyEngineRequest) (*rpc.EngineBackupRestoreStatusProxyResponse, error)

// ProxyOpsFuncs is a map of ProxyOps to a map of DataEngine to the function that executes the operation
var (
ProxyOpsFuncs = map[ProxyOps]map[rpc.DataEngine]interface{}{
// Volume operations
ProxyOpsVolumeGet: {
rpc.DataEngine_DATA_ENGINE_V1: volumeGet,
rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeGet,
},
ProxyOpsVolumeExpand: {
rpc.DataEngine_DATA_ENGINE_V1: volumeExpand,
rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeExpand,
},
ProxyOpsVolumeFrontendStart: {
rpc.DataEngine_DATA_ENGINE_V1: volumeFrontendStart,
rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeFrontendStart,
},
ProxyOpsVolumeFrontendShutdown: {
rpc.DataEngine_DATA_ENGINE_V1: volumeFrontendShutdown,
rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeFrontendShutdown,
},
ProxyOpsVolumeUnmapMarkSnapChainRemovedSet: {
rpc.DataEngine_DATA_ENGINE_V1: volumeUnmapMarkSnapChainRemovedSet,
rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeUnmapMarkSnapChainRemovedSet,
},

// Replica operations
ProxyOpsReplicaAdd: {
rpc.DataEngine_DATA_ENGINE_V1: replicaAdd,
rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaAdd,
},
ProxyOpsReplicaList: {
rpc.DataEngine_DATA_ENGINE_V1: replicaList,
rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaList,
},
ProxyOpsReplicaRebuildingStatus: {
rpc.DataEngine_DATA_ENGINE_V1: replicaRebuildingStatus,
rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaRebuildingStatus,
},
ProxyOpsReplicaRemove: {
rpc.DataEngine_DATA_ENGINE_V1: replicaRemove,
rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaRemove,
},
ProxyOpsReplicaVerifyRebuild: {
rpc.DataEngine_DATA_ENGINE_V1: replicaVerifyRebuild,
rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaVerifyRebuild,
},
ProxyOpsReplicaModeUpdate: {
rpc.DataEngine_DATA_ENGINE_V1: replicaModeUpdate,
rpc.DataEngine_DATA_ENGINE_V2: spdkReplicaModeUpdate,
},

// Snapshot operations
ProxyOpsVolumeSnapshot: {
rpc.DataEngine_DATA_ENGINE_V1: volumeSnapshot,
rpc.DataEngine_DATA_ENGINE_V2: spdkVolumeSnapshot,
},
ProxyOpsSnapshotList: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotList,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotList,
},
ProxyOpsSnapshotClone: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotClone,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotClone,
},
ProxyOpsSnapshotCloneStatus: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotCloneStatus,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotCloneStatus,
},
ProxyOpsSnapshotRevert: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotRevert,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotRevert,
},
ProxyOpsSnapshotPurge: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotPurge,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotPurge,
},
ProxyOpsSnapshotPurgeStatus: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotPurgeStatus,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotPurgeStatus,
},
ProxyOpsSnapshotRemove: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotRemove,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotRemove,
},
ProxyOpsSnapshotHash: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotHash,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotHash,
},
ProxyOpsSnapshotHashStatus: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotHashStatus,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotHashStatus,
},

// Backup operations
ProxyOpsSnapshotBackup: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotBackup,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotBackup,
},
ProxyOpsSnapshotBackupStatus: {
rpc.DataEngine_DATA_ENGINE_V1: snapshotBackupStatus,
rpc.DataEngine_DATA_ENGINE_V2: spdkSnapshotBackupStatus,
},
ProxyOpsBackupRestore: {
rpc.DataEngine_DATA_ENGINE_V1: backupRestore,
rpc.DataEngine_DATA_ENGINE_V2: spdkBackupRestore,
},
ProxyOpsBackupRestoreStatus: {
rpc.DataEngine_DATA_ENGINE_V1: backupRestoreStatus,
rpc.DataEngine_DATA_ENGINE_V2: spdkBackupRestoreStatus,
},
}
)

func executeProxyOp(ctx context.Context, op ProxyOps, dataEngine rpc.DataEngine, req interface{}) (interface{}, error) {
opFuncs, ok := ProxyOpsFuncs[op]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown proxy operation %v", op)
}

fn, ok := opFuncs[dataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown data engine %v", dataEngine)
}

switch op {
// Volume operations
case ProxyOpsVolumeGet:
return fn.(volumeGetFunc)(ctx, req.(*rpc.ProxyEngineRequest))
case ProxyOpsVolumeExpand:
return fn.(volumeExpandFunc)(ctx, req.(*rpc.EngineVolumeExpandRequest))
case ProxyOpsVolumeFrontendStart:
return fn.(volumeFrontendStartFunc)(ctx, req.(*rpc.EngineVolumeFrontendStartRequest))
case ProxyOpsVolumeFrontendShutdown:
return fn.(volumeFrontendShutdownFunc)(ctx, req.(*rpc.ProxyEngineRequest))
case ProxyOpsVolumeUnmapMarkSnapChainRemovedSet:
return fn.(volumeUnmapMarkSnapChainRemovedSetFunc)(ctx, req.(*rpc.EngineVolumeUnmapMarkSnapChainRemovedSetRequest))
// Replica operations
case ProxyOpsReplicaAdd:
return fn.(replicaAddFunc)(ctx, req.(*rpc.EngineReplicaAddRequest))
case ProxyOpsReplicaList:
return fn.(replicaListFunc)(ctx, req.(*rpc.ProxyEngineRequest))
case ProxyOpsReplicaRebuildingStatus:
return fn.(replicaRebuildingStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest))
case ProxyOpsReplicaRemove:
return fn.(replicaRemoveFunc)(ctx, req.(*rpc.EngineReplicaRemoveRequest))
case ProxyOpsReplicaVerifyRebuild:
return fn.(replicaVerifyRebuildFunc)(ctx, req.(*rpc.EngineReplicaVerifyRebuildRequest))
case ProxyOpsReplicaModeUpdate:
return fn.(replicaModeUpdateFunc)(ctx, req.(*rpc.EngineReplicaModeUpdateRequest))
// Snapshot operations
case ProxyOpsVolumeSnapshot:
return fn.(volumeSnapshotFunc)(ctx, req.(*rpc.EngineVolumeSnapshotRequest))
case ProxyOpsSnapshotList:
return fn.(snapshotListFunc)(ctx, req.(*rpc.ProxyEngineRequest))
case ProxyOpsSnapshotClone:
return fn.(snapshotCloneFunc)(ctx, req.(*rpc.EngineSnapshotCloneRequest))
case ProxyOpsSnapshotCloneStatus:
return fn.(snapshotCloneStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest))
case ProxyOpsSnapshotRevert:
return fn.(snapshotRevertFunc)(ctx, req.(*rpc.EngineSnapshotRevertRequest))
case ProxyOpsSnapshotPurge:
return fn.(snapshotPurgeFunc)(ctx, req.(*rpc.EngineSnapshotPurgeRequest))
case ProxyOpsSnapshotPurgeStatus:
return fn.(snapshotPurgeStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest))
case ProxyOpsSnapshotRemove:
return fn.(snapshotRemoveFunc)(ctx, req.(*rpc.EngineSnapshotRemoveRequest))
case ProxyOpsSnapshotHash:
return fn.(snapshotHashFunc)(ctx, req.(*rpc.EngineSnapshotHashRequest))
case ProxyOpsSnapshotHashStatus:
return fn.(snapshotHashStatusFunc)(ctx, req.(*rpc.EngineSnapshotHashStatusRequest))
// Backup operations
case ProxyOpsSnapshotBackup:
return fn.(snapshotBackupFunc)(ctx, req.(*rpc.EngineSnapshotBackupRequest))
case ProxyOpsSnapshotBackupStatus:
return fn.(snapshotBackupStatusFunc)(ctx, req.(*rpc.EngineSnapshotBackupStatusRequest))
case ProxyOpsBackupRestore:
return fn.(backupRestoreFunc)(ctx, req.(*rpc.EngineBackupRestoreRequest))
case ProxyOpsBackupRestoreStatus:
return fn.(backupRestoreStatusFunc)(ctx, req.(*rpc.ProxyEngineRequest))

default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown proxy operation")
}
}

0 comments on commit 0c01fe3

Please sign in to comment.