-
Notifications
You must be signed in to change notification settings - Fork 175
/
retry.go
142 lines (125 loc) · 4.17 KB
/
retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package backend
import (
"context"
"errors"
"fmt"
"sync"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state"
"github.com/onflow/flow-go/storage"
)
// retryFrequency is the spacing, in blocks, between the reference heights that Retry
// revisits, and also the interval (height % retryFrequency == 0) at which Retry prunes
// expired entries.
// It has to be less than flow.DefaultTransactionExpiry, or else Retry never finds a
// height to retry and this module does nothing.
const retryFrequency uint64 = 120 // Blocks
// Retry implements a simple retry mechanism for transaction submission.
type Retry struct {
mu sync.RWMutex
// pending Transactions
transactionByReferencBlockHeight map[uint64]map[flow.Identifier]*flow.TransactionBody
backend *Backend
active bool
log zerolog.Logger // default logger
}
// newRetry returns a Retry that logs to log and starts with no registered
// transactions. The result is inactive and has no backend until Activate and
// SetBackend are called.
func newRetry(log zerolog.Logger) *Retry {
	pending := make(map[uint64]map[flow.Identifier]*flow.TransactionBody)
	return &Retry{
		transactionByReferencBlockHeight: pending,
		log:                              log,
	}
}
// Activate marks the retry mechanism as active.
// It returns the receiver so that configuration calls can be chained.
func (r *Retry) Activate() *Retry {
	r.active = true
	return r
}
// IsActive reports whether the retry mechanism has been marked active.
func (r *Retry) IsActive() bool {
	return r.active
}
// SetBackend assigns the Backend that is used to look up blocks, derive
// transaction statuses and re-send transactions.
// It returns the receiver so that configuration calls can be chained.
func (r *Retry) SetBackend(b *Backend) *Retry {
	r.backend = b
	return r
}
// Retry re-sends still-pending transactions in response to reaching the given block height.
//
// For heights below flow.DefaultTransactionExpiry nothing is done and nil is returned.
// Otherwise:
//   - if height is a multiple of retryFrequency, expired entries are pruned first;
//   - transactions are then retried for the heights
//     height-flow.DefaultTransactionExpiry+retryFrequency, advancing in steps of
//     retryFrequency, for as long as the visited height stays strictly below height.
//
// The first error reported while retrying a height aborts the call and is returned.
// No errors expected during normal operations.
func (r *Retry) Retry(height uint64) error {
	// nothing can be due for a retry before the first expiry window has passed
	if height < flow.DefaultTransactionExpiry {
		return nil
	}

	// naive cleanup for now: prune once every retryFrequency blocks
	if height%retryFrequency == 0 {
		r.prune(height)
	}

	first := height - flow.DefaultTransactionExpiry + retryFrequency
	for h := first; h < height; h = h + retryFrequency {
		if err := r.retryTxsAtHeight(h); err != nil {
			return err
		}
	}
	return nil
}
// RegisterTransaction records tx under the given height so that it may be
// retried later. Registering a transaction with the same ID at the same height
// again overwrites the earlier entry.
func (r *Retry) RegisterTransaction(height uint64, tx *flow.TransactionBody) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// create the per-height bucket on first use
	bucket := r.transactionByReferencBlockHeight[height]
	if bucket == nil {
		bucket = make(map[flow.Identifier]*flow.TransactionBody)
		r.transactionByReferencBlockHeight[height] = bucket
	}
	bucket[tx.ID()] = tx
}
// prune drops every per-height bucket whose height lies more than
// flow.DefaultTransactionExpiry blocks below the given height.
// It does nothing while height is still below flow.DefaultTransactionExpiry.
func (r *Retry) prune(height uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// below the expiry window nothing can have expired yet; the guard also
	// keeps the unsigned subtraction below from wrapping around
	if height < flow.DefaultTransactionExpiry {
		return
	}

	cutoff := height - flow.DefaultTransactionExpiry
	for registeredHeight := range r.transactionByReferencBlockHeight {
		if registeredHeight >= cutoff {
			continue
		}
		delete(r.transactionByReferencBlockHeight, registeredHeight)
	}
}
// retryTxsAtHeight retries the transactions registered at a specific block height.
//
// For each transaction registered under heightToRetry it looks up the block
// containing the transaction (a storage.ErrNotFound result is treated as "no block")
// and derives the current transaction status:
//   - pending: the transaction is sent again; a failed send is only logged so that
//     the remaining transactions are still processed.
//   - unknown: the entry is kept and evaluated again on a later call.
//   - any other status: the entry is removed, it does not need to be retried anymore.
//
// A transaction whose status cannot be derived because of
// state.ErrUnknownSnapshotReference is skipped and kept.
// Once no entries remain for heightToRetry, the now-empty per-height map is removed
// as well, instead of lingering until the next prune.
//
// The lock is held for the whole call, including while transactions are re-sent.
//
// Error returns:
//   - errors are unexpected and potentially symptoms of internal implementation bugs or state corruption (fatal).
//     They are wrapped with the transaction ID and height; the cause stays reachable through errors.Is / errors.As.
func (r *Retry) retryTxsAtHeight(heightToRetry uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	txsAtHeight := r.transactionByReferencBlockHeight[heightToRetry]
	for txID, tx := range txsAtHeight {
		// find the block for the transaction
		block, err := r.backend.lookupBlock(txID)
		if err != nil {
			if !errors.Is(err, storage.ErrNotFound) {
				return fmt.Errorf("looking up block for transaction %v registered at height %d: %w", txID, heightToRetry, err)
			}
			// not being part of any block yet is expected for a transaction awaiting retry
			block = nil
		}

		// find the transaction status
		status, err := r.backend.deriveTransactionStatus(tx, false, block)
		if err != nil {
			if !errors.Is(err, state.ErrUnknownSnapshotReference) {
				return fmt.Errorf("deriving status of transaction %v registered at height %d: %w", txID, heightToRetry, err)
			}
			// status cannot be determined right now: keep the entry for a later attempt
			continue
		}

		switch status {
		case flow.TransactionStatusPending:
			// best effort: a failed send must not stop the remaining transactions from being retried
			if err := r.backend.SendRawTransaction(context.Background(), tx); err != nil {
				r.log.Info().Str("retry", fmt.Sprintf("retryTxsAtHeight: %v", heightToRetry)).Err(err).Msg("failed to send raw transactions")
			}
		case flow.TransactionStatusUnknown:
			// keep the entry; the status may become known later
		default:
			// not pending or unknown, don't need to retry anymore
			// (deleting the current key while ranging over a map is safe in Go)
			delete(txsAtHeight, txID)
		}
	}

	// release the per-height map once it has been emptied; RegisterTransaction
	// recreates it on demand. Deleting an absent key is a no-op.
	if len(txsAtHeight) == 0 {
		delete(r.transactionByReferencBlockHeight, heightToRetry)
	}

	return nil
}