Skip to content

Commit

Permalink
feat(rx): add optional single (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Mar 26, 2024
1 parent f8a65ed commit 0c3fa31
Show file tree
Hide file tree
Showing 15 changed files with 1,106 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
- uses: actions/setup-go@v5
with:
go-version: 1.22
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: golangci-lint
uses: golangci/golangci-lint-action@v4
with:
Expand All @@ -35,7 +35,7 @@ jobs:
run: go install github.com/mattn/goveralls@latest

- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

- run: go test -v -coverprofile=coverage.out ./...

Expand Down
60 changes: 53 additions & 7 deletions rxa/assert.go → rx/assert.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package rxa

// rxa (rx assertion) package can use rx
package rx

import (
"context"
"fmt"

. "github.com/onsi/ginkgo/v2" //nolint:revive,stylecheck // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive,stylecheck // gomega ok
"github.com/snivilised/lorax/rx"
)

// AssertPredicate is a custom predicate based on the items.
type AssertPredicate[I any] func(items []I) error

// RxAssert lists the Observable assertions.
type RxAssert[I any] interface {
type RxAssert[I any] interface { //nolint:revive // foo
apply(*rxAssert[I])
itemsToBeChecked() (bool, []I)
itemsNoOrderedToBeChecked() (bool, []I)
Expand Down Expand Up @@ -87,7 +84,7 @@ func (ass *rxAssert[I]) notRaisedErrorToBeChecked() bool {
}

func (ass *rxAssert[I]) itemToBeChecked() (b bool, i I) {
return ass.checkHasNoItem, ass.item
return ass.checkHasItem, ass.item
}

func (ass *rxAssert[I]) noItemToBeChecked() (b bool, i I) {
Expand All @@ -114,7 +111,7 @@ func parseAssertions[I any](assertions ...RxAssert[I]) RxAssert[I] {
return ass
}

func Assert[I any](ctx context.Context, iterable rx.Iterable[I], assertions ...RxAssert[I]) {
func Assert[I any](ctx context.Context, iterable Iterable[I], assertions ...RxAssert[I]) {
ass := parseAssertions(assertions...)
got := make([]I, 0)
errs := make([]error, 0)
Expand Down Expand Up @@ -213,3 +210,52 @@ loop:
Expect(errs).To(BeEmpty())
}
}

func HasItems[I any](expectedItems []I) RxAssert[I] {
return newAssertion(func(ra *rxAssert[I]) {
ra.checkHasItems = true
ra.items = expectedItems
})
}

// HasItem checks if a single or optional single has a specific item.
func HasItem[I any](i I) RxAssert[I] {
return newAssertion(func(a *rxAssert[I]) {
a.checkHasItem = true
a.item = i
})
}

// IsNotEmpty checks that the observable produces some items.
func IsNotEmpty[I any]() RxAssert[I] {
return newAssertion(func(a *rxAssert[I]) {
a.checkHasSomeItems = true
})
}

// IsEmpty checks that the observable has not produce any item.
func IsEmpty[I any]() RxAssert[I] {
return newAssertion(func(a *rxAssert[I]) {
a.checkHasNoItems = true
})
}

func HasError[I any](err error) RxAssert[I] {
return newAssertion(func(a *rxAssert[I]) {
a.checkHasRaisedError = true
a.err = err
})
}

// HasAnError checks that the observable has produce an error.
func HasAnError[I any]() RxAssert[I] {
return newAssertion(func(a *rxAssert[I]) {
a.checkHasRaisedAnError = true
})
}

func HasNoError[I any]() RxAssert[I] {
return newAssertion(func(ra *rxAssert[I]) {
ra.checkHasNotRaisedError = true
})
}
49 changes: 44 additions & 5 deletions rx/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ func Amb[I any](observables []Observable[I], opts ...Option[I]) Observable[I] {
panic("Amb: NOT-IMPL")
}

// Empty creates an Observable with no item and terminate immediately.
func Empty[I any]() Observable[I] {
next := make(chan Item[I])
close(next)

return &ObservableImpl[I]{
iterable: newChannelIterable(next),
}
}

// FromChannel creates a cold observable from a channel.
func FromChannel[I any](next <-chan Item[I], opts ...Option[I]) Observable[I] {
option := parseOptions(opts...)
Expand All @@ -19,11 +29,40 @@ func FromChannel[I any](next <-chan Item[I], opts ...Option[I]) Observable[I] {
}
}

func parseOptions[I any](opts ...Option[I]) Option[I] {
o := new(funcOption[I])
for _, opt := range opts {
opt.apply(o)
// Just creates an Observable with the provided items.
func Just[I any](values ...I) func(opts ...Option[I]) Observable[I] {
return func(opts ...Option[I]) Observable[I] {
return &ObservableImpl[I]{
iterable: newJustIterable(values...)(opts...),
}
}
}

return o
// JustSingle is like JustItem in that it is defined for a single item iterable
// but behaves like Just in that it returns a func.
// This is probably not required, just defined for experimental purposes for now.
func JustSingle[I any](value I, opts ...Option[I]) func(opts ...Option[I]) Single[I] {
return func(_ ...Option[I]) Single[I] {
return &SingleImpl[I]{
iterable: newJustIterable(value)(opts...),
}
}
}

// JustItem creates a single from one item.
func JustItem[I any](value I, opts ...Option[I]) Single[I] {
// Why does this not return a func, but Just does?
//
return &SingleImpl[I]{
iterable: newJustIterable(value)(opts...),
}
}

// Never creates an Observable that emits no items and does not terminate.
func Never[I any]() Observable[I] {
next := make(chan Item[I])

return &ObservableImpl[I]{
iterable: newChannelIterable(next),
}
}
7 changes: 7 additions & 0 deletions rx/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func (i Item[I]) SendContext(ctx context.Context, ch chan<- Item[I]) bool {
}
}

func (i Item[I]) SendOpContext(ctx context.Context, ch any) bool { // Item[operator[I]]
_ = ctx
_ = ch

panic("SendOpContext: NOT-IMPL")
}

// SendNonBlocking sends an item without blocking.
// It returns a boolean to indicate whether the item was sent.
func (i Item[I]) SendNonBlocking(ch chan<- Item[I]) bool {
Expand Down
19 changes: 9 additions & 10 deletions rx/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok
"github.com/snivilised/lorax/rx"
"github.com/snivilised/lorax/rxa"
)

var _ = Describe("Item", Ordered, func() {
Expand All @@ -25,10 +24,10 @@ var _ = Describe("Item", Ordered, func() {
rx.Of(3),
)

rxa.Assert(context.Background(),
rx.Assert(context.Background(),
rx.FromChannel(ch),
rxa.HasItems([]int{1, 2, 3}),
rxa.HasNoError[int]())
rx.HasItems([]int{1, 2, 3}),
rx.HasNoError[int]())
})
})

Expand All @@ -44,10 +43,10 @@ var _ = Describe("Item", Ordered, func() {
rx.Of(3),
)

rxa.Assert(context.Background(),
rx.Assert(context.Background(),
rx.FromChannel(ch),
rxa.HasItems([]int{1, 3}),
rxa.HasAnError[int]())
rx.HasItems([]int{1, 3}),
rx.HasAnError[int]())
})
})

Expand All @@ -63,10 +62,10 @@ var _ = Describe("Item", Ordered, func() {
rx.Of(3),
)

rxa.Assert(context.Background(),
rx.Assert(context.Background(),
rx.FromChannel(ch),
rxa.HasItems([]int{1, 3}),
rxa.HasError[int](errFoo))
rx.HasItems([]int{1, 3}),
rx.HasError[int](errFoo))
})
})
})
Expand Down
5 changes: 4 additions & 1 deletion rx/iterable-channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ func newChannelIterable[I any](next <-chan Item[I], opts ...Option[I]) Iterable[
}

func (i *channelIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] {
mergedOptions := append(i.opts, opts...) //nolint:gocritic // foo
mergedOptions := make([]Option[I], 0, len(opts))
copy(mergedOptions, opts)
mergedOptions = append(mergedOptions, opts...)

option := parseOptions(mergedOptions...)

if !option.isConnectable() {
Expand Down
13 changes: 13 additions & 0 deletions rx/iterable-factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rx

type factoryIterable[I any] struct {
factory func(opts ...Option[I]) <-chan Item[I]
}

func newFactoryIterable[I any](factory func(opts ...Option[I]) <-chan Item[I]) Iterable[I] {
return &factoryIterable[I]{factory: factory}
}

func (i *factoryIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] {
return i.factory(opts...)
}
29 changes: 29 additions & 0 deletions rx/iterable-just.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package rx

type justIterable[I any] struct {
items []I
opts []Option[I]
}

func newJustIterable[I any](items ...I) func(opts ...Option[I]) Iterable[I] {
return func(opts ...Option[I]) Iterable[I] {
return &justIterable[I]{
items: items,
opts: opts,
}
}
}

func (i *justIterable[I]) Observe(opts ...Option[I]) <-chan Item[I] {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()
items := make([]Item[I], 0, len(i.items))

for _, item := range i.items {
items = append(items, Of(item))
}

go SendItems(option.buildContext(emptyContext), next, CloseChannel, items...)

return next
}
Loading

0 comments on commit 0c3fa31

Please sign in to comment.