From 51eb88361e3aaf53427baaed460364a73d388140 Mon Sep 17 00:00:00 2001
From: arrebole
Date: Tue, 4 Apr 2023 14:53:27 +0800
Subject: [PATCH] feature: add concurrent chunked download and resumable download
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add multi-threaded, chunked file downloads
- When a download is interrupted because the process exits, re-running the
  download resumes from the previous progress
---
 commands.go         | 20 +++++++++++---
 io.go               | 18 ++++++++++--
 partial/chunk.go    | 31 +++++++++++++++++++++
 partial/download.go | 67 +++++++++++++++++++++++++++++++++++++++++++++
 partial/partial.go  | 18 ++++++++++++
 session.go          | 43 ++++++++++++++++++++++-------
 6 files changed, 180 insertions(+), 17 deletions(-)
 create mode 100644 partial/chunk.go
 create mode 100644 partial/download.go
 create mode 100644 partial/partial.go

diff --git a/commands.go b/commands.go
index c9906f0..5eaf622 100644
--- a/commands.go
+++ b/commands.go
@@ -282,15 +282,19 @@ func NewGetCommand() cli.Command {
 					PrintErrorAndExit("get %s: parse mtime: %v", upPath, err)
 				}
 			}
+			if c.Int("w") > 10 || c.Int("w") < 1 {
+				PrintErrorAndExit("max concurrent threads must be between 1 and 10")
+			}
 			if mc.Start != "" || mc.End != "" {
 				session.GetStartBetweenEndFiles(upPath, localPath, mc, c.Int("w"))
 			} else {
-				session.Get(upPath, localPath, mc, c.Int("w"))
+				session.Get(upPath, localPath, mc, c.Int("w"), c.Bool("c"))
 			}
 			return nil
 		},
 		Flags: []cli.Flag{
-			cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
+			cli.IntFlag{Name: "w", Usage: "max concurrent threads (1-10)", Value: 5},
+			cli.BoolFlag{Name: "c", Usage: "continue, i.e. resume a broken download"},
 			cli.StringFlag{Name: "mtime", Usage: "file's data was last modified n*24 hours ago, same as linux find command."},
 			cli.StringFlag{Name: "start", Usage: "file download range starting location"},
 			cli.StringFlag{Name: "end", Usage: "file download range ending location"},
@@ -315,7 +319,9 @@ func NewPutCommand() cli.Command {
 			if c.NArg() > 1 {
 				upPath = c.Args().Get(1)
 			}
-
+			if c.Int("w") > 10 || c.Int("w") < 1 {
+				PrintErrorAndExit("max concurrent threads must be between 1 and 10")
+			}
 			session.Put(
 				localPath,
 				upPath,
@@ -332,9 +338,12 @@
 func NewUploadCommand() cli.Command {
 	return cli.Command{
 		Name:  "upload",
-		Usage: "upload multiple directory or file",
+		Usage: "upload multiple directories, files, or http urls",
 		Action: func(c *cli.Context) error {
 			InitAndCheck(LOGIN, CHECK, c)
+			if c.Int("w") > 10 || c.Int("w") < 1 {
+				PrintErrorAndExit("max concurrent threads must be between 1 and 10")
+			}
 			session.Upload(
 				c.Args(),
 				c.String("remote"),
@@ -422,6 +431,9 @@ func NewSyncCommand() cli.Command {
 			if c.NArg() > 1 {
 				upPath = c.Args().Get(1)
 			}
+			if c.Int("w") > 10 || c.Int("w") < 1 {
+				PrintErrorAndExit("max concurrent threads must be between 1 and 10")
+			}
 			session.Sync(localPath, upPath, c.Int("w"), c.Bool("delete"), c.Bool("strong"))
 			return nil
 		},
diff --git a/io.go b/io.go
index 40306af..1f2ffd2 100644
--- a/io.go
+++ b/io.go
@@ -34,15 +34,27 @@ func (w *WrappedWriter) Close() error {
 	return w.w.Close()
 }
 
-func NewFileWrappedWriter(localPath string, bar *uiprogress.Bar) (*WrappedWriter, error) {
-	fd, err := os.Create(localPath)
+func NewFileWrappedWriter(localPath string, bar *uiprogress.Bar, resume bool) (*WrappedWriter, error) {
+	var fd *os.File
+	var err error
+
+	if resume {
+		fd, err = os.OpenFile(localPath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0755)
+	} else {
+		fd, err = os.Create(localPath)
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	fileinfo, err := fd.Stat()
 	if err != nil {
 		return nil, err
 	}
 
 	return &WrappedWriter{
 		w:      fd,
-		Copyed: 0,
+		Copyed: int(fileinfo.Size()),
 		bar:    bar,
 	}, nil
 }
diff --git a/partial/chunk.go b/partial/chunk.go
new file mode 100644
index 0000000..e5a3f13
--- /dev/null
+++ b/partial/chunk.go
@@ -0,0 +1,31 @@
+package partial
+
+type Chunk struct {
+	Start  int64
+	End    int64
+	Error  error
+	Buffer []byte
+}
+
+func cut(chunks []*Chunk, i int) ([]*Chunk, []*Chunk) {
+
+	if len(chunks) <= i {
+		return chunks, []*Chunk{}
+	}
+
+	return chunks[0:i], chunks[i:]
+}
+
+func (p *MultiPartialDownloader) retryDownloadChunk(start, end int64) ([]byte, error) {
+	var err error
+	var buffer []byte
+
+	// retry up to three times
+	for t := 0; t < 3; t++ {
+		buffer, err = p.DownFunc(p, start, end-1)
+		if err == nil {
+			break
+		}
+	}
+	return buffer, err
+}
diff --git a/partial/download.go b/partial/download.go
new file mode 100644
index 0000000..77342b2
--- /dev/null
+++ b/partial/download.go
@@ -0,0 +1,67 @@
+package partial
+
+import (
+	"errors"
+	"os"
+	"sync"
+)
+
+func (p *MultiPartialDownloader) Download() error {
+	fileinfo, err := os.Stat(p.FilePath)
+	if err == nil {
+		p.LocalSize = fileinfo.Size()
+	}
+
+	// work out how many chunks still need to be downloaded
+	needDownSize := p.FinalSize - p.LocalSize
+	chunkCount := needDownSize / ChunkSize
+	if needDownSize%ChunkSize != 0 {
+		chunkCount++
+	}
+
+	// download task queue
+	var queue []*Chunk
+	for i := 0; i < int(chunkCount); i++ {
+		start := p.LocalSize + int64(i)*ChunkSize
+		end := p.LocalSize + int64(i+1)*ChunkSize
+		if end > p.FinalSize {
+			end = p.FinalSize
+		}
+		queue = append(queue, &Chunk{
+			Start: start,
+			End:   end,
+		})
+	}
+
+	for {
+		var chunks []*Chunk
+		chunks, queue = cut(queue, p.Works)
+		if len(chunks) <= 0 {
+			break
+		}
+
+		var wg sync.WaitGroup
+		for _, chunk := range chunks {
+			wg.Add(1)
+			go func(c *Chunk) {
+				defer wg.Done()
+				buffer, err := p.retryDownloadChunk(c.Start, c.End)
+				c.Buffer = buffer
+				c.Error = err
+			}(chunk)
+		}
+
+		wg.Wait()
+
+		for _, chunk := range chunks {
+			if chunk.Error != nil {
+				return chunk.Error
+			}
+			if len(chunk.Buffer) == 0 {
+				return errors.New("chunk downloaded but its size is 0")
+			}
+			p.Writer.Write(chunk.Buffer)
+		}
+	}
+	return nil
+}
diff --git a/partial/partial.go b/partial/partial.go
new file mode 100644
index 0000000..1e9bfca
--- /dev/null
+++ b/partial/partial.go
@@ -0,0 +1,18 @@
+package partial
+
+import (
+	"io"
+)
+
+const ChunkSize = 1024 * 1024 * 10
+
+type MultiPartialDownloader struct {
+	FilePath  string
+	FinalSize int64
+	LocalSize int64
+	Writer    io.Writer
+	Works     int
+	DownFunc  ChunkDownFunc
+}
+
+type ChunkDownFunc func(file *MultiPartialDownloader, start, end int64) ([]byte, error)
diff --git a/session.go b/session.go
index 7fb16a0..3d40434 100644
--- a/session.go
+++ b/session.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
@@ -20,6 +21,7 @@ import (
 	"github.com/gosuri/uiprogress"
 	"github.com/jehiah/go-strftime"
 	"github.com/upyun/go-sdk/v3/upyun"
+	"github.com/upyun/upx/partial"
 )
 
 const (
@@ -298,7 +300,7 @@ func (sess *Session) getDir(upPath, localPath string, match *MatchConfig, worker
 				os.MkdirAll(lpath, 0755)
 			} else {
 				for i := 1; i <= MaxRetry; i++ {
-					id, e = sess.getFileWithProgress(id, fpath, lpath, fInfo)
+					id, e = sess.getFileWithProgress(id, fpath, lpath, fInfo, 1, false)
 					if e == nil {
 						break
 					}
@@ -328,7 +330,7 @@
 	return err
 }
 
-func (sess *Session) getFileWithProgress(id int, upPath, localPath string, upInfo *upyun.FileInfo) (int, error) {
+func (sess *Session) getFileWithProgress(id int, upPath, localPath string, upInfo *upyun.FileInfo, works int, resume bool) (int, error) {
 	var err error
 
 	var bar *uiprogress.Bar
@@ -361,20 +363,36 @@
 		return id, err
 	}
 
-	w, err := NewFileWrappedWriter(localPath, bar)
+	w, err := NewFileWrappedWriter(localPath, bar, resume)
 	if err != nil {
 		return id, err
 	}
 	defer w.Close()
 
-	_, err = sess.updriver.Get(&upyun.GetObjectConfig{
-		Path:   sess.AbsPath(upPath),
-		Writer: w,
-	})
+	downloader := partial.MultiPartialDownloader{
+		FilePath:  localPath,
+		Writer:    w,
+		Works:     works,
+		FinalSize: upInfo.Size,
+		DownFunc: func(file *partial.MultiPartialDownloader, start, end int64) ([]byte, error) {
+			var buffer bytes.Buffer
+			_, err = sess.updriver.Get(&upyun.GetObjectConfig{
+				Path:   sess.AbsPath(upPath),
+				Writer: &buffer,
+				Headers: map[string]string{
+					"Range": fmt.Sprintf("bytes=%d-%d", start, end),
+				},
+			})
+			return buffer.Bytes(), err
+		},
+	}
+
+	err = downloader.Download()
+
 	return idx, err
 }
 
-func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers int) {
+func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers int, resume bool) {
 	upPath = sess.AbsPath(upPath)
 	upInfo, err := sess.updriver.GetInfo(upPath)
 	if err != nil {
@@ -406,7 +424,12 @@ func (sess *Session) Get(upPath, localPath string, match *MatchConfig, workers i
 		if isDir {
 			localPath = filepath.Join(localPath, path.Base(upPath))
 		}
-		sess.getFileWithProgress(-1, upPath, localPath, upInfo)
+
+		// files smaller than 100 MB are not downloaded with multiple threads
+		if upInfo.Size < 1024*1024*100 {
+			workers = 1
+		}
+		sess.getFileWithProgress(-1, upPath, localPath, upInfo, workers, resume)
 	}
 }
 
@@ -451,7 +474,7 @@ func (sess *Session) GetStartBetweenEndFiles(upPath, localPath string, match *Ma
 	for fInfo := range fInfoChan {
 		fp := filepath.Join(fpath, fInfo.Name)
 		if (fp >= startList || startList == "") && (fp < endList || endList == "") {
-			sess.Get(fp, localPath, match, workers)
+			sess.Get(fp, localPath, match, workers, false)
 		} else if strings.HasPrefix(startList, fp) {
 			// same prefix: descend into the next directory level and keep checking recursively
 			if fInfo.IsDir {
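
Note (illustrative, not part of the patch): the resume behaviour comes entirely from the chunk arithmetic in partial/download.go. Download() reads the size of the partially written local file, then requests only the remainder: needDownSize = FinalSize - LocalSize, chunkCount rounds that up to whole ChunkSize (10 MB) pieces, and every range starts at LocalSize. The standalone program below uses hypothetical sizes (35 MB already on disk, 100 MB total) to print the ranges that would be requested; it mirrors the loop in Download() but is not code from the patch.

package main

import "fmt"

// same constant as partial/partial.go
const ChunkSize = 1024 * 1024 * 10

func main() {
	var localSize int64 = 1024 * 1024 * 35  // bytes already on disk
	var finalSize int64 = 1024 * 1024 * 100 // total size of the remote file

	// number of chunks still missing, rounded up
	needDownSize := finalSize - localSize
	chunkCount := needDownSize / ChunkSize
	if needDownSize%ChunkSize != 0 {
		chunkCount++
	}

	for i := 0; i < int(chunkCount); i++ {
		start := localSize + int64(i)*ChunkSize
		end := localSize + int64(i+1)*ChunkSize
		if end > finalSize {
			end = finalSize
		}
		// retryDownloadChunk ultimately calls DownFunc with (start, end-1),
		// i.e. an inclusive HTTP Range covering end-start bytes.
		fmt.Printf("chunk %d: bytes=%d-%d\n", i, start, end-1)
	}
}

Running it shows seven ranges, the first starting at byte 36700160 (35 MB) and the last ending at byte 104857599, so a restarted download never re-fetches data that is already on disk.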
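The sketch below (also illustrative, not part of the patch) shows how the new partial.MultiPartialDownloader is meant to be driven by a caller: supply FinalSize, a Writer opened in append mode so an interrupted run can resume, a worker count, and a DownFunc that fetches one inclusive byte range. upx itself wires DownFunc to an UpYun GetObject call with a Range header, as in the session.go hunk above; here a plain net/http request against a hypothetical URL stands in for it, and the URL, file name, and worker count are placeholders.

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"

	"github.com/upyun/upx/partial"
)

func main() {
	// Hypothetical source URL and local path, for illustration only.
	url := "https://example.com/big.bin"
	localPath := "big.bin"

	// The final size would normally come from a HEAD request or, in upx,
	// from upyun.FileInfo.Size.
	head, err := http.Head(url)
	if err != nil {
		panic(err)
	}
	head.Body.Close()
	finalSize := head.ContentLength

	// Open the local file in append mode so a partially downloaded file
	// is extended rather than truncated.
	fd, err := os.OpenFile(localPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		panic(err)
	}
	defer fd.Close()

	downloader := partial.MultiPartialDownloader{
		FilePath:  localPath,
		FinalSize: finalSize,
		Writer:    fd,
		Works:     4, // number of concurrent range requests
		// DownFunc fetches one inclusive byte range [start, end].
		DownFunc: func(_ *partial.MultiPartialDownloader, start, end int64) ([]byte, error) {
			req, err := http.NewRequest(http.MethodGet, url, nil)
			if err != nil {
				return nil, err
			}
			req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return nil, err
			}
			defer resp.Body.Close()
			return io.ReadAll(resp.Body)
		},
	}

	// Download() stats FilePath, skips the bytes already present, fetches the
	// remaining chunks Works at a time, and writes each batch to Writer in order.
	if err := downloader.Download(); err != nil {
		panic(err)
	}
}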