Fixed checkpointing to not restart the entire upload process (#594)
* object: added Checkpoint() method to object writer

* upload: refactored code structure to allow better checkpointing

* upload: removed Checkpoint() method from UploadProgress

* Update fs/entry.go

Co-authored-by: Julio López <julio+gh@kasten.io>
jkowalski and Julio López committed Sep 13, 2020
1 parent 6a14ac8 commit f0b97b9
Showing 10 changed files with 546 additions and 211 deletions.
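The core of this change is the new Writer.Checkpoint() method added in repo/object/object_writer.go below, which lets the uploader record an object ID for everything flushed so far without restarting the upload. The following is a minimal sketch of how that API might be driven from a copy loop; the signatures of NewWriter, WriterOptions, Checkpoint and Result come from this diff, while the package name, the ticker interval, the save callback, and the loop itself are illustrative glue, not code from this commit.

package uploadsketch

import (
	"context"
	"io"
	"time"

	"github.com/kopia/kopia/repo/object"
)

// uploadWithCheckpoints copies src into a new object writer and periodically
// records a checkpoint ID via the (hypothetical) save callback.
func uploadWithCheckpoints(ctx context.Context, om *object.Manager, src io.Reader, save func(object.ID)) (object.ID, error) {
	w := om.NewWriter(ctx, object.WriterOptions{})
	defer w.Close() //nolint:errcheck

	ticker := time.NewTicker(time.Minute) // hypothetical checkpoint interval
	defer ticker.Stop()

	buf := make([]byte, 1<<20)

	for {
		// Take a checkpoint when the interval elapses; Checkpoint() only covers
		// data that has already been flushed and returns "" when nothing has.
		select {
		case <-ticker.C:
			if cp, err := w.Checkpoint(); err == nil && cp != "" {
				save(cp)
			}
		default:
		}

		n, err := src.Read(buf)
		if n > 0 {
			if _, werr := w.Write(buf[:n]); werr != nil {
				return "", werr
			}
		}

		if err == io.EOF {
			break
		}

		if err != nil {
			return "", err
		}
	}

	return w.Result()
}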
18 changes: 0 additions & 18 deletions cli/cli_progress.go
@@ -89,24 +89,6 @@ func (p *cliProgress) CachedFile(fname string, numBytes int64) {
p.maybeOutput()
}

func (p *cliProgress) Checkpoint() {
p.output(noticeColor, "Saving a checkpoint...\n")

if p.shared {
// do not reset counters
return
}

*p = cliProgress{
uploading: 1,
uploadStartTime: clock.Now(),
previousFileCount: p.previousFileCount,
previousTotalSize: p.previousTotalSize,
uploadedBytes: p.uploadedBytes,
uploadedFiles: p.uploadedFiles,
}
}

func (p *cliProgress) maybeOutput() {
if atomic.LoadInt32(&p.uploading) == 0 {
return
9 changes: 9 additions & 0 deletions fs/entry.go
@@ -89,6 +89,15 @@ type DirectorySummary struct {
FailedEntries []*EntryWithError `json:"errors,omitempty"`
}

// Clone clones the given directory summary.
func (s *DirectorySummary) Clone() DirectorySummary {
res := *s

res.FailedEntries = append([]*EntryWithError(nil), s.FailedEntries...)

return res
}

// Symlink represents a symbolic link entry.
type Symlink interface {
Entry
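A small illustration (not part of the commit) of why Clone copies FailedEntries into a fresh slice: appending to the clone must not alias or mutate the original summary's backing array. The DirectorySummary and EntryWithError types and the Clone method come from this diff; only the main function is invented.

package main

import (
	"fmt"

	"github.com/kopia/kopia/fs"
)

func main() {
	orig := fs.DirectorySummary{
		FailedEntries: []*fs.EntryWithError{{}},
	}

	// Clone copies FailedEntries into a new backing array, so growing the
	// clone's list cannot affect the original's.
	cp := orig.Clone()
	cp.FailedEntries = append(cp.FailedEntries, &fs.EntryWithError{})

	fmt.Println(len(orig.FailedEntries), len(cp.FailedEntries)) // prints: 1 2
}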
72 changes: 72 additions & 0 deletions repo/object/object_manager_test.go
@@ -148,6 +148,78 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) {
}
}

func TestCheckpointing(t *testing.T) {
ctx := testlogging.Context(t)
_, om := setupTest(t)

writer := om.NewWriter(ctx, WriterOptions{})

// 1 MiB buffer of zeroes used as the write payload
allZeroes := make([]byte, 1<<20)

// empty file, nothing flushed
checkpoint1, err := writer.Checkpoint()
verifyNoError(t, err)

// write some bytes, but not enough to flush.
writer.Write(allZeroes[0:50])
checkpoint2, err := writer.Checkpoint()
verifyNoError(t, err)

// write enough to flush first content.
writer.Write(allZeroes)
checkpoint3, err := writer.Checkpoint()
verifyNoError(t, err)

// write enough to flush second content.
writer.Write(allZeroes)
checkpoint4, err := writer.Checkpoint()
verifyNoError(t, err)

result, err := writer.Result()
verifyNoError(t, err)

if !objectIDsEqual(checkpoint1, "") {
t.Errorf("unexpected checkpoint1: %v err: %v", checkpoint1, err)
}

if !objectIDsEqual(checkpoint2, "") {
t.Errorf("unexpected checkpoint2: %v err: %v", checkpoint2, err)
}

verifyFull(ctx, t, om, checkpoint3, allZeroes)
verifyFull(ctx, t, om, checkpoint4, make([]byte, 2<<20))
verifyFull(ctx, t, om, result, make([]byte, 2<<20+50))
}

func verifyFull(ctx context.Context, t *testing.T, om *Manager, oid ID, want []byte) {
t.Helper()

r, err := om.Open(ctx, oid)
if err != nil {
t.Fatalf("unable to open %v: %v", oid, err)
}

defer r.Close()

data, err := ioutil.ReadAll(r)
if err != nil {
t.Fatalf("unable to read all: %v", err)
}

if !bytes.Equal(data, want) {
t.Fatalf("unexpected data read for %v", oid)
}
}

func verifyNoError(t *testing.T, err error) {
t.Helper()

if err != nil {
t.Fatal(err)
}
}

func verifyIndirectBlock(ctx context.Context, t *testing.T, r *Manager, oid ID) {
for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() {
if c, _, ok := indexContentID.ContentID(); ok {
36 changes: 34 additions & 2 deletions repo/object/object_writer.go
@@ -25,6 +25,12 @@ const indirectContentPrefix = "x"
type Writer interface {
io.WriteCloser

// Checkpoint returns the ID of an object consisting of all contents written to storage so far.
// This may not include data still buffered in the writer.
// If nothing has been flushed yet, it returns an empty object ID.
Checkpoint() (ID, error)

// Result returns object ID representing all bytes written to the writer.
Result() (ID, error)
}

@@ -77,6 +83,8 @@ type objectWriter struct {

splitter splitter.Splitter

writeMutex sync.Mutex

asyncWritesSemaphore chan struct{} // async writes semaphore or nil
asyncWritesWG sync.WaitGroup

@@ -103,6 +111,9 @@ func (w *objectWriter) Close() error {
}

func (w *objectWriter) Write(data []byte) (n int, err error) {
w.writeMutex.Lock()
defer w.writeMutex.Unlock()

dataLen := len(data)
w.totalLength += int64(dataLen)

@@ -229,6 +240,17 @@ func maybeCompressedContentBytes(comp compression.Compressor, output *bytes.Buff
return input, false, nil
}

// drainWrites blocks new writes and waits for any in-flight asynchronous writes to finish,
// so the set of flushed contents stays stable until undrainWrites is called.
func (w *objectWriter) drainWrites() {
w.writeMutex.Lock()

// wait for any in-flight asynchronous writes to finish
w.asyncWritesWG.Wait()
}

// undrainWrites releases the write lock acquired by drainWrites, allowing writes to resume.
func (w *objectWriter) undrainWrites() {
w.writeMutex.Unlock()
}

func (w *objectWriter) Result() (ID, error) {
// no need to hold a lock on w.indirectIndexGrowMutex, since growing index only happens synchronously
// and never in parallel with calling Result()
@@ -238,13 +260,23 @@ func (w *objectWriter) Result() (ID, error) {
}
}

// wait for any asynchronous writes to complete.
w.asyncWritesWG.Wait()
return w.Checkpoint()
}

// Checkpoint returns an object ID representing the portion of the object that has already been written.
// The result may be an empty object ID if nothing has been flushed yet.
func (w *objectWriter) Checkpoint() (ID, error) {
w.drainWrites()
defer w.undrainWrites()

if w.contentWriteError != nil {
return "", w.contentWriteError
}

if len(w.indirectIndex) == 0 {
return "", nil
}

if len(w.indirectIndex) == 1 {
return w.indirectIndex[0].Object, nil
}
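The drainWrites/undrainWrites pair above combines the write mutex (keeps new Writes out) with the WaitGroup (lets in-flight asynchronous writes land) so Checkpoint sees a stable prefix of flushed contents. Below is a self-contained toy sketch of that pattern, with made-up names and a separate results mutex so the background goroutines never need the write lock; it illustrates the idea only and is not the kopia code.

package main

import (
	"fmt"
	"sync"
)

// drainingWriter is a toy stand-in for objectWriter: Write holds writeMu and
// hands work to a background goroutine; checkpoint "drains" by holding writeMu
// (no new Write can start) and waiting on the WaitGroup (in-flight async writes
// finish), then reads a consistent snapshot of what has been flushed.
type drainingWriter struct {
	writeMu   sync.Mutex     // serializes Write and blocks new writes while draining
	pending   sync.WaitGroup // counts in-flight asynchronous writes
	resultsMu sync.Mutex     // protects flushed; async goroutines take this, not writeMu
	flushed   []int
}

func (w *drainingWriter) Write(n int) {
	w.writeMu.Lock()
	defer w.writeMu.Unlock()

	w.pending.Add(1)

	go func() { // simulated asynchronous content write
		defer w.pending.Done()

		w.resultsMu.Lock()
		w.flushed = append(w.flushed, n)
		w.resultsMu.Unlock()
	}()
}

func (w *drainingWriter) checkpoint() []int {
	w.writeMu.Lock()         // drain: keep new Writes out...
	defer w.writeMu.Unlock() // ...until the checkpoint has been taken ("undrain")

	w.pending.Wait() // let in-flight asynchronous writes finish

	w.resultsMu.Lock()
	defer w.resultsMu.Unlock()

	return append([]int(nil), w.flushed...)
}

func main() {
	var w drainingWriter

	for i := 0; i < 5; i++ {
		w.Write(i)
	}

	fmt.Println(len(w.checkpoint())) // always 5: every started write is included
}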
62 changes: 62 additions & 0 deletions snapshot/snapshotfs/checkpoint_registry.go
@@ -0,0 +1,62 @@
package snapshotfs

import (
"sync"

"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/snapshot"
)

// checkpointFunc is invoked when a checkpoint occurs. The callback must checkpoint the current
// state of the file or directory and return its directory entry.
type checkpointFunc func() (*snapshot.DirEntry, error)

type checkpointRegistry struct {
mu sync.Mutex

checkpoints map[string]checkpointFunc
}

func (r *checkpointRegistry) addCheckpointCallback(e fs.Entry, f checkpointFunc) {
r.mu.Lock()
defer r.mu.Unlock()

if r.checkpoints == nil {
r.checkpoints = map[string]checkpointFunc{}
}

r.checkpoints[e.Name()] = f
}

func (r *checkpointRegistry) removeCheckpointCallback(e fs.Entry) {
r.mu.Lock()
defer r.mu.Unlock()

delete(r.checkpoints, e.Name())
}

// runCheckpoints invokes all registered checkpoint callbacks and adds their results to the provided
// builder, randomizing file names for non-directory entries. This prevents checkpointed objects from
// being treated as authoritative on subsequent runs.
func (r *checkpointRegistry) runCheckpoints(checkpointBuilder *dirManifestBuilder) error {
r.mu.Lock()
defer r.mu.Unlock()

for n, cp := range r.checkpoints {
de, err := cp()
if err != nil {
return errors.Wrapf(err, "error checkpointing %v", n)
}

if de.Type != snapshot.EntryTypeDirectory {
de.Name = ".checkpointed." + de.Name + "." + uuid.New().String()
}

checkpointBuilder.addEntry(de)
}

return nil
}
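A hypothetical sketch of how an uploader might use the registry around a single entry: register a callback before starting work and unregister it once the entry's final DirEntry is in the builder, so a checkpoint taken mid-entry can capture partial progress. uploadOneEntry, upload, and partial are invented names; the registry and builder methods match the diff above, and the code assumes it lives in the snapshotfs package.

package snapshotfs

import (
	"github.com/kopia/kopia/fs"
	"github.com/kopia/kopia/snapshot"
)

// uploadOneEntry shows the intended call pattern: register before starting an
// entry, unregister once its final directory entry has been recorded.
func uploadOneEntry(
	r *checkpointRegistry,
	b *dirManifestBuilder,
	e fs.Entry,
	upload func(fs.Entry) (*snapshot.DirEntry, error),
	partial func(fs.Entry) (*snapshot.DirEntry, error),
) error {
	// If a checkpoint fires while this entry is still uploading, the registry
	// asks for whatever partial result is currently available.
	r.addCheckpointCallback(e, func() (*snapshot.DirEntry, error) {
		return partial(e)
	})
	defer r.removeCheckpointCallback(e)

	de, err := upload(e)
	if err != nil {
		return err
	}

	b.addEntry(de)

	return nil
}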
82 changes: 82 additions & 0 deletions snapshot/snapshotfs/checkpoint_registry_test.go
@@ -0,0 +1,82 @@
package snapshotfs

import (
"os"
"strings"
"testing"

"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/mockfs"
"github.com/kopia/kopia/snapshot"
)

func TestCheckpointRegistry(t *testing.T) {
var cp checkpointRegistry

d := mockfs.NewDirectory()
dir1 := d.AddDir("dir1", os.FileMode(0o755))
f1 := d.AddFile("f1", []byte{1, 2, 3}, os.FileMode(0o755))
f2 := d.AddFile("f2", []byte{2, 3, 4}, os.FileMode(0o755))
f3 := d.AddFile("f3", []byte{2, 3, 4}, os.FileMode(0o755))

cp.addCheckpointCallback(dir1, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "dir1",
Type: snapshot.EntryTypeDirectory,
}, nil
})

cp.addCheckpointCallback(f1, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "f1",
}, nil
})

cp.addCheckpointCallback(f2, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "f2",
}, nil
})

cp.addCheckpointCallback(f3, func() (*snapshot.DirEntry, error) {
return &snapshot.DirEntry{
Name: "other",
}, nil
})

// remove the callback before it has a chance to fire; removing it twice must be a no-op
cp.removeCheckpointCallback(f3)
cp.removeCheckpointCallback(f3)

var dmb dirManifestBuilder

dmb.addEntry(&snapshot.DirEntry{
Name: "pre-existing",
})

if err := cp.runCheckpoints(&dmb); err != nil {
t.Fatalf("error running checkpoints: %v", err)
}

dm := dmb.Build(clock.Now(), "checkpoint")
if got, want := len(dm.Entries), 4; got != want {
t.Fatalf("got %v entries, wanted %v (%+#v)", got, want, dm.Entries)
}

// directory names don't get mangled
if dm.Entries[0].Name != "dir1" {
t.Errorf("invalid entry %v", dm.Entries[0])
}

if !strings.HasPrefix(dm.Entries[1].Name, ".checkpointed.f1.") {
t.Errorf("invalid entry %v", dm.Entries[1])
}

if !strings.HasPrefix(dm.Entries[2].Name, ".checkpointed.f2.") {
t.Errorf("invalid entry %v", dm.Entries[2])
}

if dm.Entries[3].Name != "pre-existing" {
t.Errorf("invalid entry %v", dm.Entries[3])
}
}
