Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add host info to xxxReplicationDriverData, add failback function #364

Merged
merged 1 commit into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 10 additions & 5 deletions contrib/drivers/huawei/dorado/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func (r *ReplicationDriver) CreateReplication(opt *pb.CreateReplicationOpts) (*m

func (r *ReplicationDriver) DeleteReplication(opt *pb.DeleteReplicationOpts) error {
pairId, ok := opt.GetMetadata()[KPairId]
sLunId := opt.SecondaryReplicationDriverData[KLunId]
var sLunId string
if opt.SecondaryVolumeId == "" {
sLunId = opt.SecondaryReplicationDriverData[KLunId]
}
if !ok {
msg := fmt.Sprintf("Can find pair id in metadata")
log.Errorf(msg)
Expand Down Expand Up @@ -115,7 +118,11 @@ func (r *ReplicationDriver) FailoverReplication(opt *pb.FailoverReplicationOpts)
log.Errorf(msg)
return fmt.Errorf(msg)
}
return r.mgr.Failover(pairId)
if opt.SecondaryBackendId == model.ReplicationBackendIdDefault {
return r.mgr.Failover(pairId)
} else {
return r.mgr.Failback(pairId)
}
}

func NewReplicaPairMgr(conf *DoradoConfig) (r *ReplicaPairMgr, err error) {
Expand Down Expand Up @@ -349,9 +356,7 @@ func (r *ReplicaPairMgr) DeleteReplication(pairId, rmtLunId string) error {
// 4. Switch the role of replication pairs.
// 5. Enable replications.

func (r *ReplicaPairMgr) Failback(metaData, primaryDriverData, secondaryDriverData map[string]string) error {
pairId := metaData[KPairId]
//rmtLunId := secondaryDriverData[KLunId]
func (r *ReplicaPairMgr) Failback(pairId string) error {
r.localDriver.Enable(pairId, false)
r.localDriver.WaitReplicaReady(pairId)
r.remoteDriver.Failover(pairId)
Expand Down
2 changes: 1 addition & 1 deletion osdsctl/cli/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func init() {
replicationUpdateCommand.Flags().StringVarP(&replicationName, "name", "n", "", "the name of updated replication")
replicationUpdateCommand.Flags().StringVarP(&replicationDesp, "description", "d", "", "the description of updated replication")
replicationFailoverCommand.Flags().BoolVarP(&allowAttachedVolume, "allow_attached_volume", "a", false, "whether allow attached volume when failing over replication")
replicationFailoverCommand.Flags().StringVarP(&secondaryBackendId, "secondary_backend_id", "s", "", "the secondary backend id of failoverr replication")
replicationFailoverCommand.Flags().StringVarP(&secondaryBackendId, "secondary_backend_id", "s", model.ReplicationBackendIdDefault, "the secondary backend id of failoverr replication(default:default)")
replicationCommand.AddCommand(replicationShowCommand)
replicationCommand.AddCommand(replicationListCommand)
replicationCommand.AddCommand(replicationDeleteCommand)
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func DeleteVolumeDBEntry(ctx *c.Context, in *model.VolumeSpec) error {
}

func DeleteReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) error {
invalidStatus := []string{model.ReplicationCreating, model.ReplicationDeleting,
model.ReplicationEnabling, model.ReplicationDisabling, model.ReplicationFailingOver}
invalidStatus := []string{model.ReplicationCreating, model.ReplicationDeleting, model.ReplicationEnabling,
model.ReplicationDisabling, model.ReplicationFailingOver, model.ReplicationFailingBack}

if utils.Contained(in.Status, invalidStatus) {
errMsg := fmt.Sprintf("Can't delete the replication in %s", in.Status)
Expand Down Expand Up @@ -262,14 +262,17 @@ func DisableReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) error
return nil
}

func FailoverReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec) error {
func FailoverReplicationDBEntry(ctx *c.Context, in *model.ReplicationSpec, secondaryBackendId string) error {
if in.Status != model.ReplicationAvailable {
errMsg := "Only the replication with the status available can be failover"
log.Error(errMsg)
return errors.New(errMsg)
}

in.Status = model.ReplicationFailingOver
if secondaryBackendId == model.ReplicationBackendIdDefault {
in.Status = model.ReplicationFailingOver
} else {
in.Status = model.ReplicationFailingBack
}
_, err := db.C.UpdateReplication(ctx, in.Id, in)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (this *ReplicationPortal) FailoverReplication(ctx *context.Context) {
return
}

if err := FailoverReplicationDBEntry(c.GetContext(ctx), r); err != nil {
if err := FailoverReplicationDBEntry(c.GetContext(ctx), r, failover.SecondaryBackendId); err != nil {
model.HttpError(ctx, http.StatusBadRequest, err.Error())
return
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,14 +623,26 @@ func (c *Controller) FailoverReplication(ctx *c.Context, replication *model.Repl
if err != nil {
return err
}
err = c.drController.FailoverReplication(ctx, replication, pvol, svol)
if err != nil {
replication.Status = model.ReplicationErrorDisabling
replication.ReplicationStatus = "--"

err = c.drController.FailoverReplication(ctx, replication, failover, pvol, svol)
if failover.SecondaryBackendId == model.ReplicationBackendIdDefault {
if err != nil {
replication.Status = model.ReplicationErrorFailover
replication.ReplicationStatus = "--"
} else {
replication.Status = model.ReplicationAvailable
replication.ReplicationStatus = model.ReplicationFailover
}
} else {
replication.Status = model.ReplicationAvailable
replication.ReplicationStatus = model.ReplicationFailover
if err != nil {
replication.Status = model.ReplicationErrorFailback
replication.ReplicationStatus = "--"
} else {
replication.Status = model.ReplicationAvailable
replication.ReplicationStatus = model.ReplicationEnabled
}
}

if _, err := db.C.UpdateReplication(ctx, replication.Id, replication); err != nil {
log.Error("update replication in db error, ", err)
}
Expand Down
188 changes: 97 additions & 91 deletions pkg/controller/dr/drcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Controller interface {
DeleteReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
EnableReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
DisableReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
FailoverReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
FailoverReplication(ctx *c.Context, replica *ReplicationSpec, failover *FailoverReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error
}

type DrController struct {
Expand Down Expand Up @@ -145,14 +145,15 @@ func (d *DrController) DisableReplication(ctx *c.Context, replica *ReplicationSp
return nil
}

func (d *DrController) FailoverReplication(ctx *c.Context, replica *ReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error {
func (d *DrController) FailoverReplication(ctx *c.Context, replica *ReplicationSpec,
failover *FailoverReplicationSpec, primaryVol, secondaryVol *VolumeSpec) error {
pPool, _ := db.C.GetPool(ctx, primaryVol.PoolId)
d.LoadOperator(pPool.ReplicationType)
err := d.primaryOp.Failover(ctx, replica, primaryVol)
err := d.primaryOp.Failover(ctx, replica, failover, primaryVol)
if err != nil {
return err
}
err = d.secondaryOp.Failover(ctx, replica, secondaryVol)
err = d.secondaryOp.Failover(ctx, replica, failover, secondaryVol)
if err != nil {
return err
}
Expand All @@ -164,7 +165,7 @@ type ReplicationOperator interface {
Delete(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Enable(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Disable(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Failover(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error
Failover(ctx *c.Context, replica *ReplicationSpec, failover *FailoverReplicationSpec, vol *VolumeSpec) error
}

type ReplicationFactory interface {
Expand Down Expand Up @@ -203,89 +204,6 @@ func (a *ArrayBasedFactory) GetSecondaryOperator(controller volume.Controller) R
return NewArrayPairOperator(controller, false)
}

func NewArrayPairOperator(controller volume.Controller, isPrimary bool) *ArrayPairOperator {
return &ArrayPairOperator{
BaseOperator{
volumeController: controller,
isPrimary: isPrimary,
},
}
}

type ArrayPairOperator struct {
BaseOperator
}

func (a *ArrayPairOperator) Create(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) (*ReplicationSpec, error) {
if !a.isPrimary {
// TODO: create replication pair in remote device.
return replica, nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return nil, err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return nil, err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.CreateReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
ReplicationMode: replica.ReplicationModel,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
IsPrimary: a.isPrimary,
}
return a.volumeController.CreateReplication(opt)
}

func (a *ArrayPairOperator) Delete(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error {
if !a.isPrimary {
return nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.DeleteReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
Metadata: replica.Metadata,
}
return a.volumeController.DeleteReplication(opt)
}

func NewHostPairOperator(controller volume.Controller, isPrimary bool) *HostPairOperator {
return &HostPairOperator{
BaseOperator{
Expand Down Expand Up @@ -317,7 +235,7 @@ func (h *HostPairOperator) attach(ctx *c.Context, vol *VolumeSpec, provisionerDo
HostInfo: &pb.HostInfo{
Platform: attacherDock.Metadata["Platform"],
OsType: attacherDock.Metadata["OsType"],
Ip: attacherDock.Metadata["Host"],
Ip: attacherDock.Metadata["HostIp"],
Host: attacherDock.NodeId,
Initiator: attacherDock.Metadata["Initiator"],
},
Expand Down Expand Up @@ -366,7 +284,8 @@ func (h *HostPairOperator) attach(ctx *c.Context, vol *VolumeSpec, provisionerDo
AccessProtocol: atm.DriverVolumeType,
ConnectionData: string(connData),
Metadata: map[string]string{},
Context: ctx.ToJson()}
Context: ctx.ToJson(),
}
mountPoint, err := h.volumeController.AttachVolume(attachOpt)
if err != nil {
rollback = true
Expand Down Expand Up @@ -454,6 +373,8 @@ func (h *HostPairOperator) Create(ctx *c.Context, replica *ReplicationSpec, vol
data := map[string]string{
"Mountpoint": atm.Mountpoint,
"AttachmentId": atm.Id,
"HostName": atm.Host,
"HostIp": atm.Ip,
}

volList, _ := db.C.ListVolumes(ctx)
Expand Down Expand Up @@ -535,6 +456,89 @@ func (h *HostPairOperator) Delete(ctx *c.Context, replica *ReplicationSpec, vol
return h.detach(ctx, attachmentId, vol, provisionerDock)
}

func NewArrayPairOperator(controller volume.Controller, isPrimary bool) *ArrayPairOperator {
return &ArrayPairOperator{
BaseOperator{
volumeController: controller,
isPrimary: isPrimary,
},
}
}

type ArrayPairOperator struct {
BaseOperator
}

func (a *ArrayPairOperator) Create(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) (*ReplicationSpec, error) {
if !a.isPrimary {
// TODO: create replication pair in remote device.
return &ReplicationSpec{}, nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return nil, err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return nil, err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.CreateReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
ReplicationMode: replica.ReplicationModel,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
IsPrimary: a.isPrimary,
}
return a.volumeController.CreateReplication(opt)
}

func (a *ArrayPairOperator) Delete(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error {
if !a.isPrimary {
return nil
}
pool, err := db.C.GetPool(ctx, vol.PoolId)
if err != nil {
log.Error("get pool failed", err)
return err
}

provisionerDock, err := db.C.GetDockByPoolId(ctx, vol.PoolId)
if err != nil {
log.Error("When search dock in db by pool id", err)
return err
}

a.volumeController.SetDock(provisionerDock)
opt := &pb.DeleteReplicationOpts{
Id: vol.Id,
Name: vol.Name,
Description: vol.Description,
PrimaryVolumeId: replica.PrimaryVolumeId,
SecondaryVolumeId: replica.SecondaryVolumeId,
PrimaryReplicationDriverData: replica.PrimaryReplicationDriverData,
SecondaryReplicationDriverData: replica.SecondaryReplicationDriverData,
PoolName: pool.Name,
DockId: provisionerDock.Id,
DriverName: provisionerDock.DriverName,
Context: ctx.ToJson(),
Metadata: replica.Metadata,
}
return a.volumeController.DeleteReplication(opt)
}

type BaseOperator struct {
volumeController volume.Controller
isPrimary bool
Expand Down Expand Up @@ -608,7 +612,7 @@ func (h *BaseOperator) Disable(ctx *c.Context, replica *ReplicationSpec, vol *Vo
return h.volumeController.DisableReplication(opt)
}

func (h *BaseOperator) Failover(ctx *c.Context, replica *ReplicationSpec, vol *VolumeSpec) error {
func (h *BaseOperator) Failover(ctx *c.Context, replica *ReplicationSpec, failover *FailoverReplicationSpec, vol *VolumeSpec) error {
if !h.isPrimary {
return nil
}
Expand Down Expand Up @@ -637,6 +641,8 @@ func (h *BaseOperator) Failover(ctx *c.Context, replica *ReplicationSpec, vol *V
DriverName: pool.ReplicationDriverName,
Context: ctx.ToJson(),
Metadata: replica.Metadata,
AllowAttachedVolume: failover.AllowAttachedVolume,
SecondaryBackendId: failover.SecondaryBackendId,
}
h.volumeController.SetDock(provisionerDock)
return h.volumeController.FailoverReplication(opt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dock/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (add *attachDockDiscoverer) Discover() error {
Metadata: map[string]string{
"Platform": runtime.GOARCH,
"OsType": runtime.GOOS,
"Ip": bindIp,
"HostIp": bindIp,
"Initiator": localIqn,
},
}
Expand Down