Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit number of /subscribe clients and queries per client #3269

Merged
merged 5 commits into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Special thanks to external contributors on this release:
### BREAKING CHANGES:

* CLI/RPC/Config
- [httpclient] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
- [rpc/client] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)

* Apps

Expand All @@ -27,6 +27,7 @@ Special thanks to external contributors on this release:
- [config] \#2920 Remove `consensus.blocktime_iota` parameter
- [genesis] \#2920 Add `time_iota_ms` to block's consensus parameters
- [genesis] \#2920 Rename `consensus_params.block_size` to `consensus_params.block`
- [lite] add `/unsubscribe_all` endpoint, which allows you to unsubscribe from all events

### IMPROVEMENTS:
- [libs/common] \#3238 exit with zero (0) code upon receiving SIGTERM/SIGINT
Expand All @@ -43,6 +44,5 @@ Special thanks to external contributors on this release:
CI/CD: * [\#3396](https://github.com/tendermint/tendermint/pull/3396)

### BUG FIXES:

- [p2p/conn] \#3347 Reject all-zero shared secrets in the Diffie-Hellman step of secret-connection
- [libs/pubsub] \#951, \#1880 use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
30 changes: 30 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/pkg/errors"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
)

const (
Expand Down Expand Up @@ -323,6 +324,19 @@ type RPCConfig struct {
// Should be < {ulimit -Sn} - {MaxNumInboundPeers} - {MaxNumOutboundPeers} - {N of wal, db and other open files}
// 1024 - 40 - 10 - 50 = 924 = ~900
MaxOpenConnections int `mapstructure:"max_open_connections"`

// Maximum number of unique clientIDs that can /subscribe
// If you're using /broadcast_tx_commit, set to the estimated maximum number
// of broadcast_tx_commit calls per block.
MaxSubscriptionClients int `mapstructure:"max_subscription_clients"`

// Maximum number of unique queries a given client can /subscribe to
// If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set
// to the estimated maximum number of broadcast_tx_commit calls per block.
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`
melekes marked this conversation as resolved.
Show resolved Hide resolved

// How long to wait for a tx to be committed during /broadcast_tx_commit
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`
}

// DefaultRPCConfig returns a default configuration for the RPC server
Expand All @@ -337,6 +351,10 @@ func DefaultRPCConfig() *RPCConfig {

Unsafe: false,
MaxOpenConnections: 900,

MaxSubscriptionClients: 100,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this too low?

MaxSubscriptionsPerClient: 5,
TimeoutBroadcastTxCommit: 10 * time.Second,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is there an explanation why these are good defaults?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because these were picked quasi randomly.

TimeoutBroadcastTxCommit should be less than 20s (rpcserver.WriteTimeout)
MaxSubscriptionClients * MaxSubscriptionsPerClient define total number of subscriptions. I don't think someone will want more than 500. Even 100 may be enough.

}
}

Expand All @@ -358,6 +376,18 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxOpenConnections < 0 {
return errors.New("max_open_connections can't be negative")
}
if cfg.MaxSubscriptionClients < 0 {
return errors.New("max_subscription_clients can't be negative")
}
if cfg.MaxSubscriptionsPerClient < 0 {
return errors.New("max_subscriptions_per_client can't be negative")
}
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative")
}
if cfg.TimeoutBroadcastTxCommit > rpcserver.WriteTimeout {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably need to handle this better but separate issue.

return fmt.Errorf("timeout_broadcast_tx_commit can't be greater than rpc server's write timeout: %v", rpcserver.WriteTimeout)
}
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ unsafe = {{ .RPC.Unsafe }}
# 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = {{ .RPC.MaxOpenConnections }}

# Maximum number of unique clientIDs that can /subscribe
# If you're using /broadcast_tx_commit, set to the estimated maximum number
# of broadcast_tx_commit calls per block.
max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}

# Maximum number of unique queries a given client can /subscribe to
# If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set to
# the estimated # maximum number of broadcast_tx_commit calls per block.
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}

# How long to wait for a tx to be committed during /broadcast_tx_commit.
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"

##### peer to peer configuration options #####
[p2p]

Expand Down
13 changes: 13 additions & 0 deletions docs/tendermint-core/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ unsafe = false
# 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = 900

# Maximum number of unique clientIDs that can /subscribe
# If you're using /broadcast_tx_commit, set to the estimated maximum number
# of broadcast_tx_commit calls per block.
max_subscription_clients = 100

# Maximum number of unique queries a given client can /subscribe to
# If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set to
# the estimated # maximum number of broadcast_tx_commit calls per block.
max_subscriptions_per_client = 5

# How long to wait for a tx to be committed during /broadcast_tx_commit.
timeout_broadcast_tx_commit = "10s"

##### peer to peer configuration options #####
[p2p]

Expand Down
14 changes: 14 additions & 0 deletions libs/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,20 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
}
}

// NumClients returns the number of clients.
func (s *Server) NumClients() int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return len(s.subscriptions)
}

// NumClientSubscriptions returns the number of subscriptions the client has.
func (s *Server) NumClientSubscriptions(clientID string) int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return len(s.subscriptions[clientID])
}

// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
Expand Down
4 changes: 4 additions & 0 deletions libs/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func TestSubscribe(t *testing.T) {
ctx := context.Background()
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)

assert.Equal(t, 1, s.NumClients())
assert.Equal(t, 1, s.NumClientSubscriptions(clientID))

err = s.Publish(ctx, "Ka-Zar")
require.NoError(t, err)
assertReceive(t, "Ka-Zar", subscription.Out())
Expand Down
16 changes: 10 additions & 6 deletions lite/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxy

import (
"context"
"net/http"

amino "github.com/tendermint/go-amino"
Expand Down Expand Up @@ -34,7 +35,12 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, r, cdc, logger)

wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.EventSubscriber(c))
unsubscribeFromAllEvents := func(remoteAddr string) {
if err := c.UnsubscribeAll(context.Background(), remoteAddr); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should use some timeout here to avoid zombie connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we blocked here forever, something is horribly wrong

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this is the sign pubsub does not need context.Context at all

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this have to do with zombie connections? What could cause this to block?

Copy link
Contributor Author

@melekes melekes Mar 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If indexer (which's using SubscribeUnbuffered) is not reading messages for example.

logger.Error("Failed to unsubscribe from events", "err", err)
}
}
wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(unsubscribeFromAllEvents))
wm.SetLogger(logger)
core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)
Expand All @@ -51,13 +57,11 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
//
// if we want security, the client must implement it as a secure client
func RPCRoutes(c rpcclient.Client) map[string]*rpcserver.RPCFunc {

return map[string]*rpcserver.RPCFunc{
// Subscribe/unsubscribe are reserved for websocket events.
// We can just use the core tendermint impl, which uses the
// EventSwitch we registered in NewWebsocketManager above
"subscribe": rpcserver.NewWSRPCFunc(core.Subscribe, "query"),
"unsubscribe": rpcserver.NewWSRPCFunc(core.Unsubscribe, "query"),
"subscribe": rpcserver.NewWSRPCFunc(c.(Wrapper).SubscribeWS, "query"),
"unsubscribe": rpcserver.NewWSRPCFunc(c.(Wrapper).UnsubscribeWS, "query"),
"unsubscribe_all": rpcserver.NewWSRPCFunc(c.(Wrapper).UnsubscribeAllWS, ""),
melekes marked this conversation as resolved.
Show resolved Hide resolved

// info API
"status": rpcserver.NewRPCFunc(c.Status, ""),
Expand Down
53 changes: 53 additions & 0 deletions lite/proxy/wrapper.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package proxy

import (
"context"
"fmt"

cmn "github.com/tendermint/tendermint/libs/common"

"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/lite"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)

var _ rpcclient.Client = Wrapper{}
Expand Down Expand Up @@ -149,6 +153,55 @@ func (w Wrapper) RegisterOpDecoder(typ string, dec merkle.OpDecoder) {
w.prt.RegisterOpDecoder(typ, dec)
}

// SubscribeWS subscribes for events using the given query and remote address as
// a subscriber, but does not verify responses (UNSAFE)!
func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
out, err := w.Client.Subscribe(context.Background(), ctx.RemoteAddr(), query)
if err != nil {
return nil, err
}

go func() {
for {
select {
case resultEvent := <-out:
// XXX(melekes) We should have a switch here that performs a validation
// depending on the event's type.
ctx.WSConn.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
ctx.WSConn.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
resultEvent,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this where above mentioned verification should happen? Before TryWriteRPCResponse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We should have a switch here that performs a validation depending on the event's type.

))
case <-w.Client.Quit():
return
}
}
}()

return &ctypes.ResultSubscribe{}, nil
}

// UnsubscribeWS calls original client's Unsubscribe using remote address as a
// subscriber.
func (w Wrapper) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
err := w.Client.Unsubscribe(context.Background(), ctx.RemoteAddr(), query)
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}

// UnsubscribeAllWS calls original client's UnsubscribeAll using remote address
// as a subscriber.
func (w Wrapper) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
err := w.Client.UnsubscribeAll(context.Background(), ctx.RemoteAddr())
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}

// // WrappedSwitch creates a websocket connection that auto-verifies any info
// // coming through before passing it along.
// //
Expand Down
13 changes: 11 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex"
Expand Down Expand Up @@ -658,6 +659,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetEventBus(n.eventBus)
rpccore.SetLogger(n.Logger.With("module", "rpc"))
rpccore.SetConfig(*n.config.RPC)
}

func (n *Node) startRPC() ([]net.Listener, error) {
Expand All @@ -675,8 +677,15 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus))
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec,
rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
melekes marked this conversation as resolved.
Show resolved Hide resolved
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does a ErrSubscriptionNotFound happen when calling UnsubscribeAll? Because there are no subscriptions for remoteAddr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there are no subscriptions for remoteAddr?

yes

wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}))
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)

Expand Down
6 changes: 6 additions & 0 deletions rpc/client/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,9 @@ func testTxEventsSent(t *testing.T, broadcastMethod string) {
})
}
}

// Test HTTPClient resubscribes upon disconnect && subscription error.
// Test Local client resubscribes upon subscription error.
func TestClientsResubscribe(t *testing.T) {
// TODO(melekes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open an issue for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
8 changes: 3 additions & 5 deletions rpc/client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,16 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
defer cancel()

// register for the next event of this type
sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp))
eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String())
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe")
}
// make sure to unregister after the test is over
defer c.UnsubscribeAll(ctx, subscriber)

select {
case msg := <-sub.Out():
return msg.Data().(types.TMEventData), nil
case <-sub.Cancelled():
return nil, errors.New("subscription was cancelled")
case event := <-eventCh:
return event.Data.(types.TMEventData), nil
case <-ctx.Done():
return nil, errors.New("timed out waiting for event")
}
Expand Down