-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathtransactions.go
130 lines (115 loc) · 3.86 KB
/
transactions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package db
import (
"context"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/circleci/ex/o11y"
)
type TxManager struct {
db *sqlx.DB
// This is only for testing purposes
testQuerier func(Querier) Querier
}
func NewTxManager(db *sqlx.DB) *TxManager {
return &TxManager{
db: db,
}
}
func NewTxManagerWithTestQuerier(db *sqlx.DB, tq func(Querier) Querier) *TxManager {
return &TxManager{
db: db,
testQuerier: tq,
}
}
type queryFn func(context.Context, Querier) error
// WithTx wraps f in an explicit o11y'd transaction, handling rollback
// if f returns an error. It will retry the transaction a few times in the face of
// ErrBadConn errors.
// The length here is due to the internalised func, which we want to encapsulate
// to avoid reuse, since it is highly coupled to the retry behaviour.
//
//nolint:funlen
func (t *TxManager) WithTx(ctx context.Context, f queryFn) (err error) {
// Set up the main transaction function that we will retry on ErrBadCon
transaction := func() (err error) {
tx, err := t.db.BeginTxx(ctx, nil)
if err != nil {
_, err = mapBadCon(err)
return fmt.Errorf("begin transaction: %w", err)
}
// This defer is to catch any error from the call to f to decide if we should commit
// or rollback the transaction.
// The defer stacking rules mean that this will be called before the bad connection
// defer above.
defer func() {
p := recover()
switch {
case p != nil:
// a panic occurred, attempt a rollback and re-panic
// (it may already be rolled-back, so ignore this error)
_ = tx.Rollback()
panic(p)
case badConn(err):
// We can't do anything else with a bad connection, and the db server
// will already have rolled back
return
case err != nil:
// Never commit on an error.
// But don't roll back if the transaction context is Done
// (the library code already handles rollback in the context Done cases)
// This check is in case f returned an err different from the context error
if ctx.Err() != nil {
return
}
// something other than a context cancel went wrong, rollback and report any
// error on rollback, an ErrBadCon is safe here since the server will have rolled back
if rErr := tx.Rollback(); rErr != nil {
o11y.AddField(ctx, "rollback_error", rErr)
}
case ctx.Err() != nil:
// This case is if f suppressed an error but the transaction ctx is still Done
// even if f appeared to have not seen any error we report the context cancellation
// so the calling code will at least be able to be aware that the transaction was
// rolled back
err = ctx.Err()
default:
// All good, commit
err = tx.Commit()
// Specifically trap the bad connection here which will allow a retry
_, err = mapBadCon(err)
if err != nil {
err = fmt.Errorf("commit transaction: %w", err)
}
// N.B there is no need for an explicit rollback - the db server automatically rolls back
// transactions where the connection (or session) is dropped (ErrBadConn).
}
}()
// Use the error wrapped transaction so that the common errors can be reported as warnings in any
// spans used in f
var q Querier = unifiedQuerier{q: eTx{tx}}
if t.testQuerier != nil {
q = t.testQuerier(eTx{tx})
}
err = f(ctx, q) // This err will be mapped in the unifiedQuerier wrapper
// Note that the above defer can reassign err
return err
}
// Attempt the transaction a few times.
// (More than 3 ErrBadCon errors is going to be very unlikely.)
for i := 0; i < 3; i++ {
err = transaction()
// We can only retry on bad connection errors
if !badConn(err) {
break
}
}
return err
}
func (t *TxManager) NoTx() Querier {
return unifiedQuerier{q: eDB{t.db}}
}
// WithTransaction simply delegates to WithTx.
// Deprecated
func (t *TxManager) WithTransaction(ctx context.Context, f queryFn) (err error) {
return t.WithTx(ctx, f)
}