Skip to content

Commit

Permalink
feat(rx): range over item values (#232)
Browse files Browse the repository at this point in the history
feat(rx): add WithCalc (#232)

feat(rx): add range iterator (#232)

feat(rx): rename HasItems to ContainItems (#232)

feat(rx): continue range iterator (#232)

feat(rx): fix copy by ptr range iterator (#232)
  • Loading branch information
plastikfan committed Apr 24, 2024
1 parent 12b3626 commit b53eaaa
Show file tree
Hide file tree
Showing 42 changed files with 941 additions and 345 deletions.
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"coverprofile",
"cubiest",
"deadcode",
"deepcopy",
"depguard",
"dogsled",
"dotenv",
Expand Down Expand Up @@ -49,6 +50,7 @@
"linters",
"lorax",
"mattn",
"mohae",
"nakedret",
"nolint",
"nolintlint",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
require (
github.com/avfs/avfs v0.33.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
go.uber.org/multierr v1.11.0 // indirect
)

Expand Down
12 changes: 11 additions & 1 deletion rx/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,24 @@ func (f AssertFunc[T]) Check(actual AssertResources[T]) {
f(actual)
}

// ContainItems
type ContainItems[T any] struct {
Expected []T
}

// HasItems checks if an observable contains expected items
func (a ContainItems[T]) Check(actual AssertResources[T]) {
Expect(actual.Values()).To(ContainElements(a.Expected), reason("HasItems"))
}

// HasItems
type HasItems[T any] struct {
Expected []T
}

// HasItems checks if an observable has an exact set of items.
func (a HasItems[T]) Check(actual AssertResources[T]) {
Expect(actual.Values()).To(ContainElements(a.Expected), reason("HasItems"))
Expect(actual.Values()).To(HaveExactElements(a.Expected), reason("HasItems"))
}

// HasItem
Expand Down
9 changes: 0 additions & 9 deletions rx/calculator.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
package rx

import (
"golang.org/x/exp/constraints"
)

// Numeric
type Numeric interface {
constraints.Integer | constraints.Signed | constraints.Unsigned | constraints.Float
}

func Calc[T Numeric]() Calculator[T] {
return new(NumericCalc[T])
}
Expand Down
26 changes: 26 additions & 0 deletions rx/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,29 @@ func (e TryError[T]) Error() string {
enums.ItemDescriptions[e.expected], enums.ItemDescriptions[e.item.disc], e.item,
)
}

// MissingCalcError is triggered when if the client forgets to provide a
// calculator option
type MissingCalcError struct {
error string
}

func (e MissingCalcError) Error() string {
return "missing calculator"
}

func init() {
RangeMissingWhilstError = BadRangeIteratorError{
error: "range iterator missing whilst definition",
}
}

type BadRangeIteratorError struct {
error string
}

func (e BadRangeIteratorError) Error() string {
return fmt.Sprintf("bad range iterator (%v)", e.error)
}

var RangeMissingWhilstError BadRangeIteratorError
22 changes: 17 additions & 5 deletions rx/factory-connectable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok
"github.com/onsi/ginkgo/v2/dsl/decorators"
. "github.com/onsi/gomega" //nolint:revive // gomega ok
"golang.org/x/sync/errgroup"
)

Expand All @@ -53,6 +54,7 @@ var _ = Describe("FactoryConnectable", func() {
When("single with multiple observers", func() {
It("馃И should: distribute all items to all connected observers", func() {
// Test_Connectable_IterableChannel_Single
defer GinkgoRecover()
defer leaktest.Check(GinkgoT())()

ch := make(chan Item[int], 10)
Expand Down Expand Up @@ -341,19 +343,29 @@ var _ = Describe("FactoryConnectable", func() {
})

When("range single", func() {
It("馃И should: distribute all items to all connected observers", func() {
XIt("馃И should: distribute all items to all connected observers", decorators.Label("Flakey"), func() {
// Test_Connectable_IterableRange_Single
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

obs := &ObservableImpl[int]{
iterable: newRangeIterable(1, 3,
iterable: newRangeIterable(&NumericRangeIterator[int]{
StartAt: 1,
Whilst: LessThan(4),
},
WithPublishStrategy[int](),
WithContext[int](ctx),
),
}

// obs := &ObservableImpl[int]{
// iterable: newRangeIterableL(1, 3,
// WithPublishStrategy[int](),
// WithContext[int](ctx),
// ),
// }
testConnectableSingle(obs, []any{1, 2, 3})
})
})
Expand Down Expand Up @@ -427,7 +439,7 @@ func testConnectableSingle[T any](obs Observable[T], expected []any) {

func testConnectableComposed[T any](obs Observable[T], increment Func[T], expected []any) {
obs = obs.Map(func(ctx context.Context, v T) (T, error) {
return increment(ctx, v) // expect client to implement with v+1
return increment(ctx, v)
}, WithPublishStrategy[T]())

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -465,7 +477,7 @@ func testConnectableComposed[T any](obs Observable[T], increment Func[T], expect
wg.Wait()
obs.Connect(ctx)

Expect(eg.Wait()).Error().To(BeNil())
Expect(eg.Wait()).To(Succeed())
}

func testConnectableWithoutConnect[T any](obs Observable[T]) {
Expand Down
36 changes: 24 additions & 12 deletions rx/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package rx

import (
"context"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -95,11 +94,12 @@ func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T] {
// CombineLatest combines the latest item emitted by each Observable via a specified function
// and emit items based on the results of this function.
func CombineLatest[T any](f FuncN[T], observables []Observable[T],
calc Calculator[T], opts ...Option[T],
opts ...Option[T],
) Observable[T] {
option := parseOptions(opts...)
ctx := option.buildContext(emptyContext)
next := option.buildChannel()
calc := option.calc()

go func() {
var counter uint32
Expand All @@ -116,6 +116,12 @@ func CombineLatest[T any](f FuncN[T], observables []Observable[T],

observe := it.Observe(opts...)

if calc == nil {
Error[T](MissingCalcError{}).SendContext(ctx, next)

return
}

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -381,21 +387,27 @@ func Never[T any]() Observable[T] {

// Range creates an Observable that emits count sequential integers beginning
// at start.
func Range[T any](start, count NumVal, opts ...Option[T]) Observable[T] {
if count < 0 {
return Thrown[T](IllegalInputError{
error: "count must be positive", // TODO(i18n)
})
func Range[T Numeric](iterator RangeIterator[T], opts ...Option[T]) Observable[T] {
if err := iterator.Init(); err != nil {
return Thrown[T](err)
}

if start+count-1 > math.MaxInt32 {
return Thrown[T](IllegalInputError{
error: "max value is bigger than math.MaxInt32",
})
return &ObservableImpl[T]{
iterable: newRangeIterable(iterator, opts...),
}
}

// RangeNF creates an Observable that emits count sequential integers beginning
// at start, for non numeric types, which do contain a nominated Numeric member
func RangeNF[T NominatedField[T, O], O Numeric](iterator RangeIteratorNF[T, O],
opts ...Option[T],
) Observable[T] {
if err := iterator.Init(); err != nil {
return Thrown[T](err)
}

return &ObservableImpl[T]{
iterable: newRangeIterable(start, count, opts...),
iterable: newRangeIterableNF(iterator, opts...),
}
}

Expand Down

0 comments on commit b53eaaa

Please sign in to comment.