Skip to content

Commit

Permalink
Merge pull request #2988 from nspcc-dev/client/close-blockers
Browse files Browse the repository at this point in the history
rpcclient: close WSClient subscriber on overflow
  • Loading branch information
roman-khimov committed Apr 26, 2023
2 parents 1bd22ad + dab13a4 commit dcea3f6
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pkg/rpcclient/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,7 +1803,7 @@ func TestRPCClients(t *testing.T) {
})
t.Run("WSClient", func(t *testing.T) {
testRPCClient(t, func(ctx context.Context, endpoint string, opts Options) (*Client, error) {
wsc, err := NewWS(ctx, httpURLtoWS(endpoint), opts)
wsc, err := NewWS(ctx, httpURLtoWS(endpoint), WSOptions{Options: opts})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
require.NoError(t, wsc.Init())
Expand Down
179 changes: 139 additions & 40 deletions pkg/rpcclient/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ import (
// will make WSClient wait for the channel reader to get the event and while
// it waits every other messages (subscription-related or request replies)
// will be blocked. This also means that subscription channel must be properly
// drained after unsubscription.
// drained after unsubscription. If CloseNotificationChannelIfFull option is on
// then the receiver channel will be closed immediately in case if a subsequent
// notification can't be sent to it, which means WSClient's operations are
// unblocking in this mode. No unsubscription is performed in this case, so it's
// still the user responsibility to unsubscribe.
//
// Any received subscription items (blocks/transactions/nofitications) are passed
// via pointers for efficiency, but the actual structures MUST NOT be changed, as
Expand All @@ -47,7 +51,9 @@ import (
// only sent once per channel. The receiver channel will be closed by the WSClient
// immediately after MissedEvent is received from the server; no unsubscription
// is performed in this case, so it's the user responsibility to unsubscribe. It
// will also be closed on disconnection from server.
// will also be closed on disconnection from server or on situation when it's
// impossible to send a subsequent notification to the subscriber's channel and
// CloseNotificationChannelIfFull option is on.
type WSClient struct {
Client
// Notifications is a channel that is used to send events received from
Expand All @@ -66,6 +72,7 @@ type WSClient struct {
Notifications chan Notification

ws *websocket.Conn
wsOpts WSOptions
done chan struct{}
requests chan *neorpc.Request
shutdown chan struct{}
Expand All @@ -86,6 +93,21 @@ type WSClient struct {
respChannels map[uint64]chan *neorpc.Response
}

// WSOptions defines options for the web-socket RPC client. It contains a
// set of options for the underlying standard RPC client as far as
// WSClient-specific options. See Options documentation for more details.
type WSOptions struct {
Options
// CloseNotificationChannelIfFull allows WSClient to close a subscriber's
// receive channel in case if the channel isn't read properly and no more
// events can be pushed to it. This option, if set, allows to avoid WSClient
// blocking on a subsequent notification dispatch. However, if enabled, the
// corresponding subscription is kept even after receiver's channel closing,
// thus it's still the caller's duty to call Unsubscribe() for this
// subscription.
CloseNotificationChannelIfFull bool
}

// notificationReceiver is an interface aimed to provide WS subscriber functionality
// for different types of subscriptions.
type notificationReceiver interface {
Expand All @@ -94,8 +116,11 @@ type notificationReceiver interface {
// Receiver returns notification receiver channel.
Receiver() any
// TrySend checks whether notification passes receiver filter and sends it
// to the underlying channel if so.
TrySend(ntf Notification) bool
// to the underlying channel if so. It is performed under subscriptions lock
// taken. nonBlocking denotes whether the receiving operation shouldn't block
// the client's operation. It returns whether notification matches the filter
// and whether the receiver channel is overflown.
TrySend(ntf Notification, nonBlocking bool) (bool, bool)
// Close closes underlying receiver channel.
Close()
}
Expand Down Expand Up @@ -125,12 +150,21 @@ func (r *blockReceiver) Receiver() any {
}

// TrySend implements notificationReceiver interface.
func (r *blockReceiver) TrySend(ntf Notification) bool {
func (r *blockReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*block.Block)
return true
if nonBlocking {
select {
case r.ch <- ntf.Value.(*block.Block):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*block.Block)
}

return true, false
}
return false
return false, false
}

// Close implements notificationReceiver interface.
Expand Down Expand Up @@ -163,12 +197,21 @@ func (r *txReceiver) Receiver() any {
}

// TrySend implements notificationReceiver interface.
func (r *txReceiver) TrySend(ntf Notification) bool {
func (r *txReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*transaction.Transaction)
return true
if nonBlocking {
select {
case r.ch <- ntf.Value.(*transaction.Transaction):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*transaction.Transaction)
}

return true, false
}
return false
return false, false
}

// Close implements notificationReceiver interface.
Expand Down Expand Up @@ -201,12 +244,21 @@ func (r *executionNotificationReceiver) Receiver() any {
}

// TrySend implements notificationReceiver interface.
func (r *executionNotificationReceiver) TrySend(ntf Notification) bool {
func (r *executionNotificationReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.ContainedNotificationEvent)
return true
if nonBlocking {
select {
case r.ch <- ntf.Value.(*state.ContainedNotificationEvent):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*state.ContainedNotificationEvent)
}

return true, false
}
return false
return false, false
}

// Close implements notificationReceiver interface.
Expand Down Expand Up @@ -239,12 +291,21 @@ func (r *executionReceiver) Receiver() any {
}

// TrySend implements notificationReceiver interface.
func (r *executionReceiver) TrySend(ntf Notification) bool {
func (r *executionReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.AppExecResult)
return true
if nonBlocking {
select {
case r.ch <- ntf.Value.(*state.AppExecResult):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*state.AppExecResult)
}

return true, false
}
return false
return false, false
}

// Close implements notificationReceiver interface.
Expand Down Expand Up @@ -277,12 +338,21 @@ func (r *notaryRequestReceiver) Receiver() any {
}

// TrySend implements notificationReceiver interface.
func (r *notaryRequestReceiver) TrySend(ntf Notification) bool {
func (r *notaryRequestReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*result.NotaryRequestEvent)
return true
if nonBlocking {
select {
case r.ch <- ntf.Value.(*result.NotaryRequestEvent):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*result.NotaryRequestEvent)
}

return true, false
}
return false
return false, false
}

// Close implements notificationReceiver interface.
Expand Down Expand Up @@ -316,12 +386,21 @@ func (r *naiveReceiver) Receiver() any {
}

// TrySend implements notificationReceiver interface.
func (r *naiveReceiver) TrySend(ntf Notification) bool {
func (r *naiveReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf
return true
if nonBlocking {
select {
case r.ch <- ntf:
default:
return true, true
}
} else {
r.ch <- ntf
}

return true, false
}
return false
return false, false
}

// Close implements notificationReceiver interface.
Expand Down Expand Up @@ -382,7 +461,7 @@ var errConnClosedByUser = errors.New("connection closed by user")
// connection). You need to use websocket URL for it like `ws://1.2.3.4/ws`.
// You should call Init method to initialize the network magic the client is
// operating on.
func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error) {
func NewWS(ctx context.Context, endpoint string, opts WSOptions) (*WSClient, error) {
dialer := websocket.Dialer{HandshakeTimeout: opts.DialTimeout}
ws, resp, err := dialer.DialContext(ctx, endpoint, nil)
if resp != nil && resp.Body != nil { // Can be non-nil even with error returned.
Expand All @@ -405,6 +484,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
Notifications: make(chan Notification),

ws: ws,
wsOpts: opts,
shutdown: make(chan struct{}),
done: make(chan struct{}),
closeCalled: *atomic.NewBool(false),
Expand All @@ -414,7 +494,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
receivers: make(map[any][]string),
}

err = initClient(ctx, &wsc.Client, endpoint, opts)
err = initClient(ctx, &wsc.Client, endpoint, opts.Options)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -542,16 +622,30 @@ readloop:
c.respLock.Unlock()
c.subscriptionsLock.Lock()
for rcvrCh, ids := range c.receivers {
rcvr := c.subscriptions[ids[0]]
c.dropSubCh(rcvrCh, ids[0], true)
}
c.subscriptionsLock.Unlock()
c.Client.ctxCancel()
}

// dropSubCh closes corresponding subscriber's channel and removes it from the
// receivers map. If the channel belongs to a naive subscriber then it will be
// closed manually without call to Close(). The channel is still being kept in
// the subscribers map as technically the server-side subscription still exists
// and the user is responsible for unsubscription. This method must be called
// under subscriptionsLock taken. It's the caller's duty to ensure dropSubCh
// will be called once per channel, otherwise panic will occur.
func (c *WSClient) dropSubCh(rcvrCh any, id string, ignoreCloseNotificationChannelIfFull bool) {
if ignoreCloseNotificationChannelIfFull || c.wsOpts.CloseNotificationChannelIfFull {
rcvr := c.subscriptions[id]
_, ok := rcvr.(*naiveReceiver)
if !ok { // naiveReceiver uses c.Notifications that is about to be closed below.
c.subscriptions[ids[0]].Close()
if ok { // naiveReceiver uses c.Notifications that should be handled separately.
close(c.Notifications)
} else {
c.subscriptions[id].Close()
}
delete(c.receivers, rcvrCh)
}
c.subscriptionsLock.Unlock()
close(c.Notifications)
c.Client.ctxCancel()
}

func (c *WSClient) wsWriter() {
Expand Down Expand Up @@ -604,15 +698,20 @@ func (c *WSClient) notifySubscribers(ntf Notification) {
c.subscriptionsLock.Unlock()
return
}
c.subscriptionsLock.RLock()
for _, ids := range c.receivers {
c.subscriptionsLock.Lock()
for rcvrCh, ids := range c.receivers {
for _, id := range ids {
if c.subscriptions[id].TrySend(ntf) {
ok, dropCh := c.subscriptions[id].TrySend(ntf, c.wsOpts.CloseNotificationChannelIfFull)
if dropCh {
c.dropSubCh(rcvrCh, id, false)
break // strictly single drop per channel
}
if ok {
break // strictly one notification per channel
}
}
}
c.subscriptionsLock.RUnlock()
c.subscriptionsLock.Unlock()
}

func (c *WSClient) unregisterRespChannel(id uint64) {
Expand Down
Loading

0 comments on commit dcea3f6

Please sign in to comment.