Skip to content

Commit

Permalink
Add host info to xxxReplicationDriverData, add failback function , fi…
Browse files Browse the repository at this point in the history
…xed some bugs
  • Loading branch information
xxwjj committed Apr 25, 2018
1 parent b510a44 commit 5f6008c
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 114 deletions.
15 changes: 10 additions & 5 deletions contrib/drivers/huawei/dorado/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func (r *ReplicationDriver) CreateReplication(opt *pb.CreateReplicationOpts) (*m

func (r *ReplicationDriver) DeleteReplication(opt *pb.DeleteReplicationOpts) error {
pairId, ok := opt.GetMetadata()[KPairId]
sLunId := opt.SecondaryReplicationDriverData[KLunId]
var sLunId string
if opt.SecondaryVolumeId == "" {
sLunId = opt.SecondaryReplicationDriverData[KLunId]
}
if !ok {
msg := fmt.Sprintf("Can find pair id in metadata")
log.Errorf(msg)
Expand Down Expand Up @@ -115,7 +118,11 @@ func (r *ReplicationDriver) FailoverReplication(opt *pb.FailoverReplicationOpts)
log.Errorf(msg)
return fmt.Errorf(msg)
}
return r.mgr.Failover(pairId)
if opt.SecondaryBackendId == model.ReplicationBackendIdDefault {
return r.mgr.Failover(pairId)
} else {
return r.mgr.Failback(pairId)
}
}

func NewReplicaPairMgr(conf *DoradoConfig) (r *ReplicaPairMgr, err error) {
Expand Down Expand Up @@ -349,9 +356,7 @@ func (r *ReplicaPairMgr) DeleteReplication(pairId, rmtLunId string) error {
// 4. Switch the role of replication pairs.
// 5. Enable replications.

func (r *ReplicaPairMgr) Failback(metaData, primaryDriverData, secondaryDriverData map[string]string) error {
pairId := metaData[KPairId]
//rmtLunId := secondaryDriverData[KLunId]
func (r *ReplicaPairMgr) Failback(pairId string) error {
r.localDriver.Enable(pairId, false)
r.localDriver.WaitReplicaReady(pairId)
r.remoteDriver.Failover(pairId)
Expand Down
2 changes: 1 addition & 1 deletion osdsctl/cli/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func init() {
replicationUpdateCommand.Flags().StringVarP(&replicationName, "name", "n", "", "the name of updated replication")
replicationUpdateCommand.Flags().StringVarP(&replicationDesp, "description", "d", "", "the description of updated replication")
replicationFailoverCommand.Flags().BoolVarP(&allowAttachedVolume, "allow_attached_volume", "a", false, "whether allow attached volume when failing over replication")
replicationFailoverCommand.Flags().StringVarP(&secondaryBackendId, "secondary_backend_id", "s", "", "the secondary backend id of failoverr replication")
replicationFailoverCommand.Flags().StringVarP(&secondaryBackendId, "secondary_backend_id", "s", model.ReplicationBackendIdDefault, "the secondary backend id of failoverr replication(default:default)")
replicationCommand.AddCommand(replicationShowCommand)
replicationCommand.AddCommand(replicationListCommand)
replicationCommand.AddCommand(replicationDeleteCommand)
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func DeleteVolumeDBEntry(ctx *c.Context, in *model.VolumeSpec) error {
}

func DeleteReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) error {
invalidStatus := []string{model.ReplicationCreating, model.ReplicationDeleting,
model.ReplicationEnabling, model.ReplicationDisabling, model.ReplicationFailingOver}
invalidStatus := []string{model.ReplicationCreating, model.ReplicationDeleting, model.ReplicationEnabling,
model.ReplicationDisabling, model.ReplicationFailingOver, model.ReplicationFailingBack}

if utils.Contained(in.Status, invalidStatus) {
errMsg := fmt.Sprintf("Can't delete the replication in %s", in.Status)
Expand Down Expand Up @@ -262,14 +262,17 @@ func DisableReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) error
return nil
}

func FailoverReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) error {
func FailoverReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec, secondaryBackendId string) error {
if in.Status != model.ReplicationAvailable {
errMsg := "Only the replication with the status available can be failover"
log.Error(errMsg)
return errors.New(errMsg)
}

in.Status = model.ReplicationFailingOver
if secondaryBackendId == model.ReplicationBackendIdDefault {
in.Status = model.ReplicationFailingOver
} else {
in.Status = model.ReplicationFailingBack
}
_, err := db.C.UpdateReplication(ctx, in.Id, in)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (this *ReplicationPortal) FailoverReplication(ctx *context.Context) {
return
}

if err := FailoverReplicationDBEntry(c.GetContext(ctx), r); err != nil {
if err := FailoverReplicationDBEntry(c.GetContext(ctx), r, failover.SecondaryBackendId); err != nil {
model.HttpError(ctx, http.StatusBadRequest, err.Error())
return
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,14 +623,26 @@ func (c *Controller) FailoverReplication(ctx *c.Context, replication *model.Repl
if err != nil {
return err
}
err = c.drController.FailoverReplication(ctx, replication, pvol, svol)
if err != nil {
replication.Status = model.ReplicationErrorDisabling
replication.ReplicationStatus = "--"

err = c.drController.FailoverReplication(ctx, replication, failover, pvol, svol)
if failover.SecondaryBackendId == model.ReplicationBackendIdDefault {
if err != nil {
replication.Status = model.ReplicationErrorFailover
replication.ReplicationStatus = "--"
} else {
replication.Status = model.ReplicationAvailable
replication.ReplicationStatus = model.ReplicationFailover
}
} else {
replication.Status = model.ReplicationAvailable
replication.ReplicationStatus = model.ReplicationFailover
if err != nil {
replication.Status = model.ReplicationErrorFailback
replication.ReplicationStatus = "--"
} else {
replication.Status = model.ReplicationAvailable
replication.ReplicationStatus = model.ReplicationEnabled
}
}

if _, err := db.C.UpdateReplication(ctx, replication.Id, replication); err != nil {
log.Error("update replication in db error, ", err)
}
Expand Down
188 changes: 97 additions & 91 deletions pkg/controller/dr/drcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Controller interface {
DeleteReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
EnableReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
DisableReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
FailoverReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
FailoverReplication(ctx *c.Context, replica *ReplicationSpec, failover *FailoverReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
}

type DrController struct {
Expand Down Expand Up @@ -145,14 +145,15 @@ func (d *DrController) DisableReplication(ctx *c.Context, replica *ReplicationSp
return nil
}

func (d *DrController) FailoverReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error {
func (d *DrController) FailoverReplication(ctx *c.Context, replica *ReplicationSpec,
failover *FailoverReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error {
pPool, _ := db.C.GetPool(ctx, primaryVol.PoolId)
d.LoadOperator(pPool.ReplicationType)
err := d.primaryOp.Failover(ctx, replica, primaryVol)
err := d.primaryOp.Failover(ctx, replica, failover, primaryVol)
if err != nil {
return err
}
err = d.secondaryOp.Failover(ctx, replica, secondaryVol)
err = d.secondaryOp.Failover(ctx, replica, failover, secondaryVol)
if err != nil {
return err
}
Expand All @@ -164,7 +165,7 @@ type ReplicationOperator interface {
Delete(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Enable(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Disable(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Failover(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Failover(ctx *c.Context, replica *ReplicationSpec, failover *FailoverReplicationSpec, vol *VolumeSpec) error
}

type ReplicationFactory interface {
Expand Down Expand Up @@ -203,89 +204,6 @@ func (a *ArrayBasedFactory) GetSecondaryOperator(controller volume.Controller) R
return NewArrayPairOperator(controller, false)
}

func NewArrayPairOperator(controller volume.Controller, isPrimary bool) *ArrayPairOperator {
return &ArrayPairOperator{
BaseOperator{
volumeController: controller,
isPrimary: isPrimary,
},
}
}

type ArrayPairOperator struct {
BaseOperator
}

func (a *ArrayPairOperator) Create(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) (*ReplicationSpec, error) {
if !a.isPrimary {
// TODO: create replication pair in remote device.
return replica, nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return nil, err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return nil, err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.CreateReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
ReplicationMode: replica.ReplicationModel,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
IsPrimary: a.isPrimary,
}
return a.volumeController.CreateReplication(opt)
}

func (a *ArrayPairOperator) Delete(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error {
if !a.isPrimary {
return nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.DeleteReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
Metadata: replica.Metadata,
}
return a.volumeController.DeleteReplication(opt)
}

func NewHostPairOperator(controller volume.Controller, isPrimary bool) *HostPairOperator {
return &HostPairOperator{
BaseOperator{
Expand Down Expand Up @@ -317,7 +235,7 @@ func (h *HostPairOperator) attach(ctx *c.Context, vol *VolumeSpec, provisionerDo
HostInfo: &pb.HostInfo{
Platform: attacherDock.Metadata["Platform"],
OsType: attacherDock.Metadata["OsType"],
Ip: attacherDock.Metadata["Host"],
Ip: attacherDock.Metadata["HostIp"],
Host: attacherDock.NodeId,
Initiator: attacherDock.Metadata["Initiator"],
},
Expand Down Expand Up @@ -366,7 +284,8 @@ func (h *HostPairOperator) attach(ctx *c.Context, vol *VolumeSpec, provisionerDo
AccessProtocol: atm.DriverVolumeType,
ConnectionData: string(connData),
Metadata: map[string]string{},
Context: ctx.ToJson()}
Context: ctx.ToJson(),
}
mountPoint, err := h.volumeController.AttachVolume(attachOpt)
if err != nil {
rollback = true
Expand Down Expand Up @@ -454,6 +373,8 @@ func (h *HostPairOperator) Create(ctx *c.Context, replica *ReplicationSpec, vol
data := map[string]string{
"Mountpoint": atm.Mountpoint,
"AttachmentId": atm.Id,
"HostName": atm.Host,
"HostIp": atm.Ip,
}

volList, _ := db.C.ListVolumes(ctx)
Expand Down Expand Up @@ -535,6 +456,89 @@ func (h *HostPairOperator) Delete(ctx *c.Context, replica *ReplicationSpec, vol
return h.detach(ctx, attachmentId, vol, provisionerDock)
}

func NewArrayPairOperator(controller volume.Controller, isPrimary bool) *ArrayPairOperator {
return &ArrayPairOperator{
BaseOperator{
volumeController: controller,
isPrimary: isPrimary,
},
}
}

type ArrayPairOperator struct {
BaseOperator
}

func (a *ArrayPairOperator) Create(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) (*ReplicationSpec, error) {
if !a.isPrimary {
// TODO: create replication pair in remote device.
return &ReplicationSpec{}, nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return nil, err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return nil, err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.CreateReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
ReplicationMode: replica.ReplicationModel,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
IsPrimary: a.isPrimary,
}
return a.volumeController.CreateReplication(opt)
}

func (a *ArrayPairOperator) Delete(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error {
if !a.isPrimary {
return nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.DeleteReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
Metadata: replica.Metadata,
}
return a.volumeController.DeleteReplication(opt)
}

type BaseOperator struct {
volumeController volume.Controller
isPrimary bool
Expand Down Expand Up @@ -608,7 +612,7 @@ func (h *BaseOperator) Disable(ctx *c.Context, replica *ReplicationSpec, vol *Vo
return h.volumeController.DisableReplication(opt)
}

func (h *BaseOperator) Failover(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error {
func (h *BaseOperator) Failover(ctx *c.Context, replica *ReplicationSpec, failover *FailoverReplicationSpec, vol *VolumeSpec) error {
if !h.isPrimary {
return nil
}
Expand Down Expand Up @@ -637,6 +641,8 @@ func (h *BaseOperator) Failover(ctx *c.Context, replica *ReplicationSpec, vol *V
DriverName: pool.ReplicationDriverName,
Context: ctx.ToJson(),
Metadata: replica.Metadata,
AllowAttachedVolume: failover.AllowAttachedVolume,
SecondaryBackendId: failover.SecondaryBackendId,
}
h.volumeController.SetDock(provisionerDock)
return h.volumeController.FailoverReplication(opt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dock/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (add *attachDockDiscoverer) Discover() error {
Metadata: map[string]string{
"Platform": runtime.GOARCH,
"OsType": runtime.GOOS,
"Ip": bindIp,
"HostIp": bindIp,
"Initiator": localIqn,
},
}
Expand Down

0 comments on commit 5f6008c

Please sign in to comment.