Skip to content

Commit

Permalink
Merge pull request #7496 from axw/apiserver-logsink
Browse files Browse the repository at this point in the history
apiserver/logsink: extract generic handler code

## Description of change

Extract the logsink HTTP/websocket handler into
a separate package, and tidy up the abstractions
a bit. This precedes work underway to rate-limit
and batch database log writes.

!!DO NOT LAND UNTIL 2.2 IS RELEASED!!

## QA steps

1. juju bootstrap
2. juju debug-log -m controller

## Documentation changes

None.

## Bug reference

None.
  • Loading branch information
jujubot committed Jun 14, 2017
2 parents 8e073d3 + 1697ffc commit a78d770
Show file tree
Hide file tree
Showing 18 changed files with 718 additions and 448 deletions.
23 changes: 14 additions & 9 deletions apiserver/apiserver.go
Expand Up @@ -7,7 +7,6 @@ import (
"crypto/tls"
"crypto/x509"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand All @@ -19,7 +18,6 @@ import (
"time"

"github.com/bmizerany/pat"
"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/pubsub"
Expand All @@ -36,8 +34,10 @@ import (
"github.com/juju/juju/apiserver/common"
"github.com/juju/juju/apiserver/common/apihttp"
"github.com/juju/juju/apiserver/facade"
"github.com/juju/juju/apiserver/logsink"
"github.com/juju/juju/apiserver/observer"
"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/apiserver/websocket"
"github.com/juju/juju/resource"
"github.com/juju/juju/resource/resourceadapters"
"github.com/juju/juju/rpc"
Expand Down Expand Up @@ -317,7 +317,7 @@ func newServer(s *state.State, lis net.Listener, cfg ServerConfig) (_ *Server, e
return nil, errors.Annotatef(err, "cannot set initial certificate")
}

logSinkWriter, err := newLogSinkWriter(filepath.Join(srv.logDir, "logsink.log"))
logSinkWriter, err := logsink.NewFileWriter(filepath.Join(srv.logDir, "logsink.log"))
if err != nil {
return nil, errors.Annotate(err, "creating logsink writer")
}
Expand Down Expand Up @@ -555,11 +555,17 @@ func (srv *Server) endpoints() []apihttp.Endpoint {
add("/model/:modeluuid/logstream", logStreamHandler)
add("/model/:modeluuid/log", debugLogHandler)

logSinkHandler := newLogSinkHandler(httpCtxt, srv.logSinkWriter, newAgentLoggingStrategy)
logSinkHandler := logsink.NewHTTPHandler(
newAgentLogWriteCloserFunc(httpCtxt, srv.logSinkWriter),
httpCtxt.stop(),
)
add("/model/:modeluuid/logsink", srv.trackRequests(logSinkHandler))

// We don't need to save the migrated logs to a logfile as well as to the DB.
logTransferHandler := newLogSinkHandler(httpCtxt, ioutil.Discard, newMigrationLoggingStrategy)
logTransferHandler := logsink.NewHTTPHandler(
newMigrationLogWriteCloserFunc(httpCtxt),
httpCtxt.stop(),
)
add("/migrate/logtransfer", srv.trackRequests(logTransferHandler))

modelRestHandler := &modelRestHandler{
Expand Down Expand Up @@ -798,18 +804,17 @@ func (srv *Server) apiHandler(w http.ResponseWriter, req *http.Request) {
apiObserver.Join(req, connectionID)
defer apiObserver.Leave()

handler := func(conn *websocket.Conn) {
websocket.Serve(w, req, func(conn *websocket.Conn) {
modelUUID := req.URL.Query().Get(":modeluuid")
logger.Tracef("got a request for model %q", modelUUID)
if err := srv.serveConn(conn, modelUUID, apiObserver, req.Host); err != nil {
logger.Errorf("error serving RPCs: %v", err)
}
}
websocketServer(w, req, handler)
})
}

func (srv *Server) serveConn(wsConn *websocket.Conn, modelUUID string, apiObserver observer.Observer, host string) error {
codec := jsoncodec.NewWebsocket(wsConn)
codec := jsoncodec.NewWebsocket(wsConn.Conn)
conn := rpc.NewConn(codec, apiObserver)

// Note that we don't overwrite modelUUID here because
Expand Down
41 changes: 0 additions & 41 deletions apiserver/authhttp_test.go
Expand Up @@ -4,7 +4,6 @@
package apiserver_test

import (
"bufio"
"crypto/x509"
"encoding/json"
"io"
Expand Down Expand Up @@ -118,17 +117,6 @@ func dialWebsocketFromURL(c *gc.C, server string, header http.Header) *websocket
return conn
}

func assertWebsocketClosed(c *gc.C, ws *websocket.Conn) {
_, _, err := ws.NextReader()
goodClose := []int{
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
}
c.Logf("%#v", err)
c.Assert(websocket.IsCloseError(err, goodClose...), jc.IsTrue)
}

func (s *authHTTPSuite) makeURL(c *gc.C, scheme, path string, queryParams url.Values) *url.URL {
url := s.baseURL(c)
query := ""
Expand Down Expand Up @@ -284,35 +272,6 @@ func (s *authHTTPSuite) uploadRequest(c *gc.C, uri string, contentType, path str
})
}

// assertJSONError checks the JSON encoded error returned by the log
// and logsink APIs matches the expected value.
func assertJSONError(c *gc.C, ws *websocket.Conn, expected string) {
errResult := readJSONErrorLine(c, ws)
c.Assert(errResult.Error, gc.NotNil)
c.Assert(errResult.Error.Message, gc.Matches, expected)
}

// assertJSONInitialErrorNil checks the JSON encoded error returned by the log
// and logsink APIs are nil.
func assertJSONInitialErrorNil(c *gc.C, ws *websocket.Conn) {
errResult := readJSONErrorLine(c, ws)
c.Assert(errResult.Error, gc.IsNil)
}

// readJSONErrorLine returns the error line returned by the log and
// logsink APIS.
func readJSONErrorLine(c *gc.C, ws *websocket.Conn) params.ErrorResult {
messageType, reader, err := ws.NextReader()
c.Assert(err, jc.ErrorIsNil)
c.Assert(messageType, gc.Equals, websocket.TextMessage)
line, err := bufio.NewReader(reader).ReadSlice('\n')
c.Assert(err, jc.ErrorIsNil)
var errResult params.ErrorResult
err = json.Unmarshal(line, &errResult)
c.Assert(err, jc.ErrorIsNil)
return errResult
}

func assertResponse(c *gc.C, resp *http.Response, expHTTPStatus int, expContentType string) []byte {
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
Expand Down
6 changes: 3 additions & 3 deletions apiserver/debuglog.go
Expand Up @@ -11,12 +11,12 @@ import (
"syscall"
"time"

"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/loggo"
"gopkg.in/juju/names.v2"

"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/apiserver/websocket"
"github.com/juju/juju/state"
)

Expand Down Expand Up @@ -93,7 +93,7 @@ func (h *debugLogHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
}
websocketServer(w, req, handler)
websocket.Serve(w, req, handler)
}

func isBrokenPipe(err error) bool {
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *debugLogSocketImpl) sendOk() {

// sendError implements debugLogSocket.
func (s *debugLogSocketImpl) sendError(err error) {
if sendErr := sendInitialErrorV0(s.conn, err); sendErr != nil {
if sendErr := s.conn.SendInitialErrorV0(err); sendErr != nil {
logger.Errorf("closing websocket, %v", err)
s.conn.Close()
return
Expand Down
17 changes: 9 additions & 8 deletions apiserver/debuglog_db_test.go
Expand Up @@ -12,6 +12,7 @@ import (
gc "gopkg.in/check.v1"

"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/apiserver/websocket/websockettest"
"github.com/juju/juju/testing/factory"
)

Expand All @@ -26,8 +27,8 @@ var _ = gc.Suite(&debugLogDBSuite{})

func (s *debugLogDBSuite) TestBadParams(c *gc.C) {
reader := s.openWebsocket(c, url.Values{"maxLines": {"foo"}})
assertJSONError(c, reader, `maxLines value "foo" is not a valid unsigned number`)
assertWebsocketClosed(c, reader)
websockettest.AssertJSONError(c, reader, `maxLines value "foo" is not a valid unsigned number`)
websockettest.AssertWebsocketClosed(c, reader)
}

func (s *debugLogDBSuite) TestWithHTTP(c *gc.C) {
Expand All @@ -49,8 +50,8 @@ func (s *debugLogDBSuite) TestNoAuth(c *gc.C) {
conn := s.dialWebsocketInternal(c, nil, nil)
defer conn.Close()

assertJSONError(c, conn, "no credentials provided")
assertWebsocketClosed(c, conn)
websockettest.AssertJSONError(c, conn, "no credentials provided")
websockettest.AssertWebsocketClosed(c, conn)
}

func (s *debugLogDBSuite) TestUnitLoginsRejected(c *gc.C) {
Expand All @@ -59,8 +60,8 @@ func (s *debugLogDBSuite) TestUnitLoginsRejected(c *gc.C) {
conn := s.dialWebsocketInternal(c, nil, header)
defer conn.Close()

assertJSONError(c, conn, "tag kind unit not valid")
assertWebsocketClosed(c, conn)
websockettest.AssertJSONError(c, conn, "tag kind unit not valid")
websockettest.AssertWebsocketClosed(c, conn)
}

var noResultsPlease = url.Values{"maxLines": {"0"}, "noTail": {"true"}}
Expand All @@ -74,7 +75,7 @@ func (s *debugLogDBSuite) TestUserLoginsAccepted(c *gc.C) {
conn := s.dialWebsocketInternal(c, noResultsPlease, header)
defer conn.Close()

result := readJSONErrorLine(c, conn)
result := websockettest.ReadJSONErrorLine(c, conn)
c.Assert(result.Error, gc.IsNil)
}

Expand All @@ -87,7 +88,7 @@ func (s *debugLogDBSuite) TestMachineLoginsAccepted(c *gc.C) {
conn := s.dialWebsocketInternal(c, noResultsPlease, header)
defer conn.Close()

result := readJSONErrorLine(c, conn)
result := websockettest.ReadJSONErrorLine(c, conn)
c.Assert(result.Error, gc.IsNil)
}

Expand Down

0 comments on commit a78d770

Please sign in to comment.