-
Notifications
You must be signed in to change notification settings - Fork 157
/
Transaction.scala
208 lines (177 loc) · 7.22 KB
/
Transaction.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// Copyright (c) 2018 by Rob Norris
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT
package skunk
import cats._
import cats.effect.Resource
import cats.effect.ExitCase
import cats.effect.ExitCase._
import cats.implicits._
import skunk.implicits._
import skunk.data.Completion
import skunk.data.TransactionStatus._
import skunk.util.Origin
import skunk.util.CallSite
import skunk.exception.SkunkException
import skunk.util.Namer
import skunk.data.TransactionStatus
/**
* Control methods for use within a `transaction` block. An instance is provided when you call
* `Session.transaction(...).use`.
* @see Session#transaction for information on default commit/rollback behavior
*
* @groupname XA Transaction Control
*/
trait Transaction[F[_]] { outer =>
/** Existential type for savepoints within this transaction block. */
type Savepoint
/**
* Current transaction status. It is not usually necessary to check because transactions will be
* committed or rolled back automatically, but if you are committing manually and your logic is
* sufficiently complex it may be helpful.
* @group XA
*/
def status: F[TransactionStatus]
/**
* Create a `Savepoint`, to which you can later roll back.
* @group XA
*/
def savepoint(implicit o: Origin): F[Savepoint]
/**
* Roll back to the specified `Savepoint`, leaving the transaction active at that point.
* @group XA
*/
def rollback(savepoint: Savepoint)(implicit o: Origin): F[Completion]
/**
* Terminate the transaction by rolling back. This is normally not necessary because a transaction
* will be rolled back automatically when the block exits abnormally.
* @see Session#transaction for information on default commit/rollback behavior
* @group XA
*/
def rollback(implicit o: Origin): F[Completion]
/**
* Terminate the transaction by committing early. This is normally not necessary because a
* transaction will be committed automatically if the block exits successfully.
* @see Session#transaction for information on default commit/rollback behavior
* @group XA
*/
def commit(implicit o: Origin): F[Completion]
/**
* Transform this `Transaction` by a given `FunctionK`.
* @group Transformations
*/
def mapK[G[_]](fk: F ~> G): Transaction[G] =
new Transaction[G] {
override type Savepoint = outer.Savepoint
override def commit(implicit o: Origin): G[Completion] = fk(outer.commit)
override def rollback(implicit o: Origin): G[Completion] = fk(outer.rollback)
override def rollback(savepoint: Savepoint)(implicit o: Origin): G[Completion] = fk(outer.rollback(savepoint))
override def savepoint(implicit o: Origin): G[Savepoint] = fk(outer.savepoint)
override def status: G[TransactionStatus] = fk(outer.status)
}
}
object Transaction {
def fromSession[F[_]: MonadError[?[_], Throwable]](
s: Session[F],
n: Namer[F]
// o: Origin // origin of the call to .begin
// also need to take an isolation level
): Resource[F, Transaction[F]] = {
def assertIdle(cs: CallSite): F[Unit] =
s.transactionStatus.get.flatMap {
case Idle => ().pure[F]
case Active =>
new SkunkException(
sql = None,
message = "Nested transactions are not allowed.",
hint = Some("You must roll back or commit the current transaction before starting a new one."),
callSite = Some(cs)
).raiseError[F, Unit]
case Failed =>
new SkunkException(
sql = None,
message = "Nested transactions are not allowed.",
hint = Some("You must roll back the current (failed) transaction before starting a new one."),
callSite = Some(cs)
).raiseError[F, Unit]
}
def assertActive(cs: CallSite): F[Unit] =
s.transactionStatus.get.flatMap {
case Idle =>
new SkunkException(
sql = None,
message = "No transaction.",
hint = Some("The transaction has already been committed or rolled back."),
callSite = Some(cs)
).raiseError[F, Unit]
case Active => ().pure[F]
case Failed =>
new SkunkException(
sql = None,
message = "Transaction has failed.",
hint = Some("""
|The active transaction has failed and needs to be rolled back (either entirely or to
|a prior savepoint) before you can continue. The most common explanation is that
|Postgres raised an error earlier in the transaction and you handled it in your
|application code, but you forgot to roll back.
""".trim.stripMargin.replace('\n', ' ')),
callSite = Some(cs)
).raiseError[F, Unit]
}
def assertActiveOrError(cs: CallSite): F[Unit] =
cs.pure[F].void
def doRollback: F[Completion] =
s.execute(internal"ROLLBACK".command)
def doCommit: F[Completion] =
s.execute(internal"COMMIT".command)
val acquire: F[Transaction[F]] =
assertIdle(CallSite("begin", Origin.unknown)) *>
s.execute(internal"BEGIN".command).map { _ =>
new Transaction[F] {
override type Savepoint = String
override def status: F[TransactionStatus] =
s.transactionStatus.get
override def commit(implicit o: Origin): F[Completion] =
assertActive(o.toCallSite("commit")) *>
doCommit
override def rollback(implicit o: Origin): F[Completion] =
assertActiveOrError(o.toCallSite("rollback")) *>
doRollback
override def rollback(savepoint: Savepoint)(implicit o: Origin): F[Completion] =
assertActiveOrError(o.toCallSite("savepoint")) *>
s.execute(internal"ROLLBACK TO ${savepoint}".command)
override def savepoint(implicit o: Origin): F[Savepoint] =
for {
_ <- assertActive(o.toCallSite("savepoint"))
i <- n.nextName("savepoint")
_ <- s.execute(internal"SAVEPOINT $i".command)
} yield i
}
}
val release: (Transaction[F], ExitCase[Throwable]) => F[Unit] = (_, ec) =>
s.transactionStatus.get.flatMap {
case Idle =>
// This means the user committed manually, so there's nothing to do
().pure[F]
case Failed =>
ec match {
// This is the normal failure case
case Error(t) => doRollback *> t.raiseError[F, Unit]
// This is possible if you swallow an error
case Completed => doRollback.void
// This is possible if you swallow an error and the someone cancels the fiber
case Canceled => doRollback.void
}
case Active =>
ec match {
// This is the normal success case
case Completed => doCommit.void
// If someone cancels the fiber we roll back
case Canceled => doRollback.void
// If an error escapes we roll back
case Error(t) => doRollback *> t.raiseError[F, Unit]
}
}
Resource.makeCase(acquire)(release)
}
}