// Snapshot of this file taken at repository tag 2.3.0.0
// (179 lines, 146 sloc, 7.079 kb).
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using Microsoft.Reactive.Testing;
using ReactiveUI.Testing;
using Xunit;

namespace ReactiveUI.Tests
{
    public class ObservableAsyncMRUCacheTest : IEnableLogger
    {
[Fact]
        // Exercises the blocking Get() call: key 1 is requested five times on
        // a worker thread while this thread drives the TestScheduler. The sum
        // of the returned values must still be 0 at t=500ms and must be 25
        // once the scheduler has been advanced to t=1200ms.
        public void GetTest()
        {
            (new TestScheduler()).With(sched => {
                var oneSecond = TimeSpan.FromSeconds(1.0);
                var keys = new[] {1, 1, 1, 1, 1};
                var cache = new ObservableAsyncMRUCache<int, int>(
                    k => Observable.Return(k*5).Delay(oneSecond, sched), 5, 2);

                // Moves the virtual clock to an absolute time given in milliseconds.
                Action<int> advanceToMs = ms =>
                    sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(ms)));

                int total = 0;

                // The cache cannot be told to run the Test Scheduler itself,
                // so the blocking Get() calls happen on this worker thread
                // while the main thread advances the Scheduler.
                var worker = new Thread(() => {
                    foreach (int key in keys) {
                        int value = cache.Get(key);
                        this.Log().DebugFormat("Adding {0} to result", value);
                        total += value;
                    }
                });
                worker.Start();

                sched.Start();
                advanceToMs(500);

                // NB: Sleeping gives the worker thread a chance to catch up
                Thread.Sleep(100);
                Assert.Equal(0, total);

                advanceToMs(1200);

                Thread.Sleep(100);
                Assert.Equal(25, total);

                this.Log().Info("Running to end");
                sched.Start();
                worker.Join();
                Assert.Equal(25, total);
            });
        }

[Fact]
        // Exercises AsyncGet(): key 1 is requested five times through an
        // observable pipeline driven entirely by a TestScheduler. The summed
        // results must be 0 at t=500ms and 25 from t=1200ms onwards.
        public void AsyncGetTest()
        {
            var scheduler = new TestScheduler();
            var oneSecond = TimeSpan.FromSeconds(1.0);
            var cache = new ObservableAsyncMRUCache<int, int>(
                k => Observable.Return(k*5).Delay(oneSecond, scheduler), 5, 2, null, scheduler);

            // Moves the virtual clock to an absolute time given in milliseconds.
            Action<int> advanceToMs = ms =>
                scheduler.AdvanceTo(scheduler.FromTimeSpan(TimeSpan.FromMilliseconds(ms)));

            int total = 0;
            IObservable<int> keys = new[] { 1, 1, 1, 1, 1 }.ToObservable(scheduler);
            IObservable<int> values = keys.SelectMany<int, int>(key => (IObservable<int>)cache.AsyncGet(key));
            values.Subscribe(value => total += value);

            advanceToMs(500);
            Assert.Equal(0, total);

            advanceToMs(1200);
            Assert.Equal(25, total);

            this.Log().Info("Running to end");
            scheduler.Start();
            Assert.Equal(25, total);
        }

#if FALSE
[Fact]
        // Checks CachedValues(): nothing is reported at t=500ms, and after
        // advancing to t=1500ms exactly two values are reported while every
        // input has produced a result.
        //
        // NOTE(review): this test sits inside an #if FALSE region, so it is
        // not compiled. Unlike the enabled tests in this class, neither the
        // cache constructor nor ToObservable() is given the TestScheduler
        // here - confirm whether that is intentional before re-enabling.
        public void CachedValuesTest()
        {
            var input = new[] { 1, 2, 1, 3, 1 };
            var sched = new TestScheduler();

            var delay = TimeSpan.FromSeconds(1.0);
            var fixture = new ObservableAsyncMRUCache<int, int>(x => Observable.Return(x*5).Delay(delay, sched), 2, 2);

            var results = input.ToObservable().SelectMany(fixture.AsyncGet).CreateCollection();
            sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(500)));

            // No calculation can have finished yet (each one is delayed by 1s).
            Assert.Equal(0, fixture.CachedValues().Count());

            sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(1500)));

            var output = fixture.CachedValues().ToArray();

            // xUnit's Assert has no IsTrue(); Assert.Equal also reports the
            // actual length on failure instead of a bare "false".
            Assert.Equal(2, output.Length);
            Assert.Equal(input.Length, results.Count);
        }
#endif

[Fact]
        // Requests keys 1, 2, 3, 4, 1 through AsyncGet() and checks that the
        // results arrive in two waves: at t=1500ms only keys 1, 2 and the
        // repeated 1 have contributed; by t=2500ms all five have, and nothing
        // further is added by t=5000ms.
        // NOTE(review): the constructor arguments "5, 2" are presumably the
        // cache size and the concurrency limit - confirm against
        // ObservableAsyncMRUCache.
        public void CacheShouldQueueOnceWeHitOurConcurrentLimit()
        {
            const int firstWaveTotal = 1*5 + 2*5 + 1*5;
            const int everythingTotal = 1*5 + 2*5 + 3*5 + 4*5 + 1*5;

            var scheduler = new TestScheduler();
            var oneSecond = TimeSpan.FromSeconds(1.0);
            var cache = new ObservableAsyncMRUCache<int, int>(
                k => Observable.Return(k*5).Delay(oneSecond, scheduler), 5, 2, null, scheduler);

            // Moves the virtual clock to an absolute time given in milliseconds.
            Action<int> advanceToMs = ms =>
                scheduler.AdvanceTo(scheduler.FromTimeSpan(TimeSpan.FromMilliseconds(ms)));

            int total = 0;
            IObservable<int> keys = new[] { 1, 2, 3, 4, 1 }.ToObservable(scheduler);
            IObservable<int> values = keys.SelectMany<int, int>(key => (IObservable<int>)cache.AsyncGet(key));
            values.Subscribe(value => total += value);

            advanceToMs(500);
            Assert.Equal(0, total);

            advanceToMs(1500);
            Assert.Equal(firstWaveTotal, total);

            advanceToMs(2500);
            Assert.Equal(everythingTotal, total);

            advanceToMs(5000);
            Assert.Equal(everythingTotal, total);
        }

        /* NB: The ideas in this test are fundamentally flawed - while none
* of the Subjects in ObservableAsyncMRUCache are getting incorrectly
* completed, the input Observable cannot continue after an error
* is thrown. We need to make a design change to ObservableAsyncMRUCache
* so that users can decide what to return on error */
#if FALSE
[Fact]
        // Feeds keys 5, 2, 10, 0, 7 through AsyncGet(), where the calculation
        // for key 0 is Observable.Throw, and asserts on how many results and
        // which exception reach the subscriber at t=500ms, 1500ms and 7500ms.
        // NOTE(review): this test sits inside an #if FALSE region, so it is
        // not compiled.
        public void CacheShouldEatExceptionsAndMarshalThemToObservable()
        {
            /* This is a bit tricky:
             *
             * 5,2 complete at t=1000 simultaneously
             * 10,0 get queued up, 0 fails immediately (delay() doesn't delay the OnError),
             *    so 10 completes at t=2000
             * The 7 completes at t=3000
             */
            var input = new[] { 5, 2, 10, 0/*boom!*/, 7 };
            var sched = new TestScheduler();

            // NOTE(review): this subscription only writes to the console and
            // never touches the fixture - it looks like leftover debugging
            // output; confirm before re-enabling the test.
            Observable.Throw<int>(new Exception("Foo")).Subscribe(x => {
                Console.WriteLine(x);
            }, ex => {
                Console.WriteLine(ex);
            }, () => {
                Console.WriteLine("Completed");
            });

            var delay = TimeSpan.FromSeconds(1.0);
            // Key 0 yields an error; every other key yields 10 * key. Both
            // branches are passed through Delay(delay, sched).
            var fixture = new ObservableAsyncMRUCache<int, int>(x =>
                (x == 0 ? Observable.Throw<int>(new Exception("Boom!")) : Observable.Return(10 * x)).Delay(delay, sched), 5, 2, null, sched);

            Exception exception = null;
            int completed = 0;
            // NOTE(review): unlike the enabled tests in this class,
            // ToObservable() is not given the TestScheduler here.
            input.ToObservable()
                .SelectMany(x => fixture.AsyncGet(x))
                .Subscribe(x => {
                    this.Log().InfoFormat("Result = {0}", x);
                    completed++;
                }, ex => exception = exception ?? ex); // only the first error is kept

            // Before the 1s delay has elapsed nothing should have arrived.
            sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(500)));
            Assert.Null(exception);
            Assert.Equal(0, completed);

            // Expected here: two results delivered and the error already seen.
            sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(1500)));
            Assert.NotNull(exception);
            Assert.Equal(2, completed);

            // Expected here: all four non-failing keys have produced a result.
            sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(7500)));
            Assert.NotNull(exception);
            Assert.Equal(4, completed);
            this.Log().Info(exception);
        }
#endif
    }
}
// (Web-page residue, not part of the source: "Something went wrong with that request. Please try again.")