Skip to content
Reactive Extensions for Golang with pipe support
Go
Branch: master
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
internal
.gitignore Initial commit Jan 11, 2019
LICENSE
README.md
buffer_subject.go
buffer_subject_test.go
observable.go
pipe.go
replay_subject.go
replay_subject_test.go
skip.go
subject.go
subject_test.go
subjectable.go
subscription.go
subscription_test.go
take.go

README.md

Reactive GoDoc Go Report Card Codacy Badge

My attempt at creating a simple RxJS clone.

Features

  • Observables
    • Multi-Type support
  • Subjects
    • Subject
    • ReplaySubject
  • Pipes
    • Take
    • TakeEvery
    • Skip
    • SkipEvery

Examples

Simple Subject

package main

import (
    "github.com/infinytum/reactive"
    "fmt"
)

// main wires subHandler to a fresh subject and then publishes the integers
// 1 through 4, one Next call per value.
func main() {
	s := reactive.NewSubject()
	s.Subscribe(subHandler)
	for _, v := range []int{1, 2, 3, 4} {
		s.Next(v)
	}
}

// subHandler writes each received integer to standard output on its own line.
func subHandler(a int) {
	fmt.Printf("%d\n", a)
}

Output

$ go run main.go
1
2
3
4

Replay Subject

package main

import (
    "github.com/infinytum/reactive"
    "fmt"
)

// main demonstrates subscribing late to a replay subject: three values are
// published before subHandler subscribes and one more afterwards. The expected
// output documented for this program is 3 followed by 4.
func main() {
	subject := reactive.NewReplaySubject()

	// Published while no subscriber is registered yet.
	subject.Next(1)
	subject.Next(2)
	subject.Next(3)

	// Subscribe only now, then publish one further value.
	subject.Subscribe(subHandler)
	subject.Next(4)
}

// subHandler writes each received integer to standard output on its own line.
func subHandler(a int) {
	fmt.Printf("%d\n", a)
}

Output

$ go run main.go
3
4

Multi-Type support

package main

import (
    "github.com/infinytum/reactive"
    "fmt"
)

// main registers one handler taking an int and one taking a string on the
// same subject, then publishes a mix of ints, strings and a final nil.
func main() {
	s := reactive.NewSubject()

	s.Subscribe(intHandler)
	s.Subscribe(stringHandler)

	// Same values, in the same order, each sent with its own Next call.
	for _, v := range []interface{}{2, "Hello", "World", 4, nil} {
		s.Next(v)
	}
}

// intHandler prints the received integer prefixed with "Int Handler: ".
func intHandler(a int) {
	// Println separates its operands with a single space, which reproduces
	// the trailing space of the original prefix.
	fmt.Println("Int Handler:", a)
}

// stringHandler prints the received string prefixed with "String Handler: ".
func stringHandler(a string) {
	// Println separates its operands with a single space, which reproduces
	// the trailing space of the original prefix even when a is empty.
	fmt.Println("String Handler:", a)
}

Output

$ go run main.go
Int Handler: 2
String Handler: Hello
String Handler: World
Int Handler: 4
Int Handler: 0
String Handler:

Take Pipe

package main

import (
    "github.com/infinytum/reactive"
    "fmt"
)

// main subscribes subHandler through a Take(2) pipe and then publishes the
// integers 1 through 4. The expected output documented for this program is
// 1 followed by 2.
func main() {
	subject := reactive.NewReplaySubject()

	// The handler is attached to the piped observable, not to the subject
	// itself, so it only sees what the Take(2) pipe lets through.
	subject.Pipe(reactive.Take(2)).Subscribe(subHandler)

	subject.Next(1)
	subject.Next(2)
	subject.Next(3)
	subject.Next(4)
}

// subHandler writes each received integer to standard output on its own line.
func subHandler(a int) {
	fmt.Printf("%d\n", a)
}

Output

$ go run main.go
1
2

TakeEvery Pipe

package main

import (
    "github.com/infinytum/reactive"
    "fmt"
)

// main subscribes subHandler through a TakeEvery(2) pipe and then publishes
// the integers 1 through 4. The expected output documented for this program
// is 2 followed by 4.
func main() {
	subject := reactive.NewReplaySubject()

	// The handler is attached to the piped observable, not to the subject
	// itself, so it only sees what the TakeEvery(2) pipe lets through.
	subject.Pipe(reactive.TakeEvery(2)).Subscribe(subHandler)

	subject.Next(1)
	subject.Next(2)
	subject.Next(3)
	subject.Next(4)
}

// subHandler writes each received integer to standard output on its own line.
func subHandler(a int) {
	fmt.Printf("%d\n", a)
}

Output

$ go run main.go
2
4

Skip Pipe

package main

import (
    "github.com/infinytum/reactive"
    "fmt"
)

// main subscribes subHandler through a Skip(2) pipe and then publishes the
// integers 1 through 4. The expected output documented for this program is
// 3 followed by 4.
func main() {
	subject := reactive.NewReplaySubject()

	// The handler is attached to the piped observable, not to the subject
	// itself, so it only sees what the Skip(2) pipe lets through.
	subject.Pipe(reactive.Skip(2)).Subscribe(subHandler)

	subject.Next(1)
	subject.Next(2)
	subject.Next(3)
	subject.Next(4)
}

// subHandler writes each received integer to standard output on its own line.
func subHandler(a int) {
	fmt.Printf("%d\n", a)
}

Output

$ go run main.go
3
4

SkipEvery Pipe

package main

import (
    "github.com/infinytum/reactive"
    "fmt"
)

// main subscribes subHandler through a SkipEvery(2) pipe and then publishes
// the integers 1 through 4. The expected output documented for this program
// is 1 followed by 3.
func main() {
	subject := reactive.NewReplaySubject()

	// The handler is attached to the piped observable, not to the subject
	// itself, so it only sees what the SkipEvery(2) pipe lets through.
	subject.Pipe(reactive.SkipEvery(2)).Subscribe(subHandler)

	subject.Next(1)
	subject.Next(2)
	subject.Next(3)
	subject.Next(4)
}

// subHandler writes each received integer to standard output on its own line.
func subHandler(a int) {
	fmt.Printf("%d\n", a)
}

Output

$ go run main.go
1
3
You can’t perform that action at this time.