/
poller.go
164 lines (134 loc) · 4.25 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package bundle
import (
	"context"
	"errors"
	"fmt"

	"github.com/warp-contracts/syncer/src/utils/config"
	"github.com/warp-contracts/syncer/src/utils/model"
	"github.com/warp-contracts/syncer/src/utils/monitoring"
	"github.com/warp-contracts/syncer/src/utils/task"
	"gorm.io/gorm"
)
// Poller periodically fetches bundle items from the database and puts them on
// the output channel.
//
// It picks up interactions that somehow didn't get sent through the
// notification channel, probably because of a restart.
type Poller struct {
	*task.Task

	// Database handle used by the polling queries
	db *gorm.DB

	// Source of the report whose counters are updated while polling
	monitor monitoring.Monitor

	// Data about the interactions that need to be bundled
	output chan *model.BundleItem
}
// NewPoller creates a Poller backed by a task named "poller".
//
// When config.Bundler.PollerDisabled is set, the task is created without any
// subtasks. Otherwise handleNewTransactions and handleRetrying are both
// registered through WithRepeatedSubtaskFunc, each with
// config.Bundler.PollerInterval.
func NewPoller(config *config.Config) (self *Poller) {
	self = &Poller{}

	pollerTask := task.NewTask(config, "poller")
	if !config.Bundler.PollerDisabled {
		pollerTask = pollerTask.
			WithRepeatedSubtaskFunc(config.Bundler.PollerInterval, self.handleNewTransactions).
			WithRepeatedSubtaskFunc(config.Bundler.PollerInterval, self.handleRetrying)
	}

	self.Task = pollerTask
	return
}
// WithDB sets the database handle used by the polling queries and returns the
// same Poller so calls can be chained.
func (self *Poller) WithDB(db *gorm.DB) *Poller {
	self.db = db

	return self
}
// WithOutputChannel sets the channel on which polled bundle items are sent and
// returns the same Poller so calls can be chained.
func (self *Poller) WithOutputChannel(bundleItems chan *model.BundleItem) *Poller {
	self.output = bundleItems

	return self
}
// WithMonitor sets the monitor whose report counters are updated while polling
// and returns the same Poller so calls can be chained.
func (self *Poller) WithMonitor(monitor monitoring.Monitor) *Poller {
	self.monitor = monitor

	return self
}
// handleNewTransactions claims a batch of PENDING bundle items and forwards
// them to the output channel.
//
// A single UPDATE ... RETURNING statement switches up to
// Config.Bundler.PollerMaxBatchSize rows from PENDING to UPLOADING (lowest
// interaction_id first, skipping rows locked elsewhere) and returns the
// updated rows. The query is bounded by Config.Bundler.PollerTimeout. Every
// returned row is sent on self.output; sending is abandoned as soon as
// StopChannel becomes readable.
//
// Query failures are logged and counted in PollerFetchError but are never
// propagated: err is always nil. repeat is true only when a full batch was
// fetched and forwarded, meaning more PENDING rows may be waiting.
func (self *Poller) handleNewTransactions() (repeat bool, err error) {
	ctx, cancel := context.WithTimeout(self.Ctx, self.Config.Bundler.PollerTimeout)
	defer cancel()

	// Marks pending bundle items as being uploaded and fetches them in one statement
	var bundleItems []model.BundleItem
	err = self.db.WithContext(ctx).
		Raw(`UPDATE bundle_items
SET state = 'UPLOADING'::bundle_state, updated_at = NOW()
WHERE interaction_id IN (SELECT interaction_id
FROM bundle_items
WHERE state = 'PENDING'::bundle_state
ORDER BY interaction_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED)
RETURNING *`, self.Config.Bundler.PollerMaxBatchSize).
		Scan(&bundleItems).Error
	if err != nil {
		// errors.Is also matches a wrapped sentinel, unlike a plain comparison
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			self.Log.WithError(err).Error("Failed to get new interactions")
			self.monitor.GetReport().Bundler.Errors.PollerFetchError.Inc()
		}

		// Deliberately swallowed, the error is not passed on to the task
		return false, nil
	}

	if len(bundleItems) > 0 {
		self.Log.WithField("count", len(bundleItems)).Debug("Polled new bundle items")
	}

	for i := range bundleItems {
		select {
		case <-self.StopChannel:
			return false, nil
		case self.output <- &bundleItems[i]:
		}

		// Update metrics
		self.monitor.GetReport().Bundler.State.BundlesFromSelects.Inc()
	}

	// Repeat right away only if the batch was full, there can be more items to fetch.
	// Otherwise the next scheduled check is enough.
	return len(bundleItems) == self.Config.Bundler.PollerMaxBatchSize, nil
}
// handleRetrying re-sends bundle items that have been stuck in the UPLOADING
// state for too long.
//
// A single UPDATE ... RETURNING statement selects up to
// Config.Bundler.PollerMaxBatchSize rows that are in UPLOADING state and whose
// updated_at is older than NOW() minus Config.Bundler.PollerRetryBundleAfter
// (lowest interaction_id first, skipping rows locked elsewhere), refreshes
// their updated_at and returns them. The query is bounded by
// Config.Bundler.PollerTimeout. Every returned row is sent on self.output;
// sending is abandoned as soon as StopChannel becomes readable.
//
// Query failures are logged and counted in PollerFetchError but are never
// propagated: err is always nil. repeat is true only when a full batch was
// fetched and forwarded, meaning more rows may be waiting for a retry.
func (self *Poller) handleRetrying() (repeat bool, err error) {
	ctx, cancel := context.WithTimeout(self.Ctx, self.Config.Bundler.PollerTimeout)
	defer cancel()

	// Passed in milliseconds so that sub-second durations aren't truncated to
	// "0 seconds", which would make freshly claimed items eligible right away
	retryAfter := fmt.Sprintf("%d milliseconds", self.Config.Bundler.PollerRetryBundleAfter.Milliseconds())

	// Refreshes the timestamp of stale uploading bundle items and fetches them in one statement
	var bundleItems []model.BundleItem
	err = self.db.WithContext(ctx).
		Raw(`UPDATE bundle_items
SET state = 'UPLOADING'::bundle_state, updated_at = NOW()
WHERE interaction_id IN (
SELECT interaction_id
FROM bundle_items
WHERE state = 'UPLOADING'::bundle_state AND updated_at < NOW() - ?::interval
ORDER BY interaction_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED)
RETURNING *`, retryAfter, self.Config.Bundler.PollerMaxBatchSize).
		Scan(&bundleItems).
		Error
	if err != nil {
		// errors.Is also matches a wrapped sentinel, unlike a plain comparison
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			self.Log.WithError(err).Error("Failed to get interactions for retrying")
			self.monitor.GetReport().Bundler.Errors.PollerFetchError.Inc()
		}

		// Deliberately swallowed, the error is not passed on to the task
		return false, nil
	}

	if len(bundleItems) > 0 {
		self.Log.WithField("count", len(bundleItems)).Trace("Polled bundle items for retrying")
	}

	for i := range bundleItems {
		select {
		case <-self.StopChannel:
			return false, nil
		case self.output <- &bundleItems[i]:
		}

		// Update metrics
		self.monitor.GetReport().Bundler.State.RetriedBundlesFromSelects.Inc()
	}

	// Repeat right away only if the batch was full, there can be more items to fetch.
	// Otherwise the next scheduled check is enough.
	return len(bundleItems) == self.Config.Bundler.PollerMaxBatchSize, nil
}