-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
ScheduledSubject.cs
72 lines (61 loc) · 2.03 KB
/
ScheduledSubject.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace ReactiveUI
{
/// <summary>
/// A subject wrapper that delivers every notification to its observers through
/// <c>ObserveOn</c> on a fixed scheduler. An optional default observer receives
/// notifications only while no other observer is subscribed: it is detached when
/// an observer subscribes and re-attached once the subscriber count drops back
/// to zero.
/// </summary>
/// <typeparam name="T">The type of the values flowing through the subject.</typeparam>
public class ScheduledSubject<T> : ISubject<T>, IDisposable
{
    readonly IObserver<T> _defaultObserver;
    readonly IScheduler _scheduler;
    readonly ISubject<T> _subject;

    // Number of currently active subscriptions created through Subscribe.
    int _observerRefCount = 0;

    // Live subscription of the default observer, or null while it is detached.
    IDisposable _defaultObserverSub;

    // Set by Dispose so that a late subscription teardown does not re-attach
    // the default observer to a subject that has already been disposed.
    volatile bool _isDisposed;

    /// <summary>
    /// Creates a scheduled subject.
    /// </summary>
    /// <param name="scheduler">Scheduler passed to <c>ObserveOn</c> for every subscription.</param>
    /// <param name="defaultObserver">
    /// Optional observer that is subscribed immediately and whenever no other
    /// observer is subscribed. May be null.
    /// </param>
    /// <param name="defaultSubject">
    /// Optional subject to wrap; when null a new <c>Subject&lt;T&gt;</c> is created.
    /// </param>
    public ScheduledSubject(IScheduler scheduler, IObserver<T> defaultObserver = null, ISubject<T> defaultSubject = null)
    {
        _scheduler = scheduler;
        _defaultObserver = defaultObserver;
        _subject = defaultSubject ?? new Subject<T>();

        if (defaultObserver != null)
        {
            AttachDefaultObserver();
        }
    }

    /// <summary>
    /// Releases the default observer's subscription (if any) and disposes the
    /// wrapped subject when it implements <see cref="IDisposable"/>.
    /// </summary>
    public void Dispose()
    {
        _isDisposed = true;
        DetachDefaultObserver();

        var disposableSubject = _subject as IDisposable;
        if (disposableSubject != null)
        {
            disposableSubject.Dispose();
        }
    }

    /// <summary>Forwards completion to the wrapped subject.</summary>
    public void OnCompleted()
    {
        _subject.OnCompleted();
    }

    /// <summary>Forwards an error to the wrapped subject.</summary>
    /// <param name="error">The exception to propagate.</param>
    public void OnError(Exception error)
    {
        _subject.OnError(error);
    }

    /// <summary>Forwards a value to the wrapped subject.</summary>
    /// <param name="value">The value to propagate.</param>
    public void OnNext(T value)
    {
        _subject.OnNext(value);
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the wrapped subject, observed on
    /// the configured scheduler. The default observer is detached first; it is
    /// re-attached when the returned disposable is disposed and no other
    /// subscription created by this method remains.
    /// </summary>
    /// <param name="observer">The observer to subscribe.</param>
    /// <returns>A disposable that ends the subscription.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        DetachDefaultObserver();
        Interlocked.Increment(ref _observerRefCount);

        return new CompositeDisposable(
            _subject.ObserveOn(_scheduler).Subscribe(observer),
            Disposable.Create(() =>
            {
                // The decrement must always run, so it stays first in the condition.
                if (Interlocked.Decrement(ref _observerRefCount) <= 0 && _defaultObserver != null && !_isDisposed)
                {
                    AttachDefaultObserver();
                }
            }));
    }

    // Subscribes the default observer and records the subscription; if another
    // subscription was recorded in the meantime it is disposed so it cannot leak.
    void AttachDefaultObserver()
    {
        var replaced = Interlocked.Exchange(
            ref _defaultObserverSub,
            _subject.ObserveOn(_scheduler).Subscribe(_defaultObserver));

        if (replaced != null)
        {
            replaced.Dispose();
        }
    }

    // Atomically takes ownership of the default observer's subscription and
    // disposes it, so concurrent callers never dispose it twice or dereference null.
    void DetachDefaultObserver()
    {
        var current = Interlocked.Exchange(ref _defaultObserverSub, null);
        if (current != null)
        {
            current.Dispose();
        }
    }
}
}