along with the corresponding RxJava2 sample code
- Single
- Subject
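- Observable creation methods
- Observable transforming operations
- Observable filtering operations
- Observable combining operations
- Observable error handling
- Observable utility operations
- Observable conditional operations
- Observable backpressure operations
- Observable connectable operations
- Observable conversion operations
- References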
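## Single
`Single` is a special kind of Observable that emits exactly one item. It has no `onNext` method; in the end exactly one of `onSuccess` or `onError` is called, and the result is delivered directly through `onSuccess`.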
Single.just("a string")
.subscribe(new SingleObserver<String>() {
@Override public void onSubscribe(Disposable d) {
L.print("onSubscribe");
}
@Override public void onSuccess(String s) {
L.print("result %s", s); // the final result is delivered here
}
@Override public void onError(Throwable e) {
L.print(e);
}
});
## Subject
A `Subject` can act both as an Observable and as an Observer; in some implementations it serves as a bridge or proxy.
### AsyncSubject
Emits only the last item, and only after the source has completed. If an error occurs, observers receive only the error notification.
final AsyncSubject<Integer> subject = AsyncSubject.create();
subject.onNext(0);
subject.onNext(1);
subject.onNext(2);
subject.onComplete();
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 2
}
});
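### BehaviorSubject
When an observer subscribes, it first receives the item most recently emitted by the source Observable and then every later item. If nothing has been emitted yet, a default item can be supplied.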
BehaviorSubject<Integer> subject = BehaviorSubject.createDefault(-1);
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // -1 0 1 2
}
});
subject.onNext(0);
subject.onNext(1);
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 2
}
});
subject.onNext(2);
subject.onComplete();
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // no result
}
});
### PublishSubject
Emits to an observer only the items emitted after that observer subscribed.
PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(0);
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 2
}
});
subject.onNext(1);
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 2
}
});
subject.onNext(2);
subject.onComplete();
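### ReplaySubject
Replays to each observer all items the source Observable has emitted so far, no matter when the observer subscribes. Some factory methods take a buffer limit; once the buffer exceeds it, the oldest items are dropped.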
ReplaySubject<Integer> subject = ReplaySubject.createWithSize(2); // size limit
subject.onNext(0);
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2
}
});
subject.onNext(1);
subject.onNext(2);
subject.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 2
}
});
subject.onComplete();
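The size limit described above can be pictured as a bounded queue. The following is a minimal plain-Java sketch, not RxJava's actual implementation; the class `BoundedReplayBuffer` and its methods are hypothetical names introduced only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of a size-bounded replay buffer (hypothetical class, not RxJava code). */
final class BoundedReplayBuffer<T> {
    private final int capacity;
    private final Deque<T> items = new ArrayDeque<>();

    BoundedReplayBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Records an item; when the buffer is already full, the oldest item is evicted first. */
    void onNext(T item) {
        if (items.size() == capacity) {
            items.removeFirst();
        }
        items.addLast(item);
    }

    /** Returns what a subscriber arriving right now would have replayed to it. */
    List<T> snapshot() {
        return new ArrayList<>(items);
    }

    public static void main(String[] args) {
        BoundedReplayBuffer<Integer> buffer = new BoundedReplayBuffer<>(2);
        buffer.onNext(0);
        System.out.println(buffer.snapshot()); // [0]
        buffer.onNext(1);
        buffer.onNext(2);
        System.out.println(buffer.snapshot()); // [1, 2]
    }
}
```

This mirrors the example above: a subscriber that arrives after `0` sees `0` replayed, while one that arrives after `0 1 2` with a limit of 2 sees only `1 2`.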
## Observable creation methods
Operators that create an Observable.
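### Create
Creates an Observable from scratch in your own way while following the Observable callback contract: `onNext` is typically called one or more times, and exactly one of `onComplete` or `onError` is called, once.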
/* A Disposable or a Cancellable can be registered on the emitter */
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.setDisposable(new Disposable() {
@Override public void dispose() {
L.print("dispose");
}
@Override public boolean isDisposed() { return false; }
});
// e.setCancellable(new Cancellable() {
// @Override public void cancel() throws Exception { /* cancellation logic */ }
// });
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
When creating with `unsafeCreate`, you have to handle disposal yourself.
Observable.unsafeCreate(new ObservableSource<Integer>() {
@Override public void subscribe(Observer<? super Integer> observer) {
Disposable d = new Disposable() {
private boolean isDispose;
@Override public void dispose() { isDispose = true; }
@Override public boolean isDisposed() { return isDispose; }
};
observer.onSubscribe(d);
if (!d.isDisposed()) { observer.onNext(0); }
if (!d.isDisposed()) { observer.onNext(1); }
if (!d.isDisposed()) { observer.onNext(2); }
if (!d.isDisposed()) { observer.onComplete(); }
}
});
### Defer
Creates an Observable lazily: each time an observer subscribes, a fresh Observable is produced and starts emitting all of its items.
Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override public ObservableSource<Integer> call() throws Exception {
return Observable.just(0, 1, 2);
}
});
defer.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2
}
});
defer.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2
}
});
### Empty/Never/Throw
Creates an Observable with precisely defined behavior.
Observable.empty() // emits only the completion notification
.subscribe(new Observer<Object>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Object o) {}
@Override public void onError(Throwable e) {}
@Override public void onComplete() { L.print("onComplete"); }
});
Observable.never(); // emits nothing at all
Observable.error(new Exception("test")) // emits only an error
.subscribe(new Observer<Object>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Object o) {}
@Override public void onError(Throwable e) { L.print(e); }
@Override public void onComplete() {}
});
### From
Creates an Observable from a data structure or object.
/* from an array */
Observable.fromArray("1", "2", "3");
/* from a collection */
Observable.fromIterable(Arrays.asList("1", "2", "3"));
/* from a function's return value */
Observable.fromCallable(new Callable<String>() {
@Override public String call() throws Exception {
return "a string";
}
});
/* ... */
### Interval
Creates an Observable that emits an increasing sequence of integers at a fixed time interval.
/* emits once every second */
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override public void accept(Long aLong) throws Exception {
L.print(String.valueOf(aLong)); // 0 1 2 3 ...
}
});
### Just
Creates an Observable that emits a single object or a fixed set of objects.
/* create an Observable from one or more objects */
Observable.just(0);
Observable.just(0, 1, 2, 3)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2 3
}
});
### Range
Creates an Observable that emits a range of consecutive integers.
Observable.range(1, 4)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 2 3 4
}
});
### Repeat
Creates an Observable that emits its items repeatedly; some overloads take a repeat count.
Observable.just(0, 1, 2)
.repeat(2)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2 0 1 2
}
});
### Start
Creates an Observable that emits the return value of a function.
Observable.fromCallable(new Callable<String>() {
@Override public String call() throws Exception {
/* a series of operations */
return "result";
}
}).subscribe(new Consumer<String>() {
@Override public void accept(String s) throws Exception {
L.print(s);
}
});
### Timer
Emits a single `0L` after the specified delay.
/* emits after one second */
Observable.timer(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override public void accept(Long aLong) throws Exception {
L.print(String.valueOf(aLong)); // 0
}
});
TestHelper.block(2);
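## Observable transforming operations
Operators that transform the items an Observable emits.
### Buffer
Converts the source Observable into one that periodically gathers items into bundles and emits each bundle as a list.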
Observable.just(0, 1, 2, 3, 4, 5)
.buffer(2) // bundle the items in groups of 2
.subscribe(new Consumer<List<Integer>>() {
@Override public void accept(List<Integer> integers) throws Exception {
L.printList(integers); // [0, 1] [2, 3] [4, 5]
}
});
Observable.just(0, 1, 2, 3, 4, 5)
.buffer(2, 3) // start a new buffer of 2 every 3 items
.subscribe(new Consumer<List<Integer>>() {
@Override public void accept(List<Integer> integers) throws Exception {
L.printList(integers); // [0, 1] [3, 4]
}
});
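To make the `count`/`skip` pair easier to reason about, here is a minimal plain-Java sketch of the chunking rule for a finite list. It only models the behavior shown in the two examples above and is not RxJava code; `BufferSketch.buffer` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of count/skip buffering (hypothetical helper, not RxJava code). */
final class BufferSketch {

    /** Starts a new chunk every `skip` items; each chunk holds at most `count` items. */
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            chunks.add(new ArrayList<>(source.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(0, 1, 2, 3, 4, 5);
        List<List<Integer>> adjacent = buffer(items, 2, 2);
        List<List<Integer>> gapped = buffer(items, 2, 3);
        System.out.println(adjacent); // [[0, 1], [2, 3], [4, 5]]
        System.out.println(gapped);   // [[0, 1], [3, 4]]
    }
}
```

With `skip` larger than `count`, the items between chunks (here `2` and `5`) are simply dropped.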
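### FlatMap
Maps each item of an Observable of higher-dimensional items to an Observable of lower-dimensional items and emits the items of those inner Observables as one flat sequence.
/* flatten one-dimensional lists into individual integers */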
List<Integer> list = Arrays.asList(1, 3, 5, 7);
List<Integer> list1 = Arrays.asList(2, 4, 6, 8);
Observable.just(list, list1)
.flatMap(new Function<List<Integer>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(List<Integer> integers) throws Exception {
return Observable.fromIterable(integers);
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 3 5 7 2 4 6 8
}
});
### GroupBy
Splits the source Observable into several Observables according to a given key.
Observable.just(0, 1, 2, 3, 4,5)
.groupBy(new Function<Integer, Integer>() {
@Override public Integer apply(Integer integer) throws Exception {
return integer % 2 == 0 ? 0 : 1; // group by parity
}
})
.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(GroupedObservable<Integer, Integer> groupedObservable) throws Exception {
if (groupedObservable.getKey() == 0) {
groupedObservable.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // even: 0 2 4
}
});
} else if (groupedObservable.getKey() == 1) {
groupedObservable.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // odd: 1 3 5
}
});
}
}
});
### Map
Applies a function to each emitted item and emits the result, possibly of a different type.
Observable
.just(BitmapFactory.decodeResource(
context.getResources(), R.mipmap.ic_launcher
))
.map(new Function<Bitmap, Drawable>() { // convert Bitmap to Drawable
@Override public Drawable apply(Bitmap bitmap) throws Exception {
return new BitmapDrawable(bitmap);
}
})
.subscribe(new Consumer<Drawable>() {
@Override public void accept(Drawable drawable) throws Exception {
imageView.setImageDrawable(drawable); // set image
}
});
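### Scan
Applies a function to each emitted item in turn, feeding the result into the computation with the next item, and emits every intermediate result; sometimes called an accumulator.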
Observable.just(0, 1, 2, 3, 4)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
//0 (0+1)=1 (1+2)=3 (3+3)=6 (6+4)=10
L.print(String.valueOf(integer)); // 0 1 3 6 10
}
});
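The running accumulation can be written out as an ordinary loop. This is a minimal plain-Java sketch of the idea for a finite list, not RxJava's implementation; `ScanSketch.scan` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java model of a running accumulation (hypothetical helper, not RxJava code). */
final class ScanSketch {

    /** Emits the first item unchanged, then each accumulated value in turn. */
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> sums = scan(Arrays.asList(0, 1, 2, 3, 4), Integer::sum);
        System.out.println(sums); // [0, 1, 3, 6, 10]
    }
}
```

Unlike a reduce, every intermediate value is kept, which is why five inputs produce five outputs.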
### Window
Splits the emitted items into windows according to a rule; each window is itself an Observable with a complete lifecycle.
/* print s(subscribe)... 0 1 c(complete)... s... 2 3 c... s... 4 5 c...*/
Observable.just(0, 1, 2, 3, 4, 5)
.window(2) // every two items form one window
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(final Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Observer<Integer>() {
@Override public void onSubscribe(Disposable d) {
L.print("subscribe on " + String.valueOf(integerObservable));
}
@Override public void onNext(Integer integer) {
L.print(String.valueOf(integer));
}
@Override public void onError(Throwable e) {}
@Override public void onComplete() {
L.print("complete on " + String.valueOf(integerObservable));
}
});
}
});
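## Observable filtering operations
Operators that selectively emit items.
### Debounce
Emits an item only after a given time span has passed without another emission. If a new item arrives within that span, the previous item is dropped and the timer restarts.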
Subject<Integer> subject = PublishSubject.create();
subject.debounce(500, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 2 3
}
});
subject.onNext(0);
Thread.sleep(300);
subject.onNext(1);
Thread.sleep(300);
subject.onNext(2);
Thread.sleep(600);
subject.onNext(3);
subject.onComplete();
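The timing in the example above can be checked by hand with a small model. The sketch below is plain Java, not RxJava code, and rests on two assumptions stated in the comments: the source completes right after its last item (so that item is flushed), and a gap exactly equal to the timeout counts as too short. `DebounceSketch.debounce` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java timing model of debounce (hypothetical helper, not RxJava code). */
final class DebounceSketch {

    /**
     * timesMs[i] is the emission time of values[i].
     * Assumption: the source completes right after the last item, which flushes it.
     * Assumption: a gap exactly equal to the timeout is treated as too short.
     */
    static List<Integer> debounce(long[] timesMs, int[] values, long timeoutMs) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            if (isLast || timesMs[i + 1] - timesMs[i] > timeoutMs) {
                emitted.add(values[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] times = {0, 300, 600, 1200};
        int[] values = {0, 1, 2, 3};
        List<Integer> result = debounce(times, values, 500);
        System.out.println(result); // [2, 3]
    }
}
```

Items `0` and `1` are each followed by another item within 500 ms, so they are dropped; `2` survives its 600 ms quiet period, and `3` is flushed on completion.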
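### Distinct
Suppresses duplicates: a key derived from each item decides whether it has already been emitted, and items whose key was seen before are not emitted again.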
Observable.just(1, 2, 3, 1, 2, 4)
.distinct(new Function<Integer, Integer>() {
@Override public Integer apply(Integer integer) throws Exception {
return integer; // by default the key is the item itself
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 2 3 4
}
});
### ElementAt
Emits only the item at the given index.
Observable.just(0, 1, 2, 3, 4)
.elementAt(2)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 2
}
});
### Filter
Emits only the items that pass the given predicate.
Observable.just(0, 1, 2, 3, 4, 5)
.filter(new Predicate<Integer>() {
@Override public boolean test(Integer integer) throws Exception {
return integer <= 2;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2
}
});
### First
Emits only the first item, or the first item that satisfies a condition.
Observable.just(0, 1, 2)
.first(0)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0
}
});
### IgnoreElements
Suppresses all items and passes through only the completion or error notification.
Observable.just(0, 1, 2, 3)
.ignoreElements()
.subscribe(new Action() {
@Override public void run() throws Exception {
L.print("end.");
}
});
### Last
Emits only the last item, or the last item that satisfies a condition.
Observable.just(0,1,2,3,4)
.last(0)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 4
}
});
### Sample
Looks at the source Observable at regular intervals and, at each sampling point, emits the item most recently emitted before it.
Observable.create(
new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
Thread.sleep(180);
e.onNext(1);
Thread.sleep(180);
e.onNext(2); // 360
Thread.sleep(180);
e.onNext(3); // 540
}
})
.sample(400, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 2 3
}
});
### Skip
Skips the leading items, by count or by time span, and emits the rest.
Observable.just(0, 1, 2, 3, 4)
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 2 3 4
}
});
### SkipLast
Drops the trailing items, by count or by time span, and emits the preceding ones normally.
Observable.just(0, 1, 2, 3, 4)
.skipLast(2)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2
}
});
### Take
Emits only the leading items, by count or by time span.
Observable.just(0, 1, 2, 3, 4)
.take(2)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1
}
});
### TakeLast
Emits only the trailing items, by count or by time span.
Observable.just(0, 1, 2, 3, 4)
.takeLast(2)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 3 4
}
});
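## Observable combining operations
Operators that build an emission sequence from multiple Observables.
### CombineLatest
Whenever any of two or more Observables emits, combines the latest item from each of them with a function and emits the result.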
/*
| | | | |
0 1 2
a b c
*/
Observable.combineLatest(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
Thread.sleep(100);
e.onNext(1);
Thread.sleep(100);
e.onNext(2);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread()),
Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
Thread.sleep(300);
e.onNext("b");
Thread.sleep(100);
e.onNext("c");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread()),
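new BiFunction<Integer, String, String>() {
@Override public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}
).subscribe(new Consumer<String>() {
@Override public void accept(String s) throws Exception {
L.print(s); // 0a 1a 2a 2b 2c
}
});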
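### Join
Whenever an item from one Observable is emitted while an item from the other Observable is still inside its time window, the two items are combined with a function and the result is emitted.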
/*
* * * *
/ / / /
a b c d
| | | |
0 1 2
\ \ \
* \ *
\
\
\
*
*/
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
Thread.sleep(200);
e.onNext(1);
Thread.sleep(400);
e.onNext(2);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.join(
Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
Thread.sleep(200);
e.onNext("b");
Thread.sleep(200);
e.onNext("c");
Thread.sleep(200);
e.onNext("d");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread()),
/* give each item from the source Observable a lifetime (expressed as a window Observable) */
new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(final Integer integer) throws Exception {
int delay = 100;
if (integer == 1) {
delay = 250;
}
return Observable
.timer(delay, TimeUnit.MILLISECONDS)
.map(new Function<Long, Integer>() {
@Override public Integer apply(Long aLong) throws Exception {
return integer;
}
});
}
},
/* give each item from the joined Observable a lifetime (expressed as a window Observable) */
new Function<String, ObservableSource<String>>() {
@Override public ObservableSource<String> apply(final String s) throws Exception {
return Observable
.timer(100, TimeUnit.MILLISECONDS)
.map(new Function<Long, String>() {
@Override public String apply(Long aLong) throws Exception {
return s;
}
});
}
}, new BiFunction<Integer, String, String>() {
@Override public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}
)
.subscribe(new Consumer<String>() {
@Override public void accept(String s) throws Exception {
L.print(s); // 0a 1b 1c 2d
}
});
### Merge
Merges two or more Observables into one so that it behaves like a single Observable.
Observable
.merge(
Observable.just(1, 3, 5, 7),
Observable.just(2, 4, 6, 8)
)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 3 5 7 2 4 6 8
}
});
### StartWith
Emits the specified items before the source Observable starts emitting.
Observable.just(0, 1, 2, 3)
.startWith(Arrays.asList(3, 4))
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 3 4 0 1 2 3
}
});
### Switch
Subscribes to an Observable that emits Observables. Each time a new inner Observable is emitted, the previous one is unsubscribed and the new one is subscribed.
/*
| | | | | |
-> 0 1 2 3 ...
-> 2 4 6
*/
Observable.switchOnNext(Observable.create(
new ObservableOnSubscribe<ObservableSource<Integer>>() {
@Override
public void subscribe(ObservableEmitter<ObservableSource<Integer>> e) throws Exception {
e.onNext(Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Function<Long, Integer>() {
@Override public Integer apply(Long aLong) throws Exception {
return aLong.intValue();
}
}));
Thread.sleep(240);
e.onNext(Observable.just(2, 4, 6));
e.onComplete();
}
}
)).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2 4 6
}
});
### Zip
Combines the items emitted by two or more Observables pairwise with a function and emits the results, which may be of a new type.
Observable.just(0, 1, 2, 3)
.zipWith(Observable.just("a", "b", "c", "d"),
new BiFunction<Integer, String, String>() {
@Override public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override public void accept(String s) throws Exception {
L.print(s); // 0a 1b 2c 3d
}
});
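The pairwise rule can be shown on plain lists. The sketch below is a model of the idea rather than RxJava code, under the assumption that both inputs are finite; `ZipSketch.zip` is a hypothetical helper. It also shows that the result is only as long as the shorter input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java model of pairwise zipping (hypothetical helper, not RxJava code). */
final class ZipSketch {

    /** Combines the i-th items of both lists; stops at the end of the shorter list. */
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        int pairs = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(0, 1, 2, 3);
        List<String> full = zip(numbers, Arrays.asList("a", "b", "c", "d"), (i, s) -> i + s);
        List<String> shorter = zip(numbers, Arrays.asList("a", "b"), (i, s) -> i + s);
        System.out.println(full);    // [0a, 1b, 2c, 3d]
        System.out.println(shorter); // [0a, 1b]
    }
}
```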
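## Observable error handling
Operators that handle incoming error notifications.
### Catch
Intercepts an `onError` notification and turns it into a normal sequence or other items.
`onErrorReturn`: when an error occurs, emits a specific item and then terminates normally.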
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onError(new AssertionError("test"));
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override public Integer apply(Throwable throwable) throws Exception { return -1; }
})
.subscribe(new Observer<Integer>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Integer integer) {
L.print(String.valueOf(integer)); // 0 -1
}
@Override public void onError(Throwable e) { L.print(e); }
@Override public void onComplete() { L.print("onComplete"); /* call this */ }
});
`onErrorResumeNext`: when an error occurs, switches to a fallback Observable and continues emitting from it.
/* onErrorResumeNext test */
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
throw new AssertionError("test");
}
})
.onErrorResumeNext(Observable.just(2, 3, 4))
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2 3 4
}
});
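`onExceptionResumeNext`: when an error occurs and it is an `Exception`, switches to a fallback Observable and continues emitting from it; any other `Throwable` is passed on as an error notification.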
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
/* throw new AssertionError("test"); */
throw new Exception("test");
}
})
.onExceptionResumeNext(
Observable.just(2, 3, 4))
.subscribe(new Observer<Integer>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Integer integer) {
L.print(String.valueOf(integer)); // error is Exception ? 0 1 2 3 4 : 0 1
}
@Override public void onError(Throwable e) {
L.print(e); // called if the error is not an Exception
}
@Override public void onComplete() { L.print("onComplete"); }
});
### Retry
When the source Observable signals an error, resubscribes to it in the hope that the next run finishes without error.
/* 0 1 0 1 error */
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
throw new AssertionError("test");
}
})
.retry(1) // retry once
.subscribe(new Observer<Integer>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Integer integer) { L.print(String.valueOf(integer)); }
@Override public void onError(Throwable e) { L.print(e); }
@Override public void onComplete() { L.print("onComplete"); }
});
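Why the output above is `0 1 0 1 error` becomes clearer when the resubscription is written as a loop. This is a minimal plain-Java sketch, not RxJava's implementation; `RetrySketch`, its `Source` interface and `demo` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of resubscribing on error (hypothetical helper, not RxJava code). */
final class RetrySketch {

    /** A source that pushes items into a sink and may fail part-way through. */
    interface Source {
        void subscribe(List<String> sink) throws Exception;
    }

    /** Runs the source; on failure, runs it again from scratch up to maxRetries more times. */
    static List<String> retry(Source source, int maxRetries) {
        List<String> log = new ArrayList<>();
        for (int attempt = 0; ; attempt++) {
            try {
                source.subscribe(log);
                log.add("complete");
                return log;
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    log.add("error: " + e.getMessage());
                    return log;
                }
                // Otherwise fall through and resubscribe; items already delivered stay delivered.
            }
        }
    }

    /** A source that always fails after two items, retried once. */
    static List<String> demo() {
        Source failing = sink -> {
            sink.add("0");
            sink.add("1");
            throw new Exception("test");
        };
        return retry(failing, 1);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1, 0, 1, error: test]
    }
}
```

The key point is that a retry starts the source over, so items emitted before the failure are seen again by the observer.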
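## Observable utility operations
Utility operators for working with Observables.
### Delay
Shifts the whole emission sequence forward in time by a given delay.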
Observable.just(0, 1, 2, 3)
.delay(100, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // after 100 ms: 0 1 2 3
}
});
### Do
Registers listeners for the individual lifecycle events of an Observable, much as if you had subscribed to it.
Observable.just(0, 1, 2, 3)
/* register an onSubscribe listener */
.doOnSubscribe(new Consumer<Disposable>() {
@Override public void accept(Disposable disposable) throws Exception {}
})
/* register an onNext listener */
.doOnNext(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {}
})
/* register an onComplete listener */
.doOnComplete(new Action() {
@Override public void run() throws Exception {}
})
/* register an onError listener */
.doOnError(new Consumer<Throwable>() {
@Override public void accept(Throwable throwable) throws Exception {}
})
/* register one listener for onNext, onError and onComplete */
.doOnEach(new Observer<Integer>() {
@Override public void onSubscribe(Disposable d) { }
@Override public void onNext(Integer integer) { }
@Override public void onError(Throwable e) { }
@Override public void onComplete() {}
});
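### Materialize/Dematerialize
`materialize` converts the original emission sequence into a sequence of `Notification` objects, one per event; `dematerialize` restores the original sequence.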
Observable<Notification<Integer>> materialize = Observable.just(0, 1, 2)
.materialize();
materialize
.subscribe(new Consumer<Notification<Integer>>() {
@Override public void accept(Notification<Integer> integerNotification) throws Exception {
if (integerNotification.isOnComplete()) {
L.print("onComplete");
} else if (integerNotification.isOnNext()) {
L.print(String.valueOf(integerNotification.getValue()));
} else if (integerNotification.isOnError()) {
L.print(integerNotification.getError());
}
}
});
materialize.dematerialize()
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
L.print(String.valueOf(o)); // 0 1 2
}
});
### ObserveOn
Specifies the scheduler on which the observer runs.
Observable.just(0, 1, 2)
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2 on thread RxNewThreadScheduler-1
}
});
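### Serialize
Forces an Observable whose emissions may come from several threads and interleave to emit serially, so that it obeys the Observable contract.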
/*
t1 3 *
| | |
t2 0 1 2
-> 3 0 1 onComplete
*/
Observable
.unsafeCreate(new ObservableSource<Integer>() {
@Override public void subscribe(final Observer<? super Integer> observer) {
TestHelper.runOnNewThread(new Runnable() {
@Override public void run() {
TestHelper.sleep(20);
observer.onNext(0);
TestHelper.sleep(120);
observer.onNext(1);
TestHelper.sleep(100);
observer.onNext(2);
}
});
TestHelper.runOnNewThread(new Runnable() {
@Override public void run() {
observer.onNext(3);
TestHelper.sleep(150);
observer.onComplete();
}
});
}
})
.serialize()
.subscribe(new DisposableObserver<Integer>() {
@Override public void onNext(Integer integer) {
L.print(String.valueOf(integer));
}
@Override public void onError(Throwable e) {}
@Override public void onComplete() {
L.print("onComplete");
}
});
### Subscribe
Subscribes an observer to the Observable.
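As a reminder of what subscribing means in terms of callbacks, here is a minimal plain-Java sketch of the observer contract. It is deliberately not RxJava code: the nested `Observer` interface and the `subscribe` helper in `SubscribeSketch` are hypothetical stand-ins that only model the order of the calls for a finite, synchronous source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the observer callbacks (hypothetical types, not RxJava code). */
final class SubscribeSketch {

    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onComplete();
    }

    /** Delivers every item, then exactly one terminal event: onComplete or onError. */
    static <T> void subscribe(List<T> items, Observer<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onComplete();
    }

    /** Subscribes a logging observer to the items 0, 1, 2 and returns the log. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        subscribe(Arrays.asList(0, 1, 2), new Observer<Integer>() {
            @Override public void onNext(Integer item) { log.add("onNext " + item); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onNext 0, onNext 1, onNext 2, onComplete]
    }
}
```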
### SubscribeOn
Specifies the scheduler on which the Observable itself runs.
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
/* run on thread RxNewThreadScheduler-1 */
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer));
}
});
### TimeInterval
Converts the sequence into one that reports the time elapsed between consecutive emissions.
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Thread.sleep(100);
e.onNext(0);
Thread.sleep(120);
e.onNext(1);
Thread.sleep(50);
e.onNext(2);
Thread.sleep(3);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.timeInterval()
.subscribe(new Consumer<Timed<Integer>>() {
@Override public void accept(Timed<Integer> integerTimed) throws Exception {
L.print(String.valueOf(integerTimed.time())); // roughly 100 120 50
}
});
### Timeout
If no item is emitted within the specified time limit, signals a `TimeoutException` through `onError`.
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Thread.sleep(200);
e.onNext(0);
e.onComplete();
}
})
.timeout(100, TimeUnit.MILLISECONDS)
.subscribe(new DisposableObserver<Integer>() {
@Override public void onNext(Integer integer) {}
@Override public void onError(Throwable e) {
L.print(e); // call this
}
@Override public void onComplete() {}
});
### Timestamp
Attaches to each emitted item the timestamp at which it was emitted.
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Thread.sleep(60);
e.onNext(0);
Thread.sleep(100);
e.onNext(1);
Thread.sleep(80);
e.onNext(2);
e.onComplete();
}
})
.timestamp()
.subscribe(new Consumer<Timed<Integer>>() {
@Override public void accept(Timed<Integer> integerTimed) throws Exception {
L.print(String.valueOf(integerTimed.time()));
}
});
### Using
Creates a disposable resource whose lifetime is bound to that of the Observable.
/*
-> sub dispose
| | | |
-> close
*/
Observable
.using(new Callable<BufferedReader>() {
@Override public BufferedReader call() throws Exception {
return new BufferedReader(new StringReader("string line."));
}
}, new Function<BufferedReader, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(BufferedReader reader) throws Exception {
return Observable.just(reader.readLine());
}
}, new Consumer<BufferedReader>() {
@Override public void accept(BufferedReader reader) throws Exception {
reader.close();
}
})
.subscribe(new Consumer<String>() {
@Override public void accept(String s) throws Exception {
L.print(s); // string line.
}
});
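## Observable conditional operations
Operators that evaluate Observables or the items they emit.
### All
Determines whether all items satisfy a condition.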
Observable
.just(0, 1, 2, 3, 4)
.all(new Predicate<Integer>() {
@Override public boolean test(Integer integer) throws Exception {
return integer < 4;
}
})
.subscribe(new Consumer<Boolean>() {
@Override public void accept(Boolean aBoolean) throws Exception {
L.print(String.valueOf(aBoolean)); // false
}
});
### Amb
Of several Observables, picks the one that emits first and emits only its items.
Observable
.amb(Arrays.asList(
Observable.just(1, 3, 5)
.delay(40, TimeUnit.MILLISECONDS),
Observable.just(2, 4, 6)
.delay(60, TimeUnit.MILLISECONDS))
)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 1 3 5 (the source delayed by 40 ms emits first)
}
});
### Contains
Tests whether the emitted sequence contains a specific item.
Observable
.just(0, 1, 2, 3)
.contains(2)
.subscribe(new Consumer<Boolean>() {
@Override public void accept(Boolean aBoolean) throws Exception {
L.print(String.valueOf(aBoolean)); // true
}
});
### DefaultIfEmpty
If the source Observable completes normally without emitting anything, emits a default item.
Observable.<Integer>empty()
.defaultIfEmpty(-1)
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // -1
}
});
### SequenceEqual
Checks whether two Observables emit the same sequence of items.
Observable.sequenceEqual(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onComplete();
}
}),
Observable.just(0, 1, 2)
).subscribe(new Consumer<Boolean>() {
@Override public void accept(Boolean aBoolean) throws Exception {
L.print(String.valueOf(aBoolean)); // true
}
});
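### SkipUntil
Discards items from the source Observable while watching a second Observable; once the second Observable emits an item, the source stops discarding and its remaining items are emitted.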
/*
| | | |
0 1 2 3
a *
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
Thread.sleep(100);
e.onNext(1);
Thread.sleep(100);
e.onNext(2);
Thread.sleep(100);
e.onNext(3);
e.onComplete();
}
}).skipUntil(Observable.just('a').delay(220, TimeUnit.MILLISECONDS))
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 3
}
});
### SkipWhile
Discards items as long as a condition holds; once it fails for the first time, emits all remaining items.
Observable.just(0, 1, 2, 3, 2, 5)
.skipWhile(new Predicate<Integer>() {
@Override public boolean test(Integer integer) throws Exception {
return integer < 2;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 2 3 2 5
}
});
### TakeUntil
Watches a second Observable; once it emits an item, all later items from the source Observable are dropped.
/*
| | | |
0 1 2 3
a *
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
Thread.sleep(100);
e.onNext(1);
Thread.sleep(100);
e.onNext(2);
Thread.sleep(100);
e.onNext(3);
e.onComplete();
}
}).takeUntil(Observable.just('a').delay(220, TimeUnit.MILLISECONDS))
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2
}
});
### TakeWhile
Emits items as long as a condition holds; once it fails, all remaining items are dropped.
Observable
.just(0, 1, 2, 3, 2, 5)
.takeWhile(new Predicate<Integer>() {
@Override public boolean test(Integer integer) throws Exception {
return integer < 2;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1
}
});
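The "while" operators act only on a prefix of the sequence, which is easy to confuse with filtering. The plain-Java sketch below contrasts the three on a finite list; it is a model of the idea, not RxJava code, and `PrefixSketch` with its methods is a hypothetical helper. The predicate `x < 3` is chosen here (instead of `x < 2` as in the examples above) so that the three results differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of takeWhile, skipWhile and filter (hypothetical helper, not RxJava code). */
final class PrefixSketch {

    /** Keeps items until the condition fails for the first time, then stops. */
    static <T> List<T> takeWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!condition.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    /** Drops items until the condition fails for the first time, then keeps everything. */
    static <T> List<T> skipWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T item : source) {
            if (skipping && condition.test(item)) {
                continue;
            }
            skipping = false;
            out.add(item);
        }
        return out;
    }

    /** Tests every item independently of its position. */
    static <T> List<T> filter(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (condition.test(item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(0, 1, 2, 3, 2, 5);
        List<Integer> taken = takeWhile(items, x -> x < 3);
        List<Integer> skipped = skipWhile(items, x -> x < 3);
        List<Integer> filtered = filter(items, x -> x < 3);
        System.out.println(taken);    // [0, 1, 2]
        System.out.println(skipped);  // [3, 2, 5]
        System.out.println(filtered); // [0, 1, 2, 2]
    }
}
```

The second `2` shows the difference: `filter` keeps it, `takeWhile` never reaches it, and `skipWhile` lets it through because skipping already ended at `3`.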
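## Observable backpressure operations
Operators for coping with a producer that emits faster than the consumer can process.
`onBackpressureBuffer`: emits the next item when the subscriber calls `request` and buffers what has not been requested yet; a buffer capacity can be set, and an exception is signalled once the buffer is full.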
Flowable.range(0, 10)
.onBackpressureBuffer()
.subscribe(new Subscriber<Integer>() {
Subscription s;
@Override public void onSubscribe(Subscription s) {
this.s = s;
s.request(1);
}
@Override public void onNext(Integer integer) {
L.print(String.valueOf(integer));
TestHelper.sleep(500); // prints 0 to 9, one every 500 ms
s.request(1);
}
@Override public void onError(Throwable t) {}
@Override public void onComplete() {}
});
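## Observable connectable operations
Operators that allow more precise, dynamic control over the subscription state.
### Publish
Converts an ordinary Observable into a connectable `ConnectableObservable`.
### Connect
Activates the `ConnectableObservable` so that it begins emitting items.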
ConnectableObservable<Integer> publish = Observable.just(0, 1, 2, 3, 4)
.publish();
publish.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2 3 4
}
});
publish.connect();
### RefCount
Makes a `ConnectableObservable` behave like an ordinary Observable.
ConnectableObservable<Integer> publish = Observable.range(0, 4).publish();
publish.refCount()
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print(String.valueOf(integer)); // 0 1 2 3
}
});
### Replay
Ensures that every subscriber receives the complete emission sequence.
ConnectableObservable<Integer> replay = Observable.create(
new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 4; i++) {
Thread.sleep(100);
e.onNext(i);
}
}
})
.replay();
replay.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print("sub 1: %d", integer); // 0 1 2 3
}
});
replay.connect();
TestHelper.sleep(200);
replay.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
L.print("sub 2: %d", integer); // 0 1 2 3
}
});
## Observable conversion operations
Operators that convert an Observable into another object or data structure.
/* to future */
Future<Integer> future = Single.create(new SingleOnSubscribe<Integer>() {
@Override public void subscribe(SingleEmitter<Integer> e) throws Exception {
Thread.sleep(1000);
e.onSuccess(1);
}
}).toFuture();
try {
L.print(String.valueOf(future.get()));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
/* to list */
Observable.just(0, 1, 2)
.toList()
.subscribe(new Consumer<List<Integer>>() {
@Override public void accept(List<Integer> integers) throws Exception {
L.printList(integers);
}
});
/* to blocking iterable */
for (int i : Observable.just(0, 1, 2, 3).blockingIterable()) {
L.print(String.valueOf(i));
}
/* to map */
Observable.just(0, 1, 2)
.toMap(new Function<Integer, String>() {
@Override public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
})
.subscribe(new Consumer<Map<String, Integer>>() {
@Override public void accept(Map<String, Integer> stringIntegerMap) throws Exception {
L.printMap(stringIntegerMap); // { 0: 0, 1: 1, 2: 2}
}
});