-
Notifications
You must be signed in to change notification settings - Fork 348
/
transactor.scala
440 lines (386 loc) · 17.6 KB
/
transactor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
// Copyright (c) 2013-2020 Rob Norris and Contributors
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT
package doobie.util
import doobie.free.connection.{ConnectionIO, ConnectionOp, commit, rollback, setAutoCommit, unit}
import doobie.free.KleisliInterpreter
import doobie.util.lens._
import doobie.util.yolo.Yolo
import cats.{Applicative, Defer, Monad, ~>}
import cats.data.Kleisli
import cats.effect.{Async, Blocker, Bracket, ContextShift, ExitCase, Resource, Sync}
import cats.syntax.show._
import fs2.Stream
import java.sql.{Connection, DriverManager}
import javax.sql.DataSource
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext
object transactor {
import doobie.free.connection.AsyncConnectionIO
/**
 * A natural transformation from `ConnectionOp` to `Kleisli[M, Connection, *]`. This is the type
 * of the `interpret` member of a `Transactor[M]`.
 * @group Type Aliases
 */
type Interpreter[M[_]] = ConnectionOp ~> Kleisli[M, Connection, *]
/**
 * The setup, error-handling, and cleanup steps that surround a program run on a connection. A
 * `Transactor` wraps programs with its `Strategy` before executing them.
 * @param before a program run first, to prepare the connection for use
 * @param after  a program run when the wrapped program completes successfully
 * @param oops   a program run when the wrapped program fails or is canceled
 * @param always a program run last, in all cases (finally)
 * @group Data Types
 */
final case class Strategy(
  before: ConnectionIO[Unit],
  after:  ConnectionIO[Unit],
  oops:   ConnectionIO[Unit],
  always: ConnectionIO[Unit]
) {

  /**
   * This strategy expressed as a `Resource`. The outer layer acquires nothing and releases with
   * `always`; the inner layer acquires with `before` and releases with `after` on completion or
   * with `oops` on error or cancelation.
   */
  val resource: Resource[ConnectionIO, Unit] =
    Resource.make(doobie.FC.unit)(_ => always).flatMap { _ =>
      Resource.makeCase(before) {
        case (_, ExitCase.Completed)                    => after
        case (_, ExitCase.Error(_) | ExitCase.Canceled) => oops
      }.map(_ => ())
    }
}
/** Lenses and standard instances for `Strategy`. */
object Strategy {

  /** @group Lenses */
  val before: Strategy @> ConnectionIO[Unit] =
    Lens(s => s.before, (s, io) => s.copy(before = io))

  /** @group Lenses */
  val after: Strategy @> ConnectionIO[Unit] =
    Lens(s => s.after, (s, io) => s.copy(after = io))

  /** @group Lenses */
  val oops: Strategy @> ConnectionIO[Unit] =
    Lens(s => s.oops, (s, io) => s.copy(oops = io))

  /** @group Lenses */
  val always: Strategy @> ConnectionIO[Unit] =
    Lens(s => s.always, (s, io) => s.copy(always = io))

  /**
   * The default `Strategy`:
   *  - `before` sets auto-commit to `false`;
   *  - `after` commits and `oops` rolls back;
   *  - `always` does nothing.
   * @group Constructors
   */
  val default: Strategy =
    Strategy(before = setAutoCommit(false), after = commit, oops = rollback, always = unit)

  /**
   * A no-op `Strategy`: every action simply returns `()`.
   * @group Constructors
   */
  val void: Strategy =
    Strategy(before = unit, after = unit, oops = unit, always = unit)
}
/**
* A thin wrapper around a source of database connections, an interpreter, and a strategy for
* running programs, parameterized over a target monad `M` and an arbitrary wrapped value `A`.
* Given a stream or program in `ConnectionIO` or a program in `Kleisli`, a `Transactor` can
* discharge the doobie machinery and yield an effectful stream or program in `M`.
* @tparam M a target effect type; typically `IO`
* @group Data Types
*/
sealed abstract class Transactor[M[_]] { self =>
/** The type of the `kernel` value, which is what `connect` receives. */
type A
/** An arbitrary value, meaningful to the instance, that is handed to `connect`. */
def kernel: A
/** Given the kernel, yields a `Resource` in `M` that provides a database connection. */
def connect: A => Resource[M, Connection]
/** A natural transformation for interpreting `ConnectionIO`. */
def interpret: Interpreter[M]
/** A `Strategy` for running a program on a connection. */
def strategy: Strategy
/**
 * Build a [[Yolo]] around this transactor, intended for experimentation at the REPL.
 */
def yolo(implicit ev1: Sync[M]): Yolo[M] =
  new Yolo[M](self)
/**
 * Apply `f` to the kernel value and return the resulting program. This allows arbitrary
 * configuration of the kernel (changing the timeout on a connection pool, for example), and can
 * serve as the basis of a configuration language for a specific kernel type `A`, whose
 * operations can be added to compatible `Transactor`s via implicit conversion.
 * @param f a function from the kernel to a program in `M`
 * @group Configuration
 */
def configure[B](f: A => M[B]): M[B] =
  f.apply(self.kernel)
/**
 * Like `exec`, but without the `Strategy`: the `Connection` provided by `connect` is passed
 * straight to the `Kleisli` program. This can be useful in cases where transactional handling
 * is unsupported or undesired.
 * @group Natural Transformations
 */
def rawExec(implicit ev: Bracket[M, Throwable]): Kleisli[M, Connection, *] ~> M =
  new (Kleisli[M, Connection, *] ~> M) {
    def apply[T](program: Kleisli[M, Connection, T]): M[T] =
      self.connect(self.kernel).use(conn => program.run(conn))
  }
/**
 * Natural transformation that obtains a connection from `connect`, interprets the `Strategy`'s
 * resource on that connection, and runs the given `Kleisli` program on the same connection
 * inside it, yielding an independent program in `M`.
 * @group Natural Transformations
 */
def exec(implicit ev: Bracket[M, Throwable], D: Defer[M]): Kleisli[M, Connection, *] ~> M =
  new (Kleisli[M, Connection, *] ~> M) {
    def apply[T](program: Kleisli[M, Connection, T]): M[T] =
      self.connect(self.kernel).use { c =>
        // The strategy is written in ConnectionIO; translate it into M against this connection.
        val guarded = self.strategy.resource.mapK(self.run(c))
        guarded.use(_ => program.run(c))
      }
  }
/**
 * Like `trans`, but without the `Strategy`: the `ConnectionIO` program is interpreted and run
 * directly on the `Connection` provided by `connect`. This can be useful in cases where
 * transactional handling is unsupported or undesired.
 * @group Natural Transformations
 */
def rawTrans(implicit ev: Bracket[M, Throwable]): ConnectionIO ~> M =
  new (ConnectionIO ~> M) {
    def apply[T](program: ConnectionIO[T]): M[T] =
      self.connect(self.kernel).use { c =>
        val interpreted = program.foldMap(self.interpret)
        interpreted.run(c)
      }
  }
/**
 * Natural transformation that obtains a connection from `connect`, wraps the `ConnectionIO`
 * program in the `Strategy`'s resource, interprets the result with `interpret`, and runs it on
 * that connection, yielding an independent program in `M`. This is the most common way to run
 * a doobie program.
 * @group Natural Transformations
 */
def trans(implicit ev: Bracket[M, Throwable]): ConnectionIO ~> M =
  new (ConnectionIO ~> M) {
    def apply[T](program: ConnectionIO[T]): M[T] =
      self.connect(self.kernel).use { c =>
        // Still a ConnectionIO at this point: the strategy is applied before interpretation.
        val transactional = self.strategy.resource.use(_ => program)
        transactional.foldMap(self.interpret).run(c)
      }
  }
/**
 * Stream counterpart of `rawTrans`: obtains a connection from `connect` as a stream resource
 * and translates the `ConnectionIO` stream into `M` on that connection, without applying the
 * `Strategy`.
 * @group Natural Transformations
 */
def rawTransP(implicit ev: Monad[M]): Stream[ConnectionIO, *] ~> Stream[M, *] =
  new (Stream[ConnectionIO, *] ~> Stream[M, *]) {
    def apply[T](s: Stream[ConnectionIO, T]): Stream[M, T] = {
      val connections: Stream[M, Connection] = Stream.resource(self.connect(self.kernel))
      connections.flatMap(c => s.translate(self.run(c))).scope
    }
  }
/**
 * Stream counterpart of `trans`: obtains a connection from `connect` as a stream resource,
 * runs the `ConnectionIO` stream inside the `Strategy`'s resource, and translates the whole
 * into `M` on that connection.
 * @group Natural Transformations
 */
def transP(implicit ev: Monad[M]): Stream[ConnectionIO, *] ~> Stream[M, *] =
  new (Stream[ConnectionIO, *] ~> Stream[M, *]) {
    def apply[T](s: Stream[ConnectionIO, T]): Stream[M, T] =
      Stream.resource(self.connect(self.kernel)).flatMap { conn =>
        val transactional = Stream.resource(self.strategy.resource).flatMap(_ => s)
        transactional.translate(self.run(conn))
      }.scope
  }
/**
 * Variant of `rawTransP` for streams whose effect is `Kleisli[ConnectionIO, I, *]`: the
 * connection resource is lifted into `Kleisli[M, I, *]` and the stream is translated on that
 * connection, without applying the `Strategy`.
 * @group Natural Transformations
 */
def rawTransPK[I](implicit ev: Monad[M]): Stream[Kleisli[ConnectionIO, I, *], *] ~> Stream[Kleisli[M, I, *], *] =
  new (Stream[Kleisli[ConnectionIO, I, *], *] ~> Stream[Kleisli[M, I, *], *]) {
    def apply[T](s: Stream[Kleisli[ConnectionIO, I, *], T]): Stream[Kleisli[M, I, *], T] = {
      val connections =
        Stream.resource(self.connect(self.kernel)).translate(Kleisli.liftK[M, I])
      connections.flatMap(conn => s.translate(self.runKleisli[I](conn))).scope
    }
  }
/**
 * Variant of `transP` for streams whose effect is `Kleisli[ConnectionIO, I, *]`: the
 * connection resource is lifted into `Kleisli[M, I, *]`, the stream is run inside the
 * `Strategy`'s resource (itself lifted into `Kleisli[ConnectionIO, I, *]`), and the whole is
 * translated on that connection.
 * @group Natural Transformations
 */
def transPK[I](implicit ev: Monad[M]): Stream[Kleisli[ConnectionIO, I, *], *] ~> Stream[Kleisli[M, I, *], *] =
  new (Stream[Kleisli[ConnectionIO, I, *], *] ~> Stream[Kleisli[M, I, *], *]) {
    def apply[T](s: Stream[Kleisli[ConnectionIO, I, *], T]): Stream[Kleisli[M, I, *], T] = {
      val connections =
        Stream.resource(self.connect(self.kernel)).translate(Kleisli.liftK[M, I])
      connections.flatMap { conn =>
        val liftedStrategy = self.strategy.resource.mapK(Kleisli.liftK[ConnectionIO, I])
        Stream.resource(liftedStrategy).flatMap(_ => s).translate(self.runKleisli[I](conn))
      }.scope
    }
  }
/** Interprets `ConnectionIO` programs with `interpret` and runs them on the connection `c`. */
private def run(c: Connection)(implicit ev: Monad[M]): ConnectionIO ~> M =
  new (ConnectionIO ~> M) {
    def apply[T](program: ConnectionIO[T]): M[T] = {
      val interpreted = program.foldMap(self.interpret)
      interpreted.run(c)
    }
  }
/**
 * Like `run`, but for `Kleisli[ConnectionIO, B, *]`: for each input `B`, the resulting
 * `ConnectionIO` is interpreted with `interpret` and run on the connection `c`.
 */
private def runKleisli[B](c: Connection)(implicit ev: Monad[M]): Kleisli[ConnectionIO, B, *] ~> Kleisli[M, B, *] =
  new (Kleisli[ConnectionIO, B, *] ~> Kleisli[M, B, *]) {
    def apply[T](program: Kleisli[ConnectionIO, B, T]): Kleisli[M, B, T] =
      Kleisli { (b: B) =>
        val connectionIO = program.run(b)
        connectionIO.foldMap(self.interpret).run(c)
      }
  }
/**
 * A new `Transactor` with the same kernel type as this one. Each member defaults to this
 * transactor's corresponding member and may be replaced individually.
 * @param kernel0    the kernel of the new transactor
 * @param connect0   the connection source of the new transactor
 * @param interpret0 the interpreter of the new transactor
 * @param strategy0  the strategy of the new transactor
 */
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
def copy(
  kernel0:    A                            = self.kernel,
  connect0:   A => Resource[M, Connection] = self.connect,
  interpret0: Interpreter[M]               = self.interpret,
  strategy0:  Strategy                     = self.strategy
): Transactor.Aux[M, A] =
  Transactor[M, A](kernel0, connect0, interpret0, strategy0)
/**
 * Convert the effect type of this transactor from `M` to `M0`. The kernel and strategy are
 * kept as they are; the connection resource and the interpreter's `Kleisli` results are
 * transformed with `fk`.
 * @param fk a natural transformation from `M` to `M0`
 */
def mapK[M0[_]](fk: M ~> M0)(implicit D: Defer[M0], A: Applicative[M0]): Transactor.Aux[M0, A] = {
  // Lifts `fk` so it applies to the Kleisli programs produced by the interpreter.
  val liftKleisli: Kleisli[M, Connection, *] ~> Kleisli[M0, Connection, *] =
    new (Kleisli[M, Connection, *] ~> Kleisli[M0, Connection, *]) {
      def apply[T](k: Kleisli[M, Connection, T]): Kleisli[M0, Connection, T] = k.mapK(fk)
    }
  Transactor[M0, A](
    kernel,
    connect.andThen(resource => resource.mapK(fk)),
    interpret.andThen(liftKleisli),
    strategy
  )
}
}
object Transactor {
/**
 * Construct a `Transactor` directly from its four members.
 * @param kernel0    the kernel value, which is handed to `connect0`
 * @param connect0   yields a connection resource, given the kernel
 * @param interpret0 the interpreter for `ConnectionIO`
 * @param strategy0  the strategy for running programs on a connection
 */
def apply[M[_], A0](
  kernel0:    A0,
  connect0:   A0 => Resource[M, Connection],
  interpret0: Interpreter[M],
  strategy0:  Strategy
): Transactor.Aux[M, A0] =
  new Transactor[M] {
    type A = A0
    val kernel: A0                             = kernel0
    val connect: A0 => Resource[M, Connection] = connect0
    val interpret: Interpreter[M]              = interpret0
    val strategy: Strategy                     = strategy0
  }
/** A `Transactor[M]` whose kernel type member `A` is fixed to `A0`. */
type Aux[M[_], A0] = Transactor[M] { type A = A0 }
/** @group Lenses */
def kernel[M[_], A]: Transactor.Aux[M, A] Lens A =
  Lens(xa => xa.kernel, (xa, k) => xa.copy(kernel0 = k))

/** @group Lenses */
def connect[M[_], A]: Transactor.Aux[M, A] Lens (A => Resource[M, Connection]) =
  Lens(xa => xa.connect, (xa, c) => xa.copy(connect0 = c))

/** @group Lenses */
def interpret[M[_]]: Transactor[M] Lens Interpreter[M] =
  Lens(xa => xa.interpret, (xa, i) => xa.copy(interpret0 = i))

/** @group Lenses */
def strategy[M[_]]: Transactor[M] Lens Strategy =
  Lens(xa => xa.strategy, (xa, s) => xa.copy(strategy0 = s))

/** @group Lenses */
def before[M[_]]: Transactor[M] Lens ConnectionIO[Unit] =
  strategy[M] >=> Strategy.before

/** @group Lenses */
def after[M[_]]: Transactor[M] Lens ConnectionIO[Unit] =
  strategy[M] >=> Strategy.after

/** @group Lenses */
def oops[M[_]]: Transactor[M] Lens ConnectionIO[Unit] =
  strategy[M] >=> Strategy.oops

/** @group Lenses */
def always[M[_]]: Transactor[M] Lens ConnectionIO[Unit] =
  strategy[M] >=> Strategy.always
/**
 * Construct a constructor of `Transactor[M, D]` for some `D <: DataSource` by partial
 * application of `M`, which cannot be inferred in general. This follows the pattern described
 * [here](http://tpolecat.github.io/2015/07/30/infer.html).
 * @group Constructors
 */
object fromDataSource {

  /** Fix the effect type `M`; the remaining arguments go to the returned value's `apply`. */
  def apply[M[_]]: FromDataSourceUnapplied[M] = new FromDataSourceUnapplied[M]

  /**
   * Constructor of `Transactor[M, D]` fixed for `M`; see the `apply` method for details.
   * @group Constructors (Partially Applied)
   */
  class FromDataSourceUnapplied[M[_]] {

    /**
     * Build a `Transactor` whose kernel is `dataSource` and whose strategy is
     * `Strategy.default`.
     * @param dataSource the `DataSource` that connections are obtained from
     * @param connectEC  the `ExecutionContext` on which `getConnection` is evaluated
     * @param blocker    used to close connections and by the interpreter
     */
    def apply[A <: DataSource](
      dataSource: A,
      connectEC:  ExecutionContext,
      blocker:    Blocker
    )(implicit ev: Async[M],
      cs: ContextShift[M]
    ): Transactor.Aux[M, A] = {
      def openConnection(ds: A): M[Connection] =
        cs.evalOn(connectEC)(ev.delay(ds.getConnection))
      def closeConnection(c: Connection): M[Unit] =
        blocker.blockOn(ev.delay(c.close()))

      val connect: A => Resource[M, Connection] =
        ds => Resource.make(openConnection(ds))(closeConnection)
      val interp = KleisliInterpreter[M](blocker).ConnectionInterpreter
      Transactor(dataSource, connect, interp, Strategy.default)
    }
  }
}
/**
 * Construct a `Transactor` around an existing `Connection`, using `Strategy.default`. The
 * connection is provided as a pure resource with no release action, so closing it remains the
 * responsibility of the caller.
 * @param connection a raw JDBC `Connection` to wrap
 * @param blocker for blocking database operations
 * @group Constructors
 */
def fromConnection[M[_]: Async: ContextShift](
  connection: Connection,
  blocker: Blocker
): Transactor.Aux[M, Connection] =
  Transactor[M, Connection](
    kernel0    = connection,
    connect0   = (c: Connection) => Resource.pure[M, Connection](c),
    interpret0 = KleisliInterpreter[M](blocker).ConnectionInterpreter,
    strategy0  = Strategy.default
  )
/**
 * Module of constructors for `Transactor` that use the JDBC `DriverManager` to allocate
 * connections. Note that `DriverManager` is unbounded and will happily allocate new connections
 * until server resources are exhausted. It is usually preferable to use `DataSourceTransactor`
 * with an underlying bounded connection pool (as with `H2Transactor` and `HikariTransactor` for
 * instance). When no `Blocker` is supplied, blocking operations are executed on an unbounded
 * cached daemon thread pool, so you are also at risk of exhausting system threads.
 * TL;DR this is fine for console apps but don't use it for a web application.
 * @group Constructors
 */
@SuppressWarnings(Array("org.wartremover.warts.Overloading"))
object fromDriverManager {

  // The default blocker, built on first use: an unbounded cached pool of named daemon threads.
  private lazy val defaultBlocker: Blocker = {
    val daemonThreads: ThreadFactory = new ThreadFactory {
      def newThread(task: Runnable): Thread = {
        val thread = new Thread(task)
        thread.setName(show"doobie-fromDriverManager-pool-${thread.getId}")
        thread.setDaemon(true)
        thread
      }
    }
    val pool = Executors.newCachedThreadPool(daemonThreads)
    Blocker.liftExecutionContext(ExecutionContext.fromExecutor(pool))
  }

  // Shared implementation behind every `apply` overload. The kernel is `()`. Each acquisition
  // loads the driver class by name and then calls `conn`; both acquisition and `close()` are
  // run via `blocker`.
  @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
  private def create[M[_]](
    driver:   String,
    conn:     () => Connection,
    strategy: Strategy,
    blocker:  Blocker
  )(implicit am: Async[M], cs: ContextShift[M]): Transactor.Aux[M, Unit] = {
    val connect: Unit => Resource[M, Connection] = { _ =>
      val open: M[Connection] =
        blocker.blockOn(am.delay { Class.forName(driver); conn() })
      Resource.make(open)(c => blocker.blockOn(am.delay { c.close() }))
    }
    Transactor((), connect, KleisliInterpreter[M](blocker).ConnectionInterpreter, strategy)
  }

  /**
   * Construct a new `Transactor` that uses the JDBC `DriverManager` to allocate connections.
   * @param driver  the class name of the JDBC driver, like "org.h2.Driver"
   * @param url     a connection URL, specific to your driver
   * @param blocker for blocking database operations
   */
  def apply[M[_]: Async: ContextShift](
    driver:  String,
    url:     String,
    blocker: Blocker
  ): Transactor.Aux[M, Unit] =
    create[M](driver, () => DriverManager.getConnection(url), Strategy.default, blocker)

  /**
   * Construct a new `Transactor` that uses the JDBC `DriverManager` to allocate connections,
   * with blocking operations run on the default daemon thread pool.
   * @param driver the class name of the JDBC driver, like "org.h2.Driver"
   * @param url a connection URL, specific to your driver
   */
  def apply[M[_]: Async: ContextShift](
    driver: String,
    url:    String
  ): Transactor.Aux[M, Unit] =
    apply[M](driver, url, defaultBlocker)

  /**
   * Construct a new `Transactor` that uses the JDBC `DriverManager` to allocate connections.
   * @param driver  the class name of the JDBC driver, like "org.h2.Driver"
   * @param url     a connection URL, specific to your driver
   * @param user    database username
   * @param pass    database password
   * @param blocker for blocking database operations
   */
  def apply[M[_]: Async: ContextShift](
    driver:  String,
    url:     String,
    user:    String,
    pass:    String,
    blocker: Blocker
  ): Transactor.Aux[M, Unit] =
    create[M](driver, () => DriverManager.getConnection(url, user, pass), Strategy.default, blocker)

  /**
   * Construct a new `Transactor` that uses the JDBC `DriverManager` to allocate connections,
   * with blocking operations run on the default daemon thread pool.
   * @param driver the class name of the JDBC driver, like "org.h2.Driver"
   * @param url a connection URL, specific to your driver
   * @param user database username
   * @param pass database password
   */
  def apply[M[_]: Async: ContextShift](
    driver: String,
    url:    String,
    user:   String,
    pass:   String
  ): Transactor.Aux[M, Unit] =
    apply[M](driver, url, user, pass, defaultBlocker)

  /**
   * Construct a new `Transactor` that uses the JDBC `DriverManager` to allocate connections.
   * @param driver  the class name of the JDBC driver, like "org.h2.Driver"
   * @param url     a connection URL, specific to your driver
   * @param info    a `Properties` containing connection information (see `DriverManager.getConnection`)
   * @param blocker for blocking database operations
   */
  def apply[M[_]: Async: ContextShift](
    driver:  String,
    url:     String,
    info:    java.util.Properties,
    blocker: Blocker
  ): Transactor.Aux[M, Unit] =
    create[M](driver, () => DriverManager.getConnection(url, info), Strategy.default, blocker)

  /**
   * Construct a new `Transactor` that uses the JDBC `DriverManager` to allocate connections,
   * with blocking operations run on the default daemon thread pool.
   * @param driver the class name of the JDBC driver, like "org.h2.Driver"
   * @param url a connection URL, specific to your driver
   * @param info a `Properties` containing connection information (see `DriverManager.getConnection`)
   */
  def apply[M[_]: Async: ContextShift](
    driver: String,
    url:    String,
    info:   java.util.Properties
  ): Transactor.Aux[M, Unit] =
    apply[M](driver, url, info, defaultBlocker)
}
}
}