Skip to content

Commit

Permalink
Fix more missing OnError() dispatch and some test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Atsushi Eno committed Feb 26, 2012
1 parent 27198f1 commit ff66ed1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
8 changes: 8 additions & 0 deletions System.Reactive.Tests/System.Reactive.Linq/ObservableTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ IEnumerable<IObservable<long>> IntervalSelectTakeDoEnumerable (HistoricalSchedul
scheduler.AdvanceBy (TimeSpan.FromMilliseconds (50));
}
}

[Test]
[ExpectedException (typeof (MyException))]
public void FirstOrDefault ()
{
var source = Observable.Throw<int> (new MyException ());
var ret = source.FirstOrDefault ();
}

[Test] // some practical test
public void IntervalSelectTakeDo ()
Expand Down
14 changes: 9 additions & 5 deletions System.Reactive/System.Reactive.Linq/Observable.Numeric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static IObservable<T> NonNullableMin<T> (this IObservable<T> source)
} else if (Comparer<T>.Default.Compare (min, s) > 0)
min = s;
},
ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, min)
);
// ----
Expand Down Expand Up @@ -77,6 +78,7 @@ static IObservable<T> NonNullableMax<T> (this IObservable<T> source)
} else if (Comparer<T>.Default.Compare (max, s) < 0)
max = s;
},
ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, max)
);
// ----
Expand All @@ -91,7 +93,7 @@ static IObservable<T> NullableMax<T> (this IObservable<T> source)
return new ColdObservableEach<T> (sub => {
// ----
T max = default (T);
return source.Subscribe ((s) => { if (Comparer<T>.Default.Compare (max, s) < 0) max = s; }, () => VerifyCompleted (true, sub, max));
return source.Subscribe ((s) => { if (Comparer<T>.Default.Compare (max, s) < 0) max = s; }, ex => sub.OnError (ex), () => VerifyCompleted (true, sub, max));
// ----
}, DefaultColdScheduler);
}
Expand All @@ -104,7 +106,7 @@ static IObservable<T> NonNullableSum<T> (this IObservable<T> source, Func<T,T,T>
return new ColdObservableEach<T> (sub => {
// ----
T sum = default (T);
return source.Subscribe (s => sum = add (sum, s), () => VerifyCompleted (true, sub, sum));
return source.Subscribe (s => sum = add (sum, s), ex => sub.OnError (ex), () => VerifyCompleted (true, sub, sum));
// ----
}, DefaultColdScheduler);
}
Expand All @@ -117,7 +119,7 @@ static IObservable<T> NullableSum<T> (this IObservable<T> source, Func<T,T,T> ad
return new ColdObservableEach<T> (sub => {
// ----
T sum = default (T);
return source.Subscribe (s => sum = sum != null ? s : add (sum, s), () => VerifyCompleted (true, sub, sum));
return source.Subscribe (s => sum = sum != null ? s : add (sum, s), ex => sub.OnError (ex), () => VerifyCompleted (true, sub, sum));
// ----
}, DefaultColdScheduler);
}
Expand All @@ -131,7 +133,7 @@ static IObservable<T> NonNullableAverage<T> (this IObservable<T> source, Func<T,
// ----
T sum = default (T);
int count = 0;
return source.Subscribe (s => { count++; sum = add (sum, s); }, () => VerifyCompleted (true, sub, avg (sum, count)));
return source.Subscribe (s => { count++; sum = add (sum, s); }, ex => sub.OnError (ex), () => VerifyCompleted (true, sub, avg (sum, count)));
// ----
}, DefaultColdScheduler);
}
Expand All @@ -145,7 +147,7 @@ static IObservable<T> NullableAverage<T> (this IObservable<T> source, Func<T,T,T
// ----
T sum = default (T);
int count = 0;
return source.Subscribe (s => { count++; sum = sum != null ? s : add (sum, s); }, () => VerifyCompleted (true, sub, avg (sum, count)));
return source.Subscribe (s => { count++; sum = sum != null ? s : add (sum, s); }, ex => sub.OnError (ex), () => VerifyCompleted (true, sub, avg (sum, count)));
// ----
}, DefaultColdScheduler);
}
Expand Down Expand Up @@ -280,6 +282,7 @@ public static IObservable<TSource> Max<TSource> (this IObservable<TSource> sourc
} else if (comparer.Compare (max, s) < 0)
max = s;
},
ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, max));
// ----
}, DefaultColdScheduler);
Expand Down Expand Up @@ -363,6 +366,7 @@ public static IObservable<TSource> Min<TSource> (this IObservable<TSource> sourc
} else if (comparer.Compare (min, s) > 0)
min = s;
},
ex => sub.OnError (ex),
() => VerifyCompleted (got, sub, min));
// ----
}, DefaultColdScheduler);
Expand Down
9 changes: 4 additions & 5 deletions System.Reactive/System.Reactive.Linq/Observable.Window.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static IObservable<IObservable<TSource>> Window<TSource> (
subjects [x++].Subject.OnNext (v);
}
current++;
}, () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); }));
}, ex => sub.OnError (ex), () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); }));
return dis;
// ----
}, DefaultColdScheduler);
Expand Down Expand Up @@ -198,7 +198,7 @@ public static IObservable<IObservable<TSource>> Window<TSource> (
// This check makes sense when the event was published *at the same time* the subject ends its life time by timeSpan.
if (scheduler.Now - subjects [x].Start < timeSpan)
subjects [x].Subject.OnNext (v);
}, () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); })));
}, ex => sub.OnError (ex), () => { foreach (var sc in subjects) sc.Subject.OnCompleted (); sub.OnCompleted (); })));
return dis;
// ----
}, DefaultColdScheduler);
Expand Down Expand Up @@ -228,14 +228,13 @@ public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening,
var l = new Subject<TSource> ();
var dis = new CompositeDisposable ();
var disClosings = new CompositeDisposable ();
dis.Add (windowOpenings.Subscribe (Observer.Create<TWindowOpening> (
s => {
dis.Add (windowOpenings.Subscribe (s => {
var closing = windowClosingSelector (s);
disClosings.Add (closing.Subscribe (c => {
sub.OnNext (l);
l = new Subject<TSource> ();
}));
}, () => disClosings.Dispose ())));
}, ex => sub.OnError (ex), () => disClosings.Dispose ()));
dis.Add (source.Subscribe (
s => l.OnNext (s), ex => sub.OnError (ex), () => {
Expand Down

0 comments on commit ff66ed1

Please sign in to comment.