Skip to content

Commit

Permalink
Merge pull request #4644 from MichaelEischer/refactor-repair-packs
Browse files Browse the repository at this point in the history
Refactor and test `repair packs`
  • Loading branch information
MichaelEischer committed Jan 27, 2024
2 parents 724ec17 + f0e1ad2 commit 3424088
Show file tree
Hide file tree
Showing 17 changed files with 479 additions and 275 deletions.
30 changes: 3 additions & 27 deletions cmd/restic/cmd_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/spf13/cobra"
Expand All @@ -25,7 +24,6 @@ import (
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/textfile"
"github.com/restic/restic/internal/ui"
"github.com/restic/restic/internal/ui/backup"
"github.com/restic/restic/internal/ui/termstatus"
)
Expand Down Expand Up @@ -56,31 +54,9 @@ Exit status is 3 if some source data could not be read (incomplete snapshot crea
},
DisableAutoGenTag: true,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
var wg sync.WaitGroup
cancelCtx, cancel := context.WithCancel(ctx)
defer func() {
// shutdown termstatus
cancel()
wg.Wait()
}()

term := termstatus.New(globalOptions.stdout, globalOptions.stderr, globalOptions.Quiet)
wg.Add(1)
go func() {
defer wg.Done()
term.Run(cancelCtx)
}()

// use the terminal for stdout/stderr
prevStdout, prevStderr := globalOptions.stdout, globalOptions.stderr
defer func() {
globalOptions.stdout, globalOptions.stderr = prevStdout, prevStderr
}()
stdioWrapper := ui.NewStdioWrapper(term)
globalOptions.stdout, globalOptions.stderr = stdioWrapper.Stdout(), stdioWrapper.Stderr()

return runBackup(ctx, backupOptions, globalOptions, term, args)
term, cancel := setupTermstatus()
defer cancel()
return runBackup(cmd.Context(), backupOptions, globalOptions, term, args)
},
}

Expand Down
32 changes: 16 additions & 16 deletions cmd/restic/cmd_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui"
"github.com/restic/restic/internal/ui/progress"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -766,7 +767,7 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
return errors.Fatalf("%s", err)
}
} else if len(plan.ignorePacks) != 0 {
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil)
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, false)
if err != nil {
return errors.Fatalf("%s", err)
}
Expand All @@ -778,7 +779,7 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
}

if opts.unsafeRecovery {
_, err = writeIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil)
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, true)
if err != nil {
return errors.Fatalf("%s", err)
}
Expand All @@ -788,23 +789,22 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
return nil
}

func writeIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) (restic.IDSet, error) {
func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool) error {
Verbosef("rebuilding index\n")

bar := newProgressMax(!gopts.Quiet, 0, "packs processed")
obsoleteIndexes, err := repo.Index().Save(ctx, repo, removePacks, extraObsolete, bar)
bar.Done()
return obsoleteIndexes, err
}

func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) error {
obsoleteIndexes, err := writeIndexFiles(ctx, gopts, repo, removePacks, extraObsolete)
if err != nil {
return err
}

Verbosef("deleting obsolete index files\n")
return DeleteFilesChecked(ctx, gopts, repo, obsoleteIndexes, restic.IndexFile)
return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{
SaveProgress: bar,
DeleteProgress: func() *progress.Counter {
return newProgressMax(!gopts.Quiet, 0, "old indexes deleted")
},
DeleteReport: func(id restic.ID, err error) {
if gopts.verbosity > 2 {
Verbosef("removed index %v\n", id.String())
}
},
SkipDeletion: skipDeletion,
})
}

func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots restic.IDSet, quiet bool) (usedBlobs restic.CountedBlobSet, err error) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/restic/cmd_repair_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func rebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOpti
}
}

err = rebuildIndexFiles(ctx, gopts, repo, removePacks, obsoleteIndexes)
err = rebuildIndexFiles(ctx, gopts, repo, removePacks, obsoleteIndexes, false)
if err != nil {
return err
}
Expand Down
75 changes: 12 additions & 63 deletions cmd/restic/cmd_repair_packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/termstatus"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

var cmdRepairPacks = &cobra.Command{
Expand All @@ -29,15 +29,17 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
`,
DisableAutoGenTag: true,
RunE: func(cmd *cobra.Command, args []string) error {
return runRepairPacks(cmd.Context(), globalOptions, args)
term, cancel := setupTermstatus()
defer cancel()
return runRepairPacks(cmd.Context(), globalOptions, term, args)
},
}

func init() {
cmdRepair.AddCommand(cmdRepairPacks)
}

func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) error {
func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.Terminal, args []string) error {
// FIXME discuss and add proper feature flag mechanism
flag, _ := os.LookupEnv("RESTIC_FEATURES")
if flag != "repair-packs-v1" {
Expand Down Expand Up @@ -68,21 +70,19 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) err
return err
}

return repairPacks(ctx, gopts, repo, ids)
}

func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repository, ids restic.IDSet) error {
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
err := repo.LoadIndex(ctx, bar)
err = repo.LoadIndex(ctx, bar)
if err != nil {
return errors.Fatalf("%s", err)
}

Warnf("saving backup copies of pack files in current folder\n")
printer := newTerminalProgressPrinter(gopts.verbosity, term)

printer.P("saving backup copies of pack files to current folder")
for id := range ids {
f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666)
if err != nil {
return errors.Fatalf("%s", err)
return err
}

err = repo.Backend().Load(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}, 0, 0, func(rd io.Reader) error {
Expand All @@ -94,66 +94,15 @@ func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repo
return err
})
if err != nil {
return errors.Fatalf("%s", err)
}
}

wg, wgCtx := errgroup.WithContext(ctx)
repo.StartPackUploader(wgCtx, wg)
repo.DisableAutoIndexUpdate()

Warnf("salvaging intact data from specified pack files\n")
bar = newProgressMax(!gopts.Quiet, uint64(len(ids)), "pack files")
defer bar.Done()

wg.Go(func() error {
// examine all data the indexes have for the pack file
for b := range repo.Index().ListPacks(wgCtx, ids) {
blobs := b.Blobs
if len(blobs) == 0 {
Warnf("no blobs found for pack %v\n", b.PackID)
bar.Add(1)
continue
}

err = repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
// Fallback path
buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
if err != nil {
Warnf("failed to load blob %v: %v\n", blob.ID, err)
return nil
}
}
id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true)
if !id.Equal(blob.ID) {
panic("pack id mismatch during upload")
}
return err
})
if err != nil {
return err
}
bar.Add(1)
return err
}
return repo.Flush(wgCtx)
})

if err := wg.Wait(); err != nil {
return errors.Fatalf("%s", err)
}
bar.Done()

// remove salvaged packs from index
err = rebuildIndexFiles(ctx, gopts, repo, ids, nil)
err = repository.RepairPacks(ctx, repo, ids, printer)
if err != nil {
return errors.Fatalf("%s", err)
}

// cleanup
Warnf("removing salvaged pack files\n")
DeleteFiles(ctx, gopts, repo, ids, restic.PackFile)

Warnf("\nUse `restic repair snapshots --forget` to remove the corrupted data blobs from all snapshots\n")
return nil
}
29 changes: 3 additions & 26 deletions cmd/restic/cmd_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"strings"
"sync"
"time"

"github.com/restic/restic/internal/debug"
Expand Down Expand Up @@ -38,31 +37,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
`,
DisableAutoGenTag: true,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
var wg sync.WaitGroup
cancelCtx, cancel := context.WithCancel(ctx)
defer func() {
// shutdown termstatus
cancel()
wg.Wait()
}()

term := termstatus.New(globalOptions.stdout, globalOptions.stderr, globalOptions.Quiet)
wg.Add(1)
go func() {
defer wg.Done()
term.Run(cancelCtx)
}()

// allow usage of warnf / verbosef
prevStdout, prevStderr := globalOptions.stdout, globalOptions.stderr
defer func() {
globalOptions.stdout, globalOptions.stderr = prevStdout, prevStderr
}()
stdioWrapper := ui.NewStdioWrapper(term)
globalOptions.stdout, globalOptions.stderr = stdioWrapper.Stdout(), stdioWrapper.Stderr()

return runRestore(ctx, restoreOptions, globalOptions, term, args)
term, cancel := setupTermstatus()
defer cancel()
return runRestore(cmd.Context(), restoreOptions, globalOptions, term, args)
},
}

Expand Down
56 changes: 14 additions & 42 deletions cmd/restic/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package main
import (
"context"

"golang.org/x/sync/errgroup"

"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/restic"
)

Expand All @@ -24,46 +21,21 @@ func DeleteFilesChecked(ctx context.Context, gopts GlobalOptions, repo restic.Re
// deleteFiles deletes the given fileList of fileType in parallel
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
func deleteFiles(ctx context.Context, gopts GlobalOptions, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error {
totalCount := len(fileList)
fileChan := make(chan restic.ID)
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
defer close(fileChan)
for id := range fileList {
select {
case fileChan <- id:
case <-ctx.Done():
return ctx.Err()
bar := newProgressMax(!gopts.JSON && !gopts.Quiet, 0, "files deleted")
defer bar.Done()

return restic.ParallelRemove(ctx, repo, fileList, fileType, func(id restic.ID, err error) error {
if err != nil {
if !gopts.JSON {
Warnf("unable to remove %v/%v from the repository\n", fileType, id)
}
if !ignoreError {
return err
}
}
if !gopts.JSON && gopts.verbosity > 2 {
Verbosef("removed %v/%v\n", fileType, id)
}
return nil
})

bar := newProgressMax(!gopts.JSON && !gopts.Quiet, uint64(totalCount), "files deleted")
defer bar.Done()
// deleting files is IO-bound
workerCount := repo.Connections()
for i := 0; i < int(workerCount); i++ {
wg.Go(func() error {
for id := range fileChan {
h := backend.Handle{Type: fileType, Name: id.String()}
err := repo.Backend().Remove(ctx, h)
if err != nil {
if !gopts.JSON {
Warnf("unable to remove %v from the repository\n", h)
}
if !ignoreError {
return err
}
}
if !gopts.JSON && gopts.verbosity > 2 {
Verbosef("removed %v\n", h)
}
bar.Add(1)
}
return nil
})
}
err := wg.Wait()
return err
}, bar)
}

0 comments on commit 3424088

Please sign in to comment.