Skip to content

Commit

Permalink
Merge pull request tendermint#68 from binance-chain/state_sync_warp
Browse files Browse the repository at this point in the history
[R4R]parity warp-like state sync
  • Loading branch information
ackratos committed May 27, 2019
2 parents ef181dd + dd82029 commit 801964e
Show file tree
Hide file tree
Showing 21 changed files with 1,535 additions and 974 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ type Client interface {
BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)

LatestSnapshot() (height int64, numKeys []int64, err error)
ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error)
StartRecovery(height int64, numKeys []int64) error
WriteRecoveryChunk(chunk [][]byte) error
EndRecovery(height int64) error
StartRecovery(manifest *types.Manifest) error
WriteRecoveryChunk(hash types.SHA256Sum, chunk *types.AppStateChunk, isComplete bool) error
}

//----------------------------------------
Expand Down
14 changes: 3 additions & 11 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,10 @@ func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.Respon
return reqres.Response.GetEndBlock(), cli.Error()
}

func (cli *grpcClient) LatestSnapshot() (height int64, numKeys []int64, err error) {
return 0, make([]int64, 0), nil
}
func (cli *grpcClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error) {
return make([][]byte, 0), nil
}
func (cli *grpcClient) StartRecovery(height int64, numKeys []int64) error {
func (cli *grpcClient) StartRecovery(manifest *types.Manifest) error {
return nil
}
func (cli *grpcClient) WriteRecoveryChunk(chunk [][]byte) error {
return nil
}
func (cli *grpcClient) EndRecovery(height int64) error {

func (cli *grpcClient) WriteRecoveryChunk(hash types.SHA256Sum, chunk *types.AppStateChunk, isComplete bool) error {
return nil
}
29 changes: 4 additions & 25 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,39 +168,18 @@ func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
)
}

func (app *localClient) LatestSnapshot() (height int64, numKeys []int64, err error) {
func (app *localClient) StartRecovery(manifest *types.Manifest) error {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.LatestSnapshot()
return app.Application.StartRecovery(manifest)
}

func (app *localClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error) {
func (app *localClient) WriteRecoveryChunk(hash types.SHA256Sum, chunk *types.AppStateChunk, isComplete bool) error {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.ReadSnapshotChunk(height, startIndex, endIndex)
}

func (app *localClient) StartRecovery(height int64, numKeys []int64) error {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.StartRecovery(height, numKeys)
}

func (app *localClient) WriteRecoveryChunk(chunk [][]byte) error {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.WriteRecoveryChunk(chunk)
}

func (app *localClient) EndRecovery(height int64) error {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.EndRecovery(height)
return app.Application.WriteRecoveryChunk(hash, chunk, isComplete)
}

//-------------------------------------------------------
Expand Down
13 changes: 2 additions & 11 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,19 +348,10 @@ func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.Respons

//----------------------------------------

func (cli *socketClient) LatestSnapshot() (height int64, numKeys []int64, err error) {
return 0, make([]int64, 0), nil
}
func (cli *socketClient) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error) {
return make([][]byte, 0), nil
}
func (cli *socketClient) StartRecovery(height int64, numKeys []int64) error {
return nil
}
func (cli *socketClient) WriteRecoveryChunk(chunk [][]byte) error {
func (cli *socketClient) StartRecovery(manifest *types.Manifest) error {
return nil
}
func (cli *socketClient) EndRecovery(height int64) error {
func (cli *socketClient) WriteRecoveryChunk(hash types.SHA256Sum, chunk *types.AppStateChunk, isComplete bool) error {
return nil
}

Expand Down
16 changes: 2 additions & 14 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,11 @@ func (app *PersistentKVStoreApplication) EndBlock(req types.RequestEndBlock) typ
return types.ResponseEndBlock{ValidatorUpdates: app.ValUpdates}
}

func (app *PersistentKVStoreApplication) LatestSnapshot() (height int64, numKeys []int64, err error) {
return 0, make([]int64, 0), nil
}

func (app *PersistentKVStoreApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error) {
return make([][]byte, 0), nil
}

func (app *PersistentKVStoreApplication) StartRecovery(height int64, numKeys []int64) error {
return nil
}

func (app *PersistentKVStoreApplication) WriteRecoveryChunk(chunk [][]byte) error {
func (app *PersistentKVStoreApplication) StartRecovery(manifest *types.Manifest) error {
return nil
}

func (app *PersistentKVStoreApplication) EndRecovery(height int64) error {
func (app *PersistentKVStoreApplication) WriteRecoveryChunk(hash types.SHA256Sum, chunk *types.AppStateChunk, isComplete bool) error {
return nil
}

Expand Down
45 changes: 5 additions & 40 deletions abci/types/application.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package types // nolint: goimports

import (
context "golang.org/x/net/context"
"golang.org/x/net/context"
)

// Application is an interface that enables any finite, deterministic state machine
Expand All @@ -26,11 +26,8 @@ type Application interface {
Commit() ResponseCommit // Commit the state and return the application Merkle root hash

// State Connection
LatestSnapshot() (height int64, numKeys []int64, err error) // query application state height and numOfKeys
ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error)
StartRecovery(height int64, numKeys []int64) error
WriteRecoveryChunk(chunk [][]byte) error
EndRecovery(height int64) error
StartRecovery(manifest *Manifest) error
WriteRecoveryChunk(hash SHA256Sum, chunk *AppStateChunk, isComplete bool) error
}

//-------------------------------------------------------
Expand Down Expand Up @@ -85,23 +82,11 @@ func (BaseApplication) EndBlock(req RequestEndBlock) ResponseEndBlock {
return ResponseEndBlock{}
}

func (BaseApplication) LatestSnapshot() (height int64, numKeys []int64, err error) {
return 0, make([]int64, 0), nil
}

func (BaseApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error) {
return make([][]byte, 0), nil
}

func (BaseApplication) StartRecovery(height int64, numKeys []int64) error {
return nil
}

func (BaseApplication) WriteRecoveryChunk(chunk [][]byte) error {
func (BaseApplication) StartRecovery(manifest *Manifest) error {
return nil
}

func (BaseApplication) EndRecovery(height int64) error {
func (BaseApplication) WriteRecoveryChunk(hash SHA256Sum, chunk *AppStateChunk, isComplete bool) error {
return nil
}

Expand Down Expand Up @@ -168,23 +153,3 @@ func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock)
res := app.app.EndBlock(*req)
return &res, nil
}

func (app *GRPCApplication) LatestSnapshot() (height int64, numKeys []int64, err error) {
return 0, make([]int64, 0), nil
}

func (app *GRPCApplication) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk map[string][][]byte, err error) {
return make(map[string][][]byte, 0), nil
}

func (app *GRPCApplication) StartRecovery(height int64, numKeys []int64) error {
return nil
}

func (app *GRPCApplication) WriteRecoveryChunk(storeName string, chunk [][]byte) error {
return nil
}

func (app *GRPCApplication) EndRecovery(height int64) error {
return nil
}
179 changes: 179 additions & 0 deletions abci/types/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package types

import (
"crypto/sha256"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
)

const (
ManifestVersion int32 = 0
ChunkPayloadMaxBytes int = 4 * 1024 * 1024 // 4M before compression
snapshotDir string = "snapshot"
finalizedDir string = "current"
restorationDir string = "restoration"
manifestFileName string = "MANIFEST"
)

// AppStateChunk completeness enums
// We don't use enum because amino doesn't support encode enum
const (
Complete uint8 = iota
InComplete_First
InComplete_Mid
InComplete_Last
)

type SHA256Sum [sha256.Size]byte // check sum of chunk

type Manifest struct {
Version int32 // snapshot Version

Height int64 // the height of this snapshot
StateHashes []SHA256Sum // hashes of state chunks
AppStateHashes []SHA256Sum // hashes of app state chunks
BlockHashes []SHA256Sum // Block hashes
NumKeys []int64 // num of keys for each substore, this sacrifices clear boundary between cosmos and tendermint, saying tendermint knows applicaction db might organized by substores. But reduce network/disk pressure that each chunk must has a field indicates what's his store
}

func NewManifest(
height int64,
stateHashes []SHA256Sum,
appStateHashes []SHA256Sum,
blockHashes []SHA256Sum,
numKeys []int64) Manifest {
return Manifest{
ManifestVersion,
height,
stateHashes,
appStateHashes,
blockHashes,
numKeys,
}
}

type SnapshotChunk interface{}

type StateChunk struct {
Statepart []byte
}

/**
* Completeness:
* 0 - all nodes in Nodes field are complete in this chunk
* 1 - last node in Nodes field is not complete, and its the first node of following incomplete chunks sequence
* 2 - there is only one incomplete part of node in Nodes which follows previous chunk
* 3 - there is only one incomplete part of node in Nodes and its the last part of previous chunks
*/
type AppStateChunk struct {
StartIdx int64 // compare (startIdx and number of complete nodes) against (Manifest.NumKeys) we know each node's substore
Completeness uint8 // flag of completeness of this chunk, not enum because of go-amino doesn't support enum encoding
Nodes [][]byte // iavl tree serialized node, one big node (i.e. active orders and orderbook) might be split into different chunks (complete is flag to indicate that), ordering is ensured by list on manifest
}

// TODO: should we make block chunk to be blockpart?
// current max block size is 100M while normal chunk should be less than 4M
// but as blocks usually not big
// so we don't split block just like blockchaind reactor (in fast sync)
type BlockChunk struct {
Block []byte // amino encoded block
SeenCommit []byte // amino encoded Commit - we need this because Block only keep LastSeenCommit, for commit of this block, we need load it in same way it was saved
}

// read snapshot manifest and chunks from disk
type SnapshotReader struct {
Height int64
DbDir string
}

func (reader *SnapshotReader) Load(hash SHA256Sum) ([]byte, error) {
return reader.loadImpl(hash, finalizedDir)
}

func (reader *SnapshotReader) LoadFromRestoration(hash SHA256Sum) ([]byte, error) {
return reader.loadImpl(hash, restorationDir)
}

func (reader *SnapshotReader) loadImpl(hash SHA256Sum, category string) ([]byte, error) {
toRead := filepath.Join(reader.DbDir, snapshotDir, strconv.FormatInt(reader.Height, 10), category, fmt.Sprintf("%x", hash))
return ioutil.ReadFile(toRead)
}

func (reader *SnapshotReader) LoadManifest(height int64) (int64, []byte, error) {
var lookupHeight int64
if height == 0 {
lookupHeight = reader.Height
} else {
lookupHeight = height
}

if lookupHeight == 0 {
return 0, nil, fmt.Errorf("requested wrong height: %d, reader height: %d", height, reader.Height)
} else {
toRead := filepath.Join(reader.DbDir, snapshotDir, strconv.FormatInt(lookupHeight, 10), finalizedDir, manifestFileName)
manifest, err := ioutil.ReadFile(toRead)
return lookupHeight, manifest, err
}
}

func (reader *SnapshotReader) IsFinalized() bool {
toCheck := filepath.Join(reader.DbDir, snapshotDir, strconv.FormatInt(reader.Height, 10), finalizedDir)
_, err := os.Stat(toCheck)
return err == nil
}

func (reader *SnapshotReader) InitSnapshotHeight() int64 {
var latestHeight int64

toTraverse := filepath.Join(reader.DbDir, snapshotDir)
if files, err := ioutil.ReadDir(toTraverse); err == nil {
for _, f := range files {
if f.IsDir() {
if height, err := strconv.ParseInt(f.Name(), 10, 64); err == nil && height > latestHeight {
if _, err := os.Stat(filepath.Join(toTraverse, f.Name(), finalizedDir)); err == nil {
latestHeight = height
}
}
}
}
}

reader.Height = latestHeight
return latestHeight
}

// write snapshot manifest and chunks to disk
type SnapshotWriter struct {
Height int64
DbDir string
}

func (writer *SnapshotWriter) Write(hash SHA256Sum, chunk []byte) error {
path := filepath.Join(writer.DbDir, snapshotDir, strconv.FormatInt(writer.Height, 10), restorationDir)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return err
}
toWrite := filepath.Join(path, fmt.Sprintf("%x", hash))
return ioutil.WriteFile(toWrite, chunk, 0644)
}

func (writer *SnapshotWriter) WriteManifest(manifest []byte) error {
path := filepath.Join(writer.DbDir, snapshotDir, strconv.FormatInt(writer.Height, 10), restorationDir)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return err
}
toWrite := filepath.Join(path, manifestFileName)
return ioutil.WriteFile(toWrite, manifest, 0644)
}

func (writer *SnapshotWriter) Finalize() error {
baseDir := filepath.Join(writer.DbDir, snapshotDir, strconv.FormatInt(writer.Height, 10))
return os.Rename(filepath.Join(baseDir, restorationDir), filepath.Join(baseDir, finalizedDir))
}

func (writer *SnapshotWriter) Delete() error {
return os.RemoveAll(filepath.Join(writer.DbDir, snapshotDir, strconv.FormatInt(writer.Height, 10)))
}
Loading

0 comments on commit 801964e

Please sign in to comment.