-
Notifications
You must be signed in to change notification settings - Fork 568
/
commit_set.go
346 lines (332 loc) · 12.7 KB
/
commit_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
package server
import (
"context"
"google.golang.org/protobuf/proto"
"github.com/pachyderm/pachyderm/v2/src/internal/client"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/pfsdb"
"github.com/pachyderm/pachyderm/v2/src/internal/transactionenv/txncontext"
"github.com/pachyderm/pachyderm/v2/src/pfs"
pfsserver "github.com/pachyderm/pachyderm/v2/src/server/pfs"
)
// returns CommitInfos in a commit set, topologically sorted.
// A commit set will include all the commits that were created across repos for a run, along
// with all of the commits that the run's commit's rely on (present in previous commit sets).
func (d *driver) inspectCommitSetImmediateTx(ctx context.Context, txnCtx *txncontext.TransactionContext, commitSet *pfs.CommitSet, includeAliases bool) ([]*pfs.CommitInfo, error) {
var cis []*pfs.CommitInfo
if includeAliases {
cs, err := pfsdb.CommitSetProvenance(txnCtx.SqlTx, commitSet.Id)
if err != nil {
return nil, err
}
for _, c := range cs {
ci, err := pfsdb.GetCommitByCommitKey(ctx, txnCtx.SqlTx, c)
if err != nil {
return nil, errors.Wrap(err, "inspect commit set immediate")
}
cis = append(cis, ci)
}
}
commits, err := pfsdb.ListCommitTxByFilter(ctx, txnCtx.SqlTx, &pfs.Commit{Id: commitSet.Id})
if err != nil {
return nil, errors.Wrap(err, "inspect commit set immediate")
}
cis = append(cis, commits...)
return TopologicalSort(cis), nil
}
func topSortHelper(ci *pfs.CommitInfo, visited map[string]struct{}, commits map[string]*pfs.CommitInfo) []*pfs.CommitInfo {
if _, ok := visited[pfsdb.CommitKey(ci.Commit)]; ok {
return nil
}
var result []*pfs.CommitInfo
for _, p := range ci.DirectProvenance {
provCI, commitExists := commits[pfsdb.CommitKey(p)]
_, commitVisited := visited[pfsdb.CommitKey(p)]
if commitExists && !commitVisited {
result = append(result, topSortHelper(provCI, visited, commits)...)
}
}
result = append(result, ci)
visited[pfsdb.CommitKey(ci.Commit)] = struct{}{}
return result
}
// TopologicalSort sorts a slice of commit infos topologically based on their provenance
func TopologicalSort(cis []*pfs.CommitInfo) []*pfs.CommitInfo {
commits := make(map[string]*pfs.CommitInfo)
visited := make(map[string]struct{})
for _, ci := range cis {
commits[pfsdb.CommitKey(ci.Commit)] = ci
}
var result []*pfs.CommitInfo
for _, ci := range cis {
result = append(result, topSortHelper(ci, visited, commits)...)
}
return result
}
func (d *driver) inspectCommitSetImmediate(ctx context.Context, commitset *pfs.CommitSet, cb func(*pfs.CommitInfo) error) error {
var commitInfos []*pfs.CommitInfo
if err := d.txnEnv.WithReadContext(ctx, func(ctx context.Context, txnCtx *txncontext.TransactionContext) error {
var err error
commitInfos, err = d.inspectCommitSetImmediateTx(ctx, txnCtx, commitset, true)
return err
}); err != nil {
return err
}
for _, commitInfo := range commitInfos {
if err := cb(commitInfo); err != nil {
return err
}
}
return nil
}
func (d *driver) inspectCommitSet(ctx context.Context, commitset *pfs.CommitSet, wait bool, cb func(*pfs.CommitInfo) error) error {
if !wait {
return d.inspectCommitSetImmediate(ctx, commitset, cb)
}
sent := map[string]struct{}{}
send := func(ci *pfs.CommitInfo) error {
if _, ok := sent[pfsdb.CommitKey(ci.Commit)]; ok {
return nil
}
sent[pfsdb.CommitKey(ci.Commit)] = struct{}{}
return cb(ci)
}
var unfinishedCommits []*pfs.Commit
if err := d.inspectCommitSetImmediate(ctx, commitset, func(ci *pfs.CommitInfo) error {
if ci.Finished != nil {
return send(ci)
}
unfinishedCommits = append(unfinishedCommits, proto.Clone(ci.Commit).(*pfs.Commit))
return nil
}); err != nil {
return err
}
for _, uc := range unfinishedCommits {
// TODO: make a dedicated call just for the blocking part, inspectCommit is a little heavyweight?
ci, err := d.inspectCommit(ctx, uc, pfs.CommitState_FINISHED)
if err != nil {
return err
}
if err := send(ci); err != nil {
return err
}
}
return nil
}
// TODO(provenance): performance concerns in inspecting each commit set
// TODO(fahad/albert): change list commit set to query the pfsdb and return a list of all unique commit sets.
func (d *driver) listCommitSet(ctx context.Context, project *pfs.Project, cb func(*pfs.CommitSetInfo) error) error {
// Track the commitsets we've already processed
seen := map[string]struct{}{}
// Return commitsets by the newest commit in each set (which can be at a different
// timestamp due to triggers or deferred processing)
err := pfsdb.ForEachCommit(ctx, d.env.DB, nil, func(commitWithID pfsdb.CommitWithID) error {
commitInfo := commitWithID.CommitInfo
if project != nil && commitInfo.Commit.AccessRepo().Project.Name != project.Name {
return nil
}
if _, ok := seen[commitInfo.Commit.Id]; ok {
return nil
}
seen[commitInfo.Commit.Id] = struct{}{}
var commitInfos []*pfs.CommitInfo
if err := d.inspectCommitSet(ctx, &pfs.CommitSet{Id: commitInfo.Commit.Id}, false, func(ci *pfs.CommitInfo) error {
commitInfos = append(commitInfos, ci)
return nil
}); err != nil {
return err
}
return cb(&pfs.CommitSetInfo{
CommitSet: client.NewCommitSet(commitInfo.Commit.Id),
Commits: commitInfos,
})
}, pfsdb.OrderByCommitColumn{Column: pfsdb.CommitColumnID, Order: pfsdb.SortOrderDesc})
return errors.Wrap(err, "list commit set")
}
// dropCommitSet is only implemented for commits with no children, so if any
// commits in the commitSet have children the operation will fail.
func (d *driver) dropCommitSet(ctx context.Context, txnCtx *txncontext.TransactionContext, commitset *pfs.CommitSet) error {
css, err := d.subvenantCommitSets(txnCtx, commitset)
if err != nil {
return err
}
if len(css) > 0 {
return &pfsserver.ErrSquashWithSubvenance{CommitSet: commitset, SubvenantCommitSets: css}
}
cis, err := d.inspectCommitSetImmediateTx(ctx, txnCtx, commitset, false)
if err != nil {
return err
}
for _, ci := range cis {
if ci.Commit.AccessRepo().Type == pfs.SpecRepoType && ci.Origin.Kind == pfs.OriginKind_USER {
return errors.Errorf("cannot squash commit %s because it updated a pipeline", ci.Commit)
}
if len(ci.ChildCommits) > 0 {
return &pfsserver.ErrDropWithChildren{Commit: ci.Commit}
}
}
// While this is a 'drop' operation and not a 'squash', proper drop semantics
// aren't implemented at the moment. Squashing the head of a branch is
// effectively a drop, though, because there is no child commit that contains
// the data from the given commits, which is why it is an error to drop any
// non-head commits (until generalized drop semantics are implemented).
for _, ci := range cis {
if err := d.deleteCommit(ctx, txnCtx, ci); err != nil {
return err
}
}
// notify PPS that this commitset has been dropped so it can clean up any
// jobs associated with it at the end of the transaction
txnCtx.StopJobSet(commitset)
return nil
}
func (d *driver) squashCommitSet(ctx context.Context, txnCtx *txncontext.TransactionContext, commitset *pfs.CommitSet) error {
css, err := d.subvenantCommitSets(txnCtx, commitset)
if err != nil {
return err
}
if len(css) > 0 {
return &pfsserver.ErrSquashWithSubvenance{CommitSet: commitset, SubvenantCommitSets: css}
}
commitInfos, err := d.inspectCommitSetImmediateTx(ctx, txnCtx, commitset, false)
if err != nil {
return err
}
for _, ci := range commitInfos {
if ci.Commit.Repo.Type == pfs.SpecRepoType && ci.Origin.Kind == pfs.OriginKind_USER {
return errors.Errorf("cannot squash commit %s because it updated a pipeline", ci.Commit)
}
if len(ci.ChildCommits) == 0 {
return &pfsserver.ErrSquashWithoutChildren{Commit: ci.Commit}
}
}
for _, ci := range commitInfos {
if err := d.deleteCommit(ctx, txnCtx, ci); err != nil {
return err
}
}
// notify PPS that this commitset has been squashed so it can clean up any
// jobs associated with it at the end of the transaction
txnCtx.StopJobSet(commitset)
return nil
}
// Since commits are only deleted as part of deleting a commit set, in most cases
// we will delete many commits at a time. The graph traversal computations can result
// in many I/Os. Therefore, this function bulkifies the graph traversals to run the
// entire operation performantly. This function takes care to load and save an object
// no more than one time.
//
// to delete a single commit
// 1. delete the commit and its associated file set
// 2. check whether the commit was at the head of a branch, and update the branch head if necessary
// 3. updating the ChildCommits pointers of deletedCommit.ParentCommit
// 4. updating the ParentCommit pointer of deletedCommit.ChildCommits
func (d *driver) deleteCommit(ctx context.Context, txnCtx *txncontext.TransactionContext, ci *pfs.CommitInfo) error {
for _, child := range ci.ChildCommits {
childInfo, err := pfsdb.GetCommitByCommitKey(ctx, txnCtx.SqlTx, child)
if err != nil {
return errors.Wrapf(err, "error checking child commit state")
}
if childInfo.Finished == nil {
var suffix string
if childInfo.Finishing != nil {
// user might already have called "finish",
suffix = ", consider using WaitCommit"
}
return errors.Errorf("cannot squash until child commit %s is finished%s", child, suffix)
}
}
// Delete the commit's filesets
if err := d.commitStore.DropFileSetsTx(txnCtx.SqlTx, ci.Commit); err != nil {
return errors.EnsureStack(err)
}
// update branch heads
headlessBranches := make([]*pfs.BranchInfo, 0)
repoInfo, err := pfsdb.GetRepoByName(ctx, txnCtx.SqlTx, ci.Commit.Repo.Project.Name, ci.Commit.Repo.Name, ci.Commit.Repo.Type)
if err != nil {
return err
}
branchInfo := &pfs.BranchInfo{}
for _, b := range repoInfo.Branches {
branchInfoWithID, err := pfsdb.GetBranchInfoWithID(ctx, txnCtx.SqlTx, b)
if err != nil {
return errors.Wrapf(err, "delete commit: getting branch %s", b)
}
branchInfo = branchInfoWithID.BranchInfo
if pfsdb.CommitKey(branchInfo.Head) == pfsdb.CommitKey(ci.Commit) {
if ci.ParentCommit == nil {
headlessBranches = append(headlessBranches, proto.Clone(branchInfo).(*pfs.BranchInfo))
} else {
branchInfo.Head = ci.ParentCommit
}
}
if _, err := pfsdb.UpsertBranch(ctx, txnCtx.SqlTx, branchInfo); err != nil {
return errors.Wrapf(err, "delete commit: updating branch %s", branchInfo.Branch)
}
}
if len(headlessBranches) > 0 {
repoCommit, err := d.makeEmptyCommit(ctx, txnCtx, headlessBranches[0].Branch, headlessBranches[0].DirectProvenance, nil)
if err != nil {
return err
}
for _, bi := range headlessBranches {
bi.Head = repoCommit.CommitInfo.Commit
if _, err := pfsdb.UpsertBranch(ctx, txnCtx.SqlTx, bi); err != nil {
return errors.Wrapf(err, "delete commit: updating branch %s", bi.Branch)
}
}
}
// delete commit.
if err := pfsdb.DeleteCommit(ctx, txnCtx.SqlTx, ci.Commit); err != nil {
return errors.EnsureStack(err)
}
return nil
}
// A commit set 'X' is subvenant to another commit set 'Y' if it contains commits
// that are subvenant to commits in 'Y'. Commit set subvenance is transitivie.
//
// The implementation repeatedly applies CommitSetSubvenance() to compute all of the Subvenant commit sets.
// To understand why, first consider the simple case with the commit provenance graph where r@X & q@Y are in p@Y's provenance.
// For this graph, CommitSetSubvenance("X") evaluates to [p@Y] which we can use to infer that commit set Y is subvenant to commit set X.
// Now consider the same graph, with the addition of a commit s@Z that has q@Y in its subvenance.
// In this case, CommitSetSubvenance(X) still evaluates to [p@Y]. But since a commit in 'Z', depends on a commit
// in 'Y', we haven't yet computed all of 'X”s subvenant commit sets. Therefore,
// we re-evaluate CommitSetSubvenance for each collected commit set until our resulting set becomes stable.
func (d *driver) subvenantCommitSets(txnCtx *txncontext.TransactionContext, commitset *pfs.CommitSet) ([]*pfs.CommitSet, error) {
collectSubvCommitSets := func(setIDs map[string]struct{}) (map[string]struct{}, error) {
subvCommitSets := make(map[string]struct{})
for id := range setIDs {
subvCommits, err := pfsdb.CommitSetSubvenance(txnCtx.SqlTx, id)
if err != nil {
return nil, err
}
for _, subvCommit := range subvCommits {
if _, ok := setIDs[subvCommit.Id]; !ok {
subvCommitSets[subvCommit.Id] = struct{}{}
}
}
}
return subvCommitSets, nil
}
subvCSs, err := collectSubvCommitSets(map[string]struct{}{
commitset.Id: {},
})
if err != nil {
return nil, err
}
completeSubvCSs := make(map[string]struct{})
for len(subvCSs) > 0 {
for cs := range subvCSs {
completeSubvCSs[cs] = struct{}{}
}
subvCSs, err = collectSubvCommitSets(subvCSs)
if err != nil {
return nil, err
}
}
result := make([]*pfs.CommitSet, 0)
for cs := range completeSubvCSs {
result = append(result, &pfs.CommitSet{Id: cs})
}
return result, nil
}