Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: typed app conns #261

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions blockchain/reactor.go
Expand Up @@ -44,7 +44,7 @@ type BlockchainReactor struct {

sw *p2p.Switch
state *sm.State
proxyAppConn proxy.AppConn // same as consensus.proxyAppConn
proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
store *BlockStore
pool *BlockPool
fastSync bool
Expand All @@ -55,7 +55,7 @@ type BlockchainReactor struct {
evsw *events.EventSwitch
}

func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, fastSync bool) *BlockchainReactor {
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
if state.LastBlockHeight == store.Height()-1 {
store.height -= 1 // XXX HACK, make this better
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/state.go
Expand Up @@ -215,7 +215,7 @@ type ConsensusState struct {
QuitService

config cfg.Config
proxyAppConn proxy.AppConn
proxyAppConn proxy.AppConnConsensus
blockStore *bc.BlockStore
mempool *mempl.Mempool
privValidator *types.PrivValidator
Expand All @@ -238,7 +238,7 @@ type ConsensusState struct {
nSteps int // used for testing to limit the number of transitions the state makes
}

func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConn, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
cs := &ConsensusState{
config: config,
proxyAppConn: proxyAppConn,
Expand Down Expand Up @@ -1271,7 +1271,7 @@ func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Blo
defer cs.mempool.Unlock()

// flush out any CheckTx that have already started
cs.proxyAppConn.FlushSync()
// cs.proxyAppConn.FlushSync() // ?! XXX

// Commit block, get hash back
res := cs.proxyAppConn.CommitSync()
Expand Down
4 changes: 2 additions & 2 deletions mempool/mempool.go
Expand Up @@ -49,7 +49,7 @@ type Mempool struct {
config cfg.Config

proxyMtx sync.Mutex
proxyAppConn proxy.AppConn
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
height int // the last block Update()'d to
Expand All @@ -63,7 +63,7 @@ type Mempool struct {
cacheList *list.List // to remove oldest tx when cache gets too big
}

func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool {
func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool {
mempool := &Mempool{
config: config,
proxyAppConn: proxyAppConn,
Expand Down
67 changes: 12 additions & 55 deletions node/node.go
Expand Up @@ -6,7 +6,6 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"

. "github.com/tendermint/go-common"
Expand All @@ -26,9 +25,6 @@ import (
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
"github.com/tendermint/tmsp/example/nil"
)

import _ "net/http/pprof"
Expand All @@ -47,7 +43,7 @@ type Node struct {
privKey crypto.PrivKeyEd25519
}

func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node {
func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node {

EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here

Expand All @@ -61,12 +57,9 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
// Get State
state := getState(config, stateDB)

// Create two proxyAppConn connections,
// Create the proxyApp, which houses two connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash)
proxyApp := proxy.NewMultiAppConn(config, state, blockStore)

// add the chainid and number of validators to the global config
config.Set("chain_id", state.ChainID)
Expand All @@ -93,14 +86,14 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
}

// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync)
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)

// Make MempoolReactor
mempool := mempl.NewMempool(config, proxyAppConnMempool)
mempool := mempl.NewMempool(config, proxyApp.Mempool())
mempoolReactor := mempl.NewMempoolReactor(config, mempool)

// Make ConsensusReactor
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
if privValidator != nil {
consensusReactor.SetPrivValidator(privValidator)
Expand All @@ -125,6 +118,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
// run the profile server
profileHost := config.GetString("prof_laddr")
if profileHost != "" {

go func() {
log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
}()
Expand Down Expand Up @@ -270,40 +264,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
return nodeInfo
}

// Get a connection to the proxyAppConn addr.
// Check the current hash, and panic if it doesn't match.
func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) {
// use local app (for testing)
switch addr {
case "nilapp":
app := nilapp.NewNilApplication()
mtx := new(sync.Mutex)
proxyAppConn = tmspcli.NewLocalClient(mtx, app)
case "dummy":
app := dummy.NewDummyApplication()
mtx := new(sync.Mutex)
proxyAppConn = tmspcli.NewLocalClient(mtx, app)
default:
// Run forever in a loop
remoteApp, err := proxy.NewRemoteAppConn(addr, transport)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
proxyAppConn = remoteApp
}

// Check the hash
res := proxyAppConn.CommitSync()
if res.IsErr() {
PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res))
}
if !bytes.Equal(hash, res.Data) {
log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data))
}

return proxyAppConn
}

// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
Expand All @@ -319,7 +279,7 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State {

// Users wishing to use an external signer for their validators
// should fork tendermint/tendermint and implement RunNode to
// load their custom priv validator and call NewNode(privVal, getProxyFunc)
// load their custom priv validator and call NewNode
func RunNode(config cfg.Config) {
// Wait until the genesis doc becomes available
genDocFile := config.GetString("genesis_file")
Expand Down Expand Up @@ -347,7 +307,7 @@ func RunNode(config cfg.Config) {
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)

// Create & start node
n := NewNode(config, privValidator, GetProxyApp)
n := NewNode(config, privValidator)

protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
Expand Down Expand Up @@ -402,10 +362,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {

// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash)
proxyApp := proxy.NewMultiAppConn(config, state, blockStore)

// add the chainid to the global config
config.Set("chain_id", state.ChainID)
Expand All @@ -417,9 +374,9 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
Exit(Fmt("Failed to start event switch: %v", err))
}

mempool := mempl.NewMempool(config, proxyAppConnMempool)
mempool := mempl.NewMempool(config, proxyApp.Mempool())

consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusState.SetEventSwitch(eventSwitch)
return consensusState
}
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Expand Up @@ -17,7 +17,7 @@ func TestNodeStartStop(t *testing.T) {
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)

// Create & start node
n := NewNode(config, privValidator, GetProxyApp)
n := NewNode(config, privValidator)
protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
n.AddListener(l)
Expand Down
131 changes: 129 additions & 2 deletions proxy/app_conn.go
Expand Up @@ -2,8 +2,135 @@ package proxy

import (
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/types"
)

type AppConn interface {
tmspcli.Client
//----------------------------------------------------------------------------------------
// Enforce which tmsp msgs can be sent on a connection at the type level

type AppConnConsensus interface {
SetResponseCallback(tmspcli.Callback)
Error() error

InitChainSync(validators []*types.Validator) (err error)

BeginBlockSync(height uint64) (err error)
AppendTxAsync(tx []byte) *tmspcli.ReqRes
EndBlockSync(height uint64) (changedValidators []*types.Validator, err error)
CommitSync() (res types.Result)
}

type AppConnMempool interface {
SetResponseCallback(tmspcli.Callback)
Error() error

CheckTxAsync(tx []byte) *tmspcli.ReqRes

FlushAsync() *tmspcli.ReqRes
FlushSync() error
}

type AppConnQuery interface {
Error() error

EchoSync(string) (res types.Result)
InfoSync() (res types.Result)
QuerySync(tx []byte) (res types.Result)

// SetOptionSync(key string, value string) (res types.Result)
}

//-----------------------------------------------------------------------------------------
// Implements AppConnConsensus (subset of tmspcli.Client)

type appConnConsensus struct {
appConn tmspcli.Client
}

func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus {
return &appConnConsensus{
appConn: appConn,
}
}

func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) {
return app.appConn.InitChainSync(validators)
}
func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) {
return app.appConn.BeginBlockSync(height)
}
func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.AppendTxAsync(tx)
}

func (app *appConnConsensus) EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) {
return app.appConn.EndBlockSync(height)
}

func (app *appConnConsensus) CommitSync() (res types.Result) {
return app.appConn.CommitSync()
}

//------------------------------------------------
// Implements AppConnMempool (subset of tmspcli.Client)

type appConnMempool struct {
appConn tmspcli.Client
}

func NewAppConnMempool(appConn tmspcli.Client) *appConnMempool {
return &appConnMempool{
appConn: appConn,
}
}

func (app *appConnMempool) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb)
}

func (app *appConnMempool) Error() error {
return app.appConn.Error()
}

func (app *appConnMempool) FlushAsync() *tmspcli.ReqRes {
return app.appConn.FlushAsync()
}

func (app *appConnMempool) FlushSync() error {
return app.appConn.FlushSync()
}

func (app *appConnMempool) CheckTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.CheckTxAsync(tx)
}

//------------------------------------------------
// Implements AppConnQuery (subset of tmspcli.Client)

type appConnQuery struct {
appConn tmspcli.Client
}

func NewAppConnQuery(appConn tmspcli.Client) *appConnQuery {
return &appConnQuery{
appConn: appConn,
}
}

func (app *appConnQuery) Error() error {
return app.appConn.Error()
}

func (app *appConnQuery) InfoSync() (res types.Result) {
return app.appConn.InfoSync()
}

func (app *appConnQuery) QuerySync(tx []byte) (res types.Result) {
return app.appConn.QuerySync(tx)
}