-
Notifications
You must be signed in to change notification settings - Fork 246
/
reactor.go
134 lines (118 loc) · 3.3 KB
/
reactor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package wallet
import (
"context"
"errors"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/params"
)
// pow block on main chain is mined once per ~14 seconds
// but for tests we are using clique chain with immediate block signer
// hence we can use different polling periods for methods that depend on mining time.
func pollingPeriodByChain(chain *big.Int) time.Duration {
switch chain.Int64() {
case int64(params.MainNetworkID):
return 10 * time.Second
case int64(params.RopstenNetworkID):
return 4 * time.Second
default:
return 500 * time.Millisecond
}
}
func reorgSafetyDepth(chain *big.Int) *big.Int {
switch chain.Int64() {
case int64(params.MainNetworkID):
return big.NewInt(2)
case int64(params.RopstenNetworkID):
return big.NewInt(15)
default:
return big.NewInt(15)
}
}
var (
erc20BatchSize = big.NewInt(100000)
errAlreadyRunning = errors.New("already running")
)
// HeaderReader interface for reading headers using block number or hash.
type HeaderReader interface {
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}
// BalanceReader interface for reading balance at a specifeid address.
type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}
type reactorClient interface {
HeaderReader
BalanceReader
}
// NewReactor creates instance of the Reactor.
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int, watchNewBlocks bool) *Reactor {
return &Reactor{
db: db,
client: client,
feed: feed,
chain: chain,
watchNewBlocks: watchNewBlocks,
}
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
client *ethclient.Client
db *Database
feed *event.Feed
chain *big.Int
watchNewBlocks bool
mu sync.Mutex
group *Group
}
func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand {
signer := types.NewEIP155Signer(r.chain)
ctl := &controlCommand{
db: r.db,
chain: r.chain,
client: r.client,
accounts: accounts,
eth: ÐTransferDownloader{
client: r.client,
accounts: accounts,
signer: signer,
db: r.db,
},
erc20: NewERC20TransfersDownloader(r.client, accounts, signer),
feed: r.feed,
safetyDepth: reorgSafetyDepth(r.chain),
watchNewBlocks: r.watchNewBlocks,
errorsCount: 0,
}
return ctl
}
// Start runs reactor loop in background.
func (r *Reactor) Start(accounts []common.Address) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.group != nil {
return errAlreadyRunning
}
r.group = NewGroup(context.Background())
ctl := r.newControlCommand(accounts)
r.group.Add(ctl.Command())
return nil
}
// Stop stops reactor loop and waits till it exits.
func (r *Reactor) Stop() {
r.mu.Lock()
defer r.mu.Unlock()
if r.group == nil {
return
}
r.group.Stop()
r.group.Wait()
r.group = nil
}