-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
chain_id_sub.go
95 lines (80 loc) · 2.02 KB
/
chain_id_sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package client
import (
"math/big"
"github.com/ethereum/go-ethereum"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
// Compile-time assertion that *chainIDSubForwarder satisfies ethereum.Subscription.
var _ ethereum.Subscription = (*chainIDSubForwarder)(nil)
// chainIDSubForwarder wraps a head subscription in order to intercept and augment each head with chainID before forwarding.
//
// forwardLoop receives heads from srcCh, sets EVMChainID on each one from chainID,
// and sends them on to destCh. start launches forwardLoop in its own goroutine,
// Unsubscribe signals it to stop and waits for it to exit, and Err exposes the err channel.
type chainIDSubForwarder struct {
// chainID is the value written to EVMChainID on every forwarded head.
chainID *big.Int
// destCh is the send-only channel that augmented heads are forwarded to.
destCh chan<- *evmtypes.Head
// srcCh is the unbuffered channel forwardLoop receives heads from; start closes it
// when given a non-nil error.
// NOTE(review): presumably handed to the underlying subscribe call by the caller — not visible here.
srcCh chan *evmtypes.Head
// srcSub is the wrapped subscription; it is assigned by start and is nil before that.
srcSub ethereum.Subscription
// done is closed when forwardLoop exits; Unsubscribe blocks on it.
done chan struct{}
// err is the channel returned by Err; forwardLoop relays the source subscription's
// error to it and closes it on exit.
err chan error
// unSub carries the stop request from Unsubscribe to forwardLoop. It has a buffer of
// one so that Unsubscribe can post the request without blocking.
unSub chan struct{}
}
// newChainIDSubForwarder builds a forwarder that will stamp chainID onto heads and
// deliver them to ch.
//
// All internal channels are created here: srcCh, done and err are unbuffered, while
// unSub has a capacity of one. srcSub is left nil until start is called.
func newChainIDSubForwarder(chainID *big.Int, ch chan<- *evmtypes.Head) *chainIDSubForwarder {
	fwd := new(chainIDSubForwarder)
	fwd.chainID = chainID
	fwd.destCh = ch
	fwd.srcCh = make(chan *evmtypes.Head)
	fwd.done = make(chan struct{})
	fwd.err = make(chan error)
	// Capacity one lets Unsubscribe post its stop request without blocking.
	fwd.unSub = make(chan struct{}, 1)
	return fwd
}
// start accepts the (subscription, error) pair produced by a subscribe call.
//
// When err is nil it records sub as the source subscription, launches forwardLoop in
// a new goroutine, and returns nil. When err is non-nil it closes srcCh and returns
// err unchanged; no goroutine is started in that case.
func (c *chainIDSubForwarder) start(sub ethereum.Subscription, err error) error {
	if err == nil {
		c.srcSub = sub
		go c.forwardLoop()
		return nil
	}
	// Subscribing failed: nothing will be forwarded, so close the source channel.
	close(c.srcCh)
	return err
}
// forwardLoop is the forwarding goroutine body. Each iteration waits for one of:
//   - an error from the source subscription, which is relayed on c.err and ends the loop;
//   - a head on srcCh, which gets EVMChainID set from c.chainID and is sent to destCh;
//   - a stop request on unSub.
//
// A stop request may also arrive while the loop is blocked relaying an error or
// sending a head. In every case a stop request causes the source subscription to be
// unsubscribed before the loop returns.
//
// On exit the deferred calls close c.done first and c.err second.
func (c *chainIDSubForwarder) forwardLoop() {
	// the error channel must be closed when unsubscribing
	defer close(c.err)
	defer close(c.done)

	for {
		select {
		case subErr := <-c.srcSub.Err():
			select {
			case c.err <- subErr:
				// Error delivered to the consumer; exit without unsubscribing the source.
				return
			case <-c.unSub:
				// Stop requested before the error was consumed; fall through to unsubscribe.
			}

		case head := <-c.srcCh:
			// NOTE(review): a nil head (e.g. from a closed srcCh) would panic on this
			// field write — confirm srcCh is never closed while the loop is running.
			head.EVMChainID = utils.NewBig(c.chainID)
			select {
			case c.destCh <- head:
				// Delivered; go back to waiting for the next event.
				continue
			case <-c.unSub:
				// Stop requested while the send was blocked; the head is dropped.
			}

		case <-c.unSub:
			// Stop requested while idle.
		}

		// Reached only after a stop request was received on unSub.
		c.srcSub.Unsubscribe()
		return
	}
}
// Unsubscribe posts a stop request for forwardLoop and then blocks until done is closed.
//
// The request is posted with a non-blocking send on unSub, so a call made while a
// request is already pending skips the send and only waits.
// NOTE(review): within this file done is closed only by forwardLoop, so if start never
// launched it (for example because it returned an error) this call blocks indefinitely.
func (c *chainIDSubForwarder) Unsubscribe() {
	var stop struct{}

	select {
	default:
		// a stop request is already pending
	case c.unSub <- stop:
		// stop request posted for forwardLoop
	}

	// block until forwardLoop has exited
	<-c.done
}
// Err returns the receive-only view of the forwarder's error channel. forwardLoop
// relays an error received from the source subscription on it and closes it on exit.
func (c *chainIDSubForwarder) Err() <-chan error {
	var errs <-chan error = c.err
	return errs
}