Skip to content

Commit

Permalink
rpc: use async-probing based circuit breakers
Browse files Browse the repository at this point in the history
See cockroachdb#68419 (comment) for the original discussion.

This commit adds a new `circuit` package that uses probing-based
circuit breakers. This breaker does *not* recruit the occasional
request to carry out the probing. Instead, the circuit breaker
is configured with an "asychronous probe" that effectively
determines when the breaker should reset.

We prefer this approach precisely because it avoids recruiting
regular traffic, which is often tied to end-user requests, and
led to inacceptable latencies there.

The potential downside of the probing approach is that the breaker setup
is more complex and there is residual risk of configuring the probe
differently from the actual client requests. In the worst case, the
breaker would be perpetually tripped even though everything should be
fine. This isn't expected - our two uses of circuit breakers are
pretty clear about what they protect - but it is worth mentioning
as this consideration likely influenced the design of the original
breaker.

Touches cockroachdb#69888
Touches cockroachdb#70111
Touches cockroachdb#53410

Also, this breaker was designed to be a good fit for:
cockroachdb#33007

Release note: None
  • Loading branch information
tbg committed Sep 30, 2021
1 parent afb937c commit c7046a3
Show file tree
Hide file tree
Showing 23 changed files with 687 additions and 162 deletions.
3 changes: 2 additions & 1 deletion pkg/gossip/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/util",
"//pkg/util/circuit",
"//pkg/util/errorutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
Expand All @@ -37,7 +38,6 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down Expand Up @@ -74,6 +74,7 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util",
"//pkg/util/circuit",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
16 changes: 8 additions & 8 deletions pkg/gossip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"sync"
"time"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -73,7 +73,7 @@ func (c *client) startLocked(
disconnected chan *client,
rpcCtx *rpc.Context,
stopper *stop.Stopper,
breaker *circuit.Breaker,
breaker *circuit.BreakerV2,
) {
// Add a placeholder for the new outgoing connection because we may not know
// the ID of the node we're connecting to yet. This will be resolved in
Expand All @@ -98,9 +98,9 @@ func (c *client) startLocked(
disconnected <- c
}()

consecFailures := breaker.ConsecFailures()
//consecFailures := breaker.ConsecFailures()
var stream Gossip_GossipClient
if err := breaker.Call(func() error {
if err := breaker.Call(ctx, func(ctx context.Context) error {
// Note: avoid using `grpc.WithBlock` here. This code is already
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
Expand All @@ -113,10 +113,10 @@ func (c *client) startLocked(
return err
}
return c.requestGossip(g, stream)
}, 0); err != nil {
if consecFailures == 0 {
log.Warningf(ctx, "failed to start gossip client to %s: %s", c.addr, err)
}
}); err != nil {
// if consecFailures == 0 {
log.Warningf(ctx, "failed to start gossip client to %s: %s", c.addr, err)
// }
return
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -156,13 +157,17 @@ func gossipSucceedsSoon(
disconnected <- c
}

neverTripBreaker := circuit.NewBreakerV2(circuit.OptionsV2{
Name: "never-breaker",
ShouldTrip: func(err error) error { return nil },
})
testutils.SucceedsSoon(t, func() error {
select {
case client := <-disconnected:
// If the client wasn't able to connect, restart it.
g := gossip[client]
g.mu.Lock()
client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
client.startLocked(g, disconnected, rpcContext, stopper, neverTripBreaker)
g.mu.Unlock()
default:
}
Expand Down Expand Up @@ -305,6 +310,10 @@ func TestClientNodeID(t *testing.T) {
// A gossip client may fail to start if the grpc connection times out which
// can happen under load (such as in CircleCI or using `make stress`). So we
// loop creating clients until success or the test times out.
neverTripBreaker := circuit.NewBreakerV2(circuit.OptionsV2{
Name: "never-breaker",
ShouldTrip: func(err error) error { return nil },
})
for {
// Wait for c.gossip to start.
select {
Expand All @@ -316,7 +325,7 @@ func TestClientNodeID(t *testing.T) {
case <-disconnected:
// The client hasn't been started or failed to start, loop and try again.
local.mu.Lock()
c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
c.startLocked(local, disconnected, rpcContext, stopper, neverTripBreaker)
local.mu.Unlock()
}
}
Expand Down
30 changes: 25 additions & 5 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ import (
"sync"
"time"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -237,7 +237,7 @@ type Gossip struct {
syncutil.Mutex
clients []*client
// One breaker per client for the life of the process.
breakers map[string]*circuit.Breaker
breakers map[string]*circuit.BreakerV2
}

disconnected chan *client // Channel of disconnected clients
Expand Down Expand Up @@ -325,7 +325,7 @@ func New(
stopper.AddCloser(stop.CloserFn(g.server.AmbientContext.FinishEventLog))

registry.AddMetric(g.outgoing.gauge)
g.clientsMu.breakers = map[string]*circuit.Breaker{}
g.clientsMu.breakers = map[string]*circuit.BreakerV2{}

g.mu.Lock()
// Add ourselves as a SystemConfig watcher.
Expand Down Expand Up @@ -1526,8 +1526,28 @@ func (g *Gossip) startClientLocked(addr util.UnresolvedAddr) {
defer g.clientsMu.Unlock()
breaker, ok := g.clientsMu.breakers[addr.String()]
if !ok {
name := fmt.Sprintf("gossip %v->%v", g.rpcContext.Config.Addr, addr)
breaker = g.rpcContext.NewBreaker(name)
name := fmt.Sprintf("gossip to %v", addr)
// Don't actually probe anything here, just wait and then declare the
// breaker good to go. This is sufficient for the Gossip use case.
//
// TODO(tbg): refactor this such that a client wraps the breaker and the
// dialing logic, so that the probe has easy access to exactly the same
// operation that the breaker later guards.
asyncProbe := func(report func(error), done func()) {
g.stopper.RunAsyncTask(g.AnnotateCtx(context.Background()), "gossip-probe",
func(ctx context.Context) {
defer done()
select {
case <-time.After(1 * time.Second):
report(nil)
case <-ctx.Done():
return
case <-g.stopper.ShouldQuiesce():
return
}
})
}
breaker = g.rpcContext.NewBreaker(name, asyncProbe)
g.clientsMu.breakers[addr.String()] = breaker
}
ctx := g.AnnotateCtx(context.TODO())
Expand Down
7 changes: 6 additions & 1 deletion pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -517,11 +518,15 @@ func TestGossipNoForwardSelf(t *testing.T) {
local.server.mu.Unlock()
peer := startGossip(clusterID, roachpb.NodeID(i+maxSize+2), stopper, t, metric.NewRegistry())

neverTripBreaker := circuit.NewBreakerV2(circuit.OptionsV2{
Name: "never-breaker",
ShouldTrip: func(err error) error { return nil },
})
for {
localAddr := local.GetNodeAddr()
c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, localAddr, makeMetrics())
peer.mu.Lock()
c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker(""))
c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, neverTripBreaker)
peer.mu.Unlock()

disconnectedClient := <-disconnectedCh
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ go_test(
"//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/caller",
"//pkg/util/circuit",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand All @@ -382,7 +383,6 @@ go_test(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2465,7 +2465,10 @@ func TestReportUnreachableHeartbeats(t *testing.T) {
// heartbeat transmission to the other store.
cb := tc.Servers[followerIdx].RaftTransport().GetCircuitBreaker(
tc.Target(followerIdx).NodeID, rpc.DefaultClass)
cb.Break()
// TODO(tbg): this is a no-op since the breaker in this test is
// configured to never trip. See:
// https://github.com/cockroachdb/cockroach/blob/885075b9c16ae04f537ffe4a0cfe7113c28c4811/pkg/server/testserver.go#L1304-L1310
cb.Trip()

// Send a command to ensure Raft is aware of lost follower so that it won't
// quiesce (which would prevent heartbeats).
Expand Down Expand Up @@ -2551,7 +2554,7 @@ func TestReportUnreachableRemoveRace(t *testing.T) {
for i := range tc.Servers {
if i != partitionedMaybeLeaseholderIdx {
cb := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(partitionedMaybeLeaseholderIdx).NodeID, rpc.DefaultClass)
cb.Break()
cb.Trip()
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"
"unsafe"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
Expand All @@ -36,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
Expand Down Expand Up @@ -489,7 +489,7 @@ func (r *Replica) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
// connection attempts to the specified node.
func (t *RaftTransport) GetCircuitBreaker(
nodeID roachpb.NodeID, class rpc.ConnectionClass,
) *circuit.Breaker {
) *circuit.BreakerV2 {
return t.dialer.GetCircuitBreaker(nodeID, class)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/server/pgurl",
"//pkg/settings/cluster",
"//pkg/sql/catalog/catalogkeys",
"//pkg/util/circuit",
"//pkg/util/contextutil",
"//pkg/util/envutil",
"//pkg/util/growstack",
Expand All @@ -50,7 +51,6 @@ go_library(
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cenkalti_backoff//:backoff",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/addjoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func GetAddJoinDialOptions(certPool *x509.CertPool) []grpc.DialOption {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor((snappyCompressor{}).Name())))
dialOpts = append(dialOpts, grpc.WithNoProxy())
backoffConfig := backoff.DefaultConfig
backoffConfig.MaxDelay = maxBackoff
backoffConfig.MaxDelay = maxRPCBackoff
dialOpts = append(dialOpts, grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffConfig}))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive))
dialOpts = append(dialOpts,
Expand Down

0 comments on commit c7046a3

Please sign in to comment.