Skip to content

Commit

Permalink
update multidownload
Browse files Browse the repository at this point in the history
  • Loading branch information
jojoliang committed Apr 7, 2021
1 parent f54cbf1 commit 2a64ad4
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 35 deletions.
39 changes: 35 additions & 4 deletions costesting/ci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *CosTestSuite) SetupSuite() {
XCosACL: "public-read",
}
r, err := s.Client.Bucket.Put(context.Background(), opt)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")
Expand Down Expand Up @@ -142,7 +142,7 @@ func (s *CosTestSuite) TestPutHeadDeleteBucket() {
},
})
r, err := client.Bucket.Put(context.Background(), nil)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")
Expand Down Expand Up @@ -507,6 +507,37 @@ func (s *CosTestSuite) TestPutGetDeleteObjectByUpload_10MB() {
assert.Nil(s.T(), err, "remove local file Failed")
}

func (s *CosTestSuite) TestPutGetDeleteObjectByUploadAndDownload_10MB() {
// Create tmp file
filePath := "tmpfile" + time.Now().Format(time.RFC3339)
newfile, err := os.Create(filePath)
assert.Nil(s.T(), err, "create tmp file Failed")
defer newfile.Close()

name := "test/objectUpload" + time.Now().Format(time.RFC3339)
b := make([]byte, 1024*1024*10)
_, err = rand.Read(b)

newfile.Write(b)
opt := &cos.MultiUploadOptions{
PartSize: 1,
ThreadPoolSize: 3,
}
_, _, err = s.Client.Object.Upload(context.Background(), name, filePath, opt)
assert.Nil(s.T(), err, "PutObject Failed")

// Over write tmp file
_, err = s.Client.Object.Download(context.Background(), name, filePath, nil)
assert.Nil(s.T(), err, "DownloadObject Failed")

_, err = s.Client.Object.Delete(context.Background(), name)
assert.Nil(s.T(), err, "DeleteObject Failed")

// remove the local tmp file
err = os.Remove(filePath)
assert.Nil(s.T(), err, "remove local file Failed")
}

func (s *CosTestSuite) TestPutGetDeleteObjectSpecialName() {
f := strings.NewReader("test")
name := s.SepFileName + time.Now().Format(time.RFC3339)
Expand Down Expand Up @@ -606,7 +637,7 @@ func (s *CosTestSuite) TestCopyObject() {

// Notice in intranet the bucket host sometimes has i/o timeout problem
r, err := c.Bucket.Put(context.Background(), opt)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")
Expand Down Expand Up @@ -971,7 +1002,7 @@ func (s *CosTestSuite) TestMultiCopy() {

// Notice in intranet the bucket host sometimes has i/o timeout problem
r, err := c.Bucket.Put(context.Background(), opt)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")
Expand Down
11 changes: 9 additions & 2 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,9 @@ func downloadWorker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results
fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size))
if n != j.Chunk.Size || err != nil {
res.err = fmt.Errorf("io.Copy Failed, read:%v, size:%v, err:%v", n, j.Chunk.Size, err)
res.err = fmt.Errorf("io.Copy Failed, nread:%v, want:%v, err:%v", n, j.Chunk.Size, err)
}
fd.Close()
results <- &res
break
}
Expand Down Expand Up @@ -1040,14 +1041,15 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
opt = &MultiDownloadOptions{}
}
if opt.Opt != nil && opt.Opt.Range != "" {
return nil, fmt.Errorf("does not supported Range Get")
return nil, fmt.Errorf("Download doesn't support Range Options")
}
// 获取文件长度和CRC
var coscrc string
resp, err := s.Head(ctx, name, nil)
if err != nil {
return resp, err
}
// 如果对象不存在x-cos-hash-crc64ecma,则跳过不做校验
coscrc = resp.Header.Get("x-cos-hash-crc64ecma")
strTotalBytes := resp.Header.Get("Content-Length")
totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64)
Expand All @@ -1072,6 +1074,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
if err != nil {
return rsp, err
}
defer fd.Close()
localcrc, err := calCRC64(fd)
if err != nil {
return rsp, err
Expand All @@ -1082,11 +1085,13 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
}
return rsp, err
}
// 创建文件
nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
if err != nil {
return resp, err
}
nfile.Close()

var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
Expand All @@ -1104,6 +1109,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
var downOpt ObjectGetOptions
if opt.Opt != nil {
downOpt = *opt.Opt
downOpt.Listener = nil // listener need to set nil
}
job := &Jobs{
Name: name,
Expand Down Expand Up @@ -1135,6 +1141,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
if err != nil {
return resp, err
}
defer fd.Close()
localcrc, err := calCRC64(fd)
if err != nil {
return resp, err
Expand Down
55 changes: 26 additions & 29 deletions object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"hash/crc64"
"io"
"io/ioutil"
math_rand "math/rand"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -528,19 +529,19 @@ func TestObjectService_Download(t *testing.T) {
}
defer os.Remove(filePath)
// 源文件内容
totalBytes := int64(1024 * 1024 * 10)
totalBytes := int64(1024*1024*9 + 123)
b := make([]byte, totalBytes)
_, err = rand.Read(b)
newfile.Write(b)
newfile.Close()
tb := crc64.MakeTable(crc64.ECMA)
localcrc := crc64.Update(0, tb, b)
localcrc := strconv.FormatUint(crc64.Update(0, tb, b), 10)

retryMap := make(map[int64]int)
mux.HandleFunc("/test.go.download", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
w.Header().Add("Content-Length", strconv.FormatInt(totalBytes, 10))
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
w.Header().Add("x-cos-hash-crc64ecma", localcrc)
return
}
strRange := r.Header.Get("Range")
Expand All @@ -549,36 +550,21 @@ func TestObjectService_Download(t *testing.T) {
start, _ := strconv.ParseInt(slice2[0], 10, 64)
end, _ := strconv.ParseInt(slice2[1], 10, 64)
if retryMap[start] == 0 {
// 重试校验1
retryMap[start]++
w.WriteHeader(http.StatusGatewayTimeout)
} else if retryMap[start] == 1 {
// 重试检验2
retryMap[start]++
w.WriteHeader(http.StatusGatewayTimeout)
return
fd, err := os.Open(filePath)
if err != nil {
t.Fatalf("open file failed: %v", err)
}
defer fd.Close()
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
fd.Seek(start, os.SEEK_SET)
n, err := io.Copy(w, LimitReadCloser(fd, (end-start)/2))
if err != nil || int64(n) != (end-start)/2 {
t.Fatalf("write file failed:%v, n:%v", err, n)
}

io.Copy(w, bytes.NewBuffer(b[start:end]))
} else if retryMap[start] == 2 {
// 重试检验3
retryMap[start]++
st := math_rand.Int63n(totalBytes - 1024*1024)
et := st + end - start
io.Copy(w, bytes.NewBuffer(b[st:et+1]))
} else {
fd, err := os.Open(filePath)
if err != nil {
t.Fatalf("open file failed: %v", err)
}
defer fd.Close()
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
fd.Seek(start, os.SEEK_SET)
n, err := io.Copy(w, LimitReadCloser(fd, end-start+1))
if err != nil || int64(n) != end-start+1 {
t.Fatalf("write file failed:%v, n:%v", err, n)
}
io.Copy(w, bytes.NewBuffer(b[start:end+1]))
}
})

Expand All @@ -587,9 +573,20 @@ func TestObjectService_Download(t *testing.T) {
PartSize: 1,
}
downPath := "down.file" + time.Now().Format(time.RFC3339)
defer os.Remove(downPath)
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err == nil {
// 长度不一致 Failed
t.Fatalf("Object.Upload returned error: %v", err)
}
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err == nil {
// CRC不一致
t.Fatalf("Object.Upload returned error: %v", err)
}
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err != nil {
// 正确
t.Fatalf("Object.Upload returned error: %v", err)
}
os.Remove(downPath)
}

0 comments on commit 2a64ad4

Please sign in to comment.