Skip to content

Commit

Permalink
feat(rx): add map, max and min operators (#181)
Browse files Browse the repository at this point in the history
feat(rx): add map operator tests (#181)

feat(rx): add map operator (#181)

feat(rx): add min operator (#181)
  • Loading branch information
plastikfan committed Apr 11, 2024
1 parent 25a89b8 commit cd2dffd
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 38 deletions.
6 changes: 4 additions & 2 deletions rx/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T] {

// CombineLatest combines the latest item emitted by each Observable via a specified function
// and emit items based on the results of this function.
func CombineLatest[T any](f FuncN[T], observables []Observable[T], opts ...Option[T]) Observable[T] {
func CombineLatest[T any](f FuncN[T], observables []Observable[T],
calc Calculator[T], opts ...Option[T],
) Observable[T] {
option := parseOptions(opts...)
ctx := option.buildContext(emptyContext)
next := option.buildChannel()
Expand Down Expand Up @@ -108,7 +110,7 @@ func CombineLatest[T any](f FuncN[T], observables []Observable[T], opts ...Optio
return
}

if isZero(s[i]) { // TODO(check): s[i] == nil
if calc.IsZero(s[i]) {
atomic.AddUint32(&counter, 1)
}

Expand Down
17 changes: 9 additions & 8 deletions rx/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ var _ = Describe("Factory", func() {

obs := rx.CombineLatest(func(values ...int) int {
return lo.Sum(values)
}, lo.Map([]rx.Observable[int]{
testObservable[int](ctx, 1, 2),
testObservable[int](ctx, 10, 11),
}, func(it rx.Observable[int], _ int) rx.Observable[int] {
return it
}))
},
lo.Map([]rx.Observable[int]{
testObservable[int](ctx, 1, 2),
testObservable[int](ctx, 10, 11),
}, func(it rx.Observable[int], _ int) rx.Observable[int] {
return it
}), rx.NewCalc[int]())

rx.Assert(context.Background(), obs, rx.IsNotEmpty[int]{})
})
Expand All @@ -91,7 +92,7 @@ var _ = Describe("Factory", func() {
rx.Empty[int](),
}, func(it rx.Observable[int], _ int) rx.Observable[int] {
return it
}))
}), rx.NewCalc[int]())

rx.Assert(context.Background(), obs, rx.IsEmpty[int]{})
})
Expand All @@ -112,7 +113,7 @@ var _ = Describe("Factory", func() {
testObservable[int](ctx, errFoo),
}, func(it rx.Observable[int], _ int) rx.Observable[int] {
return it
}))
}), rx.NewCalc[int]())

rx.Assert(context.Background(), obs,
rx.IsEmpty[int]{},
Expand Down
34 changes: 31 additions & 3 deletions rx/limiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,24 @@ import (
"golang.org/x/exp/constraints"
)

func LimitComparator[T constraints.Ordered](a, b T) int {
if a == b {
func NativeItemLimitComparator[T constraints.Ordered](a, b Item[T]) int {
if a.V == b.V {
return 0
}

if a < b {
if a.V < b.V {
return -1
}

return 1
}

func NumericItemLimitComparator[T constraints.Ordered](a, b Item[T]) int {
if a.N == b.N {
return 0
}

if a.N < b.N {
return -1
}

Expand All @@ -26,4 +38,20 @@ func MinInitLimitInt() int {
return math.MaxInt
}

func MaxItemInitLimitInt() Item[int] {
return Of(math.MinInt)
}

func MinItemInitLimitInt() Item[int] {
return Of(math.MaxInt)
}

func MaxNItemInitLimitInt() Item[int] {
return Num[int](math.MinInt)
}

func MinNItemInitLimitInt() Item[int] {
return Num[int](math.MaxInt)
}

// TODO: add limiters for other Ordered constraint types
4 changes: 2 additions & 2 deletions rx/observable-operator-distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ var _ = Describe("Observable operator", func() {
obs := testObservable[int](ctx, 1, 2, 2, 1, 3).DistinctUntilChanged(
func(_ context.Context, v int) (int, error) {
return v, nil
}, rx.LimitComparator, rx.WithCPUPool[int]())
}, rx.NativeItemLimitComparator, rx.WithCPUPool[int]())

rx.Assert(ctx, obs,
rx.HasItems[int]{
Expand All @@ -197,7 +197,7 @@ var _ = Describe("Observable operator", func() {
obs := testObservable[int](ctx, 1, 2, 2, 1, 3).DistinctUntilChanged(
func(_ context.Context, item int) (int, error) {
return item, nil
}, rx.LimitComparator, rx.WithCPUPool[int]())
}, rx.NativeItemLimitComparator, rx.WithCPUPool[int]())

rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{1, 2, 1, 3},
Expand Down
157 changes: 157 additions & 0 deletions rx/observable-operator-map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Map", func() {
When("one", func() {
It("🧪 should: translate all values", func() {
// rxgo: Test_Observable_Map_One
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3).Map(func(_ context.Context, v int) (int, error) {
return v + 1, nil
})
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{2, 3, 4},
},
rx.HasNoError[int]{},
)
})
})

When("multiple", func() {
It("🧪 should: transform all values through all stages", func() {
// rxgo: Test_Observable_Map_Multiple
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3).Map(func(_ context.Context, v int) (int, error) {
return v + 1, nil
}).Map(func(_ context.Context, v int) (int, error) {
return v * 10, nil
})
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{20, 30, 40},
}, rx.HasNoError[int]{})
})
})

When("cancel", func() {
It("🧪 should: result in empty result due to cancellation", func() {
// rxgo: Test_Observable_Map_Cancel
defer leaktest.Check(GinkgoT())()

next := make(chan rx.Item[int])

ctx, cancel := context.WithCancel(context.Background())
obs := rx.FromChannel(next).Map(func(_ context.Context, v int) (int, error) {
return v + 1, nil
}, rx.WithContext[int](ctx))
cancel()
rx.Assert(ctx, obs, rx.IsEmpty[int]{}, rx.HasNoError[int]{})
})
})

Context("Errors", func() {
When("error", func() {
It("🧪 should: transform valid values and contain error", func() {
// rxgo: Test_Observable_Map_Error
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1, 2, 3, errFoo).Map(func(_ context.Context, v int) (int, error) {
return v + 1, nil
})
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{2, 3, 4},
}, rx.HasError[int]{
Expected: []error{errFoo},
})
})
})

When("value and error", func() {
It("🧪 should: not transform value that returns error", func() {
// rxgo: Test_Observable_Map_ReturnValueAndError
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := testObservable[int](ctx, 1).Map(func(_ context.Context, _ int) (int, error) {
return 2, errFoo
})
rx.Assert(ctx, obs, rx.IsEmpty[int]{}, rx.HasError[int]{
Expected: []error{errFoo},
})
})
})

When("multiple errors", func() {
It("🧪 should: contain error", func() {
// rxgo: Test_Observable_Map_Multiple_Error
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

called := false
obs := testObservable[int](ctx, 1, 2, 3).Map(func(_ context.Context, _ int) (int, error) {
return 0, errFoo
}).Map(func(_ context.Context, _ int) (int, error) {
called = true

return 0, nil
})
rx.Assert(ctx, obs, rx.IsEmpty[int]{}, rx.HasError[int]{
Expected: []error{errFoo},
})
Expect(called).To(BeFalse())
})
})
})

Context("Parallel", func() {
It("🧪 should: translate all values", func() {
// rxgo: Test_Observable_Map_Parallel
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const length = 10

ch := make(chan rx.Item[int], length)
go func() {
for i := 0; i < length; i++ {
ch <- rx.Of(i)
}
close(ch)
}()

obs := rx.FromChannel(ch).Map(func(_ context.Context, v int) (int, error) {
return v + 1, nil
}, rx.WithPool[int](length))

rx.Assert(ctx, obs, rx.HasItemsNoOrder[int]{
Expected: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
}, rx.HasNoError[int]{})
})
})
})
})
51 changes: 51 additions & 0 deletions rx/observable-operator-max_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Max", func() {
When("principle", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Max
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Range[int](0, 100).Max(
rx.NumericItemLimitComparator,
rx.MaxNItemInitLimitInt,
)
rx.Assert(ctx, obs, rx.HasNumber[int]{
Expected: 99,
})
})
})

Context("Parallel", func() {
When("foo", func() {
XIt("🧪 should: ", func() {
// rxgo: Test_Observable_Max_Parallel
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := rx.Range[int](0, 100).Max(
rx.NumericItemLimitComparator,
rx.MaxNItemInitLimitInt,
rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 99,
})
})
})
})
})
})
52 changes: 52 additions & 0 deletions rx/observable-operator-min_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package rx_test

import (
"context"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Min", func() {
When("principle", func() {
It("🧪 should: ", func() {
// rxgo: Test_Observable_Min
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Range[int](0, 100).Min(
rx.NumericItemLimitComparator,
rx.MinNItemInitLimitInt,
)
rx.Assert(ctx, obs, rx.HasNumber[int]{
Expected: 0,
})
})
})

Context("Parallel", func() {
When("foo", func() {
XIt("🧪 should: ", func() {
// rxgo: Test_Observable_Min_Parallel
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Range[int](0, 100).Min(
rx.NumericItemLimitComparator,
rx.MinNItemInitLimitInt,
rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs, rx.HasItem[int]{
Expected: 0,
})
})
})
})
})
})
Loading

0 comments on commit cd2dffd

Please sign in to comment.