-
Notifications
You must be signed in to change notification settings - Fork 397
/
combiner.go
171 lines (143 loc) · 4.04 KB
/
combiner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"sync"
"storj.io/common/storj"
"storj.io/common/sync2"
)
// Handler handles piece deletion requests from a queue.
type Handler interface {
// Handle should call queue.PopAll until finished.
Handle(ctx context.Context, node storj.NodeURL, queue Queue)
}
// NewQueue is a constructor func for queues.
type NewQueue func() Queue
// Queue is a queue for jobs.
type Queue interface {
// TryPush tries to push a new job to the queue.
TryPush(job Job) bool
// PopAll fetches all jobs in the queue.
//
// When there are no more jobs, the queue must stop accepting new jobs.
PopAll() ([]Job, bool)
// PopAllWithoutClose fetches all jobs in the queue,
// but without closing the queue for new requests.
PopAllWithoutClose() []Job
}
// Job is a single of deletion.
type Job struct {
// Pieces are the pieces id-s that need to be deleted.
Pieces []storj.PieceID
// Resolve is for notifying the job issuer about the outcome.
Resolve Promise
}
// Promise is for signaling to the deletion requests about the result.
type Promise interface {
// Success is called when the job has been successfully handled.
Success()
// Failure is called when the job didn't complete successfully.
Failure()
}
// Combiner combines multiple concurrent deletion requests into batches.
type Combiner struct {
// ctx context to pass down to the handler.
ctx context.Context
cancel context.CancelFunc
// handler defines what to do with the jobs.
handler Handler
// newQueue creates a new queue.
newQueue NewQueue
// workers contains all worker goroutines.
workers sync2.WorkGroup
// mu protects workerByID
mu sync.Mutex
workerByID map[storj.NodeID]*worker
}
// worker handles a batch of jobs.
type worker struct {
waitFor chan struct{}
node storj.NodeURL
jobs Queue
done chan struct{}
}
// NewCombiner creates a new combiner.
func NewCombiner(parent context.Context, handler Handler, newQueue NewQueue) *Combiner {
ctx, cancel := context.WithCancel(parent)
return &Combiner{
ctx: ctx,
cancel: cancel,
handler: handler,
newQueue: newQueue,
workerByID: map[storj.NodeID]*worker{},
}
}
// Close shuts down all workers.
func (combiner *Combiner) Close() {
combiner.cancel()
combiner.workers.Close()
}
// Enqueue adds a deletion job to the queue.
func (combiner *Combiner) Enqueue(node storj.NodeURL, job Job) {
combiner.mu.Lock()
defer combiner.mu.Unlock()
last := combiner.workerByID[node.ID]
// Check whether we can use the last worker.
if last != nil && last.jobs.TryPush(job) {
// We've successfully added a job to an existing worker.
return
}
// Create a new worker when one doesn't exist or the last one was full.
next := &worker{
node: node,
jobs: combiner.newQueue(),
done: make(chan struct{}),
}
if last != nil {
next.waitFor = last.done
}
combiner.workerByID[node.ID] = next
if !next.jobs.TryPush(job) {
// This should never happen.
job.Resolve.Failure()
}
// Start the worker.
next.start(combiner)
}
// schedule starts the worker.
func (worker *worker) start(combiner *Combiner) {
// Try to add to worker pool, this may fail when we are shutting things down.
workerStarted := combiner.workers.Go(func() {
defer mon.TaskNamed("worker_start")(nil)(nil)
defer close(worker.done)
// Ensure we fail any jobs that the handler didn't handle.
defer FailPending(worker.jobs)
if worker.waitFor != nil {
// Wait for previous worker to finish work to ensure fairness between nodes.
select {
case <-worker.waitFor:
case <-combiner.ctx.Done():
return
}
}
// Handle the job queue.
combiner.handler.Handle(combiner.ctx, worker.node, worker.jobs)
})
// If we failed to start a worker, then mark all the jobs as failures.
if !workerStarted {
FailPending(worker.jobs)
}
}
// FailPending fails all the jobs in the queue.
func FailPending(jobs Queue) {
for {
list, ok := jobs.PopAll()
if !ok {
return
}
for _, job := range list {
job.Resolve.Failure()
}
}
}