Skip to content

Commit

Permalink
morph: rework subscriptions
Browse files Browse the repository at this point in the history
This is a temporary scheme, just enough to almost get rid of "deprecated"
warnings (refs. #2219). It also fixes #2304.

See also #2306, #2307 and #1337.

Signed-off-by: Roman Khimov <roman@nspcc.ru>
  • Loading branch information
roman-khimov committed Apr 19, 2023
1 parent 1cc7c25 commit cc5820c
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 400 deletions.
29 changes: 2 additions & 27 deletions pkg/morph/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,9 @@ type Client struct {
// on every normal call.
switchLock *sync.RWMutex

// channel for ws notifications
notifications chan rpcclient.Notification

// channel for internal stop
closeChan chan struct{}

// cached subscription information
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
subscribedToNewBlocks bool

// indicates that Client is not able to
// establish connection to any of the
// provided RPC endpoints
Expand Down Expand Up @@ -496,26 +488,9 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res

// NotificationChannel returns channel than receives subscribed
// notification from the connected RPC node.
// Channel is closed when connection to the RPC node has been
// lost without the possibility of recovery.
// Channel is closed when connection to the RPC node is lost.
func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
return c.notifications
}

// inactiveMode switches Client to an inactive mode:
// - notification channel is closed;
// - all the new RPC request would return ErrConnectionLost;
// - inactiveModeCb is called if not nil.
func (c *Client) inactiveMode() {
c.switchLock.Lock()
defer c.switchLock.Unlock()

close(c.notifications)
c.inactive = true

if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
return c.client.Notifications
}

func (c *Client) setActor(act *actor.Actor) {
Expand Down
20 changes: 8 additions & 12 deletions pkg/morph/client/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
Expand Down Expand Up @@ -97,16 +96,13 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
}

cli := &Client{
cache: newClientCache(),
logger: cfg.logger,
acc: acc,
accAddr: accAddr,
cfg: *cfg,
switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification),
subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string),
closeChan: make(chan struct{}),
cache: newClientCache(),
logger: cfg.logger,
acc: acc,
accAddr: accAddr,
cfg: *cfg,
switchLock: &sync.RWMutex{},
closeChan: make(chan struct{}),
}

var err error
Expand Down Expand Up @@ -139,7 +135,7 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
}
cli.setActor(act)

go cli.notificationLoop()
go cli.closeWaiter()

return cli, nil
}
Expand Down
118 changes: 30 additions & 88 deletions pkg/morph/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func (e *endpoints) init(ee []Endpoint) {
e.list = ee
}

func (c *Client) switchRPC() bool {
// SwitchRPC performs reconnection and returns true if it was successful.
func (c *Client) SwitchRPC() bool {
c.switchLock.Lock()
defer c.switchLock.Unlock()

Expand All @@ -51,16 +52,6 @@ func (c *Client) switchRPC() bool {
c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint))

if !c.restoreSubscriptions(cli, newEndpoint) {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
cli.Close()
continue
}

c.client = cli
c.setActor(act)

Expand All @@ -73,62 +64,21 @@ func (c *Client) switchRPC() bool {
return true
}

c.inactive = true

if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
return false
}

func (c *Client) notificationLoop() {
for {
c.switchLock.RLock()
nChan := c.client.Notifications
c.switchLock.RUnlock()

select {
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()

return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()

return
case n, ok := <-nChan:
// notification channel is used as a connection
// state: if it is closed, the connection is
// considered to be lost
if !ok {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method that happens only when the client has
// switched to the more prioritized RPC
continue
}

if !c.switchRPC() {
c.logger.Error("could not establish connection to any RPC node")

// could not connect to all endpoints =>
// switch client to inactive mode
c.inactiveMode()

return
}

// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost

continue
}

c.notifications <- n
}
func (c *Client) closeWaiter() {
select {
case <-c.cfg.ctx.Done():
case <-c.closeChan:
}
_ = c.UnsubscribeAll()
c.close()
}

func (c *Client) switchToMostPrioritized() {
Expand Down Expand Up @@ -173,42 +123,34 @@ mainLoop:
continue
}

if c.restoreSubscriptions(cli, tryE) {
c.switchLock.Lock()

// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
return
}

c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.endpoints.curr = i
c.switchLock.Lock()

// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()

c.logger.Info("switched to the higher priority RPC",
zap.String("endpoint", tryE))

return
}

c.logger.Warn("could not restore side chain subscriptions using node",
zap.String("endpoint", tryE),
zap.Error(err),
)
c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.endpoints.curr = i

c.switchLock.Unlock()

c.logger.Info("switched to the higher priority RPC",
zap.String("endpoint", tryE))

return
}
}
}
}

// close closes notification channel and wrapped WS client.
func (c *Client) close() {
close(c.notifications)
c.client.Close()
}
Loading

0 comments on commit cc5820c

Please sign in to comment.