diff --git a/pkg/backend/snapshot.go b/pkg/backend/snapshot.go index b6fc072258bc..1d2d2b8f9994 100644 --- a/pkg/backend/snapshot.go +++ b/pkg/backend/snapshot.go @@ -639,6 +639,38 @@ func (sm *SnapshotManager) saveSnapshot() error { return nil } +// defaultServiceLoop saves a Snapshot whenever a mutation occurs +func (sm *SnapshotManager) defaultServiceLoop(mutationRequests chan mutationRequest, done chan error) { + // True if we have elided writes since the last actual write. + hasElidedWrites := false + + // Service each mutation request in turn. +serviceLoop: + for { + select { + case request := <-mutationRequests: + var err error + if request.mutator() { + err = sm.saveSnapshot() + hasElidedWrites = false + } else { + hasElidedWrites = true + } + request.result <- err + case <-sm.cancel: + break serviceLoop + } + } + + // If we still have elided writes once the channel has closed, flush the snapshot. + var err error + if hasElidedWrites { + logging.V(9).Infof("SnapshotManager: flushing elided writes...") + err = sm.saveSnapshot() + } + done <- err +} + // unsafeServiceLoop doesn't save Snapshots when mutations occur and instead saves Snapshots when // SnapshotManager.Close() is invoked. It trades reliability for speed as every mutation does not // cause a Snapshot to be serialized to the user's state backend. @@ -677,46 +709,12 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) done: done, } - go func() { - unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != "" - if unsafeEnabled { - // This codepath skips writing back snapshots on all mutations. - // The final snapshot is written back when `SnapshotManager.Close()` is called. - // This trades reliability for speed and can cause state to be lost if Close() - // is not called or successful. - manager.unsafeServiceLoop(mutationRequests, done) - return - } - - // True if we have elided writes since the last actual write. - hasElidedWrites := false - - // Service each mutation request in turn. - serviceLoop: - for { - select { - case request := <-mutationRequests: - var err error - if request.mutator() { - err = manager.saveSnapshot() - hasElidedWrites = false - } else { - hasElidedWrites = true - } - request.result <- err - case <-cancel: - break serviceLoop - } - } - - // If we still have elided writes once the channel has closed, flush the snapshot. - var err error - if hasElidedWrites { - logging.V(9).Infof("SnapshotManager: flushing elided writes...") - err = manager.saveSnapshot() - } - done <- err - }() + serviceLoop := manager.defaultServiceLoop + unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != "" + if unsafeEnabled { + serviceLoop = manager.unsafeServiceLoop + } + go serviceLoop(mutationRequests, done) return manager }