
reactivego/rx


rx

import "github.com/reactivego/rx"


Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.

Prerequisites

You’ll need Go 1.23 or later, as the implementation depends on language support for generics and iterators.

Observables

In rx, an Observable represents a stream of data that can emit items over time, while an Observer subscribes to it to receive and react to those emissions. This reactive approach enables asynchronous and concurrent operations without blocking execution. Instead of waiting for values to become available, an observer passively listens and responds whenever the observable emits data, errors, or a completion signal.

This page introduces the reactive pattern, explaining what Observables and Observers are and how subscriptions work. Other sections explore the powerful set of Operators that allow you to transform, combine, and control data streams efficiently.

An Observable:

  • is a stream of events.
  • emits zero to many values over time.
  • pushes values to its observers.
  • can take any amount of time to complete (or may never complete).
  • is cancellable.
  • is lazy (it doesn't do anything until you subscribe).
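The properties listed above can be sketched with nothing but the standard library. The `Observer`, `Observable`, `FromSlice` and `Collect` names below are hypothetical stand-ins chosen for illustration; they are not the rx package's own definitions. The point is only that creating the stream does no work, and that values are pushed to the observer once it subscribes.

```go
package main

import "fmt"

// Observer receives pushed notifications: a value, a terminal error, or completion.
// Hypothetical shape for illustration only.
type Observer[T any] func(next T, err error, done bool)

// Observable is just a function; it produces nothing until it is called with an Observer.
type Observable[T any] func(observe Observer[T])

// FromSlice returns a lazy Observable over the given values.
func FromSlice[T any](values ...T) Observable[T] {
	return func(observe Observer[T]) {
		for _, v := range values {
			observe(v, nil, false) // push each value
		}
		var zero T
		observe(zero, nil, true) // push the completion signal
	}
}

// Collect subscribes to the Observable and gathers every pushed value.
func Collect[T any](o Observable[T]) []T {
	var out []T
	o(func(next T, err error, done bool) {
		if err != nil || done {
			return
		}
		out = append(out, next)
	})
	return out
}

func main() {
	numbers := FromSlice(1, 2, 3) // lazy: no emissions happen here
	fmt.Println(Collect(numbers)) // subscribing triggers the emissions
}
```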

Example

package main

import "github.com/reactivego/rx"

func main() {
    // Build an observable of type any from mixed values, print each one, and wait for completion.
    rx.From[any](1, "hi", 2.3).Println().Wait()
}

Note that the program creates an observable of type any from mixed values: an int, a string and a float64.

Output

1
hi
2.3

Example

package main

import "github.com/reactivego/rx"

func main() {
    rx.From(1, 2, 3).Println().Wait()
}

Note that the program relies on type inference: the observable's element type is inferred as int.

Output

1
2
3

Observables in rx offer several advantages over standard Go channels:

Hot vs Cold Observables

  • Hot Observables emit values regardless of subscription status. Like a live broadcast, any values emitted when no subscribers are listening are permanently missed. Examples include system events, mouse movements, or real-time data feeds.

  • Cold Observables begin emission only when subscribed to, ensuring subscribers receive the complete data sequence from the beginning. Examples include file contents, database queries, or HTTP requests that are executed on-demand.
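The difference between the two kinds can be sketched in plain Go. `Cold`, `Hot`, `Subscribe`, `Emit` and `Demo` are hypothetical names used only for this illustration and are not part of rx. The cold source replays its sequence for every subscriber, while the hot source drops anything emitted before a listener is attached.

```go
package main

import "fmt"

// Cold returns a subscribe function that replays the full sequence for every subscriber.
func Cold(values ...int) func(onNext func(int)) {
	return func(onNext func(int)) {
		for _, v := range values {
			onNext(v)
		}
	}
}

// Hot is a broadcaster that emits whether or not anyone is listening.
type Hot struct {
	listeners []func(int)
}

// Subscribe registers a listener for future emissions only.
func (h *Hot) Subscribe(onNext func(int)) {
	h.listeners = append(h.listeners, onNext)
}

// Emit delivers a value to the listeners registered at this moment.
func (h *Hot) Emit(v int) {
	for _, listen := range h.listeners {
		listen(v)
	}
}

// Demo returns what a late subscriber sees from a cold and from a hot source.
func Demo() (coldLate, hotLate []int) {
	cold := Cold(1, 2, 3)
	cold(func(int) {})                                   // an earlier subscriber
	cold(func(v int) { coldLate = append(coldLate, v) }) // still receives everything

	hot := &Hot{}
	hot.Emit(1) // nobody is listening: this value is permanently missed
	hot.Subscribe(func(v int) { hotLate = append(hotLate, v) })
	hot.Emit(2)
	hot.Emit(3)
	return coldLate, hotLate
}

func main() {
	coldLate, hotLate := Demo()
	fmt.Println(coldLate) // [1 2 3]
	fmt.Println(hotLate)  // [2 3]
}
```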

Rich Lifecycle Management

Observables offer comprehensive lifecycle handling. They can complete normally, terminate with errors, or continue indefinitely. Subscriptions provide fine-grained control, allowing subscribers to cancel at any point, preventing resource leaks and unwanted processing.
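As a rough illustration of cancellation, the sketch below uses a hypothetical `Subscription` type and `Counter` source (neither is taken from rx). The source would run forever, and the only thing that stops it is the subscriber cancelling once it has seen enough values.

```go
package main

import "fmt"

// Subscription lets a subscriber cancel further emissions.
// Hypothetical type for illustration only.
type Subscription struct {
	canceled bool
}

// Unsubscribe marks the subscription as canceled.
func (s *Subscription) Unsubscribe() { s.canceled = true }

// Counter pushes 0, 1, 2, ... without end; only cancellation stops it.
func Counter(onNext func(v int, sub *Subscription)) {
	sub := &Subscription{}
	for i := 0; !sub.canceled; i++ {
		onNext(i, sub)
	}
}

// TakeThree subscribes to Counter and cancels after three values.
func TakeThree() []int {
	var got []int
	Counter(func(v int, sub *Subscription) {
		got = append(got, v)
		if len(got) == 3 {
			sub.Unsubscribe() // stop the producer, releasing its loop
		}
	})
	return got
}

func main() {
	fmt.Println(TakeThree()) // [0 1 2]
}
```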

Time-Varying Data Model

Unlike traditional variables that represent static values, Observables elegantly model how values evolve over time. They represent the entire progression of a value's state changes, not just its current state, making them ideal for reactive programming paradigms.

Native Concurrency Support

Concurrency is built into the Observable paradigm. Each Observable conceptually operates as an independent process that asynchronously pushes values to subscribers. This approach naturally aligns with concurrent programming models while abstracting away much of the complexity typically associated with managing concurrent operations.

Operators

Operators form a language for expressing programs with Observables. They transform, filter, and combine one or more Observables into new Observables, allowing for powerful data stream processing. Each operator performs a specific function in the reactive pipeline, enabling you to compose complex asynchronous workflows through method chaining.
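To make the idea of composition concrete, here is a minimal sketch in which an operator is simply a function that takes an observable and returns a new one. The `Observable`, `Of`, `Filter`, `Map` and `ToSlice` definitions are simplified, hypothetical versions written for this example; rx's own operators have richer signatures and also handle errors, completion and scheduling.

```go
package main

import "fmt"

// Observable is a simplified push-based stream: calling it subscribes onNext.
type Observable[T any] func(onNext func(T))

// Of creates an Observable that emits the given values in order.
func Of[T any](values ...T) Observable[T] {
	return func(onNext func(T)) {
		for _, v := range values {
			onNext(v)
		}
	}
}

// Filter forwards only the values for which keep returns true.
func Filter[T any](src Observable[T], keep func(T) bool) Observable[T] {
	return func(onNext func(T)) {
		src(func(v T) {
			if keep(v) {
				onNext(v)
			}
		})
	}
}

// Map applies project to every value and forwards the result.
func Map[T, U any](src Observable[T], project func(T) U) Observable[U] {
	return func(onNext func(U)) {
		src(func(v T) { onNext(project(v)) })
	}
}

// ToSlice subscribes and collects all emitted values.
func ToSlice[T any](src Observable[T]) []T {
	var out []T
	src(func(v T) { out = append(out, v) })
	return out
}

func main() {
	isEven := func(v int) bool { return v%2 == 0 }
	square := func(v int) int { return v * v }
	pipeline := Map(Filter(Of(1, 2, 3, 4), isEven), square)
	fmt.Println(ToSlice(pipeline)) // [4 16]
}
```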

Index

All converts an Observable stream into a Go 1.23+ iterator sequence that provides each emitted value paired with its sequential zero-based index.

All2 converts an Observable of Tuple2 pairs into a Go 1.23+ iterator sequence that yields each tuple's components (First, Second) as separate values.

Append creates a pipe that appends emitted values to a provided slice while forwarding them to the next observer, with a method variant available for chaining.

AsObservable provides type conversion between observables, allowing you to safely cast an Observable of one type to another, and to convert a typed Observable to an Observable of 'any' type (and vice versa).

AsObserver converts an Observer of type any to an Observer of a specific type T.

Assign stores each emitted value from an Observable into a provided pointer variable while passing all emissions through to the next observer, enabling value capture during stream processing.

AutoConnect makes a (Connectable) Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it.

AutoUnsubscribe

BufferCount

Catch recovers from an error notification by continuing the sequence without emitting the error, switching to the catch Observable to provide items.

CatchError catches errors on the Observable so they can be handled by returning a new Observable or by throwing an error.

CombineAll

CombineLatest combines multiple Observables into one by emitting an array containing the latest values from each source whenever any input Observable emits a value, with variants (CombineLatest2, CombineLatest3, CombineLatest4, CombineLatest5) that return strongly-typed tuples for 2-5 input Observables respectively.

Concat combines multiple Observables sequentially by emitting all values from the first Observable before proceeding to the next one, ensuring emissions never overlap.

ConcatAll transforms a higher-order Observable (an Observable that emits other Observables) into a first-order Observable by subscribing to each inner Observable only after the previous one completes.

ConcatMap projects each source value to an Observable, subscribes to it, and emits its values, waiting for each one to complete before processing the next source value.

ConcatWith extends an Observable by appending additional Observables, ensuring that emissions from each Observable only begin after the previous one completes.

Connectable is an Observable with delayed connection to its source, combining both Observable and Connector interfaces. It separates the subscription process into two parts: observers can register via Subscribe, but the Observable won't subscribe to its source until Connect is explicitly called. This enables multiple observers to subscribe before any emissions begin (multicast behavior), allowing a single source Observable to be efficiently shared among multiple consumers. Besides inheriting all methods from Observable and Connector, Connectable provides the convenience methods AutoConnect and RefCount to manage connection behavior.

Connect establishes a connection to the source Observable and returns a Subscription that can be used to cancel the connection when no longer needed.

Connector provides a mechanism for controlling when a Connectable Observable subscribes to its source, allowing you to connect the Observable independently from when observers subscribe to it. This separation enables multiple subscribers to prepare their subscriptions before the source begins emitting items. It has a single method Connect.
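The split between subscribing and connecting can be sketched as follows. This int-only `Connectable` struct and its `Demo` helper are hypothetical and much simpler than the rx types described above; they only show that observers register first, that nothing is emitted until `Connect` is called, and that the source then runs once for all observers.

```go
package main

import "fmt"

// Connectable is a hypothetical, int-only stand-in for a connectable observable.
type Connectable struct {
	source    func(onNext func(int)) // the shared upstream source
	observers []func(int)            // observers registered so far
}

// Subscribe registers an observer without starting the source.
func (c *Connectable) Subscribe(onNext func(int)) {
	c.observers = append(c.observers, onNext)
}

// Connect subscribes to the source once and multicasts each value to all observers.
func (c *Connectable) Connect() {
	c.source(func(v int) {
		for _, observe := range c.observers {
			observe(v)
		}
	})
}

// Demo registers two observers, connects, and reports what each received
// together with how many times the source was run.
func Demo() (a, b []int, sourceRuns int) {
	c := &Connectable{source: func(onNext func(int)) {
		sourceRuns++
		onNext(1)
		onNext(2)
	}}
	c.Subscribe(func(v int) { a = append(a, v) })
	c.Subscribe(func(v int) { b = append(b, v) })
	// Nothing has been emitted yet; the source starts only on Connect.
	c.Connect()
	return a, b, sourceRuns
}

func main() {
	a, b, runs := Demo()
	fmt.Println(a, b, runs) // [1 2] [1 2] 1
}
```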

Constraints provides the type constraints Signed, Unsigned, Integer and Float, copied verbatim from golang.org/x/exp so that the dependency on that package could be dropped.

Count returns an Observable that emits a single value representing the total number of items emitted by the source Observable before it completes.

Create constructs a new Observable from a Creator function, providing a bridge between imperative code and the reactive Observable pattern. The Observable will continue producing values until the Creator signals completion, the Observer unsubscribes, or the Creator returns an error.

Creator is a function type that generates values for an Observable stream. It receives a zero-based index for the current iteration and returns a tuple containing the next value to emit, any error that occurred, and a boolean flag indicating whether the sequence is complete.

Defer

Delay

DistinctUntilChanged only emits when the current value is different from the last.

Do calls a function for each next value passing through the observable.

ElementAt emits only item n emitted by an Observable.

Empty creates an Observable that emits no items but terminates normally.

EndWith

Equal

Err

ExhaustAll

ExhaustMap

Filter emits only those items from an observable that pass a predicate test.

First emits only the first item from an Observable.

Fprint

Fprintf

Fprintln

From creates an observable from multiple values passed in.

Go subscribes to the observable and starts execution on a separate goroutine, ignoring all emissions from the observable sequence. This makes it useful when you only care about side effects and not the actual values. Returns a Subscription that can be used to cancel the subscription when no longer needed.

Ignore[T] creates an Observer[T] that simply discards any emissions from an Observable. It is useful when you need to create an Observer but don't care about its values.

Interval creates an Observable that emits a sequence of integers spaced by a particular time interval.

Last emits only the last item emitted by an Observable.

Map transforms the items emitted by an Observable by applying a function to each item.

MapE

Marshal

MaxBufferSizeOption, WithMaxBufferSize

Merge combines multiple Observables into one by merging their emissions.

MergeAll flattens a higher order observable by merging the observables it emits.

MergeMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.

MergeWith combines multiple Observables into one by merging their emissions.

Multicast

Must

Never creates an Observable that emits no items and doesn't terminate.

Observable

Observer

Of emits a variable number of values in a sequence and then emits a complete notification.

OnComplete

OnDone

OnError

OnNext

Passthrough just passes through all output from the Observable.

Pipe

Print

Printf

Println subscribes to the Observable and prints every item to os.Stdout.

Publish returns a multicasting Observable[T] for an underlying Observable[T] as a Connectable[T] type.

Pull

Pull2

Race

RaceWith

Recv

Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result.

ReduceE

RefCount makes a Connectable behave like an ordinary Observable.

Repeat creates an observable that emits a sequence of items repeatedly.

Retry resubscribes to a source Observable that sends an error notification, in the hope that it will complete without error.

RetryTime

SampleTime emits the most recent item emitted by an Observable within periodic time intervals.

Scan applies an accumulator function to each item emitted by an Observable and the previous accumulator result.
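The usual distinction between Scan and Reduce in Reactive Extensions is that Scan emits every intermediate accumulator value while Reduce emits only the final one. The sketch below shows that difference on plain slices; the `Scan` and `Reduce` functions here are hypothetical helpers with assumed signatures, not the rx operators themselves.

```go
package main

import "fmt"

// Scan returns every intermediate accumulator value, one per input item.
func Scan[T, A any](values []T, accumulate func(A, T) A, seed A) []A {
	out := make([]A, 0, len(values))
	acc := seed
	for _, v := range values {
		acc = accumulate(acc, v)
		out = append(out, acc)
	}
	return out
}

// Reduce returns only the final accumulator value.
func Reduce[T, A any](values []T, accumulate func(A, T) A, seed A) A {
	acc := seed
	for _, v := range values {
		acc = accumulate(acc, v)
	}
	return acc
}

func main() {
	add := func(acc, v int) int { return acc + v }
	fmt.Println(Scan([]int{1, 2, 3}, add, 0))   // [1 3 6]
	fmt.Println(Reduce([]int{1, 2, 3}, add, 0)) // 6
}
```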

ScanE

Scheduler

Send

Share

Skip suppresses the first n items emitted by an Observable.

Slice

StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.

Subject is a combination of an observer and observable.

Subscribe operates upon the emissions and notifications from an Observable.

SubscribeOn specifies the scheduler an Observable should use when it is subscribed to.

Subscriber

Subscription

SwitchAll

SwitchMap

Take emits only the first n items emitted by an Observable.

TakeWhile mirrors items emitted by an Observable until a specified condition becomes false.

Tap

Throw creates an observable that emits no items and terminates with an error.

Ticker creates an Observable that emits a sequence of timestamps after an initialDelay has passed.

Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed.

Tuple

Values

Wait subscribes to the Observable and waits for completion or error.

WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice.

WithLatestFromAll flattens a higher order observable.

Zip

ZipAll