Fix a number of StatePool leaks #6566

Merged
merged 7 commits into from Nov 16, 2016
View
@@ -37,6 +37,7 @@ func (h *backupHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
h.sendError(resp, err)
return
}
+ defer h.ctxt.release(st)
backups, closer := newBackups(st)
defer closer.Close()
View
@@ -91,6 +91,8 @@ func (h *charmsHandler) ServePost(w http.ResponseWriter, r *http.Request) error
if err != nil {
return errors.Trace(err)
}
+ defer h.ctxt.release(st)
+
// Add a charm to the store provider.
charmURL, err := h.processPost(r, st)
if err != nil {
@@ -108,6 +110,8 @@ func (h *charmsHandler) ServeGet(w http.ResponseWriter, r *http.Request) error {
if err != nil {
return errors.Trace(err)
}
+ h.ctxt.release(st)
+
// Retrieve or list charm files.
// Requires "url" (charm URL) and an optional "file" (the path to the
// charm file) to be included in the query. Optionally also receives an
View
@@ -80,6 +80,8 @@ func (h *debugLogHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
socket.sendError(err)
return
}
+ defer h.ctxt.release(st)
+
params, err := readDebugLogParams(req.URL.Query())
if err != nil {
socket.sendError(err)
View
@@ -140,6 +140,7 @@ func (gr *guiRouter) ensureFiles(req *http.Request) (rootDir string, hash string
if err != nil {
return "", "", errors.Annotate(err, "cannot open state")
}
+ defer gr.ctxt.release(st)
storage, err := st.GUIStorage()
if err != nil {
return "", "", errors.Annotate(err, "cannot open GUI storage")
@@ -418,6 +419,7 @@ func (h *guiArchiveHandler) handleGet(w http.ResponseWriter, req *http.Request)
if err != nil {
return errors.Annotate(err, "cannot open state")
}
+ defer h.ctxt.release(st)
storage, err := st.GUIStorage()
if err != nil {
return errors.Annotate(err, "cannot open GUI storage")
@@ -485,6 +487,7 @@ func (h *guiArchiveHandler) handlePost(w http.ResponseWriter, req *http.Request)
if err != nil {
return errors.Annotate(err, "cannot open state")
}
+ defer h.ctxt.release(st)
storage, err := st.GUIStorage()
if err != nil {
return errors.Annotate(err, "cannot open GUI storage")
@@ -560,6 +563,7 @@ func (h *guiVersionHandler) handlePut(w http.ResponseWriter, req *http.Request)
if err != nil {
return errors.Annotate(err, "cannot open state")
}
+ defer h.ctxt.release(st)
var selected params.GUIVersionRequest
decoder := json.NewDecoder(req.Body)
@@ -173,6 +173,12 @@ func (ctxt *httpContext) loginRequest(r *http.Request) (params.LoginRequest, err
}, nil
}
+// release indicates that the client doesn't need this State anymore,
+// so it can be removed from the pool if it needs to be.
+func (ctxt *httpContext) release(st *state.State) error {
+ return ctxt.srv.statePool.Release(st.ModelUUID())
+}
+
// stop returns a channel which will be closed when a handler should
// exit.
func (ctxt *httpContext) stop() <-chan struct{} {
View
@@ -68,6 +68,7 @@ func (h *logSinkHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.sendError(socket, req, err)
return
}
+ defer h.ctxt.release(st)
tag := entity.Tag()
// Note that this endpoint is agent-only. Thus the only
@@ -137,6 +138,9 @@ func (h *logSinkHandler) receiveLogs(socket *websocket.Conn) <-chan params.LogRe
logCh := make(chan params.LogRecord)
go func() {
+ // Close the channel to signal ServeHTTP to finish. Otherwise
+ // we leak goroutines on client disconnect, because the server
+ // isn't shutting down so h.ctxt.stop() is never closed.
defer close(logCh)
var m params.LogRecord
for {
View
@@ -22,19 +22,24 @@ type logStreamSource interface {
newTailer(*state.LogTailerParams) (state.LogTailer, error)
}
+type closerFunc func() error
+
// logStreamEndpointHandler takes requests to stream logs from the DB.
type logStreamEndpointHandler struct {
stopCh <-chan struct{}
- newSource func(*http.Request) (logStreamSource, error)
+ newSource func(*http.Request) (logStreamSource, closerFunc, error)
}
func newLogStreamEndpointHandler(ctxt httpContext) *logStreamEndpointHandler {
- newSource := func(req *http.Request) (logStreamSource, error) {
+ newSource := func(req *http.Request) (logStreamSource, closerFunc, error) {
st, _, err := ctxt.stateForRequestAuthenticatedAgent(req)
if err != nil {
- return nil, errors.Trace(err)
+ return nil, nil, errors.Trace(err)
+ }
+ closer := func() error {
+ return ctxt.release(st)
}
- return &logStreamState{st}, nil
+ return &logStreamState{st}, closer, nil
}
return &logStreamEndpointHandler{
stopCh: ctxt.stop(),
@@ -54,8 +59,9 @@ func (eph *logStreamEndpointHandler) ServeHTTP(w http.ResponseWriter, req *http.
defer conn.Close()
reqHandler, err := eph.newLogStreamRequestHandler(req, clock.WallClock)
if err == nil {
- defer reqHandler.tailer.Stop()
+ defer reqHandler.close()
}
+
stream, initErr := initStream(conn, err)
if initErr != nil {
logger.Debugf("failed to send initial error (%v): %v", err, initErr)
@@ -74,7 +80,7 @@ func (eph *logStreamEndpointHandler) newLogStreamRequestHandler(req *http.Reques
// Validate before authenticate because the authentication is
// dependent on the state connection that is determined during the
// validation.
- source, err := eph.newSource(req)
+ source, closer, err := eph.newSource(req)
if err != nil {
return nil, errors.Trace(err)
}
@@ -94,6 +100,7 @@ func (eph *logStreamEndpointHandler) newLogStreamRequestHandler(req *http.Reques
reqHandler := &logStreamRequestHandler{
req: req,
tailer: tailer,
+ closer: closer,
sendModelUUID: cfg.AllModels,
}
return reqHandler, nil
@@ -174,6 +181,7 @@ type logStreamRequestHandler struct {
req *http.Request
tailer state.LogTailer
sendModelUUID bool
+ closer closerFunc
stream *apiLogStream
}
@@ -203,6 +211,11 @@ func (rh *logStreamRequestHandler) serveWebsocket(conn *websocket.Conn, stream *
}
}
+func (rh logStreamRequestHandler) close() {
+ rh.tailer.Stop()
+ rh.closer()
+}
+
func initStream(conn *websocket.Conn, initial error) (*apiLogStream, error) {
stream := &apiLogStream{
conn: conn,
@@ -258,13 +258,17 @@ type stubSource struct {
ReturnNewTailer state.LogTailer
}
-func (s *stubSource) newSource(req *http.Request) (logStreamSource, error) {
+func (s *stubSource) newSource(req *http.Request) (logStreamSource, closerFunc, error) {
s.stub.AddCall("newSource", req)
if err := s.stub.NextErr(); err != nil {
- return nil, errors.Trace(err)
+ return nil, nil, errors.Trace(err)
}
- return s, nil
+ closer := func() error {
+ s.stub.AddCall("close")
+ return s.stub.NextErr()
+ }
+ return s, closer, nil
}
func (s *stubSource) getStart(sink string, allModels bool) (time.Time, error) {
@@ -47,6 +47,7 @@ func (h *registerUserHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
}
return
}
+ defer h.ctxt.release(st)
userTag, response, err := h.processPost(req, st)
if err != nil {
if err := sendError(w, err); err != nil {
View
@@ -45,6 +45,7 @@ func (h *toolsDownloadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
}
return
}
+ defer h.ctxt.release(st)
switch r.Method {
case "GET":
@@ -76,6 +77,7 @@ func (h *toolsUploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
return
}
+ defer h.ctxt.release(st)
switch r.Method {
case "POST":
View
@@ -1264,6 +1264,7 @@ func (b *allModelWatcherStateBacking) Changed(all *multiwatcherStore, change wat
}
return errors.Trace(err)
}
+ defer b.stPool.Release(modelUUID)
col, closer := st.getCollection(c.name)
defer closer()