// File: buffered_txmgr.go

package txmgr
import (
"context"
"errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)
// BufferedTxManager layers a bounded request queue on top of a
// SimpleTxManager. Requests are placed on txRequestChan and consumed one at a
// time by a single background goroutine (listen), which Start launches and
// Stop tears down.
type BufferedTxManager struct {
	// SimpleTxManager is embedded by value, so its fields and methods are
	// promoted onto BufferedTxManager.
	SimpleTxManager
	// wg tracks the background listener goroutine so Stop can wait for it.
	wg sync.WaitGroup
	// txRequestChan is the queue of pending requests; it is nil until Start
	// allocates it.
	txRequestChan chan *TxRequest
	// ctx governs the listener's lifetime; it is derived in Start from the
	// context passed there.
	ctx context.Context
	// cancel cancels ctx; it is nil until Start has been called.
	cancel context.CancelFunc
}
// TxRequest is one unit of work queued for the background listener.
type TxRequest struct {
	// ctx is the submitter's context. The listener passes it to Send, and
	// waitForResponse stops waiting once it is done.
	ctx context.Context
	// txCandidate is the transaction to send; the listener dereferences it.
	txCandidate *TxCandidate
	// responseChan carries the outcome of the send back to the submitter.
	responseChan chan *TxResponse
}
// TxResponse is the outcome of a request submitted through the buffered
// manager.
//
// Field order matters: this file builds TxResponse values with positional
// composite literals, so Receipt must stay first and Err second.
type TxResponse struct {
	// Receipt is the receipt returned by Send; it is nil when the request was
	// rejected before sending or the submitter stopped waiting.
	Receipt *types.Receipt
	// Err is the error returned by Send, or the reason the request was never
	// sent (enqueue failure or submitter context done).
	Err error
}
// NewBufferedTxManager builds a BufferedTxManager around a freshly constructed
// SimpleTxManager. All arguments are forwarded unchanged to
// NewSimpleTxManager; if that constructor fails, its error is returned as-is
// together with a nil manager.
//
// The returned manager is idle: the request queue and the listener goroutine
// are only created by Start.
func NewBufferedTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLIConfig) (*BufferedTxManager, error) {
	inner, err := NewSimpleTxManager(name, l, m, cfg)
	if err != nil {
		return nil, err
	}

	mgr := new(BufferedTxManager)
	// The simple manager is copied by value into the embedded field.
	// NOTE(review): if SimpleTxManager contains a mutex or atomic field, this
	// copy is what `go vet` (copylocks) warns about — confirm.
	mgr.SimpleTxManager = *inner
	return mgr, nil
}
// Start allocates the request queue, sized by Config.TxBufferSize, and
// launches the background listener goroutine.
//
// The listener runs under a context derived from ctx, so it exits either when
// ctx is cancelled or when Stop is called. Start always returns nil.
func (m *BufferedTxManager) Start(ctx context.Context) error {
	queue := make(chan *TxRequest, m.Config.TxBufferSize)
	listenCtx, cancel := context.WithCancel(ctx)

	m.txRequestChan = queue
	m.ctx = listenCtx
	m.cancel = cancel

	// Register the goroutine before starting it so Stop's Wait cannot miss it.
	m.wg.Add(1)
	go m.listen(listenCtx)
	return nil
}
// Stop cancels the listener's context and blocks until the listener goroutine
// has exited. It always returns nil.
//
// Stop is safe to call before Start and safe to call more than once: without
// a cancel function there is nothing to stop, and cancelling or waiting a
// second time is a no-op.
//
// If the listener is in the middle of a Send, Stop waits for that Send to
// return; the Send runs under the submitter's context, not the manager's.
// Requests still sitting in the queue are not processed; their submitters
// stop waiting when their own context is done.
//
// The request channel is deliberately left open. This goroutine is not the
// sender, and closing the channel would make any concurrent or later enqueue
// panic with "send on closed channel" (and a repeated Stop panic on the
// double close). The channel is garbage-collected with the manager.
func (m *BufferedTxManager) Stop() error {
	if m.cancel == nil {
		// Start was never called, so no listener exists.
		return nil
	}
	m.cancel()
	m.wg.Wait()
	return nil
}
// listen is the background worker loop. It takes requests off the queue one
// at a time, sends each through Send using the request's own context, logs
// any send error, and hands the receipt and error back on the request's
// response channel.
//
// The loop ends, and the wait group is released, once ctx is done.
func (m *BufferedTxManager) listen(ctx context.Context) {
	defer m.wg.Done()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case req := <-m.txRequestChan:
			receipt, sendErr := m.Send(req.ctx, *req.txCandidate)
			if sendErr != nil {
				m.l.Error("failed to send transaction in buffered tx manager", "err", sendErr)
			}
			// The error is reported to the submitter as well as logged.
			req.responseChan <- &TxResponse{Receipt: receipt, Err: sendErr}
		}
	}
}
// submitTransaction queues txCandidate for the background listener and blocks
// until the listener replies or ctx is done.
//
// The returned TxResponse is never nil. Its Err is set when:
//   - txCandidate is nil,
//   - the request could not be enqueued (tryEnqueue returned false),
//   - ctx was done before a reply arrived, or
//   - Send itself failed.
func (m *BufferedTxManager) submitTransaction(ctx context.Context, txCandidate *TxCandidate) *TxResponse {
	if txCandidate == nil {
		// The listener dereferences the candidate; reject nil here instead of
		// panicking on the listener goroutine.
		return &TxResponse{Err: errors.New("submit transaction failed: nil tx candidate")}
	}

	// Buffered with capacity one and never closed. The listener replies
	// exactly once per request, possibly after this call has already returned
	// because ctx was cancelled. With a buffer that late reply completes
	// without a receiver, and because the channel is never closed it cannot
	// panic with "send on closed channel". The channel is garbage-collected
	// once both sides drop it.
	responseChan := make(chan *TxResponse, 1)

	txRequest := &TxRequest{
		ctx:          ctx,
		txCandidate:  txCandidate,
		responseChan: responseChan,
	}
	if !m.tryEnqueue(txRequest) {
		return &TxResponse{Err: errors.New("submit transaction failed in tryEnqueue")}
	}
	return txRequest.waitForResponse()
}
// SendTxCandidate is the exported entry point for submitting a TxCandidate
// through the buffered queue. It delegates to submitTransaction and returns
// that call's response unchanged.
func (m *BufferedTxManager) SendTxCandidate(ctx context.Context, txCandidate *TxCandidate) *TxResponse {
	resp := m.submitTransaction(ctx, txCandidate)
	return resp
}
// SendTransaction converts tx into a TxCandidate and submits it through
// SendTxCandidate.
//
// Only the call data and the recipient are taken from tx; no other property
// of the transaction is read. GasLimit is set to 0.
// NOTE(review): a zero GasLimit presumably asks Send to estimate gas —
// confirm against TxCandidate's documentation.
//
// A nil tx yields a TxResponse with Err set rather than a panic.
func (m *BufferedTxManager) SendTransaction(ctx context.Context, tx *types.Transaction) *TxResponse {
	if tx == nil {
		// The accessors below would dereference a nil transaction.
		return &TxResponse{Err: errors.New("send transaction failed: nil transaction")}
	}

	candidate := &TxCandidate{
		TxData:   tx.Data(),
		To:       tx.To(),
		GasLimit: 0,
	}
	return m.SendTxCandidate(ctx, candidate)
}
// tryEnqueue attempts to place txRequest on the request queue without
// blocking and reports whether it was accepted.
//
// It returns false when:
//   - the manager's context is already done (Stop was called or the context
//     given to Start was cancelled), since the listener has exited or is
//     exiting and nothing would consume the request;
//   - the queue is full; or
//   - Start has not been called, because a send on the nil channel is never
//     ready and the default branch is taken.
func (m *BufferedTxManager) tryEnqueue(txRequest *TxRequest) bool {
	if m.ctx != nil && m.ctx.Err() != nil {
		return false
	}

	select {
	case m.txRequestChan <- txRequest:
		return true
	default:
		return false
	}
}
// waitForResponse blocks until either a response arrives on the request's
// response channel or the request's context is done, whichever is observed
// first.
//
// On context completion it returns a TxResponse whose Err wraps the context's
// error; otherwise it returns whatever was received from the channel.
func (r *TxRequest) waitForResponse() *TxResponse {
	select {
	case <-r.ctx.Done():
		return &TxResponse{Err: fmt.Errorf("context cancelled in WaitForResponse: %w", r.ctx.Err())}
	case resp := <-r.responseChan:
		return resp
	}
}