-
Notifications
You must be signed in to change notification settings - Fork 199
/
provider_state_tracker.go
172 lines (146 loc) · 8.48 KB
/
provider_state_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package statetracker
import (
"context"
"time"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/tx"
"github.com/lavanet/lava/protocol/chaintracker"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/protocol/metrics"
"github.com/lavanet/lava/protocol/rpcprovider/reliabilitymanager"
updaters "github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
protocoltypes "github.com/lavanet/lava/x/protocol/types"
)
// ProviderStateTracker PST is a class for tracking provider data from the lava blockchain, such as epoch changes.
// it allows also to query specific data form the blockchain and acts as a single place to send transactions
type ProviderStateTracker struct {
stateQuery *updaters.ProviderStateQuery
txSender *ProviderTxSender
*StateTracker
*EmergencyTracker
}
func NewProviderStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ProviderMetricsManager) (ret *ProviderStateTracker, err error) {
emergencyTracker, blockNotFoundCallback := NewEmergencyTracker(metrics)
stateTrackerBase, err := NewStateTracker(ctx, txFactory, clientCtx, chainFetcher, blockNotFoundCallback)
if err != nil {
return nil, err
}
txSender, err := NewProviderTxSender(ctx, clientCtx, txFactory)
if err != nil {
return nil, err
}
pst := &ProviderStateTracker{
StateTracker: stateTrackerBase,
stateQuery: updaters.NewProviderStateQuery(ctx, clientCtx),
txSender: txSender,
EmergencyTracker: emergencyTracker,
}
pst.RegisterForEpochUpdates(ctx, emergencyTracker)
err = pst.RegisterForDowntimeParamsUpdates(ctx, emergencyTracker)
return pst, err
}
func (pst *ProviderStateTracker) RegisterForEpochUpdates(ctx context.Context, epochUpdatable updaters.EpochUpdatable) {
epochUpdater := updaters.NewEpochUpdater(&pst.stateQuery.EpochStateQuery)
epochUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, epochUpdater)
epochUpdater, ok := epochUpdaterRaw.(*updaters.EpochUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: epochUpdaterRaw})
}
epochUpdater.RegisterEpochUpdatable(ctx, epochUpdatable, 0) // adding 0 delay for provider updater
}
func (pst *ProviderStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error {
// register for spec updates sets spec and updates when a spec has been modified
specUpdater := updaters.NewSpecUpdater(endpoint.ChainID, pst.stateQuery, pst.EventTracker)
specUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, specUpdater)
specUpdater, ok := specUpdaterRaw.(*updaters.SpecUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: specUpdaterRaw})
}
return specUpdater.RegisterSpecUpdatable(ctx, &specUpdatable, endpoint)
}
func (pst *ProviderStateTracker) RegisterForSpecVerifications(ctx context.Context, specVerifier updaters.SpecVerifier, chainId string) error {
// register for spec verifications sets spec and verifies when a spec has been modified
specUpdater := updaters.NewSpecUpdater(chainId, pst.stateQuery, pst.EventTracker)
specUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, specUpdater)
specUpdater, ok := specUpdaterRaw.(*updaters.SpecUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForSpecVerifications", nil, utils.Attribute{Key: "updater", Value: specUpdaterRaw})
}
return specUpdater.RegisterSpecVerifier(ctx, &specVerifier, chainId)
}
func (pst *ProviderStateTracker) RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf) {
versionUpdater := updaters.NewVersionUpdater(pst.stateQuery, pst.EventTracker, version, versionValidator)
versionUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, versionUpdater)
versionUpdater, ok := versionUpdaterRaw.(*updaters.VersionUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: versionUpdaterRaw})
}
versionUpdater.RegisterVersionUpdatable()
}
func (pst *ProviderStateTracker) RegisterReliabilityManagerForVoteUpdates(ctx context.Context, voteUpdatable updaters.VoteUpdatable, endpointP *lavasession.RPCProviderEndpoint) {
voteUpdater := updaters.NewVoteUpdater(pst.EventTracker)
voteUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, voteUpdater)
voteUpdater, ok := voteUpdaterRaw.(*updaters.VoteUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: voteUpdaterRaw})
}
endpoint := lavasession.RPCEndpoint{ChainID: endpointP.ChainID, ApiInterface: endpointP.ApiInterface}
voteUpdater.RegisterVoteUpdatable(ctx, &voteUpdatable, endpoint)
}
func (pst *ProviderStateTracker) RegisterPaymentUpdatableForPayments(ctx context.Context, paymentUpdatable updaters.PaymentUpdatable) {
paymentUpdater := updaters.NewPaymentUpdater(pst.EventTracker)
paymentUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, paymentUpdater)
paymentUpdater, ok := paymentUpdaterRaw.(*updaters.PaymentUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: paymentUpdaterRaw})
}
paymentUpdater.RegisterPaymentUpdatable(ctx, &paymentUpdatable)
}
func (pst *ProviderStateTracker) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error {
// register for downtimeParams updates sets downtimeParams and updates when downtimeParams has been changed
downtimeParamsUpdater := updaters.NewDowntimeParamsUpdater(pst.stateQuery, pst.EventTracker)
downtimeParamsUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, downtimeParamsUpdater)
downtimeParamsUpdater, ok := downtimeParamsUpdaterRaw.(*updaters.DowntimeParamsUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: downtimeParamsUpdaterRaw})
}
return downtimeParamsUpdater.RegisterDowntimeParamsUpdatable(ctx, &downtimeParamsUpdatable)
}
func (pst *ProviderStateTracker) TxRelayPayment(ctx context.Context, relayRequests []*pairingtypes.RelaySession, description string, latestBlocks []*pairingtypes.LatestBlockReport) error {
return pst.txSender.TxRelayPayment(ctx, relayRequests, description, latestBlocks)
}
func (pst *ProviderStateTracker) SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData) error {
return pst.txSender.SendVoteReveal(voteID, vote)
}
func (pst *ProviderStateTracker) SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData) error {
return pst.txSender.SendVoteCommitment(voteID, vote)
}
func (pst *ProviderStateTracker) LatestBlock() int64 {
return pst.StateTracker.chainTracker.GetAtomicLatestBlockNum()
}
func (pst *ProviderStateTracker) GetMaxCuForUser(ctx context.Context, consumerAddress, chainID string, epoch uint64) (maxCu uint64, err error) {
return pst.stateQuery.GetMaxCuForUser(ctx, consumerAddress, chainID, epoch)
}
func (pst *ProviderStateTracker) VerifyPairing(ctx context.Context, consumerAddress, providerAddress string, epoch uint64, chainID string) (valid bool, total int64, projectId string, err error) {
return pst.stateQuery.VerifyPairing(ctx, consumerAddress, providerAddress, epoch, chainID)
}
func (pst *ProviderStateTracker) GetEpochSize(ctx context.Context) (uint64, error) {
return pst.stateQuery.GetEpochSize(ctx)
}
func (pst *ProviderStateTracker) EarliestBlockInMemory(ctx context.Context) (uint64, error) {
return pst.stateQuery.EarliestBlockInMemory(ctx)
}
func (pst *ProviderStateTracker) GetRecommendedEpochNumToCollectPayment(ctx context.Context) (uint64, error) {
return pst.stateQuery.GetRecommendedEpochNumToCollectPayment(ctx)
}
func (pst *ProviderStateTracker) GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx context.Context) (uint64, error) {
return pst.stateQuery.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx)
}
func (pst *ProviderStateTracker) GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error) {
return pst.stateQuery.GetProtocolVersion(ctx)
}
func (pst *ProviderStateTracker) GetAverageBlockTime() time.Duration {
return pst.StateTracker.GetAverageBlockTime()
}