forked from dymensionxyz/dymint
/
block.go
196 lines (167 loc) · 6.46 KB
/
block.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package block
import (
"context"
"cosmossdk.io/errors"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmtypes "github.com/tendermint/tendermint/types"
)
// applyBlock applies the block to the store and the abci app.
// steps: save block -> execute block with app -> update state -> commit block to app -> update store height and state hash.
// As the entire process can't be atomic we need to make sure the following condition apply before
// we're applying the block in the happy path: block height - 1 == abci app last block height.
// In case the following doesn't hold true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
if block.Header.Height != m.store.Height()+1 {
// We crashed after the commit and before updating the store height.
m.logger.Error("Error getting block height")
m.prevBlock[block.Header.Height] = block
m.prevCommit[block.Header.Height] = commit
m.prevMetaData[block.Header.Height] = blockMetaData
return nil
}
m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.source, "block cache", len(m.prevBlock))
m.p2pClient.BlockReceived(block)
// Check if alignment is needed due to incosistencies between the store and the app.
isAlignRequired, err := m.alignStoreWithApp(ctx, block)
if err != nil {
return err
}
if isAlignRequired {
m.logger.Debug("Aligned with app state required. Skipping to next block", "height", block.Header.Height)
return nil
}
// Start applying the block assuming no inconsistency was found.
_, err = m.store.SaveBlock(block, commit, nil)
if err != nil {
m.logger.Error("Failed to save block", "error", err)
return err
}
responses, err := m.executeBlock(ctx, block, commit)
if err != nil {
m.logger.Error("Failed to execute block", "error", err)
return err
}
newState, err := m.executor.UpdateStateFromResponses(responses, m.lastState, block)
if err != nil {
return err
}
batch := m.store.NewBatch()
batch, err = m.store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
return err
}
m.lastState = newState
batch, err = m.store.UpdateState(m.lastState, batch)
if err != nil {
batch.Discard()
return err
}
batch, err = m.store.SaveValidators(block.Header.Height, m.lastState.Validators, batch)
if err != nil {
batch.Discard()
return err
}
err = batch.Commit()
if err != nil {
m.logger.Error("Failed to persist batch to disk", "error", err)
return err
}
// Commit block to app
retainHeight, err := m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
m.logger.Error("Failed to commit to the block", "error", err)
return err
}
// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := m.pruneBlocks(retainHeight)
if err != nil {
m.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}
// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
newState.LastValidators = m.lastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
newState.BaseHeight = m.store.Base()
_, err = m.store.UpdateState(newState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
return err
}
m.lastState = newState
m.store.SetHeight(block.Header.Height)
cachedBlock, exists := m.prevBlock[m.store.Height()+1]
if exists {
m.applyBlock(ctx, cachedBlock, m.prevCommit[m.store.Height()+1], m.prevMetaData[m.store.Height()+1])
}
for k := range m.prevBlock {
if k <= block.Header.Height {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
delete(m.prevMetaData, k)
}
}
return nil
}
// alignStoreWithApp is responsible for aligning the state of the store and the abci app if necessary.
func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bool, error) {
isRequired := false
// Validate incosistency in height wasn't caused by a crash and if so handle it.
proxyAppInfo, err := m.executor.GetAppInfo()
if err != nil {
return isRequired, errors.Wrap(err, "failed to get app info")
}
if uint64(proxyAppInfo.LastBlockHeight) != block.Header.Height {
return isRequired, nil
}
isRequired = true
m.logger.Info("Skipping block application and only updating store height and state hash", "height", block.Header.Height)
// update the state with the hash, last store height and last validators.
m.lastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.lastState.LastStoreHeight = block.Header.Height
m.lastState.LastValidators = m.lastState.Validators.Copy()
resp, err := m.store.LoadBlockResponses(block.Header.Height)
if err != nil {
return isRequired, errors.Wrap(err, "failed to load block responses")
}
copy(m.lastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())
_, err = m.store.UpdateState(m.lastState, nil)
if err != nil {
return isRequired, errors.Wrap(err, "failed to update state")
}
m.store.SetHeight(block.Header.Height)
return isRequired, nil
}
func (m *Manager) executeBlock(ctx context.Context, block *types.Block, commit *types.Commit) (*tmstate.ABCIResponses, error) {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.settlementClient.GetProposer()
if err := m.executor.Validate(m.lastState, block, commit, proposer); err != nil {
return &tmstate.ABCIResponses{}, err
}
responses, err := m.executor.Execute(ctx, m.lastState, block)
if err != nil {
return &tmstate.ABCIResponses{}, err
}
return responses, nil
}
func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
m.logger.Error("Failed to marshal block", "error", err)
return err
}
if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
m.logger.Error("Failed to gossip block", "error", err)
return err
}
return nil
}