-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
tx_executor.go
280 lines (245 loc) · 9.62 KB
/
tx_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tabletserver
import (
"time"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"context"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
// TxExecutor is used for executing a transactional request.
// TODO: merge this with tx_engine
type TxExecutor struct {
// TODO(sougou): Parameterize this.
ctx context.Context
logStats *tabletenv.LogStats
te *TxEngine
}
// Prepare performs a prepare on a connection including the redo log work.
// If there is any failure, an error is returned. No cleanup is performed.
// A subsequent call to RollbackPrepared, which is required by the 2PC
// protocol, will perform all the cleanup.
func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
if !txe.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
txe.logStats.TransactionID = transactionID
conn, err := txe.te.txPool.GetAndLock(transactionID, "for prepare")
if err != nil {
return err
}
// If no queries were executed, we just rollback.
if len(conn.TxProperties().Queries) == 0 {
conn.Release(tx.TxRollback)
return nil
}
err = txe.te.preparedPool.Put(conn, dtid)
if err != nil {
txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err)
}
return txe.inTransaction(func(localConn *StatefulConnection) error {
return txe.te.twoPC.SaveRedo(txe.ctx, localConn, dtid, conn.TxProperties().Queries)
})
}
// CommitPrepared commits a prepared transaction. If the operation
// fails, an error counter is incremented and the transaction is
// marked as failed in the redo log.
func (txe *TxExecutor) CommitPrepared(dtid string) error {
if !txe.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := txe.te.preparedPool.FetchForCommit(dtid)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err)
}
if conn == nil {
return nil
}
// We have to use a context that will never give up,
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), txe.ctx)
defer txe.te.txPool.RollbackAndRelease(ctx, conn)
err = txe.te.twoPC.DeleteRedo(ctx, conn, dtid)
if err != nil {
txe.markFailed(ctx, dtid)
return err
}
_, err = txe.te.txPool.Commit(ctx, conn)
if err != nil {
txe.markFailed(ctx, dtid)
return err
}
txe.te.preparedPool.Forget(dtid)
return nil
}
// markFailed does the necessary work to mark a CommitPrepared
// as failed. It marks the dtid as failed in the prepared pool,
// increments the InternalErros counter, and also changes the
// state of the transaction in the redo log as failed. If the
// state change does not succeed, it just logs the event.
// The function uses the passed in context that has no timeout
// instead of TxExecutor's context.
func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
txe.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
txe.te.preparedPool.SetFailed(dtid)
conn, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer txe.te.txPool.RollbackAndRelease(ctx, conn)
if err = txe.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}
if _, err = txe.te.txPool.Commit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}
// RollbackPrepared rolls back a prepared transaction. This function handles
// the case of an incomplete prepare.
//
// If the prepare completely failed, it will just rollback the original
// transaction identified by originalID.
//
// If the connection was moved to the prepared pool, but redo log
// creation failed, then it will rollback that transaction and
// return the conn to the txPool.
//
// If prepare was fully successful, it will also delete the redo log.
// If the redo log deletion fails, it returns an error indicating that
// a retry is needed.
//
// In recovery mode, the original transaction id will not be available.
// If so, it must be set to 0, and the function will not attempt that
// step. If the original transaction is still alive, the transaction
// killer will be the one to eventually roll it back.
func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error {
if !txe.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("ROLLBACK_PREPARED", time.Now())
defer func() {
if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil {
txe.te.txPool.RollbackAndRelease(txe.ctx, preparedConn)
}
if originalID != 0 {
txe.te.Rollback(txe.ctx, originalID)
}
}()
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.DeleteRedo(txe.ctx, conn, dtid)
})
}
// CreateTransaction creates the metadata for a 2PC transaction.
func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Target) error {
if !txe.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now())
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.CreateTransaction(txe.ctx, conn, dtid, participants)
})
}
// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error {
if !txe.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now())
txe.logStats.TransactionID = transactionID
conn, err := txe.te.txPool.GetAndLock(transactionID, "for 2pc commit")
if err != nil {
return err
}
defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
err = txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_COMMIT)
if err != nil {
return err
}
_, err = txe.te.txPool.Commit(txe.ctx, conn)
return err
}
// SetRollback transitions the 2pc transaction to the Rollback state.
// If a transaction id is provided, that transaction is also rolled back.
func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error {
if !txe.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("SET_ROLLBACK", time.Now())
txe.logStats.TransactionID = transactionID
if transactionID != 0 {
txe.te.Rollback(txe.ctx, transactionID)
}
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
})
}
// ConcludeTransaction deletes the 2pc transaction metadata
// essentially resolving it.
func (txe *TxExecutor) ConcludeTransaction(dtid string) error {
if !txe.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("RESOLVE", time.Now())
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.DeleteTransaction(txe.ctx, conn, dtid)
})
}
// ReadTransaction returns the metadata for the sepcified dtid.
func (txe *TxExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadata, error) {
if !txe.te.twopcEnabled {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
return txe.te.twoPC.ReadTransaction(txe.ctx, dtid)
}
// ReadTwopcInflight returns info about all in-flight 2pc transactions.
func (txe *TxExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, prepared, failed []*tx.PreparedTx, err error) {
if !txe.te.twopcEnabled {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
prepared, failed, err = txe.te.twoPC.ReadAllRedo(txe.ctx)
if err != nil {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err)
}
distributed, err = txe.te.twoPC.ReadAllTransactions(txe.ctx)
if err != nil {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err)
}
return distributed, prepared, failed, nil
}
func (txe *TxExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
return err
}
defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
err = f(conn)
if err != nil {
return err
}
_, err = txe.te.txPool.Commit(txe.ctx, conn)
if err != nil {
return err
}
return nil
}