/
plasma_data_source.go
102 lines (94 loc) · 3.66 KB
/
plasma_data_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package derive
import (
"context"
"errors"
"fmt"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
// PlasmaDataSource is a data source that fetches inputs from a plasma DA provider given
// their onchain commitments. Same as CalldataSource it will keep attempting to fetch.
type PlasmaDataSource struct {
	log     log.Logger
	src     DataIter           // upstream iterator over batcher-inbox tx data (commitments or raw L1 DA inputs)
	fetcher PlasmaInputFetcher // resolves a commitment into the full input data via the DA provider
	l1      L1Fetcher          // L1 reader handed through to the fetcher for origin/challenge syncing
	id      eth.BlockID        // L1 origin block this source is derived from
	// keep track of a pending commitment so we can keep trying to fetch the input.
	comm plasma.Keccak256Commitment
}
// NewPlasmaDataSource builds a PlasmaDataSource over the given calldata
// iterator. The pending commitment starts out unset; Next populates it lazily
// when a plasma commitment is read from the batcher inbox.
func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource {
	ds := &PlasmaDataSource{
		id:      id,
		l1:      l1,
		src:     src,
		log:     log,
		fetcher: fetcher,
	}
	return ds
}
// Next returns the next batch input. Non-plasma batcher-inbox data is passed
// through unchanged; plasma commitments are resolved to their full input via
// the DA provider. A pending commitment is kept in s.comm across calls so the
// fetch can be retried after a TemporaryError without re-reading the source.
// Returns NotEnoughData when the caller should simply step again, a
// ResetError when a reorg is required, a CriticalError when the data is
// irrecoverably unavailable, and a TemporaryError for retryable DA failures.
func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
	// Process origin syncs the challenge contract events and updates the local challenge states
	// before we can proceed to fetch the input data. This function can be called multiple times
	// for the same origin and noop if the origin was already processed. It is also called if
	// there is no commitment in the current origin.
	if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id); err != nil {
		if errors.Is(err, plasma.ErrReorgRequired) {
			return nil, NewResetError(fmt.Errorf("new expired challenge"))
		}
		return nil, NewTemporaryError(fmt.Errorf("failed to advance plasma L1 origin: %w", err))
	}
	// No pending commitment: read the next batcher-inbox tx data from the source.
	if s.comm == nil {
		// the l1 source returns the input commitment for the batch.
		data, err := s.src.Next(ctx)
		if err != nil {
			return nil, err
		}
		if len(data) == 0 {
			return nil, NotEnoughData
		}
		// If the tx data type is not plasma, we forward it downstream to let the next
		// steps validate and potentially parse it as L1 DA inputs.
		if data[0] != plasma.TxDataVersion1 {
			return data, nil
		}
		// validate batcher inbox data is a commitment.
		// An invalid commitment is logged and skipped (recursive call), not treated as fatal.
		comm, err := plasma.DecodeKeccak256(data[1:])
		if err != nil {
			s.log.Warn("invalid commitment", "commitment", data, "err", err)
			return s.Next(ctx)
		}
		// Retain the commitment so a failed fetch below can be retried on the next call.
		s.comm = comm
	}
	// use the commitment to fetch the input from the plasma DA provider.
	data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id)
	// GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager
	// continued syncing origins detached from the pipeline origin.
	if errors.Is(err, plasma.ErrReorgRequired) {
		// challenge for a new previously derived commitment expired.
		return nil, NewResetError(err)
	} else if errors.Is(err, plasma.ErrExpiredChallenge) {
		// this commitment was challenged and the challenge expired.
		s.log.Warn("challenge expired, skipping batch", "comm", s.comm)
		s.comm = nil
		// skip the input and advance to the next commitment.
		return s.Next(ctx)
	} else if errors.Is(err, plasma.ErrMissingPastWindow) {
		// data was never made available within the challenge window: unrecoverable.
		return nil, NewCriticalError(fmt.Errorf("data for comm %x not available: %w", s.comm, err))
	} else if errors.Is(err, plasma.ErrPendingChallenge) {
		// continue stepping without slowing down.
		return nil, NotEnoughData
	} else if err != nil {
		// return temporary error so we can keep retrying; s.comm stays set for the retry.
		return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err))
	}
	// inputs are limited to a max size to ensure they can be challenged in the DA contract.
	if len(data) > plasma.MaxInputSize {
		s.log.Warn("input data exceeds max size", "size", len(data), "max", plasma.MaxInputSize)
		s.comm = nil
		return s.Next(ctx)
	}
	// reset the commitment so we can fetch the next one from the source at the next iteration.
	s.comm = nil
	return data, nil
}