diff --git a/api/apiclient.go b/api/apiclient.go index 4dcfdec3315..3555d91890a 100644 --- a/api/apiclient.go +++ b/api/apiclient.go @@ -49,7 +49,7 @@ var ( // state is the internal implementation of the Connection interface. type state struct { - client *rpc.Conn + client rpc.ClientConn conn *websocket.Conn // addr is the address used to connect to the API server. @@ -147,8 +147,7 @@ func open( return nil, errors.Trace(err) } - client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil) - client.Start() + client := rpc.NewClientConn(jsoncodec.NewWebsocket(conn)) bakeryClient := opts.BakeryClient if bakeryClient == nil { @@ -572,13 +571,6 @@ func (s *state) Broken() <-chan struct{} { return s.broken } -// RPCClient returns the RPC client for the state, so that testing -// functions can tickle parts of the API that the conventional entry -// points don't reach. This is exported for testing purposes only. -func (s *state) RPCClient() *rpc.Conn { - return s.client -} - // Addr returns the address used to connect to the API server. func (s *state) Addr() string { return s.addr diff --git a/api/interface.go b/api/interface.go index 0cc60c18a8c..ac5b63f6c39 100644 --- a/api/interface.go +++ b/api/interface.go @@ -29,7 +29,6 @@ import ( "github.com/juju/juju/api/uniter" "github.com/juju/juju/api/upgrader" "github.com/juju/juju/network" - "github.com/juju/juju/rpc" ) // Info encapsulates information about a server holding juju state and @@ -175,10 +174,6 @@ type Connection interface { // either expose Ping() or Broken() but not both. Ping() error - // RPCClient is apparently exported for testing purposes only, but this - // seems to indicate *some* sort of layering confusion. - RPCClient() *rpc.Conn - // I think this is actually dead code. It's tested, at least, so I'm // keeping it for now, but it's not apparently used anywhere else. AllFacadeVersions() map[string][]int diff --git a/apiserver/admin.go b/apiserver/admin.go index e63a60c445e..846599d1794 100644 --- a/apiserver/admin.go +++ b/apiserver/admin.go @@ -200,7 +200,7 @@ func (a *admin) doLogin(req params.LoginRequest, loginVersion int) (params.Login authedApi = newClientAuthRoot(authedApi, envUser) } - a.root.rpcConn.ServeFinder(authedApi, serverError) + a.root.conn.ServeFinder(authedApi, serverError) return loginResult, nil } @@ -444,7 +444,7 @@ func startPingerIfAgent(root *apiHandler, entity state.Entity) error { root.getResources().Register(&machinePinger{pinger, root.mongoUnavailable}) action := func() { - if err := root.getRpcConn().Close(); err != nil { + if err := root.conn.Close(); err != nil { logger.Errorf("error closing the RPC connection: %v", err) } } diff --git a/apiserver/apiserver.go b/apiserver/apiserver.go index 271c99bad2d..38fd98316ee 100644 --- a/apiserver/apiserver.go +++ b/apiserver/apiserver.go @@ -498,7 +498,7 @@ func (srv *Server) serveConn(wsConn *websocket.Conn, reqNotifier *requestNotifie // know we'll need it. notifier = reqNotifier } - conn := rpc.NewConn(codec, notifier) + conn := rpc.NewServerConn(codec, notifier) h, err := srv.newAPIHandler(conn, reqNotifier, modelUUID) if err != nil { @@ -518,7 +518,7 @@ func (srv *Server) serveConn(wsConn *websocket.Conn, reqNotifier *requestNotifie return conn.Close() } -func (srv *Server) newAPIHandler(conn *rpc.Conn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) { +func (srv *Server) newAPIHandler(conn rpc.ServerConn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) { // Note that we don't overwrite modelUUID here because // newAPIHandler treats an empty modelUUID as signifying // the API version used. diff --git a/apiserver/root.go b/apiserver/root.go index 8344af21368..9c197f60d95 100644 --- a/apiserver/root.go +++ b/apiserver/root.go @@ -43,7 +43,7 @@ type objectKey struct { // uses to dispatch Api calls appropriately. type apiHandler struct { state *state.State - rpcConn *rpc.Conn + conn rpc.ServerConn resources *common.Resources entity state.Entity mongoUnavailable *uint32 @@ -54,14 +54,13 @@ type apiHandler struct { modelUUID string } -var _ = (*apiHandler)(nil) - // newApiHandler returns a new apiHandler. -func newApiHandler(srv *Server, st *state.State, rpcConn *rpc.Conn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) { +// TODO(dfc) rename to newAPIHandler +func newApiHandler(srv *Server, st *state.State, conn rpc.ServerConn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) { r := &apiHandler{ state: st, resources: common.NewResources(), - rpcConn: rpcConn, + conn: conn, modelUUID: modelUUID, mongoUnavailable: &srv.mongoUnavailable, } @@ -86,10 +85,6 @@ func (r *apiHandler) getResources() *common.Resources { return r.resources } -func (r *apiHandler) getRpcConn() *rpc.Conn { - return r.rpcConn -} - // Kill implements rpc.Killer, cleaning up any resources that need // cleaning up to ensure that all outstanding requests return. func (r *apiHandler) Kill() { diff --git a/rpc/client.go b/rpc/client.go index 004b08e2509..6c61a932ee5 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -5,6 +5,7 @@ package rpc import ( "strings" + "sync/atomic" "github.com/juju/errors" ) @@ -41,25 +42,45 @@ func (e *RequestError) ErrorCode() string { return e.Code } -func (conn *Conn) send(call *Call) { - conn.sending.Lock() - defer conn.sending.Unlock() +// ClientConn represents a RPC client connection. +type ClientConn interface { + // Call invokes the named action on the object of the given type with the given + // id. The returned values will be stored in response, which should be a pointer. + // If the action fails remotely, the error will have a cause of type RequestError. + // The params value may be nil if no parameters are provided; the response value + // may be nil to indicate that any result should be discarded. + Call(req Request, params, response interface{}) error + + // Close closes the connection. + Close() error +} + +// NewClientConn returns a ClientConn for the underlying codec. +func NewClientConn(codec Codec) ClientConn { + var notifier dummyNotifier + client := newConn(codec, ¬ifier) + client.Start() + return client +} + +func (c *conn) send(call *Call) { + c.sending.Lock() + defer c.sending.Unlock() // Register this call. - conn.mutex.Lock() - if conn.dead == nil { + c.mutex.Lock() + if c.dead == nil { panic("rpc: call made when connection not started") } - if conn.closing || conn.shutdown { + if c.closing || c.shutdown { call.Error = ErrShutdown - conn.mutex.Unlock() + c.mutex.Unlock() call.done() return } - conn.reqId++ - reqId := conn.reqId - conn.clientPending[reqId] = call - conn.mutex.Unlock() + reqId := atomic.AddUint64(&c.reqId, 1) + c.clientPending[reqId] = call + c.mutex.Unlock() // Encode and send the request. hdr := &Header{ @@ -70,14 +91,14 @@ func (conn *Conn) send(call *Call) { if params == nil { params = struct{}{} } - if conn.notifier != nil { - conn.notifier.ClientRequest(hdr, params) + if c.notifier != nil { + c.notifier.ClientRequest(hdr, params) } - if err := conn.codec.WriteMessage(hdr, params); err != nil { - conn.mutex.Lock() - call = conn.clientPending[reqId] - delete(conn.clientPending, reqId) - conn.mutex.Unlock() + if err := c.codec.WriteMessage(hdr, params); err != nil { + c.mutex.Lock() + call = c.clientPending[reqId] + delete(c.clientPending, reqId) + c.mutex.Unlock() if call != nil { call.Error = err call.done() @@ -85,12 +106,12 @@ func (conn *Conn) send(call *Call) { } } -func (conn *Conn) handleResponse(hdr *Header) error { +func (c *conn) handleResponse(hdr *Header) error { reqId := hdr.RequestId - conn.mutex.Lock() - call := conn.clientPending[reqId] - delete(conn.clientPending, reqId) - conn.mutex.Unlock() + c.mutex.Lock() + call := c.clientPending[reqId] + delete(c.clientPending, reqId) + c.mutex.Unlock() var err error switch { @@ -100,10 +121,8 @@ func (conn *Conn) handleResponse(hdr *Header) error { // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to. - if conn.notifier != nil { - conn.notifier.ClientReply(Request{}, hdr, nil) - } - err = conn.readBody(nil, false) + c.notifier.ClientReply(Request{}, hdr, nil) + err = c.readBody(nil, false) case hdr.Error != "": // Report rpcreflect.NoSuchMethodError with CodeNotImplemented. if strings.HasPrefix(hdr.Error, "no such request ") && hdr.ErrorCode == "" { @@ -116,16 +135,12 @@ func (conn *Conn) handleResponse(hdr *Header) error { Message: hdr.Error, Code: hdr.ErrorCode, } - err = conn.readBody(nil, false) - if conn.notifier != nil { - conn.notifier.ClientReply(call.Request, hdr, nil) - } + err = c.readBody(nil, false) + c.notifier.ClientReply(call.Request, hdr, nil) call.done() default: - err = conn.readBody(call.Response, false) - if conn.notifier != nil { - conn.notifier.ClientReply(call.Request, hdr, call.Response) - } + err = c.readBody(call.Response, false) + c.notifier.ClientReply(call.Request, hdr, call.Response) call.done() } return errors.Annotate(err, "error handling response") @@ -147,14 +162,14 @@ func (call *Call) done() { // If the action fails remotely, the error will have a cause of type RequestError. // The params value may be nil if no parameters are provided; the response value // may be nil to indicate that any result should be discarded. -func (conn *Conn) Call(req Request, params, response interface{}) error { +func (c *conn) Call(req Request, params, response interface{}) error { call := &Call{ Request: req, Params: params, Response: response, Done: make(chan *Call, 1), } - conn.send(call) + c.send(call) result := <-call.Done return errors.Trace(result.Error) } diff --git a/rpc/dispatch_test.go b/rpc/dispatch_test.go index 1e876171232..cbd388ad17d 100644 --- a/rpc/dispatch_test.go +++ b/rpc/dispatch_test.go @@ -27,7 +27,7 @@ var _ = gc.Suite(&dispatchSuite{}) func (s *dispatchSuite) SetUpSuite(c *gc.C) { rpcServer := func(ws *websocket.Conn) { codec := jsoncodec.NewWebsocket(ws) - conn := rpc.NewConn(codec, nil) + conn := rpc.NewServerConn(codec, nil) conn.Serve(&DispatchRoot{}, nil) conn.Start() diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 8d93bc47929..12e7e5f4cec 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -6,6 +6,7 @@ package rpc_test import ( "encoding/json" "fmt" + "io" "net" "reflect" "regexp" @@ -55,7 +56,7 @@ type stringVal struct { type Root struct { mu sync.Mutex - conn *rpc.Conn + conn rpc.ServerConn calls []*callInfo returnErr bool simple map[string]*SimpleMethods @@ -233,8 +234,9 @@ func (a *CallbackMethods) Factorial(x int64val) (int64val, error) { if x.I <= 1 { return int64val{1}, nil } + client := a.root.conn.(rpc.ClientConn) var r int64val - err := a.root.conn.Call(rpc.Request{"CallbackMethods", 0, "", "Factorial"}, int64val{x.I - 1}, &r) + err := client.Call(rpc.Request{"CallbackMethods", 0, "", "Factorial"}, int64val{x.I - 1}, &r) if err != nil { return int64val{}, err } @@ -416,7 +418,7 @@ func callName(narg, nret int, retErr bool) string { type testCallParams struct { // client holds the client-side of the rpc connection that // will be used to make the call. - client *rpc.Conn + client rpc.ClientConn // clientNotifier holds the notifier for the client side. clientNotifier *notifier @@ -939,7 +941,7 @@ func (*rpcSuite) TestBadCall(c *gc.C) { func testBadCall( c *gc.C, - client *rpc.Conn, + client rpc.ClientConn, clientNotifier, serverNotifier *notifier, req rpc.Request, expectedErr string, @@ -1087,11 +1089,11 @@ func (*rpcSuite) TestRootIsKilledAndCleaned(c *gc.C) { } func (*rpcSuite) TestBidirectional(c *gc.C) { - srvRoot := &Root{} - client, srvDone, _, _ := newRPCClientServer(c, srvRoot, nil, true) + var srvRoot Root + client, srvDone, _, _ := newRPCClientServer(c, &srvRoot, nil, true) defer closeClient(c, client, srvDone) - clientRoot := &Root{conn: client} - client.Serve(clientRoot, nil) + server := client.(rpc.ServerConn) + server.Serve(&Root{conn: server}, nil) var r int64val err := client.Call(rpc.Request{"CallbackMethods", 0, "", "Factorial"}, int64val{12}, &r) c.Assert(err, jc.ErrorIsNil) @@ -1187,13 +1189,13 @@ func chanReadError(c *gc.C, ch <-chan error, what string) error { // single client. When the server has finished serving the connection, // it sends a value on the returned channel. // If bidir is true, requests can flow in both directions. -func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidir bool) (client *rpc.Conn, srvDone chan error, clientNotifier, serverNotifier *notifier) { +func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidir bool) (rpc.ClientConn, chan error, *notifier, *notifier) { l, err := net.Listen("tcp", "127.0.0.1:0") c.Assert(err, jc.ErrorIsNil) - srvDone = make(chan error, 1) - clientNotifier = new(notifier) - serverNotifier = new(notifier) + srvDone := make(chan error, 1) + clientNotifier := new(notifier) + serverNotifier := new(notifier) go func() { conn, err := l.Accept() if err != nil { @@ -1205,7 +1207,7 @@ func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidi if bidir { role = roleBoth } - rpcConn := rpc.NewConn(NewJSONCodec(conn, role), serverNotifier) + rpcConn := rpc.NewServerConn(NewJSONCodec(conn, role), serverNotifier) if custroot, ok := root.(*CustomMethodFinder); ok { rpcConn.ServeFinder(custroot, tfErr) custroot.root.conn = rpcConn @@ -1225,12 +1227,12 @@ func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidi if bidir { role = roleBoth } - client = rpc.NewConn(NewJSONCodec(conn, role), clientNotifier) + client := rpc.NewServerConn(NewJSONCodec(conn, role), clientNotifier) client.Start() - return client, srvDone, clientNotifier, serverNotifier + return client.(rpc.ClientConn), srvDone, clientNotifier, serverNotifier } -func closeClient(c *gc.C, client *rpc.Conn, srvDone <-chan error) { +func closeClient(c *gc.C, client io.Closer, srvDone <-chan error) { err := client.Close() c.Assert(err, jc.ErrorIsNil) err = chanReadError(c, srvDone, "server done") diff --git a/rpc/server.go b/rpc/server.go index 64c23d7a2ad..b1660cdc084 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -88,11 +88,11 @@ func (hdr *Header) IsRequest() bool { // Note that we use "client request" and "server request" to name // requests initiated locally and remotely respectively. -// Conn represents an RPC endpoint. It can both initiate and receive +// conn represents an RPC endpoint. It can both initiate and receive // RPC requests. There may be multiple outstanding Calls associated // with a single Client, and a Client may be used by multiple goroutines // simultaneously. -type Conn struct { +type conn struct { // codec holds the underlying RPC connection. codec Codec @@ -107,6 +107,9 @@ type Conn struct { // It also guards shutdown. sending sync.Mutex + // reqId holds the latest client request id. + reqId uint64 + // mutex guards the following values. mutex sync.Mutex @@ -124,9 +127,6 @@ type Conn struct { // transformErrors is used to transform returned errors. transformErrors func(error) error - // reqId holds the latest client request id. - reqId uint64 - // clientPending holds all pending client requests. clientPending map[uint64]*Call @@ -186,15 +186,85 @@ type RequestNotifier interface { ClientReply(req Request, hdr *Header, body interface{}) } -// NewConn creates a new connection that uses the given codec for -// transport, but it does not start it. Conn.Start must be called before +type ServerConn interface { + + // Serve serves RPC requests on the connection by invoking methods on + // root. Note that it does not start the connection running, + // though it may be called once the connection is already started. + // + // The server executes each client request by calling a method on root + // to obtain an object to act on; then it invokes an method on that + // object with the request parameters, possibly returning some result. + // + // Methods on the root value are of the form: + // + // M(id string) (O, error) + // + // where M is an exported name, conventionally naming the object type, + // id is some identifier for the object and O is the type of the + // returned object. + // + // Methods defined on O may defined in one of the following forms, where + // T and R must be struct types. + // + // Method() + // Method() R + // Method() (R, error) + // Method() error + // Method(T) + // Method(T) R + // Method(T) (R, error) + // Method(T) error + // + // If transformErrors is non-nil, it will be called on all returned + // non-nil errors, for example to transform the errors into ServerErrors + // with specified codes. There will be a panic if transformErrors + // returns nil. + // + // Serve may be called at any time on a connection to change the + // set of methods being served by the connection. This will have + // no effect on calls that are currently being services. + // If root is nil, the connection will serve no methods. + Serve(root interface{}, transformErrors func(error) error) + + // ServeFinder serves RPC requests on the connection by invoking methods retrieved + // from root. Note that it does not start the connection running, though + // it may be called once the connection is already started. + // + // The server executes each client request by calling FindMethod to obtain a + // method to invoke. It invokes that method with the request parameters, + // possibly returning some result. + // + // root can optionally implement the Killer method. If implemented, when the + // connection is closed, root.Kill() will be called. + ServeFinder(finder MethodFinder, transformErrors func(error) error) + + // Start starts listening for requests. + Start() + + // Dead returns a channel that is closed when the connection + // has been closed or the underlying transport has received + // an error. There may still be outstanding requests. + // Dead must be called after conn.Start has been called. + Dead() <-chan struct{} + + // Close shuts down the connection. + Close() error +} + +func NewServerConn(codec Codec, notifier RequestNotifier) ServerConn { + return newConn(codec, notifier) +} + +// newConn creates a new connection that uses the given codec for +// transport, but it does not start it. conn.Start must be called before // any requests are sent or received. If notifier is non-nil, the // appropriate method will be called for every RPC request. -func NewConn(codec Codec, notifier RequestNotifier) *Conn { +func newConn(codec Codec, notifier RequestNotifier) *conn { if notifier == nil { notifier = new(dummyNotifier) } - return &Conn{ + return &conn{ codec: codec, clientPending: make(map[uint64]*Call), notifier: notifier, @@ -214,12 +284,12 @@ func (*dummyNotifier) ClientReply(Request, *Header, interface{}) // if it has already been called. By default, a connection serves no // methods. See Conn.Serve for a description of how to serve methods on // a Conn. -func (conn *Conn) Start() { - conn.mutex.Lock() - defer conn.mutex.Unlock() - if conn.dead == nil { - conn.dead = make(chan struct{}) - go conn.input() +func (c *conn) Start() { + c.mutex.Lock() + defer c.mutex.Unlock() + if c.dead == nil { + c.dead = make(chan struct{}) + go c.input() } } @@ -260,12 +330,12 @@ func (conn *Conn) Start() { // set of methods being served by the connection. This will have // no effect on calls that are currently being services. // If root is nil, the connection will serve no methods. -func (conn *Conn) Serve(root interface{}, transformErrors func(error) error) { +func (c *conn) Serve(root interface{}, transformErrors func(error) error) { rootValue := rpcreflect.ValueOf(reflect.ValueOf(root)) if rootValue.IsValid() { - conn.serve(rootValue, root, transformErrors) + c.serve(rootValue, root, transformErrors) } else { - conn.serve(nil, nil, transformErrors) + c.serve(nil, nil, transformErrors) } } @@ -279,19 +349,19 @@ func (conn *Conn) Serve(root interface{}, transformErrors func(error) error) { // // root can optionally implement the Killer method. If implemented, when the // connection is closed, root.Kill() will be called. -func (conn *Conn) ServeFinder(finder MethodFinder, transformErrors func(error) error) { - conn.serve(finder, finder, transformErrors) +func (c *conn) ServeFinder(finder MethodFinder, transformErrors func(error) error) { + c.serve(finder, finder, transformErrors) } -func (conn *Conn) serve(methodFinder MethodFinder, root interface{}, transformErrors func(error) error) { +func (c *conn) serve(methodFinder MethodFinder, root interface{}, transformErrors func(error) error) { if transformErrors == nil { transformErrors = noopTransform } - conn.mutex.Lock() - defer conn.mutex.Unlock() - conn.methodFinder = methodFinder - conn.root = root - conn.transformErrors = transformErrors + c.mutex.Lock() + defer c.mutex.Unlock() + c.methodFinder = methodFinder + c.root = root + c.transformErrors = transformErrors } // noopTransform is used when transformErrors is not supplied to Serve. @@ -303,8 +373,8 @@ func noopTransform(err error) error { // has been closed or the underlying transport has received // an error. There may still be outstanding requests. // Dead must be called after conn.Start has been called. -func (conn *Conn) Dead() <-chan struct{} { - return conn.dead +func (c *conn) Dead() <-chan struct{} { + return c.dead } // Close closes the connection and its underlying codec; it returns when @@ -316,47 +386,47 @@ func (conn *Conn) Dead() <-chan struct{} { // completed. // // Calling Close multiple times is not an error. -func (conn *Conn) Close() error { - conn.mutex.Lock() - if conn.closing { - conn.mutex.Unlock() +func (c *conn) Close() error { + c.mutex.Lock() + if c.closing { + c.mutex.Unlock() // Golang's net/rpc returns rpc.ErrShutdown if you ask to close // a closing or shutdown connection. Our choice is that Close // is an idempotent way to ask for resources to be released and // isn't a failure if called multiple times. return nil } - conn.closing = true - conn.killRequests() - conn.mutex.Unlock() + c.closing = true + c.killRequests() + c.mutex.Unlock() // Wait for any outstanding server requests to complete // and write their replies before closing the codec. - conn.srvPending.Wait() + c.srvPending.Wait() // Closing the codec should cause the input loop to terminate. - if err := conn.codec.Close(); err != nil { + if err := c.codec.Close(); err != nil { logger.Infof("error closing codec: %v", err) } - <-conn.dead + <-c.dead - conn.mutex.Lock() - conn.cleanRoot() - conn.mutex.Unlock() + c.mutex.Lock() + c.cleanRoot() + c.mutex.Unlock() - return conn.inputLoopError + return c.inputLoopError } // Kill server requests if appropriate. Client requests will be // terminated when the input loop finishes. -func (conn *Conn) killRequests() { - if killer, ok := conn.root.(Killer); ok { +func (c *conn) killRequests() { + if killer, ok := c.root.(Killer); ok { killer.Kill() } } -func (conn *Conn) cleanRoot() { - if cleaner, ok := conn.root.(Cleaner); ok { +func (c *conn) cleanRoot() { + if cleaner, ok := c.root.(Cleaner); ok { cleaner.Cleanup() } } @@ -388,34 +458,34 @@ type Cleaner interface { // input reads messages from the connection and handles them // appropriately. -func (conn *Conn) input() { - err := conn.loop() - conn.sending.Lock() - defer conn.sending.Unlock() - conn.mutex.Lock() - defer conn.mutex.Unlock() - - if conn.closing || err == io.EOF { +func (c *conn) input() { + err := c.loop() + c.sending.Lock() + defer c.sending.Unlock() + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.closing || err == io.EOF { err = ErrShutdown } else { // Make the error available for Conn.Close to see. - conn.inputLoopError = err + c.inputLoopError = err } // Terminate all client requests. - for _, call := range conn.clientPending { + for _, call := range c.clientPending { call.Error = err call.done() } - conn.clientPending = nil - conn.shutdown = true - close(conn.dead) + c.clientPending = nil + c.shutdown = true + close(c.dead) } // loop implements the looping part of Conn.input. -func (conn *Conn) loop() error { +func (c *conn) loop() error { for { var hdr Header - err := conn.codec.ReadHeader(&hdr) + err := c.codec.ReadHeader(&hdr) switch { case err == io.EOF: // handle sentinel error specially @@ -423,35 +493,35 @@ func (conn *Conn) loop() error { case err != nil: return errors.Annotate(err, "codec.ReadHeader error") case hdr.IsRequest(): - if err := conn.handleRequest(&hdr); err != nil { + if err := c.handleRequest(&hdr); err != nil { return errors.Annotatef(err, "codec.handleRequest %#v error", hdr) } default: - if err := conn.handleResponse(&hdr); err != nil { + if err := c.handleResponse(&hdr); err != nil { return errors.Annotatef(err, "codec.handleResponse %#v error", hdr) } } } } -func (conn *Conn) readBody(resp interface{}, isRequest bool) error { +func (c *conn) readBody(resp interface{}, isRequest bool) error { if resp == nil { resp = &struct{}{} } - return conn.codec.ReadBody(resp, isRequest) + return c.codec.ReadBody(resp, isRequest) } -func (conn *Conn) handleRequest(hdr *Header) error { +func (c *conn) handleRequest(hdr *Header) error { startTime := time.Now() - req, err := conn.bindRequest(hdr) + req, err := c.bindRequest(hdr) if err != nil { - conn.notifier.ServerRequest(hdr, nil) - if err := conn.readBody(nil, true); err != nil { + c.notifier.ServerRequest(hdr, nil) + if err := c.readBody(nil, true); err != nil { return err } // We don't transform the error here. bindRequest will have // already transformed it and returned a zero req. - return conn.writeErrorResponse(hdr, err, startTime) + return c.writeErrorResponse(hdr, err, startTime) } var argp interface{} var arg reflect.Value @@ -460,8 +530,8 @@ func (conn *Conn) handleRequest(hdr *Header) error { arg = v.Elem() argp = v.Interface() } - if err := conn.readBody(argp, true); err != nil { - conn.notifier.ServerRequest(hdr, nil) + if err := c.readBody(argp, true); err != nil { + c.notifier.ServerRequest(hdr, nil) // If we get EOF, we know the connection is a // goner, so don't try to respond. if err == io.EOF || err == io.ErrUnexpectedEOF { @@ -475,30 +545,30 @@ func (conn *Conn) handleRequest(hdr *Header) error { // the error is actually a framing or syntax // problem, then the next ReadHeader should pick // up the problem and abort. - return conn.writeErrorResponse(hdr, req.transformErrors(err), startTime) + return c.writeErrorResponse(hdr, req.transformErrors(err), startTime) } if req.ParamsType() != nil { - conn.notifier.ServerRequest(hdr, arg.Interface()) + c.notifier.ServerRequest(hdr, arg.Interface()) } else { - conn.notifier.ServerRequest(hdr, struct{}{}) + c.notifier.ServerRequest(hdr, struct{}{}) } - conn.mutex.Lock() - closing := conn.closing + c.mutex.Lock() + closing := c.closing if !closing { - conn.srvPending.Add(1) - go conn.runRequest(req, arg, startTime) + c.srvPending.Add(1) + go c.runRequest(req, arg, startTime) } - conn.mutex.Unlock() + c.mutex.Unlock() if closing { // We're closing down - no new requests may be initiated. - return conn.writeErrorResponse(hdr, req.transformErrors(ErrShutdown), startTime) + return c.writeErrorResponse(hdr, req.transformErrors(ErrShutdown), startTime) } return nil } -func (conn *Conn) writeErrorResponse(reqHdr *Header, err error, startTime time.Time) error { - conn.sending.Lock() - defer conn.sending.Unlock() +func (c *conn) writeErrorResponse(reqHdr *Header, err error, startTime time.Time) error { + c.sending.Lock() + defer c.sending.Unlock() hdr := &Header{ RequestId: reqHdr.RequestId, } @@ -508,8 +578,8 @@ func (conn *Conn) writeErrorResponse(reqHdr *Header, err error, startTime time.T hdr.ErrorCode = "" } hdr.Error = err.Error() - conn.notifier.ServerReply(reqHdr.Request, hdr, struct{}{}, time.Since(startTime)) - return conn.codec.WriteMessage(hdr, struct{}{}) + c.notifier.ServerReply(reqHdr.Request, hdr, struct{}{}, time.Since(startTime)) + return c.codec.WriteMessage(hdr, struct{}{}) } // boundRequest represents an RPC request that is @@ -523,11 +593,11 @@ type boundRequest struct { // bindRequest searches for methods implementing the // request held in the given header and returns // a boundRequest that can call those methods. -func (conn *Conn) bindRequest(hdr *Header) (boundRequest, error) { - conn.mutex.Lock() - methodFinder := conn.methodFinder - transformErrors := conn.transformErrors - conn.mutex.Unlock() +func (c *conn) bindRequest(hdr *Header) (boundRequest, error) { + c.mutex.Lock() + methodFinder := c.methodFinder + transformErrors := c.transformErrors + c.mutex.Unlock() if methodFinder == nil { return boundRequest{}, errors.New("no service") @@ -552,11 +622,11 @@ func (conn *Conn) bindRequest(hdr *Header) (boundRequest, error) { } // runRequest runs the given request and sends the reply. -func (conn *Conn) runRequest(req boundRequest, arg reflect.Value, startTime time.Time) { - defer conn.srvPending.Done() +func (c *conn) runRequest(req boundRequest, arg reflect.Value, startTime time.Time) { + defer c.srvPending.Done() rv, err := req.Call(req.hdr.Request.Id, arg) if err != nil { - err = conn.writeErrorResponse(&req.hdr, req.transformErrors(err), startTime) + err = c.writeErrorResponse(&req.hdr, req.transformErrors(err), startTime) } else { hdr := &Header{ RequestId: req.hdr.RequestId, @@ -567,10 +637,10 @@ func (conn *Conn) runRequest(req boundRequest, arg reflect.Value, startTime time } else { rvi = struct{}{} } - conn.notifier.ServerReply(req.hdr.Request, hdr, rvi, time.Since(startTime)) - conn.sending.Lock() - err = conn.codec.WriteMessage(hdr, rvi) - conn.sending.Unlock() + c.notifier.ServerReply(req.hdr.Request, hdr, rvi, time.Since(startTime)) + c.sending.Lock() + err = c.codec.WriteMessage(hdr, rvi) + c.sending.Unlock() } if err != nil { logger.Errorf("error writing response: %v", err)