forked from Angey40/BaiduPCS-Go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
133 lines (116 loc) · 2.38 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package uploader
import (
"context"
"github.com/iikira/BaiduPCS-Go/pcsutil/waitgroup"
"github.com/oleiade/lane"
"os"
"time"
)
type (
worker struct {
id int
partOffset int64
splitUnit SplitUnit
checksum string
}
workerList []*worker
)
// CheckSumList 返回所以worker的checksum
// TODO: 实现sort
func (werl *workerList) CheckSumList() []string {
checksumList := make([]string, 0, len(*werl))
for _, wer := range *werl {
checksumList = append(checksumList, wer.checksum)
}
return checksumList
}
func (werl *workerList) Readed() int64 {
var readed int64
for _, wer := range *werl {
readed += wer.splitUnit.Readed()
}
return readed
}
func (muer *MultiUploader) upload() (uperr error) {
err := muer.multiUpload.Precreate()
if err != nil {
return err
}
var (
deque = lane.NewDeque()
// 控制并发量
wg = waitgroup.NewWaitGroup(muer.parallel)
)
// 加入队列
for _, wer := range muer.workers {
if wer.checksum == "" {
deque.Append(wer)
}
}
for {
e := deque.Shift()
if e == nil { // 任务为空
if wg.Parallel() == 0 { // 结束
break
} else {
time.Sleep(1e9)
continue
}
}
wer := e.(*worker)
wg.AddDelta()
go func() {
defer wg.Done()
var (
ctx, cancel = context.WithCancel(context.Background())
doneChan = make(chan struct{})
checksum string
terr error
)
go func() {
checksum, terr = muer.multiUpload.TmpFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit)
close(doneChan)
}()
select {
case <-muer.canceled:
cancel()
return
case <-doneChan:
// continue
}
cancel()
if terr != nil {
if me, ok := terr.(*MultiError); ok {
if me.Terminated { // 终止
close(muer.canceled)
uperr = me.Err
return
}
}
uploaderVerbose.Warnf("upload err: %s, id: %d\n", terr, wer.id)
wer.splitUnit.Seek(0, os.SEEK_SET)
deque.Append(wer)
return
}
wer.checksum = checksum
// 通知更新
if muer.updateInstanceStateChan != nil && len(muer.updateInstanceStateChan) < cap(muer.updateInstanceStateChan) {
muer.updateInstanceStateChan <- struct{}{}
}
}()
}
wg.Wait()
select {
case <-muer.canceled:
if uperr != nil {
return uperr
}
return context.Canceled
default:
}
cerr := muer.multiUpload.CreateSuperFile(muer.workers.CheckSumList()...)
if cerr != nil {
return cerr
}
return
}