-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Catch.cs
175 lines (153 loc) · 6.28 KB
/
Catch.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
namespace SuperLinq.Async;
public static partial class AsyncSuperEnumerable
{
/// <summary>
/// Creates a sequence that corresponds to the source sequence, concatenating it with the sequence resulting from
/// calling an exception handler function in case of an error.
/// </summary>
/// <typeparam name="TSource">Source sequence element type.</typeparam>
/// <typeparam name="TException">Exception type to catch.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="handler">Handler to invoke when an exception of the specified type occurs.</param>
/// <returns>Source sequence, concatenated with an exception handler result sequence in case of an error.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="handler"/> is <see
/// langword="null"/>.</exception>
/// <remarks>
/// <para>
/// This method uses deferred execution and streams its results.
/// </para>
/// <para>
/// Only exceptions of type <typeparamref name="TException"/> raised while advancing <paramref name="source"/> are
/// handled. Exceptions raised while obtaining the enumerator, or while enumerating the sequence returned by
/// <paramref name="handler"/>, are not caught here. The enumerator over <paramref name="source"/> is disposed
/// before the sequence returned by <paramref name="handler"/> is enumerated.
/// </para>
/// </remarks>
public static IAsyncEnumerable<TSource> Catch<TSource, TException>(
    this IAsyncEnumerable<TSource> source,
    Func<TException, IAsyncEnumerable<TSource>> handler)
    where TException : Exception
{
    // Validate eagerly; the iterator below does not start running until it is enumerated.
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(handler);

    return Core(source, handler);

    static async IAsyncEnumerable<TSource> Core(
        IAsyncEnumerable<TSource> source,
        Func<TException, IAsyncEnumerable<TSource>> handler,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        IAsyncEnumerable<TSource>? errSource;

        // A using *statement* (not a using declaration) is deliberate: it ends the enumerator's
        // lifetime at the closing brace, so the failed source is released before the fallback
        // sequence is consumed instead of being held open until the whole iterator finishes.
        await using (var e = source.GetConfiguredAsyncEnumerator(cancellationToken))
        {
            while (true)
            {
                // `yield return` cannot appear inside a try block that has a catch clause,
                // so only the advance is guarded and the element is yielded afterwards.
                try
                {
                    if (!await e.MoveNextAsync())
                        yield break;
                }
                catch (TException ex)
                {
                    errSource = handler(ex);
                    break;
                }

                yield return e.Current;
            }
        }

        // The loop above is only left through the catch clause, so a handler result is available here.
        Assert.NotNull(errSource);

        await foreach (var item in errSource.WithCancellation(cancellationToken).ConfigureAwait(false))
            yield return item;
    }
}
/// <summary>
/// Produces the elements of <paramref name="first"/> and, if that sequence fails with an exception, carries on
/// with the elements of <paramref name="second"/>.
/// </summary>
/// <typeparam name="TSource">Source sequence element type.</typeparam>
/// <param name="first">The sequence to enumerate first.</param>
/// <param name="second">The fallback sequence, appended to the result when <paramref name="first"/> completes
/// exceptionally.</param>
/// <returns>The elements of <paramref name="first"/>, followed by those of <paramref name="second"/> when an
/// error is produced.</returns>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is <see
/// langword="null"/>.</exception>
/// <remarks>
/// This method uses deferred execution and streams its results.
/// </remarks>
public static IAsyncEnumerable<TSource> Catch<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
{
    ArgumentNullException.ThrowIfNull(first);
    ArgumentNullException.ThrowIfNull(second);

    // Hand both sequences, in order, to the array-based overload.
    IAsyncEnumerable<TSource>[] candidates = { first, second };
    return Catch(candidates);
}
/// <summary>
/// Concatenates the given sequences one after another, moving on to the next sequence only when the current one
/// fails, and stopping as soon as one of them completes successfully.
/// </summary>
/// <typeparam name="TSource">Source sequence element type.</typeparam>
/// <param name="sources">The sequences to try, in order.</param>
/// <returns>A sequence that keeps appending further source sequences for as long as errors occur.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is <see langword="null"/>.</exception>
/// <remarks>
/// This method uses deferred execution and streams its results.
/// </remarks>
public static IAsyncEnumerable<TSource> Catch<TSource>(params IAsyncEnumerable<TSource>[] sources)
{
    ArgumentNullException.ThrowIfNull(sources);

    // Lift the array into an async sequence of sequences and reuse that overload.
    var asyncSources = sources.ToAsyncEnumerable();
    return asyncSources.Catch();
}
/// <summary>
/// Concatenates the given sequences one after another, moving on to the next sequence only when the current one
/// fails, and stopping as soon as one of them completes successfully.
/// </summary>
/// <typeparam name="TSource">Source sequence element type.</typeparam>
/// <param name="sources">The sequences to try, in enumeration order.</param>
/// <returns>A sequence that keeps appending further source sequences for as long as errors occur.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is <see langword="null"/>.</exception>
/// <remarks>
/// This method uses deferred execution and streams its results.
/// </remarks>
public static IAsyncEnumerable<TSource> Catch<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
{
    ArgumentNullException.ThrowIfNull(sources);

    // Lift the synchronous collection into an async sequence of sequences and reuse that overload.
    var asyncSources = sources.ToAsyncEnumerable();
    return asyncSources.Catch();
}
/// <summary>
/// Concatenates the given sequences one after another, moving on to the next sequence only when the current one
/// fails, and stopping as soon as one of them completes successfully.
/// </summary>
/// <typeparam name="TSource">Source sequence element type.</typeparam>
/// <param name="sources">An asynchronous sequence of the sequences to try, in order.</param>
/// <returns>A sequence that keeps appending further source sequences for as long as errors occur.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is <see langword="null"/>.</exception>
/// <remarks>
/// This method uses deferred execution and streams its results.
/// </remarks>
public static IAsyncEnumerable<TSource> Catch<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
{
    ArgumentNullException.ThrowIfNull(sources);

    return Core(sources);

    static async IAsyncEnumerable<TSource> Core(
        IAsyncEnumerable<IAsyncEnumerable<TSource>> sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await using var outer = sources.GetConfiguredAsyncEnumerator(cancellationToken);

        // Nothing to try at all: the result is empty.
        if (!await outer.MoveNextAsync())
        {
            yield break;
        }

        // Look one step ahead on the outer sequence so that, while enumerating a candidate,
        // it is already known whether a failure can still be recovered from.
        // The local keeps the name `source` because ThrowIfNull reports the argument expression
        // as the parameter name of the exception it throws.
        var source = outer.Current;
        var isLast = !await outer.MoveNextAsync();

        // This loop always terminates: for the last candidate (`isLast == true`) the inner loop
        // either finishes normally (yield break) or rethrows the failure, so control can only
        // come back around here while another candidate is still available.
        for (; ; )
        {
            ArgumentNullException.ThrowIfNull(source);

            await using var inner = source.GetConfiguredAsyncEnumerator(cancellationToken);
            for (; ; )
            {
                // `yield return` is not permitted inside a try block with a catch clause,
                // so only the advance is guarded and the element is yielded afterwards.
                try
                {
                    if (!await inner.MoveNextAsync())
                    {
                        yield break;
                    }
                }
                catch
                {
                    // No fallback left: surface the failure of the final candidate.
                    if (isLast)
                    {
                        throw;
                    }

                    break;
                }

                yield return inner.Current;
            }

            // The current candidate failed; switch to the one already fetched and look ahead again.
            source = outer.Current;
            isLast = !await outer.MoveNextAsync();
        }
    }
}
}