-
Notifications
You must be signed in to change notification settings - Fork 119
/
tx.go
149 lines (121 loc) · 3.97 KB
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package yt
import (
"context"
"time"
"github.com/cenkalti/backoff/v4"
)
// ExecTxRetryOptions is the retry policy accepted by ExecTx and ExecTabletTx.
//
// Its underlying type is backoff.BackOff, so any backoff.BackOff implementation
// can be assigned to it.
type ExecTxRetryOptions backoff.BackOff
// Parameters of the retry policy built by newDefaultExecTxRetryOptions.
const (
// DefaultExecTxRetryCount is the maximum number of retries in the default retry policy.
DefaultExecTxRetryCount = 5
// DefaultExecTxRetryBackoff is the constant pause between attempts in the default retry policy.
DefaultExecTxRetryBackoff = time.Second
)
// newDefaultExecTxRetryOptions builds the retry policy used when the caller supplies none:
//   - a constant DefaultExecTxRetryBackoff pause between attempts;
//   - at most DefaultExecTxRetryCount retries;
//   - bound to ctx, so that cancelling ctx stops further retries.
func newDefaultExecTxRetryOptions(ctx context.Context) backoff.BackOffContext {
	var policy backoff.BackOff = backoff.NewConstantBackOff(DefaultExecTxRetryBackoff)
	policy = backoff.WithMaxRetries(policy, DefaultExecTxRetryCount)
	return backoff.WithContext(policy, ctx)
}
// ExecTxRetryOptionsNone is a fixed retry policy that never retries the operation.
//
// It is an alias for backoff.StopBackOff and is meant to be set as RetryOptions
// in ExecTxOptions or ExecTabletTxOptions.
type ExecTxRetryOptionsNone = backoff.StopBackOff
// TxFunc is a callback used in ExecTx function.
//
// It receives the context passed to ExecTx and the transaction started for the
// current attempt. Returning nil makes ExecTx commit the transaction; returning
// an error makes ExecTx abort it.
type TxFunc func(ctx context.Context, tx Tx) error
// ExecTxOptions configures ExecTx.
type ExecTxOptions struct {
// RetryOptions is the retry policy applied to the whole begin/callback/commit cycle.
// When nil, ExecTx uses the policy built by newDefaultExecTxRetryOptions.
RetryOptions ExecTxRetryOptions
// StartTxOptions is passed through unchanged (including nil) to Client.BeginTx
// on every attempt.
*StartTxOptions
}
// ExecTx is a convenience method that creates new master transaction and
// executes commit/abort based on the error returned by the callback function.
//
// Every attempt starts a fresh transaction via yc.BeginTx, runs f with it and
// then commits (f returned nil) or aborts (f returned an error). A failed
// attempt is retried according to the retry policy.
//
// In case of nil options default ones are used. A nil opts.RetryOptions selects
// the default policy: up to DefaultExecTxRetryCount retries with
// DefaultExecTxRetryBackoff sleeps in between. opts itself is never modified, so
// the same options value can be reused across calls.
//
// With the default policy retries could be stopped with the context
// cancellation. A caller-supplied policy is used as is; wrap it with
// backoff.WithContext if it should observe ctx as well.
//
// If f returns a *backoff.PermanentError, the operation is not retried, and the wrapped error is returned.
func ExecTx(ctx context.Context, yc Client, f TxFunc, opts *ExecTxOptions) (err error) {
	// Resolve the effective settings into locals instead of writing defaults
	// back into opts: the default policy is bound to this call's ctx and keeps a
	// retry counter, so it must not leak into a struct the caller may reuse.
	var (
		retry     ExecTxRetryOptions
		startOpts *StartTxOptions
	)
	if opts != nil {
		retry, startOpts = opts.RetryOptions, opts.StartTxOptions
	}
	if retry == nil {
		retry = newDefaultExecTxRetryOptions(ctx)
	}

	attempt := func() error {
		return execTx(ctx, newMasterTxBeginner(yc, startOpts), func(ctx context.Context, raw any) error {
			// The master beginner only ever produces Tx values.
			return f(ctx, raw.(Tx))
		})
	}
	return backoff.Retry(attempt, retry)
}
// TabletTxFunc is a callback used in ExecTabletTx function.
//
// It receives the context passed to ExecTabletTx and the tablet transaction
// started for the current attempt. Returning nil makes ExecTabletTx commit the
// transaction; returning an error makes ExecTabletTx abort it.
type TabletTxFunc func(ctx context.Context, tx TabletTx) error
// ExecTabletTxOptions configures ExecTabletTx.
type ExecTabletTxOptions struct {
// RetryOptions is the retry policy applied to the whole begin/callback/commit cycle.
// When nil, ExecTabletTx uses the policy built by newDefaultExecTxRetryOptions.
RetryOptions ExecTxRetryOptions
// StartTabletTxOptions is passed through unchanged (including nil) to
// Client.BeginTabletTx on every attempt.
*StartTabletTxOptions
}
// ExecTabletTx is a convenience method that creates new tablet transaction and executes commit/abort based on the
// error returned by the callback function.
//
// Every attempt starts a fresh transaction via yc.BeginTabletTx, runs f with it
// and then commits (f returned nil) or aborts (f returned an error). A failed
// attempt is retried according to the retry policy.
//
// In case of nil options default ones are used. A nil opts.RetryOptions selects
// the default policy: up to DefaultExecTxRetryCount retries with
// DefaultExecTxRetryBackoff sleeps in between. opts itself is never modified, so
// the same options value can be reused across calls.
//
// With the default policy retries could be stopped with the context
// cancellation. A caller-supplied policy is used as is; wrap it with
// backoff.WithContext if it should observe ctx as well.
//
// If f returns a *backoff.PermanentError, the operation is not retried, and the wrapped error is returned.
func ExecTabletTx(ctx context.Context, yc Client, f TabletTxFunc, opts *ExecTabletTxOptions) (err error) {
	// Resolve the effective settings into locals instead of writing defaults
	// back into opts: the default policy is bound to this call's ctx and keeps a
	// retry counter, so it must not leak into a struct the caller may reuse.
	var (
		retry     ExecTxRetryOptions
		startOpts *StartTabletTxOptions
	)
	if opts != nil {
		retry, startOpts = opts.RetryOptions, opts.StartTabletTxOptions
	}
	if retry == nil {
		retry = newDefaultExecTxRetryOptions(ctx)
	}

	attempt := func() error {
		return execTx(ctx, newTabletTxBeginner(yc, startOpts), func(ctx context.Context, raw any) error {
			// The tablet beginner only ever produces TabletTx values.
			return f(ctx, raw.(TabletTx))
		})
	}
	return backoff.Retry(attempt, retry)
}
// txBeginner abstracts how a transaction is started, which lets execTx drive
// both master and tablet transactions through the same code path.
type txBeginner interface {
// BeginTx starts a new transaction or reports why it could not be started.
BeginTx(ctx context.Context) (tx, error)
}
// tx is the minimal transaction surface execTx relies on: the two ways of
// finishing a transaction.
type tx interface {
// Commit is invoked by execTx after the callback returned nil.
Commit() error
// Abort is invoked by execTx after the callback returned an error.
Abort() error
}
// masterTxBeginner is a txBeginner that starts master transactions through
// Client.BeginTx.
type masterTxBeginner struct {
// yc is the client the transaction is started on.
yc Client
// opts is forwarded as is to Client.BeginTx.
opts *StartTxOptions
}
// newMasterTxBeginner returns a beginner that starts master transactions on yc
// using opts.
func newMasterTxBeginner(yc Client, opts *StartTxOptions) *masterTxBeginner {
	beginner := new(masterTxBeginner)
	beginner.yc = yc
	beginner.opts = opts
	return beginner
}
// BeginTx starts a master transaction on the wrapped client with the stored
// options and hands the client's result back unchanged.
func (b *masterTxBeginner) BeginTx(ctx context.Context) (tx, error) {
	started, err := b.yc.BeginTx(ctx, b.opts)
	return started, err
}
// tabletTxBeginner is a txBeginner that starts tablet transactions through
// Client.BeginTabletTx.
type tabletTxBeginner struct {
// yc is the client the transaction is started on.
yc Client
// opts is forwarded as is to Client.BeginTabletTx.
opts *StartTabletTxOptions
}
// newTabletTxBeginner returns a beginner that starts tablet transactions on yc
// using opts.
func newTabletTxBeginner(yc Client, opts *StartTabletTxOptions) *tabletTxBeginner {
	beginner := new(tabletTxBeginner)
	beginner.yc = yc
	beginner.opts = opts
	return beginner
}
// BeginTx starts a tablet transaction on the wrapped client with the stored
// options and hands the client's result back unchanged.
func (b *tabletTxBeginner) BeginTx(ctx context.Context) (tx, error) {
	started, err := b.yc.BeginTabletTx(ctx, b.opts)
	return started, err
}
// txFunc is the untyped callback shape execTx works with. ExecTx and
// ExecTabletTx wrap their typed callbacks into it and assert the tx argument
// back to Tx or TabletTx respectively.
type txFunc func(ctx context.Context, tx any) error
// execTx starts transaction and executes commit/abort based on the
// error returned by the callback function.
//
// Outcomes:
//   - b.BeginTx fails: its error is returned, f is not called.
//   - f returns an error: the transaction is aborted and that error is returned;
//     the result of Abort is intentionally ignored so it cannot mask f's error.
//   - f returns nil: the transaction is committed and the result of Commit is
//     returned.
//   - f panics: the transaction is aborted (never committed) and the panic keeps
//     propagating to the caller.
func execTx(ctx context.Context, b txBeginner, f txFunc) (err error) {
	txn, err := b.BeginTx(ctx)
	if err != nil {
		return err
	}

	// succeeded flips to true only after f returned nil. Deciding on err alone
	// would commit a half-done transaction when f panics, because err is still
	// nil while the panic unwinds through the deferred call.
	succeeded := false
	defer func() {
		if !succeeded {
			// Best-effort cleanup: the original failure is what the caller needs.
			_ = txn.Abort()
			return
		}
		err = txn.Commit()
	}()

	if err = f(ctx, txn); err != nil {
		return err
	}
	succeeded = true
	return nil
}