/
Timed.scala
343 lines (322 loc) · 15.5 KB
/
Timed.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
/*
* Copyright 2001-2016 Artima, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.scalatest.enablers
import org.scalatest._
import org.scalatest.concurrent.Signaler
import org.scalatest.exceptions.StackDepthException
import org.scalactic.{Bad, Good}
import scala.util.{Failure, Success}
//import java.util.TimerTask
//import java.util.Timer
import org.scalatest.time.Span
import org.scalatest.concurrent.SignalerTimeoutTask
import scala.concurrent.{Promise, Future, ExecutionContext}
import org.scalactic.source
/**
* Type class that provides a <code>timeoutAfter</code> construct, which allows you to specify a timeout for an
* operation passed as a by-name parameter, as well as a way to signal/interrupt it if the operation exceeds its time limit.
*
* @tparam T the result type of the operation being timed
*/
trait Timed[T] {
/**
* Execute the passed in function <code>f</code> and time it. If the time it takes to complete the function
* exceeds the passed in <code>timeout</code>, the passed in <code>exceptionFun</code> is called to create an instance
* of [[org.scalatest.exceptions.StackDepthException StackDepthException]]; how that exception is reported
* (thrown directly, or carried by the returned <code>T</code>) is up to the implementation.
*
* @param timeout the maximum amount of time allowed for the passed function
* @param f the passed in function to be timed (by-name, so it is evaluated by the implementation)
* @param signaler the <code>Signaler</code> used to signal/interrupt the function when the time limit is exceeded
* @param exceptionFun the function used to create the <code>StackDepthException</code> for a timeout; its argument is
*                     the optional cause to attach to that exception
* @return the T returned by function <code>f</code>
*/
def timeoutAfter(
timeout: Span,
f: => T,
signaler: Signaler,
exceptionFun: Option[Throwable] => StackDepthException
): T
}
/**
* Companion object for <code>Timed</code> typeclass that offers three implicit providers: one for <code>FutureOutcome</code>,
* one for <code>Future</code> of any type, and one for any other type.
*
* <p>
* The details are in the documentation for the implicit providers themselves (methods <code>timed</code>, <code>timedFutureOf</code>,
* and <code>timedFutureOutcome</code>), but in short if a time limit is exceeded:
* </p>
*
* <ul>
* <li>if the type <code>T</code> in <code>Timed[T]</code> is <code>FutureOutcome</code>
* the <code>FutureOutcome</code> returned by <code>timeoutAfter</code> will result in either <code>Failed</code> or <code>Canceled</code></li>
* <li>if the type is <code>Future[U]</code>, the <code>Future[U]</code> returned by <code>timeoutAfter</code> will fail with either a
* <code>TestFailedDueToTimeoutException</code> or a <code>TestCanceledException</code>.</li>
* <li>otherwise, the <code>timeoutAfter</code> method will itself complete abruptly with either <code>TestFailedDueToTimeoutException</code>
* or <code>TestCanceledException</code>.</li>
* </ul>
*/
object Timed {
/*
TODO: See if this bit of documentation from the old Timeouts is still relevant, and if so, insert it
below:
<p>
If the interrupted status of the main test thread (the thread that invoked <code>failAfter</code>) was not set
when <code>failAfter</code> was invoked, but is set after the operation times out, it is reset by this method before
it completes abruptly with a <code>TestFailedDueToTimeoutException</code>. The interrupted status will be set by
<code>ThreadSignaler</code>, the default <code>Signaler</code> implementation.
</p>
*/
/**
 * Implicit method that provides <code>Timed</code> implementation for any <code>T</code>.
 *
 * <p>
 * If the function completes <em>before</em> the timeout expires:
 * </p>
 *
 * <ul>
 * <li>If the function returns normally, this method will return normally.</li>
 * <li>If the function completes abruptly with an exception, this method will complete abruptly with that same exception.</li>
 * <li>If the function returns an <code>Exceptional</code> outcome, the exception it carries is thrown, exactly as if the function had thrown it.</li>
 * </ul>
 *
 * <p>
 * If the function completes <em>after</em> the timeout expires:
 * </p>
 *
 * <ul>
 * <li>If the function returns normally, this method will complete abruptly with the <code>StackDepthException</code> created by <code>exceptionFun(None)</code>.</li>
 * <li>If the function completes abruptly with an exception (or returns an <code>Exceptional</code> outcome), this method will complete abruptly with the <code>StackDepthException</code> created by <code>exceptionFun(Some(cause))</code>, where <code>cause</code> is the exception from the function.</li>
 * </ul>
 *
 * <p>
 * This implementation schedules a <code>SignalerTimeoutTask</code> for the calling thread on a timer before running <code>f</code>; the
 * task is given the passed in <code>Signaler</code> so that it can signal/interrupt <code>f</code> if the time limit is exceeded while
 * <code>f</code> is still running.
 * </p>
 */
implicit def timed[T]: Timed[T] =
  new Timed[T] {
    def timeoutAfter(
      timeout: Span,
      f: => T,
      signaler: Signaler,
      exceptionFun: Option[Throwable] => StackDepthException
    ): T = {
      val timer = new Timer
      val task = new SignalerTimeoutTask(Thread.currentThread(), signaler)
      val maxDuration = timeout.totalNanos / 1000 / 1000 // time limit in milliseconds
      timer.schedule(task, maxDuration) // TODO: Probably use a sleep so I can use nanos
      val startTime = scala.compat.Platform.currentTime

      // Stops the timer and reports whether the time limit was exceeded, either because the
      // timeout task says so or because the measured wall-clock time is over the limit.
      def stopTimerAndCheckTimedOut(): Boolean = {
        val endTime = scala.compat.Platform.currentTime
        task.cancel()
        timer.cancel()
        val timedOut = task.timedOut || (endTime - startTime) > maxDuration
        if (timedOut && task.needToResetInterruptedStatus)
          Thread.interrupted() // Clear the interrupt status (there is a race condition here, but it is not clear anything can be done about it)
        timedOut
      }

      // Run f, capturing any Throwable so that the timeout decision below is made in exactly
      // one place. Previously the timeout exception was thrown inside the try block, got caught
      // by its own catch clause and was wrapped a second time as the cause of another timeout
      // exception.
      val completion: Either[Throwable, T] =
        try Right(f)
        catch { case t: Throwable => Left(t) }

      val timedOut = stopTimerAndCheckTimedOut()

      // Reports an abrupt completion: wrapped in the timeout exception if we ran out of time,
      // otherwise re-thrown unchanged.
      def fail(cause: Throwable): Nothing =
        if (timedOut) throw exceptionFun(Some(cause)) else throw cause

      completion match {
        case Left(cause) => fail(cause)
        case Right(result) =>
          result match {
            // If the result is Exceptional, the exception is already wrapped; treat it like a thrown exception to get the old behavior.
            case Exceptional(ex) => fail(ex)
            case _ =>
              if (timedOut) throw exceptionFun(None)
              else result
          }
      }
    }
  }
/**
 * Implicit method that provides <code>Timed</code> implementation for any <code>Future[T]</code>.
 *
 * <p>
 * If the asynchronous function completes <em>before</em> the timeout expires:
 * </p>
 *
 * <ul>
 * <li>If the function returns normally, the <code>Future</code> will be completed with the return value of the function.</li>
 * <li>If the function completes abruptly with an exception, this method will complete the <code>Future</code> with that same exception.</li>
 * </ul>
 *
 * <p>
 * If the asynchronous function completes <em>after</em> the timeout expires:
 * </p>
 *
 * <ul>
 * <li>If the function returns normally, this method will complete the <code>Future</code> with the <code>StackDepthException</code> created by <code>exceptionFun(None)</code>.</li>
 * <li>If the function completes abruptly with an exception, this method will complete the <code>Future</code> with the <code>StackDepthException</code> created by <code>exceptionFun(Some(cause))</code>, where <code>cause</code> is the exception from the function.</li>
 * </ul>
 *
 * <p>
 * If evaluating <code>f</code> itself (i.e., producing the <code>Future</code>) throws, or takes longer than the timeout, this method
 * completes abruptly instead of returning a <code>Future</code>: with the original exception if still within the time limit, otherwise with
 * the <code>StackDepthException</code> created from <code>exceptionFun</code>.
 * </p>
 *
 * <p>
 * Once the <code>Future</code> has been obtained, this implementation schedules a <code>SignalerTimeoutTask</code> for the calling thread
 * on a timer for the remaining time; the task is given the passed in <code>Signaler</code>. Note that the returned <code>Future</code> is
 * only ever completed from the completion callback of the <code>Future</code> produced by <code>f</code>.
 * </p>
 */
implicit def timedFutureOf[T](implicit executionContext: ExecutionContext): Timed[Future[T]] =
  new Timed[Future[T]] {
    def timeoutAfter(
      timeout: Span,
      f: => Future[T],
      signaler: Signaler,
      exceptionFun: Option[Throwable] => StackDepthException
    ): Future[T] = {
      val maxDuration = timeout.totalNanos / 1000 / 1000 // time limit in milliseconds
      val startTime = scala.compat.Platform.currentTime

      // Milliseconds elapsed since f started being evaluated.
      def elapsed: Long = scala.compat.Platform.currentTime - startTime

      // Only the evaluation of f is guarded, so that the timeout exception thrown just below is
      // not caught and wrapped a second time as the cause of another timeout exception.
      val result: Future[T] =
        try f
        catch {
          case t: Throwable =>
            if (elapsed > maxDuration)
              throw exceptionFun(Some(t))
            else
              throw t
        }

      // Producing the Future already took too long: fail synchronously.
      if (elapsed > maxDuration)
        throw exceptionFun(None)

      val promise = Promise[T]()
      val task = new SignalerTimeoutTask(Thread.currentThread(), signaler)
      // The clock may have advanced past the limit since the check above; never pass a negative delay to the timer.
      val delay = math.max(0L, maxDuration - elapsed)
      // The timer is created only now, so no timer is left running when f throws or the check above fails.
      val timer = new Timer
      // This call should come before the onComplete call, to avoid race condition between the schedule call and cancel call in the onComplete block as reported in:
      // https://github.com/scalatest/scalatest/issues/1147
      timer.schedule(task, delay)

      result.onComplete { completion =>
        task.cancel()
        timer.cancel()
        if (!promise.isCompleted) { // If it completed already, it will fail or have failed with a timeout exception
          val late = elapsed > maxDuration
          completion match {
            case Success(value) =>
              if (late) promise.failure(exceptionFun(None))
              else promise.success(value)
            case Failure(e) =>
              if (late) promise.failure(exceptionFun(Some(e)))
              else promise.failure(e) // Chee Seng: I wonder if in this case should we use this exception instead of the other one? Not sure.
          }
        }
      }

      promise.future
    }
  }
/*
Chee Seng: This one should catch TestCanceledException and change it into
a Canceled. It should catch TestPendingException and change it into
a Pending. It should catch any other non-suite-aborting exception and
turn it into a Failed. A timeout should become a Failed(TestFailedDueToTimeoutException).
I believe this is what you did in AsyncTimeouts.
*/
/**
 * Implicit method that provides <code>Timed</code> implementation for <code>FutureOutcome</code>.
 *
 * <p>
 * If the asynchronous function completes <em>before</em> the timeout expires:
 * </p>
 *
 * <ul>
 * <li>If the function returns normally, the <code>FutureOutcome</code> will be completed with the <code>Outcome</code> returned from the function.</li>
 * <li>If the function completes abruptly with an exception, that same exception is re-thrown from the completion callback registered with
 * <code>onCompletedThen</code>. The mapping of that exception to the final result (for example <code>TestPendingException</code> to
 * <code>Pending</code>, <code>TestCanceledException</code> to <code>Canceled</code>) is performed by <code>FutureOutcome.onCompletedThen</code>,
 * not by this method.</li>
 * </ul>
 *
 * <p>
 * If the asynchronous function completes <em>after</em> the timeout expires:
 * </p>
 *
 * <ul>
 * <li>If the function returns normally, the completion callback throws the <code>StackDepthException</code> created by <code>exceptionFun(None)</code>.</li>
 * <li>If the function completes abruptly with an exception, the completion callback throws the <code>StackDepthException</code> created by
 * <code>exceptionFun(Some(cause))</code>, where <code>cause</code> is the exception from the function.</li>
 * </ul>
 *
 * <p>
 * If evaluating <code>f</code> itself (i.e., producing the <code>FutureOutcome</code>) throws, that exception propagates unchanged; if it
 * takes longer than the timeout, this method completes abruptly with the <code>StackDepthException</code> created by <code>exceptionFun(None)</code>.
 * </p>
 *
 * <p>
 * Once the <code>FutureOutcome</code> has been obtained, this implementation schedules a <code>SignalerTimeoutTask</code> for the calling
 * thread on a timer for the remaining time; the task is given the passed in <code>Signaler</code>.
 * </p>
 */
implicit def timedFutureOutcome(implicit executionContext: ExecutionContext): Timed[FutureOutcome] =
  new Timed[FutureOutcome] {
    def timeoutAfter(
      timeout: Span,
      f: => FutureOutcome,
      signaler: Signaler,
      exceptionFun: Option[Throwable] => StackDepthException
    ): FutureOutcome = {
      val maxDuration = timeout.totalNanos / 1000 / 1000 // time limit in milliseconds
      val startTime = scala.compat.Platform.currentTime

      // Milliseconds elapsed since f started being evaluated.
      def elapsed: Long = scala.compat.Platform.currentTime - startTime

      val result = f

      // Producing the FutureOutcome already took too long: fail synchronously.
      if (elapsed > maxDuration)
        throw exceptionFun(None)

      val task = new SignalerTimeoutTask(Thread.currentThread(), signaler)
      // The clock may have advanced past the limit since the check above; never pass a negative delay to the timer.
      val delay = math.max(0L, maxDuration - elapsed)
      // The timer is created only now, so no timer is left running when f throws or the check above fails.
      val timer = new Timer
      // This call should come before the onCompletedThen call, to avoid race condition between the schedule call and cancel call in the onComplete block as reported in:
      // https://github.com/scalatest/scalatest/issues/1147
      timer.schedule(task, delay)

      result.onCompletedThen { completion =>
        task.cancel()
        timer.cancel()
        val late = elapsed > maxDuration
        completion match {
          case Good(_) =>
            if (late)
              throw exceptionFun(None)
          case Bad(e) =>
            // A late failure keeps the original exception as the cause of the timeout exception,
            // consistent with the Future[T] provider.
            if (late)
              throw exceptionFun(Some(e))
            else
              throw e
        }
      }
    }
  }
}