Skip to content

Commit

Permalink
add checkpoint multi upload
Browse files Browse the repository at this point in the history
  • Loading branch information
jojoliang committed Sep 29, 2020
1 parent c88b738 commit 287669a
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 20 deletions.
137 changes: 118 additions & 19 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -481,15 +482,28 @@ type Object struct {
// MultiUploadOptions is the option of the multiupload,
// ThreadPoolSize default is one
type MultiUploadOptions struct {
OptIni *InitiateMultipartUploadOptions
PartSize int64
ThreadPoolSize int
OptIni *InitiateMultipartUploadOptions
PartSize int64
ThreadPoolSize int
CheckPointFile string
EnableCheckpoint bool
}

type CheckPointOptions struct {
cpfile *os.File
Key string `xml:"Key"`
FilePath string `xml:"FilePath"`
FileSize int64 `xml:"FileSize"`
PartSize int64 `xml:"PartSize"`
UploadID string `xml:"UploadID"`
Parts []Object `xml:"Parts>Part,omitempty"`
}

type Chunk struct {
Number int
OffSet int64
Size int64
Done bool
}

// jobs
Expand All @@ -506,13 +520,15 @@ type Jobs struct {
type Results struct {
PartNumber int
Resp *Response
err error
}

func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
for j := range jobs {
fd, err := os.Open(j.FilePath)
var res Results
if err != nil {
res.err = err
res.PartNumber = j.Chunk.Number
res.Resp = nil
results <- &res
Expand All @@ -528,6 +544,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
&io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
res.PartNumber = j.Chunk.Number
res.Resp = resp
res.err = err
if err != nil {
rt--
if rt == 0 {
Expand All @@ -554,34 +571,72 @@ func DividePart(fileSize int64) (int64, int64) {
return partNum, partSize
}

func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*CheckPointOptions, []Chunk, int, error) {
if filePath == "" {
return nil, 0, errors.New("filePath invalid")
return nil, nil, 0, errors.New("filePath invalid")
}

file, err := os.Open(filePath)
if err != nil {
return nil, 0, err
return nil, nil, 0, err
}
defer file.Close()

stat, err := file.Stat()
if err != nil {
return nil, 0, err
return nil, nil, 0, err
}

optcp := &CheckPointOptions{}
uploaded := false
if opt.EnableCheckpoint {
for {
optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDONLY|os.O_CREATE, 0644)
if err != nil {
return nil, nil, 0, errors.New("open(create) checkpoint file[" + opt.CheckPointFile + "] failed, error:" + err.Error())
}
defer optcp.cpfile.Close()
bs, err := ioutil.ReadAll(optcp.cpfile)
if err != nil {
break
}
err = xml.Unmarshal(bs, optcp)
if err != nil {
break
}
if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() {
break
}
uploaded = true
break
}
optcp.Key = name
optcp.FilePath = filePath
optcp.FileSize = stat.Size()
}

var partNum int64
partSize := opt.PartSize
if uploaded {
partSize = optcp.PartSize
}
if partSize > 0 {
partSize = partSize * 1024 * 1024
partNum = stat.Size() / partSize
if partNum >= 10000 {
return nil, 0, errors.New("Too many parts, out of 10000")
return nil, nil, 0, errors.New("Too many parts, out of 10000")
}
} else {
partNum, partSize = DividePart(stat.Size())
}
if opt.EnableCheckpoint {
optcp.PartSize = partSize / 1024 / 1024
}

var chunks []Chunk
var chunk = Chunk{}
var chunk = Chunk{
Done: false,
}
for i := int64(0); i < partNum; i++ {
chunk.Number = int(i + 1)
chunk.OffSet = i * partSize
Expand All @@ -597,8 +652,14 @@ func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error)
partNum++
}

return chunks, int(partNum), nil

if uploaded {
for _, part := range optcp.Parts {
if part.PartNumber <= int(partNum) {
chunks[(part.PartNumber - 1)].Done = true
}
}
}
return optcp, chunks, int(partNum), nil
}

// MultiUpload/Upload 为高级upload接口,并发分块上传
Expand All @@ -615,8 +676,12 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
if opt == nil {
opt = &MultiUploadOptions{}
}
if opt.EnableCheckpoint && opt.CheckPointFile == "" {
opt.CheckPointFile = fmt.Sprintf("%s.cp", filepath)
}

// 1.Get the file chunk
chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
optcp, chunks, partNum, err := SplitFileIntoChunks(name, filepath, opt)
if err != nil {
return nil, nil, err
}
Expand All @@ -637,15 +702,26 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
ETag: rsp.Header.Get("ETag"),
}
return result, rsp, nil
}
if opt.EnableCheckpoint {
optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDWR, 0644)
if err != nil {
return nil, nil, errors.New("open checkpoint file failed, error: " + err.Error())
}
defer optcp.cpfile.Close()
}

// 2.Init
uploadID := optcp.UploadID
optini := opt.OptIni
res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
if err != nil {
return nil, nil, err
if uploadID == "" {
// 2.Init
res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
if err != nil {
return nil, nil, err
}
uploadID = res.UploadID
optcp.UploadID = uploadID
}
uploadID := res.UploadID
var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
Expand All @@ -657,6 +733,10 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
chjobs := make(chan *Jobs, 100)
chresults := make(chan *Results, 10000)
optcom := &CompleteMultipartUploadOptions{}
if len(optcp.Parts) > 0 {
optcom.Parts = append(optcom.Parts, optcp.Parts...)
partNum -= len(optcp.Parts)
}

// 3.Start worker
for w := 1; w <= poolSize; w++ {
Expand All @@ -665,6 +745,9 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string

// 4.Push jobs
for _, chunk := range chunks {
if chunk.Done {
continue
}
partOpt := &ObjectUploadPartOptions{}
if optini != nil && optini.ObjectPutHeaderOptions != nil {
partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
Expand All @@ -687,19 +770,35 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
for i := 1; i <= partNum; i++ {
res := <-chresults
// Notice one part fail can not get the etag according.
if res.Resp == nil {
if res.Resp == nil || res.err != nil {
// Some part already fail, can not to get the header inside.
return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber)
return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
}
// Notice one part fail can not get the etag according.
etag := res.Resp.Header.Get("ETag")
optcom.Parts = append(optcom.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
if opt.EnableCheckpoint {
optcp.Parts = append(optcp.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
err := optcp.cpfile.Truncate(0)
if err != nil {
continue
}
_, err = optcp.cpfile.Seek(0, os.SEEK_SET)
if err == nil {
xml.NewEncoder(optcp.cpfile).Encode(optcp)
}
}
}
sort.Sort(ObjectList(optcom.Parts))

v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
if opt.EnableCheckpoint && err == nil {
os.Remove(opt.CheckPointFile)
}

return v, resp, err
}
Expand Down
2 changes: 1 addition & 1 deletion object_part.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string
// ObjectUploadPartOptions is the options of upload-part
type ObjectUploadPartOptions struct {
Expect string `header:"Expect,omitempty" url:"-"`
XCosContentSHA1 string `header:"x-cos-content-sha1" url:"-"`
XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
ContentLength int `header:"Content-Length,omitempty" url:"-"`

XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
Expand Down

0 comments on commit 287669a

Please sign in to comment.