Skip to content

Commit

Permalink
feat(rx): add filter operator (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 10, 2024
1 parent 1153d0b commit 8a8da81
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
56 changes: 56 additions & 0 deletions rx/observable-operator-filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

func even(item rx.Item[int]) bool {
return item.V%2 == 0
}

var _ = Describe("Observable operator", func() {
Context("Filter", func() {
When("principle", func() {
It("🧪 should: return filtered items", func() {
// rxgo: Test_Observable_Filter
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3, 4).Filter(even)
rx.Assert(ctx, obs,
rx.HasItems[int]{
Expected: []int{2, 4},
},
rx.HasNoError[int]{},
)
})
})

Context("Parallel", func() {
When("foo", func() {
It("🧪 should: return filtered items", func() {
// rxgo: Test_Observable_Filter_Parallel
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := testObservable[int](ctx, 1, 2, 3, 4).Filter(even,
rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs,
rx.HasItemsNoOrder[int]{
Expected: []int{2, 4},
},
rx.HasNoError[int]{},
)
})
})
})
})
})
38 changes: 38 additions & 0 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,44 @@ func (o *ObservableImpl[T]) Errors(opts ...Option[T]) []error {
}
}

// Filter emits only those items from an Observable that pass a predicate test.
func (o *ObservableImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) Observable[T] {
const (
forceSeq = false
bypassGather = true
)

return observable(o.parent, o, func() operator[T] {
return &filterOperator[T]{apply: apply}
}, forceSeq, bypassGather, opts...)
}

type filterOperator[T any] struct {
apply Predicate[T]
}

func (op *filterOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], _ operatorOptions[T],
) {
if op.apply(item) {
item.SendContext(ctx, dst)
}
}

func (op *filterOperator[T]) err(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *filterOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
}

func (op *filterOperator[T]) gatherNext(_ context.Context, _ Item[T],
_ chan<- Item[T], _ operatorOptions[T],
) {
}

// !!!

// Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
Expand Down
1 change: 1 addition & 0 deletions rx/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Observable[T any] interface {
ElementAt(index uint, opts ...Option[T]) Single[T]
Error(opts ...Option[T]) error
Errors(opts ...Option[T]) []error
Filter(apply Predicate[T], opts ...Option[T]) Observable[T]
Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Map(apply Func[T], opts ...Option[T]) Observable[T]
Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
Expand Down

0 comments on commit 8a8da81

Please sign in to comment.