Skip to content

Commit

Permalink
Merge pull request #4905 from davecheney/rpc-client-server-refactor
Browse files Browse the repository at this point in the history
rpc: introduce ClientConn and ServerConn interfaces

This PR is in preparation for adding tests for retrying temporary RPC errors.

It introduces two new interface types, `rpc.ClientConn` and `rpc.ServerConn` whose purposes are self explanatory. Also provide constructor functions for these new interface types as `NewConn` has been unexported.

Remove the ability to provide a RequestNotifier when requesting a ClientConn, this was never used by any non test caller and is only part of the bidirectional code for the rpc package, which is also unused by juju.

Adjust the callers in the `api` and `apiserver` packages to these new interfaces, very few changes were needed apart from adjusting the types in various wrapper types.

The bidirection nature of the rpc package remains, but if needed the caller must assert their ServerConn value to a ClientConn, or vice versa. No code in Juju does this, but the rpc tests assert this behaviour

Finally, unexport `rpc.Conn`.
  • Loading branch information
jujubot committed Apr 4, 2016
2 parents 93cd461 + d3d8d73 commit da28cfa
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 180 deletions.
12 changes: 2 additions & 10 deletions api/apiclient.go
Expand Up @@ -49,7 +49,7 @@ var (

// state is the internal implementation of the Connection interface.
type state struct {
client *rpc.Conn
client rpc.ClientConn
conn *websocket.Conn

// addr is the address used to connect to the API server.
Expand Down Expand Up @@ -147,8 +147,7 @@ func open(
return nil, errors.Trace(err)
}

client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil)
client.Start()
client := rpc.NewClientConn(jsoncodec.NewWebsocket(conn))

bakeryClient := opts.BakeryClient
if bakeryClient == nil {
Expand Down Expand Up @@ -572,13 +571,6 @@ func (s *state) Broken() <-chan struct{} {
return s.broken
}

// RPCClient returns the RPC client for the state, so that testing
// functions can tickle parts of the API that the conventional entry
// points don't reach. This is exported for testing purposes only.
func (s *state) RPCClient() *rpc.Conn {
return s.client
}

// Addr returns the address used to connect to the API server.
func (s *state) Addr() string {
return s.addr
Expand Down
5 changes: 0 additions & 5 deletions api/interface.go
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/juju/juju/api/uniter"
"github.com/juju/juju/api/upgrader"
"github.com/juju/juju/network"
"github.com/juju/juju/rpc"
)

// Info encapsulates information about a server holding juju state and
Expand Down Expand Up @@ -175,10 +174,6 @@ type Connection interface {
// either expose Ping() or Broken() but not both.
Ping() error

// RPCClient is apparently exported for testing purposes only, but this
// seems to indicate *some* sort of layering confusion.
RPCClient() *rpc.Conn

// I think this is actually dead code. It's tested, at least, so I'm
// keeping it for now, but it's not apparently used anywhere else.
AllFacadeVersions() map[string][]int
Expand Down
4 changes: 2 additions & 2 deletions apiserver/admin.go
Expand Up @@ -200,7 +200,7 @@ func (a *admin) doLogin(req params.LoginRequest, loginVersion int) (params.Login
authedApi = newClientAuthRoot(authedApi, envUser)
}

a.root.rpcConn.ServeFinder(authedApi, serverError)
a.root.conn.ServeFinder(authedApi, serverError)

return loginResult, nil
}
Expand Down Expand Up @@ -444,7 +444,7 @@ func startPingerIfAgent(root *apiHandler, entity state.Entity) error {

root.getResources().Register(&machinePinger{pinger, root.mongoUnavailable})
action := func() {
if err := root.getRpcConn().Close(); err != nil {
if err := root.conn.Close(); err != nil {
logger.Errorf("error closing the RPC connection: %v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions apiserver/apiserver.go
Expand Up @@ -498,7 +498,7 @@ func (srv *Server) serveConn(wsConn *websocket.Conn, reqNotifier *requestNotifie
// know we'll need it.
notifier = reqNotifier
}
conn := rpc.NewConn(codec, notifier)
conn := rpc.NewServerConn(codec, notifier)

h, err := srv.newAPIHandler(conn, reqNotifier, modelUUID)
if err != nil {
Expand All @@ -518,7 +518,7 @@ func (srv *Server) serveConn(wsConn *websocket.Conn, reqNotifier *requestNotifie
return conn.Close()
}

func (srv *Server) newAPIHandler(conn *rpc.Conn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) {
func (srv *Server) newAPIHandler(conn rpc.ServerConn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) {
// Note that we don't overwrite modelUUID here because
// newAPIHandler treats an empty modelUUID as signifying
// the API version used.
Expand Down
13 changes: 4 additions & 9 deletions apiserver/root.go
Expand Up @@ -43,7 +43,7 @@ type objectKey struct {
// uses to dispatch Api calls appropriately.
type apiHandler struct {
state *state.State
rpcConn *rpc.Conn
conn rpc.ServerConn
resources *common.Resources
entity state.Entity
mongoUnavailable *uint32
Expand All @@ -54,14 +54,13 @@ type apiHandler struct {
modelUUID string
}

var _ = (*apiHandler)(nil)

// newApiHandler returns a new apiHandler.
func newApiHandler(srv *Server, st *state.State, rpcConn *rpc.Conn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) {
// TODO(dfc) rename to newAPIHandler
func newApiHandler(srv *Server, st *state.State, conn rpc.ServerConn, reqNotifier *requestNotifier, modelUUID string) (*apiHandler, error) {
r := &apiHandler{
state: st,
resources: common.NewResources(),
rpcConn: rpcConn,
conn: conn,
modelUUID: modelUUID,
mongoUnavailable: &srv.mongoUnavailable,
}
Expand All @@ -86,10 +85,6 @@ func (r *apiHandler) getResources() *common.Resources {
return r.resources
}

func (r *apiHandler) getRpcConn() *rpc.Conn {
return r.rpcConn
}

// Kill implements rpc.Killer, cleaning up any resources that need
// cleaning up to ensure that all outstanding requests return.
func (r *apiHandler) Kill() {
Expand Down
89 changes: 52 additions & 37 deletions rpc/client.go
Expand Up @@ -5,6 +5,7 @@ package rpc

import (
"strings"
"sync/atomic"

"github.com/juju/errors"
)
Expand Down Expand Up @@ -41,25 +42,45 @@ func (e *RequestError) ErrorCode() string {
return e.Code
}

func (conn *Conn) send(call *Call) {
conn.sending.Lock()
defer conn.sending.Unlock()
// ClientConn represents a RPC client connection.
type ClientConn interface {
// Call invokes the named action on the object of the given type with the given
// id. The returned values will be stored in response, which should be a pointer.
// If the action fails remotely, the error will have a cause of type RequestError.
// The params value may be nil if no parameters are provided; the response value
// may be nil to indicate that any result should be discarded.
Call(req Request, params, response interface{}) error

// Close closes the connection.
Close() error
}

// NewClientConn returns a ClientConn for the underlying codec.
func NewClientConn(codec Codec) ClientConn {
var notifier dummyNotifier
client := newConn(codec, &notifier)
client.Start()
return client
}

func (c *conn) send(call *Call) {
c.sending.Lock()
defer c.sending.Unlock()

// Register this call.
conn.mutex.Lock()
if conn.dead == nil {
c.mutex.Lock()
if c.dead == nil {
panic("rpc: call made when connection not started")
}
if conn.closing || conn.shutdown {
if c.closing || c.shutdown {
call.Error = ErrShutdown
conn.mutex.Unlock()
c.mutex.Unlock()
call.done()
return
}
conn.reqId++
reqId := conn.reqId
conn.clientPending[reqId] = call
conn.mutex.Unlock()
reqId := atomic.AddUint64(&c.reqId, 1)
c.clientPending[reqId] = call
c.mutex.Unlock()

// Encode and send the request.
hdr := &Header{
Expand All @@ -70,27 +91,27 @@ func (conn *Conn) send(call *Call) {
if params == nil {
params = struct{}{}
}
if conn.notifier != nil {
conn.notifier.ClientRequest(hdr, params)
if c.notifier != nil {
c.notifier.ClientRequest(hdr, params)
}
if err := conn.codec.WriteMessage(hdr, params); err != nil {
conn.mutex.Lock()
call = conn.clientPending[reqId]
delete(conn.clientPending, reqId)
conn.mutex.Unlock()
if err := c.codec.WriteMessage(hdr, params); err != nil {
c.mutex.Lock()
call = c.clientPending[reqId]
delete(c.clientPending, reqId)
c.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}

func (conn *Conn) handleResponse(hdr *Header) error {
func (c *conn) handleResponse(hdr *Header) error {
reqId := hdr.RequestId
conn.mutex.Lock()
call := conn.clientPending[reqId]
delete(conn.clientPending, reqId)
conn.mutex.Unlock()
c.mutex.Lock()
call := c.clientPending[reqId]
delete(c.clientPending, reqId)
c.mutex.Unlock()

var err error
switch {
Expand All @@ -100,10 +121,8 @@ func (conn *Conn) handleResponse(hdr *Header) error {
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
if conn.notifier != nil {
conn.notifier.ClientReply(Request{}, hdr, nil)
}
err = conn.readBody(nil, false)
c.notifier.ClientReply(Request{}, hdr, nil)
err = c.readBody(nil, false)
case hdr.Error != "":
// Report rpcreflect.NoSuchMethodError with CodeNotImplemented.
if strings.HasPrefix(hdr.Error, "no such request ") && hdr.ErrorCode == "" {
Expand All @@ -116,16 +135,12 @@ func (conn *Conn) handleResponse(hdr *Header) error {
Message: hdr.Error,
Code: hdr.ErrorCode,
}
err = conn.readBody(nil, false)
if conn.notifier != nil {
conn.notifier.ClientReply(call.Request, hdr, nil)
}
err = c.readBody(nil, false)
c.notifier.ClientReply(call.Request, hdr, nil)
call.done()
default:
err = conn.readBody(call.Response, false)
if conn.notifier != nil {
conn.notifier.ClientReply(call.Request, hdr, call.Response)
}
err = c.readBody(call.Response, false)
c.notifier.ClientReply(call.Request, hdr, call.Response)
call.done()
}
return errors.Annotate(err, "error handling response")
Expand All @@ -147,14 +162,14 @@ func (call *Call) done() {
// If the action fails remotely, the error will have a cause of type RequestError.
// The params value may be nil if no parameters are provided; the response value
// may be nil to indicate that any result should be discarded.
func (conn *Conn) Call(req Request, params, response interface{}) error {
func (c *conn) Call(req Request, params, response interface{}) error {
call := &Call{
Request: req,
Params: params,
Response: response,
Done: make(chan *Call, 1),
}
conn.send(call)
c.send(call)
result := <-call.Done
return errors.Trace(result.Error)
}
2 changes: 1 addition & 1 deletion rpc/dispatch_test.go
Expand Up @@ -27,7 +27,7 @@ var _ = gc.Suite(&dispatchSuite{})
func (s *dispatchSuite) SetUpSuite(c *gc.C) {
rpcServer := func(ws *websocket.Conn) {
codec := jsoncodec.NewWebsocket(ws)
conn := rpc.NewConn(codec, nil)
conn := rpc.NewServerConn(codec, nil)

conn.Serve(&DispatchRoot{}, nil)
conn.Start()
Expand Down
34 changes: 18 additions & 16 deletions rpc/rpc_test.go
Expand Up @@ -6,6 +6,7 @@ package rpc_test
import (
"encoding/json"
"fmt"
"io"
"net"
"reflect"
"regexp"
Expand Down Expand Up @@ -55,7 +56,7 @@ type stringVal struct {

type Root struct {
mu sync.Mutex
conn *rpc.Conn
conn rpc.ServerConn
calls []*callInfo
returnErr bool
simple map[string]*SimpleMethods
Expand Down Expand Up @@ -233,8 +234,9 @@ func (a *CallbackMethods) Factorial(x int64val) (int64val, error) {
if x.I <= 1 {
return int64val{1}, nil
}
client := a.root.conn.(rpc.ClientConn)
var r int64val
err := a.root.conn.Call(rpc.Request{"CallbackMethods", 0, "", "Factorial"}, int64val{x.I - 1}, &r)
err := client.Call(rpc.Request{"CallbackMethods", 0, "", "Factorial"}, int64val{x.I - 1}, &r)
if err != nil {
return int64val{}, err
}
Expand Down Expand Up @@ -416,7 +418,7 @@ func callName(narg, nret int, retErr bool) string {
type testCallParams struct {
// client holds the client-side of the rpc connection that
// will be used to make the call.
client *rpc.Conn
client rpc.ClientConn

// clientNotifier holds the notifier for the client side.
clientNotifier *notifier
Expand Down Expand Up @@ -939,7 +941,7 @@ func (*rpcSuite) TestBadCall(c *gc.C) {

func testBadCall(
c *gc.C,
client *rpc.Conn,
client rpc.ClientConn,
clientNotifier, serverNotifier *notifier,
req rpc.Request,
expectedErr string,
Expand Down Expand Up @@ -1087,11 +1089,11 @@ func (*rpcSuite) TestRootIsKilledAndCleaned(c *gc.C) {
}

func (*rpcSuite) TestBidirectional(c *gc.C) {
srvRoot := &Root{}
client, srvDone, _, _ := newRPCClientServer(c, srvRoot, nil, true)
var srvRoot Root
client, srvDone, _, _ := newRPCClientServer(c, &srvRoot, nil, true)
defer closeClient(c, client, srvDone)
clientRoot := &Root{conn: client}
client.Serve(clientRoot, nil)
server := client.(rpc.ServerConn)
server.Serve(&Root{conn: server}, nil)
var r int64val
err := client.Call(rpc.Request{"CallbackMethods", 0, "", "Factorial"}, int64val{12}, &r)
c.Assert(err, jc.ErrorIsNil)
Expand Down Expand Up @@ -1187,13 +1189,13 @@ func chanReadError(c *gc.C, ch <-chan error, what string) error {
// single client. When the server has finished serving the connection,
// it sends a value on the returned channel.
// If bidir is true, requests can flow in both directions.
func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidir bool) (client *rpc.Conn, srvDone chan error, clientNotifier, serverNotifier *notifier) {
func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidir bool) (rpc.ClientConn, chan error, *notifier, *notifier) {
l, err := net.Listen("tcp", "127.0.0.1:0")
c.Assert(err, jc.ErrorIsNil)

srvDone = make(chan error, 1)
clientNotifier = new(notifier)
serverNotifier = new(notifier)
srvDone := make(chan error, 1)
clientNotifier := new(notifier)
serverNotifier := new(notifier)
go func() {
conn, err := l.Accept()
if err != nil {
Expand All @@ -1205,7 +1207,7 @@ func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidi
if bidir {
role = roleBoth
}
rpcConn := rpc.NewConn(NewJSONCodec(conn, role), serverNotifier)
rpcConn := rpc.NewServerConn(NewJSONCodec(conn, role), serverNotifier)
if custroot, ok := root.(*CustomMethodFinder); ok {
rpcConn.ServeFinder(custroot, tfErr)
custroot.root.conn = rpcConn
Expand All @@ -1225,12 +1227,12 @@ func newRPCClientServer(c *gc.C, root interface{}, tfErr func(error) error, bidi
if bidir {
role = roleBoth
}
client = rpc.NewConn(NewJSONCodec(conn, role), clientNotifier)
client := rpc.NewServerConn(NewJSONCodec(conn, role), clientNotifier)
client.Start()
return client, srvDone, clientNotifier, serverNotifier
return client.(rpc.ClientConn), srvDone, clientNotifier, serverNotifier
}

func closeClient(c *gc.C, client *rpc.Conn, srvDone <-chan error) {
func closeClient(c *gc.C, client io.Closer, srvDone <-chan error) {
err := client.Close()
c.Assert(err, jc.ErrorIsNil)
err = chanReadError(c, srvDone, "server done")
Expand Down

0 comments on commit da28cfa

Please sign in to comment.