diff --git a/src/changes.go b/src/changes.go index dda951f6..f23934a4 100644 --- a/src/changes.go +++ b/src/changes.go @@ -293,10 +293,10 @@ func merge(remotes, locals chan *File) (merged []*dirList) { return } -func reduceToSize(changes []*Change, isPush bool) (totalSize int64) { +func reduceToSize(changes []*Change, fromSrc bool) (totalSize int64) { totalSize = 0 for _, c := range changes { - if isPush { + if fromSrc { if c.Src != nil { totalSize += c.Src.Size } diff --git a/src/commands.go b/src/commands.go index 8a0b164e..8ba6a4a9 100644 --- a/src/commands.go +++ b/src/commands.go @@ -144,9 +144,21 @@ func readCommentedFileCompileRegexp(p string) *regexp.Regexp { return regExComp } -func (g *Commands) taskStart(numOfTasks int) { - if numOfTasks > 0 { - g.progress = pb.StartNew(numOfTasks) +func (g *Commands) taskStart(tasks int64) { + if tasks > 0 { + g.progress = newProgressBar(tasks) + } +} + +func newProgressBar(total int64) *pb.ProgressBar { + pbf := pb.New64(total) + pbf.Start() + return pbf +} + +func (g *Commands) taskAdd(n int64) { + if g.progress != nil { + g.progress.Add64(n) } } diff --git a/src/pull.go b/src/pull.go index 94c43870..4b7577d5 100644 --- a/src/pull.go +++ b/src/pull.go @@ -23,6 +23,8 @@ import ( "runtime" "sort" "sync" + + "github.com/odeke-em/statos" ) const ( @@ -152,7 +154,10 @@ func (g *Commands) PullPiped() (err error) { func (g *Commands) playPullChangeList(cl []*Change, exports []string) (err error) { var next []*Change - g.taskStart(len(cl)) + + pullSize := reduceToSize(cl, true) + + g.taskStart(int64(len(cl)) + pullSize) // TODO: Only provide precedence ordering if all the other options are allowed // Currently noop on sorting by precedence @@ -160,6 +165,12 @@ func (g *Commands) playPullChangeList(cl []*Change, exports []string) (err error sort.Sort(ByPrecedence(cl)) } + go func() { + for n := range g.rem.progressChan { + g.taskAdd(int64(n)) + } + }() + for { if len(cl) > maxNumOfConcPullTasks { next, cl = cl[:maxNumOfConcPullTasks], cl[maxNumOfConcPullTasks:len(cl)] @@ -436,6 +447,16 @@ func (g *Commands) singleDownload(p, id, exportURL string) (err error) { if err != nil { return err } - _, err = io.Copy(fo, blob) + + ws := statos.NewWriter(fo) + go func() { + commChan := ws.ProgressChan() + for n := range commChan { + g.rem.progressChan <- n + } + }() + + _, err = io.Copy(ws, blob) + return } diff --git a/src/push.go b/src/push.go index 1a137c45..ce858fb1 100644 --- a/src/push.go +++ b/src/push.go @@ -208,7 +208,11 @@ func (g *Commands) deserializeIndex(identifier string) *config.Index { } func (g *Commands) playPushChangeList(cl []*Change) (err error) { - g.taskStart(len(cl)) + pushSize := reduceToSize(cl, true) + + g.taskStart(int64(len(cl)) + pushSize) + + defer close(g.rem.progressChan) // TODO: Only provide precedence ordering if all the other options are allowed // Currently noop on sorting by precedence @@ -216,6 +220,12 @@ func (g *Commands) playPushChangeList(cl []*Change) (err error) { sort.Sort(ByPrecedence(cl)) } + go func() { + for n := range g.rem.progressChan { + g.taskAdd(int64(n)) + } + }() + for _, c := range cl { switch c.Op() { case OpMod: @@ -327,7 +337,10 @@ func (g *Commands) remoteUntrash(change *Change) (err error) { } func (g *Commands) remoteDelete(change *Change) (err error) { - defer g.taskDone() + defer func() { + g.taskAdd(change.Dest.Size) + g.taskDone() + }() err = g.rem.Trash(change.Dest.Id) if err != nil { diff --git a/src/remote.go b/src/remote.go index c695a544..176ff83e 100644 --- a/src/remote.go +++ b/src/remote.go @@ -29,6 +29,7 @@ import ( "code.google.com/p/goauth2/oauth" "github.com/odeke-em/drive/config" drive "github.com/odeke-em/google-api-go-client/drive/v2" + "github.com/odeke-em/statos" ) const ( @@ -109,14 +110,20 @@ func mimeTypeFromExt(ext string) string { } type Remote struct { - transport *oauth.Transport - service *drive.Service + transport *oauth.Transport + service *drive.Service + progressChan chan int } func NewRemoteContext(context *config.Context) *Remote { transport := newTransport(context) service, _ := drive.New(transport.Client()) - return &Remote{service: service, transport: transport} + progressChan := make(chan int) + return &Remote{ + progressChan: progressChan, + service: service, + transport: transport, + } } func hasExportLinks(f *File) bool { @@ -511,7 +518,16 @@ func (r *Remote) UpsertByComparison(args *upsertOpt) (f *File, err error) { if err != nil && !args.src.IsDir { return } - return r.upsertByComparison(body, args) + bd := statos.NewReader(body) + + go func() { + commChan := bd.ProgressChan() + for n := range commChan { + r.progressChan <- n + } + }() + + return r.upsertByComparison(bd, args) } func (r *Remote) findShared(p []string) (chan *File, error) { diff --git a/src/trash.go b/src/trash.go index 969a3c60..edabc563 100644 --- a/src/trash.go +++ b/src/trash.go @@ -155,7 +155,8 @@ func (g *Commands) reduce(args []string, toTrash bool) error { } func (g *Commands) playTrashChangeList(cl []*Change, toTrash bool) (err error) { - g.taskStart(len(cl)) + trashSize := reduceToSize(cl, !toTrash) + g.taskStart(int64(len(cl)) + trashSize) var f = g.remoteUntrash if toTrash {