Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature flag opt-in for re-subscribe #337

Merged
merged 1 commit into from
Mar 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion api/v1/lib/httpcli/httpsched/httpsched.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type (

client struct {
*httpcli.Client
redirect RedirectSettings
redirect RedirectSettings
allowReconnect bool // feature flag
}

// Caller is the public interface a framework scheduler's should consume
Expand Down Expand Up @@ -96,6 +97,14 @@ func MaxRedirects(mr int) Option {
}
}

func AllowReconnection(v bool) Option {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mind writing a minimal comment for this func that explains how it effectively alters the behavior of the client?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm going to add comments in a subsequent PR

return func(c *client) Option {
old := c.allowReconnect
c.allowReconnect = v
return AllowReconnection(old)
}
}

// NewCaller returns a scheduler API Client in the form of a Caller. Concurrent invocations
// of Call upon the returned caller are safely executed in a serial fashion. It is expected that
// there are no other users of the given Client since its state may be modified by this impl.
Expand Down
35 changes: 23 additions & 12 deletions api/v1/lib/httpcli/httpsched/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,29 @@ func errorIndicatesSubscriptionLoss(err error) (result bool) {
func connectedFn(ctx context.Context, state *state) stateFn {
// (a) validate call != SUBSCRIBE
if state.call.GetType() == scheduler.Call_SUBSCRIBE {
state.resp = nil

// TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue
// redundant subscribe calls. However, the state tracking mechanism in this module can't
// cope with it (e.g. we'll need to track a new stream-id, etc).
// We make a best effort to transition to a disconnected state if we detect protocol errors,
// error events, or mesos-generated "not subscribed" errors. But we don't handle things such
// as, for example, authentication errors. Granted, the initial subscribe call should fail
// if authentication is an issue, so we should never end up here. I'm not convinced there's
// not other edge cases though with respect to other error codes.
state.err = errAlreadySubscribed
return connectedFn
if state.client.allowReconnect {
// Reset internal state back to DISCONNECTED and re-execute the SUBSCRIBE call.
// Mesos will hangup on the old SUBSCRIBE socket after this one completes.
state.caller = nil
state.resp = nil
state.err = nil
state.fn = disconnectedFn

return state.fn(ctx, state)
} else {
state.resp = nil

// TODO(jdef) not super happy with this error: I don't think that mesos minds if we issue
// redundant subscribe calls. However, the state tracking mechanism in this module can't
// cope with it (e.g. we'll need to track a new stream-id, etc).
// We make a best effort to transition to a disconnected state if we detect protocol errors,
// error events, or mesos-generated "not subscribed" errors. But we don't handle things such
// as, for example, authentication errors. Granted, the initial subscribe call should fail
// if authentication is an issue, so we should never end up here. I'm not convinced there's
// not other edge cases though with respect to other error codes.
state.err = errAlreadySubscribed
return connectedFn
}
}

// (b) execute call, save the result in resp, err
Expand Down