/
ObservableTest.cs
117 lines (100 loc) · 3.04 KB
/
ObservableTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using NUnit.Framework;
namespace Paralect.ServiceBus.Test.Tests
{
public static class EXT
{
private static IObservable<int> CreateFastObservable(int iterations)
{
return Observable.Create<int>(observer =>
{
new Thread(_ =>
{
for (int i = 0; i < iterations; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
}).Start();
return () => { };
});
}
}
public class SObserver<T> : IObserver<T>
{
private Action<T> onNext;
private Action<Exception> onError;
private Action onCompleted;
/// <summary>
/// Initializes a new instance of the <see cref="T:System.Object"/> class.
/// </summary>
public SObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public static SObserver<T> Create(Action<T> onNext)
{
return new SObserver<T>(onNext, e => { }, () => { });
}
public static SObserver<T> Create(Action<T> onNext, Action onCompleted)
{
return new SObserver<T>(onNext, e => { }, onCompleted);
}
public void OnNext(T value)
{
onNext(value);
}
public void OnError(Exception error)
{
onError(error);
}
public void OnCompleted()
{
onCompleted();
}
}
public static class RXRXRXR
{
public static IObservable<T> LimitTo5Events<T>(this IObservable<T> obs)
{
var counter = 0;
return Observable.Create<T>(ob =>
{
return obs.Subscribe(SObserver<T>.Create(i =>
{
counter++;
ob.OnNext((T)(Object)counter);
// ob.OnNext(i);
},
() => ob.OnCompleted()));
});
}
}
[TestFixture]
public class ObservableTest
{
[Test]
public void Do()
{
Observable.Return(45);
var collection = Observable.Range(10, 50, Scheduler.CurrentThread).LimitTo5Events();
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
collection.Subscribe(i =>
{
Console.Write(Thread.CurrentThread.ManagedThreadId);
Console.Write(" ");
Console.WriteLine(i);
},
() => Console.Write("DONE!!!"));
}
}
}