#RxJava8 - Reactive Extensions using Java 8
##Overview
RxJava8 implements the core features of Reactive Extensions (Rx) using Java 8. It is inspired by Microsoft's Rx library at https://rx.codeplex.com/, but it does not support all of that library's APIs.
##Building
git clone git@github.com:bhatti/RxJava8.git
- Compile and build the jar file using:
./gradlew jar
- For now, you will have to copy the jar file and add it to your application manually.
##Version
- 0.1 : experimental
##License
- MIT
##How To Guide
Observable.from("Erica", "Matt", "John", "Mike").subscribe(System.out::println,
Throwable::printStackTrace, () -> System.out.println("done"));
List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names).subscribe(System.out::println,
Throwable::printStackTrace, () -> System.out.println("done"));
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
// note: the third argument (the onCompleted callback) is optional
Observable.from(names).subscribe(name -> System.out.println(name),
error -> error.printStackTrace());
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names.iterator()).subscribe(name -> System.out.println(name),
error -> error.printStackTrace());
List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names.spliterator()).subscribe(System.out::println,
Throwable::printStackTrace);
Observable.just("value").subscribe(v -> System.out.println(v),
error -> error.printStackTrace());
// if the single object is a collection, it is treated as a single item, e.g.
Observable.just(Arrays.asList(1, 2, 3)).subscribe( num -> System.out.println(num),
error -> error.printStackTrace());
Observable.throwing(new Error("test error")).subscribe(System.out::println,
error -> System.err.println(error));
// this will print error
Observable.create(observer -> {
for (String name : names) {
observer.onNext(name);
}
observer.onCompleted();
}).subscribe(System.out::println, Throwable::printStackTrace);
// Creates a range of numbers starting at `from` (inclusive) and ending at `to` (exclusive)
Observable.range(4, 8).subscribe(num -> System.out.println(num),
error -> error.printStackTrace());
// will print 4, 5, 6, 7
// Creates an infinite sequence of integers starting at the given number and incrementing by 1
Observable.integers(4).limit(4).subscribe(num -> System.out.println(num),
error -> error.printStackTrace());
// will print 4, 5, 6, 7
Observable.empty().subscribe(System.out::println,
Throwable::printStackTrace, () -> System.out.println("Completed"));
Observable.never().subscribe(System.out::println, Throwable::printStackTrace);
By default, an Observable notifies its observer asynchronously using a thread-pool scheduler, but you can change the default scheduler as follows:
// We are creating thread pool of size 4 here
Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newThreadPoolScheduler(4)).
subscribe(System.out::println, Throwable::printStackTrace);
Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newNewThreadScheduler()).
subscribe(System.out::println, Throwable::printStackTrace);
Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newTimerSchedulerWithMilliInterval(1000)).
subscribe(System.out::println, Throwable::printStackTrace);
// this will print each name every second
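To illustrate what asynchronous notification on a thread pool means, here is a minimal sketch that uses only the JDK's `ExecutorService`, not RxJava8 itself. The `ThreadPoolNotifySketch` class and its `emitOn` helper are hypothetical names introduced for illustration; the library's own schedulers may be implemented differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ThreadPoolNotifySketch {
    // Hypothetical helper, not part of RxJava8: delivers every item to onNext
    // from a single pool task, then signals completion.
    static <T> void emitOn(ExecutorService pool, List<T> items,
                           Consumer<T> onNext, Runnable onCompleted) {
        pool.execute(() -> {
            for (T item : items) {
                onNext.accept(item);
            }
            onCompleted.run();
        });
    }

    // Returns the name of the thread that delivered each item.
    static List<String> deliveryThreads() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            emitOn(pool, Arrays.asList("Erica", "Matt", "John"),
                   name -> threads.add(Thread.currentThread().getName()),
                   done::countDown);
            done.await(); // block the caller until the pool thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("delivered on: " + deliveryThreads());
    }
}
```

The callbacks run on a pool thread rather than on the caller, which is why the caller has to wait (here with a `CountDownLatch`) if it needs the results before continuing.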
The immediate scheduler calls the callback functions right away on the calling thread. Use it when you have a small amount of data that you want to consume synchronously. However, you cannot unsubscribe, because everything runs on the same thread.
Observable.from("Erica", "Matt", "John").
subscribeOn(Scheduler.newImmediateScheduler()).
subscribe(System.out::println, Throwable::printStackTrace);
// this will print each name right away on the calling thread
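The following sketch shows, with plain JDK types only, one plausible reading of why unsubscribing has no effect with same-thread delivery: every callback has already run by the time the subscribing call returns. `ImmediateDeliverySketch` and `SAME_THREAD` are hypothetical names, and this is not RxJava8's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;

class ImmediateDeliverySketch {
    // An Executor that runs each task on the calling thread
    // (an assumption about how an immediate scheduler could behave).
    static final Executor SAME_THREAD = Runnable::run;

    // Records the order in which callbacks and the caller's own code run.
    static List<String> deliverImmediately(List<String> items) {
        List<String> log = new ArrayList<>();
        SAME_THREAD.execute(() -> {
            for (String item : items) {
                log.add("onNext " + item);
            }
            log.add("onCompleted");
        });
        // execute() has returned only after all callbacks ran,
        // so nothing is left to cancel at this point.
        log.add("subscribe returned");
        return log;
    }

    public static void main(String[] args) {
        deliverImmediately(Arrays.asList("Erica", "Matt", "John"))
            .forEach(System.out::println);
    }
}
```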
The count method counts the elements and then passes the total to the subscriber:
Observable.from("Erica", "Matt", "John").count().
subscribe(System.out::println, Throwable::printStackTrace);
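As a rough illustration of the counting behaviour described above, the sketch below consumes a whole source and then emits a single value, the total. It uses only JDK types; `CountSketch` and its methods are hypothetical and are not RxJava8's implementation.

```java
import java.util.Arrays;
import java.util.function.Consumer;

class CountSketch {
    // Hypothetical helper: walks the whole source, then emits one value
    // (the total) followed by the completion signal.
    static <T> void count(Iterable<T> source, Consumer<Long> onNext, Runnable onCompleted) {
        long total = 0;
        for (T ignored : source) {
            total++;
        }
        onNext.accept(total);
        onCompleted.run();
    }

    // Convenience wrapper that returns the emitted total.
    static long countOf(String... names) {
        long[] result = new long[1];
        count(Arrays.asList(names), n -> result[0] = n, () -> { });
        return result[0];
    }

    public static void main(String[] args) {
        count(Arrays.asList("Erica", "Matt", "John"),
              System.out::println,
              () -> System.out.println("done"));
    }
}
```

Note that the subscriber sees nothing until the source is exhausted, so a count over an infinite source would never emit.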
Observables keep a sequence of items as a stream and support map/flatMap operations, as the standard Stream class does, e.g.
Observable.from("Erica", "Matt", "John").map(name -> name.hashCode()).
subscribe(System.out::println, Throwable::printStackTrace);
FlatMap merges a list of lists into a single list during the transformation, e.g.
Stream<List<Integer>> integerListStream = Stream.of( Arrays.asList(1, 2),
Arrays.asList(3, 4), Arrays.asList(5));
Observable.from(integerListStream).flatMap(integerList -> integerList.stream()).
subscribe(System.out::println, Throwable::printStackTrace);
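For comparison, the same distinction can be seen with the standard `java.util.stream.Stream` class alone. This sketch uses only JDK calls; the `FlatMapSketch` class name is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapSketch {
    // flatMap replaces each inner list with its elements, producing one flat sequence.
    static List<Integer> flatten(Stream<List<Integer>> lists) {
        return lists.flatMap(List::stream).collect(Collectors.toList());
    }

    // map produces exactly one output per input, here the size of each inner list.
    static List<Integer> sizes(Stream<List<Integer>> lists) {
        return lists.map(List::size).collect(Collectors.toList());
    }

    // A fresh stream is needed for each call because streams are single-use.
    static Stream<List<Integer>> sample() {
        return Stream.of(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
    }

    public static void main(String[] args) {
        System.out.println(flatten(sample())); // [1, 2, 3, 4, 5]
        System.out.println(sizes(sample()));   // [2, 2, 1]
    }
}
```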
Observables support basic filtering, as provided by Java Streams, e.g.
Observable.from("Erica", "Matt", "John", "Mike", "Scott",
"Alex", "Jeff", "Brad").filter(name -> name.startsWith("M")).
subscribe(System.out::println, Throwable::printStackTrace);
// This will only print Matt and Mike
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names).skip(2).subscribe(System.out::println,
Throwable::printStackTrace);
// This will skip the first two names, Erica and Matt
Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike",
"Scott", "Alex", "Jeff", "Brad");
Observable.from(names).limit(2).subscribe(System.out::println,
Throwable::printStackTrace);
// This will only print first two names
Stream<String> names = Stream.of("Erica", "Matt", "John", "Erica");
Observable.from(names).distinct().subscribe(System.out::println,
Throwable::printStackTrace);
// This will print Erica only once
Observable<Integer> observable1 = Observable.from(Stream.of(1, 2, 3));
Observable<Integer> observable2 = Observable.from(Stream.of(4, 5, 6));
observable1.merge(observable2).subscribe(System.out::println,
Throwable::printStackTrace);
// This will print 1, 2, 3, 4, 5, 6
Observable<String> observable1 = Observable.from("One", "Two", "Three");
Observable<Integer> observable2 = Observable.from(1, 2, 3);
observable1.zip(observable2).subscribe(System.out::println,
Throwable::printStackTrace);
// This will pass an instance of a Tuple object to System.out.println, which prints each tuple as [One, 1], [Two, 2], [Three, 3]
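The pairing that zip performs can be sketched with plain iterators. This is an illustration only: `ZipSketch` is a hypothetical class, a two-element `List` stands in for the library's Tuple type, and stopping at the shorter input is an assumption rather than documented RxJava8 behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ZipSketch {
    // Pairs items by position; stops when either input runs out (assumption).
    static <A, B> List<List<Object>> zip(Iterable<A> first, Iterable<B> second) {
        List<List<Object>> pairs = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            // A two-element list stands in for a tuple here.
            pairs.add(Arrays.<Object>asList(a.next(), b.next()));
        }
        return pairs;
    }

    public static void main(String[] args) {
        zip(Arrays.asList("One", "Two", "Three"), Arrays.asList(1, 2, 3))
            .forEach(System.out::println);
    }
}
```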
Observable<Integer> observable = Observable.range(1, 101)
.subscribeOn(Scheduler.newNewThreadScheduler())
.parallel().subscribe(System.out::println,
Throwable::printStackTrace);
// This will print 1, 2, 3, ... 100
List<Integer> list = Observable.from(1, 2).merge(Observable.from(3, 4)).toList();
// This will return a list containing 1, 2, 3, 4
Set<Integer> set = Observable.from(1, 2).merge(Observable.from(3, 4)).merge(Observable.just(3)).toSet();
// This will return a set containing 1, 2, 3, 4 (unordered and without any duplicates)
Email bhatti AT plexobject DOT com for any questions or suggestions.