Skip to content

Commit

Permalink
Merge 6d0be0e into ee67238
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed Apr 1, 2019
2 parents ee67238 + 6d0be0e commit ec4db80
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 180 deletions.
6 changes: 4 additions & 2 deletions api/v1/lib/httpcli/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func (c *Client) With(opts ...Opt) Opt {
// WithTemporary configures the Client with the temporary option and returns the results of
// invoking f(). Changes made to the Client by the temporary option are reverted before this
// func returns.
// It is not safe to modify the configuration of a Client as long as said Client is in use by
// multiple goroutines.
func (c *Client) WithTemporary(opt Opt, f func() error) error {
if opt != nil {
undo := c.With(opt)
Expand Down Expand Up @@ -528,8 +530,8 @@ func With(opt ...ConfigOpt) DoFunc {
Timeout: 5 * time.Second,
}
transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: dialer.Dial,
Proxy: http.ProxyFromEnvironment,
Dial: dialer.Dial,
ResponseHeaderTimeout: 5 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
TLSHandshakeTimeout: 5 * time.Second,
Expand Down
155 changes: 85 additions & 70 deletions api/v1/lib/httpcli/httpsched/httpsched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/backoff"
mesosclient "github.com/mesos/mesos-go/api/v1/lib/client"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
Expand Down Expand Up @@ -73,7 +72,6 @@ type (
callerTemporary struct {
callerInternal // delegate actually does the work
requestOpts []httpcli.RequestOpt // requestOpts are temporary per-request options
opt httpcli.Opt // opt is a temporary client option
}

// CandidateSelector returns the next endpoint to try if there are errors reaching the mesos master,
Expand All @@ -99,23 +97,17 @@ func (t NotificationType) String() string {
}

func (ct *callerTemporary) httpDo(ctx context.Context, m encoding.Marshaler, opt ...httpcli.RequestOpt) (resp mesos.Response, err error) {
ct.callerInternal.WithTemporary(ct.opt, func() error {
if len(opt) == 0 {
opt = ct.requestOpts
} else if len(ct.requestOpts) > 0 {
opt = append(opt[:], ct.requestOpts...)
}
resp, err = ct.callerInternal.httpDo(ctx, m, opt...)
return nil
})
if len(opt) == 0 {
opt = ct.requestOpts
} else if len(ct.requestOpts) > 0 {
opt = append(opt[:], ct.requestOpts...)
}
resp, err = ct.callerInternal.httpDo(ctx, m, opt...)
return
}

func (ct *callerTemporary) Call(ctx context.Context, call *scheduler.Call) (resp mesos.Response, err error) {
ct.callerInternal.WithTemporary(ct.opt, func() error {
resp, err = ct.callerInternal.Call(ctx, call)
return nil
})
resp, err = ct.httpDo(ctx, call)
return
}

Expand Down Expand Up @@ -159,71 +151,65 @@ func NewCaller(cl *httpcli.Client, opts ...Option) calls.Caller {
}
}
return &state{
client: result,
fn: disconnectedFn,
client: result,
fn: disconnectedPhase(mustSubscribe),
notifyQueue: make(chan Notification, 10),
}
}

type noMasterResponse struct {
mesos.Response
newLeaderURL string
maxAttempts int
clientErr error
minBackoff, maxBackoff time.Duration
}

// httpDo decorates the inherited behavior w/ support for HTTP redirection to follow Mesos leadership changes.
// NOTE: this implementation will change the state of the client upon Mesos leadership changes.
// for schedulers, redirection really only matters for SUBSCRIBE requests as all other calls require an active
// subscription (and the presence of a redirect response implies that a prior, existing subscription is no
// longer active).
func (cli *client) httpDo(ctx context.Context, m encoding.Marshaler, opt ...httpcli.RequestOpt) (resp mesos.Response, err error) {
var (
done chan struct{} // avoid allocating these chans unless we actually need to redirect
redirectBackoff <-chan struct{}
getBackoff = func() <-chan struct{} {
if redirectBackoff == nil {
done = make(chan struct{})
redirectBackoff = backoff.Notifier(cli.redirect.MinBackoffPeriod, cli.redirect.MaxBackoffPeriod, done)
}
return redirectBackoff
}
)
defer func() {
if done != nil {
close(done)
}
}()
opt = append(opt, httpcli.Context(ctx))
for attempt := 0; ; attempt++ {
resp, err = cli.Client.Do(m, opt...)
if err == nil {
return
}
redirectErr, ok := err.(*mesosRedirectionError)

if attempt < cli.redirect.MaxAttempts {
var candidate string
if !ok {
if cli.candidateSelector == nil {
if debug {
log.Printf("not found candidate selector, using url when initilize framework")
}
candidate = cli.Endpoint()
} else {
candidate = cli.candidateSelector()
}
if candidate == "" {
if debug {
log.Printf("not found candidate url, return directly")
}
return
}
} else {
candidate = redirectErr.newURL
}
resp, err = cli.Client.Do(m, opt...)
if err == nil {
return
}

redirectErr, ok := err.(*mesosRedirectionError)
var candidate string
if !ok {
if cli.candidateSelector == nil {
if debug {
log.Printf("redirecting to %v", candidate)
log.Printf("not found candidate selector, using url when initilize framework")
}
cli.With(httpcli.Endpoint(candidate))
select {
case <-getBackoff():
case <-ctx.Done():
return nil, ctx.Err()
candidate = cli.Endpoint()
} else {
candidate = cli.candidateSelector()
}
if candidate == "" {
if debug {
log.Printf("not found candidate url, return directly")
}
continue
return
}
return
} else {
candidate = redirectErr.newURL
}
if debug {
log.Printf("redirecting to %v", candidate)
}

resp = &noMasterResponse{
Response: &mesos.ResponseWrapper{Response: resp}, // for safe Close() ops
newLeaderURL: candidate,
maxAttempts: cli.redirect.MaxAttempts,
clientErr: err,
minBackoff: cli.redirect.MinBackoffPeriod,
maxBackoff: cli.redirect.MaxBackoffPeriod,
}
err = nil
return
}

// Call implements Client
Expand All @@ -237,14 +223,43 @@ func (mre *mesosRedirectionError) Error() string {
return "mesos server sent redirect to: " + mre.newURL
}

type streamIDResponse struct {
mesos.Response
mesosStreamID string
}

func (sr *streamIDResponse) streamID() string { return sr.mesosStreamID }

func tryExtractStreamID(hres *http.Response, resp mesos.Response) mesos.Response {
if hres.StatusCode != 200 {
return resp
}
// grab Mesos-Stream-Id header; if missing then
// close the response body and return an error
mesosStreamID := hres.Header.Get(headerMesosStreamID)
if mesosStreamID == "" {
return resp
}
return &streamIDResponse{
Response: resp,
mesosStreamID: mesosStreamID,
}
}

// redirectHandler returns a config options that decorates the default response handling routine;
// it transforms normal Mesos redirect "errors" into mesosRedirectionErrors by parsing the Location
// header and computing the address of the next endpoint that should be used to replay the failed
// HTTP request.
func (cli *client) redirectHandler() httpcli.Opt {
return httpcli.HandleResponse(func(hres *http.Response, rc mesosclient.ResponseClass, err error) (mesos.Response, error) {
resp, err := cli.HandleResponse(hres, rc, err) // default response handler
if err == nil || !apierrors.CodeNotLeader.Matches(err) {
if err == nil {
if rc == mesosclient.ResponseClassStreaming || rc == mesosclient.ResponseClassAuto {
resp = tryExtractStreamID(hres, resp)
}
return resp, err
}
if !apierrors.CodeNotLeader.Matches(err) {
return resp, err
}
// TODO(jdef) for now, we're tightly coupled to the httpcli package's Response type
Expand Down
Loading

0 comments on commit ec4db80

Please sign in to comment.