Skip to content

Commit

Permalink
kv: Introduce MuxRangeFeed bi-directional RPC.
Browse files Browse the repository at this point in the history
As demonstrated in cockroachdb#74219, running rangefeed over large enough table or
perhaps not consuming rangefeed events quickly enough, may cause undesirable
memory blow up, including up to OOMing the node.  Since rangefeeds tend to
run on (almost) every node, a possiblity exists that a entire cluster might
get distabilized.

One of the reasons for this is that the memory used by low(er) level http2
RPC transport is not currently being tracked -- nor do we have a mechanism
to add such tracking.  cockroachdb#74456 provided partial mitigation for this issue,
where a single RangeFeed stream could be restricted to use less memory.
Nonetheless, the possibility of excessive memory usage remains since the
number of rangefeed streams in the system could be very large.

This PR introduce a new bi-directional streaming RPC called `MuxRangeFeed`.
In a uni-directonal streaming RPC, the client estabilishes connection to the
KV server and requests to receive events for 1 span.  A separate RPC stream
is created for each span.

In contrast, `MuxRangeFeed` is bi-directional RPC: the client is expected to
connect to the kv server and request as many spans as it wishes to receive
from that server. The server multiplexes all events onto a single stream,
and the client de-multiplexes those events appropriately on the receiving
end.

Note: just like in a uni-directional implementation, each call to
`MuxRangeFeed` method (i.e. a logical rangefeed established by the client)
is treated independently.  That is, even though it is possible to multiplex
all logical rangefeed streams onto a single bi-directional stream to a
single KV node, this optimization is not currently implementation as it
raises questions about scheduling and fairness.  If we need to further
reduce the number of bi-directional streams in the future, from
`number_logical_feeds*number_of_nodes` down to the `number_of_nodes`, such
optimization can be added at a later time.

Multiplexing all of the spans hosted on a node onto a single bi-directional
stream reduces the number of such stream from the `number_of_ranges` down to
`number_of_nodes` (per logical range feed).  This, in turn, enables the
client to reserve the worst case amount of memory before starting the
rangefeed.  This amount of memory is bound by the number of nodes in the
cluster times per-stream maximum memory -- default is `2MB`, but this can be
changed via dedicated rangefeed connection class.  The server side of the
equation may also benefit from knowing how many rangefeeds are running right
now.  Even though the server is somewhat protected from memory blow up via
http2 pushback, we may want to also manage server side memory better.
Memory accounting for client (and possible server) will be added in the
follow on PRs.

The use of new RPC is contingent on the callers providing `WithMuxRangefeed`
option when starting the rangefeed.  However, an envornmnet variable "kill
switch" `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` exists to disable the use
of mux rangefeed in case of serious issues being discovered.

Release note (enterprise change): Introduce a new Rangefeed RPC called
`MuxRangeFeed`.  Rangefeeds now use a common HTTP/2 stream per client
for all range replicas on a node, instead of one per replica. This
significantly reduces the amount of network buffer memory usage, which could
cause nodes to run out of memory if a client was slow to consume events.
The caller may opt in to use new mechanism by specifying `WithMuxRangefeed`
option when starting the rangefeed.  However, a cluster wide
`COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable may be set to
`false` to inhibit the use of this new RPC.
  • Loading branch information
Yevgeniy Miretskiy committed Aug 16, 2022
1 parent 241e4f2 commit 33ae487
Show file tree
Hide file tree
Showing 30 changed files with 1,384 additions and 158 deletions.
2 changes: 1 addition & 1 deletion build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangeca
pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -destination=mocks_generated_test.go --package=rangefeed . DB
pkg/kv/kvserver/concurrency/lock_table.go://go:generate ../../../util/interval/generic/gen.sh *lockState concurrency
pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generic/gen.sh *latch spanlatch
pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient
pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient
pkg/roachpb/batch.go://go:generate go run gen/main.go --filename batch_generated.go *.pb.go
pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated_test.go . Cert
pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-58 set the active cluster version in the format '<major>.<minor>'
version version 22.1-60 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-58</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-60</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func (f rawEventFeed) run(
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
var startAfter hlc.Timestamp
for _, s := range spans {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type rangefeedFactory func(
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error

type rangefeed struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe
panic("unimplemented")
}

func (m *mockServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error {
panic("implement me")
}

func (*mockServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ const (
// WaitedForDelRangeInGCJob corresponds to the migration which waits for
// the GC jobs to adopt the use of DelRange with tombstones.
WaitedForDelRangeInGCJob
// RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node.
RangefeedUseOneStreamPerNode

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -536,6 +538,11 @@ var versionsSingleton = keyedVersions{
Key: WaitedForDelRangeInGCJob,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 58},
},
{
Key: RangefeedUseOneStreamPerNode,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 60},
},

// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"doc.go",
"local_test_cluster_util.go",
Expand Down Expand Up @@ -39,6 +40,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv",
Expand Down Expand Up @@ -111,6 +113,7 @@ go_test(
srcs = [
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
"dist_sender_test.go",
Expand Down Expand Up @@ -143,6 +146,7 @@ go_test(
deps = [
"//build/bazelutil:noop",
"//pkg/base",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/gossip",
Expand All @@ -166,6 +170,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/storage",
Expand All @@ -175,9 +180,11 @@ go_test(
"//pkg/testutils/localtestcluster",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/caller",
"//pkg/util/ctxgroup",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
Loading

0 comments on commit 33ae487

Please sign in to comment.