Skip to content

Commit

Permalink
Merge pull request #393 from micro/legacy
Browse files Browse the repository at this point in the history
Evolution of Codecs and Methods
  • Loading branch information
asim committed Jan 18, 2019
2 parents 9ce9977 + 6468733 commit 943219f
Show file tree
Hide file tree
Showing 17 changed files with 214 additions and 59 deletions.
4 changes: 3 additions & 1 deletion client/client.go
Expand Up @@ -38,7 +38,9 @@ type Message interface {
type Request interface {
// The service to call
Service() string
// The endpoint to call
// The action to take
Method() string
// The endpoint to invoke
Endpoint() string
// The content type
ContentType() string
Expand Down
72 changes: 48 additions & 24 deletions client/rpc_client.go
Expand Up @@ -4,10 +4,11 @@ import (
"bytes"
"context"
"fmt"
"net"
"strconv"
"sync"
"time"

"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/micro/go-micro/broker"
Expand Down Expand Up @@ -56,7 +57,12 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}

func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error {
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}

msg := &transport.Message{
Header: make(map[string]string),
}
Expand All @@ -75,9 +81,16 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
// set the accept header
msg.Header["Accept"] = req.ContentType()

cf, err := r.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
// setup old protocol
cf := setupProtocol(msg, node)

// no codec specified
if cf == nil {
var err error
cf, err = r.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
}

var grr error
Expand Down Expand Up @@ -144,7 +157,12 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
}
}

func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}

msg := &transport.Message{
Header: make(map[string]string),
}
Expand All @@ -163,9 +181,16 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
// set the accept header
msg.Header["Accept"] = req.ContentType()

cf, err := r.newCodec(req.ContentType())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
// set old codecs
cf := setupProtocol(msg, node)

// no codec specified
if cf == nil {
var err error
cf, err = r.newCodec(req.ContentType())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
}

dOpts := []transport.DialOption{
Expand Down Expand Up @@ -245,9 +270,19 @@ func (r *rpcClient) Options() Options {
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
// return remote address
if len(opts.Address) > 0 {
address := opts.Address
port := 0

host, sport, err := net.SplitHostPort(opts.Address)
if err == nil {
address = host
port, _ = strconv.Atoi(sport)
}

return func() (*registry.Node, error) {
return &registry.Node{
Address: opts.Address,
Address: address,
Port: port,
}, nil
}, nil
}
Expand Down Expand Up @@ -323,14 +358,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}

// set the address
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}

// make the call
err = rcall(ctx, address, request, response, callOpts)
err = rcall(ctx, node, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return err
}
Expand Down Expand Up @@ -406,12 +435,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}

address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}

stream, err := r.stream(ctx, address, request, callOpts)
stream, err := r.stream(ctx, node, request, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return stream, err
}
Expand Down
30 changes: 17 additions & 13 deletions client/rpc_client_test.go
Expand Up @@ -21,10 +21,11 @@ func TestCallAddress(t *testing.T) {
var called bool
service := "test.service"
endpoint := "Test.Endpoint"
address := "10.1.10.1:8080"
address := "10.1.10.1"
port := 8080

wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
called = true

if req.Service() != service {
Expand All @@ -35,8 +36,12 @@ func TestCallAddress(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}

if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
if node.Address != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address)
}

if node.Port != port {
return fmt.Errorf("expected address: %d got %d", port, node.Port)
}

// don't do the call
Expand All @@ -54,7 +59,7 @@ func TestCallAddress(t *testing.T) {
req := c.NewRequest(service, endpoint, nil)

// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
if err := c.Call(context.Background(), req, nil, WithAddress(fmt.Sprintf("%s:%d", address, port))); err != nil {
t.Fatal("call with address error", err)
}

Expand All @@ -67,12 +72,12 @@ func TestCallAddress(t *testing.T) {
func TestCallRetry(t *testing.T) {
service := "test.service"
endpoint := "Test.Endpoint"
address := "10.1.10.1:8080"
address := "10.1.10.1"

var called int

wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.InternalServerError("test.error", "retry request")
Expand Down Expand Up @@ -108,12 +113,11 @@ func TestCallWrapper(t *testing.T) {
id := "test.1"
service := "test.service"
endpoint := "Test.Endpoint"
host := "10.1.10.1"
address := "10.1.10.1"
port := 8080
address := "10.1.10.1:8080"

wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
called = true

if req.Service() != service {
Expand All @@ -124,8 +128,8 @@ func TestCallWrapper(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}

if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
if node.Address != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address)
}

// don't do the call
Expand All @@ -146,7 +150,7 @@ func TestCallWrapper(t *testing.T) {
Nodes: []*registry.Node{
&registry.Node{
Id: id,
Address: host,
Address: address,
Port: port,
},
},
Expand Down
40 changes: 39 additions & 1 deletion client/rpc_codec.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
)

Expand Down Expand Up @@ -58,6 +59,15 @@ var (
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": raw.NewCodec,
}

// TODO: remove legacy codec list
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)

func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
Expand All @@ -74,6 +84,27 @@ func (rwc *readWriteCloser) Close() error {
return nil
}

// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
protocol := node.Metadata["protocol"]

// got protocol
if len(protocol) > 0 {
return nil
}

// no protocol use old codecs
switch msg.Header["Content-Type"] {
case "application/json":
msg.Header["Content-Type"] = "application/json-rpc"
case "application/protobuf":
msg.Header["Content-Type"] = "application/proto-rpc"
}

// now return codec
return defaultCodecs[msg.Header["Content-Type"]]
}

func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{
wbuf: bytes.NewBuffer(nil),
Expand Down Expand Up @@ -104,6 +135,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
// set the mucp headers
m.Header["X-Micro-Id"] = m.Id
m.Header["X-Micro-Service"] = m.Target
m.Header["X-Micro-Method"] = m.Method
m.Header["X-Micro-Endpoint"] = m.Endpoint

// if body is bytes Frame don't encode
Expand Down Expand Up @@ -154,6 +186,7 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
// read header
err := c.codec.ReadHeader(&me, r)
wm.Endpoint = me.Endpoint
wm.Method = me.Method
wm.Id = me.Id
wm.Error = me.Error

Expand All @@ -162,11 +195,16 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
wm.Error = me.Header["X-Micro-Error"]
}

// check method in header
// check endpoint in header
if len(me.Endpoint) == 0 {
wm.Endpoint = me.Header["X-Micro-Endpoint"]
}

// check method in header
if len(me.Method) == 0 {
wm.Method = me.Header["X-Micro-Method"]
}

if len(me.Id) == 0 {
wm.Id = me.Header["X-Micro-Id"]
}
Expand Down
6 changes: 6 additions & 0 deletions client/rpc_request.go
Expand Up @@ -6,6 +6,7 @@ import (

type rpcRequest struct {
service string
method string
endpoint string
contentType string
codec codec.Codec
Expand All @@ -27,6 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin

return &rpcRequest{
service: service,
method: endpoint,
endpoint: endpoint,
body: request,
contentType: contentType,
Expand All @@ -42,6 +44,10 @@ func (r *rpcRequest) Service() string {
return r.service
}

func (r *rpcRequest) Method() string {
return r.method
}

func (r *rpcRequest) Endpoint() string {
return r.endpoint
}
Expand Down
1 change: 1 addition & 0 deletions client/rpc_stream.go
Expand Up @@ -53,6 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error {
req := codec.Message{
Id: r.id,
Target: r.request.Service(),
Method: r.request.Method(),
Endpoint: r.request.Endpoint(),
Type: codec.Request,
}
Expand Down
4 changes: 3 additions & 1 deletion client/wrapper.go
Expand Up @@ -2,10 +2,12 @@ package client

import (
"context"

"github.com/micro/go-micro/registry"
)

// CallFunc represents the individual call func
type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error
type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error

// CallWrapper is a low level wrapper for the CallFunc
type CallWrapper func(CallFunc) CallFunc
Expand Down
1 change: 1 addition & 0 deletions codec/codec.go
Expand Up @@ -53,6 +53,7 @@ type Message struct {
Id string
Type MessageType
Target string
Method string
Endpoint string
Error string

Expand Down
6 changes: 3 additions & 3 deletions codec/jsonrpc/client.go
Expand Up @@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec {

func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
c.Lock()
c.pending[m.Id] = m.Endpoint
c.pending[m.Id] = m.Method
c.Unlock()
c.req.Method = m.Endpoint
c.req.Method = m.Method
c.req.Params[0] = b
c.req.ID = m.Id
return c.enc.Encode(&c.req)
Expand All @@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error {
}

c.Lock()
m.Endpoint = c.pending[c.resp.ID]
m.Method = c.pending[c.resp.ID]
delete(c.pending, c.resp.ID)
c.Unlock()

Expand Down

0 comments on commit 943219f

Please sign in to comment.