-
Notifications
You must be signed in to change notification settings - Fork 3
/
retriever.go
148 lines (125 loc) · 3.96 KB
/
retriever.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Package retriever contains all components for events retriever, which
// gets all past events
package retriever // import "github.com/joincivil/civil-events-crawler/pkg/retriever"
import (
"runtime"
"sort"
"sync"
"time"
log "github.com/golang/glog"
"github.com/Jeffail/tunny"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/joincivil/civil-events-crawler/pkg/model"
)
const (
workerMultiplier = 2
)
// NewEventRetriever creates a EventRetriever given a list of ContractFilterers and
// connection to client
func NewEventRetriever(client bind.ContractBackend, filterers []model.ContractFilterers) *EventRetriever {
retriever := &EventRetriever{
client: client,
PastEvents: make([]*model.Event, 0),
filterers: filterers,
}
return retriever
}
// EventRetriever handles the iterator returned from retrieving past events
type EventRetriever struct {
// Client is the ethereum client from go-ethereum
client bind.ContractBackend
// PastEvents is a slice that holds all past Events requested
PastEvents []*model.Event
// filterers contains a list of ContractFilterers
filterers []model.ContractFilterers
// Mutex to lock writes/reads to PastEvents
pastEventsMutex sync.Mutex
// Mutex to lock access to the filterers map
mutex sync.Mutex
}
// Retrieve gets all events from StartBlock until now
// If nonSubOnly is true, retrieve only event types that are not subscribed to by
// the watchers. This is useful when polling for events alongside setting up watchers
// for events.
func (r *EventRetriever) Retrieve(nonSubOnly bool) error {
start := time.Now()
numWorkers := runtime.NumCPU() * workerMultiplier
// Worker pool to run the filterers
pool := tunny.NewFunc(numWorkers, func(payload interface{}) interface{} {
f := payload.(func())
f()
return nil
})
defer pool.Close()
wg := sync.WaitGroup{}
for _, filter := range r.filterers {
wg.Add(1)
go func(filt model.ContractFilterers) {
filtererFunc := func() {
log.Infof(
"Starting filterer: %v, %v",
filt.ContractName(),
filt.ContractAddress().Hex(),
)
pastEvents := []*model.Event{}
pastEvents, err := filt.StartFilterers(r.client, pastEvents, nonSubOnly)
if err != nil {
log.Errorf("Error retrieving filterer events: err: %v", err)
return
}
r.pastEventsMutex.Lock()
r.PastEvents = append(r.PastEvents, pastEvents...)
r.pastEventsMutex.Unlock()
}
pool.Process(filtererFunc)
wg.Done()
log.Infof(
"Completed filterer: %v, %v",
filt.ContractName(),
filt.ContractAddress().Hex(),
)
}(filter)
}
wg.Wait()
log.Infof("All %v filterers have run, took %v", len(r.filterers), time.Since(start))
return nil
}
// AddFilterers add filterers to the retriever
func (r *EventRetriever) AddFilterers(w model.ContractFilterers) error {
defer r.mutex.Unlock()
r.mutex.Lock()
r.filterers = append(r.filterers, w)
return nil
}
// RemoveFilterers remove given filterers from the retriever
func (r *EventRetriever) RemoveFilterers(w model.ContractFilterers) error {
defer r.mutex.Unlock()
r.mutex.Lock()
if r.filterers != nil && len(r.filterers) > 0 {
for index, ew := range r.filterers {
if w.ContractAddress() == ew.ContractAddress() &&
w.ContractName() == ew.ContractName() {
// Delete the item in the filterers list.
copy(r.filterers[index:], r.filterers[index+1:])
r.filterers[len(r.filterers)-1] = nil
r.filterers = r.filterers[:len(r.filterers)-1]
return nil
}
}
}
return nil
}
// SortEventsByBlock sorts events in PastEvents by block number
// NOTE(IS): This is not optimal, but for now checking that values exist outside of sort
// Pass in nil if you want to sort retriever.PastEvents
func (r *EventRetriever) SortEventsByBlock(events []*model.Event) error {
if events == nil {
events = r.PastEvents
}
sort.Slice(events, func(i, j int) bool {
blockNumber1 := events[i].BlockNumber()
blockNumber2 := events[j].BlockNumber()
return blockNumber1 < blockNumber2
})
return nil
}