Skip to content

Commit

Permalink
add multidownload
Browse files Browse the repository at this point in the history
  • Loading branch information
jojoliang committed Apr 5, 2021
1 parent 4cc93a9 commit f54cbf1
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 0 deletions.
57 changes: 57 additions & 0 deletions example/object/download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"net/http"
"net/url"
"os"

"fmt"
"github.com/tencentyun/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)

func log_status(err error) {
if err == nil {
return
}
if cos.IsNotFoundError(err) {
// WARN
fmt.Println("WARN: Resource is not existed")
} else if e, ok := cos.IsCOSError(err); ok {
fmt.Printf("ERROR: Code: %v\n", e.Code)
fmt.Printf("ERROR: Message: %v\n", e.Message)
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
// ERROR
} else {
fmt.Printf("ERROR: %v\n", err)
// ERROR
}
}

func main() {
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: os.Getenv("COS_SECRETID"),
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: false,
RequestBody: false,
ResponseHeader: false,
ResponseBody: false,
},
},
})

opt := &cos.MultiDownloadOptions{
ThreadPoolSize: 5,
}
resp, err := c.Object.Download(
context.Background(), "test", "./test1G", opt,
)
log_status(err)
fmt.Printf("done, %v\n", resp.Header)
}
23 changes: 23 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,26 @@ func cloneObjectUploadPartOptions(opt *ObjectUploadPartOptions) *ObjectUploadPar
}
return &res
}

type RangeOptions struct {
HasStart bool
HasEnd bool
Start int64
End int64
}

func FormatRangeOptions(opt *RangeOptions) string {
if opt == nil {
return ""
}
if opt.HasStart && opt.HasEnd {
return fmt.Sprintf("bytes=%v-%v", opt.Start, opt.End)
}
if opt.HasStart {
return fmt.Sprintf("bytes=%v-", opt.Start)
}
if opt.HasEnd {
return fmt.Sprintf("bytes=-%v", opt.End)
}
return "bytes=-"
}
193 changes: 193 additions & 0 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,12 @@ type MultiUploadOptions struct {
EnableVerification bool
}

type MultiDownloadOptions struct {
Opt *ObjectGetOptions
PartSize int64
ThreadPoolSize int
}

type Chunk struct {
Number int
OffSet int64
Expand All @@ -570,6 +576,7 @@ type Jobs struct {
Chunk Chunk
Data io.Reader
Opt *ObjectUploadPartOptions
DownOpt *ObjectGetOptions
}

type Results struct {
Expand Down Expand Up @@ -632,6 +639,48 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
}
}

func downloadWorker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
for j := range jobs {
opt := &RangeOptions{
HasStart: true,
HasEnd: true,
Start: j.Chunk.OffSet,
End: j.Chunk.OffSet + j.Chunk.Size - 1,
}
j.DownOpt.Range = FormatRangeOptions(opt)
rt := j.RetryTimes
for {
var res Results
res.PartNumber = j.Chunk.Number
resp, err := s.Get(context.Background(), j.Name, j.DownOpt)
res.err = err
res.Resp = resp
if err != nil {
rt--
if rt == 0 {
results <- &res
break
}
continue
}
defer resp.Body.Close()
fd, err := os.OpenFile(j.FilePath, os.O_WRONLY, 0660)
if err != nil {
res.err = err
results <- &res
break
}
fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size))
if n != j.Chunk.Size || err != nil {
res.err = fmt.Errorf("io.Copy Failed, read:%v, size:%v, err:%v", n, j.Chunk.Size, err)
}
results <- &res
break
}
}
}

func DividePart(fileSize int64, last int) (int64, int64) {
partSize := int64(last * 1024 * 1024)
partNum := fileSize / partSize
Expand Down Expand Up @@ -953,6 +1002,150 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
return v, resp, err
}

func SplitSizeIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
var partNum int64
if partSize > 0 {
partSize = partSize * 1024 * 1024
partNum = totalBytes / partSize
if partNum >= 10000 {
return nil, 0, errors.New("Too manry parts, out of 10000")
}
} else {
partNum, partSize = DividePart(totalBytes, 64)
}

var chunks []Chunk
var chunk = Chunk{}
for i := int64(0); i < partNum; i++ {
chunk.Number = int(i + 1)
chunk.OffSet = i * partSize
chunk.Size = partSize
chunks = append(chunks, chunk)
}

if totalBytes%partSize > 0 {
chunk.Number = len(chunks) + 1
chunk.OffSet = int64(len(chunks)) * partSize
chunk.Size = totalBytes % partSize
chunks = append(chunks, chunk)
partNum++
}

return chunks, int(partNum), nil
}

func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions) (*Response, error) {
// 参数校验
if opt == nil {
opt = &MultiDownloadOptions{}
}
if opt.Opt != nil && opt.Opt.Range != "" {
return nil, fmt.Errorf("does not supported Range Get")
}
// 获取文件长度和CRC
var coscrc string
resp, err := s.Head(ctx, name, nil)
if err != nil {
return resp, err
}
coscrc = resp.Header.Get("x-cos-hash-crc64ecma")
strTotalBytes := resp.Header.Get("Content-Length")
totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64)
if err != nil {
return resp, err
}

// 切分
chunks, partNum, err := SplitSizeIntoChunks(totalBytes, opt.PartSize)
if err != nil {
return resp, err
}
// 直接下载到文件
if partNum == 0 || partNum == 1 {
rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt)
if err != nil {
return rsp, err
}
if coscrc != "" && s.client.Conf.EnableCRC {
icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
fd, err := os.Open(filepath)
if err != nil {
return rsp, err
}
localcrc, err := calCRC64(fd)
if err != nil {
return rsp, err
}
if localcrc != icoscrc {
return rsp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
}
}
return rsp, err
}
nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
if err != nil {
return resp, err
}
nfile.Close()
var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
} else {
poolSize = 1
}
chjobs := make(chan *Jobs, 100)
chresults := make(chan *Results, 10000)
for w := 1; w <= poolSize; w++ {
go downloadWorker(s, chjobs, chresults)
}

go func() {
for _, chunk := range chunks {
var downOpt ObjectGetOptions
if opt.Opt != nil {
downOpt = *opt.Opt
}
job := &Jobs{
Name: name,
RetryTimes: 3,
FilePath: filepath,
Chunk: chunk,
DownOpt: &downOpt,
}
chjobs <- job
}
close(chjobs)
}()

err = nil
for i := 0; i < partNum; i++ {
res := <-chresults
if res.Resp == nil || res.err != nil {
err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error())
continue
}
}
close(chresults)
if err != nil {
return nil, err
}
if coscrc != "" && s.client.Conf.EnableCRC {
icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
fd, err := os.Open(filepath)
if err != nil {
return resp, err
}
localcrc, err := calCRC64(fd)
if err != nil {
return resp, err
}
if localcrc != icoscrc {
return resp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
}
}
return resp, err
}

type ObjectPutTaggingOptions struct {
XMLName xml.Name `xml:"Tagging"`
TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
Expand Down

0 comments on commit f54cbf1

Please sign in to comment.