Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ suppressions/
.idea

cmd/rsocket-cli/rsocket-cli

coverage.out
13 changes: 6 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ language: go
go:
- 1.x

env:
- GO111MODULE=on

before_install:
- go get -u golang.org/x/lint/golint
install:
- go get golang.org/x/lint/golint
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls

script:
# - golint ./...
- golangci-lint run ./...
- go test -race -count=1 . -v
- go test -v -covermode=atomic -coverprofile=coverage.out -count=1 ./...
- goveralls -coverprofile=coverage.out -service=travis-ci -repotoken $COVERALLS_TOKEN
94 changes: 27 additions & 67 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
![logo](./logo.jpg)

[![Build Status](https://travis-ci.com/rsocket/rsocket-go.svg?branch=master)](https://travis-ci.com/rsocket/rsocket-go)
[![Coverage Status](https://coveralls.io/repos/github/rsocket/rsocket-go/badge.svg?branch=master)](https://coveralls.io/github/rsocket/rsocket-go?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/rsocket/rsocket-go)](https://goreportcard.com/report/github.com/rsocket/rsocket-go)
[![Slack](https://img.shields.io/badge/slack-rsocket--go-blue.svg)](https://rsocket.slack.com/messages/C9VGZ5MV3)
[![GoDoc](https://godoc.org/github.com/rsocket/rsocket-go?status.svg)](https://godoc.org/github.com/rsocket/rsocket-go)
[![Go Report Card](https://goreportcard.com/badge/github.com/rsocket/rsocket-go)](https://goreportcard.com/report/github.com/rsocket/rsocket-go)
[![License](https://img.shields.io/github/license/rsocket/rsocket-go.svg)](https://github.com/rsocket/rsocket-go/blob/master/LICENSE)
[![GitHub Release](https://img.shields.io/github/release-pre/rsocket/rsocket-go.svg)](https://github.com/rsocket/rsocket-go/releases)

Expand All @@ -28,6 +29,7 @@ package main

import (
"context"
"log"

"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
Expand All @@ -46,10 +48,11 @@ func main() {
}),
), nil
}).
Transport("tcp://127.0.0.1:7878").
Transport(rsocket.TcpServer().SetAddr(":7878").Build()).
Serve(context.Background())
panic(err)
log.Fatalln(err)
}

```

> Connect to echo server
Expand All @@ -71,7 +74,7 @@ func main() {
Resume().
Fragment(1024).
SetupPayload(payload.NewString("Hello", "World")).
Transport("tcp://127.0.0.1:7878").
Transport(rsocket.TcpClient().SetHostAndPort("127.0.0.1", 7878).Build()).
Start(context.Background())
if err != nil {
panic(err)
Expand Down Expand Up @@ -131,12 +134,13 @@ func main() {
DoFinally(func(s rx.SignalType) {
close(done)
}).
DoOnSuccess(func(input payload.Payload) {
DoOnSuccess(func(input payload.Payload) error {
// Handle and consume payload.
// Do something here...
fmt.Println("bingo:", input)
return nil
}).
SubscribeOn(scheduler.Elastic()).
SubscribeOn(scheduler.Parallel()).
Subscribe(context.Background())

<-done
Expand All @@ -155,9 +159,9 @@ import (
"context"
"fmt"

flxx "github.com/jjeffcaii/reactor-go/flux"
"github.com/rsocket/rsocket-go/extension"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
)

Expand All @@ -177,24 +181,27 @@ func main() {
// payload.NewString("qux", extension.TextPlain.String()),
//)

f.
DoOnNext(func(elem payload.Payload) {
// Block
_, _ = f.
DoOnNext(func(elem payload.Payload) error {
// Handle and consume elements
// Do something here...
fmt.Println("bingo:", elem)
return nil
}).
Subscribe(context.Background())
BlockLast(context.Background())

// Or you can use Raw reactor-go API. :-D
f2 := flux.Raw(flxx.Range(0, 10).Map(func(i interface{}) interface{} {
return payload.NewString(fmt.Sprintf("Hello@%d", i.(int)), extension.TextPlain.String())
// Subscribe
f.Subscribe(context.Background(), rx.OnNext(func(input payload.Payload) error {
fmt.Println("bingo:", input)
return nil
}))
f2.
DoOnNext(func(input payload.Payload) {
fmt.Println("bingo:", input)
}).
BlockLast(context.Background())

// Or implement your own subscriber
var s rx.Subscriber
f.SubscribeWith(context.Background(), s)
}

```

#### Backpressure & RequestN
Expand Down Expand Up @@ -237,65 +244,18 @@ func main() {
su = s
su.Request(1)
}),
rx.OnNext(func(elem payload.Payload) {
rx.OnNext(func(elem payload.Payload) error {
// Consume element, do something...
fmt.Println("bingo:", elem)
// Request for next one manually.
su.Request(1)
return nil
}),
)
}

```

#### Logging

We do not use a specific log implementation. You can register your own log implementation. For example:

```go
package main

import (
"log"

"github.com/rsocket/rsocket-go/logger"
)

func init() {
logger.SetFunc(logger.LevelDebug|logger.LevelInfo|logger.LevelWarn|logger.LevelError, func(template string, args ...interface{}) {
// Implement your own logger here...
log.Printf(template, args...)
})
logger.SetLevel(logger.LevelInfo)
}

```

#### Dependencies
- [reactor-go](https://github.com/jjeffcaii/reactor-go)
- [testify](https://github.com/stretchr/testify)
- [websocket](https://github.com/gorilla/websocket)

### TODO

#### Transport
- [x] TCP
- [x] Websocket

#### Duplex Socket
- [x] MetadataPush
- [x] RequestFNF
- [x] RequestResponse
- [x] RequestStream
- [x] RequestChannel

##### Others
- [x] Resume
- [x] Keepalive
- [x] Fragmentation
- [x] Thin Reactor
- [x] Cancel
- [x] Error
- [x] Flow Control: RequestN
- [x] Flow Control: Lease
- [x] Load Balance
7 changes: 4 additions & 3 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package balancer

import (
"context"
"io"

"github.com/rsocket/rsocket-go"
Expand All @@ -11,11 +12,11 @@ import (
type Balancer interface {
io.Closer
// Put puts a new client.
Put(client rsocket.Client)
Put(client rsocket.Client) error
// PutLabel puts a new client with a label.
PutLabel(label string, client rsocket.Client)
PutLabel(label string, client rsocket.Client) error
// Next returns next balanced RSocket client.
Next() rsocket.Client
Next(context.Context) (rsocket.Client, bool)
// OnLeave handle events when a client exit.
OnLeave(fn func(label string))
}
28 changes: 15 additions & 13 deletions balancer/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ var errGroupClosed = errors.New("balancer group has been closed")
// Group can be used to create a simple RSocket Broker.
type Group struct {
g func() Balancer
m *sync.Map
l sync.Mutex
m map[string]Balancer
}

// Close close current RSocket group.
Expand All @@ -33,10 +34,12 @@ func (p *Group) Close() (err error) {
}
}
}(all, done)
p.m.Range(func(key, value interface{}) bool {
all <- value.(Balancer)
return true
})

p.l.Lock()
defer p.l.Unlock()
for _, b := range p.m {
all <- b
}
p.m = nil
close(all)
<-done
Expand All @@ -45,24 +48,23 @@ func (p *Group) Close() (err error) {

// Get returns a Balancer with custom id.
func (p *Group) Get(id string) Balancer {
p.l.Lock()
defer p.l.Unlock()
if p.m == nil {
panic(errGroupClosed)
}
if actual, ok := p.m.Load(id); ok {
return actual.(Balancer)
if actual, ok := p.m[id]; ok {
return actual
}
newborn := p.g()
actual, loaded := p.m.LoadOrStore(id, newborn)
if loaded {
_ = newborn.Close()
}
return actual.(Balancer)
p.m[id] = newborn
return newborn
}

// NewGroup returns a new Group.
func NewGroup(gen func() Balancer) *Group {
return &Group{
g: gen,
m: &sync.Map{},
m: make(map[string]Balancer),
}
}
46 changes: 46 additions & 0 deletions balancer/group_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package balancer

import (
"context"
"errors"
"fmt"
"time"

"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/mono"
)

func ExampleNewGroup() {
group := NewGroup(func() Balancer {
return NewRoundRobinBalancer()
})
defer func() {
_ = group.Close()
}()
// Create a broker with resume.
err := rsocket.Receive().
Resume(rsocket.WithServerResumeSessionDuration(10 * time.Second)).
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
// Register service using Setup Metadata as service ID.
if serviceID, ok := setup.MetadataUTF8(); ok {
_ = group.Get(serviceID).Put(sendingSocket)
}
// Proxy requests by group.
return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
requestServiceID, ok := msg.MetadataUTF8()
if !ok {
panic(errors.New("missing service ID in metadata"))
}
fmt.Println("[broker] redirect request to service", requestServiceID)
upstream, _ := group.Get(requestServiceID).Next(context.Background())
fmt.Println("[broker] choose upstream:", upstream)
return upstream.RequestResponse(msg)
})), nil
}).
Transport(rsocket.TcpServer().SetAddr(":7878").Build()).
Serve(context.Background())
if err != nil {
panic(err)
}
}
Loading