-
Notifications
You must be signed in to change notification settings - Fork 0
/
poller.go
155 lines (127 loc) · 3.84 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package send
import (
"context"
"fmt"
"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/model"
"github.com/warp-contracts/syncer/src/utils/monitoring"
"github.com/warp-contracts/syncer/src/utils/task"
"gorm.io/gorm"
)
// Poller periodically fetches pending data items from the database and puts
// them on the output channel. It is a fallback path: it picks up data items
// that, for whatever reason, did not get sent through the notification channel.
type Poller struct {
	*task.Task

	// Database handle the polling queries are executed on
	db *gorm.DB

	// Source of the report whose counters the poller updates
	monitor monitoring.Monitor

	// Channel the polled data items are sent to
	output chan *model.DataItem
}
// NewPoller creates a Poller backed by a task named "poller" with two repeated
// subtasks, both scheduled with config.Sender.PollerInterval: one handling new
// data items and one handling data items that need to be retried.
//
// The database, output channel and monitor are not set here; they are supplied
// through WithDB, WithOutputChannel and WithMonitor.
func NewPoller(config *config.Config) (self *Poller) {
	self = &Poller{}

	// Both subtasks share the same polling interval
	interval := config.Sender.PollerInterval

	self.Task = task.NewTask(config, "poller").
		WithRepeatedSubtaskFunc(interval, self.handleNew).
		WithRepeatedSubtaskFunc(interval, self.handleRetrying)

	return self
}
// WithDB sets the database handle used by the polling queries.
// It returns the receiver so that calls can be chained.
func (self *Poller) WithDB(db *gorm.DB) *Poller {
	self.db = db
	return self
}
// WithOutputChannel sets the channel that polled data items are sent to.
// It returns the receiver so that calls can be chained.
func (self *Poller) WithOutputChannel(v chan *model.DataItem) *Poller {
	self.output = v
	return self
}
// WithMonitor sets the monitor whose report counters the poller updates.
// It returns the receiver so that calls can be chained.
func (self *Poller) WithMonitor(monitor monitoring.Monitor) *Poller {
	self.monitor = monitor
	return self
}
// handleNew claims a batch of PENDING data items and forwards them to the
// output channel.
//
// A single statement switches up to PollerMaxBatchSize of the least recently
// updated PENDING rows to UPLOADING (skipping rows locked by other
// transactions) and returns them. The query is bounded by PollerTimeout.
//
// Query failures are logged and counted but never returned: err is always nil.
// repeat is true only when a full batch was fetched and forwarded, which
// signals that more items may be waiting and the check should run again right
// away instead of waiting for the next scheduled run.
func (self *Poller) handleNew() (repeat bool, err error) {
	queryCtx, cancelQuery := context.WithTimeout(self.Ctx, self.Config.Sender.PollerTimeout)
	defer cancelQuery()

	batchSize := self.Config.Sender.PollerMaxBatchSize

	// Claim the batch: rows are marked as UPLOADING and returned in one statement
	var claimed []model.DataItem
	queryErr := self.db.WithContext(queryCtx).
		Raw(`UPDATE data_items
SET state = 'UPLOADING'::bundle_state, updated_at = NOW()
WHERE data_item_id IN (SELECT data_item_id
FROM data_items
WHERE state = 'PENDING'::bundle_state
ORDER BY updated_at ASC
LIMIT ?
FOR UPDATE SKIP LOCKED)
RETURNING *`, batchSize).
		Scan(&claimed).Error
	if queryErr != nil {
		// A missing record is not reported, anything else is logged and counted
		if queryErr != gorm.ErrRecordNotFound {
			self.Log.WithError(queryErr).Error("Failed to get new data items")
			self.monitor.GetReport().Sender.Errors.PollerFetchError.Inc()
		}

		// The failure is swallowed, the next scheduled run tries again
		return false, nil
	}

	if count := len(claimed); count > 0 {
		self.Log.WithField("count", count).Debug("Polled new data items")
	}

	for idx := range claimed {
		item := &claimed[idx]

		select {
		case <-self.Ctx.Done():
			// Task is stopping, give up on the rest of the batch
			return false, nil
		case self.output <- item:
		}

		// Count every item that was actually handed over
		self.monitor.GetReport().Sender.State.BundlesFromSelects.Inc()
	}

	// A full batch suggests there is more to fetch, so ask for an immediate
	// repeat. Otherwise the regularly scheduled check is enough.
	return len(claimed) == batchSize, nil
}
// handleRetrying re-claims data items that have been in the UPLOADING state
// for longer than PollerRetryBundleAfter and forwards them to the output
// channel again.
//
// A single statement refreshes updated_at on up to PollerMaxBatchSize of the
// least recently updated stale UPLOADING rows (skipping rows locked by other
// transactions) and returns them. The query is bounded by PollerTimeout.
//
// Query failures are logged and counted but never returned: err is always nil.
// repeat is true only when a full batch was fetched and forwarded, which
// signals that more items may be waiting and the check should run again right
// away instead of waiting for the next scheduled run.
func (self *Poller) handleRetrying() (repeat bool, err error) {
	ctx, cancel := context.WithTimeout(self.Ctx, self.Config.Sender.PollerTimeout)
	defer cancel()

	// The threshold is passed with millisecond precision. Truncating it to whole
	// seconds turned sub-second durations into "0 seconds", which made every
	// UPLOADING row (including freshly claimed ones) immediately eligible for
	// a retry. Whole-second durations produce the same interval as before.
	retryAfter := fmt.Sprintf("%d milliseconds", self.Config.Sender.PollerRetryBundleAfter.Milliseconds())

	var dataItems []model.DataItem
	err = self.db.WithContext(ctx).
		Raw(`UPDATE data_items
SET state = 'UPLOADING'::bundle_state, updated_at = NOW()
WHERE data_item_id IN (
SELECT data_item_id
FROM data_items
WHERE state = 'UPLOADING'::bundle_state AND updated_at < NOW() - ?::interval
ORDER BY updated_at ASC
LIMIT ?
FOR UPDATE SKIP LOCKED)
RETURNING *`, retryAfter, self.Config.Sender.PollerMaxBatchSize).
		Scan(&dataItems).
		Error
	if err != nil {
		// A missing record is not reported, anything else is logged and counted
		if err != gorm.ErrRecordNotFound {
			self.Log.WithError(err).Error("Failed to get data items for retrying")
			self.monitor.GetReport().Sender.Errors.PollerFetchError.Inc()
		}

		// The failure is swallowed, the next scheduled run tries again
		return false, nil
	}

	if len(dataItems) > 0 {
		self.Log.WithField("count", len(dataItems)).Trace("Polled data items for retrying sending to bundling service")
	}

	for i := range dataItems {
		select {
		case <-self.Ctx.Done():
			// Task is stopping, give up on the rest of the batch
			return false, nil
		case self.output <- &dataItems[i]:
		}

		// Count every item that was actually handed over
		self.monitor.GetReport().Sender.State.RetriedBundlesFromSelects.Inc()
	}

	// A full batch suggests there is more to fetch, so ask for an immediate
	// repeat. Otherwise the regularly scheduled check is enough.
	return len(dataItems) == self.Config.Sender.PollerMaxBatchSize, nil
}