/
DBIOAction.scala
587 lines (518 loc) · 32.6 KB
/
DBIOAction.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
package slick.dbio
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import slick.SlickException
import slick.basic.BasicBackend
import slick.util.{ignoreFollowOnError, Dumpable, DumpInfo}
import slick.compat.collection.*
import org.reactivestreams.Subscription
/** A Database I/O Action that can be executed on a database. The DBIOAction type allows a
  * separation of execution logic and resource usage management logic from composition logic.
  * DBIOActions can be composed with methods such as `andThen`, `andFinally` and `flatMap`.
  * Individual parts of a composite DBIOAction are always executed serially on a single database,
  * but possibly in different database sessions, unless the session is pinned either explicitly
  * (using `withPinnedSession`) or implicitly (e.g. through a transaction).
  *
  * The actual implementation base type for all Actions is `DBIOAction`. `StreamingDBIO` and
  * `DBIO` are type aliases which discard the effect type (and the streaming result type in the
  * latter case) to make DBIOAction types easier to write when these features are not needed. All
  * primitive DBIOActions and all DBIOActions produced by the standard combinators in Slick have
  * correct Effect types and are streaming (if possible).
  *
  * @tparam R The result type when executing the DBIOAction and fully materializing the result.
  * @tparam S An encoding of the result type for streaming results. If this action is capable of
  *           streaming, it is `Streaming[T]` for an element type `T`. For non-streaming
  *           DBIOActions it is `NoStream`.
  * @tparam E The DBIOAction's effect type, e.g. `Effect.Read with Effect.Write`. When composing
  *           actions, the correct combined effect type will be inferred. Effects can be used in
  *           user code, e.g. to automatically direct all read-only Actions to a replica database
  *           and write Actions to the primary copy.
  */
sealed trait DBIOAction[+R, +S <: NoStream, -E <: Effect] extends Dumpable {
  /** Transform the result of a successful execution of this action. If this action fails, the
    * resulting action also fails.
    *
    * @param f        the transformation applied to the result
    * @param executor the context that is stored alongside the function in the resulting action
    */
  def map[R2](f: R => R2)(implicit executor: ExecutionContext): DBIOAction[R2, NoStream, E] =
    flatMap[R2, NoStream, E](r => SuccessAction[R2](f(r)))

  /** Use the result produced by the successful execution of this action to compute and then
    * run the next action in sequence. The resulting action fails if either this action, the
    * computation, or the computed action fails.
    *
    * @param f        computes the follow-up action from this action's result
    * @param executor the context that is stored alongside `f` in the resulting action
    */
  def flatMap[R2, S2 <: NoStream, E2 <: Effect](f: R => DBIOAction[R2, S2, E2])(implicit executor: ExecutionContext): DBIOAction[R2, S2, E with E2] =
    FlatMapAction[R2, S2, R, E with E2](this, f, executor)

  /** Creates a new DBIOAction with one level of nesting flattened. This method is equivalent
    * to `flatMap(identity)` using `DBIO.sameThreadExecutionContext`.
    *
    * @param ev evidence that the result of this action is itself a DBIOAction
    */
  def flatten[R2, S2 <: NoStream, E2 <: Effect](implicit ev: R <:< DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] =
    // The result type and the type arguments are spelled out so that this public combinator
    // does not depend on type inference.
    flatMap[R2, S2, E2](ev)(DBIO.sameThreadExecutionContext)

  /** Run another action after this action, if it completed successfully, and return the result
    * of the second action. If either of the two actions fails, the resulting action also fails.
    * If `a` already is an `AndThenAction`, this action is prepended to its steps instead of
    * nesting. */
  def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
    case AndThenAction(as2) => AndThenAction[R2, S2, E with E2](this +: as2)
    case other => AndThenAction[R2, S2, E with E2](Vector(this, other))
  }

  /** Run another action after this action, if it completed successfully, and return the result
    * of both actions. If either of the two actions fails, the resulting action also fails. */
  def zip[R2, E2 <: Effect](a: DBIOAction[R2, NoStream, E2]): DBIOAction[(R, R2), NoStream, E with E2] =
    SequenceAction[Any, ArrayBuffer[Any], E with E2](Vector(this, a)).map { r =>
      (r(0).asInstanceOf[R], r(1).asInstanceOf[R2])
    } (DBIO.sameThreadExecutionContext)

  /** Run another action after this action, if it completed successfully, and zip the result
    * of both actions with a function `f`, then create a new DBIOAction holding this result.
    * If either of the two actions fails, the resulting action also fails.
    *
    * @param a        the second action
    * @param f        combines the two results
    * @param executor the context passed to the underlying `map`
    */
  def zipWith[R2, E2 <: Effect, R3](a: DBIOAction[R2, NoStream, E2])(f: (R, R2) => R3)(implicit executor: ExecutionContext): DBIOAction[R3, NoStream, E with E2] =
    SequenceAction[Any, ArrayBuffer[Any], E with E2](Vector(this, a)).map { r =>
      f(r(0).asInstanceOf[R], r(1).asInstanceOf[R2])
    } (executor)

  /** Run another action after this action, whether it succeeds or fails, and then return the
    * result of the first action. If the first action fails, its failure is propagated, whether
    * the second action fails or succeeds. If the first action succeeds, a failure of the second
    * action is propagated. */
  def andFinally[E2 <: Effect](a: DBIOAction[_, NoStream, E2]): DBIOAction[R, S, E with E2] =
    cleanUp[E2](_ => a)(DBIO.sameThreadExecutionContext)

  /** Run another action after this action, whether it succeeds or fails, in order to clean up or
    * transform an error produced by this action. The clean-up action is computed from the failure
    * of this action, wrapped in `Some`, or `None` if this action succeeded.
    *
    * @param f           computes the clean-up action
    * @param keepFailure If this action returns successfully, the resulting action also returns
    *                    successfully unless the clean-up action fails. If this action fails and
    *                    `keepFailure` is set to `true` (the default), the resulting action fails
    *                    with the same error, no matter whether the clean-up action succeeds or
    *                    fails. If `keepFailure` is set to `false`, an error from the clean-up
    *                    action will override the error from this action.
    * @param executor    the context that is stored alongside `f` in the resulting action
    */
  def cleanUp[E2 <: Effect](f: Option[Throwable] => DBIOAction[_, NoStream, E2], keepFailure: Boolean = true)(implicit executor: ExecutionContext): DBIOAction[R, S, E with E2] =
    CleanUpAction[R, S, E with E2](this, f, keepFailure, executor)

  /** A shortcut for `andThen`. */
  final def >> [R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] =
    andThen[R2, S2, E2](a)

  /** Filter the result of this action with the given predicate. If the predicate matches, the
    * original result is returned, otherwise the resulting action fails with a
    * NoSuchElementException. */
  final def filter(p: R => Boolean)(implicit executor: ExecutionContext): DBIOAction[R, NoStream, E] =
    withFilter(p)

  /** The implementation behind [[filter]]: keeps the result if `p` accepts it, otherwise throws a
    * `NoSuchElementException` from within the `flatMap` function. */
  def withFilter(p: R => Boolean)(implicit executor: ExecutionContext): DBIOAction[R, NoStream, E] =
    flatMap(v => if(p(v)) SuccessAction(v) else throw new NoSuchElementException("Action.withFilter failed"))

  /** Transform the result of a successful execution of this action, if the given partial function
    * is defined at that value, otherwise, the result DBIOAction will fail with a
    * `NoSuchElementException`.
    *
    * If this action fails, the resulting action also fails. */
  def collect[R2](pf: PartialFunction[R, R2])(implicit executor: ExecutionContext): DBIOAction[R2, NoStream, E] =
    map(r1 => pf.applyOrElse(r1, (r: R) => throw new NoSuchElementException(s"DBIOAction.collect partial function is not defined at: $r")))

  /** Return an action which contains the Throwable with which this action failed as its result.
    * If this action succeeded, the resulting action fails with a NoSuchElementException. */
  def failed: DBIOAction[Throwable, NoStream, E] = FailedAction[E](this)

  /** Convert a successful result `v` of this action into a successful result `Success(v)` and a
    * failure `t` into a successful result `Failure(t)`. This is the most generic combinator that
    * can be used for error recovery. If possible, use [[andFinally]] or [[cleanUp]] instead,
    * because those combinators, unlike `asTry`, support streaming. */
  def asTry: DBIOAction[Try[R], NoStream, E] = AsTryAction[R, E](this)

  /** Use a pinned database session when running this action. If it is composed of multiple
    * database actions, they will all use the same session, even when sequenced with non-database
    * actions. For non-composite or non-database actions, this has no effect. */
  def withPinnedSession: DBIOAction[R, S, E] = DBIO.Pin andThen this andFinally DBIO.Unpin

  /** Get a wrapping action which has a name that will be included in log output. */
  def named(name: String): DBIOAction[R, S, E] =
    NamedAction[R, S, E](this, name)

  /** Get the equivalent non-fused action if this action has been fused, otherwise this
    * action is returned. */
  def nonFusedEquivalentAction: DBIOAction[R, S, E] = this

  /** Whether or not this action should be included in log output by default. */
  def isLogged: Boolean = false
}
object DBIOAction {
  /** Convert a `Future` to a [[DBIOAction]]. */
  def from[R](f: Future[R]): DBIOAction[R, NoStream, Effect] = FutureAction[R](f)

  /** Lift a constant value to a [[DBIOAction]]. */
  def successful[R](v: R): DBIOAction[R, NoStream, Effect] = SuccessAction[R](v)

  /** Create a [[DBIOAction]] that always fails. */
  def failed(t: Throwable): DBIOAction[Nothing, NoStream, Effect] = FailureAction(t)

  /** Split `in` into runs of consecutive actions that are either all
    * `SynchronousDatabaseAction`s or all of some other kind, preserving the original order.
    * A new run is started whenever the kind changes, so no run in the result is empty. */
  private[this] def groupBySynchronicity[R, E <: Effect](in: IterableOnce[DBIOAction[R, NoStream, E]]): Vector[Vector[DBIOAction[R, NoStream, E]]] = {
    var state = 0 // no current = 0, sync = 1, async = 2
    var current: mutable.Builder[DBIOAction[R, NoStream, E], Vector[DBIOAction[R, NoStream, E]]] = null
    val total = Vector.newBuilder[Vector[DBIOAction[R, NoStream, E]]]
    in.iterator.foreach { a =>
      val msgState = if(a.isInstanceOf[SynchronousDatabaseAction[_, _, _, _, _]]) 1 else 2
      if(msgState != state) {
        // The kind changed: close the run collected so far (if any) and start a new one.
        if(state != 0) total += current.result()
        current = Vector.newBuilder
        state = msgState
      }
      current += a
    }
    if(state != 0) total += current.result()
    total.result()
  }

  /** Transform a `Option[ DBIO[R] ]` into a `DBIO[ Option[R] ]`. */
  def sequenceOption[R, E <: Effect](in: Option[DBIOAction[R, NoStream, E]]): DBIOAction[Option[R], NoStream, E] = {
    implicit val ec: ExecutionContext = DBIO.sameThreadExecutionContext
    sequence(in.toList).map(_.headOption)
  }

  /** Transform a `M[ DBIO[R] ]` (for any `IterableOnce` collection type `M`) into a
    * `DBIO[ M[R] ]` that runs the individual actions in order and collects their results.
    *
    * Consecutive `SynchronousDatabaseAction`s are fused into a single synchronous action whose
    * `nonFusedEquivalentAction` is the corresponding `SequenceAction`.
    *
    * @param in  the actions to run
    * @param cbf the factory used for building the result collection
    */
  def sequence[R, M[+_] <: IterableOnce[_], E <: Effect](in: M[DBIOAction[R, NoStream, E]])(implicit cbf: Factory[R, M[R]]): DBIOAction[M[R], NoStream, E] = {
    implicit val ec: ExecutionContext = DBIO.sameThreadExecutionContext
    // Sequence one homogeneous group directly into the target collection type.
    def sequenceGroupAsM(g: Vector[DBIOAction[R, NoStream, E]]): DBIOAction[M[R], NoStream, E] = {
      if(g.head.isInstanceOf[SynchronousDatabaseAction[_, _, _, _, _]]) { // fuse synchronous group
        new SynchronousDatabaseAction.Fused[M[R], NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E] {
          def run(context: BasicBackend#BasicActionContext) = {
            val b = cbf.newBuilder
            g.foreach(a => b += a.asInstanceOf[SynchronousDatabaseAction[R, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E]].run(context))
            b.result()
          }
          override def nonFusedEquivalentAction: DBIOAction[M[R], NoStream, E] = SequenceAction[R, M[R], E](g)
        }
      } else SequenceAction[R, M[R], E](g)
    }
    // Sequence one homogeneous group into an intermediate `Seq` that is appended to the
    // overall result builder afterwards.
    def sequenceGroupAsSeq(g: Vector[DBIOAction[R, NoStream, E]]): DBIOAction[Seq[R], NoStream, E] = {
      if(g.length == 1) {
        if(g.head.isInstanceOf[SynchronousDatabaseAction[_, _, _, _, _]]) { // fuse synchronous group
          new SynchronousDatabaseAction.Fused[Seq[R], NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E] {
            def run(context: BasicBackend#BasicActionContext): Seq[R] =
              g.head.asInstanceOf[SynchronousDatabaseAction[R, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E]].run(context) :: Nil
            override def nonFusedEquivalentAction: DBIOAction[Seq[R], NoStream, E] = g.head.map(_ :: Nil)
          }
        } else g.head.map(_ :: Nil)
      } else {
        if(g.head.isInstanceOf[SynchronousDatabaseAction[_, _, _, _, _]]) { // fuse synchronous group
          new SynchronousDatabaseAction.Fused[Seq[R], NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E] {
            def run(context: BasicBackend#BasicActionContext): IndexedSeq[R] = {
              val b = new ArrayBuffer[R](g.length)
              g.foreach(a => b += a.asInstanceOf[SynchronousDatabaseAction[R, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E]].run(context))
              b.toIndexedSeq
            }
            override def nonFusedEquivalentAction: DBIOAction[Seq[R], NoStream, E] = SequenceAction[R, Seq[R], E](g)
          }
        } else SequenceAction[R, Seq[R], E](g)
      }
    }
    val grouped = groupBySynchronicity[R, E](in.asInstanceOf[IterableOnce[DBIOAction[R, NoStream, E]]])
    grouped.length match {
      case 0 => DBIO.successful(cbf.newBuilder.result())
      case 1 => sequenceGroupAsM(grouped.head)
      case _ =>
        // The result builder is mutable, so it must be created when the action is run, not when
        // it is composed. Seeding the fold with an already constructed builder would share that
        // single instance between all executions of the returned action, and every additional
        // run would append its elements to the ones collected by earlier runs.
        val freshBuilder: DBIOAction[mutable.Builder[R, M[R]], NoStream, E] =
          DBIO.successful(()).map(_ => cbf.newBuilder)
        grouped.foldLeft(freshBuilder) { (ar, g) =>
          for (r <- ar; ge <- sequenceGroupAsSeq(g)) yield r ++= ge
        }.map(_.result())
    }
  }

  /** A simpler version of `sequence` that takes a number of DBIOActions with any return type as
    * varargs and returns a DBIOAction that performs the individual actions in sequence, returning
    * `()` in the end. */
  def seq[E <: Effect](actions: DBIOAction[_, NoStream, E]*): DBIOAction[Unit, NoStream, E] = {
    // Combine one homogeneous group. With `forceUnit` the combined action must produce `()`.
    def sequenceGroup(g: Vector[DBIOAction[Any, NoStream, E]], forceUnit: Boolean): DBIOAction[Any, NoStream, E] = {
      if(g.length == 1 && !forceUnit) g.head
      else if(g.head.isInstanceOf[SynchronousDatabaseAction[_, _, _, _, _]]) sequenceSync(g)
      else if(forceUnit) AndThenAction[Any, NoStream, E](g :+ DBIO.successful(()))
      else AndThenAction[Any, NoStream, E](g)
    }
    // Fuse a group of synchronous actions into one synchronous action that discards all results.
    def sequenceSync(g: Vector[DBIOAction[Any, NoStream, E]]): DBIOAction[Unit, NoStream, E] = {
      new SynchronousDatabaseAction.Fused[Unit, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E] {
        def run(context: BasicBackend#BasicActionContext) = {
          g.foreach(_.asInstanceOf[SynchronousDatabaseAction[Any, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E]].run(context))
        }
        override def nonFusedEquivalentAction: DBIOAction[Unit, NoStream, E] = AndThenAction[Unit, NoStream, E](g)
      }
    }
    if(actions.isEmpty) DBIO.successful(()) else {
      // A trailing unit action is appended before grouping so the last group yields `()`.
      val grouped = groupBySynchronicity[Any, E](actions :+ DBIO.successful(()))
      grouped.length match {
        case 1 => sequenceGroup(grouped.head, forceUnit = true).asInstanceOf[DBIOAction[Unit, NoStream, E]]
        case _ =>
          val last = grouped.length - 1
          val as = grouped.iterator.zipWithIndex.map { case (g, i) => sequenceGroup(g, i == last) }.toVector
          AndThenAction[Unit, NoStream, E](as)
      }
    }
  }

  /** Create a DBIOAction that runs some other actions in sequence and combines their results
    * with the given function, starting from `zero`. */
  def fold[T, E <: Effect](actions: Seq[DBIOAction[T, NoStream, E]], zero: T)(f: (T, T) => T)(implicit ec: ExecutionContext): DBIOAction[T, NoStream, E] =
    actions.foldLeft[DBIOAction[T, NoStream, E]](DBIO.successful(zero)) { (za, va) => za.flatMap(z => va.map(v => f(z, v))) }

  /** A DBIOAction that pins the current session */
  private[slick] object Pin extends SynchronousDatabaseAction[Unit, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect] {
    def run(context: BasicBackend#BasicActionContext): Unit = context.pin
    def getDumpInfo = DumpInfo(name = "SynchronousDatabaseAction.Pin")
  }

  /** A DBIOAction that unpins the current session */
  private[slick] object Unpin extends SynchronousDatabaseAction[Unit, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect] {
    def run(context: BasicBackend#BasicActionContext): Unit = context.unpin
    def getDumpInfo = DumpInfo(name = "SynchronousDatabaseAction.Unpin")
  }

  /** An ExecutionContext used internally for executing plumbing operations during DBIOAction
    * composition. Runnables are executed on the calling thread. A Runnable submitted while
    * another one is executing on the same thread is queued and run after the current one
    * returns, instead of being run in a nested call. */
  private[slick] object sameThreadExecutionContext extends ExecutionContext {
    // `null` while no trampoline is active on the current thread, otherwise the pending Runnables.
    private[this] val trampoline = new ThreadLocal[List[Runnable]]

    /** Run `first` and everything that gets queued while it runs. All queued Runnables are
      * executed even if some of them throw. Afterwards the last error is rethrown, with every
      * earlier error reachable through its suppressed exceptions. */
    private[this] def runTrampoline(first: Runnable): Unit = {
      trampoline.set(Nil)
      try {
        var err: Throwable = null
        var r = first
        while(r ne null) {
          try r.run() catch {
            case t: Throwable =>
              // The last error is still the one that gets thrown, but earlier errors are no
              // longer lost: they are chained as suppressed exceptions. The identity check
              // avoids self-suppression, which `addSuppressed` rejects.
              if((err ne null) && (err ne t)) t.addSuppressed(err)
              err = t
          }
          trampoline.get() match {
            case r2 :: rest =>
              trampoline.set(rest)
              r = r2
            case _ => r = null
          }
        }
        if(err ne null) throw err
      } finally trampoline.set(null)
    }

    override def execute(runnable: Runnable): Unit = trampoline.get() match {
      case null => runTrampoline(runnable)
      case r => trampoline.set(runnable :: r)
    }

    override def reportFailure(t: Throwable): Unit = throw t
  }
}
/** Base trait of all DBIOActions that stand for an actual database operation. Concrete
  * implementations are backend-specific. */
trait DatabaseAction[+R, +S <: NoStream, -E <: Effect] extends DBIOAction[R, S, E] {
  /** Database operations are included in log output by default. */
  override def isLogged: Boolean = true
}
/** A synchronous DBIOAction whose result is the constant `value`. */
case class SuccessAction[+R](value: R) extends SynchronousDatabaseAction[R, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect] {
  /** Returns the stored value; the context is not used. */
  def run(ctx: BasicBackend#BasicActionContext): R = value

  /** Dump node named "success" showing the string form of the value. */
  def getDumpInfo: DumpInfo = DumpInfo("success", String.valueOf(value))
}
/** A synchronous DBIOAction that always fails by throwing `t`. */
case class FailureAction(t: Throwable) extends SynchronousDatabaseAction[Nothing, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect] {
  /** Throws the stored Throwable; the context is not used. */
  def run(ctx: BasicBackend#BasicActionContext): Nothing = throw t

  /** Dump node named "failure" showing the string form of the Throwable. */
  def getDumpInfo: DumpInfo = DumpInfo("failure", String.valueOf(t))
}
/** An asynchronous DBIOAction whose result is the result of the Future `f`. */
case class FutureAction[+R](f: Future[R]) extends DBIOAction[R, NoStream, Effect] {
  /** Future-backed actions are included in log output by default. */
  override def isLogged: Boolean = true

  /** Dump node named "future" showing the string form of the Future. */
  def getDumpInfo: DumpInfo = DumpInfo("future", String.valueOf(f))
}
/** The DBIOAction node created by `flatMap` for sequencing in the DBIOAction monad. It records
  * the `base` action, the function `f` that computes the follow-up action from the result of
  * `base`, and the `executor` that was passed to `flatMap`. */
case class FlatMapAction[+R, +S <: NoStream, P, -E <: Effect](base: DBIOAction[P, NoStream, E], f: P => DBIOAction[R, S, E], executor: ExecutionContext) extends DBIOAction[R, S, E] {
  /** Dump node named "flatMap" showing the string form of `f`, with `base` as its only child. */
  def getDumpInfo: DumpInfo =
    DumpInfo("flatMap", String.valueOf(f), children = Vector("base" -> base))
}
/** The DBIOAction node created by `seq` or `andThen` for sequencing in the DBIOAction monad.
  * In contrast to `SequenceAction`, only the result of the last step is kept. */
case class AndThenAction[R, +S <: NoStream, -E <: Effect](as: IndexedSeq[DBIOAction[Any, NoStream, E]]) extends DBIOAction[R, S, E] {
  /** Dump node named "andThen" whose children are the steps, labelled "1", "2", ... */
  def getDumpInfo: DumpInfo = {
    val numbered = as.zipWithIndex.map { case (step, idx) => ((idx + 1).toString, step) }
    DumpInfo("andThen", children = numbered)
  }

  /** Append `a` to the steps of this action. If `a` is an `AndThenAction` itself, its steps are
    * appended individually so that the result stays flat. */
  override def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
    case AndThenAction(moreSteps) => AndThenAction[R2, S2, E with E2](as ++ moreSteps)
    case single => AndThenAction[R2, S2, E with E2](as :+ single)
  }
}
/** The DBIOAction node created by `sequence` for sequencing in the DBIOAction monad. It carries
  * the collection factory `cbf` for the result type `R2`. */
case class SequenceAction[R, +R2, -E <: Effect](as: IndexedSeq[DBIOAction[R, NoStream, E]])(implicit val cbf: Factory[R, R2]) extends DBIOAction[R2, NoStream, E] {
  /** Dump node named "sequence" whose children are the steps, labelled "1", "2", ... */
  def getDumpInfo: DumpInfo = {
    val numbered = as.zipWithIndex.map { case (step, idx) => ((idx + 1).toString, step) }
    DumpInfo("sequence", children = numbered)
  }
}
/** The DBIOAction node created by `cleanUp` for sequencing in the DBIOAction monad. It records
  * the `base` action, the function `f` that computes the clean-up action, the `keepFailure`
  * flag and the `executor` that were passed to `cleanUp`. */
case class CleanUpAction[+R, +S <: NoStream, -E <: Effect](base: DBIOAction[R, S, E], f: Option[Throwable] => DBIOAction[_, NoStream, E], keepFailure: Boolean, executor: ExecutionContext) extends DBIOAction[R, S, E] {
  /** Dump node named "cleanUp" with `base` as its only child, labelled "try". */
  def getDumpInfo: DumpInfo =
    DumpInfo("cleanUp", children = Vector("try" -> base))
}
/** The DBIOAction node created by the `failed` operation on the action `a`. */
case class FailedAction[-E <: Effect](a: DBIOAction[_, NoStream, E]) extends DBIOAction[Throwable, NoStream, E] {
  /** Dump node named "failed" with `a` as its only child, labelled "base". */
  def getDumpInfo: DumpInfo =
    DumpInfo("failed", children = Vector("base" -> a))
}
/** The DBIOAction node created by the `asTry` operation on the action `a`. */
case class AsTryAction[+R, -E <: Effect](a: DBIOAction[R, NoStream, E]) extends DBIOAction[Try[R], NoStream, E] {
  /** Dump node named "asTry" with `a` as its only child, labelled "try". */
  def getDumpInfo: DumpInfo =
    DumpInfo("asTry", children = Vector("try" -> a))
}
/** A wrapper that attaches a `name` to the action `a` for logging purposes. */
case class NamedAction[+R, +S <: NoStream, -E <: Effect](a: DBIOAction[R, S, E], name: String) extends DBIOAction[R, S, E] {
  /** Named actions are included in log output by default. */
  override def isLogged: Boolean = true

  /** Dump node named "named" whose main info is the highlighted name. */
  def getDumpInfo: DumpInfo = {
    val highlightedName = DumpInfo.highlight(name)
    DumpInfo("named", mainInfo = highlightedName)
  }
}
/** The base trait for the context object that the execution engine passes to synchronous
  * database actions. It keeps track of how often the session has been pinned. */
trait ActionContext {
  // Number of `pin` calls that have not been matched by an `unpin` call yet.
  private[this] var stickiness: Int = 0

  /** Check if the session is pinned. May only be called from a synchronous action context. */
  final def isPinned: Boolean = stickiness > 0

  /** Pin the current session. Multiple calls to `pin` may be nested. The same number of calls
    * to `unpin` is required in order to mark the session as not pinned anymore. A pinned
    * session will not be released at the end of a primitive database action. Instead, the same
    * pinned session is passed to all subsequent actions until it is unpinned. Note that pinning
    * does not force an actual database connection to be opened. This still happens on demand.
    * May only be called from a synchronous action context. */
  final def pin: Unit = {
    stickiness = stickiness + 1
  }

  /** Unpin this session once. May only be called from a synchronous action context. */
  final def unpin: Unit = {
    stickiness = stickiness - 1
  }
}
/** An [[ActionContext]] with the extra functionality required for streaming DBIOActions: a way
  * to emit stream elements and access to the stream's `Subscription`. */
trait StreamingActionContext extends ActionContext {
  /** Emit a single result of the stream. Any Exception thrown by this method should be passed on
    * to the caller.
    *
    * @param v the element to emit
    */
  def emit(v: Any): Unit

  /** Get the Subscription for this stream. */
  def subscription: Subscription
}
/** A synchronous database action provides a function from an `ActionContext` to the result
  * type. `BasicBackend.DatabaseDef.run` supports this kind of action out of the box
  * through `BasicBackend.DatabaseDef.runSynchronousDatabaseAction` so that `run` does not
  * need to be extended if all primitive database actions can be expressed in this way. These
  * actions also implement construction-time fusion for the `andFinally`, `andThen`, `asTry`,
  * `failed`, `withPinnedSession` and `zip` operations.
  *
  * The execution engine ensures that an [[ActionContext]] is never used concurrently and that
  * all state changes performed by one invocation of a SynchronousDatabaseAction are visible
  * to the next invocation of the same or a different SynchronousDatabaseAction. */
trait SynchronousDatabaseAction[+R, +S <: NoStream, -C <: BasicBackend#BasicActionContext, -SC <: BasicBackend#BasicStreamingActionContext, -E <: Effect] extends DatabaseAction[R, S, E] { self =>
  /** The type used by this action for the state of a suspended stream. A call to `emitStream`
    * produces such a state which is then fed back into the next call. */
  type StreamState >: Null <: AnyRef

  /** Run this action synchronously and produce a result, or throw an Exception to indicate a
    * failure. */
  def run(context: C): R

  /** Run this action synchronously and emit results to the context. This method may throw an
    * Exception to indicate a failure. The default implementation always throws a
    * `SlickException`.
    *
    * @param limit The maximum number of results to emit, or Long.MaxValue for no limit.
    * @param state The state returned by a previous invocation of this method, or `null` if
    *              a new stream should be produced.
    * @return A stream state if there are potentially more results available, or null if the
    *         stream is finished. */
  def emitStream(context: SC, limit: Long, state: StreamState): StreamState =
    throw new SlickException("Internal error: Streaming is not supported by this Action")

  /** Dispose of a `StreamState` when a streaming action is cancelled. Whenever `emitStream`
    * returns `null` or throws an Exception, it needs to dispose of the state itself. This
    * method will not be called in these cases. The default implementation does nothing. */
  def cancelStream(context: SC, state: StreamState): Unit = ()

  /** Whether or not this action supports streaming results. An action with a `Streaming` result
    * type must either support streaming directly or have a [[nonFusedEquivalentAction]] which
    * supports streaming. This flag is not used if the Action has a `NoStream` result type. */
  def supportsStreaming: Boolean = true

  /** Fuse with `a` into a single `FusedAndThenAction` when `a` is synchronous as well. An
    * already fused `a` contributes its individual steps. Any other action falls back to the
    * generic implementation. */
  override def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
    case fusedNext: SynchronousDatabaseAction.FusedAndThenAction[_, _, _, _, _] =>
      val steps =
        self.asInstanceOf[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]] +:
          fusedNext.as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]]]
      new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, C, SC, E with E2](steps)
    case syncNext: SynchronousDatabaseAction[_, _, _, _, _] =>
      val steps = Vector(
        self.asInstanceOf[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]],
        syncNext.asInstanceOf[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]])
      new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, C, SC, E with E2](steps)
    case other => super.andThen[R2, S2, E2](other)
  }

  // Gives the anonymous fused class access to the inherited `zip`.
  private[this] def superZip[R2, E2 <: Effect](a: DBIOAction[R2, NoStream, E2]): DBIOAction[(R, R2), NoStream, E with E2] =
    super.zip[R2, E2](a)

  /** Fuse with `a` into a single synchronous action that runs this action and then `a` and
    * returns both results, when `a` is synchronous as well. */
  override def zip[R2, E2 <: Effect](a: DBIOAction[R2, NoStream, E2]): DBIOAction[(R, R2), NoStream, E with E2] = a match {
    case syncOther: SynchronousDatabaseAction[_, _, _, _, _] =>
      new SynchronousDatabaseAction.Fused[(R, R2), NoStream, C, SC, E with E2] {
        def run(context: C): (R, R2) = {
          val second = syncOther.asInstanceOf[SynchronousDatabaseAction[R2, NoStream, C, SC, E2]]
          // Tuple components are evaluated left to right: this action first, then `second`.
          (self.run(context), second.run(context))
        }
        override def nonFusedEquivalentAction: DBIOAction[(R, R2), NoStream, E with E2] = superZip(syncOther)
      }
    case other => superZip(other)
  }

  // Gives the anonymous fused class access to the inherited `andFinally`.
  private[this] def superAndFinally[E2 <: Effect](a: DBIOAction[_, NoStream, E2]): DBIOAction[R, S, E with E2] =
    super.andFinally[E2](a)

  /** Fuse with the clean-up action `a` when it is synchronous as well. The clean-up runs after
    * this action whether it succeeded or failed with a non-fatal error. In the failure case
    * the original error is rethrown after the clean-up, whose own errors are handled by
    * `ignoreFollowOnError`. */
  override def andFinally[E2 <: Effect](a: DBIOAction[_, NoStream, E2]): DBIOAction[R, S, E with E2] = a match {
    case syncCleanup: SynchronousDatabaseAction[_, _, _, _, _] =>
      new SynchronousDatabaseAction.Fused[R, S, C, SC, E with E2] {
        def run(context: C): R = {
          val cleanup = syncCleanup.asInstanceOf[SynchronousDatabaseAction[Any, NoStream, C, SC, E2]]
          // `Try` only captures non-fatal errors; fatal ones propagate without clean-up.
          Try(self.run(context)) match {
            case Success(result) =>
              cleanup.run(context)
              result
            case Failure(ex) =>
              try cleanup.run(context) catch ignoreFollowOnError
              throw ex
          }
        }
        override def nonFusedEquivalentAction: DBIOAction[R, S, E with E2] = superAndFinally(syncCleanup)
      }
    case other => superAndFinally(other)
  }

  // Gives the anonymous fused class access to the inherited `withPinnedSession`.
  private[this] def superWithPinnedSession: DBIOAction[R, S, E] = super.withPinnedSession

  /** A fused version that pins the session on the context, runs this action and unpins again,
    * both on success and on a non-fatal failure. */
  override def withPinnedSession: DBIOAction[R, S, E] = new SynchronousDatabaseAction.Fused[R, S, C, SC, E] {
    def run(context: C): R = {
      context.pin
      // Non-fatal errors are captured so that the session is unpinned before they are rethrown
      // by `get`; fatal errors propagate immediately.
      val outcome = Try(self.run(context))
      context.unpin
      outcome.get
    }
    override def nonFusedEquivalentAction: DBIOAction[R, S, E] = superWithPinnedSession
  }

  // Gives the anonymous fused class access to the inherited `failed`.
  private[this] def superFailed: DBIOAction[Throwable, NoStream, E] = super.failed

  /** A fused version that returns the non-fatal error thrown by this action, or throws a
    * `NoSuchElementException` if this action completed successfully. */
  override def failed: DBIOAction[Throwable, NoStream, E] = new SynchronousDatabaseAction.Fused[Throwable, NoStream, C, SC, E] {
    def run(context: C): Throwable = {
      val failure: Option[Throwable] =
        try {
          self.run(context)
          None
        } catch {
          case NonFatal(ex) => Some(ex)
        }
      failure.getOrElse(throw new NoSuchElementException("Action.failed (fused) not completed with a Throwable"))
    }
    override def nonFusedEquivalentAction: DBIOAction[Throwable, NoStream, E] = superFailed
  }

  // Gives the anonymous fused class access to the inherited `asTry`.
  private[this] def superAsTry: DBIOAction[Try[R], NoStream, E] = super.asTry

  /** A fused version that wraps the result in `Success` and a non-fatal error in `Failure`. */
  override def asTry: DBIOAction[Try[R], NoStream, E] = new SynchronousDatabaseAction.Fused[Try[R], NoStream, C, SC, E] {
    def run(context: C): Try[R] = Try(self.run(context))
    override def nonFusedEquivalentAction: DBIOAction[Try[R], NoStream, E] = superAsTry
  }
}
object SynchronousDatabaseAction {
  /** A fused SynchronousDatabaseAction. It does not support streaming itself and shows its
    * `nonFusedEquivalentAction` as the only child in dumps. */
  trait Fused[+R, +S <: NoStream, C <: BasicBackend#BasicActionContext, SC <: BasicBackend#BasicStreamingActionContext, -E <: Effect] extends SynchronousDatabaseAction[R, S, C, SC, E] {
    override def supportsStreaming: Boolean = false

    def getDumpInfo: DumpInfo =
      DumpInfo(name = "SynchronousDatabaseAction.Fused", children = Vector("non-fused" -> nonFusedEquivalentAction))
  }

  /** The fused form of a chain of `andThen` calls on synchronous actions. The steps in `as`
    * are run in order on the same context and the result of the last one is returned. */
  class FusedAndThenAction[+R, +S <: NoStream, C <: BasicBackend#BasicActionContext, SC <: BasicBackend#BasicStreamingActionContext, -E <: Effect](val as: IndexedSeq[SynchronousDatabaseAction[Any, S, C, SC, E]]) extends Fused[R, S, C, SC, E] {
    def run(context: C): R = {
      // Each step's result replaces the previous one; `null` is the result for no steps.
      val lastResult = as.foldLeft[Any](null)((_, step) => step.run(context))
      lastResult.asInstanceOf[R]
    }

    override def nonFusedEquivalentAction: DBIOAction[R, S, E] = AndThenAction[R, S, E](as)

    /** Keep the chain flat: the steps of an already fused `a`, or a single synchronous `a`, are
      * appended to the steps of this action. Other actions use the inherited implementation. */
    override def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
      case fusedNext: SynchronousDatabaseAction.FusedAndThenAction[_, _, _, _, _] =>
        val steps =
          as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]]] ++
            fusedNext.as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]]]
        new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, C, SC, E with E2](steps)
      case syncNext: SynchronousDatabaseAction[_, _, _, _, _] =>
        val steps =
          as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]]] :+
            syncNext.asInstanceOf[SynchronousDatabaseAction[Any, S2, C, SC, E with E2]]
        new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, C, SC, E with E2](steps)
      case other => super.andThen(other)
    }
  }

  /** Fuse `flatMap` / `map`, `cleanUp` and `filter` / `withFilter` combinators if they use
    * `DBIO.sameThreadExecutionContext` and produce a `SynchronousDatabaseAction` in their
    * evaluation function (where applicable). This cannot be verified at fusion time, so a wrongly
    * fused action can fail with a `ClassCastException` during evaluation. Actions that do not
    * match either pattern are returned unchanged. */
  private[slick] def fuseUnsafe[R, S <: NoStream, E <: Effect](a: DBIOAction[R, S, E]): DBIOAction[R, S, E] = {
    a match {
      case FlatMapAction(syncBase: SynchronousDatabaseAction[_, _, _, _, _], f, ec) if ec eq DBIO.sameThreadExecutionContext =>
        new SynchronousDatabaseAction.Fused[R, S, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E] {
          def run(context: BasicBackend#BasicActionContext): R = {
            val baseResult = syncBase.asInstanceOf[SynchronousDatabaseAction[Any, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect]].run(context)
            // Unchecked: the function is assumed to produce a synchronous action.
            val next = f.asInstanceOf[Any => SynchronousDatabaseAction[R, S, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E]](baseResult)
            next.run(context)
          }
          override def nonFusedEquivalentAction: DBIOAction[R, S, E] = a
        }
      case CleanUpAction(syncBase: SynchronousDatabaseAction[_, _, _, _, _], f, keepFailure, ec) if ec eq DBIO.sameThreadExecutionContext =>
        new SynchronousDatabaseAction.Fused[R, S, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, E] {
          def run(context: BasicBackend#BasicActionContext): R = {
            // `Try` only captures non-fatal errors; fatal ones propagate without clean-up.
            val outcome = Try(syncBase.asInstanceOf[SynchronousDatabaseAction[R, S, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect]].run(context))
            outcome match {
              case Success(result) =>
                f(None).asInstanceOf[SynchronousDatabaseAction[Any, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect]].run(context)
                result
              case Failure(ex) =>
                // With `keepFailure` a non-fatal clean-up error is dropped in favour of `ex`;
                // without it the clean-up error propagates and replaces `ex`.
                try {
                  f(Some(ex)).asInstanceOf[SynchronousDatabaseAction[Any, NoStream, BasicBackend#BasicActionContext, BasicBackend#BasicStreamingActionContext, Effect]].run(context)
                } catch { case NonFatal(_) if keepFailure => () }
                throw ex
            }
          }
          override def nonFusedEquivalentAction: DBIOAction[R, S, E] = a
        }
      case other => other
    }
  }
}