Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go:

install:
- go get golang.org/x/lint/golint
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.30.0
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls

Expand Down
108 changes: 54 additions & 54 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,110 +87,110 @@ type clientBuilder struct {
onCloses []func(error)
}

func (p *clientBuilder) Lease() ClientBuilder {
p.setup.Lease = true
return p
func (cb *clientBuilder) Lease() ClientBuilder {
cb.setup.Lease = true
return cb
}

func (p *clientBuilder) Resume(opts ...ClientResumeOptions) ClientBuilder {
if p.resume == nil {
p.resume = newResumeOpts()
func (cb *clientBuilder) Resume(opts ...ClientResumeOptions) ClientBuilder {
if cb.resume == nil {
cb.resume = newResumeOpts()
}
for _, it := range opts {
it(p.resume)
it(cb.resume)
}
return p
return cb
}

func (p *clientBuilder) Fragment(mtu int) ClientBuilder {
func (cb *clientBuilder) Fragment(mtu int) ClientBuilder {
if mtu == 0 {
p.fragment = fragmentation.MaxFragment
cb.fragment = fragmentation.MaxFragment
} else {
p.fragment = mtu
cb.fragment = mtu
}
return p
return cb
}

func (p *clientBuilder) OnClose(fn func(error)) ClientBuilder {
p.onCloses = append(p.onCloses, fn)
return p
func (cb *clientBuilder) OnClose(fn func(error)) ClientBuilder {
cb.onCloses = append(cb.onCloses, fn)
return cb
}

func (p *clientBuilder) KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder {
p.setup.KeepaliveInterval = tickPeriod
p.setup.KeepaliveLifetime = time.Duration(missedAcks) * ackTimeout
return p
func (cb *clientBuilder) KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder {
cb.setup.KeepaliveInterval = tickPeriod
cb.setup.KeepaliveLifetime = time.Duration(missedAcks) * ackTimeout
return cb
}

func (p *clientBuilder) DataMimeType(mime string) ClientBuilder {
p.setup.DataMimeType = []byte(mime)
return p
func (cb *clientBuilder) DataMimeType(mime string) ClientBuilder {
cb.setup.DataMimeType = []byte(mime)
return cb
}

func (p *clientBuilder) MetadataMimeType(mime string) ClientBuilder {
p.setup.MetadataMimeType = []byte(mime)
return p
func (cb *clientBuilder) MetadataMimeType(mime string) ClientBuilder {
cb.setup.MetadataMimeType = []byte(mime)
return cb
}

func (p *clientBuilder) SetupPayload(setup payload.Payload) ClientBuilder {
p.setup.Data = nil
p.setup.Metadata = nil
func (cb *clientBuilder) SetupPayload(setup payload.Payload) ClientBuilder {
cb.setup.Data = nil
cb.setup.Metadata = nil

if data := setup.Data(); len(data) > 0 {
p.setup.Data = make([]byte, len(data))
copy(p.setup.Data, data)
cb.setup.Data = make([]byte, len(data))
copy(cb.setup.Data, data)
}
if metadata, ok := setup.Metadata(); ok {
p.setup.Metadata = make([]byte, len(metadata))
copy(p.setup.Metadata, metadata)
cb.setup.Metadata = make([]byte, len(metadata))
copy(cb.setup.Metadata, metadata)
}
return p
return cb
}

func (p *clientBuilder) Acceptor(acceptor ClientSocketAcceptor) ToClientStarter {
p.acceptor = acceptor
return p
func (cb *clientBuilder) Acceptor(acceptor ClientSocketAcceptor) ToClientStarter {
cb.acceptor = acceptor
return cb
}

func (p *clientBuilder) Transport(t transport.ClientTransportFunc) ClientStarter {
p.tpGen = t
return p
func (cb *clientBuilder) Transport(t transport.ClientTransportFunc) ClientStarter {
cb.tpGen = t
return cb
}

func (p *clientBuilder) Start(ctx context.Context) (client Client, err error) {
func (cb *clientBuilder) Start(ctx context.Context) (client Client, err error) {
// create a blank socket.
err = fragmentation.IsValidFragment(p.fragment)
err = fragmentation.IsValidFragment(cb.fragment)
if err != nil {
return nil, err
}

sk := socket.NewClientDuplexConnection(
p.fragment,
p.setup.KeepaliveInterval,
conn := socket.NewClientDuplexConnection(
cb.fragment,
cb.setup.KeepaliveInterval,
)
// create a client.
var cs setupClientSocket
if p.resume != nil {
p.setup.Token = p.resume.tokenGen()
cs = socket.NewResumableClientSocket(p.tpGen, sk)
if cb.resume != nil {
cb.setup.Token = cb.resume.tokenGen()
cs = socket.NewResumableClientSocket(cb.tpGen, conn)
} else {
cs = socket.NewClient(p.tpGen, sk)
cs = socket.NewClient(cb.tpGen, conn)
}
if p.acceptor != nil {
sk.SetResponder(p.acceptor(cs))
if cb.acceptor != nil {
conn.SetResponder(cb.acceptor(cs))
} else {
sk.SetResponder(_noopSocket)
conn.SetResponder(_noopSocket)
}

// bind closers.
if len(p.onCloses) > 0 {
for _, closer := range p.onCloses {
if len(cb.onCloses) > 0 {
for _, closer := range cb.onCloses {
cs.OnClose(closer)
}
}

// setup client.
err = cs.Setup(ctx, p.setup)
err = cs.Setup(ctx, cb.setup)
if err == nil {
client = cs
}
Expand Down
5 changes: 0 additions & 5 deletions core/framing/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package framing

import (
"errors"
"fmt"
"io"

"github.com/rsocket/rsocket-go/core"
Expand Down Expand Up @@ -139,7 +138,3 @@ func FromBytes(b []byte) (core.Frame, error) {
return FromRawFrame(raw)
}

func PrintFrame(f core.WriteableFrame) string {
// TODO: print frame
return fmt.Sprintf("%+v", f)
}
2 changes: 1 addition & 1 deletion core/framing/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"
)

const _sid uint32 = 1
const _sid uint32 = 1234

func TestFromBytes(t *testing.T) {
// empty
Expand Down
86 changes: 86 additions & 0 deletions core/framing/misc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package framing

import (
"encoding/binary"
"fmt"
"io"
"strconv"
"strings"

"github.com/rsocket/rsocket-go/core"
"github.com/rsocket/rsocket-go/internal/common"
Expand Down Expand Up @@ -53,6 +57,88 @@ func FromRawFrame(f *RawFrame) (frame core.Frame, err error) {
return
}

// PrintFrame prints frame in bytes dump.
func PrintFrame(f core.WriteableFrame) string {
var initN, reqN uint32
var metadata, data []byte

switch it := f.(type) {
case *PayloadFrame:
metadata, _ = it.Metadata()
data = it.Data()
case *WriteablePayloadFrame:
metadata, data = it.metadata, it.data
case *MetadataPushFrame:
metadata, _ = it.Metadata()
case *FireAndForgetFrame:
metadata, _ = it.Metadata()
data = it.Data()
case *RequestResponseFrame:
metadata, _ = it.Metadata()
data = it.Data()
case *RequestStreamFrame:
metadata, _ = it.Metadata()
data = it.Data()
initN = it.InitialRequestN()
case *RequestChannelFrame:
metadata, _ = it.Metadata()
data = it.Data()
initN = it.InitialRequestN()
case *SetupFrame:
metadata, _ = it.Metadata()
data = it.Data()
case *RequestNFrame:
reqN = it.N()
case *WriteableMetadataPushFrame:
metadata = it.metadata
case *WriteableFireAndForgetFrame:
metadata, data = it.metadata, it.data
case *WriteableRequestResponseFrame:
metadata, data = it.metadata, it.data
case *WriteableRequestStreamFrame:
metadata, data = it.metadata, it.data
reqN = binary.BigEndian.Uint32(it.n[:])
case *WriteableRequestChannelFrame:
metadata, data = it.metadata, it.data
reqN = binary.BigEndian.Uint32(it.n[:])
case *WriteableSetupFrame:
metadata, data = it.metadata, it.data
case *WriteableRequestNFrame:
reqN = binary.BigEndian.Uint32(it.n[:])
}

b := &strings.Builder{}
b.WriteString("\nFrame => Stream ID: ")
h := f.Header()
b.WriteString(strconv.Itoa(int(h.StreamID())))
b.WriteString(" Type: ")
b.WriteString(h.Type().String())
b.WriteString(" Flags: 0b")
_, _ = fmt.Fprintf(b, "%010b", h.Flag())
b.WriteString(" Length: ")
b.WriteString(strconv.Itoa(f.Len()))
if initN > 0 {
b.WriteString(" InitialRequestN: ")
_, _ = fmt.Fprintf(b, "%d", initN)
}

if reqN > 0 {
b.WriteString(" RequestN: ")
_, _ = fmt.Fprintf(b, "%d", reqN)
}

if metadata != nil {
b.WriteString("\nMetadata:\n")
common.AppendPrettyHexDump(b, metadata)
}

if data != nil {
b.WriteString("\nData:\n")
common.AppendPrettyHexDump(b, data)
}
return b.String()
}

func writePayload(w io.Writer, data []byte, metadata []byte) (n int64, err error) {
if l := len(metadata); l > 0 {
var wrote int64
Expand Down
44 changes: 44 additions & 0 deletions core/framing/misc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package framing_test

import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/rsocket/rsocket-go/core"
"github.com/rsocket/rsocket-go/core/framing"
"github.com/stretchr/testify/assert"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

func TestPrintFrame(t *testing.T) {
mime := []byte("fake mime")
data := make([]byte, 100)
metadata := make([]byte, 50)
rand.Read(data)
rand.Read(metadata)
data = append([]byte("fake data"), data...)
metadata = append([]byte("fake metadata"), metadata...)
for _, f := range []core.WriteableFrame{
framing.NewCancelFrame(_sid),
framing.NewPayloadFrame(_sid, data, metadata, core.FlagComplete|core.FlagNext|core.FlagFollow),
framing.NewRequestResponseFrame(_sid, data, metadata, core.FlagComplete|core.FlagNext|core.FlagFollow),
framing.NewMetadataPushFrame(metadata),
framing.NewFireAndForgetFrame(_sid, data, metadata, core.FlagComplete|core.FlagNext|core.FlagFollow),
framing.NewRequestStreamFrame(_sid, 1, data, metadata, core.FlagComplete|core.FlagNext|core.FlagFollow),
framing.NewRequestChannelFrame(_sid, 1, data, metadata, core.FlagComplete|core.FlagNext|core.FlagFollow),
framing.NewSetupFrame(core.DefaultVersion, 30*time.Second, 90*time.Second, nil, mime, mime, data, metadata, false),
} {
tryPrintFrame(t, f)
}
}

func tryPrintFrame(t *testing.T, f core.WriteableFrame) {
s := framing.PrintFrame(f)
assert.True(t, len(s) > 0)
fmt.Println(s)
}
4 changes: 2 additions & 2 deletions core/transport/tcp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (p *TcpConn) Read() (f core.Frame, err error) {
return
}
if logger.IsDebugEnabled() {
logger.Debugf("<--- rcv: %s\n", f)
logger.Debugf("%s\n", framing.PrintFrame(f))
}
return
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func (p *TcpConn) Write(frame core.WriteableFrame) (err error) {
return
}
if logger.IsDebugEnabled() {
logger.Debugf("---> snd: %s\n", debugStr)
logger.Debugf("%s\n", debugStr)
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions core/transport/websocket_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *WsConn) Read() (f core.Frame, err error) {
return
}
if logger.IsDebugEnabled() {
logger.Debugf("<--- rcv: %s\n", f)
logger.Debugf("%s\n", framing.PrintFrame(f))
}
return
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (p *WsConn) Write(frame core.WriteableFrame) (err error) {
p.counter.IncWriteBytes(size)
}
if logger.IsDebugEnabled() {
logger.Debugf("---> snd: %s\n", frame)
logger.Debugf("%s\n", framing.PrintFrame(frame))
}
return
}
Expand Down
Loading