Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve archiver, fix hang on fifos #1741

Merged
merged 12 commits into from Apr 30, 2018
8 changes: 6 additions & 2 deletions cmd/restic/global.go
Expand Up @@ -94,7 +94,7 @@ func init() {
f.BoolVarP(&globalOptions.JSON, "json", "", false, "set output mode to JSON for commands that support it")
f.StringVar(&globalOptions.CacheDir, "cache-dir", "", "set the cache directory")
f.BoolVar(&globalOptions.NoCache, "no-cache", false, "do not use a local cache")
f.StringSliceVar(&globalOptions.CACerts, "cacert", nil, "path to load root certificates from (default: use system certificates)")
f.StringSliceVar(&globalOptions.CACerts, "cacert", nil, "`file` to load root certificates from (default: use system certificates)")
f.StringVar(&globalOptions.TLSClientCert, "tls-client-cert", "", "path to a file containing PEM encoded TLS client certificate and private key")
f.BoolVar(&globalOptions.CleanupCache, "cleanup-cache", false, "auto remove old cache directories")
f.IntVar(&globalOptions.LimitUploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)")
Expand Down Expand Up @@ -355,7 +355,11 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) {
}

if stdoutIsTerminal() {
Verbosef("password is correct\n")
id := s.Config().ID
if len(id) > 8 {
id = id[:8]
}
Verbosef("repository %v opened successfully, password is correct\n", id)
}

if opts.NoCache {
Expand Down
31 changes: 19 additions & 12 deletions cmd/restic/global_debug.go
@@ -1,4 +1,4 @@
// +build debug
// +build debug profile

package main

Expand All @@ -15,19 +15,21 @@ import (
)

var (
listenMemoryProfile string
memProfilePath string
cpuProfilePath string
traceProfilePath string
insecure bool
listenProfile string
memProfilePath string
cpuProfilePath string
traceProfilePath string
blockProfilePath string
insecure bool
)

func init() {
f := cmdRoot.PersistentFlags()
f.StringVar(&listenMemoryProfile, "listen-profile", "", "listen on this `address:port` for memory profiling")
f.StringVar(&listenProfile, "listen-profile", "", "listen on this `address:port` for memory profiling")
f.StringVar(&memProfilePath, "mem-profile", "", "write memory profile to `dir`")
f.StringVar(&cpuProfilePath, "cpu-profile", "", "write cpu profile to `dir`")
f.StringVar(&traceProfilePath, "trace-profile", "", "write trace to `dir`")
f.StringVar(&blockProfilePath, "block-profile", "", "write block profile to `dir`")
f.BoolVar(&insecure, "insecure-kdf", false, "use insecure KDF settings")
}

Expand All @@ -38,12 +40,12 @@ func (fakeTestingTB) Logf(msg string, args ...interface{}) {
}

func runDebug() error {
if listenMemoryProfile != "" {
fmt.Fprintf(os.Stderr, "running memory profile HTTP server on %v\n", listenMemoryProfile)
if listenProfile != "" {
fmt.Fprintf(os.Stderr, "running profile HTTP server on %v\n", listenProfile)
go func() {
err := http.ListenAndServe(listenMemoryProfile, nil)
err := http.ListenAndServe(listenProfile, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "memory profile listen failed: %v\n", err)
fmt.Fprintf(os.Stderr, "profile HTTP server listen failed: %v\n", err)
}
}()
}
Expand All @@ -58,9 +60,12 @@ func runDebug() error {
if traceProfilePath != "" {
profilesEnabled++
}
if blockProfilePath != "" {
profilesEnabled++
}

if profilesEnabled > 1 {
return errors.Fatal("only one profile (memory or CPU) may be activated at the same time")
return errors.Fatal("only one profile (memory, CPU, trace, or block) may be activated at the same time")
}

var prof interface {
Expand All @@ -73,6 +78,8 @@ func runDebug() error {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(cpuProfilePath))
} else if traceProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(traceProfilePath))
} else if blockProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.BlockProfile, profile.ProfilePath(blockProfilePath))
}

if prof != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/restic/global_release.go
@@ -1,4 +1,4 @@
// +build !debug
// +build !debug,!profile

package main

Expand Down
112 changes: 59 additions & 53 deletions internal/archiver/archiver.go
Expand Up @@ -50,6 +50,7 @@ type Archiver struct {

blobSaver *BlobSaver
fileSaver *FileSaver
treeSaver *TreeSaver

// Error is called for all errors that occur during backup.
Error ErrorFunc
Expand Down Expand Up @@ -86,6 +87,10 @@ type Options struct {
// concurrently. If it's set to zero, the default is the number of CPUs
// available in the system.
SaveBlobConcurrency uint

// SaveTreeConcurrency sets how many trees are marshalled and saved to the
// repo concurrently.
SaveTreeConcurrency uint
}

// ApplyDefaults returns a copy of o with the default options set for all unset
Expand All @@ -102,6 +107,12 @@ func (o Options) ApplyDefaults() Options {
o.SaveBlobConcurrency = uint(runtime.NumCPU())
}

if o.SaveTreeConcurrency == 0 {
// use a relatively high concurrency here, having multiple SaveTree
// workers is cheap
o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20
}

return o
}

Expand Down Expand Up @@ -172,7 +183,7 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID,
// adds a newline after each object)
buf = append(buf, '\n')

b := Buffer{Data: buf}
b := &Buffer{Data: buf}
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
if res.Err() != nil {
return restic.ID{}, s, res.Err()
Expand Down Expand Up @@ -212,24 +223,20 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) *resti

// SaveDir stores a directory in the repo and returns the node. snPath is the
// path within the current snapshot.
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (*restic.Node, ItemStats, error) {
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (d FutureTree, err error) {
debug.Log("%v %v", snPath, dir)

var s ItemStats

treeNode, err := arch.nodeFromFileInfo(dir, fi)
if err != nil {
return nil, s, err
return FutureTree{}, err
}

names, err := readdirnames(arch.FS, dir)
if err != nil {
return nil, s, err
return FutureTree{}, err
}

var futures []FutureNode

tree := restic.NewTree()
nodes := make([]FutureNode, 0, len(names))

for _, name := range names {
pathname := arch.FS.Join(dir, name)
Expand All @@ -245,54 +252,22 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
continue
}

return nil, s, err
return FutureTree{}, err
}

if excluded {
continue
}

futures = append(futures, fn)
nodes = append(nodes, fn)
}

for _, fn := range futures {
fn.wait()

// return the error if it wasn't ignored
if fn.err != nil {
fn.err = arch.error(fn.target, fn.fi, fn.err)
if fn.err == nil {
// ignore error
continue
}

return nil, s, fn.err
}

// when the error is ignored, the node could not be saved, so ignore it
if fn.node == nil {
debug.Log("%v excluded: %v", fn.snPath, fn.target)
continue
}
ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes)

err := tree.Insert(fn.node)
if err != nil {
return nil, s, err
}
}

id, treeStats, err := arch.saveTree(ctx, tree)
if err != nil {
return nil, ItemStats{}, err
}

s.Add(treeStats)

treeNode.Subtree = &id
return treeNode, s, nil
return ft, nil
}

// FutureNode holds a reference to a node or a FutureFile.
// FutureNode holds a reference to a node, FutureFile, or FutureTree.
type FutureNode struct {
snPath, target string

Expand All @@ -306,14 +281,31 @@ type FutureNode struct {

isFile bool
file FutureFile
isDir bool
dir FutureTree
}

func (fn *FutureNode) wait() {
if fn.isFile {
func (fn *FutureNode) wait(ctx context.Context) {
switch {
case fn.isFile:
// wait for and collect the data for the file
fn.node = fn.file.Node()
fn.err = fn.file.Err()
fn.stats = fn.file.Stats()

// ensure the other stuff can be garbage-collected
fn.file = FutureFile{}
fn.isFile = false

case fn.isDir:
// wait for and collect the data for the dir
fn.node = fn.dir.Node()
fn.err = fn.dir.Err()
fn.stats = fn.dir.Stats()

// ensure the other stuff can be garbage-collected
fn.dir = FutureTree{}
fn.isDir = false
}
}

Expand All @@ -324,6 +316,8 @@ func (fn *FutureNode) wait() {
//
// snPath is the path within the current snapshot.
func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) {
start := time.Now()

fn = FutureNode{
snPath: snPath,
target: target,
Expand All @@ -340,7 +334,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
var fi os.FileInfo
var errFI error

file, errOpen := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW, 0)
file, errOpen := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW|fs.O_NONBLOCK, 0)
if errOpen == nil {
fi, errFI = file.Stat()
}
Expand Down Expand Up @@ -400,7 +394,9 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
snItem := snPath + "/"
start := time.Now()
oldSubtree := arch.loadSubtree(ctx, previous)
fn.node, fn.stats, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)

fn.isDir = true
fn.dir, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
if err == nil {
arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
} else {
Expand Down Expand Up @@ -429,6 +425,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
}
}

debug.Log("return after %.3f", time.Since(start).Seconds())

return fn, false, nil
}

Expand Down Expand Up @@ -564,9 +562,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start))
}

debug.Log("waiting on %d nodes", len(futureNodes))

// process all futures
for name, fn := range futureNodes {
fn.wait()
fn.wait(ctx)

// return the error, or ignore it
if fn.err != nil {
Expand Down Expand Up @@ -720,10 +720,16 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
// runWorkers starts the worker pools, which are stopped when the context is cancelled.
func (arch *Archiver) runWorkers(ctx context.Context) {
arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency)
arch.fileSaver = NewFileSaver(ctx, arch.FS, arch.blobSaver, arch.Repo.Config().ChunkerPolynomial, arch.Options.FileReadConcurrency)
arch.fileSaver.CompleteBlob = arch.CompleteBlob

arch.fileSaver = NewFileSaver(ctx,
arch.FS,
arch.blobSaver,
arch.Repo.Config().ChunkerPolynomial,
arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
arch.fileSaver.CompleteBlob = arch.CompleteBlob
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo

arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
}

// Snapshot saves several targets and returns a snapshot.
Expand Down
14 changes: 12 additions & 2 deletions internal/archiver/archiver_test.go
Expand Up @@ -608,7 +608,12 @@ func TestArchiverSaveDir(t *testing.T) {
t.Fatal(err)
}

node, stats, err := arch.SaveDir(ctx, "/", fi, test.target, nil)
ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil)
if err != nil {
t.Fatal(err)
}

node, stats, err := ft.Node(), ft.Stats(), ft.Err()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -681,7 +686,12 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
t.Fatal(err)
}

node, stats, err := arch.SaveDir(ctx, "/", fi, tempdir, nil)
ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil)
if err != nil {
t.Fatal(err)
}

node, stats, err := ft.Node(), ft.Stats(), ft.Err()
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/archiver/blob_saver.go
Expand Up @@ -27,7 +27,7 @@ type BlobSaver struct {
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
// when ctx is cancelled.
func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
ch := make(chan saveBlobJob, 2*int(workers))
ch := make(chan saveBlobJob)
s := &BlobSaver{
repo: repo,
knownBlobs: restic.NewBlobSet(),
Expand All @@ -45,7 +45,7 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
// Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. The second return parameter is true if the blob was
// previously unknown.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf Buffer) FutureBlob {
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan saveBlobResponse, 1)
s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}

Expand Down Expand Up @@ -91,7 +91,7 @@ func (s *FutureBlob) Length() int {

type saveBlobJob struct {
restic.BlobType
buf Buffer
buf *Buffer
ch chan<- saveBlobResponse
}

Expand Down