-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction_downloader.go
218 lines (182 loc) · 6.39 KB
/
transaction_downloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package relay
import (
"context"
"errors"
"sync"
"time"
"github.com/warp-contracts/syncer/src/utils/arweave"
"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/monitoring"
"github.com/warp-contracts/syncer/src/utils/smartweave"
"github.com/warp-contracts/syncer/src/utils/task"
"github.com/cenkalti/backoff/v4"
)
// TransactionDownloader fills in transactions for a given block.
//
// It reads payloads from the input channel, downloads the transactions of
// every Arweave block in the payload and forwards the completed payload to
// Output.
type TransactionDownloader struct {
// Embedded task. The methods in this file use its Ctx, Log, Config,
// IsStopping and SubmitToWorker members.
*task.Task
// Arweave client used to fetch transactions and their data; set with WithClient.
client *arweave.Client
// Source of the report whose TransactionDownloader counters are updated;
// set with WithMonitor.
monitor monitoring.Monitor
// Payloads to process; set with WithInputChannel.
input chan *Payload
// Payloads with transactions filled in. Created unbuffered by
// NewTransactionDownloader and closed by the callback registered with
// WithOnAfterStop.
Output chan *Payload
}
// NewTransactionDownloader creates a TransactionDownloader with an unbuffered
// Output channel and an underlying task named "transaction-downloader".
//
// The task runs self.run as its subtask, uses a worker pool sized from
// config.TransactionDownloader (NumWorkers, WorkerQueueSize) and closes Output
// in its after-stop callback. The client, monitor and input channel are not
// set here; use WithClient, WithMonitor and WithInputChannel.
func NewTransactionDownloader(config *config.Config) (self *TransactionDownloader) {
	self = &TransactionDownloader{
		Output: make(chan *Payload),
	}

	// Registered as the after-stop callback so receivers observe the close.
	closeOutput := func() {
		close(self.Output)
	}

	self.Task = task.NewTask(config, "transaction-downloader").
		WithSubtaskFunc(self.run).
		WithWorkerPool(
			config.TransactionDownloader.NumWorkers,
			config.TransactionDownloader.WorkerQueueSize,
		).
		WithOnAfterStop(closeOutput)

	return
}
// WithMonitor sets the monitor whose report counters are updated while
// downloading and returns the receiver so calls can be chained.
func (self *TransactionDownloader) WithMonitor(monitor monitoring.Monitor) *TransactionDownloader {
self.monitor = monitor
return self
}
// WithClient sets the Arweave client used to download transactions and their
// data, and returns the receiver so calls can be chained.
func (self *TransactionDownloader) WithClient(client *arweave.Client) *TransactionDownloader {
self.client = client
return self
}
// WithInputChannel sets the channel that run reads payloads from and returns
// the receiver so calls can be chained.
func (self *TransactionDownloader) WithInputChannel(v chan *Payload) *TransactionDownloader {
self.input = v
return self
}
// downloadOne downloads one transaction by id, handling automatic retries and
// validation.
//
// Each attempt fetches the transaction, fetches its data when
// smartweave.IsInteractionWithData reports true, checks that it is a valid
// interaction and verifies its signature. Failed attempts are retried with no
// limit on elapsed time; retrying ends only when the error callback marks an
// error as permanent (arweave.ErrOverspend, or context cancellation while the
// task is stopping) or when the retry helper itself gives up.
//
// Returns the verified transaction and a nil error on success. On failure it
// returns a nil transaction and the error that ended the retries, so callers
// never see a transaction that was not fully validated.
func (self *TransactionDownloader) downloadOne(txId string) (out *arweave.Transaction, err error) {
	err = task.NewRetry().
		WithContext(self.Ctx).
		WithMaxElapsedTime(0 /* never give up */).
		WithMaxInterval(self.Config.Relayer.ArweaveBlockDownloadMaxInterval).
		WithAcceptableDuration(self.Config.Relayer.ArweaveBlockDownloadMaxInterval * 3).
		WithOnError(func(err error, isDurationAcceptable bool) error {
			self.Log.WithError(err).WithField("txId", txId).Warn("Failed to download transaction, retrying after timeout")

			if errors.Is(err, context.Canceled) && self.IsStopping.Load() {
				// Stopping
				return backoff.Permanent(err)
			}

			self.monitor.GetReport().TransactionDownloader.Errors.Download.Inc()

			if errors.Is(err, arweave.ErrPending) {
				// https://docs.arweave.org/developers/server/http-api#undefined-4
				// This is a temporary error, after some time the transaction will be available.
				// Wait up to a second, but wake up early when the task context
				// is done so that shutdown is not delayed by the pause.
				timer := time.NewTimer(time.Second)
				defer timer.Stop()
				select {
				case <-self.Ctx.Done():
				case <-timer.C:
				}
				return err
			}

			if errors.Is(err, arweave.ErrOverspend) {
				// This is a permanent error
				return backoff.Permanent(err)
			}

			if !isDurationAcceptable {
				// This will completely reset the HTTP client and possibly help in solving the problem
				self.client.Reset()
			}

			return err
		}).
		Run(func() error {
			// Work on a local and publish it only after every check passed,
			// so a failed attempt never leaks through the named result.
			tx, err := self.client.GetTransactionById(self.Ctx, txId)
			if err != nil {
				self.Log.WithField("txId", txId).Error("Failed to download transaction")
				return err
			}

			if smartweave.IsInteractionWithData(tx) {
				buf, err := self.client.GetTransactionDataById(self.Ctx, tx)
				if err != nil {
					self.Log.WithField("txId", txId).Error("Failed to download transaction data")
					self.monitor.GetReport().TransactionDownloader.Errors.DataDownload.Inc()
					return err
				}
				tx.Data = arweave.Base64String(buf.Bytes())
			}

			// Check if transaction is a valid interaction.
			// Keep the validator's own error when it returns one; fall back to
			// the generic error only when it reports "not an interaction"
			// without a cause.
			isInteraction, err := smartweave.ValidateInteraction(tx, true /* input from tx's data is enabled */)
			if err == nil && !isInteraction {
				err = errors.New("tx is not an interaction")
			}
			if err != nil {
				self.Log.WithField("txId", txId).WithError(err).Error("Transaction is not a valid interaction")
				return err
			}

			// Verify transaction signature.
			// Peer might be malicious and send us invalid transaction for this id
			err = tx.Verify()
			if err != nil {
				self.Log.WithField("txId", txId).Error("Transaction failed to verify, retry downloading...")
				self.monitor.GetReport().TransactionDownloader.Errors.Validation.Inc()
				return err
			}

			out = tx
			return nil
		})
	if err != nil {
		// Permanent error
		self.monitor.GetReport().TransactionDownloader.Errors.PermanentDownloadFailure.Inc()
		self.Log.WithError(err).WithField("txId", txId).Error("Failed to download proper transaction, giving up")
		return nil, err
	}

	// Update metrics
	self.monitor.GetReport().TransactionDownloader.State.TransactionsDownloaded.Inc()

	return out, nil
}
// downloadTransactions downloads every transaction listed in block.Message
// using the task's worker pool and waits for all downloads to finish.
//
// The returned slice has one slot per listed transaction, in the same order.
// When a download fails its slot stays nil and err is set to one of the
// failures (whichever worker stored it last). A block with no transactions
// returns a nil slice and a nil error.
func (self *TransactionDownloader) downloadTransactions(block *ArweaveBlock) (out []*arweave.Transaction, err error) {
	if len(block.Message.Transactions) == 0 {
		// Skip, we'll only update info about the arweave block
		return
	}

	txInfos := block.Message.Transactions
	height := block.Block.Height

	self.Log.WithField("arweave_height", height).WithField("len", len(txInfos)).Debug("Start downloading transactions...")
	defer self.Log.WithField("arweave_height", height).Debug("...Stopped downloading transactions")

	var (
		// Guards err and the writes into out
		mtx sync.Mutex
		// Counts downloads that are still outstanding
		wg sync.WaitGroup
	)

	out = make([]*arweave.Transaction, len(txInfos))

	for idx, info := range txInfos {
		// Per-iteration copies for the closure below
		info := info
		idx := idx

		wg.Add(1)
		self.SubmitToWorker(func() {
			tx, downloadErr := self.downloadOne(info.Transaction.Id)

			mtx.Lock()
			if downloadErr == nil {
				out[idx] = tx
			} else {
				err = downloadErr
			}
			mtx.Unlock()

			wg.Done()
		})
	}

	// Wait for workers to finish
	wg.Wait()

	return
}
// run fills in transactions for the Arweave blocks of every payload received
// on the input channel and forwards each completed payload to Output.
//
// It returns nil when the input channel is closed, when the task context is
// done while a payload is waiting to be forwarded, or when a download fails
// while the task is stopping. A download failure at any other time is logged
// and then panics, because a payload with missing transactions must not be
// forwarded.
func (self *TransactionDownloader) run() (err error) {
	for payload := range self.input {
		// Download transactions one by one using TransactionDownloader
		for i, arweaveBlock := range payload.ArweaveBlocks {
			// Named distinctly so it does not shadow the named result.
			transactions, downloadErr := self.downloadTransactions(arweaveBlock)
			if downloadErr != nil {
				if self.IsStopping.Load() {
					// Neglect those transactions, we're stopping anyway
					return nil
				}

				self.Log.WithError(downloadErr).
					WithField("sequencer_height", payload.SequencerBlockHeight).
					WithField("arweave_height", arweaveBlock.Block.Height).
					Error("Failed to download one of the transactions")

				// Stop everything
				// We can't neglect missing transactions
				panic(downloadErr)
			}

			payload.ArweaveBlocks[i].Transactions = transactions

			self.Log.WithField("hash", arweaveBlock.Message.BlockInfo.Hash).
				Info("Downloaded transactions from one arweave block")
		}

		// Arweave blocks filled
		select {
		case <-self.Ctx.Done():
			return nil
		case self.Output <- payload:
		}
	}

	return nil
}