Skip to content

Commit

Permalink
feat(rx): reset reduce acc with native 0 (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 24, 2024
1 parent a90ceff commit 90b7788
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 29 deletions.
6 changes: 3 additions & 3 deletions rx/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ var _ = Describe("Factory", func() {
})

When("given: custom structure", func() {
It("馃И should: ", func() {
It("馃И should: create observable without error", func() {
// Test_Just_CustomStructure
defer leaktest.Check(GinkgoT())()

Expand All @@ -745,7 +745,7 @@ var _ = Describe("Factory", func() {
})

When("given: channel", func() {
XIt("馃И should: ???", func() {
XIt("馃И should: ???", decorators.Label("sending chan not supported yet"), func() {
// Test_Just_Channel
defer leaktest.Check(GinkgoT())()

Expand Down Expand Up @@ -1039,7 +1039,7 @@ var _ = Describe("Factory", func() {

Context("NominatedRangeIterator", func() {
When("positive count", func() {
It("馃И should: create observable", decorators.Label("need pointer receiver on T"), func() {
It("馃И should: create observable", func() {
// Test_Range
defer leaktest.Check(GinkgoT())()

Expand Down
5 changes: 2 additions & 3 deletions rx/observable-operator-average_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/onsi/ginkgo/v2/dsl/decorators"

"github.com/snivilised/lorax/rx"
)
Expand Down Expand Up @@ -146,8 +145,8 @@ var _ = Describe("Observable operator", func() {
})

Context("Parallel/Error", func() {
Context("given: foo", func() {
XIt("馃И should: ", decorators.Label("broken average.gatherNext"), func() {
Context("given: invalid input", func() {
It("馃И should: result in error", func() {
// rxgo: Test_Observable_AverageFloat32_Parallel_Error
defer leaktest.Check(GinkgoT())()

Expand Down
34 changes: 17 additions & 17 deletions rx/observable-operator-reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/onsi/ginkgo/v2/dsl/decorators"
"github.com/snivilised/lorax/enums"
"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
XContext("Reduce", decorators.Label("broken by reduce acc"), func() {
Context("Reduce", func() {
When("using Range", func() {
It("馃И should: compute reduction ok", func() {
// rxgo: Test_Observable_Reduce
Expand All @@ -45,9 +44,9 @@ var _ = Describe("Observable operator", func() {
obs := rx.Range(&rx.NumericRangeIterator[int]{
StartAt: 1,
Whilst: rx.LessThan(10001),
}).Reduce( // 1, 10000
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
return acc.V + num.Num(), nil
}).Reduce(
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
return acc.V + item.V, nil
},
)
rx.Assert(ctx, obs,
Expand Down Expand Up @@ -104,7 +103,7 @@ var _ = Describe("Observable operator", func() {

Context("Parallel", func() {
When("using Range", func() {
XIt("馃И should: compute reduction ok", decorators.Label("repairing"), func() {
It("馃И should: compute reduction ok", func() {
// rxgo: Test_Observable_Reduce_Parallel
defer leaktest.Check(GinkgoT())()

Expand All @@ -113,10 +112,10 @@ var _ = Describe("Observable operator", func() {

obs := rx.Range(&rx.NumericRangeIterator[int]{
StartAt: 1,
Whilst: rx.LessThan(6),
Whilst: rx.LessThan(10001),
}).Reduce(
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
return acc.Num() + num.Num(), nil
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
return acc.V + item.V, nil
}, rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs,
Expand All @@ -131,7 +130,7 @@ var _ = Describe("Observable operator", func() {

Context("Parallel/Error", func() {
When("using Range", func() {
XIt("馃И should: result in error", func() {
It("馃И should: result in error", func() {
// rxgo: Test_Observable_Reduce_Parallel_Error
defer leaktest.Check(GinkgoT())()

Expand All @@ -141,11 +140,11 @@ var _ = Describe("Observable operator", func() {
StartAt: 1,
Whilst: rx.LessThan(10001),
}).Reduce(
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
if num.Num() == 1000 {
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
if item.V == 1000 {
return 0, errFoo
}
return acc.Num() + num.Num(), nil
return acc.V + item.V, nil
}, rx.WithContext[int](ctx), rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs,
Expand All @@ -157,21 +156,22 @@ var _ = Describe("Observable operator", func() {
})

When("error with error strategy", func() {
XIt("馃И should: result in error", func() {
It("馃И should: result in error", func() {
// rxgo: Test_Observable_Reduce_Parallel_WithErrorStrategy
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Range(&rx.NumericRangeIterator[int]{
StartAt: 1,
Whilst: rx.LessThan(10001),
}).Reduce(
func(_ context.Context, acc, num rx.Item[int]) (int, error) {
if num.Num() == 1 {
func(_ context.Context, acc, item rx.Item[int]) (int, error) {
if item.V == 1 {
return 0, errFoo
}
return acc.Num() + num.Num(), nil
return acc.V + item.V, nil
}, rx.WithCPUPool[int](), rx.WithErrorStrategy[int](enums.ContinueOnError),
)
rx.Assert(ctx, obs,
Expand Down
3 changes: 1 addition & 2 deletions rx/observable-operator-sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/onsi/ginkgo/v2/dsl/decorators"

"github.com/snivilised/lorax/rx"
)

var _ = Describe("Observable operator", func() {
Context("Sum", func() {
When("principle", func() {
XIt("馃И should: return sum", decorators.Label("broken by reduce acc"), func() {
It("馃И should: return sum", func() {
// rxgo: Test_Observable_SumFloat32_OnlyFloat32
defer leaktest.Check(GinkgoT())()

Expand Down
8 changes: 4 additions & 4 deletions rx/observable-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (op *allOperator[T]) gatherNext(ctx context.Context, item Item[T],
}
}

// Average calculates the average of numbers emitted by an Observable and emits the result.
func (o *ObservableImpl[T]) Average(opts ...Option[T],
) Single[T] {
const (
Expand Down Expand Up @@ -1496,7 +1497,7 @@ func (o *ObservableImpl[T]) Reduce(apply Func2[T], opts ...Option[T]) OptionalSi
return optionalSingle(o.parent, o, func() operator[T] {
return &reduceOperator[T]{
apply: apply,
acc: Num[T](0), // acc needs to be a Num: bomb!!!
acc: Zero[T](),
empty: true,
}
}, forceSeq, bypassGather, opts...)
Expand All @@ -1512,7 +1513,7 @@ func (op *reduceOperator[T]) next(ctx context.Context, item Item[T],
dst chan<- Item[T], operatorOptions operatorOptions[T],
) {
op.empty = false
v, err := op.apply(ctx, op.acc, item) // bomb!!!
v, err := op.apply(ctx, op.acc, item)

if err != nil {
Error[T](err).SendContext(ctx, dst)
Expand Down Expand Up @@ -2233,10 +2234,9 @@ func (o *ObservableImpl[T]) StartWith(iterable Iterable[T], opts ...Option[T]) O

// Sum calculates the average emitted by an Observable and emits the result
func (o *ObservableImpl[T]) Sum(opts ...Option[T]) OptionalSingle[T] {
options := parseOptions[T]()
options := parseOptions(opts...)
calc := options.calc()

// TODO: bomb!!!: do we use Num?
return o.Reduce(func(_ context.Context, acc, item Item[T]) (T, error) {
if calc == nil {
var (
Expand Down

0 comments on commit 90b7788

Please sign in to comment.