
Commit d5085e9

kgo: retry dial errors on sharded requests if possible
Specifically, if a sharded request has a portion that can go to any broker and the chosen broker fails due to a dial error, we currently hard fail the request (since dial errors are not retryable). However, dial errors are retryable on other brokers. We now retry the sharded request, re-choosing the broker (up to 3x) so that the retry lands on a different one.

We could go deeper and make sure we *never* retry a broker that we previously deemed skippable, that is, make sure we don't just skip all the brokers repeatedly in a loop. That can be a follow-up task if needed (and, at that point, would probably be worth wiring into the existing use of shouldRetryNext on the `retryable` type).

I'm a little worried about loop-skipping around the entire cluster, but that really requires dial failures for every broker address, which at that point is most likely due to invalid seed addresses and/or DNS issues. Both are ideally caught quickly enough.
1 parent 39afb0b commit d5085e9
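
As a rough illustration of the shape of this change (a minimal sketch, not the kgo implementation: broker, anyBroker, and issueAnywhere below are hypothetical stand-ins), a shard that may go to any broker re-rolls its random broker choice up to 3x on retry so it avoids the broker whose dial just failed:

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// broker is a stand-in for a client broker handle; only the node ID matters here.
type broker struct{ nodeID int32 }

var errDial = errors.New("dial failure")

// anyBroker stands in for the client's "pick any broker" selection.
func anyBroker(brokers []broker) broker {
	return brokers[rand.Intn(len(brokers))]
}

// issueAnywhere issues a request (simulated by issue) against any broker. On a
// dial error it retries, re-rolling the broker choice up to 3x so that the
// retry avoids the broker that just failed.
func issueAnywhere(brokers []broker, issue func(broker) error) error {
	avoid := int32(-1) // -1 means "no broker to avoid", as in the diff below
	var lastErr error
	for tries := 0; tries < 4; tries++ {
		br := anyBroker(brokers)
		for i := 0; i < 3 && br.nodeID == avoid; i++ {
			br = anyBroker(brokers)
		}
		lastErr = issue(br)
		if lastErr == nil {
			return nil
		}
		if !errors.Is(lastErr, errDial) {
			return lastErr // only dial errors are worth trying elsewhere
		}
		avoid = br.nodeID // a dial error may succeed against a different broker
	}
	return lastErr
}

func main() {
	brokers := []broker{{1}, {2}, {3}}
	err := issueAnywhere(brokers, func(br broker) error {
		if br.nodeID == 2 {
			return errDial // pretend broker 2 is unreachable
		}
		fmt.Println("issued against broker", br.nodeID)
		return nil
	})
	fmt.Println("final err:", err)
}

The actual change threads the broker to avoid through the recursive issue call as an int32 node ID, with -1 meaning there is nothing to avoid, as the diff below shows.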

pkg/kgo/client.go

Lines changed: 55 additions & 19 deletions
@@ -1440,6 +1440,17 @@ start:
 		}
 	}
 
+	log := func(backoff time.Duration) {
+		r.cl.cfg.logger.Log(LogLevelDebug, "retrying request",
+			"request", kmsg.NameForKey(req.Key()),
+			"tries", tries,
+			"backoff", backoff,
+			"time_since_start", time.Since(tryStart),
+			"request_error", err,
+			"response_error", retryErr,
+		)
+	}
+
 	if err != nil || retryErr != nil {
 		if r.limitRetries == 0 || tries <= r.limitRetries {
 			backoff := r.cl.cfg.retryBackoff(tries)
@@ -1449,19 +1460,13 @@ start:
 			// is a broker-specific network error, and the next
 			// broker is different than the current, we also retry.
 			if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) {
-				r.cl.cfg.logger.Log(LogLevelDebug, "retrying request",
-					"request", kmsg.NameForKey(req.Key()),
-					"tries", tries,
-					"backoff", backoff,
-					"time_since_start", time.Since(tryStart),
-					"request_error", err,
-					"response_error", retryErr,
-				)
+				log(backoff)
 				if r.cl.waitTries(ctx, backoff) {
 					next, nextErr = r.br()
 					goto start
 				}
 			} else if r.cl.shouldRetryNext(tries, err) {
+				log(backoff)
 				next, nextErr = r.br()
 				if next != br && r.cl.waitTries(ctx, backoff) {
 					goto start
@@ -2428,7 +2433,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 		retryTimeout = cl.cfg.retryTimeout(req.Key())
 
 		wg    sync.WaitGroup
-		issue func(reqTry)
+		issue func(reqTry, int32)
 	)
 
 	l := cl.cfg.logger
@@ -2439,7 +2444,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 	//
 	// This recursively calls itself if a request fails and can be retried.
 	// We avoid stack problems because this calls itself in a goroutine.
-	issue = func(try reqTry) {
+	issue = func(try reqTry, avoidBroker int32) {
 		issues, reshardable, err := sharder.shard(ctx, try.req, try.lastErr)
 		if err != nil {
 			l.Log(LogLevelDebug, "unable to shard request", "req", kmsg.Key(try.req.Key()).Name(), "previous_tries", try.tries, "err", err)
@@ -2494,17 +2499,21 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 			start:
 				tries++
 
-				broker := cl.broker()
+				br := cl.broker()
 				var err error
 				if !myIssue.any {
-					broker, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker)
+					br, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker)
+				} else if avoidBroker != -1 {
+					for i := 0; i < 3 && br.meta.NodeID == avoidBroker; i++ {
+						br = cl.broker()
+					}
 				}
 				if err != nil {
 					addShard(shard(nil, myIssue.req, nil, err)) // failure to load a broker is a failure to issue a request
 					return
 				}
 
-				resp, err := broker.waitResp(ctx, myIssue.req)
+				resp, err := br.waitResp(ctx, myIssue.req)
 				var errIsFromResp bool
 				if err == nil {
 					err = sharder.onResp(myIssue.req, resp) // perform some potential cleanup, and potentially receive an error to retry
@@ -2521,10 +2530,37 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 				// immediately. The request was not even issued. However, as a
 				// safety, we only do this 3 times to avoid some super weird
 				// pathological spin loop.
-				backoff := cl.cfg.retryBackoff(tries)
+				//
+				// We do retry on pinnedOld even if noRetries==true because
+				// the request was not issued; the sharder may handle
+				// errBrokerTooOld by pinning / splitting differently next try.
+				var (
+					backoff         = cl.cfg.retryBackoff(tries)
+					pinnedOld       = reshardable && isPinned && errors.Is(err, errBrokerTooOld) && tries <= 3
+					notTimedOut     = retryTimeout == 0 || time.Now().Add(backoff).Sub(start) <= retryTimeout
+					shouldRetry     = cl.shouldRetry(tries, err)
+					shouldRetryNext = myIssue.any && cl.shouldRetryNext(tries, err)
+				)
+
+				// If we retried on a "next" broker, but we randomly chose
+				// that same broker 3x, then we avoid retrying again on a
+				// "next" broker.
+				//
+				// If we retry at all, we need to clear `avoidBroker` in
+				// case it's already set. however, if we *do* need to retry
+				// on a different broker, then we set it.
+				if avoidBroker != -1 && br.meta.NodeID == avoidBroker {
+					shouldRetryNext = false
+				}
+				avoidBroker = -1
+				if shouldRetryNext {
+					avoidBroker = br.meta.NodeID
+				}
+
 				if err != nil &&
-					(reshardable && isPinned && errors.Is(err, errBrokerTooOld) && tries <= 3) ||
-					(retryTimeout == 0 || time.Now().Add(backoff).Sub(start) <= retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) && !noRetries {
+					(pinnedOld ||
+						!noRetries && notTimedOut && (shouldRetry || shouldRetryNext) && cl.waitTries(ctx, backoff)) {
+
 					// Non-reshardable re-requests just jump back to the
 					// top where the broker is loaded. This is the case on
 					// requests where the original request is split to
@@ -2534,7 +2570,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 						goto start
 					}
 					l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "req", kmsg.Key(myIssue.req.Key()).Name(), "time_since_start", time.Since(start), "tries", tries, "err", err)
-					issue(reqTry{tries, myIssue.req, err})
+					issue(reqTry{tries, myIssue.req, err}, avoidBroker)
 					return
 				}
 
@@ -2546,12 +2582,12 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
 				if errIsFromResp {
 					err = nil
 				}
-				addShard(shard(broker, myIssue.req, resp, err)) // the error was not retryable
+				addShard(shard(br, myIssue.req, resp, err)) // the error was not retryable
 			}()
 		}
 	}
 
-	issue(reqTry{0, req, nil})
+	issue(reqTry{0, req, nil}, -1)
 	wg.Wait()
 
 	return shards, sharder.merge
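
For reference, the reshaped retry condition in the hunk above can be read as a single predicate. The sketch below is only a restatement with plain boolean inputs mirroring the names introduced in the diff; shouldReissue is a hypothetical helper, not part of kgo's API.

package main

import (
	"errors"
	"fmt"
)

// shouldReissue restates the retry decision from the diff: err must be
// non-nil, and either the request was a pinned errBrokerTooOld case that was
// never issued, or normal retry rules apply.
func shouldReissue(err error, pinnedOld, noRetries, notTimedOut, shouldRetry, shouldRetryNext, waited bool) bool {
	if err == nil {
		return false // nothing to retry
	}
	// A pinned request that failed with errBrokerTooOld is re-issued even when
	// retries are disabled: it was never sent, and resharding may pin it
	// differently next time.
	if pinnedOld {
		return true
	}
	// Otherwise, retries must be enabled, the retry timeout must not have
	// elapsed, the error must be retryable (on this broker or on a different
	// "next" broker), and the backoff wait must not have been interrupted.
	return !noRetries && notTimedOut && (shouldRetry || shouldRetryNext) && waited
}

func main() {
	dialErr := errors.New("dial tcp: connection refused")
	// A dial error on an any-broker shard: shouldRetryNext is true even though
	// shouldRetry is false, so the shard is re-issued on a different broker.
	fmt.Println(shouldReissue(dialErr, false, false, true, false, true, true)) // true
	// The same error with retries disabled is not re-issued.
	fmt.Println(shouldReissue(dialErr, false, true, true, false, true, true)) // false
}

The notable behavioral points are that pinnedOld bypasses noRetries (the request was never issued) and that shouldRetryNext lets a dial error on an any-broker shard be retried elsewhere even though shouldRetry alone would reject it.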
