Skip to content

Commit

Permalink
node: allow replacing existing p2p.Reactor(s) (#3846)
Browse files Browse the repository at this point in the history
* node: allow replacing existing p2p.Reactor(s)

using [`CustomReactors`
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
Warning: beware of accidental name clashes. Here is the list of existing
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.

* check the absence of "CUSTOM" prefix

* merge 2 tests

* add doc.go to node package
  • Loading branch information
melekes authored and tac0turtle committed Jul 30, 2019
1 parent 1e3364a commit 88e0973
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ program](https://hackerone.com/tendermint).
- [libs] \#3811 Remove `db` from libs in favor of `https://github.com/tendermint/tm-cmn`

### FEATURES:
- [node] Allow replacing existing p2p.Reactor(s) using [`CustomReactors`
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
Warning: beware of accidental name clashes. Here is the list of existing
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.

### IMPROVEMENTS:

Expand Down
40 changes: 40 additions & 0 deletions node/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Package node is the main entry point, where the Node struct, which
represents a full node, is defined.
Adding new p2p.Reactor(s)
To add a new p2p.Reactor, use the CustomReactors option:
node, err := NewNode(
config,
privVal,
nodeKey,
clientCreator,
genesisDocProvider,
dbProvider,
metricsProvider,
logger,
CustomReactors(map[string]p2p.Reactor{"CUSTOM": customReactor}),
)
Replacing existing p2p.Reactor(s)
To replace the built-in p2p.Reactor, use the CustomReactors option:
node, err := NewNode(
config,
privVal,
nodeKey,
clientCreator,
genesisDocProvider,
dbProvider,
metricsProvider,
logger,
CustomReactors(map[string]p2p.Reactor{"BLOCKCHAIN": customBlockchainReactor}),
)
The list of existing reactors can be found in CustomReactors documentation.
*/
package node
23 changes: 17 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ import (
dbm "github.com/tendermint/tm-cmn/db"
)

// CustomReactorNamePrefix is a prefix for all custom reactors to prevent
// clashes with built-in reactors.
const CustomReactorNamePrefix = "CUSTOM_"

//------------------------------------------------------------------------------

// DBContext specifies config information for loading a new DB.
Expand Down Expand Up @@ -144,11 +140,26 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
// Option sets a parameter for the node.
type Option func(*Node)

// CustomReactors allows you to add custom reactors to the node's Switch.
// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to
// the node's Switch.
//
// WARNING: using any name from the below list of the existing reactors will
// result in replacing it with the custom one.
//
// - MEMPOOL
// - BLOCKCHAIN
// - CONSENSUS
// - EVIDENCE
// - PEX
func CustomReactors(reactors map[string]p2p.Reactor) Option {
return func(n *Node) {
for name, reactor := range reactors {
n.sw.AddReactor(CustomReactorNamePrefix+name, reactor)
if existingReactor := n.sw.Reactor(name); existingReactor != nil {
n.sw.Logger.Info("Replacing existing reactor with a custom one",
"name", name, "existing", existingReactor, "custom", reactor)
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer os.RemoveAll(config.RootDir)

cr := p2pmock.NewReactor()
customBlockchainReactor := p2pmock.NewReactor()

nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)
Expand All @@ -300,7 +301,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
log.TestingLogger(),
CustomReactors(map[string]p2p.Reactor{"FOO": cr}),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}),
)
require.NoError(t, err)

Expand All @@ -309,6 +310,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer n.Stop()

assert.True(t, cr.IsRunning())
assert.Equal(t, cr, n.Switch().Reactor("FOO"))

assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
}

func state(nVals int, height int64) (sm.State, dbm.DB) {
Expand Down
23 changes: 19 additions & 4 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,9 @@ func WithMetrics(metrics *Metrics) SwitchOption {
// AddReactor adds the given reactor to the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// Validate the reactor.
// No two reactors can share the same channel.
reactorChannels := reactor.GetChannels()
for _, chDesc := range reactorChannels {
for _, chDesc := range reactor.GetChannels() {
chID := chDesc.ID
// No two reactors can share the same channel.
if sw.reactorsByCh[chID] != nil {
panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
}
Expand All @@ -168,6 +166,23 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
return reactor
}

// RemoveReactor removes the given Reactor from the Switch.
// NOTE: Not goroutine safe.
func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
for _, chDesc := range reactor.GetChannels() {
// remove channel description
for i := 0; i < len(sw.chDescs); i++ {
if chDesc.ID == sw.chDescs[i].ID {
sw.chDescs = append(sw.chDescs[:i], sw.chDescs[i+1:]...)
break
}
}
delete(sw.reactorsByCh, chDesc.ID)
}
delete(sw.reactors, name)
reactor.SetSwitch(nil)
}

// Reactors returns a map of reactors registered on the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) Reactors() map[string]Reactor {
Expand Down

0 comments on commit 88e0973

Please sign in to comment.