Skip to content

Commit

Permalink
refactored defaultServiceLoop into its own method
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Dixler committed Sep 15, 2022
1 parent 56fe72c commit 9065d7c
Showing 1 changed file with 38 additions and 40 deletions.
78 changes: 38 additions & 40 deletions pkg/backend/snapshot.go
Expand Up @@ -639,6 +639,38 @@ func (sm *SnapshotManager) saveSnapshot() error {
return nil
}

// defaultServiceLoop saves a Snapshot whenever a mutation occurs
func (sm *SnapshotManager) defaultServiceLoop(mutationRequests chan mutationRequest, done chan error) {
// True if we have elided writes since the last actual write.
hasElidedWrites := false

// Service each mutation request in turn.
serviceLoop:
for {
select {
case request := <-mutationRequests:
var err error
if request.mutator() {
err = sm.saveSnapshot()
hasElidedWrites = false
} else {
hasElidedWrites = true
}
request.result <- err
case <-sm.cancel:
break serviceLoop
}
}

// If we still have elided writes once the channel has closed, flush the snapshot.
var err error
if hasElidedWrites {
logging.V(9).Infof("SnapshotManager: flushing elided writes...")
err = sm.saveSnapshot()
}
done <- err
}

// unsafeServiceLoop doesn't save Snapshots when mutations occur and instead saves Snapshots when
// SnapshotManager.Close() is invoked. It trades reliability for speed as every mutation does not
// cause a Snapshot to be serialized to the user's state backend.
Expand Down Expand Up @@ -677,46 +709,12 @@ func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot)
done: done,
}

go func() {
unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != ""
if unsafeEnabled {
// This codepath skips writing back snapshots on all mutations.
// The final snapshot is written back when `SnapshotManager.Close()` is called.
// This trades reliability for speed and can cause state to be lost if Close()
// is not called or successful.
manager.unsafeServiceLoop(mutationRequests, done)
return
}

// True if we have elided writes since the last actual write.
hasElidedWrites := false

// Service each mutation request in turn.
serviceLoop:
for {
select {
case request := <-mutationRequests:
var err error
if request.mutator() {
err = manager.saveSnapshot()
hasElidedWrites = false
} else {
hasElidedWrites = true
}
request.result <- err
case <-cancel:
break serviceLoop
}
}

// If we still have elided writes once the channel has closed, flush the snapshot.
var err error
if hasElidedWrites {
logging.V(9).Infof("SnapshotManager: flushing elided writes...")
err = manager.saveSnapshot()
}
done <- err
}()
serviceLoop := manager.defaultServiceLoop
unsafeEnabled := os.Getenv(experimentalSnapshotManagerFlag) != ""
if unsafeEnabled {
serviceLoop = manager.unsafeServiceLoop
}
go serviceLoop(mutationRequests, done)

return manager
}

0 comments on commit 9065d7c

Please sign in to comment.