Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ module github.com/rsocket/rsocket-go
go 1.11

require (
github.com/golang/mock v1.4.3
github.com/golang/mock v1.4.4
github.com/google/uuid v1.1.2
github.com/gorilla/websocket v1.4.2
github.com/jjeffcaii/reactor-go v0.4.5
github.com/jjeffcaii/reactor-go v0.5.0
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.5.0
github.com/stretchr/testify v1.6.1
github.com/urfave/cli/v2 v2.1.1
github.com/urfave/cli/v2 v2.3.0
go.uber.org/atomic v1.7.0
)
16 changes: 7 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jjeffcaii/reactor-go v0.4.5 h1:FOc3ICOIxPHzJBLYmA6Hf1jpPduX0Czzh4i62Wz17A4=
github.com/jjeffcaii/reactor-go v0.4.5/go.mod h1:2ZzeNFnQ2c55NHRh0KJ4k5yMvmrcpx1APzh7BKkRNQE=
github.com/jjeffcaii/reactor-go v0.5.0 h1:Y8VVp31JGij/ilMh8bXyNEo8k2SVSmc4Cc+mzPGL07w=
github.com/jjeffcaii/reactor-go v0.5.0/go.mod h1:qYN34C2UANFOtDeUGvhxlExLpFMDbvSrphX3Gb3H6S8=
github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU=
github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -30,23 +30,21 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k=
github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
10 changes: 4 additions & 6 deletions rx/mono/mono.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// ReleaseFunc can be used to release resources.
type ReleaseFunc func()
type Item = rx.Item
type Combinator2 = func(first, second Item) (payload.Payload, error)

// Mono is a Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.
type Mono interface {
Expand Down Expand Up @@ -49,6 +51,8 @@ type Mono interface {
SwitchIfError(alternative func(error) Mono) Mono
// SwitchValueIfError switch to an alternative Payload if this Mono is end with an error.
SwitchValueIfError(alternative payload.Payload) Mono
// ZipWith combines the result from this mono and another into a new Payload.
ZipWith(alternative Mono, cmb Combinator2) Mono
// Raw returns low-level reactor.Mono which defined in reactor-go library.
Raw() mono.Mono
// ToChan subscribe Mono and puts items into a chan.
Expand All @@ -65,9 +69,3 @@ type Sink interface {
// Error emits an error then complete current Sink.
Error(error)
}

// Processor combine Sink and Mono.
type Processor interface {
Sink
Mono
}
26 changes: 16 additions & 10 deletions rx/mono/proxy_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ func (p proxy) Raw() mono.Mono {
return p.Mono
}

func (p proxy) Success(v payload.Payload) {
mustProcessor(p.Mono).Success(v)
}

func (p proxy) Error(e error) {
mustProcessor(p.Mono).Error(e)
}

func (p proxy) ToChan(ctx context.Context) (<-chan payload.Payload, <-chan error) {
return toChan(ctx, p.Mono)
}
Expand Down Expand Up @@ -120,12 +112,12 @@ func (p proxy) DoOnCancel(fn rx.FnOnCancel) Mono {
}

func (p proxy) SwitchIfEmpty(alternative Mono) Mono {
return newProxy(p.Mono.SwitchIfEmpty(alternative.Raw()))
return newProxy(p.Mono.SwitchIfEmpty(unpackRawPublisher(alternative)))
}

func (p proxy) SwitchIfError(alternative func(error) Mono) Mono {
return newProxy(p.Mono.SwitchIfError(func(err error) mono.Mono {
return alternative(err).Raw()
return unpackRawPublisher(alternative(err))
}))
}

Expand All @@ -137,6 +129,20 @@ func (p proxy) Timeout(timeout time.Duration) Mono {
return newProxy(p.Mono.Timeout(timeout))
}

func (p proxy) ZipWith(alternative Mono, cmb Combinator2) Mono {
return Zip(p, alternative).ToMono(func(item rx.Tuple) (payload.Payload, error) {
first, err := convertItem(item[0])
if err != nil {
return nil, err
}
second, err := convertItem(item[1])
if err != nil {
return nil, err
}
return cmb(first, second)
})
}

func (p proxy) Subscribe(ctx context.Context, options ...rx.SubscriberOption) {
p.SubscribeWith(ctx, rx.NewSubscriber(options...))
}
Expand Down
25 changes: 15 additions & 10 deletions rx/mono/proxy_oneshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@ func returnOneshotProxy(o *oneshotProxy) (raw mono.Mono) {
return
}

func (o *oneshotProxy) Success(p payload.Payload) {
mustProcessor(o.Mono).Success(p)
}

func (o *oneshotProxy) Error(err error) {
mustProcessor(o.Mono).Error(err)
}

func (o *oneshotProxy) SubscribeWith(ctx context.Context, s rx.Subscriber) {
var sub reactor.Subscriber
if s == rx.EmptySubscriber {
Expand Down Expand Up @@ -143,13 +135,13 @@ func (o *oneshotProxy) Block(ctx context.Context) (payload.Payload, error) {
}

func (o *oneshotProxy) SwitchIfEmpty(alternative Mono) Mono {
o.Mono = o.Mono.SwitchIfEmpty(alternative.Raw())
o.Mono = o.Mono.SwitchIfEmpty(unpackRawPublisher(alternative))
return o
}

func (o *oneshotProxy) SwitchIfError(alternative func(error) Mono) Mono {
o.Mono = o.Mono.SwitchIfError(func(err error) mono.Mono {
return alternative(err).Raw()
return unpackRawPublisher(alternative(err))
})
return o
}
Expand All @@ -158,6 +150,19 @@ func (o *oneshotProxy) SwitchValueIfError(alternative payload.Payload) Mono {
o.Mono = o.Mono.SwitchValueIfError(alternative)
return o
}
func (o *oneshotProxy) ZipWith(alternative Mono, cmb Combinator2) Mono {
return Zip(o, alternative).ToMonoOneshot(func(item rx.Tuple) (payload.Payload, error) {
first, err := convertItem(item[0])
if err != nil {
return nil, err
}
second, err := convertItem(item[1])
if err != nil {
return nil, err
}
return cmb(first, second)
})
}

func (o *oneshotProxy) Raw() mono.Mono {
return o.Mono
Expand Down
45 changes: 37 additions & 8 deletions rx/mono/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jjeffcaii/reactor-go/scheduler"
"github.com/pkg/errors"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
)

var empty = newProxy(mono.Empty())
Expand All @@ -32,6 +33,10 @@ func Raw(input mono.Mono) Mono {
return newProxy(input)
}

func RawOneshot(origin mono.Mono) Mono {
return borrowOneshotProxy(origin)
}

// Just wrap an exist Payload to a Mono.
func Just(input payload.Payload) Mono {
return newProxy(mono.Just(input))
Expand Down Expand Up @@ -156,14 +161,6 @@ func toChan(ctx context.Context, publisher mono.Mono) (<-chan payload.Payload, <
return value, err
}

func mustProcessor(origin mono.Mono) mono.Processor {
m, ok := origin.(mono.Processor)
if !ok {
panic(errors.Errorf("require processor but %v", origin))
}
return m
}

func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) {
done := make(chan struct{})
vchan := make(chan payload.Payload, 1)
Expand All @@ -184,3 +181,35 @@ func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) {
return nil, nil
}
}

func unpackRawPublisher(source Mono) mono.Mono {
if source == nil {
return nil
}
switch t := source.(type) {
case *oneshotProxy:
return returnOneshotProxy(t)
default:
return t.Raw()
}
}

func convertItem(item *reactor.Item) (result rx.Item, err error) {
if item == nil {
return
}
if item.E != nil {
result.E = item.E
return
}
if item.V == nil {
return
}
p, ok := item.V.(payload.Payload)
if !ok {
err = errors.Errorf("require Payload value type instead of %t", item.V)
return
}
result.V = p
return
}
76 changes: 62 additions & 14 deletions rx/mono/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,82 @@ package mono
import (
"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/mono"
"github.com/jjeffcaii/reactor-go/tuple"
"github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
)

// Zip merges given Monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple.
func Zip(first Mono, second Mono, others ...Mono) ZipBuilder {
var all []Mono
all = append(all, first, second)
all = append(all, others...)
return ZipAll(all...)
if len(others) < 1 {
return []mono.Mono{
unpackRawPublisher(first),
unpackRawPublisher(second),
}
}
sources := make([]mono.Mono, 2+len(others))
sources[0] = unpackRawPublisher(first)
sources[1] = unpackRawPublisher(second)
for i := 0; i < len(others); i++ {
sources[i+2] = unpackRawPublisher(others[i])
}
return sources
}

// ZipAll merges given Monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple.
func ZipAll(sources ...Mono) ZipBuilder {
if len(sources) < 1 {
panic("at least one Mono for zip operation")
}
all := make([]mono.Mono, len(sources))
for i := 0; i < len(all); i++ {
all[i] = sources[i].Raw()
all[i] = unpackRawPublisher(sources[i])
}
return all
}

// ZipBuilder can be used to build a zipped Mono.
type ZipBuilder []mono.Mono

func (z ZipBuilder) ToMono(transform func(rx.Tuple) (payload.Payload, error)) Mono {
return Raw(mono.ZipAll(z...).Map(func(any reactor.Any) (reactor.Any, error) {
tup := rx.NewTuple(any.(tuple.Tuple))
return transform(tup)
}))
// ToMonoOneshot builds as a oneshot Mono.
func (z ZipBuilder) ToMonoOneshot(transform func(rx.Tuple) (payload.Payload, error)) Mono {
return RawOneshot(mono.ZipCombineOneshot(cmb(transform), pinItem, z...))
}

// ToMono builds a Mono.
func (z ZipBuilder) ToMono(transform func(item rx.Tuple) (payload.Payload, error)) Mono {
return Raw(mono.ZipCombine(cmb(transform), pinItem, z...))
}

func unpinItem(item *reactor.Item) {
if item == nil {
return
}
if r, _ := item.V.(common.Releasable); r != nil {
r.Release()
}
if r, _ := item.E.(common.Releasable); r != nil {
r.Release()
}
}

func pinItem(item *reactor.Item) {
if item == nil {
return
}
if r, _ := item.V.(common.Releasable); r != nil {
r.IncRef()
}
if r, _ := item.E.(common.Releasable); r != nil {
r.IncRef()
}
}

func cmb(transform func(rx.Tuple) (payload.Payload, error)) func(...*reactor.Item) (reactor.Any, error) {
return func(values ...*reactor.Item) (reactor.Any, error) {
defer func() {
for i := 0; i < len(values); i++ {
unpinItem(values[i])
}
}()
t := rx.NewTuple(values...)
return transform(t)
}
}
21 changes: 17 additions & 4 deletions rx/mono/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

func TestZipBuilder_ToMono(t *testing.T) {
v, err := mono.Zip(mono.Just(_fakePayload), mono.Just(_fakePayload)).
ToMono(func(tuple rx.Tuple) (payload.Payload, error) {
assert.Equal(t, 2, tuple.Len())
for i := 0; i < tuple.Len(); i++ {
v, err := tuple.Get(i)
ToMono(func(items rx.Tuple) (payload.Payload, error) {
assert.Equal(t, 2, items.Len())
for i := 0; i < len(items); i++ {
v, err := items.Get(i)
assert.NoError(t, err)
assert.Equal(t, _fakePayload, v)
}
Expand All @@ -33,3 +33,16 @@ func TestZip_Empty(t *testing.T) {
}).Block(context.Background())
})
}

func TestZipWith(t *testing.T) {
p1 := payload.NewString("hello", "")
p2 := payload.NewString("world", "")
res, err := mono.Just(p1).
ZipWith(mono.Just(p2), func(first, second mono.Item) (payload.Payload, error) {
data := first.V.DataUTF8() + " " + second.V.DataUTF8() + "!"
return payload.NewString(data, ""), nil
}).
Block(context.Background())
assert.NoError(t, err, "should not return error")
assert.Equal(t, "hello world!", res.DataUTF8(), "bad result")
}
Loading