Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upload: scan the directory to be uploaded in parallel to estimate the amount of data to be uploaded #622

Merged
merged 1 commit into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 39 additions & 12 deletions cli/cli_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type cliProgress struct {
spinPhase int
uploadStartTime time.Time

previousFileCount int
previousTotalSize int64
estimatedFileCount int
estimatedTotalBytes int64

// indicates shared instance that does not reset counters at the beginning of upload.
shared bool
Expand Down Expand Up @@ -152,13 +152,29 @@ func (p *cliProgress) output(col *color.Color, msg string) {
return
}

if p.previousTotalSize > 0 {
percent := (float64(hashedBytes+cachedBytes) * hundredPercent / float64(p.previousTotalSize))
if percent > hundredPercent {
percent = hundredPercent
if p.estimatedTotalBytes > 0 {
line += fmt.Sprintf(", estimated %v", units.BytesStringBase10(p.estimatedTotalBytes))

ratio := float64(hashedBytes+cachedBytes) / float64(p.estimatedTotalBytes)
if ratio > 1 {
ratio = 1
}

line += fmt.Sprintf(" %.1f%%", percent)
timeSoFarSeconds := clock.Since(p.uploadStartTime).Seconds()
estimatedTotalTime := time.Second * time.Duration(timeSoFarSeconds/ratio)
estimatedEndTime := p.uploadStartTime.Add(estimatedTotalTime)

remaining := clock.Until(estimatedEndTime)
if remaining < 0 {
remaining = 0
}

remaining = remaining.Round(time.Second)

line += fmt.Sprintf(" (%.1f%%)", ratio*hundredPercent)
line += fmt.Sprintf(" %v left", remaining)
} else {
line += ", estimating..."
}

var extraSpaces string
Expand Down Expand Up @@ -197,20 +213,31 @@ func (p *cliProgress) FinishShared() {
p.output(defaultColor, "")
}

func (p *cliProgress) UploadStarted(previousFileCount int, previousTotalSize int64) {
func (p *cliProgress) UploadStarted() {
if p.shared {
// do nothing
return
}

*p = cliProgress{
uploading: 1,
uploadStartTime: clock.Now(),
previousFileCount: previousFileCount,
previousTotalSize: previousTotalSize,
uploading: 1,
uploadStartTime: clock.Now(),
}
}

// EstimatedDataSize implements UploadProgress. It records the estimated
// number of files and total bytes for the current upload, which output()
// later uses to display percentage and remaining-time estimates.
func (p *cliProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
	// Shared instances ignore per-upload estimates.
	if p.shared {
		return
	}

	p.outputMutex.Lock()
	p.estimatedFileCount = fileCount
	p.estimatedTotalBytes = totalBytes
	p.outputMutex.Unlock()
}

func (p *cliProgress) UploadFinished() {
// do nothing here, we still want to report the files flushed after the Upload has completed.
// instead, Finish() will be called.
Expand Down
8 changes: 5 additions & 3 deletions htmlui/src/SourcesTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,18 @@ export class SourcesTable extends Component {
const totalBytes = u.hashedBytes + u.cachedBytes;

totals = sizeDisplayName(totalBytes);
if (x.row.original.lastSnapshot) {
const percent = Math.round(totalBytes * 1000.0 / x.row.original.lastSnapshot.stats.totalSize) / 10.0;
if (u.estimatedBytes) {
totals += "/" + sizeDisplayName(u.estimatedBytes);

const percent = Math.round(totalBytes * 1000.0 / u.estimatedBytes) / 10.0;
if (percent <= 100) {
totals += " " + percent + "%";
}
}
}

return <>
<Spinner animation="border" variant="primary" size="sm" title={title} />&nbsp;Snapshotting {totals}
<Spinner animation="border" variant="primary" size="sm" title={title} />&nbsp;Uploading {totals}
&nbsp;
<Button variant="danger" size="sm" onClick={() => {
parent.cancelSnapshot(x.row.original.source);
Expand Down
29 changes: 16 additions & 13 deletions snapshot/snapshotfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type Uploader struct {

// for testing only, when set will write to a given channel whenever checkpoint completes
checkpointFinished chan struct{}

// disable snapshot size estimation
disableEstimation bool
}

// IsCanceled returns true if the upload is canceled.
Expand Down Expand Up @@ -694,6 +697,7 @@ func (u *Uploader) processNonDirectories(ctx context.Context, parentCheckpointRe
// See if we had this name during either of previous passes.
if cachedEntry := u.maybeIgnoreCachedEntry(ctx, findCachedEntry(ctx, entry, prevEntries)); cachedEntry != nil {
atomic.AddInt32(&u.stats.CachedFiles, 1)
atomic.AddInt64(&u.stats.TotalFileSize, entry.Size())
u.Progress.CachedFile(filepath.Join(dirRelativePath, entry.Name()), entry.Size())

// compute entryResult now, cachedEntry is short-lived
Expand Down Expand Up @@ -943,20 +947,8 @@ func (u *Uploader) Upload(
Source: sourceInfo,
}

maxPreviousTotalFileSize := int64(0)
maxPreviousFileCount := 0

for _, m := range previousManifests {
if s := m.Stats.TotalFileSize; s > maxPreviousTotalFileSize {
maxPreviousTotalFileSize = s
}

if s := int(m.Stats.TotalFileCount); s > maxPreviousFileCount {
maxPreviousFileCount = s
}
}
u.Progress.UploadStarted()

u.Progress.UploadStarted(maxPreviousFileCount, maxPreviousTotalFileSize)
defer u.Progress.UploadFinished()

u.stats = snapshot.Stats{}
Expand All @@ -966,6 +958,9 @@ func (u *Uploader) Upload(

s.StartTime = u.repo.Time()

scanctx, cancelScan := context.WithCancel(ctx)
defer cancelScan()

switch entry := source.(type) {
case fs.Directory:
var previousDirs []fs.Directory
Expand All @@ -979,9 +974,17 @@ func (u *Uploader) Upload(
entry = ignorefs.New(entry, policyTree, ignorefs.ReportIgnoredFiles(func(_ string, md fs.Entry) {
u.stats.AddExcluded(md)
}))

go func() {
ds, _ := u.scanDirectory(scanctx, entry)

u.Progress.EstimatedDataSize(ds.numFiles, ds.totalFileSize)
}()

s.RootEntry, err = u.uploadDirWithCheckpointing(ctx, entry, policyTree, previousDirs, sourceInfo)

case fs.File:
u.Progress.EstimatedDataSize(1, entry.Size())
s.RootEntry, err = u.uploadFileWithCheckpointing(ctx, entry.Name(), entry, policyTree.EffectivePolicy(), sourceInfo)

default:
Expand Down
23 changes: 20 additions & 3 deletions snapshot/snapshotfs/upload_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// UploadProgress is invoked by by uploader to report status of file and directory uploads.
type UploadProgress interface {
// UploadStarted is emitted once at the start of an upload
UploadStarted(previousFileCount int, previousTotalSize int64)
UploadStarted()

// UploadFinished is emitted once at the end of an upload
UploadFinished()
Expand Down Expand Up @@ -36,14 +36,20 @@ type UploadProgress interface {

// FinishedDirectory is emitted whenever a directory is finished uploading.
FinishedDirectory(dirname string)

// EstimatedDataSize is emitted whenever the size of upload is estimated.
EstimatedDataSize(fileCount int, totalBytes int64)
}

// NullUploadProgress is an implementation of UploadProgress that does not produce any output.
type NullUploadProgress struct {
}

// UploadStarted implements UploadProgress; the null implementation is
// intentionally a no-op.
func (p *NullUploadProgress) UploadStarted() {}

// EstimatedDataSize implements UploadProgress; the null implementation is
// intentionally a no-op.
func (p *NullUploadProgress) EstimatedDataSize(fileCount int, totalBytes int64) {}

// UploadFinished implements UploadProgress.
func (p *NullUploadProgress) UploadFinished() {}
Expand Down Expand Up @@ -79,10 +85,13 @@ type UploadCounters struct {
TotalCachedBytes int64 `json:"cachedBytes"`
TotalHashedBytes int64 `json:"hashedBytes"`

EstimatedBytes int64 `json:"estimatedBytes"`

TotalCachedFiles int32 `json:"cachedFiles"`
TotalHashedFiles int32 `json:"hashedFiles"`

TotalIgnoredErrors int32 `json:"ignoredErrors"`
EstimatedFiles int32 `json:"estimatedFiles"`

CurrentDirectory string `json:"directory"`

Expand All @@ -100,11 +109,17 @@ type CountingUploadProgress struct {
}

// UploadStarted implements UploadProgress.
func (p *CountingUploadProgress) UploadStarted(previousFileCount int, previousTotalFileSize int64) {
func (p *CountingUploadProgress) UploadStarted() {
// reset counters to all-zero values.
p.counters = UploadCounters{}
}

// EstimatedDataSize implements UploadProgress. It atomically publishes the
// estimated file count and byte total so that Snapshot() can read them
// concurrently with the estimation goroutine.
func (p *CountingUploadProgress) EstimatedDataSize(numFiles int, numBytes int64) {
	atomic.StoreInt32(&p.counters.EstimatedFiles, int32(numFiles))
	atomic.StoreInt64(&p.counters.EstimatedBytes, numBytes)
}

// HashedBytes implements UploadProgress.
func (p *CountingUploadProgress) HashedBytes(numBytes int64) {
atomic.AddInt64(&p.counters.TotalHashedBytes, numBytes)
Expand Down Expand Up @@ -149,6 +164,8 @@ func (p *CountingUploadProgress) Snapshot() UploadCounters {
TotalHashedFiles: atomic.LoadInt32(&p.counters.TotalHashedFiles),
TotalCachedBytes: atomic.LoadInt64(&p.counters.TotalCachedBytes),
TotalHashedBytes: atomic.LoadInt64(&p.counters.TotalHashedBytes),
EstimatedBytes: atomic.LoadInt64(&p.counters.EstimatedBytes),
EstimatedFiles: atomic.LoadInt32(&p.counters.EstimatedFiles),
CurrentDirectory: p.counters.CurrentDirectory,
LastErrorPath: p.counters.LastErrorPath,
LastError: p.counters.LastError,
Expand Down
51 changes: 51 additions & 0 deletions snapshot/snapshotfs/upload_scan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package snapshotfs

import (
"context"

"github.com/kopia/kopia/fs"
)

// scanResults accumulates the outcome of a pre-upload directory scan.
type scanResults struct {
	numFiles      int   // number of files encountered so far
	totalFileSize int64 // combined size of those files, in bytes
}

// scanDirectory recursively walks dir and tallies the number of files and
// their combined size. The walk terminates early as soon as the provided
// context is canceled; partial results accumulated up to that point are
// returned together with the error.
func (u *Uploader) scanDirectory(ctx context.Context, dir fs.Directory) (scanResults, error) {
	var result scanResults

	// Estimation can be disabled (e.g. in tests); report empty results.
	if u.disableEstimation {
		return result, nil
	}

	entries, err := dir.Readdir(ctx)
	if err != nil {
		return result, err
	}

	for _, entry := range entries {
		// Stop promptly once the caller cancels the scan.
		if cerr := ctx.Err(); cerr != nil {
			return result, cerr
		}

		switch v := entry.(type) {
		case fs.Directory:
			sub, serr := u.scanDirectory(ctx, v)

			// Keep whatever was counted before any failure in the subtree.
			result.numFiles += sub.numFiles
			result.totalFileSize += sub.totalFileSize

			if serr != nil {
				return result, serr
			}

		case fs.File:
			result.numFiles++
			result.totalFileSize += v.Size()
		}
	}

	return result, nil
}
1 change: 1 addition & 0 deletions snapshot/snapshotfs/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestUploadWithCheckpointing(t *testing.T) {

// create a channel that will be sent to whenever checkpoint completes.
u.checkpointFinished = make(chan struct{})
u.disableEstimation = true

policyTree := policy.BuildTree(nil, policy.DefaultPolicy)

Expand Down