diff --git a/rx/flux/flux.go b/rx/flux/flux.go index 9236bc0..eab8aca 100644 --- a/rx/flux/flux.go +++ b/rx/flux/flux.go @@ -5,6 +5,7 @@ import ( "github.com/jjeffcaii/reactor-go/flux" "github.com/jjeffcaii/reactor-go/scheduler" + "github.com/rsocket/rsocket-go/payload" "github.com/rsocket/rsocket-go/rx" ) @@ -22,10 +23,10 @@ type Sink interface { Error(e error) } -// Flux represents represents a reactive sequence of 0..N items. +// Flux represents a reactive sequence of 0..N items. type Flux interface { rx.Publisher - // Take take only the first N values from this Flux, if available. + // Take takes only the first N values from this Flux, if available. Take(n int) Flux // Filter evaluate each source value against the given Predicate. // If the predicate test succeeds, the value is emitted. diff --git a/rx/flux/proxy.go b/rx/flux/proxy.go index 0bb01d0..7f67cd9 100644 --- a/rx/flux/proxy.go +++ b/rx/flux/proxy.go @@ -6,6 +6,7 @@ import ( "github.com/jjeffcaii/reactor-go" "github.com/jjeffcaii/reactor-go/flux" "github.com/jjeffcaii/reactor-go/scheduler" + "github.com/rsocket/rsocket-go/payload" "github.com/rsocket/rsocket-go/rx" ) @@ -76,12 +77,19 @@ func (p proxy) ToChan(ctx context.Context, cap int) (<-chan payload.Payload, <-c err <- reactor.ErrSubscribeCancelled } }). + Map(func(any reactor.Any) (reactor.Any, error) { + return payload.Clone(any.(payload.Payload)), nil + }). SubscribeWithChan(ctx, ch, err) return ch, err } func (p proxy) BlockFirst(ctx context.Context) (first payload.Payload, err error) { - v, err := p.Flux.BlockFirst(ctx) + v, err := p.Flux. + Map(func(any reactor.Any) (reactor.Any, error) { + return payload.Clone(any.(payload.Payload)), nil + }). + BlockFirst(ctx) if err != nil { return } @@ -92,14 +100,18 @@ func (p proxy) BlockFirst(ctx context.Context) (first payload.Payload, err error } func (p proxy) BlockLast(ctx context.Context) (last payload.Payload, err error) { - v, err := p.Flux.BlockLast(ctx) + v, err := p.Flux. + Map(func(any reactor.Any) (reactor.Any, error) { + return payload.Clone(any.(payload.Payload)), nil + }). + BlockLast(ctx) if err != nil { return } - if v == nil { - return + if v != nil { + last = v.(payload.Payload) } - last = v.(payload.Payload) + return } @@ -119,7 +131,7 @@ func (p proxy) BlockSlice(ctx context.Context) (results []payload.Payload, err e Subscribe( ctx, reactor.OnNext(func(v reactor.Any) error { - results = append(results, v.(payload.Payload)) + results = append(results, payload.Clone(v.(payload.Payload))) return nil }), reactor.OnError(func(e error) {