Skip to content

Commit

Permalink
progress: byte by byte progress info.
Browse files Browse the repository at this point in the history
This fixes the old style of progress reporting that
would only update once a job was completed. Previously
could not work for large jobs.
  • Loading branch information
Emmanuel Odeke committed Apr 11, 2015
1 parent 7dbd8bf commit b2659cf
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/changes.go
Expand Up @@ -293,10 +293,10 @@ func merge(remotes, locals chan *File) (merged []*dirList) {
return
}

func reduceToSize(changes []*Change, isPush bool) (totalSize int64) {
func reduceToSize(changes []*Change, fromSrc bool) (totalSize int64) {
totalSize = 0
for _, c := range changes {
if isPush {
if fromSrc {
if c.Src != nil {
totalSize += c.Src.Size
}
Expand Down
18 changes: 15 additions & 3 deletions src/commands.go
Expand Up @@ -144,9 +144,21 @@ func readCommentedFileCompileRegexp(p string) *regexp.Regexp {
return regExComp
}

func (g *Commands) taskStart(numOfTasks int) {
if numOfTasks > 0 {
g.progress = pb.StartNew(numOfTasks)
func (g *Commands) taskStart(tasks int64) {
if tasks > 0 {
g.progress = newProgressBar(tasks)
}
}

func newProgressBar(total int64) *pb.ProgressBar {
pbf := pb.New64(total)
pbf.Start()
return pbf
}

func (g *Commands) taskAdd(n int64) {
if g.progress != nil {
g.progress.Add64(n)
}
}

Expand Down
25 changes: 23 additions & 2 deletions src/pull.go
Expand Up @@ -23,6 +23,8 @@ import (
"runtime"
"sort"
"sync"

"github.com/odeke-em/statos"
)

const (
Expand Down Expand Up @@ -152,14 +154,23 @@ func (g *Commands) PullPiped() (err error) {

func (g *Commands) playPullChangeList(cl []*Change, exports []string) (err error) {
var next []*Change
g.taskStart(len(cl))

pullSize := reduceToSize(cl, true)

g.taskStart(int64(len(cl)) + pullSize)

// TODO: Only provide precedence ordering if all the other options are allowed
// Currently noop on sorting by precedence
if false && !g.opts.NoClobber {
sort.Sort(ByPrecedence(cl))
}

go func() {
for n := range g.rem.progressChan {
g.taskAdd(int64(n))
}
}()

for {
if len(cl) > maxNumOfConcPullTasks {
next, cl = cl[:maxNumOfConcPullTasks], cl[maxNumOfConcPullTasks:len(cl)]
Expand Down Expand Up @@ -436,6 +447,16 @@ func (g *Commands) singleDownload(p, id, exportURL string) (err error) {
if err != nil {
return err
}
_, err = io.Copy(fo, blob)

ws := statos.NewWriter(fo)
go func() {
commChan := ws.ProgressChan()
for n := range commChan {
g.rem.progressChan <- n
}
}()

_, err = io.Copy(ws, blob)

return
}
17 changes: 15 additions & 2 deletions src/push.go
Expand Up @@ -208,14 +208,24 @@ func (g *Commands) deserializeIndex(identifier string) *config.Index {
}

func (g *Commands) playPushChangeList(cl []*Change) (err error) {
g.taskStart(len(cl))
pushSize := reduceToSize(cl, true)

g.taskStart(int64(len(cl)) + pushSize)

defer close(g.rem.progressChan)

// TODO: Only provide precedence ordering if all the other options are allowed
// Currently noop on sorting by precedence
if false && !g.opts.NoClobber {
sort.Sort(ByPrecedence(cl))
}

go func() {
for n := range g.rem.progressChan {
g.taskAdd(int64(n))
}
}()

for _, c := range cl {
switch c.Op() {
case OpMod:
Expand Down Expand Up @@ -327,7 +337,10 @@ func (g *Commands) remoteUntrash(change *Change) (err error) {
}

func (g *Commands) remoteDelete(change *Change) (err error) {
defer g.taskDone()
defer func() {
g.taskAdd(change.Dest.Size)
g.taskDone()
}()

err = g.rem.Trash(change.Dest.Id)
if err != nil {
Expand Down
24 changes: 20 additions & 4 deletions src/remote.go
Expand Up @@ -29,6 +29,7 @@ import (
"code.google.com/p/goauth2/oauth"
"github.com/odeke-em/drive/config"
drive "github.com/odeke-em/google-api-go-client/drive/v2"
"github.com/odeke-em/statos"
)

const (
Expand Down Expand Up @@ -109,14 +110,20 @@ func mimeTypeFromExt(ext string) string {
}

type Remote struct {
transport *oauth.Transport
service *drive.Service
transport *oauth.Transport
service *drive.Service
progressChan chan int
}

func NewRemoteContext(context *config.Context) *Remote {
transport := newTransport(context)
service, _ := drive.New(transport.Client())
return &Remote{service: service, transport: transport}
progressChan := make(chan int)
return &Remote{
progressChan: progressChan,
service: service,
transport: transport,
}
}

func hasExportLinks(f *File) bool {
Expand Down Expand Up @@ -511,7 +518,16 @@ func (r *Remote) UpsertByComparison(args *upsertOpt) (f *File, err error) {
if err != nil && !args.src.IsDir {
return
}
return r.upsertByComparison(body, args)
bd := statos.NewReader(body)

go func() {
commChan := bd.ProgressChan()
for n := range commChan {
r.progressChan <- n
}
}()

return r.upsertByComparison(bd, args)
}

func (r *Remote) findShared(p []string) (chan *File, error) {
Expand Down
3 changes: 2 additions & 1 deletion src/trash.go
Expand Up @@ -155,7 +155,8 @@ func (g *Commands) reduce(args []string, toTrash bool) error {
}

func (g *Commands) playTrashChangeList(cl []*Change, toTrash bool) (err error) {
g.taskStart(len(cl))
trashSize := reduceToSize(cl, !toTrash)
g.taskStart(int64(len(cl)) + trashSize)

var f = g.remoteUntrash
if toTrash {
Expand Down

0 comments on commit b2659cf

Please sign in to comment.