-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
scheduler.go
109 lines (93 loc) · 2.85 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package batch
import (
"sync"
"time"
"github.com/lightningnetwork/lnd/kvdb"
)
// TimeScheduler is a batching engine that executes requests within a fixed
// horizon. When the first request is received, a TimeScheduler waits a
// configurable duration for other concurrent requests to join the batch. Once
// this time has elapsed, the batch is closed and executed. Subsequent requests
// are then added to a new batch which undergoes the same process.
type TimeScheduler struct {
db kvdb.Backend
locker sync.Locker
duration time.Duration
mu sync.Mutex
b *batch
}
// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
// which to schedule batches. If the operation needs to modify a higher-level
// cache, the cache's lock should be provided to so that external consistency
// can be maintained, as successful db operations will cause a request's
// OnCommit method to be executed while holding this lock.
func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
duration time.Duration) *TimeScheduler {
return &TimeScheduler{
db: db,
locker: locker,
duration: duration,
}
}
// Execute schedules the provided request for batch execution along with other
// concurrent requests. The request will be executed within a fixed horizon,
// parameterizeed by the duration of the scheduler. The error from the
// underlying operation is returned to the caller.
//
// NOTE: Part of the Scheduler interface.
func (s *TimeScheduler) Execute(r *Request) error {
req := request{
Request: r,
errChan: make(chan error, 1),
}
// Add the request to the current batch. If the batch has been cleared
// or no batch exists, create a new one.
s.mu.Lock()
if s.b == nil {
s.b = &batch{
db: s.db,
clear: s.clear,
locker: s.locker,
}
time.AfterFunc(s.duration, s.b.trigger)
}
s.b.reqs = append(s.b.reqs, &req)
// If this is a non-lazy request, we'll execute the batch immediately.
if !r.lazy {
go s.b.trigger()
}
s.mu.Unlock()
// Wait for the batch to process the request. If the batch didn't
// ask us to execute the request individually, simply return the error.
err := <-req.errChan
if err != errSolo {
return err
}
// Obtain exclusive access to the cache if this scheduler needs to
// modify the cache in OnCommit.
if s.locker != nil {
s.locker.Lock()
defer s.locker.Unlock()
}
// Otherwise, run the request on its own.
commitErr := kvdb.Update(s.db, req.Update, func() {
if req.Reset != nil {
req.Reset()
}
})
// Finally, return the commit error directly or execute the OnCommit
// closure with the commit error if present.
if req.OnCommit != nil {
return req.OnCommit(commitErr)
}
return commitErr
}
// clear resets the scheduler's batch to nil so that no more requests can be
// added.
func (s *TimeScheduler) clear(b *batch) {
s.mu.Lock()
if s.b == b {
s.b = nil
}
s.mu.Unlock()
}