Skip to content

Commit

Permalink
upload: scan the directory to be uploaded in parallel to estimate the…
Browse files Browse the repository at this point in the history
… amount of data to be uploaded

This allows better progress indicator in the CLI and UI.
The percentage completed is not displayed until estimate is available.

Quick demo: https://asciinema.org/a/O7ktcWSgaGUPfJwhzc65mMWM1
  • Loading branch information
jkowalski committed Sep 18, 2020
1 parent f2cf71d commit 340c716
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 31 deletions.
51 changes: 39 additions & 12 deletions cli/cli_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type cliProgress struct {
spinPhase int
uploadStartTime time.Time

previousFileCount int
previousTotalSize int64
estimatedFileCount int
estimatedTotalBytes int64

// indicates shared instance that does not reset counters at the beginning of upload.
shared bool
Expand Down Expand Up @@ -152,13 +152,29 @@ func (p *cliProgress) output(col *color.Color, msg string) {
return
}

if p.previousTotalSize > 0 {
percent := (float64(hashedBytes+cachedBytes) * hundredPercent / float64(p.previousTotalSize))
if percent > hundredPercent {
percent = hundredPercent
if p.estimatedTotalBytes > 0 {
line += fmt.Sprintf(", estimated %v", units.BytesStringBase10(p.estimatedTotalBytes))

ratio := float64(hashedBytes+cachedBytes) / float64(p.estimatedTotalBytes)
if ratio > 1 {
ratio = 1
}

line += fmt.Sprintf(" %.1f%%", percent)
timeSoFarSeconds := clock.Since(p.uploadStartTime).Seconds()
estimatedTotalTime := time.Second * time.Duration(timeSoFarSeconds/ratio)
estimatedEndTime := p.uploadStartTime.Add(estimatedTotalTime)

remaining := clock.Until(estimatedEndTime)
if remaining < 0 {
remaining = 0
}

remaining = remaining.Round(time.Second)

line += fmt.Sprintf(" (%.1f%%)", ratio*hundredPercent)
line += fmt.Sprintf(" %v left", remaining)
} else {
line += ", estimating..."
}

var extraSpaces string
Expand Down Expand Up @@ -197,20 +213,31 @@ func (p *cliProgress) FinishShared() {
p.output(defaultColor, "")
}

func (p *cliProgress) UploadStarted(previousFileCount int, previousTotalSize int64) {
func (p *cliProgress) UploadStarted() {
if p.shared {
// do nothing
return
}

*p = cliProgress{
uploading: 1,
uploadStartTime: clock.Now(),
previousFileCount: previousFileCount,
previousTotalSize: previousTotalSize,
uploading: 1,
uploadStartTime: clock.Now(),
}
}

func (p *cliProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
if p.shared {
// do nothing
return
}

p.outputMutex.Lock()
defer p.outputMutex.Unlock()

p.estimatedFileCount = fileCount
p.estimatedTotalBytes = totalBytes
}

func (p *cliProgress) UploadFinished() {
// do nothing here, we still want to report the files flushed after the Upload has completed.
// instead, Finish() will be called.
Expand Down
8 changes: 5 additions & 3 deletions htmlui/src/SourcesTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,18 @@ export class SourcesTable extends Component {
const totalBytes = u.hashedBytes + u.cachedBytes;

totals = sizeDisplayName(totalBytes);
if (x.row.original.lastSnapshot) {
const percent = Math.round(totalBytes * 1000.0 / x.row.original.lastSnapshot.stats.totalSize) / 10.0;
if (u.estimatedBytes) {
totals += "/" + sizeDisplayName(u.estimatedBytes);

const percent = Math.round(totalBytes * 1000.0 / u.estimatedBytes) / 10.0;
if (percent <= 100) {
totals += " " + percent + "%";
}
}
}

return <>
<Spinner animation="border" variant="primary" size="sm" title={title} />&nbsp;Snapshotting {totals}
<Spinner animation="border" variant="primary" size="sm" title={title} />&nbsp;Uploading {totals}
&nbsp;
<Button variant="danger" size="sm" onClick={() => {
parent.cancelSnapshot(x.row.original.source);
Expand Down
29 changes: 16 additions & 13 deletions snapshot/snapshotfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type Uploader struct {

// for testing only, when set will write to a given channel whenever checkpoint completes
checkpointFinished chan struct{}

// disable snapshot size estimation
disableEstimation bool
}

// IsCanceled returns true if the upload is canceled.
Expand Down Expand Up @@ -694,6 +697,7 @@ func (u *Uploader) processNonDirectories(ctx context.Context, parentCheckpointRe
// See if we had this name during either of previous passes.
if cachedEntry := u.maybeIgnoreCachedEntry(ctx, findCachedEntry(ctx, entry, prevEntries)); cachedEntry != nil {
atomic.AddInt32(&u.stats.CachedFiles, 1)
atomic.AddInt64(&u.stats.TotalFileSize, entry.Size())
u.Progress.CachedFile(filepath.Join(dirRelativePath, entry.Name()), entry.Size())

// compute entryResult now, cachedEntry is short-lived
Expand Down Expand Up @@ -943,20 +947,8 @@ func (u *Uploader) Upload(
Source: sourceInfo,
}

maxPreviousTotalFileSize := int64(0)
maxPreviousFileCount := 0

for _, m := range previousManifests {
if s := m.Stats.TotalFileSize; s > maxPreviousTotalFileSize {
maxPreviousTotalFileSize = s
}

if s := int(m.Stats.TotalFileCount); s > maxPreviousFileCount {
maxPreviousFileCount = s
}
}
u.Progress.UploadStarted()

u.Progress.UploadStarted(maxPreviousFileCount, maxPreviousTotalFileSize)
defer u.Progress.UploadFinished()

u.stats = snapshot.Stats{}
Expand All @@ -966,6 +958,9 @@ func (u *Uploader) Upload(

s.StartTime = u.repo.Time()

scanctx, cancelScan := context.WithCancel(ctx)
defer cancelScan()

switch entry := source.(type) {
case fs.Directory:
var previousDirs []fs.Directory
Expand All @@ -979,9 +974,17 @@ func (u *Uploader) Upload(
entry = ignorefs.New(entry, policyTree, ignorefs.ReportIgnoredFiles(func(_ string, md fs.Entry) {
u.stats.AddExcluded(md)
}))

go func() {
ds, _ := u.scanDirectory(scanctx, entry)

u.Progress.EstimatedDataSize(ds.numFiles, ds.totalFileSize)
}()

s.RootEntry, err = u.uploadDirWithCheckpointing(ctx, entry, policyTree, previousDirs, sourceInfo)

case fs.File:
u.Progress.EstimatedDataSize(1, entry.Size())
s.RootEntry, err = u.uploadFileWithCheckpointing(ctx, entry.Name(), entry, policyTree.EffectivePolicy(), sourceInfo)

default:
Expand Down
23 changes: 20 additions & 3 deletions snapshot/snapshotfs/upload_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// UploadProgress is invoked by by uploader to report status of file and directory uploads.
type UploadProgress interface {
// UploadStarted is emitted once at the start of an upload
UploadStarted(previousFileCount int, previousTotalSize int64)
UploadStarted()

// UploadFinished is emitted once at the end of an upload
UploadFinished()
Expand Down Expand Up @@ -36,14 +36,20 @@ type UploadProgress interface {

// FinishedDirectory is emitted whenever a directory is finished uploading.
FinishedDirectory(dirname string)

// EstimatedDataSize is emitted whenever the size of upload is estimated.
EstimatedDataSize(fileCount int, totalBytes int64)
}

// NullUploadProgress is an implementation of UploadProgress that does not produce any output.
type NullUploadProgress struct {
}

// UploadStarted implements UploadProgress.
func (p *NullUploadProgress) UploadStarted(previousFileCount int, previousTotalSize int64) {}
func (p *NullUploadProgress) UploadStarted() {}

// EstimatedDataSize implements UploadProgress.
func (p *NullUploadProgress) EstimatedDataSize(fileCount int, totalBytes int64) {}

// UploadFinished implements UploadProgress.
func (p *NullUploadProgress) UploadFinished() {}
Expand Down Expand Up @@ -79,10 +85,13 @@ type UploadCounters struct {
TotalCachedBytes int64 `json:"cachedBytes"`
TotalHashedBytes int64 `json:"hashedBytes"`

EstimatedBytes int64 `json:"estimatedBytes"`

TotalCachedFiles int32 `json:"cachedFiles"`
TotalHashedFiles int32 `json:"hashedFiles"`

TotalIgnoredErrors int32 `json:"ignoredErrors"`
EstimatedFiles int32 `json:"estimatedFiles"`

CurrentDirectory string `json:"directory"`

Expand All @@ -100,11 +109,17 @@ type CountingUploadProgress struct {
}

// UploadStarted implements UploadProgress.
func (p *CountingUploadProgress) UploadStarted(previousFileCount int, previousTotalFileSize int64) {
func (p *CountingUploadProgress) UploadStarted() {
// reset counters to all-zero values.
p.counters = UploadCounters{}
}

// EstimatedDataSize implements UploadProgress.
func (p *CountingUploadProgress) EstimatedDataSize(numFiles int, numBytes int64) {
atomic.StoreInt64(&p.counters.EstimatedBytes, numBytes)
atomic.StoreInt32(&p.counters.EstimatedFiles, int32(numFiles))
}

// HashedBytes implements UploadProgress.
func (p *CountingUploadProgress) HashedBytes(numBytes int64) {
atomic.AddInt64(&p.counters.TotalHashedBytes, numBytes)
Expand Down Expand Up @@ -149,6 +164,8 @@ func (p *CountingUploadProgress) Snapshot() UploadCounters {
TotalHashedFiles: atomic.LoadInt32(&p.counters.TotalHashedFiles),
TotalCachedBytes: atomic.LoadInt64(&p.counters.TotalCachedBytes),
TotalHashedBytes: atomic.LoadInt64(&p.counters.TotalHashedBytes),
EstimatedBytes: atomic.LoadInt64(&p.counters.EstimatedBytes),
EstimatedFiles: atomic.LoadInt32(&p.counters.EstimatedFiles),
CurrentDirectory: p.counters.CurrentDirectory,
LastErrorPath: p.counters.LastErrorPath,
LastError: p.counters.LastError,
Expand Down
51 changes: 51 additions & 0 deletions snapshot/snapshotfs/upload_scan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package snapshotfs

import (
"context"

"github.com/kopia/kopia/fs"
)

type scanResults struct {
numFiles int
totalFileSize int64
}

// scanDirectory computes the number of files and their total size in a given directory recursively descending
// into subdirectories. The scan teminates early as soon as the provided context is canceled.
func (u *Uploader) scanDirectory(ctx context.Context, dir fs.Directory) (scanResults, error) {
var res scanResults

if u.disableEstimation {
return res, nil
}

entries, err := dir.Readdir(ctx)
if err != nil {
return res, err
}

for _, e := range entries {
if err := ctx.Err(); err != nil {
// terminate early if context got canceled
return res, err
}

switch e := e.(type) {
case fs.Directory:
dr, err := u.scanDirectory(ctx, e)
res.numFiles += dr.numFiles
res.totalFileSize += dr.totalFileSize

if err != nil {
return res, err
}

case fs.File:
res.numFiles++
res.totalFileSize += e.Size()
}
}

return res, nil
}
1 change: 1 addition & 0 deletions snapshot/snapshotfs/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestUploadWithCheckpointing(t *testing.T) {

// create a channel that will be sent to whenever checkpoint completes.
u.checkpointFinished = make(chan struct{})
u.disableEstimation = true

policyTree := policy.BuildTree(nil, policy.DefaultPolicy)

Expand Down

0 comments on commit 340c716

Please sign in to comment.