@@ -4,12 +4,18 @@ package ownership
4
4
5
5
import (
6
6
"context"
7
+ "slices"
7
8
"sync"
9
+ "time"
8
10
9
11
"go.uber.org/zap"
10
12
11
13
"github.com/ethereum/go-ethereum/common"
12
- "github.com/ethereum/go-ethereum/event"
14
+
15
+ "github.com/status-im/go-wallet-sdk/pkg/balance/multistandardfetcher"
16
+ "github.com/status-im/go-wallet-sdk/pkg/contracts/erc1155"
17
+ "github.com/status-im/go-wallet-sdk/pkg/contracts/erc721"
18
+ "github.com/status-im/go-wallet-sdk/pkg/eventlog"
13
19
14
20
gocommon "github.com/status-im/status-go/common"
15
21
"github.com/status-im/status-go/crypto/types"
@@ -18,8 +24,9 @@ import (
18
24
"github.com/status-im/status-go/rpc/network"
19
25
"github.com/status-im/status-go/services/accounts/accountsevent"
20
26
walletCommon "github.com/status-im/status-go/services/wallet/common"
27
+ "github.com/status-im/status-go/services/wallet/multistandardbalance"
21
28
"github.com/status-im/status-go/services/wallet/thirdparty"
22
- "github.com/status-im/status-go/services/wallet/transfer "
29
+ "github.com/status-im/status-go/services/wallet/transferdetector "
23
30
"github.com/status-im/status-go/services/wallet/walletevent"
24
31
)
25
32
@@ -30,8 +37,6 @@ const (
30
37
type loaderPerChainID = map [walletCommon.ChainID ]* PeriodicalLoader
31
38
type loaderPerAddressAndChainID = map [common.Address ]loaderPerChainID
32
39
33
- type TransferCb func (common.Address , walletCommon.ChainID , []transfer.Transfer )
34
-
35
40
type AccountsProvider interface {
36
41
GetWalletAddresses () ([]types.Address , error )
37
42
}
@@ -41,12 +46,18 @@ type NetworksProvider interface {
41
46
GetPublisher () * pubsub.Publisher
42
47
}
43
48
49
+ type BlockChainStateProvider interface {
50
+ GetEstimatedBlockTime (ctx context.Context , chainID uint64 , blockNumber uint64 ) (time.Time , error )
51
+ }
52
+
44
53
type Controller struct {
45
- fetcher CollectibleOwnershipFetcher
46
- storage CollectibleOwnershipStorage
47
- walletFeed * event.Feed
48
- accountsProvider AccountsProvider
49
- accountsPublisher * pubsub.Publisher
54
+ fetcher CollectibleOwnershipFetcher
55
+ storage CollectibleOwnershipStorage
56
+ accountsProvider AccountsProvider
57
+ accountsPublisher * pubsub.Publisher
58
+ multistandardBalancePublisher * pubsub.Publisher
59
+ transferDetectorPublisher * pubsub.Publisher
60
+ blockChainStateProvider BlockChainStateProvider
50
61
51
62
networksProvider NetworksProvider
52
63
@@ -66,24 +77,28 @@ type Controller struct {
66
77
67
78
func NewController (
68
79
storage CollectibleOwnershipStorage ,
69
- walletFeed * event.Feed ,
70
80
accountsProvider AccountsProvider ,
71
81
accountsPublisher * pubsub.Publisher ,
72
82
networksProvider NetworksProvider ,
83
+ multistandardBalancePublisher * pubsub.Publisher ,
84
+ transferDetectorPublisher * pubsub.Publisher ,
85
+ blockChainStateProvider BlockChainStateProvider ,
73
86
fetcher CollectibleOwnershipFetcher ,
74
87
collectiblesPublisher * pubsub.Publisher ,
75
88
logger * zap.Logger ,
76
89
) * Controller {
77
90
return & Controller {
78
- fetcher : fetcher ,
79
- storage : storage ,
80
- walletFeed : walletFeed ,
81
- accountsProvider : accountsProvider ,
82
- accountsPublisher : accountsPublisher ,
83
- networksProvider : networksProvider ,
84
- periodicalLoaders : make (loaderPerAddressAndChainID ),
85
- collectiblesPublisher : collectiblesPublisher ,
86
- logger : logger .Named ("OwnershipController" ),
91
+ fetcher : fetcher ,
92
+ storage : storage ,
93
+ accountsProvider : accountsProvider ,
94
+ accountsPublisher : accountsPublisher ,
95
+ networksProvider : networksProvider ,
96
+ multistandardBalancePublisher : multistandardBalancePublisher ,
97
+ transferDetectorPublisher : transferDetectorPublisher ,
98
+ blockChainStateProvider : blockChainStateProvider ,
99
+ periodicalLoaders : make (loaderPerAddressAndChainID ),
100
+ collectiblesPublisher : collectiblesPublisher ,
101
+ logger : logger .Named ("OwnershipController" ),
87
102
}
88
103
}
89
104
@@ -105,11 +120,14 @@ func (c *Controller) Start() {
105
120
// Setup collectibles fetch when a new account gets added
106
121
c .startAccountsWatcher ()
107
122
108
- // Setup collectibles fetch when relevant activity is detected
109
- c .startWalletEventsWatcher ()
110
-
111
123
// Setup collectibles fetch when active networks change
112
124
c .startNetworkEventsWatcher ()
125
+
126
+ // Start balance change watcher
127
+ c .startBalanceChangeWatcher ()
128
+
129
+ // Start transfer detection watcher
130
+ c .startTransferDetectionWatcher ()
113
131
}
114
132
115
133
func (c * Controller ) Stop () {
@@ -120,8 +138,6 @@ func (c *Controller) Stop() {
120
138
close (c .stopCh )
121
139
c .stopCh = nil
122
140
123
- c .stopWalletEventsWatcher ()
124
-
125
141
c .stopPeriodicalLoaders ()
126
142
}
127
143
@@ -237,63 +253,118 @@ func (c *Controller) startAccountsWatcher() {
237
253
}()
238
254
}
239
255
240
- func (c * Controller ) startWalletEventsWatcher () {
241
- if c .walletEventsWatcher != nil {
242
- return
243
- }
244
-
245
- if c .walletFeed == nil {
256
+ func (c * Controller ) startNetworkEventsWatcher () {
257
+ if c .networksProvider == nil {
246
258
return
247
259
}
248
260
249
- walletEventCb := func (event walletevent.Event ) {
250
- if event .Type != transfer .EventInternalERC721TransferDetected &&
251
- event .Type != transfer .EventInternalERC1155TransferDetected {
252
- return
253
- }
254
-
255
- chainID := walletCommon .ChainID (event .ChainID )
256
- for _ , account := range event .Accounts {
257
- c .refetchOwnershipIfRecentTransfer (account , chainID , event .At )
261
+ ch , unsub := pubsub .Subscribe [network.EventActiveNetworksChanged ](c .networksProvider .GetPublisher (), 10 )
262
+ go func () {
263
+ defer gocommon .LogOnPanic ()
264
+ defer unsub ()
265
+ for {
266
+ select {
267
+ case <- c .stopCh :
268
+ return
269
+ case _ , ok := <- ch :
270
+ if ! ok {
271
+ return
272
+ }
273
+ c .checkPeriodicalLoaders ()
274
+ }
258
275
}
259
- }
260
-
261
- c .walletEventsWatcher = walletevent .NewWatcher (c .walletFeed , walletEventCb )
262
-
263
- c .walletEventsWatcher .Start ()
276
+ }()
264
277
}
265
278
266
- func (c * Controller ) stopWalletEventsWatcher () {
267
- if c .walletEventsWatcher != nil {
268
- c .walletEventsWatcher .Stop ()
269
- c .walletEventsWatcher = nil
279
+ func (c * Controller ) startBalanceChangeWatcher () {
280
+ if c .multistandardBalancePublisher == nil {
281
+ return
270
282
}
283
+
284
+ ch , unsub := pubsub .Subscribe [multistandardbalance.EventBalanceFetchFinished ](c .multistandardBalancePublisher , 10 )
285
+ go func () {
286
+ defer gocommon .LogOnPanic ()
287
+ defer unsub ()
288
+ for {
289
+ select {
290
+ case <- c .stopCh :
291
+ return
292
+ case event , ok := <- ch :
293
+ if ! ok {
294
+ return
295
+ }
296
+ switch event .ResultType {
297
+ case multistandardfetcher .ResultTypeERC721 , multistandardfetcher .ResultTypeERC1155 :
298
+ if event .BalanceChanged {
299
+ c .refetchOwnershipIfRecentTx (event .Key .Account , walletCommon .ChainID (event .Key .ChainID ), event .NewState .FetchedAt )
300
+ }
301
+ }
302
+ }
303
+ }
304
+ }()
271
305
}
272
306
273
- func (c * Controller ) startNetworkEventsWatcher () {
274
- if c .networksProvider == nil {
307
+ func (c * Controller ) startTransferDetectionWatcher () {
308
+ if c .transferDetectorPublisher == nil {
275
309
return
276
310
}
277
311
278
- ch , unsub := pubsub .Subscribe [network. EventActiveNetworksChanged ](c .networksProvider . GetPublisher () , 10 )
312
+ ch , unsub := pubsub .Subscribe [transferdetector. EventTransferDetectionFinished ](c .transferDetectorPublisher , 10 )
279
313
go func () {
280
314
defer gocommon .LogOnPanic ()
281
315
defer unsub ()
282
316
for {
283
317
select {
284
318
case <- c .stopCh :
285
319
return
286
- case _ , ok := <- ch :
320
+ case msg , ok := <- ch :
287
321
if ! ok {
288
322
return
289
323
}
290
- c .checkPeriodicalLoaders ()
324
+ for _ , event := range msg .Events {
325
+ switch event .EventKey {
326
+ case eventlog .ERC721Transfer :
327
+ unpackedEvent , ok := event .Unpacked .(erc721.Erc721Transfer )
328
+ if ! ok {
329
+ c .logger .Error ("failed to unpack ERC721Transfer event" )
330
+ continue
331
+ }
332
+ c .refetchOwnershipIfRelevantEvent (msg .Accounts , unpackedEvent .From , unpackedEvent .To , msg .ChainID , unpackedEvent .Raw .BlockNumber )
333
+ case eventlog .ERC1155TransferSingle :
334
+ unpackedEvent , ok := event .Unpacked .(erc1155.Erc1155TransferSingle )
335
+ if ! ok {
336
+ c .logger .Error ("failed to unpack ERC1155TransferSingle event" )
337
+ continue
338
+ }
339
+ c .refetchOwnershipIfRelevantEvent (msg .Accounts , unpackedEvent .From , unpackedEvent .To , msg .ChainID , unpackedEvent .Raw .BlockNumber )
340
+ case eventlog .ERC1155TransferBatch :
341
+ unpackedEvent , ok := event .Unpacked .(erc1155.Erc1155TransferBatch )
342
+ if ! ok {
343
+ c .logger .Error ("failed to unpack ERC1155TransferBatch event" )
344
+ continue
345
+ }
346
+ c .refetchOwnershipIfRelevantEvent (msg .Accounts , unpackedEvent .From , unpackedEvent .To , msg .ChainID , unpackedEvent .Raw .BlockNumber )
347
+ }
348
+ }
291
349
}
292
350
}
293
351
}()
294
352
}
295
353
296
- func (c * Controller ) refetchOwnershipIfRecentTransfer (account common.Address , chainID walletCommon.ChainID , latestTxTimestamp int64 ) {
354
+ func (c * Controller ) refetchOwnershipIfRelevantEvent (checkedAccounts []common.Address , eventFrom common.Address , eventTo common.Address , chainID uint64 , blockNumber uint64 ) {
355
+ for _ , address := range []common.Address {eventFrom , eventTo } {
356
+ if slices .Contains (checkedAccounts , address ) {
357
+ blockTime , err := c .blockChainStateProvider .GetEstimatedBlockTime (context .TODO (), chainID , blockNumber )
358
+ if err != nil {
359
+ c .logger .Error ("failed to get estimated block time" , zap .Error (err ))
360
+ continue
361
+ }
362
+ c .refetchOwnershipIfRecentTx (address , walletCommon .ChainID (chainID ), blockTime .Unix ())
363
+ }
364
+ }
365
+ }
366
+
367
+ func (c * Controller ) refetchOwnershipIfRecentTx (account common.Address , chainID walletCommon.ChainID , latestTxTimestamp int64 ) {
297
368
// Check last ownership update timestamp
298
369
timestamp , err := c .storage .GetOwnershipUpdateTimestamp (account , chainID )
299
370
0 commit comments