Skip to content

Commit

Permalink
additional debugging; restore original internal caller state upon det…
Browse files Browse the repository at this point in the history
…ecting disconnect
  • Loading branch information
James DeFelice committed Apr 16, 2017
1 parent eaeca69 commit 6e5373c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
1 change: 1 addition & 0 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Ha
state.done = true // TODO(jdef) not goroutine safe
return errors.New("mesos gave us an empty frameworkID")
}
log.Println("FrameworkID", frameworkID)
frameworkIDStore.Set(frameworkID)
}
return nil
Expand Down
22 changes: 15 additions & 7 deletions api/v1/lib/httpcli/httpsched/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc {
if debug {
return func(req *http.Request) (*http.Response, error) {
if debug {
log.Println("wrapping request")
log.Println("wrapping request", *req)
}
resp, err := f(req)
if debug && err == nil {
Expand Down Expand Up @@ -99,8 +99,15 @@ func disconnectedFn(state *state) stateFn {
stateResp, stateErr := subscribeCaller.Call(state.call)
state.err = stateErr

// (d) if err != nil return disconnectedFn since we're unsubscribed
if stateErr != nil {
state.resp = nil
return disconnectedFn
}

// wrap the response: any errors processing the subscription stream should result in a
// transition to a disconnected state ASAP.
stateCaller := state.caller
state.resp = &mesos.ResponseWrapper{
Response: stateResp,
DecoderFunc: func() encoding.Decoder {
Expand All @@ -109,20 +116,16 @@ func disconnectedFn(state *state) stateFn {
err = decoder(u)
if err != nil {
state.m.Lock()
defer state.m.Unlock()
state.caller = stateCaller // restore the original caller
state.fn = disconnectedFn
state.m.Unlock()
_ = stateResp.Close() // swallow any error here
}
return
}
},
}

// (d) if err != nil return disconnectedFn since we're unsubscribed
if stateErr != nil {
return disconnectedFn
}

// (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed
state.caller = &callerTemporary{
opt: httpcli.DefaultHeader(headerMesosStreamID, mesosStreamID),
Expand Down Expand Up @@ -151,5 +154,10 @@ func (state *state) Call(call *scheduler.Call) (resp mesos.Response, err error)
defer state.m.Unlock()
state.call = call
state.fn = state.fn(state)

if debug && state.err != nil {
log.Print(*call, state.err)
}

return state.resp, state.err
}

0 comments on commit 6e5373c

Please sign in to comment.