Skip to content

Commit

Permalink
Bugfix for SwitchIfEmpty.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Jan 12, 2020
1 parent 64a810e commit e116547
Show file tree
Hide file tree
Showing 27 changed files with 1,363 additions and 1,252 deletions.
6 changes: 3 additions & 3 deletions docs/_sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
- [PublishOn](unimplement.md)
- [DelayElement](share/delay_element.md)
- Mono Only
- [SwitchIfEmpty](todo.md)
- [SwitchIfEmpty](mono/switch_if_empty.md)
- Flux Only
- [Take](flux/take.md)
- [Reduce](unimplement.md)
- [Reduce](unimplement)
- [RateLimit](unimplement.md)
- [SwitchOnFirst](todo.md)
- [SwitchOnFirst](flux/switch_on_first.md)
- Subscriber
- [Subscribe](todo.md)
- [SubscribeWith](todo.md)
Expand Down
38 changes: 38 additions & 0 deletions docs/flux/switch_on_first.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
### SwitchOnFirst
Get first element in this `Flux` and transform to another `Flux`.

``` go
package main

import (
"context"
"fmt"
"strings"

"github.com/jjeffcaii/reactor-go/flux"
)

func main() {
flux.Just("golang", "I love golang.", "I love java.", "Awesome golang.", "I love ruby.").
SwitchOnFirst(func(firstSignal flux.Signal, originFlux flux.Flux) flux.Flux {
// extract first word and filtering word that contains it.
first, ok := firstSignal.Value()
if !ok {
return originFlux
}
return originFlux.
Filter(func(item interface{}) bool {
return strings.Contains(item.(string), first.(string))
})
}).
DoOnNext(func(v interface{}) {
fmt.Println("next:", v)
}).
Subscribe(context.Background())
// Should print:
// next: golang
// next: I love golang.
// next: Awesome golang.
}

```
30 changes: 30 additions & 0 deletions docs/flux/take.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
### Take
Take only the first N values from this `Flux`, if available.

``` go
package main

import (
"context"
"fmt"
"time"

"github.com/jjeffcaii/reactor-go/flux"
)

func main() {
// Example: taking 3 elements
flux.
Interval(500 * time.Millisecond).
Take(3).
DoOnNext(func(v interface{}) {
fmt.Println(time.Now().Format("2006-01-02 03:04:05"), "next:", v)
}).
BlockLast(context.Background())
// Should print:
// 2020-01-12 07:26:58 next: 0
// 2020-01-12 07:26:59 next: 1
// 2020-01-12 07:26:59 next: 2
}

```
24 changes: 24 additions & 0 deletions docs/mono/switch_if_empty.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
### SwitchIfEmpty
Fallback to an alternative `Mono` if this mono is completed without data.

``` go
package main

import (
"context"
"fmt"

"github.com/jjeffcaii/reactor-go/mono"
)

func main() {
// v1 should be 42
v1, _ := mono.Empty().SwitchIfEmpty(mono.Just(42)).Block(context.Background())
fmt.Println("v1:", v1)

// v2 should be 1024
v2, _ := mono.Just(1024).SwitchIfEmpty(mono.Just(2048)).Block(context.Background())
fmt.Println("v2:", v2)
}

```
61 changes: 0 additions & 61 deletions examples/xxx.go

This file was deleted.

3 changes: 2 additions & 1 deletion flux/flux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flux

import (
"context"
"time"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/scheduler"
Expand All @@ -25,7 +26,6 @@ type Flux interface {
Map(rs.Transformer) Flux
Take(n int) Flux
DoOnDiscard(rs.FnOnDiscard) Flux
// OnOnNext adds behavior triggered when the Flux emits an element.
DoOnNext(rs.FnOnNext) Flux
DoOnComplete(rs.FnOnComplete) Flux
DoOnError(rs.FnOnError) Flux
Expand All @@ -34,6 +34,7 @@ type Flux interface {
DoOnSubscribe(rs.FnOnSubscribe) Flux
DoFinally(rs.FnOnFinally) Flux
SwitchOnFirst(FnSwitchOnFirst) Flux
DelayElement(delay time.Duration) Flux
SubscribeOn(scheduler.Scheduler) Flux
BlockFirst(context.Context) (interface{}, error)
BlockLast(context.Context) (interface{}, error)
Expand Down
130 changes: 65 additions & 65 deletions flux/flux_create_sink.go
Original file line number Diff line number Diff line change
@@ -1,97 +1,97 @@
package flux

import (
"sync"
"sync/atomic"
"sync"
"sync/atomic"

rs "github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
rs "github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
)

type bufferedSink struct {
s rs.Subscriber
q queue
n int32
draining int32
stat int32
cond *sync.Cond
s rs.Subscriber
q queue
n int32
draining int32
stat int32
cond *sync.Cond
}

func (p *bufferedSink) Request(n int) {
atomic.AddInt32(&p.n, int32(n))
p.drain()
atomic.AddInt32(&p.n, int32(n))
p.drain()
}

func (p *bufferedSink) Cancel() {
if !atomic.CompareAndSwapInt32(&p.stat, 0, statCancel) {
return
}
// TODO: support cancel
p.dispose()
if !atomic.CompareAndSwapInt32(&p.stat, 0, statCancel) {
return
}
// TODO: support cancel
p.dispose()
}

func (p *bufferedSink) Complete() {
p.cond.L.Lock()
for atomic.LoadInt32(&(p.draining)) == 1 || p.q.size() > 0 {
p.cond.Wait()
}
p.cond.L.Unlock()
if atomic.CompareAndSwapInt32(&p.stat, 0, statComplete) {
p.s.OnComplete()
p.dispose()
}
p.cond.L.Lock()
for atomic.LoadInt32(&(p.draining)) == 1 || p.q.size() > 0 {
p.cond.Wait()
}
p.cond.L.Unlock()
if atomic.CompareAndSwapInt32(&p.stat, 0, statComplete) {
p.s.OnComplete()
p.dispose()
}
}

func (p *bufferedSink) Error(err error) {
if atomic.CompareAndSwapInt32(&p.stat, 0, statError) {
hooks.Global().OnErrorDrop(err)
return
}
p.s.OnError(err)
p.dispose()
if atomic.CompareAndSwapInt32(&p.stat, 0, statError) {
hooks.Global().OnErrorDrop(err)
return
}
p.s.OnError(err)
p.dispose()
}

func (p *bufferedSink) Next(v interface{}) {
if atomic.LoadInt32(&p.stat) != 0 {
hooks.Global().OnNextDrop(v)
return
}
p.q.offer(v)
p.drain()
if atomic.LoadInt32(&p.stat) != 0 {
hooks.Global().OnNextDrop(v)
return
}
p.q.offer(v)
p.drain()
}

func (p *bufferedSink) drain() {
if !atomic.CompareAndSwapInt32(&p.draining, 0, 1) {
return
}
defer func() {
p.cond.L.Lock()
atomic.CompareAndSwapInt32(&p.draining, 1, 0)
p.cond.Broadcast()
p.cond.L.Unlock()
}()
for atomic.AddInt32(&p.n, -1) > -1 {
if atomic.LoadInt32(&p.stat) != 0 {
return
}
v, ok := p.q.poll()
if !ok {
atomic.AddInt32(&(p.n), 1)
break
}
p.s.OnNext(v)
}
atomic.CompareAndSwapInt32(&(p.n), -1, 0)
if !atomic.CompareAndSwapInt32(&p.draining, 0, 1) {
return
}
defer func() {
p.cond.L.Lock()
atomic.CompareAndSwapInt32(&p.draining, 1, 0)
p.cond.Broadcast()
p.cond.L.Unlock()
}()
for atomic.AddInt32(&p.n, -1) > -1 {
if atomic.LoadInt32(&p.stat) != 0 {
return
}
v, ok := p.q.poll()
if !ok {
atomic.AddInt32(&(p.n), 1)
break
}
p.s.OnNext(v)
}
atomic.CompareAndSwapInt32(&(p.n), -1, 0)
}

func (p *bufferedSink) dispose() {
_ = p.q.Close()
_ = p.q.Close()
}

func newBufferedSink(s rs.Subscriber, cap int) *bufferedSink {
return &bufferedSink{
s: s,
q: newQueue(cap),
cond: sync.NewCond(&sync.Mutex{}),
}
return &bufferedSink{
s: s,
q: newQueue(cap),
cond: sync.NewCond(&sync.Mutex{}),
}
}

0 comments on commit e116547

Please sign in to comment.