-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
transaction.go
145 lines (126 loc) · 4.09 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package pg
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/getsentry/sentry-go"
"github.com/pkg/errors"
"go.uber.org/multierr"
"github.com/smartcontractkit/sqlx"
"github.com/smartcontractkit/chainlink-relay/pkg/logger"
corelogger "github.com/smartcontractkit/chainlink/v2/core/logger"
)
// TxOptions carries the per-transaction settings accepted by the transaction
// helpers in this file. It embeds the standard sql.TxOptions (Isolation and
// ReadOnly) and adds two timeouts.
//
// Zero values mean "not specified": applyDefaults substitutes the package
// defaults for a zero LockTimeout, a zero IdleInTxSessionTimeout and a zero
// Isolation.
type TxOptions struct {
	// TxOptions supplies the isolation level and the read-only flag.
	sql.TxOptions

	// LockTimeout, when non-zero, replaces the default lock timeout for the
	// transaction.
	LockTimeout time.Duration

	// IdleInTxSessionTimeout, when non-zero, replaces the default
	// idle-in-transaction session timeout for the transaction.
	IdleInTxSessionTimeout time.Duration
}
// NOTE: In an ideal world the lock and idle-in-transaction timeouts applied by
// this package would be set to something sane in the postgres configuration
// by the user. Since we do not live in an ideal world, it is necessary to
// override them here. (The default values themselves are declared outside
// this section of the file.)
//
// They cannot easily be set at a session level due to how Go's connection
// pooling works.

// DefaultIsolation is the isolation level used when the caller does not
// request one. It is the default level in Postgres anyway; it is only made
// explicit here.
const DefaultIsolation = sql.LevelReadCommitted
// OptReadOnlyTx returns a TxOptions that requests a read-only transaction.
// Every other field is left at its zero value.
func OptReadOnlyTx() TxOptions {
	readOnly := sql.TxOptions{ReadOnly: true}
	return TxOptions{TxOptions: readOnly}
}
// applyDefaults resolves the effective settings for a transaction.
//
// It starts from defaultLockTimeout, defaultIdleInTxSessionTimeout,
// DefaultIsolation and a read-write transaction. If optss is non-empty, only
// its first element is consulted (any further elements are ignored): each
// non-zero timeout and a non-zero isolation level replace the corresponding
// default, and ReadOnly is copied as-is.
//
// The returned txOpts contains only Isolation and ReadOnly.
func applyDefaults(optss []TxOptions) (lockTimeout, idleInTxSessionTimeout time.Duration, txOpts sql.TxOptions) {
	lockTimeout, idleInTxSessionTimeout = defaultLockTimeout, defaultIdleInTxSessionTimeout
	txOpts = sql.TxOptions{Isolation: DefaultIsolation}

	// No caller-supplied options: the defaults stand.
	if len(optss) == 0 {
		return lockTimeout, idleInTxSessionTimeout, txOpts
	}

	requested := optss[0]
	if requested.LockTimeout != 0 {
		lockTimeout = requested.LockTimeout
	}
	if requested.IdleInTxSessionTimeout != 0 {
		idleInTxSessionTimeout = requested.IdleInTxSessionTimeout
	}
	if requested.Isolation != 0 {
		txOpts.Isolation = requested.Isolation
	}
	// ReadOnly has no "unset" state, so it is always taken from the caller.
	txOpts.ReadOnly = requested.ReadOnly

	return lockTimeout, idleInTxSessionTimeout, txOpts
}
// SqlTransaction runs fn inside a transaction opened on a plain *sql.DB.
//
// It wraps rdb with WrapDbWithSqlx and hands everything on to
// sqlxTransaction; optss is passed through unchanged.
func SqlTransaction(ctx context.Context, rdb *sql.DB, lggr logger.Logger, fn func(tx *sqlx.Tx) error, optss ...TxOptions) (err error) {
	return sqlxTransaction(ctx, WrapDbWithSqlx(rdb), lggr, fn, optss...)
}
// sqlxTransaction adapts a callback that expects a *sqlx.Tx to
// sqlxTransactionQ, whose callback receives a Queryer.
//
// The adapter panics if the Queryer it is handed is not a *sqlx.Tx.
func sqlxTransaction(ctx context.Context, db *sqlx.DB, lggr logger.Logger, fn func(tx *sqlx.Tx) error, optss ...TxOptions) (err error) {
	adapter := func(q Queryer) error {
		if tx, isTx := q.(*sqlx.Tx); isTx {
			return fn(tx)
		}
		// want is only used to print the expected type in the message.
		var want *sqlx.Tx
		panic(fmt.Sprintf("expected q to be %T but got %T", want, q))
	}
	return sqlxTransactionQ(ctx, db, lggr, adapter, optss...)
}
// TxBeginner is what sqlxTransactionQ requires in order to open a
// transaction. It can be a db or a conn: anything that implements BeginTxx.
type TxBeginner interface {
	// BeginTxx starts a transaction using the given context and options.
	BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error)
}
// sqlxTransactionQ begins a transaction on db, runs fn against it, and then
// commits or rolls back depending on how fn finished.
//
// The effective settings are resolved from optss by applyDefaults. A lock
// timeout or idle-in-transaction session timeout that differs from the
// package default is applied with SET LOCAL before fn is called.
//
// The outcome is handled in the deferred function:
//   - If fn (or one of the SET LOCAL statements) returned an error, the error
//     is logged, the transaction is rolled back, and any rollback error is
//     combined with the original error.
//   - If fn panicked, the panic is reported to sentry and logged, a rollback
//     is attempted for at most 10 seconds, and the function panics again:
//     with the original value if the rollback finished in time, otherwise
//     with a message describing the aborted rollback.
//   - Otherwise the transaction is committed and the result of Commit is
//     returned.
//
// If the transaction cannot be started, the wrapped BeginTxx error is
// returned and nothing else runs.
//
// The named result err is required: the deferred function both reads and
// overwrites it.
func sqlxTransactionQ(ctx context.Context, db TxBeginner, lggr logger.Logger, fn func(q Queryer) error, optss ...TxOptions) (err error) {
	lockTimeout, idleInTxSessionTimeout, txOpts := applyDefaults(optss)

	var tx *sqlx.Tx
	tx, err = db.BeginTxx(ctx, &txOpts)
	if err != nil {
		return errors.Wrap(err, "failed to begin transaction")
	}

	defer func() {
		if p := recover(); p != nil {
			sentry.CurrentHub().Recover(p)
			sentry.Flush(corelogger.SentryFlushDeadline)

			// A panic occurred, rollback and repanic.
			// p can be any value, so it is formatted with %v; %s would print
			// "%!s(...)" for values that are not strings, errors or Stringers.
			lggr.Errorf("Panic in transaction, rolling back: %v", p)

			// Roll back in a separate goroutine so that a hanging rollback
			// cannot delay the repanic for more than 10 seconds.
			done := make(chan struct{})
			go func() {
				if rerr := tx.Rollback(); rerr != nil {
					lggr.Errorf("Failed to rollback on panic: %s", rerr)
				}
				close(done)
			}()
			select {
			case <-done:
				panic(p)
			case <-time.After(10 * time.Second):
				panic(fmt.Sprintf("panic in transaction; aborting rollback that took longer than 10s: %v", p))
			}
		} else if err != nil {
			lggr.Errorf("Error in transaction, rolling back: %s", err)
			// An error occurred, rollback and return error
			if rerr := tx.Rollback(); rerr != nil {
				err = multierr.Combine(err, errors.WithStack(rerr))
			}
		} else {
			// All good! Time to commit.
			err = errors.WithStack(tx.Commit())
		}
	}()

	// The SET LOCAL statements are issued with the caller's ctx so that they
	// honour its cancellation and deadline like the rest of the transaction.
	if lockTimeout != defaultLockTimeout {
		_, err = tx.ExecContext(ctx, fmt.Sprintf(`SET LOCAL lock_timeout = %d`, lockTimeout.Milliseconds()))
		if err != nil {
			return errors.Wrap(err, "error setting transaction local lock_timeout")
		}
	}
	if idleInTxSessionTimeout != defaultIdleInTxSessionTimeout {
		_, err = tx.ExecContext(ctx, fmt.Sprintf(`SET LOCAL idle_in_transaction_session_timeout = %d`, idleInTxSessionTimeout.Milliseconds()))
		if err != nil {
			return errors.Wrap(err, "error setting transaction local idle_in_transaction_session_timeout")
		}
	}

	err = fn(tx)
	return err
}