From 5212e881dd1ad9994a05d2c43b98c6c05274c25a Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Wed, 8 Nov 2017 12:25:05 -0600 Subject: [PATCH] feature flag opt-in for re-subscribe --- api/v1/lib/httpcli/httpsched/httpsched.go | 11 ++++++- api/v1/lib/httpcli/httpsched/state.go | 35 +++++++++++++++-------- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go index 77c5def9..285c1b91 100644 --- a/api/v1/lib/httpcli/httpsched/httpsched.go +++ b/api/v1/lib/httpcli/httpsched/httpsched.go @@ -37,7 +37,8 @@ type ( client struct { *httpcli.Client - redirect RedirectSettings + redirect RedirectSettings + allowReconnect bool // feature flag } // Caller is the public interface a framework scheduler's should consume @@ -96,6 +97,14 @@ func MaxRedirects(mr int) Option { } } +func AllowReconnection(v bool) Option { + return func(c *client) Option { + old := c.allowReconnect + c.allowReconnect = v + return AllowReconnection(old) + } +} + // NewCaller returns a scheduler API Client in the form of a Caller. Concurrent invocations // of Call upon the returned caller are safely executed in a serial fashion. It is expected that // there are no other users of the given Client since its state may be modified by this impl. diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 7b3b66c7..4cfd80ab 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -189,18 +189,29 @@ func errorIndicatesSubscriptionLoss(err error) (result bool) { func connectedFn(ctx context.Context, state *state) stateFn { // (a) validate call != SUBSCRIBE if state.call.GetType() == scheduler.Call_SUBSCRIBE { - state.resp = nil - - // TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue - // redundant subscribe calls. However, the state tracking mechanism in this module can't - // cope with it (e.g. we'll need to track a new stream-id, etc). - // We make a best effort to transition to a disconnected state if we detect protocol errors, - // error events, or mesos-generated "not subscribed" errors. But we don't handle things such - // as, for example, authentication errors. Granted, the initial subscribe call should fail - // if authentication is an issue, so we should never end up here. I'm not convinced there's - // not other edge cases though with respect to other error codes. - state.err = errAlreadySubscribed - return connectedFn + if state.client.allowReconnect { + // Reset internal state back to DISCONNECTED and re-execute the SUBSCRIBE call. + // Mesos will hangup on the old SUBSCRIBE socket after this one completes. + state.caller = nil + state.resp = nil + state.err = nil + state.fn = disconnectedFn + + return state.fn(ctx, state) + } else { + state.resp = nil + + // TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue + // redundant subscribe calls. However, the state tracking mechanism in this module can't + // cope with it (e.g. we'll need to track a new stream-id, etc). + // We make a best effort to transition to a disconnected state if we detect protocol errors, + // error events, or mesos-generated "not subscribed" errors. But we don't handle things such + // as, for example, authentication errors. Granted, the initial subscribe call should fail + // if authentication is an issue, so we should never end up here. I'm not convinced there's + // not other edge cases though with respect to other error codes. + state.err = errAlreadySubscribed + return connectedFn + } } // (b) execute call, save the result in resp, err