Skip to content

Commit

Permalink
go/worker/common/p2p: Make sure P2P stops before service cleanup runs
Browse files Browse the repository at this point in the history
Otherwise this may result in a crash during shutdown when P2P requests
are processed while database is already closed.
  • Loading branch information
kostko committed Apr 7, 2022
1 parent 0a4aaf5 commit 8361962
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 18 deletions.
4 changes: 4 additions & 0 deletions .changelog/4650.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/common/p2p: Make sure P2P stops before service cleanup runs

Otherwise this may result in a crash during shutdown when P2P requests are
processed while database is already closed.
9 changes: 2 additions & 7 deletions go/oasis-node/cmd/debug/byzantine/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ type p2pReqRes struct {
}

type p2pHandle struct {
context context.Context
cancel context.CancelFunc
service *p2p.P2P
requests chan p2pReqRes
}
Expand Down Expand Up @@ -96,9 +94,8 @@ func (ph *p2pHandle) start(ht *honestTendermint, id *identity.Identity, runtimeI
return fmt.Errorf("P2P service already started")
}

ph.context, ph.cancel = context.WithCancel(context.Background())
var err error
ph.service, err = p2p.New(ph.context, id, ht.service)
ph.service, err = p2p.New(id, ht.service)
if err != nil {
return fmt.Errorf("P2P service New: %w", err)
}
Expand All @@ -114,10 +111,8 @@ func (ph *p2pHandle) stop() error {
return fmt.Errorf("P2P service not started")
}

ph.cancel()
ph.service.Stop()
ph.service = nil
ph.context = nil
ph.cancel = nil

return nil
}
Expand Down
6 changes: 2 additions & 4 deletions go/oasis-node/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/identity"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/persistent"
"github.com/oasisprotocol/oasis-core/go/common/service"
"github.com/oasisprotocol/oasis-core/go/common/version"
consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint"
Expand Down Expand Up @@ -275,15 +274,14 @@ func (n *Node) initRuntimeWorkers() error {
// listening immediately when created, make sure that we don't start it if
// it is not needed.
if n.RuntimeRegistry.Mode() != runtimeRegistry.RuntimeModeNone {
p2pCtx, p2pSvc := service.NewContextCleanup(context.Background())
if genesisDoc.Registry.Parameters.DebugAllowUnroutableAddresses {
p2p.DebugForceAllowUnroutableAddresses()
}
n.P2P, err = p2p.New(p2pCtx, n.Identity, n.Consensus)
n.P2P, err = p2p.New(n.Identity, n.Consensus)
if err != nil {
return err
}
n.svcMgr.RegisterCleanupOnly(p2pSvc, "worker p2p")
n.svcMgr.Register(n.P2P)
}

// Initialize the common worker.
Expand Down
44 changes: 37 additions & 7 deletions go/worker/common/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type P2P struct {
sync.RWMutex
*PeerManager

ctx context.Context
ctxCancel context.CancelFunc
quitCh chan struct{}

chainContext string

Expand All @@ -74,6 +75,33 @@ type P2P struct {
logger *logging.Logger
}

// Cleanup performs the service specific post-termination cleanup.
func (p *P2P) Cleanup() {
}

// Name returns the service name.
func (p *P2P) Name() string {
return "worker p2p"
}

// Start starts the service.
func (p *P2P) Start() error {
// Unfortunately libp2p starts everything on construction.
return nil
}

// Stop halts the service.
func (p *P2P) Stop() {
p.ctxCancel()
_ = p.host.Close() // This blocks until the host stops.
close(p.quitCh)
}

// Quit returns a channel that will be closed when the service terminates.
func (p *P2P) Quit() <-chan struct{} {
return p.quitCh
}

// Addresses returns the P2P addresses of the node.
func (p *P2P) Addresses() []node.Address {
if p == nil {
Expand Down Expand Up @@ -290,7 +318,7 @@ func messageIdFn(pmsg *pb.Message) string {
}

// New creates a new P2P node.
func New(ctx context.Context, identity *identity.Identity, consensus consensus.Backend) (*P2P, error) {
func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) {
// Instantiate the libp2p host.
addresses, err := configparser.ParseAddressList(viper.GetStringSlice(cfgP2pAddresses))
if err != nil {
Expand Down Expand Up @@ -340,12 +368,9 @@ func New(ctx context.Context, identity *identity.Identity, consensus consensus.B
if err != nil {
return nil, fmt.Errorf("worker/common/p2p: failed to initialize libp2p host: %w", err)
}
go func() {
<-ctx.Done()
_ = host.Close()
}()

// Initialize the gossipsub router.
ctx, ctxCancel := context.WithCancel(context.Background())
pubsub, err := pubsub.NewGossipSub(
ctx,
host,
Expand All @@ -359,17 +384,22 @@ func New(ctx context.Context, identity *identity.Identity, consensus consensus.B
pubsub.WithMessageIdFn(messageIdFn),
)
if err != nil {
ctxCancel()
_ = host.Close()
return nil, fmt.Errorf("worker/common/p2p: failed to initialize libp2p gossipsub: %w", err)
}

chainContext, err := consensus.GetChainContext(ctx)
if err != nil {
ctxCancel()
_ = host.Close()
return nil, fmt.Errorf("worker/common/p2p: failed to get consensus chain context: %w", err)
}

p := &P2P{
PeerManager: newPeerManager(ctx, host, cg, consensus),
ctx: ctx,
ctxCancel: ctxCancel,
quitCh: make(chan struct{}),
chainContext: chainContext,
host: host,
pubsub: pubsub,
Expand Down

0 comments on commit 8361962

Please sign in to comment.