-
Notifications
You must be signed in to change notification settings - Fork 246
/
service.go
447 lines (386 loc) · 13 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
package history
import (
"context"
"database/sql"
"errors"
"math/big"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
statustypes "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
statusrpc "github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/wallet/chain"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
)
// EventBalanceHistoryUpdateStarted, EventBalanceHistoryUpdateFinished and
// EventBalanceHistoryUpdateFinishedWithError are used to notify the UI about the progress of a
// balance history update.
const (
EventBalanceHistoryUpdateStarted walletevent.EventType = "wallet-balance-history-update-started"
EventBalanceHistoryUpdateFinished walletevent.EventType = "wallet-balance-history-update-finished"
EventBalanceHistoryUpdateFinishedWithError walletevent.EventType = "wallet-balance-history-update-finished-with-error"
// balanceHistoryUpdateInterval is the delay used to arm the periodic update timer.
balanceHistoryUpdateInterval = 12 * time.Hour
)
// Service keeps the stored balance history up to date in the background and answers balance
// history queries merged across chains.
type Service struct {
balance *Balance
db *sql.DB
eventFeed *event.Feed // receives the balance history update events
rpcClient *statusrpc.Client
networkManager *network.Manager
tokenManager *token.Manager
serviceContext context.Context // created by StartBalanceHistory
cancelFn context.CancelFunc // called by Stop; nil until StartBalanceHistory creates it
timer *time.Timer // schedules the next background update
visibleTokenSymbols []string // only tokens with these symbols get their history updated
visibleTokenSymbolsMutex sync.Mutex // Protects access to visibleTokenSymbols
}
// chainIdentity is a chain ID used as the map key when data points are grouped per chain.
type chainIdentity uint64
// NewService builds a balance history Service on top of db. The network manager is taken from
// rpcClient. No background work is started here.
func NewService(db *sql.DB, eventFeed *event.Feed, rpcClient *statusrpc.Client, tokenManager *token.Manager) *Service {
	svc := new(Service)
	svc.balance = NewBalance(NewBalanceDB(db))
	svc.db = db
	svc.eventFeed = eventFeed
	svc.rpcClient = rpcClient
	svc.networkManager = rpcClient.NetworkManager
	svc.tokenManager = tokenManager
	return svc
}
// Stop cancels the service context. It is a no-op when no cancel function has been set yet.
func (s *Service) Stop() {
	if s.cancelFn == nil {
		return
	}
	s.cancelFn()
}
// triggerEvent publishes a wallet event of the given type on the event feed, carrying account
// as its only address and message as its payload.
func (s *Service) triggerEvent(eventType walletevent.EventType, account statustypes.Address, message string) {
	evt := walletevent.Event{
		Type:     eventType,
		Accounts: []common.Address{common.Address(account)},
		Message:  message,
	}
	s.eventFeed.Send(evt)
}
// StartBalanceHistory launches the background balance history updater.
//
// The service context, its cancel function and the timer are created before the goroutine is
// started, so Stop and resetTimer already see them when this method returns (previously they
// were assigned from inside the goroutine, racing with those callers and letting an early Stop
// go unnoticed).
//
// The goroutine runs one update right away and then one per timer tick. It exits as soon as the
// service context is cancelled: either after the update that was in flight (in which case
// EventBalanceHistoryUpdateFinished is sent with the message "Service canceled"), or directly
// while idle between two updates instead of lingering until the next tick.
func (s *Service) StartBalanceHistory() {
	ctx, cancel := context.WithCancel(context.Background())
	timer := time.NewTimer(balanceHistoryUpdateInterval)
	s.serviceContext, s.cancelFn, s.timer = ctx, cancel, timer

	go func() {
		// update runs a single pass and reports whether the goroutine must exit.
		update := func() (exit bool) {
			err := s.updateBalanceHistoryForAllEnabledNetworks(ctx)
			if ctx.Err() != nil {
				s.triggerEvent(EventBalanceHistoryUpdateFinished, statustypes.Address{}, "Service canceled")
				timer.Stop()
				return true
			}
			if err != nil {
				s.triggerEvent(EventBalanceHistoryUpdateFinishedWithError, statustypes.Address{}, err.Error())
			}
			return false
		}

		if update() {
			return
		}

		for {
			select {
			case <-ctx.Done():
				// Cancelled while idle: no update is in progress, so there is nothing to report.
				timer.Stop()
				return
			case <-timer.C:
				// Re-arm first so the next tick is scheduled relative to the start of this update.
				s.resetTimer(balanceHistoryUpdateInterval)
				if update() {
					return
				}
			}
		}
	}()
}
// resetTimer stops the update timer and re-arms it to fire after interval. It does nothing when
// the timer has not been created.
func (s *Service) resetTimer(interval time.Duration) {
	if s.timer == nil {
		return
	}
	s.timer.Stop()
	s.timer.Reset(interval)
}
// UpdateVisibleTokens replaces the list of visible token symbols. When the list goes from empty
// to non-empty the update timer is reset to fire immediately.
func (s *Service) UpdateVisibleTokens(symbols []string) {
	s.visibleTokenSymbolsMutex.Lock()
	defer s.visibleTokenSymbolsMutex.Unlock()

	hadNone := len(s.visibleTokenSymbols) == 0
	s.visibleTokenSymbols = symbols
	if hadNone && len(symbols) > 0 {
		s.resetTimer(0)
	}
}
// isTokenVisible reports whether tokenSymbol is in the current list of visible token symbols.
// The comparison is an exact string match.
func (s *Service) isTokenVisible(tokenSymbol string) bool {
	s.visibleTokenSymbolsMutex.Lock()
	defer s.visibleTokenSymbolsMutex.Unlock()

	symbols := s.visibleTokenSymbols
	for i := range symbols {
		if symbols[i] == tokenSymbol {
			return true
		}
	}
	return false
}
// chainClientSource is the native token implementation of the DataSource interface.
// Header and balance queries are forwarded to chainClient.
type chainClientSource struct {
chainClient *chain.Client
currency string // value returned by Currency()
}
// HeaderByNumber forwards the header lookup for blockNo to the wrapped chain client.
func (src *chainClientSource) HeaderByNumber(ctx context.Context, blockNo *big.Int) (*types.Header, error) {
	client := src.chainClient
	return client.HeaderByNumber(ctx, blockNo)
}
// BalanceAt forwards the balance lookup for account at blockNo to the wrapped chain client.
func (src *chainClientSource) BalanceAt(ctx context.Context, account common.Address, blockNo *big.Int) (*big.Int, error) {
	client := src.chainClient
	return client.BalanceAt(ctx, account, blockNo)
}
// ChainID returns the chain ID of the wrapped chain client.
func (src *chainClientSource) ChainID() uint64 {
	client := src.chainClient
	return client.ChainID
}
// Currency returns the currency symbol this source was created with.
func (src *chainClientSource) Currency() string {
	symbol := src.currency
	return symbol
}
// TimeNow returns the current time as Unix seconds.
func (src *chainClientSource) TimeNow() int64 {
	now := time.Now().UTC()
	return now.Unix()
}
// tokenChainClientSource extends chainClientSource with its own BalanceAt, which resolves the
// token through NetworkManager and TokenManager and reads the balance through TokenManager.
type tokenChainClientSource struct {
chainClientSource
TokenManager *token.Manager
NetworkManager *network.Manager
// firstUnavailableBlockNo is nil until BalanceAt sees a "no contract code at given address"
// error; it then holds the block number of that request.
firstUnavailableBlockNo *big.Int
}
// BalanceAt returns the balance of the source's token held by account at blockNumber.
//
// The network is looked up by the chain client's chain ID and the token by the source's
// currency symbol; "network not found" / "token not found" errors are returned when either
// lookup fails.
//
// When the balance query fails with "no contract code at given address", the request is treated
// as a zero balance and the block number is remembered in firstUnavailableBlockNo. Later
// requests for that block or any earlier one are answered with zero without querying again.
//
// A nil blockNumber is passed through to the token manager unchanged; it is never compared
// with, nor stored as, the remembered block.
func (src *tokenChainClientSource) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
	chainNetwork := src.NetworkManager.Find(src.chainClient.ChainID)
	if chainNetwork == nil {
		return nil, errors.New("network not found")
	}

	tkn := src.TokenManager.FindToken(chainNetwork, src.currency)
	if tkn == nil {
		return nil, errors.New("token not found")
	}

	// The remembered block itself is known to have no contract code, hence <= rather than <.
	if blockNumber != nil && src.firstUnavailableBlockNo != nil && blockNumber.Cmp(src.firstUnavailableBlockNo) <= 0 {
		return big.NewInt(0), nil
	}

	balance, err := src.TokenManager.GetTokenBalanceAt(ctx, src.chainClient, account, tkn.Address, blockNumber)
	if err != nil {
		if err.Error() == "no contract code at given address" {
			// Ignore requests before contract deployment and mark this state for future requests
			if blockNumber != nil {
				src.firstUnavailableBlockNo = new(big.Int).Set(blockNumber)
			}
			return big.NewInt(0), nil
		}
		return nil, err
	}
	return balance, nil
}
// GetBalanceHistory returns the balance history, in token units, of address for currency,
// merged over all chainIDs. Chains without stored data are left out of the merge. The first
// error reported while reading a chain's data aborts the call.
// TODO: fetch token to FIAT exchange rates and return FIAT balance
func (s *Service) GetBalanceHistory(ctx context.Context, chainIDs []uint64, address common.Address, currency string, endTimestamp int64, timeInterval TimeInterval) ([]*DataPoint, error) {
	perChain := make(map[chainIdentity][]*DataPoint)
	for i := range chainIDs {
		id := chainIDs[i]
		points, err := s.balance.get(ctx, id, currency, address, endTimestamp, timeInterval)
		if err != nil {
			return nil, err
		}
		if len(points) == 0 {
			continue
		}
		perChain[chainIdentity(id)] = points
	}

	stride := strideDuration(timeInterval)
	return mergeDataPoints(perChain, stride)
}
// mergeDataPoints merges data points of different chains that are close in time into one
// series. Time is cut into consecutive windows of length stride, starting at the value returned
// by findFirstStrideWindow. A window yields a merged point only when every chain contributed at
// least one point to it; points of incomplete windows are dropped.
// This should improve merging balance data from different chains which are incompatible due to
// different timelines and block length.
//
// For a complete window the merged point has:
//   - Timestamp: the end of the window,
//   - Value: the sum over chains of each chain's largest value in the window,
//   - BlockNumber: the result of getBlockID for the chosen per-chain points.
//
// An empty data map yields an empty, non-nil slice. A stride shorter than one second is
// rejected with an error: it truncates to a zero-length window that would never advance, so
// the merge loop would not terminate.
func mergeDataPoints(data map[chainIdentity][]*DataPoint, stride time.Duration) ([]*DataPoint, error) {
	if len(data) == 0 {
		return make([]*DataPoint, 0), nil
	}

	strideSeconds := int64(stride.Seconds())
	if strideSeconds <= 0 {
		return nil, errors.New("merging data points: stride must be at least one second")
	}

	pos := make(map[chainIdentity]int, len(data))
	for k := range data {
		pos[k] = 0
	}

	res := make([]*DataPoint, 0)
	strideStart := findFirstStrideWindow(data, stride)
	for {
		strideEnd := strideStart + strideSeconds

		// Gather the points of the stride window starting with current pos
		var strideIdentities map[chainIdentity][]timeIdentity
		strideIdentities, pos = dataInStrideWindowAndNextPos(data, pos, strideEnd)

		// Check if all chains have data
		strideComplete := true
		for k := range data {
			if _, ok := strideIdentities[k]; !ok {
				strideComplete = false
				break
			}
		}

		if strideComplete {
			// Keep, per chain, the first point holding the chain's largest value in the window.
			chainMaxBalance := make(map[chainIdentity]*DataPoint, len(strideIdentities))
			for chainID, identities := range strideIdentities {
				for _, identity := range identities {
					candidate := identity.dataPoint(data)
					current, exists := chainMaxBalance[chainID]
					if exists && (*big.Int)(candidate.Value).Cmp((*big.Int)(current.Value)) <= 0 {
						continue
					}
					chainMaxBalance[chainID] = candidate
				}
			}

			balance := big.NewInt(0)
			for _, chainBalance := range chainMaxBalance {
				balance.Add(balance, (*big.Int)(chainBalance.Value))
			}

			res = append(res, &DataPoint{
				Timestamp:   uint64(strideEnd),
				Value:       (*hexutil.Big)(balance),
				BlockNumber: (*hexutil.Big)(getBlockID(chainMaxBalance)),
			})
		}

		if allPastEnd(data, pos) {
			return res, nil
		}
		strideStart = strideEnd
	}
}
// getBlockID returns a copy of the block number shared by all data points in chainBalance.
// It returns nil when two points disagree on the block number or when the map is empty.
func getBlockID(chainBalance map[chainIdentity]*DataPoint) *big.Int {
	var shared *big.Int
	for _, point := range chainBalance {
		blockNo := point.BlockNumber.ToInt()
		switch {
		case shared == nil:
			shared = new(big.Int).Set(blockNo)
		case shared.Cmp(blockNo) != 0:
			return nil
		}
	}
	return shared
}
// timeIdentity addresses one data point: the chain it belongs to and its index in that chain's
// slice of data points.
type timeIdentity struct {
chain chainIdentity
index int
}
// dataPoint returns the data point this identity refers to in data. The index must be within
// the chain's slice.
func (i timeIdentity) dataPoint(data map[chainIdentity][]*DataPoint) *DataPoint {
	points := data[i.chain]
	return points[i.index]
}
// atEnd reports whether the identity refers to the last data point of its chain.
func (i timeIdentity) atEnd(data map[chainIdentity][]*DataPoint) bool {
	lastIndex := len(data[i.chain]) - 1
	return i.index == lastIndex
}
// pastEnd reports whether the identity's index lies beyond the last data point of its chain.
func (i timeIdentity) pastEnd(data map[chainIdentity][]*DataPoint) bool {
	count := len(data[i.chain])
	return count <= i.index
}
// allPastEnd reports whether every position in pos lies beyond the end of its chain's data.
func allPastEnd(data map[chainIdentity][]*DataPoint, pos map[chainIdentity]int) bool {
	for chainID, index := range pos {
		cursor := timeIdentity{chain: chainID, index: index}
		if !cursor.pastEnd(data) {
			return false
		}
	}
	return true
}
// findFirstStrideWindow returns the start of the first stride window: the smallest timestamp
// among the first data points of all chains. It returns 0 when no chain has any data point
// (previously this case panicked with an index out of range).
//
// Tried to implement finding an optimal stride window but it was becoming too complicated and
// not worth it given that it will potentially save the first and last stride but it is not
// guaranteed. Current implementation should give good results as long as the DataPoints are
// regular enough. The stride parameter is currently unused.
func findFirstStrideWindow(data map[chainIdentity][]*DataPoint, stride time.Duration) int64 {
	pos := make(map[chainIdentity]int, len(data))
	for k := range data {
		pos[k] = 0
	}

	// The first entry of the ascending sort is the oldest of the chains' first data points.
	cur := sortTimeAsc(data, pos)
	if len(cur) == 0 {
		return 0
	}
	return int64(cur[0].dataPoint(data).Timestamp)
}
// copyMap returns a newly allocated map holding the same key/value pairs as original. The copy
// is shallow and is never nil, even when original is nil.
func copyMap[K comparable, V any](original map[K]V) map[K]V {
	duplicate := make(map[K]V, len(original))
	for k, v := range original {
		duplicate[k] = v
	}
	return duplicate
}
// dataInStrideWindowAndNextPos gathers, per chain, data points starting at startPos whose
// timestamp is strictly before endT. It returns the gathered identities and the positions
// advanced past them. startPos itself is not modified; a copy is advanced instead.
// startPos might have indexes past the end of the data for a chain
func dataInStrideWindowAndNextPos(data map[chainIdentity][]*DataPoint, startPos map[chainIdentity]int, endT int64) (identities map[chainIdentity][]timeIdentity, nextPos map[chainIdentity]int) {
pos := copyMap(startPos)
identities = make(map[chainIdentity][]timeIdentity)
// Each pass looks at the point under every chain's current position and consumes those that
// fall inside the window. Another pass runs only if the previous one added a chain that was
// not yet present in identities.
// NOTE(review): the loop condition tracks the number of chains, not the number of consumed
// points, so a chain with several points before endT may keep some of them for the next
// call - confirm this is intended.
lastLen := int(-1)
for lastLen < len(identities) {
lastLen = len(identities)
sorted := sortTimeAsc(data, pos)
for _, identity := range sorted {
if identity.dataPoint(data).Timestamp < uint64(endT) {
identities[identity.chain] = append(identities[identity.chain], identity)
pos[identity.chain]++
}
}
}
return identities, pos
}
// sortTimeAsc returns, for every chain whose position in pos is still within its data, the
// identity of the data point at that position, ordered by ascending timestamp. Positions past
// the end of a chain's data are expected and simply skipped.
func sortTimeAsc(data map[chainIdentity][]*DataPoint, pos map[chainIdentity]int) []timeIdentity {
	cursors := make([]timeIdentity, 0, len(data))
	for chainID := range data {
		cursor := timeIdentity{chain: chainID, index: pos[chainID]}
		if cursor.pastEnd(data) {
			continue
		}
		cursors = append(cursors, cursor)
	}

	olderFirst := func(a, b int) bool {
		return cursors[a].dataPoint(data).Timestamp < cursors[b].dataPoint(data).Timestamp
	}
	sort.Slice(cursors, olderFirst)
	return cursors
}
// updateBalanceHistoryForAllEnabledNetworks updates the balance history of every wallet address
// for every visible token (see s.visibleTokenSymbols) on every network returned by
// s.networkManager.Get(true) (presumably the enabled networks - confirm against network.Manager).
//
// For each address EventBalanceHistoryUpdateStarted is sent before and
// EventBalanceHistoryUpdateFinished after its tokens were processed. Per network, the tokens
// known to the token manager plus the network's native token are considered; if the token list
// cannot be read, a warning is logged and only the native token is considered. A failing update
// of a single interval is logged and does not stop the remaining work.
//
// It returns an error when the accounts, addresses or networks cannot be loaded, when a chain
// client cannot be created, or when ctx is cancelled; in those cases the finished event of the
// address being processed is not sent.
//
// expects ctx to have cancellation support and processing to be cancelled by the caller
func (s *Service) updateBalanceHistoryForAllEnabledNetworks(ctx context.Context) error {
	accountsDB, err := accounts.NewDB(s.db)
	if err != nil {
		return err
	}

	addresses, err := accountsDB.GetWalletAddresses()
	if err != nil {
		return err
	}

	networks, err := s.networkManager.Get(true)
	if err != nil {
		return err
	}

	for _, address := range addresses {
		s.triggerEvent(EventBalanceHistoryUpdateStarted, address, "")

		for _, chainNetwork := range networks {
			tokensForChain, err := s.tokenManager.GetTokens(chainNetwork.ChainID)
			if err != nil {
				// Best effort: keep going with the native token only, but leave a trace.
				log.Warn("Error getting tokens, updating native token only", "chainID", chainNetwork.ChainID, "err", err)
				tokensForChain = make([]*token.Token, 0)
			}
			tokensForChain = append(tokensForChain, s.tokenManager.ToToken(chainNetwork))

			for _, tkn := range tokensForChain {
				if !s.isTokenVisible(tkn.Symbol) {
					continue
				}

				chainClient, err := chain.NewClient(s.rpcClient, chainNetwork.ChainID)
				if err != nil {
					return err
				}

				var dataSource DataSource
				if tkn.IsNative() {
					dataSource = &chainClientSource{chainClient, tkn.Symbol}
				} else {
					dataSource = &tokenChainClientSource{
						chainClientSource: chainClientSource{
							chainClient: chainClient,
							currency:    tkn.Symbol,
						},
						TokenManager:   s.tokenManager,
						NetworkManager: s.networkManager,
					}
				}

				// Walk the intervals from BalanceHistoryAllTime down to BalanceHistory7Days.
				for currentInterval := int(BalanceHistoryAllTime); currentInterval >= int(BalanceHistory7Days); currentInterval-- {
					// Non-blocking cancellation check before each interval.
					select {
					case <-ctx.Done():
						return errors.New("context cancelled")
					default:
					}

					err = s.balance.update(ctx, dataSource, common.Address(address), TimeInterval(currentInterval))
					if err != nil {
						log.Warn("Error updating balance history", "chainID", dataSource.ChainID(), "currency", dataSource.Currency(), "address", address.String(), "interval", currentInterval, "err", err)
					}
				}
			}
		}

		s.triggerEvent(EventBalanceHistoryUpdateFinished, address, "")
	}
	return nil
}