diff --git a/.travis.yml b/.travis.yml index 74e8823..93776c1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,6 +16,8 @@ before_install: install: - composer install --no-interaction --prefer-source - go get "github.com/spiral/goridge" + - cp -r * $GOPATH/src/github.com/spiral/goridge + - go get "github.com/pkg/errors" - go get "github.com/stretchr/testify/assert" - go build tests/server.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 21161e2..5d4eee7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,12 @@ CHANGELOG ========= +## v2.1.0 (03.06.2018) +- added golang ClientCodec implementation +- additional error detections +- added sequence support +- more tests + ## v2.0.5 (03.04.2018) - handled possible panic on reading from broken connection in socket relay diff --git a/binary.go b/binary.go new file mode 100644 index 0000000..9157a4a --- /dev/null +++ b/binary.go @@ -0,0 +1,32 @@ +package goridge + +import ( + "bytes" + "encoding/binary" +) + +func pack(m string, s uint64) []byte { + b := bytes.Buffer{} + b.WriteString(m) + + b.Write([]byte{ + byte(s), + byte(s >> 8), + byte(s >> 16), + byte(s >> 24), + byte(s >> 32), + byte(s >> 40), + byte(s >> 48), + byte(s >> 56), + }) + + return b.Bytes() +} + +func unpack(in []byte, m *string, s *uint64) error { + *m = string(in[:len(in)-8]) + *s = binary.LittleEndian.Uint64(in[len(in)-8:]) + + // no errors for now + return nil +} diff --git a/binary_test.go b/binary_test.go new file mode 100644 index 0000000..277f20f --- /dev/null +++ b/binary_test.go @@ -0,0 +1,27 @@ +package goridge + +import ( + "testing" + "net/rpc" + "github.com/stretchr/testify/assert" +) + +func TestPackUnpack(t *testing.T) { + var ( + req = &rpc.Request{ + ServiceMethod: "test.Process", + Seq: 199, + } + res = &rpc.Response{} + ) + + data := pack(req.ServiceMethod, req.Seq) + assert.Len(t, data, len(req.ServiceMethod)+8) + assert.NoError(t, unpack(data, &res.ServiceMethod, &res.Seq)) + + assert.Equal(t, res.ServiceMethod, req.ServiceMethod) + assert.Equal(t, res.Seq, req.Seq) + + assert.Equal(t, "test.Process", res.ServiceMethod) + assert.Equal(t, uint64(199), res.Seq) +} diff --git a/client.go b/client.go new file mode 100644 index 0000000..decb5a7 --- /dev/null +++ b/client.go @@ -0,0 +1,106 @@ +package goridge + +import ( + "io" + "net/rpc" + "reflect" + "encoding/json" + "github.com/pkg/errors" +) + +// Client codec for goridge connection. +type ClientCodec struct { + relay Relay + closed bool +} + +// NewCodec initiates new server rpc codec over socket connection. +func NewClientCodec(rwc io.ReadWriteCloser) *ClientCodec { + return &ClientCodec{relay: NewSocketRelay(rwc)} +} + +// WriteRequest writes request to the connection. Sequential. +func (c *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error { + if err := c.relay.Send(pack(r.ServiceMethod, r.Seq), PayloadControl|PayloadRaw); err != nil { + return err + } + + if bin, ok := body.(*[]byte); ok { + return c.relay.Send(*bin, PayloadRaw) + } + + if bin, ok := body.([]byte); ok { + return c.relay.Send(bin, PayloadRaw) + } + + packed, err := json.Marshal(body) + if err != nil { + return err + } + + return c.relay.Send(packed, 0) +} + +// ReadResponseHeader reads response from the connection. +func (c *ClientCodec) ReadResponseHeader(r *rpc.Response) error { + data, p, err := c.relay.Receive() + if err != nil { + return err + } + + if !p.HasFlag(PayloadControl) { + return errors.New("invalid rpc header, control flag is missing") + } + + if !p.HasFlag(PayloadRaw) { + return errors.New("rpc response header must be in {rawData}") + } + + if !p.HasPayload() { + return errors.New("rpc response header can't be empty") + } + + return unpack(data, &r.ServiceMethod, &r.Seq) +} + +// ReadResponseBody response from the connection. +func (c *ClientCodec) ReadResponseBody(out interface{}) error { + data, p, err := c.relay.Receive() + if err != nil { + return err + } + + if out == nil { + // discarding + return nil + } + + if !p.HasPayload() { + return nil + } + + if p.HasFlag(PayloadError) { + return errors.New(string(data)) + } + + if p.HasFlag(PayloadRaw) { + if bin, ok := out.(*[]byte); ok { + *bin = append(*bin, data...) + return nil + } + + return errors.New("{rawData} request for " + reflect.ValueOf(out).String()) + } + + return json.Unmarshal(data, out) +} + +// Close closes the client connection. +func (c *ClientCodec) Close() error { + if c.closed { + return nil + } + + c.closed = true + return c.relay.Close() +} diff --git a/client_server_test.go b/client_server_test.go new file mode 100644 index 0000000..c2f90a4 --- /dev/null +++ b/client_server_test.go @@ -0,0 +1,107 @@ +package goridge + +import ( + "testing" + "strings" + "net" + "net/rpc" + "github.com/stretchr/testify/assert" + "github.com/pkg/errors" +) + +// testService sample +type testService struct{} + +// Payload sample +type Payload struct { + Name string `json:"name"` + Value int `json:"value"` + Keys map[string]string `json:"keys,omitempty"` +} + +// Echo returns incoming message +func (s *testService) Echo(msg string, r *string) error { + *r = msg + return nil +} + +// Echo returns error +func (s *testService) EchoR(msg string, r *string) error { + return errors.New("echoR error") +} + +// Process performs payload conversion +func (s *testService) Process(msg Payload, r *Payload) error { + r.Name = strings.ToUpper(msg.Name) + r.Value = -msg.Value + + if len(msg.Keys) != 0 { + r.Keys = make(map[string]string) + for n, v := range msg.Keys { + r.Keys[v] = n + } + } + + return nil +} + +// EchoBinary work over binary data +func (s *testService) EchoBinary(msg []byte, out *[]byte) error { + *out = append(*out, msg...) + + return nil +} + +func TestClientServer(t *testing.T) { + var ln net.Listener + var err error + + ln, err = net.Listen("tcp", ":8079") + if err != nil { + panic(err) + } + + rpc.RegisterName("test", new(testService)) + + go func() { + for { + conn, err := ln.Accept() + if err != nil { + continue + } + rpc.ServeCodec(NewCodec(conn)) + } + }() + + conn, err := net.Dial("tcp", ":8079") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(NewClientCodec(conn)) + defer client.Close() + + var ( + rs = "" + rp = Payload{} + rb = make([]byte, 0) + ) + + assert.NoError(t, client.Call("test.Process", Payload{ + Name: "name", + Value: 1000, + Keys: map[string]string{"key": "value"}, + }, &rp)) + + assert.Equal(t, "NAME", rp.Name) + assert.Equal(t, -1000, rp.Value) + assert.Equal(t, "key", rp.Keys["value"]) + + assert.NoError(t, client.Call("test.Echo", "hello", &rs)) + assert.Equal(t, "hello", rs) + + assert.NoError(t, client.Call("test.EchoBinary", []byte("hello world"), &rb)) + assert.Equal(t, []byte("hello world"), rb) + + assert.Error(t, client.Call("test.EchoR", "hi", &rs)) +} diff --git a/codec.go b/codec.go index 0506e9f..734d9bb 100644 --- a/codec.go +++ b/codec.go @@ -27,19 +27,18 @@ func (c *Codec) ReadRequestHeader(r *rpc.Request) error { } if !p.HasFlag(PayloadControl) { - return errors.New("invalid request, control data is expected") + return errors.New("invalid rpc header, control flag is missing") } if !p.HasFlag(PayloadRaw) { - return errors.New("rpc control command must be in {rawData}") + return errors.New("rpc response header must be in {rawData}") } if !p.HasPayload() { - return nil + return errors.New("rpc request header can't be empty") } - r.ServiceMethod = string(data) - return nil + return unpack(data, &r.ServiceMethod, &r.Seq) } // ReadRequestBody fetches prefixed body data and automatically unmarshal it as json. RawBody flag will populate @@ -50,6 +49,11 @@ func (c *Codec) ReadRequestBody(out interface{}) error { return err } + if out == nil { + // discarding + return nil + } + if !p.HasPayload() { return nil } @@ -60,7 +64,7 @@ func (c *Codec) ReadRequestBody(out interface{}) error { return nil } - return errors.New("{rawData} request for " + reflect.ValueOf(out).Elem().Kind().String()) + return errors.New("{rawData} request for " + reflect.ValueOf(out).String()) } return json.Unmarshal(data, out) @@ -68,6 +72,10 @@ func (c *Codec) ReadRequestBody(out interface{}) error { // WriteResponse marshals response, byte slice or error to remote party. func (c *Codec) WriteResponse(r *rpc.Response, body interface{}) error { + if err := c.relay.Send(pack(r.ServiceMethod, r.Seq), PayloadControl|PayloadRaw); err != nil { + return err + } + if r.Error != "" { return c.relay.Send([]byte(r.Error), PayloadError|PayloadRaw) } diff --git a/composer.json b/composer.json index 15a5510..8248992 100644 --- a/composer.json +++ b/composer.json @@ -17,7 +17,7 @@ }, "autoload": { "psr-4": { - "Spiral\\Goridge\\": "source/" + "Spiral\\Goridge\\": "php-src/" } }, "autoload-dev": { diff --git a/source/Exceptions/GoridgeException.php b/php-src/Exceptions/GoridgeException.php similarity index 100% rename from source/Exceptions/GoridgeException.php rename to php-src/Exceptions/GoridgeException.php diff --git a/source/Exceptions/InvalidArgumentException.php b/php-src/Exceptions/InvalidArgumentException.php similarity index 100% rename from source/Exceptions/InvalidArgumentException.php rename to php-src/Exceptions/InvalidArgumentException.php diff --git a/source/Exceptions/PrefixException.php b/php-src/Exceptions/PrefixException.php similarity index 100% rename from source/Exceptions/PrefixException.php rename to php-src/Exceptions/PrefixException.php diff --git a/source/Exceptions/RPCException.php b/php-src/Exceptions/RPCException.php similarity index 100% rename from source/Exceptions/RPCException.php rename to php-src/Exceptions/RPCException.php diff --git a/source/Exceptions/RelayException.php b/php-src/Exceptions/RelayException.php similarity index 100% rename from source/Exceptions/RelayException.php rename to php-src/Exceptions/RelayException.php diff --git a/source/Exceptions/ServiceException.php b/php-src/Exceptions/ServiceException.php similarity index 100% rename from source/Exceptions/ServiceException.php rename to php-src/Exceptions/ServiceException.php diff --git a/source/Exceptions/TransportException.php b/php-src/Exceptions/TransportException.php similarity index 100% rename from source/Exceptions/TransportException.php rename to php-src/Exceptions/TransportException.php diff --git a/source/RPC.php b/php-src/RPC.php similarity index 67% rename from source/RPC.php rename to php-src/RPC.php index b281b63..472acb3 100644 --- a/source/RPC.php +++ b/php-src/RPC.php @@ -17,6 +17,11 @@ class RPC /** @var Relay */ private $relay; + /** + * @var int + */ + private $seq; + /** * @param Relay $relay */ @@ -37,19 +42,41 @@ public function __construct(Relay $relay) */ public function call(string $method, $payload, int $flags = 0) { - $this->relay->send($method, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); + $this->relay->send( + $method . pack("P", $this->seq), + Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW + ); if ($flags & Relay::PAYLOAD_RAW) { $this->relay->send($payload, $flags); } else { $body = json_encode($payload); if ($body === false) { - throw new Exceptions\ServiceException(sprintf("json encode: %s", json_last_error_msg())); + throw new Exceptions\ServiceException(sprintf( + "json encode: %s", + json_last_error_msg() + )); } $this->relay->send($body); } + $body = $this->relay->receiveSync($flags); + + if (!($flags & Relay::PAYLOAD_CONTROL)) { + throw new Exceptions\TransportException("rpc response header is missing"); + } + + $rpc = unpack("Ps", substr($body, -8)); + $rpc['m'] = substr($body, 0, -8); + + if ($rpc["m"] != $method || $rpc["s"] != $this->seq) { + throw new Exceptions\TransportException("rpc method call mismatch"); + } + + // request id++ + $this->seq++; + // wait for the response $body = $this->relay->receiveSync($flags); diff --git a/source/RelayInterface.php b/php-src/RelayInterface.php similarity index 100% rename from source/RelayInterface.php rename to php-src/RelayInterface.php diff --git a/source/SocketRelay.php b/php-src/SocketRelay.php similarity index 100% rename from source/SocketRelay.php rename to php-src/SocketRelay.php diff --git a/source/StreamRelay.php b/php-src/StreamRelay.php similarity index 100% rename from source/StreamRelay.php rename to php-src/StreamRelay.php diff --git a/phpunit.xml b/phpunit.xml index 3fc9fb3..311e196 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -20,7 +20,7 @@ - source + php-src \ No newline at end of file diff --git a/socket.go b/socket.go index 13baa5a..99a80e8 100644 --- a/socket.go +++ b/socket.go @@ -78,10 +78,5 @@ func (rl *SocketRelay) Receive() (data []byte, p Prefix, err error) { // Close the connection. func (rl *SocketRelay) Close() error { - rl.muw.Lock() - rl.mur.Lock() - defer rl.muw.Unlock() - defer rl.mur.Unlock() - return rl.rwc.Close() -} +} \ No newline at end of file diff --git a/tests/Cases/RPCTest.php b/tests/Cases/RPCTest.php index d29974b..68b2702 100644 --- a/tests/Cases/RPCTest.php +++ b/tests/Cases/RPCTest.php @@ -56,7 +56,7 @@ public function testLongEcho() /** * @expectedException \Spiral\Goridge\Exceptions\ServiceException - * @expectedExceptionMessageRegExp #error '{rawData} request for string'.*# + * @expectedExceptionMessage {rawData} request for <*string Value> */ public function testConvertException() { @@ -120,7 +120,7 @@ public function testPayload() /** * @expectedException \Spiral\Goridge\Exceptions\ServiceException - * @expectedExceptionMessageRegExp #error '{rawData} request for struct.*# + * @expectedExceptionMessage {rawData} request for <*main.Payload Value> */ public function testBadPayload() { diff --git a/tests/server.go b/tests/server.go index 98eb1c5..66a98ab 100644 --- a/tests/server.go +++ b/tests/server.go @@ -29,12 +29,14 @@ func (s *Service) Ping(msg string, r *string) error { if msg == "ping" { *r = "pong" } + return nil } // Echo returns incoming message func (s *Service) Echo(msg string, r *string) error { *r = msg + return nil } @@ -54,8 +56,8 @@ func (s *Service) Process(msg Payload, r *Payload) error { } // EchoBinary work over binary data -func (s *Service) EchoBinary(msg []byte, out *[]byte) error { - *out = append(*out, msg...) +func (s *Service) EchoBinary(msg []byte, r *[]byte) error { + *r = append(*r, msg...) return nil } @@ -80,6 +82,7 @@ func main() { if err != nil { continue } + go rpc.ServeCodec(goridge.NewCodec(conn)) } }