forked from git-lfs/git-lfs
/
upload_queue.go
201 lines (169 loc) · 4.77 KB
/
upload_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package lfs
import (
"fmt"
"github.com/cheggaaa/pb"
"os"
"path/filepath"
"sync"
"sync/atomic"
)
var (
	// clientAuthorized is a flag read/written with sync/atomic. It starts
	// at 0 and is set to 1 by UploadQueue.Process once an API request has
	// succeeded, after which worker goroutines no longer block on the
	// queue's auth condition variable.
	clientAuthorized = int32(0)
)
// Uploadable describes a file that can be uploaded.
type Uploadable struct {
	// OIDPath is the local media path for the object's OID, as returned
	// by LocalMediaPath.
	OIDPath string
	// Filename is the path of the file being uploaded, relative to the
	// working directory (ensureFile joins it with LocalWorkingDir).
	Filename string
	// CB, if non-nil, is invoked with copy progress during the upload.
	CB CopyCallback
	// Size is the file's size in bytes, used for progress-bar accounting.
	Size int64
}
// NewUploadable builds the Uploadable for the file at filename with object
// ID oid. index and totalFiles are passed through to CopyCallbackFile to
// label progress output. It returns a wrapped error if the local media path
// cannot be determined, the cleaned object cannot be ensured, or the file
// cannot be stat'ed.
func NewUploadable(oid, filename string, index, totalFiles int) (*Uploadable, *WrappedError) {
	path, err := LocalMediaPath(oid)
	if err != nil {
		return nil, Errorf(err, "Error uploading file %s (%s)", filename, oid)
	}

	// Make sure the cleaned object exists locally before queueing it.
	if err := ensureFile(filename, path); err != nil {
		return nil, Errorf(err, "Error uploading file %s (%s)", filename, oid)
	}

	fi, err := os.Stat(filename)
	if err != nil {
		return nil, Errorf(err, "Error uploading file %s (%s)", filename, oid)
	}

	// A progress-callback setup failure is non-fatal: report it to stderr
	// and continue with whatever cb was returned.
	cb, file, cbErr := CopyCallbackFile("push", filename, index, totalFiles)
	if cbErr != nil {
		fmt.Fprintln(os.Stderr, cbErr.Error())
	}

	// NOTE(review): this defer closes the callback's file when
	// NewUploadable returns — i.e. before the upload (and therefore cb)
	// actually runs in UploadQueue.Process. Confirm that cb does not write
	// to this file after close, or move ownership of the file to the
	// Uploadable.
	if file != nil {
		defer file.Close()
	}

	return &Uploadable{path, filename, cb, fi.Size()}, nil
}
// UploadQueue provides a queue that will allow concurrent uploads.
type UploadQueue struct {
	// uploadc carries queued Uploadables to the worker goroutines; it is
	// buffered to `files` entries and closed by Process.
	uploadc chan *Uploadable
	// errorc carries upload errors to the collector goroutine in Process.
	errorc chan *WrappedError
	// errors accumulates everything received on errorc; read via Errors().
	errors []*WrappedError
	// wg counts queued uploads (incremented in Add, decremented per upload).
	wg sync.WaitGroup
	// workers is the number of concurrent upload goroutines to start.
	workers int
	// files is the total number of files, used for the progress prefix.
	files int
	// finished counts completed uploads; updated with sync/atomic.
	finished int64
	// size is the total number of bytes queued, for the progress bar.
	size int64
	// authCond serializes workers until the first API request succeeds.
	authCond *sync.Cond
}
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads
// of up to `files` queued files.
func NewUploadQueue(workers, files int) *UploadQueue {
	q := &UploadQueue{
		workers:  workers,
		files:    files,
		uploadc:  make(chan *Uploadable, files),
		errorc:   make(chan *WrappedError),
		authCond: sync.NewCond(&sync.Mutex{}),
	}
	return q
}
// Add adds an Uploadable to the upload queue, accounting for its size in the
// progress total. It must only be called before Process closes the queue's
// upload channel.
func (q *UploadQueue) Add(u *Uploadable) {
	q.size += u.Size
	q.wg.Add(1)
	q.uploadc <- u
}
// Process starts the upload queue and displays a progress bar. It blocks
// until every queued upload has been attempted, then closes the error
// channel and finishes the bar. All Add calls must happen before Process
// runs, since it closes uploadc below.
func (q *UploadQueue) Process() {
	bar := pb.New64(q.size)
	bar.SetUnits(pb.U_BYTES)
	bar.ShowBar = false
	bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
	bar.Start()

	// This goroutine collects errors returned from uploads.
	// NOTE(review): nothing waits for this goroutine to drain errorc after
	// it is closed below, so Errors() called right after Process returns
	// may race with the final append — consider a done channel.
	go func() {
		for err := range q.errorc {
			q.errors = append(q.errors, err)
		}
	}()

	// This goroutine watches for apiEvents. In order to prevent multiple
	// credential requests from happening, the queue is processed sequentially
	// until an API request succeeds (meaning authenication has happened successfully).
	// Once the an API request succeeds, all worker goroutines are woken up and allowed
	// to process uploads. Once a success happens, this goroutine exits.
	go func() {
		for {
			event := <-apiEvent
			switch event {
			case apiEventSuccess:
				atomic.StoreInt32(&clientAuthorized, 1)
				// NOTE(review): Broadcast/Signal are issued without
				// holding q.authCond.L; a worker that has checked the
				// flag but not yet called Wait can miss this wakeup.
				q.authCond.Broadcast() // Wake all remaining goroutines
				return
			case apiEventFail:
				q.authCond.Signal() // Wake the next goroutine
			}
		}
	}()

	// This will block Process() until the worker goroutines are spun up and ready
	// to process uploads.
	// NOTE(review): only ONE value is received from workersReady below, so
	// this guarantees a single worker has started, not all q.workers.
	workersReady := make(chan int, q.workers)
	for i := 0; i < q.workers; i++ {
		// These are the worker goroutines that process uploads
		go func(n int) {
			workersReady <- 1

			for upload := range q.uploadc {
				// If an API authorization has not occured, we wait until we're woken up.
				// NOTE(review): this is `if` rather than `for`, so a
				// spurious or early wakeup lets more than one
				// unauthorized worker proceed — confirm intended.
				q.authCond.L.Lock()
				if atomic.LoadInt32(&clientAuthorized) == 0 {
					q.authCond.Wait()
				}
				q.authCond.L.Unlock()

				// Wrap the per-file callback so the shared progress bar
				// advances on every chunk copied.
				cb := func(total, read int64, current int) error {
					bar.Add(current)
					if upload.CB != nil {
						return upload.CB(total, read, current)
					}
					return nil
				}

				err := Upload(upload.OIDPath, upload.Filename, cb)
				if err != nil {
					q.errorc <- err
				}

				f := atomic.AddInt64(&q.finished, 1)
				bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
				q.wg.Done()
			}
		}(i)
	}

	// No more Adds are possible once the channel is closed.
	close(q.uploadc)
	<-workersReady
	// NOTE(review): if the ready worker has not yet reached Wait, this
	// signal is lost and workers stall until an apiEvent arrives.
	q.authCond.Signal() // Signal the first goroutine to run
	q.wg.Wait()
	close(q.errorc)
	bar.Finish()
}
// Errors returns any errors encountered during uploading. It is only
// meaningful after Process has returned; the collector goroutine appends to
// q.errors concurrently while Process runs.
func (q *UploadQueue) Errors() []*WrappedError {
	return q.errors
}
// ensureFile makes sure that the cleanPath exists before pushing it. If it
// does not exist, it attempts to clean it by reading the file at smudgePath.
// The OID encoded as cleanPath's final path component must match the OID of
// the cleaned content, or an error is returned.
func ensureFile(smudgePath, cleanPath string) error {
	// Nothing to do when the cleaned object is already present.
	if _, statErr := os.Stat(cleanPath); statErr == nil {
		return nil
	}

	localPath := filepath.Join(LocalWorkingDir, smudgePath)
	source, err := os.Open(localPath)
	if err != nil {
		return err
	}
	defer source.Close()

	stat, err := source.Stat()
	if err != nil {
		return err
	}

	cleaned, err := PointerClean(source, stat.Size(), nil)
	if err != nil {
		return err
	}
	cleaned.Close()

	// The cleaned content's OID must agree with the one the cleanPath
	// filename claims.
	if expected := filepath.Base(cleanPath); expected != cleaned.Oid {
		return fmt.Errorf("Expected %s to have an OID of %s, got %s", smudgePath, expected, cleaned.Oid)
	}

	return nil
}