diff --git a/go.mod b/go.mod index a0eeaeb..4fb8281 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module github.com/rsocket/rsocket-go go 1.11 require ( - github.com/golang/mock v1.4.3 + github.com/golang/mock v1.4.4 github.com/google/uuid v1.1.2 github.com/gorilla/websocket v1.4.2 - github.com/jjeffcaii/reactor-go v0.4.5 + github.com/jjeffcaii/reactor-go v0.5.0 github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.5.0 github.com/stretchr/testify v1.6.1 - github.com/urfave/cli/v2 v2.1.1 + github.com/urfave/cli/v2 v2.3.0 go.uber.org/atomic v1.7.0 ) diff --git a/go.sum b/go.sum index 83bbe92..f23cf56 100644 --- a/go.sum +++ b/go.sum @@ -4,14 +4,14 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw= -github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= +github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= +github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/jjeffcaii/reactor-go v0.4.5 h1:FOc3ICOIxPHzJBLYmA6Hf1jpPduX0Czzh4i62Wz17A4= -github.com/jjeffcaii/reactor-go v0.4.5/go.mod h1:2ZzeNFnQ2c55NHRh0KJ4k5yMvmrcpx1APzh7BKkRNQE= +github.com/jjeffcaii/reactor-go v0.5.0 h1:Y8VVp31JGij/ilMh8bXyNEo8k2SVSmc4Cc+mzPGL07w= +github.com/jjeffcaii/reactor-go v0.5.0/go.mod h1:qYN34C2UANFOtDeUGvhxlExLpFMDbvSrphX3Gb3H6S8= github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU= github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -30,23 +30,21 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k= -github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= +github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= -rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/rx/mono/mono.go b/rx/mono/mono.go index b12ca3b..5356577 100644 --- a/rx/mono/mono.go +++ b/rx/mono/mono.go @@ -12,6 +12,8 @@ import ( // ReleaseFunc can be used to release resources. type ReleaseFunc func() +type Item = rx.Item +type Combinator2 = func(first, second Item) (payload.Payload, error) // Mono is a Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error. type Mono interface { @@ -49,6 +51,8 @@ type Mono interface { SwitchIfError(alternative func(error) Mono) Mono // SwitchValueIfError switch to an alternative Payload if this Mono is end with an error. SwitchValueIfError(alternative payload.Payload) Mono + // ZipWith combines the result from this mono and another into a new Payload. + ZipWith(alternative Mono, cmb Combinator2) Mono // Raw returns low-level reactor.Mono which defined in reactor-go library. Raw() mono.Mono // ToChan subscribe Mono and puts items into a chan. @@ -65,9 +69,3 @@ type Sink interface { // Error emits an error then complete current Sink. Error(error) } - -// Processor combine Sink and Mono. -type Processor interface { - Sink - Mono -} diff --git a/rx/mono/proxy_default.go b/rx/mono/proxy_default.go index 8ece207..a58cf84 100644 --- a/rx/mono/proxy_default.go +++ b/rx/mono/proxy_default.go @@ -27,14 +27,6 @@ func (p proxy) Raw() mono.Mono { return p.Mono } -func (p proxy) Success(v payload.Payload) { - mustProcessor(p.Mono).Success(v) -} - -func (p proxy) Error(e error) { - mustProcessor(p.Mono).Error(e) -} - func (p proxy) ToChan(ctx context.Context) (<-chan payload.Payload, <-chan error) { return toChan(ctx, p.Mono) } @@ -120,12 +112,12 @@ func (p proxy) DoOnCancel(fn rx.FnOnCancel) Mono { } func (p proxy) SwitchIfEmpty(alternative Mono) Mono { - return newProxy(p.Mono.SwitchIfEmpty(alternative.Raw())) + return newProxy(p.Mono.SwitchIfEmpty(unpackRawPublisher(alternative))) } func (p proxy) SwitchIfError(alternative func(error) Mono) Mono { return newProxy(p.Mono.SwitchIfError(func(err error) mono.Mono { - return alternative(err).Raw() + return unpackRawPublisher(alternative(err)) })) } @@ -137,6 +129,20 @@ func (p proxy) Timeout(timeout time.Duration) Mono { return newProxy(p.Mono.Timeout(timeout)) } +func (p proxy) ZipWith(alternative Mono, cmb Combinator2) Mono { + return Zip(p, alternative).ToMono(func(item rx.Tuple) (payload.Payload, error) { + first, err := convertItem(item[0]) + if err != nil { + return nil, err + } + second, err := convertItem(item[1]) + if err != nil { + return nil, err + } + return cmb(first, second) + }) +} + func (p proxy) Subscribe(ctx context.Context, options ...rx.SubscriberOption) { p.SubscribeWith(ctx, rx.NewSubscriber(options...)) } diff --git a/rx/mono/proxy_oneshot.go b/rx/mono/proxy_oneshot.go index 6d93bb2..b97de8e 100644 --- a/rx/mono/proxy_oneshot.go +++ b/rx/mono/proxy_oneshot.go @@ -35,14 +35,6 @@ func returnOneshotProxy(o *oneshotProxy) (raw mono.Mono) { return } -func (o *oneshotProxy) Success(p payload.Payload) { - mustProcessor(o.Mono).Success(p) -} - -func (o *oneshotProxy) Error(err error) { - mustProcessor(o.Mono).Error(err) -} - func (o *oneshotProxy) SubscribeWith(ctx context.Context, s rx.Subscriber) { var sub reactor.Subscriber if s == rx.EmptySubscriber { @@ -143,13 +135,13 @@ func (o *oneshotProxy) Block(ctx context.Context) (payload.Payload, error) { } func (o *oneshotProxy) SwitchIfEmpty(alternative Mono) Mono { - o.Mono = o.Mono.SwitchIfEmpty(alternative.Raw()) + o.Mono = o.Mono.SwitchIfEmpty(unpackRawPublisher(alternative)) return o } func (o *oneshotProxy) SwitchIfError(alternative func(error) Mono) Mono { o.Mono = o.Mono.SwitchIfError(func(err error) mono.Mono { - return alternative(err).Raw() + return unpackRawPublisher(alternative(err)) }) return o } @@ -158,6 +150,19 @@ func (o *oneshotProxy) SwitchValueIfError(alternative payload.Payload) Mono { o.Mono = o.Mono.SwitchValueIfError(alternative) return o } +func (o *oneshotProxy) ZipWith(alternative Mono, cmb Combinator2) Mono { + return Zip(o, alternative).ToMonoOneshot(func(item rx.Tuple) (payload.Payload, error) { + first, err := convertItem(item[0]) + if err != nil { + return nil, err + } + second, err := convertItem(item[1]) + if err != nil { + return nil, err + } + return cmb(first, second) + }) +} func (o *oneshotProxy) Raw() mono.Mono { return o.Mono diff --git a/rx/mono/utils.go b/rx/mono/utils.go index 6c80af6..b6800ff 100644 --- a/rx/mono/utils.go +++ b/rx/mono/utils.go @@ -8,6 +8,7 @@ import ( "github.com/jjeffcaii/reactor-go/scheduler" "github.com/pkg/errors" "github.com/rsocket/rsocket-go/payload" + "github.com/rsocket/rsocket-go/rx" ) var empty = newProxy(mono.Empty()) @@ -32,6 +33,10 @@ func Raw(input mono.Mono) Mono { return newProxy(input) } +func RawOneshot(origin mono.Mono) Mono { + return borrowOneshotProxy(origin) +} + // Just wrap an exist Payload to a Mono. func Just(input payload.Payload) Mono { return newProxy(mono.Just(input)) @@ -156,14 +161,6 @@ func toChan(ctx context.Context, publisher mono.Mono) (<-chan payload.Payload, < return value, err } -func mustProcessor(origin mono.Mono) mono.Processor { - m, ok := origin.(mono.Processor) - if !ok { - panic(errors.Errorf("require processor but %v", origin)) - } - return m -} - func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) { done := make(chan struct{}) vchan := make(chan payload.Payload, 1) @@ -184,3 +181,35 @@ func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) { return nil, nil } } + +func unpackRawPublisher(source Mono) mono.Mono { + if source == nil { + return nil + } + switch t := source.(type) { + case *oneshotProxy: + return returnOneshotProxy(t) + default: + return t.Raw() + } +} + +func convertItem(item *reactor.Item) (result rx.Item, err error) { + if item == nil { + return + } + if item.E != nil { + result.E = item.E + return + } + if item.V == nil { + return + } + p, ok := item.V.(payload.Payload) + if !ok { + err = errors.Errorf("require Payload value type instead of %t", item.V) + return + } + result.V = p + return +} diff --git a/rx/mono/zip.go b/rx/mono/zip.go index d83afcd..25799ae 100644 --- a/rx/mono/zip.go +++ b/rx/mono/zip.go @@ -3,34 +3,82 @@ package mono import ( "github.com/jjeffcaii/reactor-go" "github.com/jjeffcaii/reactor-go/mono" - "github.com/jjeffcaii/reactor-go/tuple" + "github.com/rsocket/rsocket-go/internal/common" "github.com/rsocket/rsocket-go/payload" "github.com/rsocket/rsocket-go/rx" ) +// Zip merges given Monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple. func Zip(first Mono, second Mono, others ...Mono) ZipBuilder { - var all []Mono - all = append(all, first, second) - all = append(all, others...) - return ZipAll(all...) + if len(others) < 1 { + return []mono.Mono{ + unpackRawPublisher(first), + unpackRawPublisher(second), + } + } + sources := make([]mono.Mono, 2+len(others)) + sources[0] = unpackRawPublisher(first) + sources[1] = unpackRawPublisher(second) + for i := 0; i < len(others); i++ { + sources[i+2] = unpackRawPublisher(others[i]) + } + return sources } +// ZipAll merges given Monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple. func ZipAll(sources ...Mono) ZipBuilder { - if len(sources) < 1 { - panic("at least one Mono for zip operation") - } all := make([]mono.Mono, len(sources)) for i := 0; i < len(all); i++ { - all[i] = sources[i].Raw() + all[i] = unpackRawPublisher(sources[i]) } return all } +// ZipBuilder can be used to build a zipped Mono. type ZipBuilder []mono.Mono -func (z ZipBuilder) ToMono(transform func(rx.Tuple) (payload.Payload, error)) Mono { - return Raw(mono.ZipAll(z...).Map(func(any reactor.Any) (reactor.Any, error) { - tup := rx.NewTuple(any.(tuple.Tuple)) - return transform(tup) - })) +// ToMonoOneshot builds as a oneshot Mono. +func (z ZipBuilder) ToMonoOneshot(transform func(rx.Tuple) (payload.Payload, error)) Mono { + return RawOneshot(mono.ZipCombineOneshot(cmb(transform), pinItem, z...)) +} + +// ToMono builds a Mono. +func (z ZipBuilder) ToMono(transform func(item rx.Tuple) (payload.Payload, error)) Mono { + return Raw(mono.ZipCombine(cmb(transform), pinItem, z...)) +} + +func unpinItem(item *reactor.Item) { + if item == nil { + return + } + if r, _ := item.V.(common.Releasable); r != nil { + r.Release() + } + if r, _ := item.E.(common.Releasable); r != nil { + r.Release() + } +} + +func pinItem(item *reactor.Item) { + if item == nil { + return + } + if r, _ := item.V.(common.Releasable); r != nil { + r.IncRef() + } + if r, _ := item.E.(common.Releasable); r != nil { + r.IncRef() + } +} + +func cmb(transform func(rx.Tuple) (payload.Payload, error)) func(...*reactor.Item) (reactor.Any, error) { + return func(values ...*reactor.Item) (reactor.Any, error) { + defer func() { + for i := 0; i < len(values); i++ { + unpinItem(values[i]) + } + }() + t := rx.NewTuple(values...) + return transform(t) + } } diff --git a/rx/mono/zip_test.go b/rx/mono/zip_test.go index 31f27dd..eef0dea 100644 --- a/rx/mono/zip_test.go +++ b/rx/mono/zip_test.go @@ -12,10 +12,10 @@ import ( func TestZipBuilder_ToMono(t *testing.T) { v, err := mono.Zip(mono.Just(_fakePayload), mono.Just(_fakePayload)). - ToMono(func(tuple rx.Tuple) (payload.Payload, error) { - assert.Equal(t, 2, tuple.Len()) - for i := 0; i < tuple.Len(); i++ { - v, err := tuple.Get(i) + ToMono(func(items rx.Tuple) (payload.Payload, error) { + assert.Equal(t, 2, items.Len()) + for i := 0; i < len(items); i++ { + v, err := items.Get(i) assert.NoError(t, err) assert.Equal(t, _fakePayload, v) } @@ -33,3 +33,16 @@ func TestZip_Empty(t *testing.T) { }).Block(context.Background()) }) } + +func TestZipWith(t *testing.T) { + p1 := payload.NewString("hello", "") + p2 := payload.NewString("world", "") + res, err := mono.Just(p1). + ZipWith(mono.Just(p2), func(first, second mono.Item) (payload.Payload, error) { + data := first.V.DataUTF8() + " " + second.V.DataUTF8() + "!" + return payload.NewString(data, ""), nil + }). + Block(context.Background()) + assert.NoError(t, err, "should not return error") + assert.Equal(t, "hello world!", res.DataUTF8(), "bad result") +} diff --git a/rx/rx.go b/rx/rx.go index 7868aa6..979237f 100644 --- a/rx/rx.go +++ b/rx/rx.go @@ -73,3 +73,9 @@ type SignalType reactor.SignalType func (s SignalType) String() string { return reactor.SignalType(s).String() } + +// Item is a kind of container which contains value or error. +type Item struct { + V payload.Payload + E error +} diff --git a/rx/tuple.go b/rx/tuple.go index 8b86937..8c4cc85 100644 --- a/rx/tuple.go +++ b/rx/tuple.go @@ -10,40 +10,55 @@ import ( var errWrongTupleType = errors.New("tuple value must be a payload") +// IsWrongTupleTypeError returns true if target error is type of wrong tuple type. func IsWrongTupleTypeError(err error) bool { return err == errWrongTupleType } -func NewTuple(t tuple.Tuple) Tuple { - return Tuple{inner: t} +// NewTuple returns a new Tuple. +func NewTuple(t ...*reactor.Item) Tuple { + return t } -type Tuple struct { - inner tuple.Tuple -} +// Tuple is a container contains multiple items. +type Tuple []*reactor.Item +// First returns the first value or error. func (t Tuple) First() (payload.Payload, error) { - return t.innerReturn(t.inner.First()) + return t.convert(t.inner().First()) } +// Second returns the second value or error. func (t Tuple) Second() (payload.Payload, error) { - return t.innerReturn(t.inner.Second()) + return t.convert(t.inner().Second()) } +// Last returns the last value or error. func (t Tuple) Last() (payload.Payload, error) { - return t.innerReturn(t.inner.Last()) + return t.convert(t.inner().Last()) } +// Get returns the value or error with custom index. func (t Tuple) Get(index int) (payload.Payload, error) { - return t.innerReturn(t.inner.Get(index)) + return t.convert(t.inner().Get(index)) +} + +// GetValue returns the value with custom index. +func (t Tuple) GetValue(index int) payload.Payload { + if v := t.inner().GetValue(index); v != nil { + return v.(payload.Payload) + } + return nil } +// Len returns the length of Tuple. func (t Tuple) Len() int { - return t.inner.Len() + return t.inner().Len() } +// ForEach visits each item in the Tuple. func (t Tuple) ForEach(callback func(payload.Payload, error) bool) { - t.inner.ForEach(func(v reactor.Any, e error) bool { + t.inner().ForEach(func(v reactor.Any, e error) bool { if v == nil { return callback(nil, e) } @@ -55,8 +70,9 @@ func (t Tuple) ForEach(callback func(payload.Payload, error) bool) { }) } +// ForEachWithIndex visits each item in the Tuple with index. func (t Tuple) ForEachWithIndex(callback func(payload.Payload, error, int) bool) { - t.inner.ForEachWithIndex(func(v reactor.Any, e error, index int) bool { + t.inner().ForEachWithIndex(func(v reactor.Any, e error, index int) bool { if v == nil { return callback(nil, e, index) } @@ -68,7 +84,26 @@ func (t Tuple) ForEachWithIndex(callback func(payload.Payload, error, int) bool) }) } -func (t Tuple) innerReturn(value reactor.Any, err error) (payload.Payload, error) { +// HasError returns true if this Tuple contains error. +func (t Tuple) HasError() bool { + return t.inner().HasError() +} + +// CollectValues collects values and returns a slice. +func (t Tuple) CollectValues() (values []payload.Payload) { + for i := 0; i < len(t); i++ { + next := t[i] + if next == nil || next.E != nil || next.V == nil { + continue + } + if v, ok := next.V.(payload.Payload); ok { + values = append(values, v) + } + } + return +} + +func (t Tuple) convert(value reactor.Any, err error) (payload.Payload, error) { if err != nil { return nil, err } @@ -81,3 +116,7 @@ func (t Tuple) innerReturn(value reactor.Any, err error) (payload.Payload, error } return nil, errWrongTupleType } + +func (t Tuple) inner() tuple.Tuple { + return tuple.NewTuple(t...) +} diff --git a/rx/tuple_test.go b/rx/tuple_test.go index 5ecc291..ccb5c7a 100644 --- a/rx/tuple_test.go +++ b/rx/tuple_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/jjeffcaii/reactor-go" - "github.com/jjeffcaii/reactor-go/tuple" "github.com/rsocket/rsocket-go/payload" "github.com/rsocket/rsocket-go/rx" "github.com/stretchr/testify/assert" @@ -17,8 +16,11 @@ var ( ) func TestTuple(t *testing.T) { - rawTuple := tuple.NewTuple(&reactor.Item{V: fakePayload}, &reactor.Item{E: fakeError}, &reactor.Item{}) - tup := rx.NewTuple(rawTuple) + tup := rx.NewTuple(&reactor.Item{V: fakePayload}, &reactor.Item{E: fakeError}, &reactor.Item{}) + assert.NotNil(t, tup.GetValue(0)) + assert.Nil(t, tup.GetValue(-1)) + assert.Nil(t, tup.GetValue(999)) + v, err := tup.First() assert.NoError(t, err, "should not return error") assert.Equal(t, fakePayload, v, "should be fake payload") @@ -63,7 +65,7 @@ func TestTuple(t *testing.T) { } func TestTupleWithWrongType(t *testing.T) { - tup := rx.NewTuple(tuple.NewTuple(&reactor.Item{V: 1})) + tup := rx.NewTuple(&reactor.Item{V: 1}) _, err := tup.First() assert.Error(t, err) assert.True(t, rx.IsWrongTupleTypeError(err)) @@ -78,6 +80,25 @@ func TestTupleWithWrongType(t *testing.T) { } func TestTuple_Empty(t *testing.T) { - tup := tuple.NewTuple() + tup := rx.NewTuple() assert.Zero(t, tup.Len()) + assert.False(t, tup.HasError()) + assert.Nil(t, tup.GetValue(0)) +} + +func TestTuple_HasError(t *testing.T) { + tu := rx.NewTuple(&reactor.Item{V: fakePayload}, &reactor.Item{E: fakeError}) + assert.True(t, tu.HasError()) + tu = rx.NewTuple(&reactor.Item{V: fakeError}) + assert.False(t, tu.HasError()) +} + +func TestTuple_CollectValues(t *testing.T) { + tu := rx.NewTuple() + res := tu.CollectValues() + assert.Empty(t, res) + + tu = rx.NewTuple(nil, &reactor.Item{V: fakePayload}, nil, &reactor.Item{E: fakeError}) + res = tu.CollectValues() + assert.Len(t, res, 1) }