-
Notifications
You must be signed in to change notification settings - Fork 348
/
commit_ordered_iterator.go
131 lines (121 loc) · 3.63 KB
/
commit_ordered_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package ref
import (
"context"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/kv"
)
type OrderedCommitIterator struct {
ctx context.Context
it *kv.PrimaryIterator
store kv.Store
err error
value *graveler.CommitRecord
repositoryPath string
onlyAncestryLeaves bool
firstParents map[string]bool
}
// NewOrderedCommitIterator returns an iterator over all commits in the given repository.
// Ordering is based on the Commit ID value.
// WithOnlyAncestryLeaves causes the iterator to return only commits which are not the first parent of any other commit.
// Consider a commit graph where all non-first-parent edges are removed. This graph is a tree, and ancestry leaves are its leaves.
func NewOrderedCommitIterator(ctx context.Context, store kv.Store, repo *graveler.RepositoryRecord, onlyAncestryLeaves bool) (*OrderedCommitIterator, error) {
repoPath := graveler.RepoPartition(repo)
it, err := kv.NewPrimaryIterator(ctx, store, (&graveler.CommitData{}).ProtoReflect().Type(), repoPath,
[]byte(graveler.CommitPath("")), kv.IteratorOptionsFrom([]byte("")))
if err != nil {
return nil, err
}
var parents map[string]bool
if onlyAncestryLeaves {
parents, err = getAllFirstParents(ctx, store, repo)
if err != nil {
it.Close()
return nil, err
}
}
return &OrderedCommitIterator{
ctx: ctx,
it: it,
store: store,
repositoryPath: repoPath,
onlyAncestryLeaves: onlyAncestryLeaves,
firstParents: parents,
}, nil
}
func (i *OrderedCommitIterator) Next() bool {
if i.Err() != nil || i.it == nil {
return false
}
for i.it.Next() {
e := i.it.Entry()
if e == nil {
i.err = graveler.ErrInvalid
return false
}
commit, ok := e.Value.(*graveler.CommitData)
if commit == nil || !ok {
i.err = graveler.ErrReadingFromStore
return false
}
if !i.onlyAncestryLeaves || !i.firstParents[commit.Id] {
i.value = CommitDataToCommitRecord(commit)
return true
}
}
i.value = nil
return false
}
func (i *OrderedCommitIterator) SeekGE(id graveler.CommitID) {
if i.err != nil {
return
}
i.it.Close()
i.value = nil
i.it, i.err = kv.NewPrimaryIterator(i.ctx, i.store, (&graveler.CommitData{}).ProtoReflect().Type(), i.repositoryPath,
[]byte(graveler.CommitPath("")), kv.IteratorOptionsFrom([]byte(graveler.CommitPath(id))))
}
func (i *OrderedCommitIterator) Value() *graveler.CommitRecord {
if i.Err() != nil {
return nil
}
return i.value
}
func (i *OrderedCommitIterator) Err() error {
if i.err != nil {
return i.err
}
if i.it != nil {
return i.it.Err()
}
return nil
}
func (i *OrderedCommitIterator) Close() {
if i.it != nil {
i.it.Close()
i.it = nil
}
}
// getAllFirstParents returns a set of all commits that are the first parent of some other commit for a given repository.
func getAllFirstParents(ctx context.Context, store kv.Store, repo *graveler.RepositoryRecord) (map[string]bool, error) {
it, err := kv.NewPrimaryIterator(ctx, store, (&graveler.CommitData{}).ProtoReflect().Type(),
graveler.RepoPartition(repo),
[]byte(graveler.CommitPath("")), kv.IteratorOptionsFrom([]byte("")))
if err != nil {
return nil, err
}
defer it.Close()
firstParents := make(map[string]bool)
for it.Next() {
entry := it.Entry()
commit := entry.Value.(*graveler.CommitData)
if len(commit.Parents) > 0 {
parentNo := 0
if graveler.CommitVersion(commit.Version) < graveler.CommitVersionParentSwitch && len(commit.Parents) > 1 {
parentNo = 1
}
parent := commit.Parents[parentNo]
firstParents[parent] = true
}
}
return firstParents, nil
}