From 1697ffc4596585bba077a8db83164d15c888beea Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 13 Jun 2017 16:34:38 +0800 Subject: [PATCH] apiserver/logsink: extract generic handler code Extract the logsink HTTP/websocket handler into a separate package, and tidy up the abstractions a bit. This precedes work underway to rate-limit and batch database log writes. --- apiserver/apiserver.go | 23 +- apiserver/authhttp_test.go | 41 --- apiserver/debuglog.go | 6 +- apiserver/debuglog_db_test.go | 17 +- apiserver/logsink.go | 335 +++----------------- apiserver/logsink/filewriter.go | 39 +++ apiserver/logsink/logsink.go | 225 +++++++++++++ apiserver/logsink/logsink_test.go | 185 +++++++++++ apiserver/logsink/package_test.go | 14 + apiserver/logsink_test.go | 23 +- apiserver/logstream.go | 6 +- apiserver/logstream_test.go | 25 +- apiserver/logtransfer.go | 66 ++-- apiserver/logtransfer_test.go | 33 +- apiserver/pubsub.go | 17 +- apiserver/pubsub_test.go | 7 +- apiserver/{ => websocket}/websocket.go | 47 ++- apiserver/websocket/websockettest/errors.go | 57 ++++ 18 files changed, 718 insertions(+), 448 deletions(-) create mode 100644 apiserver/logsink/filewriter.go create mode 100644 apiserver/logsink/logsink.go create mode 100644 apiserver/logsink/logsink_test.go create mode 100644 apiserver/logsink/package_test.go rename apiserver/{ => websocket}/websocket.go (52%) create mode 100644 apiserver/websocket/websockettest/errors.go diff --git a/apiserver/apiserver.go b/apiserver/apiserver.go index f79513a75b5..23bda58cc17 100644 --- a/apiserver/apiserver.go +++ b/apiserver/apiserver.go @@ -7,7 +7,6 @@ import ( "crypto/tls" "crypto/x509" "io" - "io/ioutil" "log" "net" "net/http" @@ -19,7 +18,6 @@ import ( "time" "github.com/bmizerany/pat" - "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/loggo" "github.com/juju/pubsub" @@ -35,8 +33,10 @@ import ( "github.com/juju/juju/apiserver/common" "github.com/juju/juju/apiserver/common/apihttp" "github.com/juju/juju/apiserver/facade" + "github.com/juju/juju/apiserver/logsink" "github.com/juju/juju/apiserver/observer" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket" "github.com/juju/juju/resource" "github.com/juju/juju/resource/resourceadapters" "github.com/juju/juju/rpc" @@ -311,7 +311,7 @@ func newServer(s *state.State, lis net.Listener, cfg ServerConfig) (_ *Server, e return nil, errors.Annotatef(err, "cannot set initial certificate") } - logSinkWriter, err := newLogSinkWriter(filepath.Join(srv.logDir, "logsink.log")) + logSinkWriter, err := logsink.NewFileWriter(filepath.Join(srv.logDir, "logsink.log")) if err != nil { return nil, errors.Annotate(err, "creating logsink writer") } @@ -510,11 +510,17 @@ func (srv *Server) endpoints() []apihttp.Endpoint { add("/model/:modeluuid/logstream", logStreamHandler) add("/model/:modeluuid/log", debugLogHandler) - logSinkHandler := newLogSinkHandler(httpCtxt, srv.logSinkWriter, newAgentLoggingStrategy) + logSinkHandler := logsink.NewHTTPHandler( + newAgentLogWriteCloserFunc(httpCtxt, srv.logSinkWriter), + httpCtxt.stop(), + ) add("/model/:modeluuid/logsink", srv.trackRequests(logSinkHandler)) // We don't need to save the migrated logs to a logfile as well as to the DB. - logTransferHandler := newLogSinkHandler(httpCtxt, ioutil.Discard, newMigrationLoggingStrategy) + logTransferHandler := logsink.NewHTTPHandler( + newMigrationLogWriteCloserFunc(httpCtxt), + httpCtxt.stop(), + ) add("/migrate/logtransfer", srv.trackRequests(logTransferHandler)) modelRestHandler := &modelRestHandler{ @@ -752,18 +758,17 @@ func (srv *Server) apiHandler(w http.ResponseWriter, req *http.Request) { apiObserver.Join(req, connectionID) defer apiObserver.Leave() - handler := func(conn *websocket.Conn) { + websocket.Serve(w, req, func(conn *websocket.Conn) { modelUUID := req.URL.Query().Get(":modeluuid") logger.Tracef("got a request for model %q", modelUUID) if err := srv.serveConn(conn, modelUUID, apiObserver, req.Host); err != nil { logger.Errorf("error serving RPCs: %v", err) } - } - websocketServer(w, req, handler) + }) } func (srv *Server) serveConn(wsConn *websocket.Conn, modelUUID string, apiObserver observer.Observer, host string) error { - codec := jsoncodec.NewWebsocket(wsConn) + codec := jsoncodec.NewWebsocket(wsConn.Conn) conn := rpc.NewConn(codec, apiObserver) // Note that we don't overwrite modelUUID here because diff --git a/apiserver/authhttp_test.go b/apiserver/authhttp_test.go index b10b948ce58..35c18ac0b0e 100644 --- a/apiserver/authhttp_test.go +++ b/apiserver/authhttp_test.go @@ -4,7 +4,6 @@ package apiserver_test import ( - "bufio" "crypto/x509" "encoding/json" "io" @@ -118,17 +117,6 @@ func dialWebsocketFromURL(c *gc.C, server string, header http.Header) *websocket return conn } -func assertWebsocketClosed(c *gc.C, ws *websocket.Conn) { - _, _, err := ws.NextReader() - goodClose := []int{ - websocket.CloseNormalClosure, - websocket.CloseGoingAway, - websocket.CloseNoStatusReceived, - } - c.Logf("%#v", err) - c.Assert(websocket.IsCloseError(err, goodClose...), jc.IsTrue) -} - func (s *authHTTPSuite) makeURL(c *gc.C, scheme, path string, queryParams url.Values) *url.URL { url := s.baseURL(c) query := "" @@ -284,35 +272,6 @@ func (s *authHTTPSuite) uploadRequest(c *gc.C, uri string, contentType, path str }) } -// assertJSONError checks the JSON encoded error returned by the log -// and logsink APIs matches the expected value. -func assertJSONError(c *gc.C, ws *websocket.Conn, expected string) { - errResult := readJSONErrorLine(c, ws) - c.Assert(errResult.Error, gc.NotNil) - c.Assert(errResult.Error.Message, gc.Matches, expected) -} - -// assertJSONInitialErrorNil checks the JSON encoded error returned by the log -// and logsink APIs are nil. -func assertJSONInitialErrorNil(c *gc.C, ws *websocket.Conn) { - errResult := readJSONErrorLine(c, ws) - c.Assert(errResult.Error, gc.IsNil) -} - -// readJSONErrorLine returns the error line returned by the log and -// logsink APIS. -func readJSONErrorLine(c *gc.C, ws *websocket.Conn) params.ErrorResult { - messageType, reader, err := ws.NextReader() - c.Assert(err, jc.ErrorIsNil) - c.Assert(messageType, gc.Equals, websocket.TextMessage) - line, err := bufio.NewReader(reader).ReadSlice('\n') - c.Assert(err, jc.ErrorIsNil) - var errResult params.ErrorResult - err = json.Unmarshal(line, &errResult) - c.Assert(err, jc.ErrorIsNil) - return errResult -} - func assertResponse(c *gc.C, resp *http.Response, expHTTPStatus int, expContentType string) []byte { body, err := ioutil.ReadAll(resp.Body) resp.Body.Close() diff --git a/apiserver/debuglog.go b/apiserver/debuglog.go index 96f59126c0f..b2e75eab177 100644 --- a/apiserver/debuglog.go +++ b/apiserver/debuglog.go @@ -11,12 +11,12 @@ import ( "syscall" "time" - "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/loggo" "gopkg.in/juju/names.v2" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket" "github.com/juju/juju/state" ) @@ -93,7 +93,7 @@ func (h *debugLogHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } } - websocketServer(w, req, handler) + websocket.Serve(w, req, handler) } func isBrokenPipe(err error) bool { @@ -131,7 +131,7 @@ func (s *debugLogSocketImpl) sendOk() { // sendError implements debugLogSocket. func (s *debugLogSocketImpl) sendError(err error) { - if sendErr := sendInitialErrorV0(s.conn, err); sendErr != nil { + if sendErr := s.conn.SendInitialErrorV0(err); sendErr != nil { logger.Errorf("closing websocket, %v", err) s.conn.Close() return diff --git a/apiserver/debuglog_db_test.go b/apiserver/debuglog_db_test.go index 035d74758f6..cfda9dfca8b 100644 --- a/apiserver/debuglog_db_test.go +++ b/apiserver/debuglog_db_test.go @@ -12,6 +12,7 @@ import ( gc "gopkg.in/check.v1" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket/websockettest" "github.com/juju/juju/testing/factory" ) @@ -26,8 +27,8 @@ var _ = gc.Suite(&debugLogDBSuite{}) func (s *debugLogDBSuite) TestBadParams(c *gc.C) { reader := s.openWebsocket(c, url.Values{"maxLines": {"foo"}}) - assertJSONError(c, reader, `maxLines value "foo" is not a valid unsigned number`) - assertWebsocketClosed(c, reader) + websockettest.AssertJSONError(c, reader, `maxLines value "foo" is not a valid unsigned number`) + websockettest.AssertWebsocketClosed(c, reader) } func (s *debugLogDBSuite) TestWithHTTP(c *gc.C) { @@ -49,8 +50,8 @@ func (s *debugLogDBSuite) TestNoAuth(c *gc.C) { conn := s.dialWebsocketInternal(c, nil, nil) defer conn.Close() - assertJSONError(c, conn, "no credentials provided") - assertWebsocketClosed(c, conn) + websockettest.AssertJSONError(c, conn, "no credentials provided") + websockettest.AssertWebsocketClosed(c, conn) } func (s *debugLogDBSuite) TestUnitLoginsRejected(c *gc.C) { @@ -59,8 +60,8 @@ func (s *debugLogDBSuite) TestUnitLoginsRejected(c *gc.C) { conn := s.dialWebsocketInternal(c, nil, header) defer conn.Close() - assertJSONError(c, conn, "tag kind unit not valid") - assertWebsocketClosed(c, conn) + websockettest.AssertJSONError(c, conn, "tag kind unit not valid") + websockettest.AssertWebsocketClosed(c, conn) } var noResultsPlease = url.Values{"maxLines": {"0"}, "noTail": {"true"}} @@ -74,7 +75,7 @@ func (s *debugLogDBSuite) TestUserLoginsAccepted(c *gc.C) { conn := s.dialWebsocketInternal(c, noResultsPlease, header) defer conn.Close() - result := readJSONErrorLine(c, conn) + result := websockettest.ReadJSONErrorLine(c, conn) c.Assert(result.Error, gc.IsNil) } @@ -87,7 +88,7 @@ func (s *debugLogDBSuite) TestMachineLoginsAccepted(c *gc.C) { conn := s.dialWebsocketInternal(c, noResultsPlease, header) defer conn.Close() - result := readJSONErrorLine(c, conn) + result := websockettest.ReadJSONErrorLine(c, conn) c.Assert(result.Error, gc.IsNil) } diff --git a/apiserver/logsink.go b/apiserver/logsink.go index c9a70a74a37..8958c3e19e2 100644 --- a/apiserver/logsink.go +++ b/apiserver/logsink.go @@ -6,64 +6,48 @@ package apiserver import ( "io" "net/http" - "os" "strings" "time" - "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/loggo" - "github.com/juju/utils" - "github.com/juju/utils/featureflag" "github.com/juju/version" "gopkg.in/juju/names.v2" - "gopkg.in/natefinch/lumberjack.v2" + "github.com/juju/juju/apiserver/logsink" "github.com/juju/juju/apiserver/params" - "github.com/juju/juju/feature" "github.com/juju/juju/state" ) -// LoggingStrategy handles the authentication and logging details for -// a particular logsink handler. -type LoggingStrategy interface { - // Authenticate should check that the request identifies the kind - // of client that is expected to be talking to this endpoint. - Authenticate(*http.Request) error - - // Start prepares any underlying loggers before sending them - // messages. This should only be called once. - Start() - - // Log writes out the given record to any backing loggers for the strategy. - Log(params.LogRecord) bool - - // Stop tells the strategy that there are no more log messages - // coming, so it can clean up any resources it holds and close any - // loggers. Once Stop has been called no more log messages can be - // written. - Stop() -} - type agentLoggingStrategy struct { - ctxt httpContext + fileLogger io.Writer + st *state.State releaser func() version version.Number entity names.Tag filePrefix string dbLogger *state.EntityDbLogger - fileLogger io.Writer } -func newAgentLoggingStrategy(ctxt httpContext, fileLogger io.Writer) LoggingStrategy { - return &agentLoggingStrategy{ctxt: ctxt, fileLogger: fileLogger} +// newAgentLogWriteCloserFunc returns a function that will create a +// logsink.LoggingStrategy given an *http.Request, that writes log +// messages to the given writer and also to the state database. +func newAgentLogWriteCloserFunc( + ctxt httpContext, + fileLogger io.Writer, +) logsink.NewLogWriteCloserFunc { + return func(req *http.Request) (logsink.LogWriteCloser, error) { + strategy := &agentLoggingStrategy{fileLogger: fileLogger} + if err := strategy.init(ctxt, req); err != nil { + return nil, errors.Annotate(err, "initialising agent logsink session") + } + return strategy, nil + } } -// Authenticate checks that this is request is from a machine -// agent. Part of LoggingStrategy. -func (s *agentLoggingStrategy) Authenticate(req *http.Request) error { - st, releaser, entity, err := s.ctxt.stateForRequestAuthenticatedAgent(req) +func (s *agentLoggingStrategy) init(ctxt httpContext, req *http.Request) error { + st, releaser, entity, err := ctxt.stateForRequestAuthenticatedAgent(req) if err != nil { return errors.Trace(err) } @@ -75,272 +59,38 @@ func (s *agentLoggingStrategy) Authenticate(req *http.Request) error { // *Juju* version be provided as part of the request. Any // attempt to open this endpoint to broader access must // address this caveat appropriately. - ver, err := jujuClientVersionFromReq(req) + ver, err := logsink.JujuClientVersionFromRequest(req) if err != nil { releaser() return errors.Trace(err) } - s.st = st s.releaser = releaser s.version = ver s.entity = entity.Tag() + s.filePrefix = st.ModelUUID() + ":" + s.dbLogger = state.NewEntityDbLogger(st, s.entity, s.version) return nil } -// Start creates the underlying DB logger. Part of LoggingStrategy. -func (s *agentLoggingStrategy) Start() { - s.filePrefix = s.st.ModelUUID() + ":" - s.dbLogger = state.NewEntityDbLogger(s.st, s.entity, s.version) -} - -// Log writes the record to the file and entity loggers. Part of -// LoggingStrategy. -func (s *agentLoggingStrategy) Log(m params.LogRecord) bool { +// WriteLog is part of the logsink.LogWriteCloser interface. +func (s *agentLoggingStrategy) WriteLog(m params.LogRecord) error { level, _ := loggo.ParseLevel(m.Level) - dbErr := s.dbLogger.Log(m.Time, m.Module, m.Location, level, m.Message) - if dbErr != nil { - logger.Errorf("logging to DB failed: %v", dbErr) - } + dbErr := errors.Annotate( + s.dbLogger.Log(m.Time, m.Module, m.Location, level, m.Message), + "logging to DB failed", + ) m.Entity = s.entity.String() - fileErr := logToFile(s.fileLogger, s.filePrefix, m) - if fileErr != nil { - logger.Errorf("logging to logsink.log failed: %v", fileErr) - } - return dbErr == nil && fileErr == nil -} - -// Stop closes the DB logger and releases the state. It doesn't close -// the file logger because that lives longer than one request. Once it -// has been called then it can't be restarted unless Authenticate has -// been called again. Part of LoggingStrategy. -func (s *agentLoggingStrategy) Stop() { - s.dbLogger.Close() - s.releaser() - // Should we clear out s.st, s.releaser, s.entity here? -} - -func newLogSinkHandler(h httpContext, w io.Writer, newStrategy func(httpContext, io.Writer) LoggingStrategy) http.Handler { - return &logSinkHandler{ctxt: h, fileLogger: w, newStrategy: newStrategy} -} - -func newLogSinkWriter(logPath string) (io.WriteCloser, error) { - if err := primeLogFile(logPath); err != nil { - // This isn't a fatal error so log and continue if priming fails. - logger.Warningf("Unable to prime %s (proceeding anyway): %v", logPath, err) - } - - return &lumberjack.Logger{ - Filename: logPath, - MaxSize: 300, // MB - MaxBackups: 2, - Compress: true, - }, nil -} - -// primeLogFile ensures the logsink log file is created with the -// correct mode and ownership. -func primeLogFile(path string) error { - f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600) - if err != nil { - return errors.Trace(err) - } - f.Close() - err = utils.ChownPath(path, "syslog") - return errors.Trace(err) -} - -type logSinkHandler struct { - ctxt httpContext - newStrategy func(httpContext, io.Writer) LoggingStrategy - fileLogger io.Writer -} - -// Since the logsink only receives messages, it is possible for the other end -// to disappear without the server noticing. To fix this, we use the -// underlying websocket control messages ping/pong. Periodically the server -// writes a ping, and the other end replies with a pong. Now the tricky bit is -// that it appears in all the examples found on the interweb that it is -// possible for the control message to be sent successfully to something that -// isn't entirely alive, which is why relying on an error return from the -// write call is insufficient to mark the connection as dead. Instead the -// write and read deadlines inherent in the underlying Go networking libraries -// are used to force errors on timeouts. However the underlying network -// libraries use time.Now() to determine whether or not to send errors, so -// using a testing clock here isn't going to work. So we rely on manual -// testing, and what is defined as good practice by the library authors. -// -// Now, in theory, we should be using this ping/pong across all the websockets, -// but that is a little outside the scope of this piece of work. - -const ( - // pongDelay is how long the server will wait for a pong to be sent - // before the websocket is considered broken. - pongDelay = 90 * time.Second - - // pingPeriod is how often ping messages are sent. This should be shorter - // than the pongDelay, but not by too much. The difference here allows - // the remote endpoint 30 seconds to respond to the ping as a ping is sent - // every 60s, and when a pong is received the read deadline is advanced - // another 90s. - pingPeriod = 60 * time.Second - - // writeWait is how long the write call can take before it errors out. - writeWait = 10 * time.Second - - // For endpoints that don't support ping/pong (i.e. agents prior to 2.2-beta1) - // we will time out their connections after six hours of inactivity. - vZeroDelay = 6 * time.Hour -) - -// ServeHTTP implements the http.Handler interface. -func (h *logSinkHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - handler := func(socket *websocket.Conn) { - defer socket.Close() - strategy := h.newStrategy(h.ctxt, h.fileLogger) - err := strategy.Authenticate(req) - if err != nil { - h.sendError(socket, req, err) - return - } - endpointVersion, err := h.getVersion(req) - if err != nil { - h.sendError(socket, req, err) - return - } - - strategy.Start() - defer strategy.Stop() - - // If we get to here, no more errors to report, so we report a nil - // error. This way the first line of the socket is always a json - // formatted simple error. - h.sendError(socket, req, nil) - - // Here we configure the ping/pong handling for the websocket so the - // server can notice when the client goes away. Older versions did not - // respond to ping control messages, so don't try. - var tickChannel <-chan time.Time - if endpointVersion > 0 { - socket.SetReadDeadline(time.Now().Add(pongDelay)) - socket.SetPongHandler(func(string) error { - logger.Tracef("pong logsink %p", socket) - socket.SetReadDeadline(time.Now().Add(pongDelay)) - return nil - }) - ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() - tickChannel = ticker.C - } else { - socket.SetReadDeadline(time.Now().Add(vZeroDelay)) - } - - logCh := h.receiveLogs(socket, endpointVersion) - for { - select { - case <-h.ctxt.stop(): - return - case <-tickChannel: - deadline := time.Now().Add(writeWait) - logger.Tracef("ping logsink %p", socket) - if err := socket.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil { - // This error is expected if the other end goes away. By - // returning we clean up the strategy and close the socket - // through the defer calls. - logger.Debugf("failed to write ping: %s", err) - return - } - case m, ok := <-logCh: - if !ok { - return - } - success := strategy.Log(m) - if !success { - return - } - } - } - } - websocketServer(w, req, handler) -} - -func (h *logSinkHandler) getVersion(req *http.Request) (int, error) { - verStr := req.URL.Query().Get("version") - switch verStr { - case "": - return 0, nil - case "1": - return 1, nil - default: - return 0, errors.Errorf("unknown version %q", verStr) - } -} - -func jujuClientVersionFromReq(req *http.Request) (version.Number, error) { - verStr := req.URL.Query().Get("jujuclientversion") - if verStr == "" { - return version.Zero, errors.New(`missing "jujuclientversion" in URL query`) - } - ver, err := version.Parse(verStr) - if err != nil { - return version.Zero, errors.Annotatef(err, "invalid jujuclientversion %q", verStr) - } - return ver, nil -} - -func (h *logSinkHandler) receiveLogs(socket *websocket.Conn, endpointVersion int) <-chan params.LogRecord { - logCh := make(chan params.LogRecord) - - go func() { - // Close the channel to signal ServeHTTP to finish. Otherwise - // we leak goroutines on client disconnect, because the server - // isn't shutting down so h.ctxt.stop() is never closed. - defer close(logCh) - var m params.LogRecord - for { - // Receive() blocks until data arrives but will also be - // unblocked when the API handler calls socket.Close as it - // finishes. - if err := socket.ReadJSON(&m); err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { - logger.Debugf("logsink receive error: %v", err) - } else { - logger.Debugf("disconnected, %p", socket) - } - // Try to tell the other end we are closing. If the other end - // has already disconnected from us, this will fail, but we don't - // care that much. - socket.WriteMessage(websocket.CloseMessage, []byte{}) - return - } - - // Send the log message. - select { - case <-h.ctxt.stop(): - return - case logCh <- m: - // If the remote end does not support ping/pong, we bump - // the read deadline everytime a message is received. - if endpointVersion == 0 { - socket.SetReadDeadline(time.Now().Add(vZeroDelay)) - } - } - } - }() - - return logCh -} - -// sendError sends a JSON-encoded error response. -func (h *logSinkHandler) sendError(ws *websocket.Conn, req *http.Request, err error) { - // There is no need to log the error for normal operators as there is nothing - // they can action. This is for developers. - if err != nil && featureflag.Enabled(feature.DeveloperMode) { - logger.Errorf("returning error from %s %s: %s", req.Method, req.URL.Path, errors.Details(err)) - } - if sendErr := sendInitialErrorV0(ws, err); sendErr != nil { - logger.Errorf("closing websocket, %v", err) - ws.Close() + fileErr := errors.Annotate( + logToFile(s.fileLogger, s.filePrefix, m), + "logging to logsink.log failed", + ) + err := dbErr + if err == nil { + err = fileErr + } else if fileErr != nil { + err = errors.Errorf("%s; %s", dbErr, fileErr) } + return err } // logToFile writes a single log message to the logsink log file. @@ -356,3 +106,12 @@ func logToFile(writer io.Writer, prefix string, m params.LogRecord) error { }, " ") + "\n")) return err } + +// Close is part of the logsink.LogWriteCloser interface. Close closes +// the DB logger and releases the state. It doesn't close the file logger +// because that lives longer than one request. +func (s *agentLoggingStrategy) Close() error { + s.dbLogger.Close() + s.releaser() + return nil +} diff --git a/apiserver/logsink/filewriter.go b/apiserver/logsink/filewriter.go new file mode 100644 index 00000000000..67874538e7a --- /dev/null +++ b/apiserver/logsink/filewriter.go @@ -0,0 +1,39 @@ +// Copyright 2017 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package logsink + +import ( + "io" + "os" + + "github.com/juju/errors" + "github.com/juju/utils" + "gopkg.in/natefinch/lumberjack.v2" +) + +// NewFileWriter returns an io.WriteCloser that will write log messages to disk. +func NewFileWriter(logPath string) (io.WriteCloser, error) { + if err := primeLogFile(logPath); err != nil { + // This isn't a fatal error so log and continue if priming fails. + logger.Warningf("Unable to prime %s (proceeding anyway): %v", logPath, err) + } + return &lumberjack.Logger{ + Filename: logPath, + MaxSize: 300, // MB + MaxBackups: 2, + Compress: true, + }, nil +} + +// primeLogFile ensures the logsink log file is created with the +// correct mode and ownership. +func primeLogFile(path string) error { + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return errors.Trace(err) + } + f.Close() + err = utils.ChownPath(path, "syslog") + return errors.Trace(err) +} diff --git a/apiserver/logsink/logsink.go b/apiserver/logsink/logsink.go new file mode 100644 index 00000000000..3e97d247d3c --- /dev/null +++ b/apiserver/logsink/logsink.go @@ -0,0 +1,225 @@ +// Copyright 2015-2017 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package logsink + +import ( + "io" + "net/http" + "time" + + gorillaws "github.com/gorilla/websocket" + "github.com/juju/errors" + "github.com/juju/loggo" + "github.com/juju/utils/featureflag" + "github.com/juju/version" + + "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket" + "github.com/juju/juju/feature" +) + +var logger = loggo.GetLogger("juju.apiserver.logsink") + +// LogWriteCloser provides an interface for persisting log records. +// The LogCloser's Close method should be called to release any +// resources once it is done with. +type LogWriteCloser interface { + io.Closer + + // WriteLog writes out the given log record. + WriteLog(params.LogRecord) error +} + +// NewLogWriteCloserFunc returns a new LogWriteCloser for the given http.Request. +type NewLogWriteCloserFunc func(*http.Request) (LogWriteCloser, error) + +// NewHTTPHandler returns a new http.Handler for receiving log messages over a +// websocket. +func NewHTTPHandler( + newLogWriteCloser NewLogWriteCloserFunc, + abort <-chan struct{}, +) http.Handler { + return &logSinkHandler{ + newLogWriteCloser: newLogWriteCloser, + abort: abort, + } +} + +type logSinkHandler struct { + newLogWriteCloser NewLogWriteCloserFunc + abort <-chan struct{} +} + +// Since the logsink only receives messages, it is possible for the other end +// to disappear without the server noticing. To fix this, we use the +// underlying websocket control messages ping/pong. Periodically the server +// writes a ping, and the other end replies with a pong. Now the tricky bit is +// that it appears in all the examples found on the interweb that it is +// possible for the control message to be sent successfully to something that +// isn't entirely alive, which is why relying on an error return from the +// write call is insufficient to mark the connection as dead. Instead the +// write and read deadlines inherent in the underlying Go networking libraries +// are used to force errors on timeouts. However the underlying network +// libraries use time.Now() to determine whether or not to send errors, so +// using a testing clock here isn't going to work. So we rely on manual +// testing, and what is defined as good practice by the library authors. +// +// Now, in theory, we should be using this ping/pong across all the websockets, +// but that is a little outside the scope of this piece of work. + +const ( + // For endpoints that don't support ping/pong (i.e. agents prior to 2.2-beta1) + // we will time out their connections after six hours of inactivity. + vZeroDelay = 6 * time.Hour +) + +// ServeHTTP implements the http.Handler interface. +func (h *logSinkHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + handler := func(socket *websocket.Conn) { + defer socket.Close() + endpointVersion, err := h.getVersion(req) + if err != nil { + h.sendError(socket, req, err) + return + } + writer, err := h.newLogWriteCloser(req) + if err != nil { + h.sendError(socket, req, err) + return + } + defer writer.Close() + + // If we get to here, no more errors to report, so we report a nil + // error. This way the first line of the socket is always a json + // formatted simple error. + h.sendError(socket, req, nil) + + // Here we configure the ping/pong handling for the websocket so the + // server can notice when the client goes away. Older versions did not + // respond to ping control messages, so don't try. + var tickChannel <-chan time.Time + if endpointVersion > 0 { + socket.SetReadDeadline(time.Now().Add(websocket.PongDelay)) + socket.SetPongHandler(func(string) error { + logger.Tracef("pong logsink %p", socket) + socket.SetReadDeadline(time.Now().Add(websocket.PongDelay)) + return nil + }) + ticker := time.NewTicker(websocket.PingPeriod) + defer ticker.Stop() + tickChannel = ticker.C + } else { + socket.SetReadDeadline(time.Now().Add(vZeroDelay)) + } + + logCh := h.receiveLogs(socket, endpointVersion) + for { + select { + case <-h.abort: + return + case <-tickChannel: + deadline := time.Now().Add(websocket.WriteWait) + logger.Tracef("ping logsink %p", socket) + if err := socket.WriteControl(gorillaws.PingMessage, []byte{}, deadline); err != nil { + // This error is expected if the other end goes away. By + // returning we clean up the strategy and close the socket + // through the defer calls. + logger.Debugf("failed to write ping: %s", err) + return + } + case m, ok := <-logCh: + if !ok { + return + } + if err := writer.WriteLog(m); err != nil { + h.sendError(socket, req, err) + return + } + } + } + } + websocket.Serve(w, req, handler) +} + +func (h *logSinkHandler) getVersion(req *http.Request) (int, error) { + verStr := req.URL.Query().Get("version") + switch verStr { + case "": + return 0, nil + case "1": + return 1, nil + default: + return 0, errors.Errorf("unknown version %q", verStr) + } +} + +func (h *logSinkHandler) receiveLogs(socket *websocket.Conn, endpointVersion int) <-chan params.LogRecord { + logCh := make(chan params.LogRecord) + + go func() { + // Close the channel to signal ServeHTTP to finish. Otherwise + // we leak goroutines on client disconnect, because the server + // isn't shutting down so h.abort is never closed. + defer close(logCh) + var m params.LogRecord + for { + // Receive() blocks until data arrives but will also be + // unblocked when the API handler calls socket.Close as it + // finishes. + if err := socket.ReadJSON(&m); err != nil { + if gorillaws.IsUnexpectedCloseError(err, gorillaws.CloseNormalClosure, gorillaws.CloseGoingAway) { + logger.Debugf("logsink receive error: %v", err) + } else { + logger.Debugf("disconnected, %p", socket) + } + // Try to tell the other end we are closing. If the other end + // has already disconnected from us, this will fail, but we don't + // care that much. + socket.WriteMessage(gorillaws.CloseMessage, []byte{}) + return + } + + // Send the log message. + select { + case <-h.abort: + return + case logCh <- m: + // If the remote end does not support ping/pong, we bump + // the read deadline everytime a message is received. + if endpointVersion == 0 { + socket.SetReadDeadline(time.Now().Add(vZeroDelay)) + } + } + } + }() + + return logCh +} + +// sendError sends a JSON-encoded error response. +func (h *logSinkHandler) sendError(ws *websocket.Conn, req *http.Request, err error) { + // There is no need to log the error for normal operators as there is nothing + // they can action. This is for developers. + if err != nil && featureflag.Enabled(feature.DeveloperMode) { + logger.Errorf("returning error from %s %s: %s", req.Method, req.URL.Path, errors.Details(err)) + } + if sendErr := ws.SendInitialErrorV0(err); sendErr != nil { + logger.Errorf("closing websocket, %v", err) + ws.Close() + } +} + +// JujuClientVersionFromRequest returns the Juju client version +// number from the HTTP request. +func JujuClientVersionFromRequest(req *http.Request) (version.Number, error) { + verStr := req.URL.Query().Get("jujuclientversion") + if verStr == "" { + return version.Zero, errors.New(`missing "jujuclientversion" in URL query`) + } + ver, err := version.Parse(verStr) + if err != nil { + return version.Zero, errors.Annotatef(err, "invalid jujuclientversion %q", verStr) + } + return ver, nil +} diff --git a/apiserver/logsink/logsink_test.go b/apiserver/logsink/logsink_test.go new file mode 100644 index 00000000000..9ff776e52f4 --- /dev/null +++ b/apiserver/logsink/logsink_test.go @@ -0,0 +1,185 @@ +// Copyright 2015-2017 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package logsink_test + +import ( + "errors" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/juju/loggo" + "github.com/juju/testing" + jc "github.com/juju/testing/checkers" + "github.com/juju/utils" + gc "gopkg.in/check.v1" + + "github.com/juju/juju/apiserver/logsink" + "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket/websockettest" + coretesting "github.com/juju/juju/testing" +) + +var shortAttempt = &utils.AttemptStrategy{ + Total: coretesting.ShortWait, + Delay: 10 * time.Millisecond, +} + +var longAttempt = &utils.AttemptStrategy{ + Total: coretesting.LongWait, + Delay: 10 * time.Millisecond, +} + +type logsinkSuite struct { + testing.IsolationSuite + + srv *httptest.Server + abort chan struct{} + + mu sync.Mutex + opened int + closed int + stub testing.Stub + written chan params.LogRecord + + logs loggo.TestWriter +} + +var _ = gc.Suite(&logsinkSuite{}) + +func (s *logsinkSuite) SetUpTest(c *gc.C) { + s.IsolationSuite.SetUpTest(c) + s.abort = make(chan struct{}) + s.written = make(chan params.LogRecord, 1) + s.stub.ResetCalls() + s.srv = httptest.NewServer(logsink.NewHTTPHandler( + func(req *http.Request) (logsink.LogWriteCloser, error) { + s.stub.AddCall("Open") + return &mockLogWriteCloser{ + &s.stub, + s.written, + }, s.stub.NextErr() + }, + s.abort, + )) + s.AddCleanup(func(*gc.C) { s.srv.Close() }) +} + +func (s *logsinkSuite) dialWebsocket(c *gc.C) *websocket.Conn { + u, err := url.Parse(s.srv.URL) + c.Assert(err, jc.ErrorIsNil) + u.Scheme = "ws" + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + c.Assert(err, jc.ErrorIsNil) + s.AddCleanup(func(*gc.C) { conn.Close() }) + return conn +} + +func (s *logsinkSuite) TestSuccess(c *gc.C) { + conn := s.dialWebsocket(c) + websockettest.AssertJSONInitialErrorNil(c, conn) + + t0 := time.Date(2015, time.June, 1, 23, 2, 1, 0, time.UTC) + record := params.LogRecord{ + Time: t0, + Module: "some.where", + Location: "foo.go:42", + Level: loggo.INFO.String(), + Message: "all is well", + } + err := conn.WriteJSON(&record) + c.Assert(err, jc.ErrorIsNil) + + select { + case written, ok := <-s.written: + c.Assert(ok, jc.IsTrue) + c.Assert(written, jc.DeepEquals, record) + case <-time.After(coretesting.LongWait): + c.Fatal("timed out waiting for log record to be written") + } + select { + case <-s.written: + c.Fatal("unexpected log record") + case <-time.After(coretesting.ShortWait): + } + s.stub.CheckCallNames(c, "Open", "WriteLog") + + err = conn.Close() + c.Assert(err, jc.ErrorIsNil) + for a := longAttempt.Start(); a.Next(); { + if len(s.stub.Calls()) == 3 { + break + } + } + s.stub.CheckCallNames(c, "Open", "WriteLog", "Close") +} + +func (s *logsinkSuite) TestLogMessages(c *gc.C) { + var logs loggo.TestWriter + writer := loggo.NewMinimumLevelWriter(&logs, loggo.INFO) + c.Assert(loggo.RegisterWriter("logsink-tests", writer), jc.ErrorIsNil) + + // Open, then close connection. + conn := s.dialWebsocket(c) + websockettest.AssertJSONInitialErrorNil(c, conn) + err := conn.Close() + c.Assert(err, jc.ErrorIsNil) + + // Ensure that no error is logged when the connection is closed normally. + for a := shortAttempt.Start(); a.Next(); { + for _, log := range logs.Log() { + c.Assert(log.Level, jc.LessThan, loggo.ERROR, gc.Commentf("log: %#v", log)) + } + } +} + +func (s *logsinkSuite) TestLogOpenFails(c *gc.C) { + s.stub.SetErrors(errors.New("rats")) + conn := s.dialWebsocket(c) + websockettest.AssertJSONError(c, conn, "rats") + websockettest.AssertWebsocketClosed(c, conn) +} + +func (s *logsinkSuite) TestLogWriteFails(c *gc.C) { + s.stub.SetErrors(nil, errors.New("cannae write")) + conn := s.dialWebsocket(c) + websockettest.AssertJSONInitialErrorNil(c, conn) + + err := conn.WriteJSON(¶ms.LogRecord{}) + c.Assert(err, jc.ErrorIsNil) + + websockettest.AssertJSONError(c, conn, "cannae write") + websockettest.AssertWebsocketClosed(c, conn) +} + +func (s *logsinkSuite) TestReceiveErrorBreaksConn(c *gc.C) { + conn := s.dialWebsocket(c) + websockettest.AssertJSONInitialErrorNil(c, conn) + + // The logsink handler expects JSON messages. Send some + // junk to verify that the server closes the connection. + err := conn.WriteMessage(websocket.TextMessage, []byte("junk!")) + c.Assert(err, jc.ErrorIsNil) + + websockettest.AssertWebsocketClosed(c, conn) +} + +type mockLogWriteCloser struct { + *testing.Stub + written chan<- params.LogRecord +} + +func (m *mockLogWriteCloser) Close() error { + m.MethodCall(m, "Close") + return m.NextErr() +} + +func (m *mockLogWriteCloser) WriteLog(r params.LogRecord) error { + m.MethodCall(m, "WriteLog", r) + m.written <- r + return m.NextErr() +} diff --git a/apiserver/logsink/package_test.go b/apiserver/logsink/package_test.go new file mode 100644 index 00000000000..57450757191 --- /dev/null +++ b/apiserver/logsink/package_test.go @@ -0,0 +1,14 @@ +// Copyright 2017 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package logsink_test + +import ( + "testing" + + gc "gopkg.in/check.v1" +) + +func TestPackage(t *testing.T) { + gc.TestingT(t) +} diff --git a/apiserver/logsink_test.go b/apiserver/logsink_test.go index 2421082c736..dc2d1fc8d4b 100644 --- a/apiserver/logsink_test.go +++ b/apiserver/logsink_test.go @@ -21,6 +21,7 @@ import ( "gopkg.in/mgo.v2/bson" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket/websockettest" coretesting "github.com/juju/juju/testing" "github.com/juju/juju/testing/factory" "github.com/juju/juju/version" @@ -60,30 +61,30 @@ func (s *logsinkSuite) SetUpTest(c *gc.C) { func (s *logsinkSuite) TestRejectsBadModelUUID(c *gc.C) { ws := s.openWebsocketCustomPath(c, "/model/does-not-exist/logsink") - assertJSONError(c, ws, `unknown model: "does-not-exist"`) - assertWebsocketClosed(c, ws) + websockettest.AssertJSONError(c, ws, `initialising agent logsink session: unknown model: "does-not-exist"`) + websockettest.AssertWebsocketClosed(c, ws) } func (s *logsinkSuite) TestNoAuth(c *gc.C) { - s.checkAuthFails(c, nil, "no credentials provided") + s.checkAuthFails(c, nil, "initialising agent logsink session: no credentials provided") } func (s *logsinkSuite) TestRejectsUserLogins(c *gc.C) { user := s.Factory.MakeUser(c, &factory.UserParams{Password: "sekrit"}) header := utils.BasicAuthHeader(user.Tag().String(), "sekrit") - s.checkAuthFailsWithEntityError(c, header, "tag kind user not valid") + s.checkAuthFailsWithEntityError(c, header, "initialising agent logsink session: tag kind user not valid") } func (s *logsinkSuite) TestRejectsBadPassword(c *gc.C) { header := utils.BasicAuthHeader(s.machineTag.String(), "wrong") header.Add(params.MachineNonceHeader, s.nonce) - s.checkAuthFailsWithEntityError(c, header, "invalid entity name or password") + s.checkAuthFailsWithEntityError(c, header, "initialising agent logsink session: invalid entity name or password") } func (s *logsinkSuite) TestRejectsIncorrectNonce(c *gc.C) { header := utils.BasicAuthHeader(s.machineTag.String(), s.password) header.Add(params.MachineNonceHeader, "wrong") - s.checkAuthFails(c, header, "machine 0 not provisioned") + s.checkAuthFails(c, header, "initialising agent logsink session: machine 0 not provisioned") } func (s *logsinkSuite) checkAuthFailsWithEntityError(c *gc.C, header http.Header, msg string) { @@ -93,8 +94,8 @@ func (s *logsinkSuite) checkAuthFailsWithEntityError(c *gc.C, header http.Header func (s *logsinkSuite) checkAuthFails(c *gc.C, header http.Header, message string) { conn := s.dialWebsocketInternal(c, header) defer conn.Close() - assertJSONError(c, conn, message) - assertWebsocketClosed(c, conn) + websockettest.AssertJSONError(c, conn, message) + websockettest.AssertWebsocketClosed(c, conn) } func (s *logsinkSuite) TestLogging(c *gc.C) { @@ -102,7 +103,7 @@ func (s *logsinkSuite) TestLogging(c *gc.C) { defer conn.Close() // Read back the nil error, indicating that all is well. - assertJSONInitialErrorNil(c, conn) + websockettest.AssertJSONInitialErrorNil(c, conn) t0 := time.Date(2015, time.June, 1, 23, 2, 1, 0, time.UTC) err := conn.WriteJSON(¶ms.LogRecord{ @@ -196,14 +197,14 @@ func (s *logsinkSuite) TestReceiveErrorBreaksConn(c *gc.C) { defer conn.Close() // Read back the nil error, indicating that all is well. - assertJSONInitialErrorNil(c, conn) + websockettest.AssertJSONInitialErrorNil(c, conn) // The logsink handler expects JSON messages. Send some // junk to verify that the server closes the connection. err := conn.WriteMessage(websocket.TextMessage, []byte("junk!")) c.Assert(err, jc.ErrorIsNil) - assertWebsocketClosed(c, conn) + websockettest.AssertWebsocketClosed(c, conn) } func (s *logsinkSuite) dialWebsocket(c *gc.C) *websocket.Conn { diff --git a/apiserver/logstream.go b/apiserver/logstream.go index 0ef0ca08f3e..ff426b38d21 100644 --- a/apiserver/logstream.go +++ b/apiserver/logstream.go @@ -8,12 +8,12 @@ import ( "time" "github.com/gorilla/schema" - "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/utils/clock" "github.com/juju/utils/featureflag" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket" "github.com/juju/juju/feature" "github.com/juju/juju/state" ) @@ -71,7 +71,7 @@ func (h *logStreamEndpointHandler) ServeHTTP(w http.ResponseWriter, req *http.Re h.sendError(conn, req, nil) reqHandler.serveWebsocket(h.stopCh) } - websocketServer(w, req, handler) + websocket.Serve(w, req, handler) } func (h *logStreamEndpointHandler) newLogStreamRequestHandler(conn messageWriter, req *http.Request, clock clock.Clock) (rh *logStreamRequestHandler, err error) { @@ -143,7 +143,7 @@ func (h *logStreamEndpointHandler) sendError(ws *websocket.Conn, req *http.Reque if err != nil && featureflag.Enabled(feature.DeveloperMode) { logger.Errorf("returning error from %s %s: %s", req.Method, req.URL.Path, errors.Details(err)) } - if sendErr := sendInitialErrorV0(ws, err); sendErr != nil { + if sendErr := ws.SendInitialErrorV0(err); sendErr != nil { logger.Errorf("closing websocket, %v", err) ws.Close() } diff --git a/apiserver/logstream_test.go b/apiserver/logstream_test.go index f3e528de808..abfc2236a37 100644 --- a/apiserver/logstream_test.go +++ b/apiserver/logstream_test.go @@ -11,7 +11,7 @@ import ( "time" "github.com/google/go-querystring/query" - "github.com/gorilla/websocket" + gorillaws "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/loggo" "github.com/juju/testing" @@ -21,6 +21,7 @@ import ( "gopkg.in/juju/names.v2" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket" "github.com/juju/juju/state" coretesting "github.com/juju/juju/testing" "github.com/juju/juju/version" @@ -162,9 +163,9 @@ func (s *LogStreamIntSuite) TestFullRequest(c *gc.C) { defer close(serverDone) defer conn.Close() - sendInitialErrorV0(conn, nil) + conn.SendInitialErrorV0(nil) handler := &logStreamRequestHandler{ - conn: conn, + conn: conn.Conn, req: req, tailer: tailer, } @@ -198,10 +199,10 @@ func (s *LogStreamIntSuite) TestFullRequest(c *gc.C) { } c.Logf("client stopped: %v", err) - if websocket.IsCloseError(err, - websocket.CloseNormalClosure, - websocket.CloseGoingAway, - websocket.CloseNoStatusReceived) { + if gorillaws.IsCloseError(err, + gorillaws.CloseNormalClosure, + gorillaws.CloseGoingAway, + gorillaws.CloseNoStatusReceived) { return // this is fine } if _, ok := err.(*net.OpError); ok { @@ -322,10 +323,10 @@ type testStreamHandler struct { } func (h *testStreamHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - websocketServer(w, req, h.handler) + websocket.Serve(w, req, h.handler) } -func newWebsocketServer(c *gc.C, h func(*websocket.Conn)) *websocket.Conn { +func newWebsocketServer(c *gc.C, h func(*websocket.Conn)) *gorillaws.Conn { listener, err := net.Listen("tcp", ":0") c.Assert(err, jc.ErrorIsNil) port := listener.Addr().(*net.TCPAddr).Port @@ -335,9 +336,9 @@ func newWebsocketServer(c *gc.C, h func(*websocket.Conn)) *websocket.Conn { return newWebsocketClient(c, port) } -func newWebsocketClient(c *gc.C, port int) *websocket.Conn { +func newWebsocketClient(c *gc.C, port int) *gorillaws.Conn { address := fmt.Sprintf("ws://localhost:%d/", port) - client, _, err := websocket.DefaultDialer.Dial(address, nil) + client, _, err := gorillaws.DefaultDialer.Dial(address, nil) if err == nil { return client } @@ -350,7 +351,7 @@ func newWebsocketClient(c *gc.C, port int) *websocket.Conn { case <-time.After(coretesting.ShortWait): } - client, _, err = websocket.DefaultDialer.Dial(address, nil) + client, _, err = gorillaws.DefaultDialer.Dial(address, nil) if err != nil { c.Logf("failed attempt to connect to %s", address) continue diff --git a/apiserver/logtransfer.go b/apiserver/logtransfer.go index ad9d5c675c5..fa4ee7c8d35 100644 --- a/apiserver/logtransfer.go +++ b/apiserver/logtransfer.go @@ -4,37 +4,42 @@ package apiserver import ( - "io" "net/http" "time" "github.com/juju/errors" "github.com/juju/loggo" + "github.com/juju/juju/apiserver/logsink" "github.com/juju/juju/apiserver/params" "github.com/juju/juju/state" ) type migrationLoggingStrategy struct { - ctxt httpContext st *state.State releaser func() filePrefix string dbLogger *state.DbLogger tracker *logTracker - fileLogger io.Writer } -func newMigrationLoggingStrategy(ctxt httpContext, fileLogger io.Writer) LoggingStrategy { - return &migrationLoggingStrategy{ctxt: ctxt, fileLogger: fileLogger} +// newMigrationLogWriteCloserFunc returns a function that will create a +// logsink.LoggingStrategy given an *http.Request, that writes log +// messages to the state database and tracks their migration. +func newMigrationLogWriteCloserFunc(ctxt httpContext) logsink.NewLogWriteCloserFunc { + return func(req *http.Request) (logsink.LogWriteCloser, error) { + strategy := &migrationLoggingStrategy{} + if err := strategy.init(ctxt, req); err != nil { + return nil, errors.Annotate(err, "initialising migration logsink session") + } + return strategy, nil + } } -// Authenticate checks that the user is a controller superuser and -// that the requested model is migrating. Part of LoggingStrategy. -func (s *migrationLoggingStrategy) Authenticate(req *http.Request) error { +func (s *migrationLoggingStrategy) init(ctxt httpContext, req *http.Request) error { // Require MigrationModeNone because logtransfer happens after the // model proper is completely imported. - st, releaser, err := s.ctxt.stateForMigration(req, state.MigrationModeNone) + st, releaser, err := ctxt.stateForMigration(req, state.MigrationModeNone) if err != nil { return errors.Trace(err) } @@ -45,50 +50,35 @@ func (s *migrationLoggingStrategy) Authenticate(req *http.Request) error { // passed, even though we don't use it anywhere at the moment - it // provides future-proofing if we need to do some kind of // conversion of log messages from an old client. - _, err = jujuClientVersionFromReq(req) + _, err = logsink.JujuClientVersionFromRequest(req) if err != nil { releaser() return errors.Trace(err) } - s.st = st + s.releaser = releaser + s.filePrefix = st.ModelUUID() + ":" + s.dbLogger = state.NewDbLogger(st) + s.tracker = newLogTracker(st) return nil } -// Start creates the destination DB logger. Part of LoggingStrategy. -func (s *migrationLoggingStrategy) Start() { - s.filePrefix = s.st.ModelUUID() + ":" - s.dbLogger = state.NewDbLogger(s.st) - s.tracker = newLogTracker(s.st) -} - -// Log writes the given record to the DB and to the backup file -// logger. Part of LoggingStrategy. -func (s *migrationLoggingStrategy) Log(m params.LogRecord) bool { +// WriteLog is part of the logsink.LogWriteCloser interface. +func (s *migrationLoggingStrategy) WriteLog(m params.LogRecord) error { level, _ := loggo.ParseLevel(m.Level) - dbErr := s.dbLogger.Log(m.Time, m.Entity, m.Module, m.Location, level, m.Message) - if dbErr == nil { - dbErr = s.tracker.Track(m.Time) - } - if dbErr != nil { - logger.Errorf("logging to DB failed: %v", dbErr) + err := s.dbLogger.Log(m.Time, m.Entity, m.Module, m.Location, level, m.Message) + if err == nil { + err = s.tracker.Track(m.Time) } - - fileErr := logToFile(s.fileLogger, s.filePrefix, m) - if fileErr != nil { - logger.Errorf("logging to file logger failed: %v", fileErr) - } - - return dbErr == nil && fileErr == nil + return errors.Annotate(err, "logging to DB failed") } -// Stop imdicates that there are no more log records coming, so we can -// release resources and close loggers. Part of LoggingStrategy. -func (s *migrationLoggingStrategy) Stop() { +// Close is part of the logsink.LogWriteCloser interface. +func (s *migrationLoggingStrategy) Close() error { s.dbLogger.Close() s.tracker.Close() s.releaser() - // Perhaps clear s.st and s.releaser? + return nil } const trackingPeriod = 2 * time.Minute diff --git a/apiserver/logtransfer_test.go b/apiserver/logtransfer_test.go index 9745417aa88..ea9081dafdf 100644 --- a/apiserver/logtransfer_test.go +++ b/apiserver/logtransfer_test.go @@ -18,6 +18,7 @@ import ( "gopkg.in/mgo.v2/bson" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket/websockettest" "github.com/juju/juju/permission" "github.com/juju/juju/state" coretesting "github.com/juju/juju/testing" @@ -82,16 +83,16 @@ func (s *logtransferSuite) dialWebsocketInternal(c *gc.C, header http.Header) *w func (s *logtransferSuite) TestRejectsMissingModelHeader(c *gc.C) { header := utils.BasicAuthHeader(s.userTag.String(), s.password) ws := s.dialWebsocketInternal(c, header) - assertJSONError(c, ws, `unknown model: ""`) - assertWebsocketClosed(c, ws) + websockettest.AssertJSONError(c, ws, `initialising migration logsink session: unknown model: ""`) + websockettest.AssertWebsocketClosed(c, ws) } func (s *logtransferSuite) TestRejectsBadMigratingModelUUID(c *gc.C) { header := utils.BasicAuthHeader(s.userTag.String(), s.password) header.Add(params.MigrationModelHTTPHeader, "does-not-exist") ws := s.dialWebsocketInternal(c, header) - assertJSONError(c, ws, `unknown model: "does-not-exist"`) - assertWebsocketClosed(c, ws) + websockettest.AssertJSONError(c, ws, `initialising migration logsink session: unknown model: "does-not-exist"`) + websockettest.AssertWebsocketClosed(c, ws) } func (s *logtransferSuite) TestRejectsInvalidVersion(c *gc.C) { @@ -101,45 +102,45 @@ func (s *logtransferSuite) TestRejectsInvalidVersion(c *gc.C) { url.RawQuery = query.Encode() conn := dialWebsocketFromURL(c, url.String(), s.makeAuthHeader()) defer conn.Close() - assertJSONError(c, conn, `^invalid jujuclientversion "blah".*`) - assertWebsocketClosed(c, conn) + websockettest.AssertJSONError(c, conn, `^initialising migration logsink session: invalid jujuclientversion "blah".*`) + websockettest.AssertWebsocketClosed(c, conn) } func (s *logtransferSuite) TestRejectsMachineLogins(c *gc.C) { header := utils.BasicAuthHeader(s.machineTag.String(), s.machinePassword) header.Add(params.MachineNonceHeader, "nonce") ws := s.dialWebsocketInternal(c, header) - assertJSONError(c, ws, `tag kind machine not valid`) - assertWebsocketClosed(c, ws) + websockettest.AssertJSONError(c, ws, `initialising migration logsink session: tag kind machine not valid`) + websockettest.AssertWebsocketClosed(c, ws) } func (s *logtransferSuite) TestRejectsBadPasword(c *gc.C) { header := utils.BasicAuthHeader(s.userTag.String(), "wrong") header.Add(params.MigrationModelHTTPHeader, s.State.ModelUUID()) ws := s.dialWebsocketInternal(c, header) - assertJSONError(c, ws, "invalid entity name or password") - assertWebsocketClosed(c, ws) + websockettest.AssertJSONError(c, ws, "initialising migration logsink session: invalid entity name or password") + websockettest.AssertWebsocketClosed(c, ws) } func (s *logtransferSuite) TestRequiresSuperUser(c *gc.C) { s.setUserAccess(c, permission.AddModelAccess) ws := s.dialWebsocketInternal(c, s.makeAuthHeader()) - assertJSONError(c, ws, `not a controller admin`) - assertWebsocketClosed(c, ws) + websockettest.AssertJSONError(c, ws, `initialising migration logsink session: not a controller admin`) + websockettest.AssertWebsocketClosed(c, ws) } func (s *logtransferSuite) TestRequiresMigrationModeNone(c *gc.C) { s.setMigrationMode(c, state.MigrationModeImporting) ws := s.dialWebsocket(c) - assertJSONError(c, ws, `model migration mode is "importing" instead of ""`) - assertWebsocketClosed(c, ws) + websockettest.AssertJSONError(c, ws, `initialising migration logsink session: model migration mode is "importing" instead of ""`) + websockettest.AssertWebsocketClosed(c, ws) } func (s *logtransferSuite) TestLogging(c *gc.C) { conn := s.dialWebsocket(c) // Read back the nil error, indicating that all is well. - assertJSONInitialErrorNil(c, conn) + websockettest.AssertJSONInitialErrorNil(c, conn) t0 := time.Date(2015, time.June, 1, 23, 2, 1, 0, time.UTC) err := conn.WriteJSON(¶ms.LogRecord{ @@ -216,7 +217,7 @@ func (s *logtransferSuite) TestTracksLastSentLogTime(c *gc.C) { conn := s.dialWebsocket(c) // Read back the nil error, indicating that all is well. - assertJSONInitialErrorNil(c, conn) + websockettest.AssertJSONInitialErrorNil(c, conn) tracker := state.NewLastSentLogTracker(s.State, s.State.ModelUUID(), "migration-logtransfer") defer tracker.Close() diff --git a/apiserver/pubsub.go b/apiserver/pubsub.go index a76c779b36f..0071eb823c5 100644 --- a/apiserver/pubsub.go +++ b/apiserver/pubsub.go @@ -7,12 +7,13 @@ import ( "net/http" "time" - "github.com/gorilla/websocket" + gorillaws "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/juju/utils/featureflag" "github.com/juju/juju/apiserver/common" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket" "github.com/juju/juju/feature" "github.com/juju/juju/state" ) @@ -78,12 +79,12 @@ func (h *pubsubHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // Here we configure the ping/pong handling for the websocket so // the server can notice when the client goes away. // See the long note in logsink.go for the rationale. - socket.SetReadDeadline(time.Now().Add(pongDelay)) + socket.SetReadDeadline(time.Now().Add(websocket.PongDelay)) socket.SetPongHandler(func(string) error { - socket.SetReadDeadline(time.Now().Add(pongDelay)) + socket.SetReadDeadline(time.Now().Add(websocket.PongDelay)) return nil }) - ticker := time.NewTicker(pingPeriod) + ticker := time.NewTicker(websocket.PingPeriod) defer ticker.Stop() messageCh := h.receiveMessages(socket) @@ -92,8 +93,8 @@ func (h *pubsubHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { case <-h.ctxt.stop(): return case <-ticker.C: - deadline := time.Now().Add(writeWait) - if err := socket.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil { + deadline := time.Now().Add(websocket.WriteWait) + if err := socket.WriteControl(gorillaws.PingMessage, []byte{}, deadline); err != nil { // This error is expected if the other end goes away. By // returning we close the socket through the defer call. logger.Debugf("failed to write ping: %s", err) @@ -108,7 +109,7 @@ func (h *pubsubHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } } - websocketServer(w, req, handler) + websocket.Serve(w, req, handler) } func (h *pubsubHandler) receiveMessages(socket *websocket.Conn) <-chan params.PubSubMessage { @@ -146,7 +147,7 @@ func (h *pubsubHandler) sendError(ws *websocket.Conn, req *http.Request, err err if err != nil && featureflag.Enabled(feature.DeveloperMode) { logger.Errorf("returning error from %s %s: %s", req.Method, req.URL.Path, errors.Details(err)) } - if sendErr := sendInitialErrorV0(ws, err); sendErr != nil { + if sendErr := ws.SendInitialErrorV0(err); sendErr != nil { logger.Errorf("closing websocket, %v", err) ws.Close() return diff --git a/apiserver/pubsub_test.go b/apiserver/pubsub_test.go index def355a4581..ccbc7f9d926 100644 --- a/apiserver/pubsub_test.go +++ b/apiserver/pubsub_test.go @@ -19,6 +19,7 @@ import ( "github.com/juju/juju/apiserver" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/apiserver/websocket/websockettest" "github.com/juju/juju/state" statetesting "github.com/juju/juju/state/testing" coretesting "github.com/juju/juju/testing" @@ -96,8 +97,8 @@ func (s *pubsubSuite) TestRejectsIncorrectNonce(c *gc.C) { func (s *pubsubSuite) checkAuthFails(c *gc.C, header http.Header, message string) { conn := s.dialWebsocketInternal(c, header) defer conn.Close() - assertJSONError(c, conn, message) - assertWebsocketClosed(c, conn) + websockettest.AssertJSONError(c, conn, message) + websockettest.AssertWebsocketClosed(c, conn) } func (s *pubsubSuite) TestMessage(c *gc.C) { @@ -119,7 +120,7 @@ func (s *pubsubSuite) TestMessage(c *gc.C) { defer conn.Close() // Read back the nil error, indicating that all is well. - assertJSONInitialErrorNil(c, conn) + websockettest.AssertJSONInitialErrorNil(c, conn) message1 := params.PubSubMessage{ Topic: "first", diff --git a/apiserver/websocket.go b/apiserver/websocket/websocket.go similarity index 52% rename from apiserver/websocket.go rename to apiserver/websocket/websocket.go index 4be3aa66ff9..bf1db9a3967 100644 --- a/apiserver/websocket.go +++ b/apiserver/websocket/websocket.go @@ -1,21 +1,44 @@ -package apiserver +// Copyright 2017 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package websocket import ( "encoding/json" "net/http" + "time" "github.com/gorilla/websocket" "github.com/juju/errors" + "github.com/juju/loggo" "github.com/juju/juju/apiserver/common" "github.com/juju/juju/apiserver/params" ) +var logger = loggo.GetLogger("juju.apiserver.websocket") + // Use a 64k frame size for the websockets while we need to deal // with x/net/websocket connections that don't deal with recieving // fragmented messages. const websocketFrameSize = 65536 +const ( + // PongDelay is how long the server will wait for a pong to be sent + // before the websocket is considered broken. + PongDelay = 90 * time.Second + + // PingPeriod is how often ping messages are sent. This should be shorter + // than the pongDelay, but not by too much. The difference here allows + // the remote endpoint 30 seconds to respond to the ping as a ping is sent + // every 60s, and when a pong is received the read deadline is advanced + // another 90s. + PingPeriod = 60 * time.Second + + // WriteWait is how long the write call can take before it errors out. + WriteWait = 10 * time.Second +) + var websocketUpgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, // In order to deal with the remote side not handling message @@ -24,23 +47,31 @@ var websocketUpgrader = websocket.Upgrader{ WriteBufferSize: websocketFrameSize, } -func websocketServer(w http.ResponseWriter, req *http.Request, handler func(ws *websocket.Conn)) { +// Conn wraps a gorilla/websocket.Conn, providing additional Juju-specific +// functionality. +type Conn struct { + *websocket.Conn +} + +// Serve upgrades an HTTP connection to a websocket, and +// serves the given handler. +func Serve(w http.ResponseWriter, req *http.Request, handler func(ws *Conn)) { conn, err := websocketUpgrader.Upgrade(w, req, nil) if err != nil { logger.Errorf("problem initiating websocket: %v", err) return } - handler(conn) + handler(&Conn{conn}) } -// sendInitialErrorV0 writes out the error as a params.ErrorResult serialized +// SendInitialErrorV0 writes out the error as a params.ErrorResult serialized // with JSON with a new line character at the end. // -// This is a hangover from the initial debug-log streaming endoing where the +// This is a hangover from the initial debug-log streaming endpoint where the // client read the first line, and then just got a stream of data. We should // look to version the streaming endpoints to get rid of the trailing newline // character for message based connections, which is all of them now. -func sendInitialErrorV0(ws *websocket.Conn, err error) error { +func (conn *Conn) SendInitialErrorV0(err error) error { wrapped := ¶ms.ErrorResult{ Error: common.ServerError(err), } @@ -52,7 +83,7 @@ func sendInitialErrorV0(ws *websocket.Conn, err error) error { } body = append(body, '\n') - writer, err := ws.NextWriter(websocket.TextMessage) + writer, err := conn.NextWriter(websocket.TextMessage) if err != nil { return errors.Annotate(err, "problem getting writer") } @@ -61,7 +92,7 @@ func sendInitialErrorV0(ws *websocket.Conn, err error) error { if wrapped.Error != nil { // Tell the other end we are closing. - ws.WriteMessage(websocket.CloseMessage, []byte{}) + conn.WriteMessage(websocket.CloseMessage, []byte{}) } return errors.Trace(err) diff --git a/apiserver/websocket/websockettest/errors.go b/apiserver/websocket/websockettest/errors.go new file mode 100644 index 00000000000..0861b89a97b --- /dev/null +++ b/apiserver/websocket/websockettest/errors.go @@ -0,0 +1,57 @@ +// Copyright 2017 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package websockettest + +import ( + "bufio" + "encoding/json" + + "github.com/gorilla/websocket" + jc "github.com/juju/testing/checkers" + gc "gopkg.in/check.v1" + + "github.com/juju/juju/apiserver/params" +) + +// AssertWebsocketClosed checks that the given websocket connection +// is closed. +func AssertWebsocketClosed(c *gc.C, ws *websocket.Conn) { + _, _, err := ws.NextReader() + goodClose := []int{ + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived, + } + c.Logf("%#v", err) + c.Assert(websocket.IsCloseError(err, goodClose...), jc.IsTrue) +} + +// AssertJSONError checks the JSON encoded error returned by the log +// and logsink APIs matches the expected value. +func AssertJSONError(c *gc.C, ws *websocket.Conn, expected string) { + errResult := ReadJSONErrorLine(c, ws) + c.Assert(errResult.Error, gc.NotNil) + c.Assert(errResult.Error.Message, gc.Matches, expected) +} + +// AssertJSONInitialErrorNil checks the JSON encoded error returned by the log +// and logsink APIs are nil. +func AssertJSONInitialErrorNil(c *gc.C, ws *websocket.Conn) { + errResult := ReadJSONErrorLine(c, ws) + c.Assert(errResult.Error, gc.IsNil) +} + +// ReadJSONErrorLine returns the error line returned by the log and +// logsink APIS. +func ReadJSONErrorLine(c *gc.C, ws *websocket.Conn) params.ErrorResult { + messageType, reader, err := ws.NextReader() + c.Assert(err, jc.ErrorIsNil) + c.Assert(messageType, gc.Equals, websocket.TextMessage) + line, err := bufio.NewReader(reader).ReadSlice('\n') + c.Assert(err, jc.ErrorIsNil) + var errResult params.ErrorResult + err = json.Unmarshal(line, &errResult) + c.Assert(err, jc.ErrorIsNil) + return errResult +}