Skip to content

Commit

Permalink
Merge pull request #2966 from nspcc-dev/bugfix/2896-rpc-shutdown-dead…
Browse files Browse the repository at this point in the history
…lock

rpc: Fix deadlock produced during server shutdown
  • Loading branch information
roman-khimov authored Apr 17, 2023
2 parents e0abe2b + e29c33e commit a4cc6da
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
8 changes: 6 additions & 2 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,10 @@ func startServer(ctx *cli.Context) error {

go serv.Start()
if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
rpcServer.Start()
// Run RPC server in a separate routine. This is necessary to avoid a potential
// deadlock: Start() can write errors to errChan which is not yet read in the
// current execution context (see for-loop below).
go rpcServer.Start()
}

sigCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -546,7 +549,8 @@ Main:
rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
serv.AddService(&rpcServer)
if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() {
rpcServer.Start()
// Same as for the initial run (see the code above the for-loop): start asynchronously.
go rpcServer.Start()
}
pprof.ShutDown()
pprof = metrics.NewPprofService(cfgnew.ApplicationConfiguration.Pprof, log)
Expand Down
34 changes: 21 additions & 13 deletions pkg/services/rpcsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ type (
transactionSubs int
notaryRequestSubs int

blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
subEventsToExitCh chan struct{}
}

// session holds a set of iterators got after invoke* call with corresponding
Expand Down Expand Up @@ -319,11 +320,12 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,

subscribers: make(map[*subscriber]bool),
// These are NOT buffered to preserve original order of events.
blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
subEventsToExitCh: make(chan struct{}),
}
}

Expand All @@ -344,6 +346,9 @@ func (s *Server) Start() {
s.log.Info("RPC server already started")
return
}

go s.handleSubEvents()

for _, srv := range s.http {
srv.Handler = http.HandlerFunc(s.handleHTTPRequest)
s.log.Info("starting rpc-server", zap.String("endpoint", srv.Addr))
Expand All @@ -363,7 +368,6 @@ func (s *Server) Start() {
}(srv)
}

go s.handleSubEvents()
if cfg := s.config.TLSConfig; cfg.Enabled {
for _, srv := range s.https {
srv.Handler = http.HandlerFunc(s.handleHTTPRequest)
Expand Down Expand Up @@ -436,7 +440,7 @@ func (s *Server) Shutdown() {
}

// Wait for handleSubEvents to finish.
<-s.executionCh
<-s.subEventsToExitCh
}

// SetOracleHandler allows to update oracle handler used by the Server.
Expand Down Expand Up @@ -2665,6 +2669,8 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
}
}

// handleSubEvents processes Server subscriptions until Shutdown. Upon
// completion, it signals via the subEventsToExitCh channel.
func (s *Server) handleSubEvents() {
var overflowEvent = neorpc.Notification{
JSONRPC: neorpc.JSONRPCVersion,
Expand Down Expand Up @@ -2778,12 +2784,14 @@ drainloop:
}
}
// It's not required closing these, but since they're drained already
// this is safe and it also allows to give a signal to Shutdown routine.
// this is safe.
close(s.blockCh)
close(s.transactionCh)
close(s.notificationCh)
close(s.executionCh)
close(s.notaryRequestCh)
// notify Shutdown routine
close(s.subEventsToExitCh)
}

func (s *Server) blockHeightFromParam(param *params.Param) (uint32, *neorpc.Error) {
Expand Down
19 changes: 19 additions & 0 deletions pkg/services/rpcsrv/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"
)

Expand Down Expand Up @@ -3331,3 +3332,21 @@ func BenchmarkHandleIn(b *testing.B) {
{"type": "Integer", "value": "42"}, {"type": "Boolean", "value": false}]]}`))
})
}

func TestFailedPreconditionShutdown(t *testing.T) {
_, srv, _ := initClearServerWithCustomConfig(t, func(c *config.Config) {
c.ApplicationConfiguration.RPC.Addresses = []string{"not an address"}
})

srv.Start()
require.Positive(t, len(srv.errChan)) // this is how Start reports internal failures

var stopped atomic.Bool

go func() {
srv.Shutdown()
stopped.Store(true)
}()

require.Eventually(t, stopped.Load, 5*time.Second, 100*time.Millisecond, "Shutdown should return")
}

0 comments on commit a4cc6da

Please sign in to comment.