Skip to content
Permalink
Browse files

Short-circuit when we have failed the sync out

Signed-off-by: Patrick Uiterwijk <patrick@puiterwijk.org>
  • Loading branch information...
puiterwijk authored and mergify committed May 2, 2019
1 parent 227e7c6 commit e4df0439a0f696f71c1f2fee285dc2022d435839
Showing with 13 additions and 1 deletion.
  1. +13 −1 server/service/clustered_storage.go
@@ -60,6 +60,7 @@ type clusterStorageProjectPushDriverInstance struct {
objectSyncAllSubmitted bool
objectSyncNewObjects *sync.Cond
syncersWg *sync.WaitGroup
hasFailed bool

innerPusher storage.ProjectStoragePushDriver
}
@@ -461,6 +462,10 @@ func (d *clusterStorageProjectPushDriverInstance) dbAddObject(objid storage.Obje
}

func (d *clusterStorageProjectPushDriverInstance) dbGetNextObject(nodeid uint64) (storage.ObjectID, error) {
if d.hasFailed {
return storage.ZeroID, errors.New("Object sync has already failed")
}

d.objectSyncMux.Lock()
defer d.objectSyncMux.Unlock()

@@ -535,6 +540,7 @@ func (d *clusterStorageProjectPushDriverInstance) dbReportObject(objid storage.O
} else if outstanding < neededpeers {
// We still need more successes than we will ever get (happens if too many peers fail sync)
d.d.d.cfg.log.Info("Impossible to get to needed number of peers now")
d.hasFailed = true
d.errchan <- errors.Errorf(
"Too many nodes fail. Needed: %d, outstanding: %d",
neededpeers,
@@ -587,7 +593,7 @@ func (d *clusterStorageProjectDriverInstance) GetPusher(pushuuid string) storage

inst := &clusterStorageProjectPushDriverInstance{
d: d,
errchan: make(chan error),
errchan: make(chan error, 5),
outstandingobjects: new(sync.WaitGroup),
pushuuid: pushuuid,

@@ -626,6 +632,9 @@ func (d *clusterStorageProjectDriverInstance) GetPusher(pushuuid string) storage
}

func (d *clusterStorageProjectPushDriverInstance) StageObject(objtype storage.ObjectType, objsize uint) (storage.StagedObject, error) {
if d.hasFailed {
return nil, errors.New("Pusher has already failed prior")
}
// For writing, just write directly to underlying storage
inner, err := d.innerPusher.StageObject(objtype, objsize)
isdelta := false
@@ -669,6 +678,9 @@ func (d *clusterStorageProjectPushDriverInstance) Done() {
}

func (d *clusterStorageProjectPushDriverInstance) startObjectSync(objid storage.ObjectID) {
if d.hasFailed {
return
}
if len(d.objectSyncPeers) != 0 {
d.outstandingobjects.Add(1)
err := d.dbAddObject(objid)

0 comments on commit e4df043

Please sign in to comment.
You can’t perform that action at this time.