Skip to content

Commit

Permalink
Merge pull request #4697 from MichaelEischer/report-blob-errors
Browse files Browse the repository at this point in the history
backup: report files whose chunks failed to upload
  • Loading branch information
MichaelEischer committed Feb 12, 2024
2 parents dde556e + 5b5d506 commit 19bf2cf
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 15 deletions.
8 changes: 5 additions & 3 deletions internal/archiver/blob_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package archiver

import (
"context"
"fmt"

"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
Expand Down Expand Up @@ -43,9 +44,9 @@ func (s *BlobSaver) TriggerShutdown() {

// Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. It takes ownership of the buffer passed in.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, filename string, cb func(res SaveBlobResponse)) {
select {
case s.ch <- saveBlobJob{BlobType: t, buf: buf, cb: cb}:
case s.ch <- saveBlobJob{BlobType: t, buf: buf, fn: filename, cb: cb}:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled")
}
Expand All @@ -54,6 +55,7 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb
type saveBlobJob struct {
restic.BlobType
buf *Buffer
fn string
cb func(res SaveBlobResponse)
}

Expand Down Expand Up @@ -95,7 +97,7 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
if err != nil {
debug.Log("saveBlob returned error, exiting: %v", err)
return err
return fmt.Errorf("failed to save blob from file %q: %w", job.fn, err)
}
job.cb(res)
job.buf.Release()
Expand Down
11 changes: 6 additions & 5 deletions internal/archiver/blob_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/index"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -57,7 +59,7 @@ func TestBlobSaver(t *testing.T) {
lock.Lock()
results = append(results, SaveBlobResponse{})
lock.Unlock()
b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {
b.Save(ctx, restic.DataBlob, buf, "file", func(res SaveBlobResponse) {
lock.Lock()
results[idx] = res
lock.Unlock()
Expand Down Expand Up @@ -106,7 +108,7 @@ func TestBlobSaverError(t *testing.T) {

for i := 0; i < test.blobs; i++ {
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {})
b.Save(ctx, restic.DataBlob, buf, "errfile", func(res SaveBlobResponse) {})
}

b.TriggerShutdown()
Expand All @@ -116,9 +118,8 @@ func TestBlobSaverError(t *testing.T) {
t.Errorf("expected error not found")
}

if err != errTest {
t.Fatalf("unexpected error found: %v", err)
}
rtest.Assert(t, errors.Is(err, errTest), "unexpected error %v", err)
rtest.Assert(t, strings.Contains(err.Error(), "errfile"), "expected error to contain 'errfile' got: %v", err)
})
}
}
4 changes: 2 additions & 2 deletions internal/archiver/file_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

// SaveBlobFn saves a blob to a repo.
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, func(res SaveBlobResponse))
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, string, func(res SaveBlobResponse))

// FileSaver concurrently saves incoming files to the repo.
type FileSaver struct {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
node.Content = append(node.Content, restic.ID{})
lock.Unlock()

s.saveBlob(ctx, restic.DataBlob, buf, func(sbr SaveBlobResponse) {
s.saveBlob(ctx, restic.DataBlob, buf, target, func(sbr SaveBlobResponse) {
lock.Lock()
if !sbr.known {
fnr.stats.DataBlobs++
Expand Down
2 changes: 1 addition & 1 deletion internal/archiver/file_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func createTestFiles(t testing.TB, num int) (files []string) {
func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
wg, ctx := errgroup.WithContext(ctx)

saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, cb func(SaveBlobResponse)) {
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, _ string, cb func(SaveBlobResponse)) {
cb(SaveBlobResponse{
id: restic.Hash(buf.Data),
length: len(buf.Data),
Expand Down
6 changes: 3 additions & 3 deletions internal/archiver/tree_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (

// TreeSaver concurrently saves incoming trees to the repo.
type TreeSaver struct {
saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse))
saveBlob SaveBlobFn
errFn ErrorFunc

ch chan<- saveTreeJob
}

// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
// started, it is stopped when ctx is cancelled.
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver {
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob SaveBlobFn, errFn ErrorFunc) *TreeSaver {
ch := make(chan saveTreeJob)

s := &TreeSaver{
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I

b := &Buffer{Data: buf}
ch := make(chan SaveBlobResponse, 1)
s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) {
s.saveBlob(ctx, restic.TreeBlob, b, job.target, func(res SaveBlobResponse) {
ch <- res
})

Expand Down
2 changes: 1 addition & 1 deletion internal/archiver/tree_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"golang.org/x/sync/errgroup"
)

func treeSaveHelper(_ context.Context, _ restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
func treeSaveHelper(_ context.Context, _ restic.BlobType, buf *Buffer, _ string, cb func(res SaveBlobResponse)) {
cb(SaveBlobResponse{
id: restic.NewRandomID(),
known: false,
Expand Down

0 comments on commit 19bf2cf

Please sign in to comment.