Skip to content
rsocket-go implementation
Branch: master
Clone or download
Latest commit 224242c Jun 12, 2019
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
balancer support unix socket. Jun 12, 2019
cmd support unix socket. Jun 12, 2019
extension add basic client/server in-memory resume support. Jun 4, 2019
internal support unix socket. Jun 12, 2019
logger support unix socket. Jun 12, 2019
payload add basic client/server in-memory resume support. Jun 4, 2019
rx support unix socket. Jun 12, 2019
.editorconfig improve rx functions. Mar 12, 2019
.gitignore add go-fuzz test Mar 11, 2019
.travis.yml support unix socket. Jun 12, 2019
LICENSE Change to Apache 2 License Apr 16, 2019
README.md fix readme Jun 12, 2019
client.go fix lint. Jun 11, 2019
fuzz.go Fix websocket support. Jun 9, 2019
go.mod fix lint. Jun 11, 2019
go.sum fix lint. Jun 11, 2019
logo.jpg fix logo Mar 23, 2019
rsocket.go fix lint. Jun 11, 2019
rsocket_test.go support unix socket. Jun 12, 2019
server.go support unix socket. Jun 12, 2019

README.md

rsocket-go

logo

Travis (.org) Slack GoDoc Go Report Card License GitHub Release

rsocket-go is an implementation of the RSocket protocol in Go. It is still under development, APIs are unstable and maybe change at any time until release of v1.0.0. Please do not use it in a production environment.

Features

  • Design For Golang.
  • Thin reactive-streams implementation.
  • Simulate Java SDK API.

Getting started

Start an echo server

package main

import (
	"context"
	
	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
)

func main() {
	// Create and serve
	err := rsocket.Receive().
		Resume().
		Fragment(1024).
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) rsocket.RSocket {
			// bind responder
			return rsocket.NewAbstractSocket(
				rsocket.RequestResponse(func(msg payload.Payload) rx.Mono {
					return rx.JustMono(msg)
				}),
			)
		}).
		Transport("tcp://127.0.0.1:7878").
		Serve(context.Background())
	panic(err)
}

Connect to echo server

package main

import (
	"context"
	"log"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
)

func main() {
	// Connect to server
	cli, err := rsocket.Connect().
		Resume().
		Fragment(1024).
		SetupPayload(payload.NewString("Hello", "World")).
		Transport("tcp://127.0.0.1:7878").
		Start(context.Background())
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	// Send request
	cli.RequestResponse(payload.NewString("你好", "世界")).
		DoOnSuccess(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
			log.Println("receive response:", elem)
		}).
		Subscribe(context.Background())
}

NOTICE: more server examples are Here

Advanced

Load Balance

Basic load balance feature, please checkout current master branch. It's a client side load-balancer.

NOTICE: Balancer APIs are here

Reactor API

Mono and Flux are two parts of Reactor API.

Mono

Mono completes successfully by emitting an element, or with an error. Here is a tiny example:

package main

import (
	"context"

	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
)

func main() {
	// Create a Mono which produce a simple payload.
	mono := rx.NewMono(func(ctx context.Context, sink rx.MonoProducer) {
		// Use context API if you want.
		sink.Success(payload.NewString("foo", "bar"))
	})

	done := make(chan struct{})

	mono.
		DoFinally(func(ctx context.Context, st rx.SignalType) {
			close(done)
		}).
		DoOnSuccess(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
			// Handle and consume payload.
			// Do something here...
		}).
		SubscribeOn(rx.ElasticScheduler()).
		Subscribe(context.Background())

	<-done
}

Flux

Flux emits 0 to N elements, and then completes (successfully or with an error). Here is tiny example:

package main

import (
	"context"
	"time"

	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
)

func main() {
	// Create a Flux and produce 10 elements.
	flux := rx.NewFlux(func(ctx context.Context, producer rx.Producer) {
		for i := 0; i < 10; i++ {
			producer.Next(payload.NewString("hello", time.Now().String()))
		}
		producer.Complete()
	})
	flux.DoOnNext(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
			// Handle and consume elements
			// Do something here...
		}).
		Subscribe(context.Background())
}

Backpressure & RequestN

Flux support backpressure.

You can call func Request in Subscription or use LimitRate before subscribe.

// Here is an example which consume Payload one by one.
flux.Subscribe(
    context.Background(),
    rx.OnSubscribe(func(ctx context.Context, s rx.Subscription) {
        // Init Request 1 element.
        s.Request(1)
    }),
    rx.OnNext(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
        // Consume element, do something...

        // Request for next one manually.
        s.Request(1)
    }),
)

Dependencies

TODO

Transport

  • TCP
  • Websocket
  • Aeron

Duplex Socket

  • MetadataPush
  • RequestFNF
  • RequestResponse
  • RequestStream
  • RequestChannel
Others
  • Resume
  • Keepalive
  • Fragmentation
  • Thin Reactor
  • Cancel
  • Error
  • Flow Control: RequestN
  • Flow Control: Lease
  • Load Balance
You can’t perform that action at this time.