Skip to content

Commit

Permalink
rpc: add chunked rpc interface (#6445)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed May 24, 2021
1 parent 7b5a732 commit d913406
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 1 deletion.
9 changes: 9 additions & 0 deletions light/proxy/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
"net_info": rpcserver.NewRPCFunc(makeNetInfoFunc(c), "", false),
"blockchain": rpcserver.NewRPCFunc(makeBlockchainInfoFunc(c), "minHeight,maxHeight", true),
"genesis": rpcserver.NewRPCFunc(makeGenesisFunc(c), "", true),
"genesis_chunked": rpcserver.NewRPCFunc(makeGenesisChunkedFunc(c), "", true),
"block": rpcserver.NewRPCFunc(makeBlockFunc(c), "height", true),
"block_by_hash": rpcserver.NewRPCFunc(makeBlockByHashFunc(c), "hash", true),
"block_results": rpcserver.NewRPCFunc(makeBlockResultsFunc(c), "height", true),
Expand Down Expand Up @@ -92,6 +93,14 @@ func makeGenesisFunc(c *lrpc.Client) rpcGenesisFunc {
}
}

type rpcGenesisChunkedFunc func(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error)

func makeGenesisChunkedFunc(c *lrpc.Client) rpcGenesisChunkedFunc {
return func(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) {
return c.GenesisChunked(ctx.Context(), chunk)
}
}

type rpcBlockFunc func(ctx *rpctypes.Context, height *int64) (*ctypes.ResultBlock, error)

func makeBlockFunc(c *lrpc.Client) rpcBlockFunc {
Expand Down
4 changes: 4 additions & 0 deletions light/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ func (c *Client) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) {
return c.next.Genesis(ctx)
}

func (c *Client) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) {
return c.next.GenesisChunked(ctx, id)
}

// Block calls rpcclient#Block and then verifies the result.
func (c *Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) {
res, err := c.next.Block(ctx, height)
Expand Down
4 changes: 4 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,10 @@ func (n *Node) ConfigureRPC() (*rpccore.Environment, error) {
}
rpcCoreEnv.PubKey = pubKey
}
if err := rpcCoreEnv.InitGenesisChunks(); err != nil {
return nil, err
}

return &rpcCoreEnv, nil
}

Expand Down
9 changes: 9 additions & 0 deletions rpc/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,15 @@ func (c *baseRPCClient) Genesis(ctx context.Context) (*ctypes.ResultGenesis, err
return result, nil
}

func (c *baseRPCClient) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) {
result := new(ctypes.ResultGenesisChunk)
_, err := c.caller.Call(ctx, "genesis_chunked", map[string]interface{}{"chunk": id}, result)
if err != nil {
return nil, err
}
return result, nil
}

func (c *baseRPCClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) {
result := new(ctypes.ResultBlock)
params := make(map[string]interface{})
Expand Down
1 change: 1 addition & 0 deletions rpc/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type SignClient interface {
// HistoryClient provides access to data from genesis to now in large chunks.
type HistoryClient interface {
Genesis(context.Context) (*ctypes.ResultGenesis, error)
GenesisChunked(context.Context, uint) (*ctypes.ResultGenesisChunk, error)
BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error)
}

Expand Down
4 changes: 4 additions & 0 deletions rpc/client/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func (c *Local) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) {
return c.env.Genesis(c.ctx)
}

func (c *Local) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) {
return c.env.GenesisChunked(c.ctx, id)
}

func (c *Local) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) {
return c.env.Block(c.ctx, height)
}
Expand Down
25 changes: 24 additions & 1 deletion rpc/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions rpc/client/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client_test

import (
"context"
"encoding/base64"
"fmt"
"math"
"net/http"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/stretchr/testify/require"

abci "github.com/tendermint/tendermint/abci/types"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
mempl "github.com/tendermint/tendermint/mempool"
Expand Down Expand Up @@ -193,6 +195,31 @@ func TestGenesisAndValidators(t *testing.T) {
}
}

func TestGenesisChunked(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, c := range GetClients(t, NodeSuite(t)) {
first, err := c.GenesisChunked(ctx, 0)
require.NoError(t, err)

decoded := make([]string, 0, first.TotalChunks)
for i := 0; i < first.TotalChunks; i++ {
chunk, err := c.GenesisChunked(ctx, uint(i))
require.NoError(t, err)
data, err := base64.StdEncoding.DecodeString(chunk.Data)
require.NoError(t, err)
decoded = append(decoded, string(data))

}
doc := []byte(strings.Join(decoded, ""))

var out types.GenesisDoc
require.NoError(t, tmjson.Unmarshal(doc, &out),
"first: %+v, doc: %s", first, string(doc))
}
}

func TestABCIQuery(t *testing.T) {
for i, c := range GetClients(t, NodeSuite(t)) {
// write something
Expand Down
38 changes: 38 additions & 0 deletions rpc/core/env.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package core

import (
"encoding/base64"
"fmt"
"time"

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
Expand All @@ -25,6 +27,10 @@ const (
// SubscribeTimeout is the maximum time we wait to subscribe for an event.
// must be less than the server's write timeout (see rpcserver.DefaultConfig)
SubscribeTimeout = 5 * time.Second

// genesisChunkSize is the maximum size, in bytes, of each
// chunk in the genesis structure for the chunked API
genesisChunkSize = 16 * 1024 * 1024 // 16
)

//----------------------------------------------
Expand Down Expand Up @@ -80,6 +86,9 @@ type Environment struct {
Logger log.Logger

Config cfg.RPCConfig

// cache of chunked genesis data.
genChunks []string
}

//----------------------------------------------
Expand Down Expand Up @@ -122,6 +131,35 @@ func (env *Environment) validatePerPage(perPagePtr *int) int {
return perPage
}

// InitGenesisChunks configures the environment and should be called on service
// startup.
func (env *Environment) InitGenesisChunks() error {
if env.genChunks != nil {
return nil
}

if env.GenDoc == nil {
return nil
}

data, err := tmjson.Marshal(env.GenDoc)
if err != nil {
return err
}

for i := 0; i < len(data); i += genesisChunkSize {
end := i + genesisChunkSize

if end > len(data) {
end = len(data)
}

env.genChunks = append(env.genChunks, base64.StdEncoding.EncodeToString(data[i:end]))
}

return nil
}

func validateSkipCount(page, perPage int) int {
skipCount := (page - 1) * perPage
if skipCount < 0 {
Expand Down
27 changes: 27 additions & 0 deletions rpc/core/net.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -92,9 +93,35 @@ func (env *Environment) UnsafeDialPeers(
// Genesis returns genesis file.
// More: https://docs.tendermint.com/master/rpc/#/Info/genesis
func (env *Environment) Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) {
if len(env.genChunks) > 1 {
return nil, errors.New("genesis response is large, please use the genesis_chunked API instead")
}

return &ctypes.ResultGenesis{Genesis: env.GenDoc}, nil
}

func (env *Environment) GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) {
if env.genChunks == nil {
return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized")
}

if len(env.genChunks) == 0 {
return nil, fmt.Errorf("service configuration error, there are no chunks")
}

id := int(chunk)

if id > len(env.genChunks)-1 {
return nil, fmt.Errorf("there are %d chunks, %d is invalid", len(env.genChunks)-1, id)
}

return &ctypes.ResultGenesisChunk{
TotalChunks: len(env.genChunks),
ChunkNumber: id,
Data: env.genChunks[id],
}, nil
}

func getIDs(peers []string) ([]string, error) {
ids := make([]string, 0, len(peers))

Expand Down
1 change: 1 addition & 0 deletions rpc/core/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (env *Environment) GetRoutes() RoutesMap {
"net_info": rpc.NewRPCFunc(env.NetInfo, "", false),
"blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true),
"genesis": rpc.NewRPCFunc(env.Genesis, "", true),
"genesis_chunked": rpc.NewRPCFunc(env.GenesisChunked, "chunk", true),
"block": rpc.NewRPCFunc(env.Block, "height", true),
"block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", true),
"block_results": rpc.NewRPCFunc(env.BlockResults, "height", true),
Expand Down
10 changes: 10 additions & 0 deletions rpc/core/types/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ type ResultGenesis struct {
Genesis *types.GenesisDoc `json:"genesis"`
}

// ResultGenesisChunk is the output format for the chunked/paginated
// interface. These chunks are produced by converting the genesis
// document to JSON and then splitting the resulting payload into
// 16 megabyte blocks and then base64 encoding each block.
type ResultGenesisChunk struct {
ChunkNumber int `json:"chunk"`
TotalChunks int `json:"total"`
Data string `json:"data"`
}

// Single block (with meta)
type ResultBlock struct {
BlockID types.BlockID `json:"block_id"`
Expand Down

0 comments on commit d913406

Please sign in to comment.