-
Notifications
You must be signed in to change notification settings - Fork 22
/
range_put_file.go
112 lines (93 loc) · 2.51 KB
/
range_put_file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package files
import (
"context"
"fmt"
"io"
"log"
"math"
"os"
"sync"
)
// PutFile is a helper method which takes a file, and automatically chunks it up, rather than having to do this yourself
func (c Client) PutFile(ctx context.Context, shareName, path, fileName string, file *os.File, parallelism int) error {
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("error loading file info: %s", err)
}
if fileInfo.Size() == 0 {
return fmt.Errorf("file is empty which is not supported")
}
fileSize := fileInfo.Size()
chunkSize := 4 * 1024 * 1024 // 4MB
if chunkSize > int(fileSize) {
chunkSize = int(fileSize)
}
chunks := int(math.Ceil(float64(fileSize) / float64(chunkSize*1.0)))
workerCount := parallelism
if workerCount > chunks {
workerCount = chunks
}
var waitGroup sync.WaitGroup
waitGroup.Add(workerCount)
jobs := make(chan int, workerCount)
errors := make(chan error, chunkSize)
for i := 0; i < workerCount; i++ {
go func() {
for i := range jobs {
log.Printf("[DEBUG] Chunk %d of %d", i+1, chunks)
uci := uploadChunkInput{
thisChunk: i,
chunkSize: chunkSize,
fileSize: fileSize,
}
_, err := c.uploadChunk(ctx, shareName, path, fileName, uci, file)
if err != nil {
errors <- err
}
}
waitGroup.Done()
}()
}
for i := 0; i < chunks; i++ {
jobs <- i
}
close(jobs)
waitGroup.Wait()
// TODO: we should switch to hashicorp/multi-error here
if len(errors) > 0 {
return fmt.Errorf("uploading file: %s", <-errors)
}
return nil
}
type uploadChunkInput struct {
thisChunk int
chunkSize int
fileSize int64
}
func (c Client) uploadChunk(ctx context.Context, shareName, path, fileName string, input uploadChunkInput, file *os.File) (result PutRangeResponse, err error) {
startBytes := int64(input.chunkSize * input.thisChunk)
endBytes := startBytes + int64(input.chunkSize)
// the last size may exceed the size of the file
remaining := input.fileSize - startBytes
if int64(input.chunkSize) > remaining {
endBytes = startBytes + remaining
}
bytesToRead := int(endBytes) - int(startBytes)
bytes := make([]byte, bytesToRead)
_, err = file.ReadAt(bytes, startBytes)
if err != nil {
if err != io.EOF {
return result, fmt.Errorf("reading bytes: %s", err)
}
}
putBytesInput := PutByteRangeInput{
StartBytes: startBytes,
EndBytes: endBytes,
Content: bytes,
}
result, err = c.PutByteRange(ctx, shareName, path, fileName, putBytesInput)
if err != nil {
return result, fmt.Errorf("putting bytes: %s", err)
}
return
}