/
transaction.go
146 lines (131 loc) · 4.63 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package client
import (
"context"
"fmt"
"github.com/pachyderm/pachyderm/src/client/pfs"
"github.com/pachyderm/pachyderm/src/client/pkg/grpcutil"
"github.com/pachyderm/pachyderm/src/client/transaction"
"google.golang.org/grpc/metadata"
)
// transactionMetadataKey is the gRPC metadata key under which the ID of the
// active transaction is stored. It is written to outgoing metadata by
// WithTransaction and read from incoming metadata by GetTransaction.
const transactionMetadataKey = "pach-transaction"
// WithTransaction (client-side) returns a new APIClient that will run supported
// write operations within the specified transaction.
//
// If txn is non-nil, its ID is stored in the returned client's outgoing gRPC
// metadata under transactionMetadataKey, replacing any value already there.
// If txn is nil, the transaction key is removed from the outgoing metadata.
// The receiver's own context is left untouched.
func (c APIClient) WithTransaction(txn *transaction.Transaction) *APIClient {
	// Work on a copy so that the metadata attached to the receiver's context
	// is never mutated. The "ok" result is ignored on purpose: Copy on an
	// absent (nil) MD still yields an empty, writable map.
	md, _ := metadata.FromOutgoingContext(c.Ctx())
	md = md.Copy()
	if txn != nil {
		md.Set(transactionMetadataKey, txn.ID)
	} else {
		// MD.Set returns without doing anything when it is given no values, so
		// it cannot be used to clear a key. Remove the entry explicitly.
		// transactionMetadataKey is already lowercase, which is the form MD
		// uses for stored keys.
		delete(md, transactionMetadataKey)
	}
	return c.WithCtx(metadata.NewOutgoingContext(c.Ctx(), md))
}
// GetTransaction (should be run from the server-side) loads the active
// transaction from the incoming gRPC metadata of ctx.
//
// It returns:
//   - (nil, nil) if the metadata holds no value for the transaction key;
//   - a Transaction carrying the stored ID if exactly one value is present;
//   - an error if ctx has no incoming metadata at all, or if more than one
//     transaction value is present.
func GetTransaction(ctx context.Context) (*transaction.Transaction, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, fmt.Errorf("request metadata could not be parsed from context")
	}

	// len of a nil slice is 0, so a separate nil check is unnecessary.
	switch txns := md.Get(transactionMetadataKey); len(txns) {
	case 0:
		return nil, nil
	case 1:
		return &transaction.Transaction{ID: txns[0]}, nil
	default:
		return nil, fmt.Errorf("multiple active transactions found in context")
	}
}
// GetTransaction is a convenience wrapper that looks up the active transaction
// in the client's own context by delegating to the package-level
// GetTransaction.
func (c APIClient) GetTransaction() (*transaction.Transaction, error) {
	ctx := c.Ctx()
	return GetTransaction(ctx)
}
// NewCommitResponse builds a TransactionResponse whose Commit field is set to
// the given commit. It is intended for transaction items that return a Commit
// ID.
func NewCommitResponse(commit *pfs.Commit) *transaction.TransactionResponse {
	resp := new(transaction.TransactionResponse)
	resp.Commit = commit
	return resp
}
// ListTransaction issues the ListTransaction RPC using the client's context
// and returns the TransactionInfo entries from the response (all open
// transactions in the Pachyderm cluster). An RPC error is passed through
// grpcutil.ScrubGRPC before being returned.
func (c APIClient) ListTransaction() ([]*transaction.TransactionInfo, error) {
	req := &transaction.ListTransactionRequest{}
	resp, err := c.TransactionAPIClient.ListTransaction(c.Ctx(), req)
	if err != nil {
		return nil, grpcutil.ScrubGRPC(err)
	}
	infos := resp.TransactionInfo
	return infos, nil
}
// StartTransaction issues the StartTransaction RPC, which registers a new
// transaction with the Pachyderm cluster, and returns the Transaction
// identifying it. An RPC error is passed through grpcutil.ScrubGRPC before
// being returned.
func (c APIClient) StartTransaction() (*transaction.Transaction, error) {
	req := &transaction.StartTransactionRequest{}
	txn, err := c.TransactionAPIClient.StartTransaction(c.Ctx(), req)
	if err != nil {
		return nil, grpcutil.ScrubGRPC(err)
	}
	return txn, nil
}
// FinishTransaction issues the FinishTransaction RPC for txn, which closes the
// transaction and commits its changes to the persisted cluster metadata
// transactionally. It returns the TransactionInfo from the response. An RPC
// error is passed through grpcutil.ScrubGRPC before being returned.
func (c APIClient) FinishTransaction(txn *transaction.Transaction) (*transaction.TransactionInfo, error) {
	req := &transaction.FinishTransactionRequest{Transaction: txn}
	info, err := c.TransactionAPIClient.FinishTransaction(c.Ctx(), req)
	if err != nil {
		return nil, grpcutil.ScrubGRPC(err)
	}
	return info, nil
}
// DeleteTransaction issues the DeleteTransaction RPC for txn, which aborts the
// transaction and removes it from the Pachyderm cluster. The response body is
// discarded; the RPC's error value is passed through grpcutil.ScrubGRPC and
// returned.
func (c APIClient) DeleteTransaction(txn *transaction.Transaction) error {
	req := &transaction.DeleteTransactionRequest{Transaction: txn}
	_, err := c.TransactionAPIClient.DeleteTransaction(c.Ctx(), req)
	return grpcutil.ScrubGRPC(err)
}
// InspectTransaction issues the InspectTransaction RPC for txn and returns the
// detailed TransactionInfo for that existing transaction. An RPC error is
// passed through grpcutil.ScrubGRPC before being returned.
func (c APIClient) InspectTransaction(txn *transaction.Transaction) (*transaction.TransactionInfo, error) {
	req := &transaction.InspectTransactionRequest{Transaction: txn}
	info, err := c.TransactionAPIClient.InspectTransaction(c.Ctx(), req)
	if err != nil {
		return nil, grpcutil.ScrubGRPC(err)
	}
	return info, nil
}
// ExecuteInTransaction runs the callback f inside a newly started transaction.
// f is handed an APIClient bound to that transaction and should issue its
// requests through it rather than through the receiver.
//
// If the transaction cannot be started, that error is returned and f is never
// called. If f returns nil, the transaction is finished and the result of
// FinishTransaction is returned. If f returns a non-nil error, the transaction
// is deleted and f's error is returned.
func (c APIClient) ExecuteInTransaction(f func(c *APIClient) error) (*transaction.TransactionInfo, error) {
	txn, startErr := c.StartTransaction()
	if startErr != nil {
		return nil, startErr
	}

	cbErr := f(c.WithTransaction(txn))
	if cbErr == nil {
		return c.FinishTransaction(txn)
	}

	// Best-effort cleanup: the delete error is intentionally discarded because
	// the callback's error is the more useful one to report to the caller.
	_ = c.DeleteTransaction(txn)
	return nil, cbErr
}