This repository has been archived by the owner on Apr 29, 2020. It is now read-only.
/
auditing_transaction.go
116 lines (97 loc) · 3.05 KB
/
auditing_transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package rc
import (
"context"
"github.com/square/p2/pkg/audit"
pc_fields "github.com/square/p2/pkg/pc/fields"
"github.com/square/p2/pkg/rc/fields"
"github.com/square/p2/pkg/store/consul/transaction"
"github.com/square/p2/pkg/types"
"github.com/square/p2/pkg/util"
"github.com/hashicorp/consul/api"
)
type auditingTransaction struct {
ctx context.Context
nodes map[types.NodeName]struct{}
auditLogStore AuditLogStore
podID types.PodID
az pc_fields.AvailabilityZone
cn pc_fields.ClusterName
}
type scheduledNodesKey struct{}
func (rc *replicationController) newAuditingTransaction(
ctx context.Context,
rcFields fields.RC,
startingNodes []types.NodeName,
) (*auditingTransaction, context.CancelFunc) {
annotatedContext := context.WithValue(ctx, scheduledNodesKey{}, startingNodes)
ctx, cancelFunc := transaction.New(annotatedContext)
startingNodeMap := make(map[types.NodeName]struct{})
for _, node := range startingNodes {
startingNodeMap[node] = struct{}{}
}
podLabels := rcFields.PodLabels
return &auditingTransaction{
ctx: ctx,
nodes: startingNodeMap,
auditLogStore: rc.auditLogStore,
podID: rcFields.Manifest.ID(),
az: pc_fields.AvailabilityZone(podLabels[types.AvailabilityZoneLabel]),
cn: pc_fields.ClusterName(podLabels[types.ClusterNameLabel]),
}, cancelFunc
}
func (a *auditingTransaction) Context() context.Context {
return a.ctx
}
func (a *auditingTransaction) Nodes() []types.NodeName {
var nodes []types.NodeName
for node, _ := range a.nodes {
nodes = append(nodes, node)
}
return nodes
}
func (a *auditingTransaction) AddNode(node types.NodeName) {
a.nodes[node] = struct{}{}
}
func (a *auditingTransaction) RemoveNode(node types.NodeName) {
delete(a.nodes, node)
}
// Commit adds one final operation to the underlying consul transaction to
// create an audit log record with the set of nodes that already have been
// scheduled and nodes that will be scheduled as a part of this transaction by
// the RC. Then it commits the transaction
func (a *auditingTransaction) Commit(txner transaction.Txner) (bool, *api.KVTxnResponse, error) {
err := a.commitCommon(txner)
if err != nil {
return false, nil, err
}
return transaction.Commit(a.ctx, txner)
}
// CommitWithRetries() adds the audit log like Commit() but retries the
// transaction.Commit() until the transaction is applied without an error.
func (a *auditingTransaction) CommitWithRetries(txner transaction.Txner) (bool, *api.KVTxnResponse, error) {
err := a.commitCommon(txner)
if err != nil {
return false, nil, err
}
return transaction.CommitWithRetries(a.ctx, txner)
}
func (a *auditingTransaction) commitCommon(txner transaction.Txner) error {
details, err := audit.NewRCRetargetingEventDetails(
a.podID,
a.az,
a.cn,
a.Nodes(),
)
if err != nil {
return err
}
err = a.auditLogStore.Create(
a.ctx,
audit.RCRetargetingEvent,
details,
)
if err != nil {
return util.Errorf("could not add rc retargeting audit log to context: %s", err)
}
return nil
}