diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go index 27a856ad..f3e74986 100644 --- a/api/v1/cmd/example-scheduler/app/app.go +++ b/api/v1/cmd/example-scheduler/app/app.go @@ -125,6 +125,7 @@ func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Ha state.done = true // TODO(jdef) not goroutine safe return errors.New("mesos gave us an empty frameworkID") } + log.Println("FrameworkID", frameworkID) frameworkIDStore.Set(frameworkID) } return nil diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go index 4b7a4fc9..64d12290 100644 --- a/api/v1/lib/httpcli/httpsched/state.go +++ b/api/v1/lib/httpcli/httpsched/state.go @@ -45,7 +45,7 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc { if debug { return func(req *http.Request) (*http.Response, error) { if debug { - log.Println("wrapping request") + log.Println("wrapping request", *req) } resp, err := f(req) if debug && err == nil { @@ -99,8 +99,15 @@ func disconnectedFn(state *state) stateFn { stateResp, stateErr := subscribeCaller.Call(state.call) state.err = stateErr + // (d) if err != nil return disconnectedFn since we're unsubscribed + if stateErr != nil { + state.resp = nil + return disconnectedFn + } + // wrap the response: any errors processing the subscription stream should result in a // transition to a disconnected state ASAP. + stateCaller := state.caller state.resp = &mesos.ResponseWrapper{ Response: stateResp, DecoderFunc: func() encoding.Decoder { @@ -109,8 +116,9 @@ func disconnectedFn(state *state) stateFn { err = decoder(u) if err != nil { state.m.Lock() + defer state.m.Unlock() + state.caller = stateCaller // restore the original caller state.fn = disconnectedFn - state.m.Unlock() _ = stateResp.Close() // swallow any error here } return @@ -118,11 +126,6 @@ func disconnectedFn(state *state) stateFn { }, } - // (d) if err != nil return disconnectedFn since we're unsubscribed - if stateErr != nil { - return disconnectedFn - } - // (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed state.caller = &callerTemporary{ opt: httpcli.DefaultHeader(headerMesosStreamID, mesosStreamID), @@ -151,5 +154,10 @@ func (state *state) Call(call *scheduler.Call) (resp mesos.Response, err error) defer state.m.Unlock() state.call = call state.fn = state.fn(state) + + if debug && state.err != nil { + log.Print(*call, state.err) + } + return state.resp, state.err }