feature: add chunked concurrent download and resumable download
- Add multi-threaded chunked file download
- When a download is interrupted by the process exiting, re-running it resumes from the previous progress
arrebole committed Apr 4, 2023
1 parent 83792f7 commit 2a9fcf6
Showing 8 changed files with 182 additions and 18 deletions.
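For orientation, the new flags can be combined in a single download; the remote and local paths in this illustrative invocation are hypothetical, while -w (1-10 workers) and -c (resume) come from the flag definitions added below:

upx get -w 8 -c /demo/big.bin ./big.bin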
20 changes: 16 additions & 4 deletions commands.go
@@ -282,15 +282,19 @@ func NewGetCommand() cli.Command {
PrintErrorAndExit("get %s: parse mtime: %v", upPath, err)
}
}
if c.Int("w") > 10 || c.Int("w") < 1 {
PrintErrorAndExit("max concurrent threads must between (1 - 10)")
}
if mc.Start != "" || mc.End != "" {
session.GetStartBetweenEndFiles(upPath, localPath, mc, c.Int("w"))
} else {
session.Get(upPath, localPath, mc, c.Int("w"))
session.Get(upPath, localPath, mc, c.Int("w"), c.Bool("c"))
}
return nil
},
Flags: []cli.Flag{
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
cli.IntFlag{Name: "w", Usage: "max concurrent threads (1-10)", Value: 5},
cli.BoolFlag{Name: "c", Usage: "continue download, Resume Broken Download"},
cli.StringFlag{Name: "mtime", Usage: "file's data was last modified n*24 hours ago, same as linux find command."},
cli.StringFlag{Name: "start", Usage: "file download range starting location"},
cli.StringFlag{Name: "end", Usage: "file download range ending location"},
@@ -315,7 +319,9 @@ func NewPutCommand() cli.Command {
if c.NArg() > 1 {
upPath = c.Args().Get(1)
}

if c.Int("w") > 10 || c.Int("w") < 1 {
PrintErrorAndExit("max concurrent threads must between (1 - 10)")
}
session.Put(
localPath,
upPath,
@@ -332,9 +338,12 @@ func NewUploadCommand() cli.Command {
func NewUploadCommand() cli.Command {
return cli.Command{
Name: "upload",
Usage: "upload multiple directory or file",
Usage: "upload multiple directory or file or http url",
Action: func(c *cli.Context) error {
InitAndCheck(LOGIN, CHECK, c)
if c.Int("w") > 10 || c.Int("w") < 1 {
PrintErrorAndExit("max concurrent threads must between (1 - 10)")
}
session.Upload(
c.Args(),
c.String("remote"),
@@ -422,6 +431,9 @@ func NewSyncCommand() cli.Command {
if c.NArg() > 1 {
upPath = c.Args().Get(1)
}
if c.Int("w") > 10 || c.Int("w") < 1 {
PrintErrorAndExit("max concurrent threads must between (1 - 10)")
}
session.Sync(localPath, upPath, c.Int("w"), c.Bool("delete"), c.Bool("strong"))
return nil
},
2 changes: 1 addition & 1 deletion go.mod
@@ -9,7 +9,7 @@ require (
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/syndtr/goleveldb v1.0.0
github.com/upyun/go-sdk/v3 v3.0.3
github.com/upyun/go-sdk/v3 v3.0.4
github.com/urfave/cli v1.22.4
)

2 changes: 2 additions & 0 deletions go.sum
@@ -39,6 +39,8 @@ github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFd
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/upyun/go-sdk/v3 v3.0.3 h1:2wUkNk2fyJReMYHMvJyav050D83rYwSjN7mEPR0Pp8Q=
github.com/upyun/go-sdk/v3 v3.0.3/go.mod h1:P/SnuuwhrIgAVRd/ZpzDWqCsBAf/oHg7UggbAxyZa0E=
github.com/upyun/go-sdk/v3 v3.0.4 h1:2DCJa/Yi7/3ZybT9UCPATSzvU3wpPPxhXinNlb1Hi8Q=
github.com/upyun/go-sdk/v3 v3.0.4/go.mod h1:P/SnuuwhrIgAVRd/ZpzDWqCsBAf/oHg7UggbAxyZa0E=
github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA=
github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
18 changes: 15 additions & 3 deletions io.go
@@ -34,15 +34,27 @@ func (w *WrappedWriter) Close() error {
return w.w.Close()
}

func NewFileWrappedWriter(localPath string, bar *uiprogress.Bar) (*WrappedWriter, error) {
fd, err := os.Create(localPath)
func NewFileWrappedWriter(localPath string, bar *uiprogress.Bar, resume bool) (*WrappedWriter, error) {
var fd *os.File
var err error

if resume {
fd, err = os.OpenFile(localPath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0755)
} else {
fd, err = os.Create(localPath)
}
if err != nil {
return nil, err
}

fileinfo, err := fd.Stat()
if err != nil {
return nil, err
}

return &WrappedWriter{
w: fd,
Copyed: 0,
Copyed: int(fileinfo.Size()),
bar: bar,
}, nil
}
30 changes: 30 additions & 0 deletions partial/chunk.go
@@ -0,0 +1,30 @@
package partial

type Chunk struct {
Start int64
End int64
Error error
Buffer []byte
}

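// cut splits the queue into the next batch of at most i chunks and the remaining queue.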
func cut(chunks []*Chunk, i int) ([]*Chunk, []*Chunk) {
if len(chunks) <= i {
return chunks, []*Chunk{}
}

return chunks[0:i], chunks[i:]
}

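// retryDownloadChunk downloads the half-open range [start, end) via DownFunc
// (which receives an inclusive end offset), retrying on error.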
func (p *MultiPartialDownloader) retryDownloadChunk(start, end int64) ([]byte, error) {
var err error
var buffer []byte

// retry up to three times
for t := 0; t < 3; t++ {
buffer, err = p.DownFunc(p, start, end-1)
if err == nil {
break
}
}
return buffer, err
}
67 changes: 67 additions & 0 deletions partial/download.go
@@ -0,0 +1,67 @@
package partial

import (
"errors"
"os"
"sync"
)

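// Download splits the bytes still missing locally into ChunkSize pieces, fetches up to
// Works chunks concurrently per batch, and writes the results to Writer in order.
// Any chunk error (after retries) aborts the whole download.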
func (p *MultiPartialDownloader) Download() error {
fileinfo, err := os.Stat(p.FilePath)
if err == nil {
p.LocalSize = fileinfo.Size()
}

// compute the number of chunks that still need to be downloaded
needDownSize := p.FinalSize - p.LocalSize
chunkCount := needDownSize / ChunkSize
if needDownSize%ChunkSize != 0 {
chunkCount++
}

// download task queue
var queue []*Chunk
for i := 0; i < int(chunkCount); i++ {
start := p.LocalSize + int64(i)*ChunkSize
end := p.LocalSize + int64(i+1)*ChunkSize
if end > p.FinalSize {
end = p.FinalSize
}
queue = append(queue, &Chunk{
Start: start,
End: end,
})
}

for {
var chunks []*Chunk
chunks, queue = cut(queue, p.Works)
if len(chunks) <= 0 {
break
}

var wg sync.WaitGroup
for _, chunk := range chunks {
wg.Add(1)
go func(c *Chunk) {
defer wg.Done()
buffer, err := p.retryDownloadChunk(c.Start, c.End)
c.Buffer = buffer
c.Error = err
}(chunk)
}

wg.Wait()

for _, chunk := range chunks {
if chunk.Error != nil {
return chunk.Error
}
if len(chunk.Buffer) == 0 {
return errors.New("chunk buffer download but size is 0")
}
if _, err := p.Writer.Write(chunk.Buffer); err != nil {
return err
}
}
}
return nil
}
18 changes: 18 additions & 0 deletions partial/partial.go
@@ -0,0 +1,18 @@
package partial

import (
"io"
)

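// ChunkSize is the fixed size of a single download chunk (10 MB).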
const ChunkSize = 1024 * 1024 * 10

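// MultiPartialDownloader downloads a file of FinalSize bytes in ChunkSize pieces,
// using up to Works concurrent workers per batch and writing completed chunks to
// Writer in order. LocalSize is detected from FilePath by Download(), so an
// interrupted download can be resumed from the existing partial file.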
type MultiPartialDownloader struct {
FilePath string
FinalSize int64
LocalSize int64
Writer io.Writer
Works int
DownFunc ChunkDownFunc
}

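// ChunkDownFunc fetches the inclusive byte range [start, end] of the remote file
// and returns its contents.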
type ChunkDownFunc func(file *MultiPartialDownloader, start, end int64) ([]byte, error)
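
A minimal, self-contained sketch of how this downloader could be driven outside upx; the URL, file size, output path, and the plain net/http ranged fetch are hypothetical stand-ins, whereas session.go below wires DownFunc to the upyun SDK's ranged Get instead:

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"

	"github.com/upyun/upx/partial"
)

func main() {
	// Hypothetical values: a 250 MB object fetched in 10 MB chunks by 4 workers.
	const url = "https://example.com/big.bin"
	const finalSize int64 = 250 * 1024 * 1024

	// Open in append mode so an interrupted run can resume; Download() reads the
	// current size of FilePath and skips the bytes that are already present.
	fd, err := os.OpenFile("big.bin", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		panic(err)
	}
	defer fd.Close()

	d := partial.MultiPartialDownloader{
		FilePath:  "big.bin",
		FinalSize: finalSize,
		Writer:    fd,
		Works:     4,
		DownFunc: func(file *partial.MultiPartialDownloader, start, end int64) ([]byte, error) {
			// start and end are an inclusive byte range (retryDownloadChunk already
			// subtracts 1 from the chunk's exclusive End before calling DownFunc).
			req, err := http.NewRequest("GET", url, nil)
			if err != nil {
				return nil, err
			}
			req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return nil, err
			}
			defer resp.Body.Close()
			return io.ReadAll(resp.Body)
		},
	}

	if err := d.Download(); err != nil {
		panic(err)
	}
}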
43 changes: 33 additions & 10 deletions session.go
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
@@ -20,6 +21,7 @@ import (
"github.com/gosuri/uiprogress"
"github.com/jehiah/go-strftime"
"github.com/upyun/go-sdk/v3/upyun"
"github.com/upyun/upx/partial"
)

const (
@@ -298,7 +300,7 @@ func (sess *Session) getDir(upPath, localPath string, match *MatchConfig, worker
os.MkdirAll(lpath, 0755)
} else {
for i := 1; i <= MaxRetry; i++ {
id, e = sess.getFileWithProgress(id, fpath, lpath, fInfo)
id, e = sess.getFileWithProgress(id, fpath, lpath, fInfo, 1, false)
if e == nil {
break
}
@@ -328,7 +330,7 @@ func (sess *Session) getDir(upPath, localPath string, match *MatchConfig, worker
return err
}

func (sess *Session) getFileWithProgress(id int, upPath, localPath string, upInfo *upyun.FileInfo) (int, error) {
func (sess *Session) getFileWithProgress(id int, upPath, localPath string, upInfo *upyun.FileInfo, works int, resume bool) (int, error) {
var err error

var bar *uiprogress.Bar
@@ -361,20 +363,36 @@ func (sess *Session) getFileWithProgress(id int, upPath, localPath string, upInf
return id, err
}

w, err := NewFileWrappedWriter(localPath, bar)
w, err := NewFileWrappedWriter(localPath, bar, resume)
if err != nil {
return id, err
}
defer w.Close()

_, err = sess.updriver.Get(&upyun.GetObjectConfig{
Path: sess.AbsPath(upPath),
Writer: w,
})
downloader := partial.MultiPartialDownloader{
FilePath: localPath,
Writer: w,
Works: works,
FinalSize: upInfo.Size,
DownFunc: func(file *partial.MultiPartialDownloader, start, end int64) ([]byte, error) {
var buffer bytes.Buffer
_, err = sess.updriver.Get(&upyun.GetObjectConfig{
Path: sess.AbsPath(upPath),
Writer: &buffer,
Headers: map[string]string{
"Range": fmt.Sprintf("bytes=%d-%d", start, end),
},
})
return buffer.Bytes(), err
},
}

err = downloader.Download()

return idx, err
}

func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers int) {
func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers int, resume bool) {
upPath = sess.AbsPath(upPath)
upInfo, err := sess.updriver.GetInfo(upPath)
if err != nil {
@@ -406,7 +424,12 @@ func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers i
if isDir {
localPath = filepath.Join(localPath, path.Base(upPath))
}
sess.getFileWithProgress(-1, upPath, localPath, upInfo)

// do not use multiple threads for files smaller than 100 MB
if upInfo.Size < 1024*1024*100 {
workers = 1
}
sess.getFileWithProgress(-1, upPath, localPath, upInfo, workers, resume)
}
}

@@ -451,7 +474,7 @@ func (sess *Session) GetStartBetweenEndFiles(upPath, localPath string, match *Ma
for fInfo := range fInfoChan {
fp := filepath.Join(fpath, fInfo.Name)
if (fp >= startList || startList == "") && (fp < endList || endList == "") {
sess.Get(fp, localPath, match, workers)
sess.Get(fp, localPath, match, workers, false)
} else if strings.HasPrefix(startList, fp) {
// same prefix: descend into the next directory level and keep checking recursively
if fInfo.IsDir {