Skip to content

Commit

Permalink
fix(rx): parallel (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Apr 17, 2024
1 parent ffd50b0 commit 0459847
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 227 deletions.
7 changes: 0 additions & 7 deletions rx/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,6 @@ func (it Item[T]) SendContext(ctx context.Context, ch chan<- Item[T]) bool {
}
}

func (it Item[T]) SendOpContext(ctx context.Context, ch any) bool { // Item[operator[T]]
_ = ctx
_ = ch

panic("SendOpContext: NOT-IMPL")
}

// SendNonBlocking sends an item without blocking.
// It returns a boolean to indicate whether the item was sent.
func (it Item[T]) SendNonBlocking(ch chan<- Item[T]) bool {
Expand Down
9 changes: 7 additions & 2 deletions rx/limiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ func NativeItemLimitComparator[T constraints.Ordered](a, b Item[T]) int {
}

func NumericItemLimitComparator[T constraints.Ordered](a, b Item[T]) int {
if a.Num() == b.Num() {
var (
valA = a.Num()
valB = b.Num()
)

if valA == valB {
return 0
}

if a.Num() < b.Num() {
if valA < valB {
return -1
}

Expand Down
40 changes: 33 additions & 7 deletions rx/observable-operator-all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
"github.com/onsi/ginkgo/v2/dsl/decorators"

"github.com/snivilised/lorax/rx"
)
Expand Down Expand Up @@ -115,7 +116,7 @@ var _ = Describe("Observable operator", func() {
})
})

XContext("Parallel", func() {
Context("Parallel", func() {
Context("all true", func() {
Context("given: foo", func() {
It("馃И should: ", func() {
Expand All @@ -127,7 +128,7 @@ var _ = Describe("Observable operator", func() {

rx.Assert(ctx, rx.Range[int](1, 3).All(positiveN,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](), // not supported yet
rx.WithCPUPool[int](),
),
rx.HasTrue[int]{},
rx.HasNoError[int]{},
Expand All @@ -147,7 +148,7 @@ var _ = Describe("Observable operator", func() {

rx.Assert(ctx, testObservable[int](ctx, 1, -2, 3).All(negative,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](), // not supported yet
rx.WithCPUPool[int](),
),
rx.IsFalse[int]{},
rx.HasNoError[int]{},
Expand All @@ -157,9 +158,9 @@ var _ = Describe("Observable operator", func() {
})
})

XContext("Error", func() {
Context("given: foo", func() {
It("馃И should: ", func() {
Context("Error", func() {
Context("given: error occurs after predicate failure", func() {
It("馃И should: not return error", func() {
// rxgo: Test_Observable_All_Parallel_Error
defer leaktest.Check(GinkgoT())()

Expand All @@ -168,9 +169,34 @@ var _ = Describe("Observable operator", func() {

rx.Assert(ctx, testObservable[int](ctx, 1, errFoo, 3).All(negative,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](), // not supported yet
rx.WithCPUPool[int](),
),
rx.IsFalse[int]{},
)
})
})

Context("given: error occurs before predicate failure", func() {
XIt("馃И should: return error", decorators.Label("Flakey"), func() {
// rxgo: Test_Observable_All_Parallel_Error
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Flakey: (possible race)

// NB: Once an error has occurred, you can't rely on the
// result of All to be accurate. The result will only
// reflect the state of processing at the time the error
// occurred. In this particular case, the result of All will
// be true and so will rx.IsTrue[int], that is because
// 1 and 3 were not seen.
//
rx.Assert(ctx, testObservable[int](ctx, errFoo, 1, 3).All(negative,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](),
),
rx.HasError[int]{
Expected: []error{errFoo},
},
Expand Down
64 changes: 30 additions & 34 deletions rx/observable-operator-average_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,51 +99,47 @@ var _ = Describe("Observable operator", func() {
Context("Parallel", func() {
Context("given: type float32", func() {
It("馃И should: ", func() {
// rxgo(tbd - parallel not impl yet): Test_Observable_AverageFloat32_Parallel
// rxgo: Test_Observable_AverageFloat32_Parallel
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rx.Assert(ctx, testObservable[float32](ctx,
float32(1), float32(20),
).Average(rx.NewCalc[float32]()),
rx.HasItem[float32]{
Expected: float32(10.5),
},
)
rx.Assert(ctx, testObservable[float32](ctx,
float32(1), float32(20),
).Average(rx.NewCalc[float32]()),
rx.HasItem[float32]{
Expected: float32(10.5),
},
)
*/
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rx.Assert(ctx, testObservable[float32](ctx,
float32(1), float32(20),
).Average(rx.Calc[float32]()),
rx.HasItem[float32]{
Expected: float32(10.5),
},
)

rx.Assert(ctx, testObservable[float32](ctx,
float32(1), float32(20),
).Average(rx.Calc[float32]()),
rx.HasItem[float32]{
Expected: float32(10.5),
},
)
})
})
})

Context("Parallel/Error", func() {
Context("given: foo", func() {
It("馃И should: ", func() {
// rxgo(tbd - parallel not impl yet): Test_Observable_AverageFloat32_Parallel_Error
// rxgo: Test_Observable_AverageFloat32_Parallel_Error
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rx.Assert(ctx, testObservable[float32](ctx,
"foo",
).Average(rx.NewCalc[float32](),
rx.WithContext[float32](ctx), rx.WithCPUPool[float32](),
),
rx.HasAnError[float32]{},
)
*/
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rx.Assert(ctx, testObservable[float32](ctx,
"foo",
).Average(rx.Calc[float32](),
rx.WithContext[float32](ctx), rx.WithCPUPool[float32](),
),
rx.HasAnError[float32]{},
)
})
})
})
Expand Down
49 changes: 22 additions & 27 deletions rx/observable-operator-contains_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func two(i rx.Item[int]) bool {
return i.V == 2
}

var _ = Describe("Observable operator", func() {
var _ = Context("Observable operator", func() {
Context("Contains", func() {
When("sequence contains item", func() {
It("馃И should: result in true", func() {
Expand Down Expand Up @@ -73,19 +73,16 @@ var _ = Describe("Observable operator", func() {
// rxgo: Test_Observable_Contain_Parallel
defer leaktest.Check(GinkgoT())()

/*
TODO(impl): CPUPool
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rx.Assert(ctx,
testObservable[int](ctx, 1, 2, 3).Contains(two,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](),
),
rx.IsTrue[int]{},
)
*/
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rx.Assert(ctx,
testObservable[int](ctx, 1, 2, 3).Contains(two,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](),
),
rx.IsTrue[int]{},
)
})
})

Expand All @@ -94,19 +91,17 @@ var _ = Describe("Observable operator", func() {
// rxgo: Test_Observable_Contain_Parallel
defer leaktest.Check(GinkgoT())()

/*
TODO(impl): CPUPool
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rx.Assert(ctx,
testObservable[int](ctx, 1, 4, 3).Contains(two,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](),
),
rx.IsFalse[int]{},
)
*/
// TODO(impl): CPUPool
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rx.Assert(ctx,
testObservable[int](ctx, 1, 4, 3).Contains(two,
rx.WithContext[int](ctx),
rx.WithCPUPool[int](),
),
rx.IsFalse[int]{},
)
})
})
})
Expand Down
20 changes: 9 additions & 11 deletions rx/observable-operator-count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,16 @@ var _ = Describe("Observable operator", func() {
// rxgo: Test_Observable_Count_Parallel
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rx.Assert(ctx, rx.Range[int](1, 100).Count(
rx.WithCPUPool[int](),
),
rx.HasNumber[int]{
Expected: 100,
},
)
*/
rx.Assert(ctx, rx.Range[int](1, 100).Count(
rx.WithCPUPool[int](),
),
rx.HasNumber[int]{
Expected: 100,
},
)
})
})
})
Expand Down
47 changes: 17 additions & 30 deletions rx/observable-operator-default-if-empty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,15 @@ var _ = Describe("Observable operator", func() {
// rxgo: Test_Observable_DefaultIfEmpty_Parallel_Empty
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := rx.Empty[int]().DefaultIfEmpty(3,
rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{3},
})
*/
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := rx.Empty[int]().DefaultIfEmpty(3,
rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{3},
})
})
})

Expand All @@ -87,26 +85,15 @@ var _ = Describe("Observable operator", func() {
// rxgo: Test_Observable_DefaultIfEmpty_Parallel_NotEmpty
defer leaktest.Check(GinkgoT())()

/*
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
obs := testObservable[int](ctx, 1, 2).DefaultIfEmpty(3,
rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{1, 2},
})
*/
})
})
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Context("Parallel/Error", func() {
Context("given: foo", func() {
It("馃И should: ", func() {
// rxgo: Test_
defer leaktest.Check(GinkgoT())()
obs := testObservable[int](ctx, 1, 2).DefaultIfEmpty(3,
rx.WithCPUPool[int](),
)
rx.Assert(ctx, obs, rx.HasItems[int]{
Expected: []int{1, 2},
})
})
})
})
Expand Down

0 comments on commit 0459847

Please sign in to comment.