// BasicBackend.scala
package slick.basic
import slick.util.AsyncExecutor.{Priority, Continuation, Fresh, WithConnection}
import java.io.Closeable
import java.util.concurrent.atomic.{AtomicReferenceArray, AtomicLong}
import com.typesafe.config.Config
import scala.concurrent.{Promise, ExecutionContext, Future}
import scala.util.{Success, Failure}
import scala.util.control.NonFatal
import scala.collection.compat._
import org.slf4j.LoggerFactory
import org.reactivestreams._
import slick.SlickException
import slick.dbio._
import slick.util._
/** Backend for the basic database and session handling features.
* Concrete backends like `JdbcBackend` extend this type and provide concrete
* types for `Database`, `DatabaseFactory` and `Session`. */
trait BasicBackend { self =>
/** Logger for executed actions, registered under the `BasicBackend` class name plus ".action".
* It is used by `logAction` and for warnings raised while running cleanup actions. */
protected lazy val actionLogger = new SlickLogger(LoggerFactory.getLogger(classOf[BasicBackend].getName+".action"))
/** Logger for streaming, registered under the `BasicBackend` class name plus ".stream". */
protected lazy val streamLogger = new SlickLogger(LoggerFactory.getLogger(classOf[BasicBackend].getName+".stream"))
/** The type of this backend: a subtype of `BasicBackend` and a supertype of this instance's
* singleton type. It appears as a type argument of the `SynchronousDatabaseAction`s that this
* backend runs. */
type This >: this.type <: BasicBackend
/** The type of database objects used by this backend. */
type Database <: DatabaseDef
/** The type of the database factory used by this backend. */
type DatabaseFactory
/** The type of session objects used by this backend. */
type Session >: Null <: SessionDef
/** The type of the context used for running SynchronousDatabaseActions */
type Context >: Null <: BasicActionContext
/** The type of the context used for streaming SynchronousDatabaseActions */
type StreamingContext >: Null <: Context with BasicStreamingActionContext
/** The database factory */
val Database: DatabaseFactory
/** Create a Database instance through [[https://github.com/typesafehub/config Typesafe Config]].
* The supported config keys are backend-specific. This method is used by `DatabaseConfig`.
*
* @param config The `Config` object to read from.
* @param path The path in the configuration file for the database configuration, or an empty
* string for the top level of the `Config` object.
* @return The `Database` created from the configuration.
*/
def createDatabase(config: Config, path: String): Database
/** A database instance to which connections can be created. */
trait DatabaseDef extends Closeable { this: Database =>
/** Create a new session. The session needs to be closed explicitly by calling its close() method. */
def createSession(): Session
/** Release every resource Slick holds for this Database without blocking the caller. The work
  * is done by running `close` on `AsyncExecutor.shutdownExecutor`, so you have to wait for the
  * returned `Future` to complete before you can be sure that everything has been shut down. */
def shutdown: Future[Unit] = {
  val shutdownContext = ExecutionContext.fromExecutor(AsyncExecutor.shutdownExecutor)
  Future(close)(shutdownContext)
}
/** Free all resources allocated by Slick for this Database, blocking the current thread until
* everything has been shut down.
*
* Backend implementations which are based on a naturally blocking shutdown procedure can
* simply implement this method and get `shutdown` as an asynchronous wrapper for free. If
* the underlying shutdown procedure is asynchronous, you should implement `shutdown` instead
* and wrap it with `Await.result` in this method. */
def close: Unit
/** Execute a non-streaming action asynchronously. The returned Future carries its result. */
final def run[R](a: DBIOAction[R, NoStream, Nothing]): Future[R] =
  runInternal(a, useSameThread = false)
/** Slick-internal entry point behind `run`. Creates a fresh action context (in same-thread
  * mode if `useSameThread` is set) and runs `a` in it as a top-level, non-streaming action.
  * A non-fatal exception thrown while doing so is returned as a failed Future. */
private[slick] final def runInternal[R](a: DBIOAction[R, NoStream, Nothing], useSameThread: Boolean): Future[R] =
  try {
    val ctx = createDatabaseActionContext(useSameThread)
    runInContext(a, ctx, streaming = false, topLevel = true)
  } catch {
    case NonFatal(ex) => Future.failed(ex)
  }
/** Build a Reactive Streams `Publisher` which, once subscribed to, runs the given `DBIOAction`
  * and delivers the result directly as a stream instead of buffering everything first.
  * Only streaming actions are supported.
  *
  * The returned Publisher is merely a stub that holds on to the action and this Database.
  * Execution does not begin before the call to `onSubscribe` has returned; from then on the
  * Subscriber is responsible for either reading the full response or cancelling the
  * Subscription. A Publisher can be reused to serve multiple Subscribers, each subscription
  * triggering a new execution of the action.
  *
  * Combinators such as `cleanup`, which can run after a stream has been produced, do not
  * consider cancellation by the Subscriber an error. For example, a Subscriber has no way of
  * causing a rollback while streaming the results of `someQuery.result.transactionally`.
  *
  * With a JDBC back-end all `onNext` calls are made synchronously and the ResultSet row is not
  * advanced before `onNext` returns, which lets the Subscriber access LOB pointers from within
  * `onNext`. If back-pressure signaling interrupts streaming, the next row is prefetched (in
  * order to buffer the next result page from the server when a page boundary has been
  * reached). */
final def stream[T](a: DBIOAction[_, Streaming[T], Nothing]): DatabasePublisher[T] =
  streamInternal(a, useSameThread = false)
/** Slick-internal entry point behind `stream`. Builds the Publisher with a context factory
  * that creates a streaming action context for each Subscriber, in same-thread mode if
  * `useSameThread` is set. */
private[slick] final def streamInternal[T](a: DBIOAction[_, Streaming[T], Nothing], useSameThread: Boolean): DatabasePublisher[T] = {
  val contextFor: Subscriber[_ >: T] => StreamingContext =
    subscriber => createStreamingDatabaseActionContext(subscriber, useSameThread)
  createPublisher(a, contextFor)
}
/** Create a Reactive Streams `Publisher` for `a`, using `createCtx` to build a new streaming
  * context for every Subscriber.
  *
  * On `subscribe` the Subscriber is first signaled `onSubscribe` with the context's
  * subscription. If that call fails, a warning is logged and the action is not started.
  * Otherwise the action is run in streaming mode and its outcome is forwarded to the
  * Subscriber through `tryOnComplete` / `tryOnError`. */
protected[this] def createPublisher[T](a: DBIOAction[_, Streaming[T], Nothing], createCtx: Subscriber[_ >: T] => StreamingContext): DatabasePublisher[T] = new DatabasePublisher[T] {
  def subscribe(s: Subscriber[_ >: T]) = {
    if(s eq null) throw new NullPointerException("Subscriber is null")
    val ctx = createCtx(s)
    if(streamLogger.isDebugEnabled) streamLogger.debug(s"Signaling onSubscribe($ctx)")

    // Hand the subscription to the Subscriber; report whether it accepted it without throwing.
    def signalOnSubscribe(): Boolean =
      try {
        s.onSubscribe(ctx.subscription)
        true
      } catch {
        case NonFatal(ex) =>
          streamLogger.warn("Subscriber.onSubscribe failed unexpectedly", ex)
          false
      }

    if(signalOnSubscribe()) {
      try {
        val finished = runInContext(a, ctx, true, true)
        finished.onComplete { outcome =>
          outcome match {
            case Failure(t) => ctx.tryOnError(t)
            case Success(_) => ctx.tryOnComplete
          }
        }(DBIO.sameThreadExecutionContext)
      } catch {
        case NonFatal(ex) => ctx.tryOnError(ex)
      }
    }
  }
}
/** Create the default DatabaseActionContext for this backend.
*
* @param _useSameThread the same-thread flag for the new context; `runInternal` passes its
* `useSameThread` argument here. */
protected[this] def createDatabaseActionContext[T](_useSameThread: Boolean): Context
/** Create the default StreamingDatabaseActionContext for this backend.
*
* @param s the Subscriber for which the context is created.
* @param useSameThread the same-thread flag for the new context; `streamInternal` passes its
* `useSameThread` argument here. */
protected[this] def createStreamingDatabaseActionContext[T](s: Subscriber[_ >: T], useSameThread: Boolean): StreamingContext
/** Run an Action in an existing DatabaseActionContext, starting at stack level 0. Subclasses
  * can override this method to support new DatabaseActions which cannot be expressed through
  * SynchronousDatabaseAction.
  *
  * @param streaming Whether to return the result as a stream. In this case, the context must
  *                  be a `StreamingDatabaseActionContext` and the Future result should be
  *                  completed with `null` or failed after streaming has finished. This
  *                  method should not call any `Subscriber` method other than `onNext`. */
protected[this] def runInContext[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean): Future[R] =
  runInContextSafe(a, ctx, streaming, topLevel, stackLevel = 0)
/** Run `a` via `runInContextInline` while bounding the depth of direct recursion.
  *
  * While `stackLevel` is below 100 the action is run inline with the level incremented. Once
  * the limit is reached, the next batch is handed to `DBIO.sameThreadExecutionContext` as a
  * `Runnable` that restarts at level 1, and its outcome is bridged through a Promise.
  *
  * @param stackLevel the nesting depth reached so far.
  * @return the Future produced by running `a`, or a Future failed with the non-fatal exception
  *         thrown while starting it in the deferred case. */
private[this] def runInContextSafe[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean, stackLevel: Int): Future[R] =
  if (stackLevel < 100) {
    runInContextInline(a, ctx, streaming, topLevel, stackLevel + 1)
  } else {
    // Explicit `()`: auto-application of empty-paren methods is deprecated in Scala 2.13 and
    // rejected by Scala 3; this also matches the other Promise creations in this file.
    val promise = Promise[R]()
    val runnable = new Runnable {
      override def run(): Unit =
        try {
          promise.completeWith(runInContextInline(a, ctx, streaming, topLevel, stackLevel = 1))
          ()
        } catch {
          case NonFatal(ex) =>
            // tryFailure cannot throw if the promise has already been completed.
            promise.tryFailure(ex)
            ()
        }
    }
    DBIO.sameThreadExecutionContext.execute(runnable)
    promise.future
  }
/** Interpret a single `DBIOAction` node in the given context and return its result as a Future.
* Sub-actions of composite actions are run through `runInContextSafe` with the current
* `stackLevel`; the continuation of a `FlatMapAction` and the cleanup action of a
* `CleanUpAction` are started through `runInContext`, i.e. at stack level 0 again.
*
* @param a the action to run. It is logged through `logAction` first.
* @param ctx the context the action runs in.
* @param streaming whether the result of `a` should be streamed (see `runInContext`).
* @param topLevel `true` for the action passed in by `runInternal` / `createPublisher`. It is
* propagated only to the base or first sub-action of a composite action, and its negation is
* passed as the `continuation` flag to the synchronous runners.
* @param stackLevel the nesting depth as tracked by `runInContextSafe`. */
private[this] def runInContextInline[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean, stackLevel: Int): Future[R] = {
logAction(a, ctx)
a match {
// Constant results and existing Futures are returned directly.
case SuccessAction(v) => Future.successful(v)
case FailureAction(t) => Future.failed(t)
case FutureAction(f) => f
// The base action is never streamed; only the action produced by `f` may stream. The
// continuation runs on the user's ExecutionContext (or the same-thread one, see `getEC`).
case FlatMapAction(base, f, ec) =>
runInContextSafe(base, ctx, false, topLevel, stackLevel).flatMap(v => runInContext(f(v), ctx, streaming, false))(ctx.getEC(ec))
// Run the actions one after another; the result is that of the last action, which is also
// the only one that may stream.
case AndThenAction(actions) =>
val last = actions.length - 1
def run(pos: Int, v: Any): Future[Any] = {
val f1 = runInContextSafe(actions(pos), ctx, streaming && pos == last, topLevel && pos == 0, stackLevel)
if(pos == last) f1
else f1.flatMap(run(pos + 1, _))(DBIO.sameThreadExecutionContext)
}
run(0, null).asInstanceOf[Future[R]]
// Run the actions one after another, storing each result at its position, then build the
// result collection with the builder provided by the action (`sa.cbf`).
case sa@SequenceAction(actions) =>
val len = actions.length
val results = new AtomicReferenceArray[Any](len)
def run(pos: Int): Future[Any] = {
if (pos == len) Future.successful {
val b = sa.cbf.newBuilder
var i = 0
while (i < len) {
b += results.get(i)
i += 1
}
b.result()
}
else runInContextSafe(actions(pos), ctx, false, topLevel && pos == 0, stackLevel).flatMap { (v: Any) =>
results.set(pos, v)
run(pos + 1)
}(DBIO.sameThreadExecutionContext)
}
run(0).asInstanceOf[Future[R]]
// Run `base`, then the cleanup action computed by `f` from the optional failure of `base`.
case CleanUpAction(base, f, keepFailure, ec) =>
val p = Promise[R]()
runInContextSafe(base, ctx, streaming, topLevel, stackLevel).onComplete { t1 =>
try {
val a2 = f(t1 match {
case Success(_) => None
case Failure(t) => Some(t)
})
runInContext(a2, ctx, false, false).onComplete { t2 =>
// A failed cleanup wins if `base` succeeded or if the original failure need not be kept;
// in all other cases the outcome of `base` is the result.
if (t2.isFailure && (t1.isSuccess || !keepFailure)) p.complete(t2.asInstanceOf[Failure[R]])
else p.complete(t1)
}(DBIO.sameThreadExecutionContext)
} catch {
// `f` itself (or starting the cleanup action) threw: fail with the failure of `base` when
// it has to be kept, otherwise with the new exception.
case NonFatal(ex) =>
val e = t1 match {
case Failure(t) if keepFailure => t
case _ => ex
}
if (!p.tryFailure(e)) {
actionLogger.warn("Exception after promise completed", e)
}
}
}(ctx.getEC(ec))
p.future
// Use the `failed` projection of the Future returned for `a`.
case FailedAction(a) =>
runInContextSafe(a, ctx, false, topLevel, stackLevel).failed.asInstanceOf[Future[R]]
// Always succeed, with the `Try` of running `a` as the value.
case AsTryAction(a) =>
val p = Promise[R]()
runInContextSafe(a, ctx, false, topLevel, stackLevel).onComplete(v => p.success(v.asInstanceOf[R]))(DBIO.sameThreadExecutionContext)
p.future
// The name is not used here; the wrapped action is run unchanged.
case NamedAction(a, _) =>
runInContextSafe(a, ctx, streaming, topLevel, stackLevel)
case a: SynchronousDatabaseAction[_, _, _, _] =>
if (streaming) {
if (a.supportsStreaming) streamSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect]], ctx.asInstanceOf[StreamingContext], !topLevel).asInstanceOf[Future[R]]
// No streaming support: run the non-fused equivalent between `DBIO.Pin` and a `DBIO.Unpin`
// cleanup, keeping the streaming flag.
else runInContextSafe(CleanUpAction(AndThenAction(Vector(DBIO.Pin, a.nonFusedEquivalentAction)), _ => DBIO.Unpin, true, DBIO.sameThreadExecutionContext), ctx, streaming, topLevel, stackLevel)
} else runSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[R, NoStream, This, _]], ctx, !topLevel)
// Any other DatabaseAction is unknown to this backend.
case a: DatabaseAction[_, _, _] =>
throw new SlickException(s"Unsupported database action $a for $this")
}
}
/** Within a synchronous execution, make sure a Session is available: unless the context is
  * pinned, a new Session is created and stored as the context's current Session. */
protected[this] final def acquireSession(ctx: Context): Unit = {
  val pinned = ctx.isPinned
  if(!pinned) ctx.currentSession = createSession()
}
/** Within a synchronous execution, close the current Session and clear it from the context,
  * unless the context is pinned.
  *
  * @param discardErrors If set to true, swallow all non-fatal errors that arise while
  *                      closing the Session. Otherwise such an error propagates to the caller
  *                      and the Session reference is left in place. */
protected[this] final def releaseSession(ctx: Context, discardErrors: Boolean): Unit =
  if(ctx.isPinned) ()
  else {
    try {
      ctx.currentSession.close()
    } catch {
      case NonFatal(_) if discardErrors => ()
    }
    ctx.currentSession = null
  }
/** Run a `SynchronousDatabaseAction` on this database.
*
* The action is wrapped in an `AsyncExecutor.PrioritizedRunnable` and submitted to
* `synchronousExecutionContext` (or to the same-thread context if `ctx` is in same-thread mode,
* see `getEC`). The runnable acquires a Session unless the context is pinned, runs `a`,
* releases the Session again and completes the returned Future.
*
* @param a the action to run.
* @param ctx the context to run it in.
* @param continuation passed to `ctx.priority` to compute the priority of the runnable.
* @return a Future completed with the value returned by `a.run(ctx)`, or failed with the
* non-fatal exception thrown while acquiring the Session, running `a` or closing the Session. */
protected[this] def runSynchronousDatabaseAction[R](a: SynchronousDatabaseAction[R, NoStream, This, _], ctx: Context, continuation: Boolean): Future[R] = {
val promise = Promise[R]()
ctx.getEC(synchronousExecutionContext).prepare.execute(new AsyncExecutor.PrioritizedRunnable {
def priority = {
// Volatile read before inspecting the context state (see `BasicActionContext.sync`).
ctx.readSync
ctx.priority(continuation)
}
def run: Unit =
try {
ctx.readSync
val res = try {
acquireSession(ctx)
val res = try a.run(ctx) catch { case NonFatal(ex) =>
// Close errors are discarded here so that the exception thrown by the action propagates.
releaseSession(ctx, true)
throw ex
}
// On success, an error from closing the Session is not discarded and fails the Future.
releaseSession(ctx, false)
res
} finally {
// NOTE(review): `connectionReleased` is not defined in this file; presumably it is a member
// of `AsyncExecutor.PrioritizedRunnable` - confirm there.
if (!ctx.isPinned && ctx.priority(continuation) != WithConnection) connectionReleased = true
// Volatile write when leaving the synchronous context (see `BasicActionContext.sync`).
ctx.sync = 0
}
promise.success(res)
} catch { case NonFatal(ex) => promise.tryFailure(ex) }
})
promise.future
}
/** Stream a `SynchronousDatabaseAction` on this database. The action is recorded in the
  * context as the current streaming action, streaming is scheduled with a `null` initial
  * state, and the Future of the context's streaming result Promise is returned. */
protected[this] def streamSynchronousDatabaseAction(a: SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect], ctx: StreamingContext, continuation: Boolean): Future[Null] = {
  ctx.streamingAction = a
  scheduleSynchronousStreaming(a, ctx, continuation)(initialState = null)
  val streamingFinished: Future[Null] = ctx.streamingResultPromise.future
  streamingFinished
}
/** Stream a part of the results of a `SynchronousDatabaseAction` on this database.
*
* A `PrioritizedRunnable` is submitted to `synchronousExecutionContext` (or to the same-thread
* context, see `getEC`). It emits elements until either the stream has finished (the state
* becomes `null`) or the current demand has been satisfied. In the latter case the state is
* stored in `ctx.streamState` so that streaming can be resumed by a later call.
*
* @param a the streaming action.
* @param ctx the streaming context; it provides demand, cancellation status and the result Promise.
* @param continuation passed to `ctx.priority` to compute the priority of the runnable.
* @param initialState the state to resume from, or `null` to start streaming. A Session is
* acquired only in the `null` case. */
protected[BasicBackend] def scheduleSynchronousStreaming(a: SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect], ctx: StreamingContext, continuation: Boolean)(initialState: a.StreamState): Unit =
try {
ctx.getEC(synchronousExecutionContext).prepare.execute(new AsyncExecutor.PrioritizedRunnable {
// Format a demand value for debug output; `Long.MaxValue` is shown as an infinity sign.
private[this] def str(l: Long) = if(l != Long.MaxValue) l else if(GlobalConfig.unicodeDump) "\u221E" else "oo"
def priority = {
ctx.readSync
ctx.priority(continuation)
}
def run(): Unit =
try {
val debug = streamLogger.isDebugEnabled
var state = initialState
// Volatile read before touching the context state (see `BasicActionContext.sync`).
ctx.readSync
if(state eq null) {
try {
acquireSession(ctx)
} catch { case NonFatal(ex) =>
// NOTE(review): `connectionReleased` is not defined in this file; presumably it is a member
// of `AsyncExecutor.PrioritizedRunnable` - confirm there.
if (!ctx.isPinned) connectionReleased = true
throw ex
}
}
var demand = ctx.demandBatch
// A negative demand encodes the real demand offset by `Long.MinValue`
// (see `BasicStreamingActionContext.demandBatch`).
var realDemand = if(demand < 0) demand - Long.MinValue else demand
do {
try {
if(debug)
streamLogger.debug((if(state eq null) "Starting initial" else "Restarting ") + " streaming action, realDemand = " + str(realDemand))
if(ctx.cancelled) {
if(ctx.deferredError ne null) throw ctx.deferredError
if(state ne null) { // streaming cancelled before finishing
val oldState = state
state = null
a.cancelStream(ctx, oldState)
}
} else if(realDemand > 0 || (state eq null)) {
val oldState = state
// Cleared before the call so that the catch block below does not cancel the stream again
// if `emitStream` throws.
state = null
state = a.emitStream(ctx, realDemand, oldState)
}
if(state eq null) { // streaming finished and cleaned up
releaseSession(ctx, true)
if (!ctx.isPinned) connectionReleased = true
ctx.sync = 0
ctx.streamingResultPromise.trySuccess(null)
}
} catch { case NonFatal(ex) =>
// Clean up as far as possible; a failure of the cleanup is handled by `ignoreFollowOnError`
// and the original exception is rethrown.
if(state ne null) try a.cancelStream(ctx, state) catch ignoreFollowOnError
releaseSession(ctx, true)
if (!ctx.isPinned) connectionReleased = true
ctx.sync = 0
throw ex
} finally {
// Store the state for a later restart, followed by the volatile write to `sync`.
ctx.streamState = state
ctx.sync = 0
}
if(debug) {
if(state eq null) streamLogger.debug(s"Sent up to ${str(realDemand)} elements - Stream " + (if(ctx.cancelled) "cancelled" else "completely delivered"))
else streamLogger.debug(s"Sent ${str(realDemand)} elements, more available - Performing atomic state transition")
}
// Mark the batch as delivered; the return value is the demand that remains.
demand = ctx.delivered(demand)
realDemand = if(demand < 0) demand - Long.MinValue else demand
} while ((state ne null) && realDemand > 0)
if(debug) {
if(state ne null) streamLogger.debug("Suspending streaming action with continuation (more data available)")
else streamLogger.debug("Finished streaming action")
}
} catch {
case NonFatal(ex) => ctx.streamingResultPromise.tryFailure(ex)
}
})
} catch { case NonFatal(ex) =>
// Submitting the runnable failed: log and propagate to the caller.
streamLogger.warn("Error scheduling synchronous streaming", ex)
throw ex
}
/** Return the default ExecutionContext for this Database which should be used for running
* SynchronousDatabaseActions for asynchronous execution. */
protected[this] def synchronousExecutionContext: ExecutionContext
/** Write a debug dump of `a` to the action logger, if debug logging is enabled and the action
  * is marked as logged. Each logged action increments the context's sequence counter, which
  * is shown as a highlighted `#n` prefix. Fused actions are dumped through their non-fused
  * equivalent and marked with "[fused] ". */
protected[this] def logAction(a: DBIOAction[_, NoStream, Nothing], ctx: Context): Unit =
  if(actionLogger.isDebugEnabled && a.isLogged) {
    ctx.sequenceCounter += 1
    val unfused = a.nonFusedEquivalentAction
    val fusedMarker = if(a eq unfused) "" else "[fused] "
    val printer = new TreePrinter(prefix = " ", firstPrefix = fusedMarker, narrow = {
      case nested: DBIOAction[_, _, _] => nested.nonFusedEquivalentAction
      case other => other
    })
    val dump = printer.get(unfused)
    // The last character of the dump is dropped before logging.
    val body = dump.substring(0, dump.length-1)
    actionLogger.debug(DumpInfo.highlight("#" + ctx.sequenceCounter) + ": " + body)
  }
}
/** A logical session of a `Database`. The underlying database connection is created lazily on demand.
* Sessions are `Closeable`; the execution engine closes them through `close()` when releasing a
* Session that is not pinned. */
trait SessionDef extends Closeable {
/** Close this Session. */
def close(): Unit
/** Force an actual database session to be opened. Slick sessions are lazy, so you do not
* get a real database connection until you need it or you call force() on the session. */
def force(): Unit
}
/** The state which the execution engine hands to database actions while they run. */
trait BasicActionContext extends ActionContext {
  /** `true` if all operations have to run on the current thread instead of being scheduled
    * normally on the appropriate ExecutionContext. The blocking API uses this. */
  protected[BasicBackend] val useSameThread: Boolean

  /** Select the ExecutionContext to use: `DBIO.sameThreadExecutionContext` when running in
    * same-thread mode, the specified `ec` otherwise. */
  private[BasicBackend] def getEC(ec: ExecutionContext): ExecutionContext =
    if(!useSameThread) ec else DBIO.sameThreadExecutionContext

  /** A volatile variable which enforces the happens-before relationship (see
    * [[https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html]] and
    * [[http://gee.cs.oswego.edu/dl/jmm/cookbook.html]]) for code executed in a synchronous
    * action context. It is read on entering the context and written on leaving it, so that
    * all writes to non-volatile variables made within the context are visible to the next
    * synchronous execution. */
  @volatile private[BasicBackend] var sync = 0

  private[BasicBackend] def readSync = sync // workaround for SI-9053 to avoid warnings

  /** The Session currently held by this context, or `null` if there is none. */
  private[BasicBackend] var currentSession: Session = null

  /** The scheduling priority for the next synchronous step: `WithConnection` while a Session
    * is held, otherwise `Continuation` or `Fresh` depending on the `continuation` flag. */
  private[BasicBackend] def priority(continuation: Boolean): Priority =
    if (currentSession == null) {
      if (continuation) Continuation else Fresh
    } else WithConnection

  /** Used for the sequence counter in Action debug output. This variable is volatile because
    * it is only updated sequentially but not protected by a synchronous action context. */
  @volatile private[BasicBackend] var sequenceCounter = 0

  def session: Session = currentSession
}
/** A special DatabaseActionContext for streaming execution. It also serves as the
* `Subscription` that is handed to the Subscriber (see `subscription`).
*
* @param subscriber the Subscriber which receives the elements and the terminal signal.
* @param useSameThread see `BasicActionContext.useSameThread`.
* @param database the Database on which suspended streaming is rescheduled. */
protected[this] class BasicStreamingActionContext(subscriber: Subscriber[_], protected[BasicBackend] val useSameThread: Boolean, database: Database) extends BasicActionContext with StreamingActionContext with Subscription {
/** Whether the Subscriber has been signaled with `onComplete` or `onError`. */
private[this] var finished = false
/** The total number of elements requested and not yet marked as delivered by the synchronous
* streaming action. Whenever this value drops to 0, streaming is suspended. When it is raised
* up from 0 in `request`, streaming is scheduled to be restarted. It is initially set to
* `Long.MinValue` when streaming starts. Any negative value above `Long.MinValue` indicates
* the actual demand at that point. It is reset to 0 when the initial streaming ends. */
private[this] val remaining = new AtomicLong(Long.MinValue)
/** An error that will be signaled to the Subscriber when the stream is cancelled or
* terminated. This is used for signaling demand overflow in `request()` while guaranteeing
* that the `onError` message does not overlap with an active `onNext` call. */
private[BasicBackend] var deferredError: Throwable = null
/** The state for a suspended streaming action. Must only be set from a synchronous action
* context. */
private[BasicBackend] var streamState: AnyRef = null
/** The streaming action which may need to be continued with the suspended state */
private[BasicBackend] var streamingAction: SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect] = null
/** Set by `cancel`; exposed through `cancelled`. */
@volatile private[this] var cancelRequested = false
/** The Promise to complete when streaming has finished. */
val streamingResultPromise = Promise[Null]()
/** Indicate that the specified number of elements has been delivered. Returns the remaining
* demand. This is an atomic operation. It must only be called from the synchronous action
* context which performs the streaming. */
def delivered(num: Long): Long = remaining.addAndGet(-num)
/** Get the current demand that has not yet been marked as delivered and mark it as being in
* the current batch. When this value is negative, the initial streaming action is still
* running and the real demand can be computed by subtracting `Long.MinValue` from the
* returned value. */
// NOTE(review): the implementation only reads the counter; no separate "batch" marker is kept.
def demandBatch: Long = remaining.get()
/** Whether the stream has been cancelled by the Subscriber */
def cancelled: Boolean = cancelRequested
/** Pass one element to the Subscriber through `onNext`. */
def emit(v: Any): Unit = subscriber.asInstanceOf[Subscriber[Any]].onNext(v)
/** Finish the stream with `onComplete` if it is not finished yet. May only be called from a
* synchronous action context. Nothing is signaled if cancellation has been requested. */
def tryOnComplete: Unit = if(!finished && !cancelRequested) {
if(streamLogger.isDebugEnabled) streamLogger.debug("Signaling onComplete()")
finished = true
try subscriber.onComplete() catch {
case NonFatal(ex) => streamLogger.warn("Subscriber.onComplete failed unexpectedly", ex)
}
}
/** Finish the stream with `onError` if it is not finished yet. May only be called from a
* synchronous action context. */
def tryOnError(t: Throwable): Unit = if(!finished) {
if(streamLogger.isDebugEnabled) streamLogger.debug(s"Signaling onError($t)")
finished = true
try subscriber.onError(t) catch {
case NonFatal(ex) => streamLogger.warn("Subscriber.onError failed unexpectedly", ex)
}
}
/** Restart a suspended streaming action. Must only be called from the Subscriber context. */
def restartStreaming: Unit = {
// Volatile read before looking at the state stored by the synchronous action context.
readSync
val s = streamState
if(s ne null) {
// Take the stored state out of the context before rescheduling with it.
streamState = null
if(streamLogger.isDebugEnabled) streamLogger.debug("Scheduling stream continuation after transition from demand = 0")
val a = streamingAction
database.scheduleSynchronousStreaming(a, this.asInstanceOf[StreamingContext], continuation = true)(s.asInstanceOf[a.StreamState])
} else {
if(streamLogger.isDebugEnabled) streamLogger.debug("Saw transition from demand = 0, but no stream continuation available")
}
}
/** The `Subscription` passed to `Subscriber.onSubscribe`: this context itself. */
def subscription = this
////////////////////////////////////////////////////////////////////////// Subscription methods
/** Request `l` more elements. Ignored once cancellation has been requested. A count `<= 0`
* is recorded as `deferredError` and cancels the stream; otherwise the demand is increased
* and streaming is restarted if the demand was 0 before. */
def request(l: Long): Unit = if(!cancelRequested) {
if(l <= 0) {
deferredError = new IllegalArgumentException("Requested count must not be <= 0 (see Reactive Streams spec, 3.9)")
cancel
} else {
if(!cancelRequested && remaining.getAndAdd(l) == 0L) restartStreaming
}
}
/** Cancel the stream. Only the first call has an effect: it sets the cancellation flag and
* raises the demand to `Long.MaxValue`, restarting streaming if the demand was 0 before. */
def cancel: Unit = if(!cancelRequested) {
cancelRequested = true
// Restart streaming because cancelling requires closing the result set and the session from
// within a synchronous action context. This will also complete the result Promise and thus
// allow the rest of the scheduled Action to run.
if(remaining.getAndSet(Long.MaxValue) == 0L) restartStreaming
}
}
}