Skip to content

Commit

Permalink
Merge pull request #23 from spiral/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
wolfy-j committed Jun 3, 2018
2 parents 4bff3a2 + 6d1a9af commit 27197f4
Show file tree
Hide file tree
Showing 23 changed files with 333 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -16,6 +16,8 @@ before_install:
install:
- composer install --no-interaction --prefer-source
- go get "github.com/spiral/goridge"
- cp -r * $GOPATH/src/github.com/spiral/goridge
- go get "github.com/pkg/errors"
- go get "github.com/stretchr/testify/assert"
- go build tests/server.go

Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,12 @@
CHANGELOG
=========

## v2.1.0 (03.06.2018)
- added golang ClientCodec implementation
- additional error detections
- added sequence support
- more tests

## v2.0.5 (03.04.2018)
- handled possible panic on reading from broken connection in socket relay

Expand Down
32 changes: 32 additions & 0 deletions binary.go
@@ -0,0 +1,32 @@
package goridge

import (
"bytes"
"encoding/binary"
)

func pack(m string, s uint64) []byte {
b := bytes.Buffer{}
b.WriteString(m)

b.Write([]byte{
byte(s),
byte(s >> 8),
byte(s >> 16),
byte(s >> 24),
byte(s >> 32),
byte(s >> 40),
byte(s >> 48),
byte(s >> 56),
})

return b.Bytes()
}

func unpack(in []byte, m *string, s *uint64) error {
*m = string(in[:len(in)-8])
*s = binary.LittleEndian.Uint64(in[len(in)-8:])

// no errors for now
return nil
}
27 changes: 27 additions & 0 deletions binary_test.go
@@ -0,0 +1,27 @@
package goridge

import (
"testing"
"net/rpc"
"github.com/stretchr/testify/assert"
)

func TestPackUnpack(t *testing.T) {
var (
req = &rpc.Request{
ServiceMethod: "test.Process",
Seq: 199,
}
res = &rpc.Response{}
)

data := pack(req.ServiceMethod, req.Seq)
assert.Len(t, data, len(req.ServiceMethod)+8)
assert.NoError(t, unpack(data, &res.ServiceMethod, &res.Seq))

assert.Equal(t, res.ServiceMethod, req.ServiceMethod)
assert.Equal(t, res.Seq, req.Seq)

assert.Equal(t, "test.Process", res.ServiceMethod)
assert.Equal(t, uint64(199), res.Seq)
}
106 changes: 106 additions & 0 deletions client.go
@@ -0,0 +1,106 @@
package goridge

import (
"io"
"net/rpc"
"reflect"
"encoding/json"
"github.com/pkg/errors"
)

// Client codec for goridge connection.
type ClientCodec struct {
relay Relay
closed bool
}

// NewCodec initiates new server rpc codec over socket connection.
func NewClientCodec(rwc io.ReadWriteCloser) *ClientCodec {
return &ClientCodec{relay: NewSocketRelay(rwc)}
}

// WriteRequest writes request to the connection. Sequential.
func (c *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error {
if err := c.relay.Send(pack(r.ServiceMethod, r.Seq), PayloadControl|PayloadRaw); err != nil {
return err
}

if bin, ok := body.(*[]byte); ok {
return c.relay.Send(*bin, PayloadRaw)
}

if bin, ok := body.([]byte); ok {
return c.relay.Send(bin, PayloadRaw)
}

packed, err := json.Marshal(body)
if err != nil {
return err
}

return c.relay.Send(packed, 0)
}

// ReadResponseHeader reads response from the connection.
func (c *ClientCodec) ReadResponseHeader(r *rpc.Response) error {
data, p, err := c.relay.Receive()
if err != nil {
return err
}

if !p.HasFlag(PayloadControl) {
return errors.New("invalid rpc header, control flag is missing")
}

if !p.HasFlag(PayloadRaw) {
return errors.New("rpc response header must be in {rawData}")
}

if !p.HasPayload() {
return errors.New("rpc response header can't be empty")
}

return unpack(data, &r.ServiceMethod, &r.Seq)
}

// ReadResponseBody response from the connection.
func (c *ClientCodec) ReadResponseBody(out interface{}) error {
data, p, err := c.relay.Receive()
if err != nil {
return err
}

if out == nil {
// discarding
return nil
}

if !p.HasPayload() {
return nil
}

if p.HasFlag(PayloadError) {
return errors.New(string(data))
}

if p.HasFlag(PayloadRaw) {
if bin, ok := out.(*[]byte); ok {
*bin = append(*bin, data...)
return nil
}

return errors.New("{rawData} request for " + reflect.ValueOf(out).String())
}

return json.Unmarshal(data, out)
}

// Close closes the client connection.
func (c *ClientCodec) Close() error {
if c.closed {
return nil
}

c.closed = true
return c.relay.Close()
}
107 changes: 107 additions & 0 deletions client_server_test.go
@@ -0,0 +1,107 @@
package goridge

import (
"testing"
"strings"
"net"
"net/rpc"
"github.com/stretchr/testify/assert"
"github.com/pkg/errors"
)

// testService sample
type testService struct{}

// Payload sample
type Payload struct {
Name string `json:"name"`
Value int `json:"value"`
Keys map[string]string `json:"keys,omitempty"`
}

// Echo returns incoming message
func (s *testService) Echo(msg string, r *string) error {
*r = msg
return nil
}

// Echo returns error
func (s *testService) EchoR(msg string, r *string) error {
return errors.New("echoR error")
}

// Process performs payload conversion
func (s *testService) Process(msg Payload, r *Payload) error {
r.Name = strings.ToUpper(msg.Name)
r.Value = -msg.Value

if len(msg.Keys) != 0 {
r.Keys = make(map[string]string)
for n, v := range msg.Keys {
r.Keys[v] = n
}
}

return nil
}

// EchoBinary work over binary data
func (s *testService) EchoBinary(msg []byte, out *[]byte) error {
*out = append(*out, msg...)

return nil
}

func TestClientServer(t *testing.T) {
var ln net.Listener
var err error

ln, err = net.Listen("tcp", ":8079")
if err != nil {
panic(err)
}

rpc.RegisterName("test", new(testService))

go func() {
for {
conn, err := ln.Accept()
if err != nil {
continue
}
rpc.ServeCodec(NewCodec(conn))
}
}()

conn, err := net.Dial("tcp", ":8079")
if err != nil {
panic(err)
}

client := rpc.NewClientWithCodec(NewClientCodec(conn))
defer client.Close()

var (
rs = ""
rp = Payload{}
rb = make([]byte, 0)
)

assert.NoError(t, client.Call("test.Process", Payload{
Name: "name",
Value: 1000,
Keys: map[string]string{"key": "value"},
}, &rp))

assert.Equal(t, "NAME", rp.Name)
assert.Equal(t, -1000, rp.Value)
assert.Equal(t, "key", rp.Keys["value"])

assert.NoError(t, client.Call("test.Echo", "hello", &rs))
assert.Equal(t, "hello", rs)

assert.NoError(t, client.Call("test.EchoBinary", []byte("hello world"), &rb))
assert.Equal(t, []byte("hello world"), rb)

assert.Error(t, client.Call("test.EchoR", "hi", &rs))
}
20 changes: 14 additions & 6 deletions codec.go
Expand Up @@ -27,19 +27,18 @@ func (c *Codec) ReadRequestHeader(r *rpc.Request) error {
}

if !p.HasFlag(PayloadControl) {
return errors.New("invalid request, control data is expected")
return errors.New("invalid rpc header, control flag is missing")
}

if !p.HasFlag(PayloadRaw) {
return errors.New("rpc control command must be in {rawData}")
return errors.New("rpc response header must be in {rawData}")
}

if !p.HasPayload() {
return nil
return errors.New("rpc request header can't be empty")
}

r.ServiceMethod = string(data)
return nil
return unpack(data, &r.ServiceMethod, &r.Seq)
}

// ReadRequestBody fetches prefixed body data and automatically unmarshal it as json. RawBody flag will populate
Expand All @@ -50,6 +49,11 @@ func (c *Codec) ReadRequestBody(out interface{}) error {
return err
}

if out == nil {
// discarding
return nil
}

if !p.HasPayload() {
return nil
}
Expand All @@ -60,14 +64,18 @@ func (c *Codec) ReadRequestBody(out interface{}) error {
return nil
}

return errors.New("{rawData} request for " + reflect.ValueOf(out).Elem().Kind().String())
return errors.New("{rawData} request for " + reflect.ValueOf(out).String())
}

return json.Unmarshal(data, out)
}

// WriteResponse marshals response, byte slice or error to remote party.
func (c *Codec) WriteResponse(r *rpc.Response, body interface{}) error {
if err := c.relay.Send(pack(r.ServiceMethod, r.Seq), PayloadControl|PayloadRaw); err != nil {
return err
}

if r.Error != "" {
return c.relay.Send([]byte(r.Error), PayloadError|PayloadRaw)
}
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Expand Up @@ -17,7 +17,7 @@
},
"autoload": {
"psr-4": {
"Spiral\\Goridge\\": "source/"
"Spiral\\Goridge\\": "php-src/"
}
},
"autoload-dev": {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 27197f4

Please sign in to comment.