-
-
Notifications
You must be signed in to change notification settings - Fork 609
/
DistributedBackend.scala
82 lines (66 loc) · 3.1 KB
/
DistributedBackend.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package slick.memory
import com.typesafe.config.Config
import org.reactivestreams.Subscriber
import scala.collection.compat._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.util.{Failure, Try}
import slick.SlickException
import slick.relational.RelationalBackend
import slick.basic.BasicBackend
import slick.util.Logging
/** The backend for DistributedProfile. */
trait DistributedBackend extends RelationalBackend with Logging {
// Type members of this backend. Database, Session and DatabaseFactory are bound to
// the classes declared further down in this trait.
type This = DistributedBackend
type Database = DatabaseDef
type Session = SessionDef
type DatabaseFactory = DatabaseFactoryDef
// BasicActionContext / BasicStreamingActionContext are not defined in this file
// (presumably inherited from a parent backend trait -- confirm).
type Context = BasicActionContext
type StreamingContext = BasicStreamingActionContext
// Factory instance: `Database(dbs, executionContext)` goes through DatabaseFactoryDef.apply.
val Database = new DatabaseFactoryDef
// Reference to this backend instance itself.
val backend: DistributedBackend = this
/** Config-based creation is not available for this backend; this method never returns normally.
  *
  * @param config ignored
  * @param path   ignored
  * @throws slick.SlickException always
  */
def createDatabase(config: Config, path: String): Database = {
  val reason = "DistributedBackend cannot be configured with an external config file"
  throw new SlickException(reason)
}
/** A database made up of a fixed collection of underlying databases.
  *
  * @param dbs              the underlying databases; `createSession()` opens one session on each
  * @param executionContext the context that `synchronousExecutionContext` delegates to
  */
class DatabaseDef(val dbs: Vector[BasicBackend#DatabaseDef], val executionContext: ExecutionContext) extends super.DatabaseDef {

  /** Creates an action context whose `useSameThread` flag is the supplied value. */
  protected[this] def createDatabaseActionContext[T](_useSameThread: Boolean): Context = {
    new BasicActionContext { val useSameThread = _useSameThread }
  }

  /** Creates a streaming action context for subscriber `s`, bound to this database. */
  protected[this] def createStreamingDatabaseActionContext[T](s: Subscriber[_ >: T], useSameThread: Boolean): StreamingContext = {
    new BasicStreamingActionContext(s, useSameThread, DatabaseDef.this)
  }

  /** Opens one session per underlying database, in the order of `dbs`, and wraps them
    * in a single SessionDef.
    *
    * If opening a session fails, every session opened so far is closed (most recently
    * opened first, ignoring failures of those closes) and the original failure is rethrown.
    */
  def createSession(): Session = {
    val opened = new ArrayBuffer[BasicBackend#Session]
    dbs.foreach { db =>
      val attempt: Try[BasicBackend#Session] = Try(db.createSession())
      // Best-effort cleanup of the already-opened sessions before `get` rethrows below.
      if (attempt.isFailure) opened.reverseIterator.foreach(s => Try(s.close()))
      opened += attempt.get
    }
    new SessionDef(opened.toVector)
  }

  /** Execution context that forwards to `executionContext`, running each task inside
    * a `blocking` section.
    */
  protected[this] val synchronousExecutionContext: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = {
      val wrapped = new Runnable {
        def run(): Unit = blocking { runnable.run() }
      }
      executionContext.execute(wrapped)
    }

    def reportFailure(t: Throwable): Unit = executionContext.reportFailure(t)
  }

  /** No-op: returns an already-completed Future; the underlying databases are not touched. */
  override def shutdown: Future[Unit] = Future.successful(())

  /** No-op: the underlying databases are not touched. */
  def close: Unit = ()
}
/** Factory for distributed database instances. */
class DatabaseFactoryDef {
  /** Create a new distributed database instance.
    *
    * @param dbs              the underlying databases; they are copied into a Vector in iteration order
    * @param executionContext the ExecutionContext used for asynchronous execution of database actions
    * @return a new DatabaseDef over the given databases
    */
  def apply(dbs: IterableOnce[BasicBackend#DatabaseDef], executionContext: ExecutionContext): Database = {
    val members: Vector[BasicBackend#DatabaseDef] = dbs.iterator.toVector
    new DatabaseDef(members, executionContext)
  }
}
/** A session that bundles one session per underlying database.
  *
  * @param sessions the underlying sessions, in the order they were opened
  */
class SessionDef(val sessions: Vector[BasicBackend#Session]) extends super.SessionDef {

  /** Closes every underlying session, continuing past sessions whose `close()` fails.
    *
    * If one or more closes fail (with a non-fatal exception), the first failure is
    * rethrown and all later failures are attached to it as suppressed exceptions, so
    * that no close error is silently lost.
    */
  def close(): Unit = {
    // `map` on a Vector is strict, so every session is attempted before anything is thrown.
    val failures = sessions.map(s => Try(s.close())).collect { case Failure(t) => t }
    failures.headOption.foreach { first =>
      // addSuppressed rejects self-suppression, hence the identity check.
      failures.tail.foreach(t => if (t ne first) first.addSuppressed(t))
      throw first
    }
  }

  /** Not supported.
    *
    * @throws slick.SlickException always
    */
  def rollback(): Nothing =
    throw new SlickException("DistributedBackend does not currently support transactions")

  /** Calls `force()` on each underlying session, in order. */
  def force(): Unit = {
    sessions.foreach(_.force())
  }

  /** Not supported; `f` is never evaluated.
    *
    * @throws slick.SlickException always
    */
  def withTransaction[T](f: => T): Nothing =
    throw new SlickException("DistributedBackend does not currently support transactions")
}
}
/** Singleton instance of the [[DistributedBackend]] trait. */
object DistributedBackend extends DistributedBackend