Skip to content

Commit

Permalink
Merge 5212e88 into c488712
Browse files Browse the repository at this point in the history
  • Loading branch information
rboyer committed Nov 8, 2017
2 parents c488712 + 5212e88 commit 27512c5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 13 deletions.
11 changes: 10 additions & 1 deletion api/v1/lib/httpcli/httpsched/httpsched.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type (

client struct {
*httpcli.Client
redirect RedirectSettings
redirect RedirectSettings
allowReconnect bool // feature flag
}

// Caller is the public interface a framework scheduler's should consume
Expand Down Expand Up @@ -96,6 +97,14 @@ func MaxRedirects(mr int) Option {
}
}

func AllowReconnection(v bool) Option {
return func(c *client) Option {
old := c.allowReconnect
c.allowReconnect = v
return AllowReconnection(old)
}
}

// NewCaller returns a scheduler API Client in the form of a Caller. Concurrent invocations
// of Call upon the returned caller are safely executed in a serial fashion. It is expected that
// there are no other users of the given Client since its state may be modified by this impl.
Expand Down
35 changes: 23 additions & 12 deletions api/v1/lib/httpcli/httpsched/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,29 @@ func errorIndicatesSubscriptionLoss(err error) (result bool) {
func connectedFn(ctx context.Context, state *state) stateFn {
// (a) validate call != SUBSCRIBE
if state.call.GetType() == scheduler.Call_SUBSCRIBE {
state.resp = nil

// TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue
// redundant subscribe calls. However, the state tracking mechanism in this module can't
// cope with it (e.g. we'll need to track a new stream-id, etc).
// We make a best effort to transition to a disconnected state if we detect protocol errors,
// error events, or mesos-generated "not subscribed" errors. But we don't handle things such
// as, for example, authentication errors. Granted, the initial subscribe call should fail
// if authentication is an issue, so we should never end up here. I'm not convinced there's
// not other edge cases though with respect to other error codes.
state.err = errAlreadySubscribed
return connectedFn
if state.client.allowReconnect {
// Reset internal state back to DISCONNECTED and re-execute the SUBSCRIBE call.
// Mesos will hangup on the old SUBSCRIBE socket after this one completes.
state.caller = nil
state.resp = nil
state.err = nil
state.fn = disconnectedFn

return state.fn(ctx, state)
} else {
state.resp = nil

// TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue
// redundant subscribe calls. However, the state tracking mechanism in this module can't
// cope with it (e.g. we'll need to track a new stream-id, etc).
// We make a best effort to transition to a disconnected state if we detect protocol errors,
// error events, or mesos-generated "not subscribed" errors. But we don't handle things such
// as, for example, authentication errors. Granted, the initial subscribe call should fail
// if authentication is an issue, so we should never end up here. I'm not convinced there's
// not other edge cases though with respect to other error codes.
state.err = errAlreadySubscribed
return connectedFn
}
}

// (b) execute call, save the result in resp, err
Expand Down

0 comments on commit 27512c5

Please sign in to comment.