Skip to content

Commit

Permalink
support resume upload
Browse files Browse the repository at this point in the history
  • Loading branch information
huangnauh committed Nov 4, 2022
1 parent 41d4cd7 commit 4d74d4f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/syndtr/goleveldb v1.0.0
github.com/upyun/go-sdk/v3 v3.0.2
github.com/upyun/go-sdk/v3 v3.0.3
github.com/urfave/cli v1.22.4
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5I
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/upyun/go-sdk/v3 v3.0.2 h1:Ke+iOipK5CT0xzMwsgJsi7faJV7ID4lAs+wrH1RH0dA=
github.com/upyun/go-sdk/v3 v3.0.2/go.mod h1:P/SnuuwhrIgAVRd/ZpzDWqCsBAf/oHg7UggbAxyZa0E=
github.com/upyun/go-sdk/v3 v3.0.3 h1:2wUkNk2fyJReMYHMvJyav050D83rYwSjN7mEPR0Pp8Q=
github.com/upyun/go-sdk/v3 v3.0.3/go.mod h1:P/SnuuwhrIgAVRd/ZpzDWqCsBAf/oHg7UggbAxyZa0E=
github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA=
github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
86 changes: 53 additions & 33 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"os/signal"
Expand All @@ -26,6 +27,10 @@ const (
SYNC_NOT_FOUND
DELETE_OK
DELETE_FAIL

MinResumePutFileSize = 100 * 1024 * 1024
DefaultBlockSize = 10 * 1024 * 1024
DefaultResumeRetry = 10
)

type Session struct {
Expand Down Expand Up @@ -453,49 +458,64 @@ func (sess *Session) GetStartBetweenEndFiles(upPath, localPath string, match *Ma

func (sess *Session) putFileWithProgress(barId int, localPath, upPath string, localInfo os.FileInfo) (int, error) {
var err error
bar, idx := AddBar(barId, int(localInfo.Size()))
bar = bar.AppendCompleted()
bar.PrependFunc(func(b *uiprogress.Bar) string {
status := "WAIT"
if b.Current() == b.Total {
status = "OK"
}
name := leftAlign(shortPath(upPath, 40), 40)
if err != nil {
b.Set(bar.Total)
return fmt.Sprintf("%s ERR %s", name, err)
}
return fmt.Sprintf("%s %s", name, rightAlign(status, 4))
})

fd, err := os.Open(localPath)
if err != nil {
return idx, err
return -1, err
}
defer fd.Close()

var wg sync.WaitGroup
wReader := &ProgressReader{fd: fd}
wg.Add(1)
go func() {
defer wg.Done()
for err == nil {
if wReader.Copyed() == bar.Total {
bar.Set(wReader.Copyed())
return
}
bar.Set(wReader.Copyed())
}
}()

err = sess.updriver.Put(&upyun.PutObjectConfig{
cfg := &upyun.PutObjectConfig{
Path: upPath,
Headers: map[string]string{
"Content-Length": fmt.Sprint(localInfo.Size()),
},
Reader: wReader,
})
Reader: fd,
}

idx := -1
if isVerbose {
var bar *uiprogress.Bar
bar, idx = AddBar(barId, int(localInfo.Size()))
bar = bar.AppendCompleted()
bar.PrependFunc(func(b *uiprogress.Bar) string {
status := "WAIT"
if b.Current() == b.Total {
status = "OK"
}
name := leftAlign(shortPath(upPath, 40), 40)
if err != nil {
b.Set(bar.Total)
return fmt.Sprintf("%s ERR %s", name, err)
}
return fmt.Sprintf("%s %s", name, rightAlign(status, 4))
})
wReader := &ProgressReader{fd: fd}
cfg.Reader = wReader
wg.Add(1)
go func() {
defer wg.Done()
for err == nil {
if wReader.Copyed() == bar.Total {
bar.Set(wReader.Copyed())
return
}
bar.Set(wReader.Copyed())
}
}()
} else {
log.Printf("file: %s, Start\n", upPath)
if localInfo.Size() >= MinResumePutFileSize {
cfg.UseResumeUpload = true
cfg.ResumePartSize = DefaultBlockSize
cfg.MaxResumePutTries = DefaultResumeRetry
}
}

err = sess.updriver.Put(cfg)
wg.Wait()
if !isVerbose {
log.Printf("file: %s, Done\n", upPath)
}
return idx, err
}

Expand Down

0 comments on commit 4d74d4f

Please sign in to comment.