diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index b91cb701..5d2091fe 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -37,9 +37,10 @@ type ( client struct { *httpcli.Client - redirect RedirectSettings - allowReconnect bool // feature flag - listener func(Notification) + redirect RedirectSettings + allowReconnect bool // feature flag + listener func(Notification) + candidateSelector CandidateSelector } // Caller is the public interface a framework scheduler's should consume @@ -74,6 +75,10 @@ type ( requestOpts []httpcli.RequestOpt // requestOpts are temporary per-request options opt httpcli.Opt // opt is a temporary client option } + + // CandidateSelector returns the next endpoint to try if there are errors reaching the mesos master, + // or else an empty string if there are no such candidates. + CandidateSelector func() string ) const ( @@ -134,6 +139,14 @@ func AllowReconnection(v bool) Option { } } +func EndpointCandidates(cs CandidateSelector) Option { + return func(c *client) Option { + old := c.candidateSelector + c.candidateSelector = cs + return EndpointCandidates(old) + } +} + // NewCaller returns a scheduler API Client in the form of a Caller. Concurrent invocations // of Call upon the returned caller are safely executed in a serial fashion. It is expected that // there are no other users of the given Client since its state may be modified by this impl. @@ -173,15 +186,35 @@ func (cli *client) httpDo(ctx context.Context, m encoding.Marshaler, opt ...http opt = append(opt, httpcli.Context(ctx)) for attempt := 0; ; attempt++ { resp, err = cli.Client.Do(m, opt...) - redirectErr, ok := err.(*mesosRedirectionError) - if !ok { - return resp, err + if err == nil { + return } + redirectErr, ok := err.(*mesosRedirectionError) + if attempt < cli.redirect.MaxAttempts { + var candidate string + if !ok { + if cli.candidateSelector == nil { + if debug { + log.Printf("not found candidate selector, using url when initilize framework") + } + candidate = cli.Endpoint() + } else { + candidate = cli.candidateSelector() + } + if candidate == "" { + if debug { + log.Printf("not found candidate url, return directly") + } + return + } + } else { + candidate = redirectErr.newURL + } if debug { - log.Println("redirecting to " + redirectErr.newURL) + log.Printf("redirecting to %v", candidate) } - cli.With(httpcli.Endpoint(redirectErr.newURL)) + cli.With(httpcli.Endpoint(candidate)) select { case <-getBackoff(): case <-ctx.Done():