-
Notifications
You must be signed in to change notification settings - Fork 0
/
upload_pack_session.go
110 lines (102 loc) · 2.29 KB
/
upload_pack_session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// SPDX-License-Identifier: Apache-2.0
// Copyright © 2021 Wrangle Ltd
package apiclient
import (
"fmt"
"io"
apiutils "github.com/wrgl/core/pkg/api/utils"
"github.com/wrgl/core/pkg/objects"
"github.com/wrgl/core/pkg/ref"
)
// defaultHavesPerRoundTrip is the number of "have" commits sent per request
// when NewUploadPackSession is called with havesPerRoundTrip set to 0.
const defaultHavesPerRoundTrip = 32
// UploadPackSession holds the client-side state of one upload-pack exchange:
// the commits still wanted from the remote, the queue of local commits that
// are offered as "haves", and the receiver that stores incoming objects.
type UploadPackSession struct {
// c is the API client used to post upload-pack requests.
c *Client
// db is the local object store; it is consulted to decide which advertised
// commits are missing and is handed to the receiver and the commits queue.
db objects.Store
// receiver consumes the response reader and records the received commits.
receiver *apiutils.ObjectReceiver
// rs is the ref store whose refs seed the commits queue.
rs ref.Store
// wants lists the advertised commits absent from db. Start sets it to nil
// after the first request.
wants [][]byte
// q is the queue of local commits to offer as haves; popHaves creates it
// lazily on its first call.
q *ref.CommitsQueue
// popCount is the total number of haves popped from q so far.
popCount int
// havesPerRoundTrip is the maximum number of haves popped per popHaves call.
havesPerRoundTrip int
}
// NewUploadPackSession prepares an upload-pack session against the remote
// reachable through c.
//
// advertised is the list of commit sums offered by the remote; only those for
// which objects.CommitExist reports false against db become "wants". When
// every advertised commit is already present locally, a "nothing wanted" error
// is returned and no session is created.
//
// havesPerRoundTrip bounds how many "have" commits are sent per request. Any
// non-positive value selects defaultHavesPerRoundTrip: a negative value would
// otherwise make popHaves pop nothing on every round, so the session would
// never offer a have nor reach its haves limit.
func NewUploadPackSession(db objects.Store, rs ref.Store, c *Client, advertised [][]byte, havesPerRoundTrip int) (*UploadPackSession, error) {
	if havesPerRoundTrip <= 0 {
		havesPerRoundTrip = defaultHavesPerRoundTrip
	}
	neg := &UploadPackSession{
		c:                 c,
		db:                db,
		rs:                rs,
		havesPerRoundTrip: havesPerRoundTrip,
	}
	for _, b := range advertised {
		if !objects.CommitExist(db, b) {
			neg.wants = append(neg.wants, b)
		}
	}
	if len(neg.wants) == 0 {
		return nil, fmt.Errorf("nothing wanted")
	}
	neg.receiver = apiutils.NewObjectReceiver(db, neg.wants)
	return neg, nil
}
// popHaves returns the next batch of commit sums to offer to the remote as
// "haves", popping at most havesPerRoundTrip entries from the commits queue.
//
// On the first call the queue is built from every ref found in the ref store.
// done is true when the queue is exhausted (PopInsertParents returned io.EOF)
// or when the total number of haves popped by this session has reached
// maxHaves. Any other error aborts the batch and is returned as is.
func (n *UploadPackSession) popHaves() (haves [][]byte, done bool, err error) {
	// maxHaves is the total number of haves after which the batch is
	// reported as the last one.
	const maxHaves = 256

	if n.q == nil {
		refs, listErr := ref.ListAllRefs(n.rs)
		if listErr != nil {
			return nil, false, listErr
		}
		tips := make([][]byte, 0, len(refs))
		for _, sum := range refs {
			tips = append(tips, sum)
		}
		var queueErr error
		n.q, queueErr = ref.NewCommitsQueue(n.db, tips)
		if queueErr != nil {
			return nil, false, queueErr
		}
	}

	for remaining := n.havesPerRoundTrip; remaining > 0 && !done; remaining-- {
		sum, _, popErr := n.q.PopInsertParents()
		switch {
		case popErr == io.EOF:
			// Queue exhausted: there is nothing left to offer.
			done = true
		case popErr != nil:
			return nil, false, popErr
		default:
			haves = append(haves, sum)
			n.popCount++
		}
	}

	return haves, done || n.popCount >= maxHaves, nil
}
// Start runs the upload-pack exchange until the receiver reports that it is
// done, then returns the receiver's ReceivedCommits.
//
// Each round pops a batch of haves, posts it with PostUploadPack, and then
// either feeds the response reader to the receiver (when no acks came back)
// or removes the acknowledged commits' ancestors from the haves queue. The
// first error from any step stops the exchange and is returned.
func (n *UploadPackSession) Start() ([][]byte, error) {
	for {
		haves, done, err := n.popHaves()
		if err != nil {
			return nil, err
		}
		acks, pr, err := n.c.PostUploadPack(n.wants, haves, done)
		if err != nil {
			return nil, err
		}
		// Wants are only sent with the first request; later rounds pass nil.
		n.wants = nil
		if acks == nil {
			doneReceiving, err := n.receiver.Receive(pr)
			// Close the reader as soon as it has been consumed. A defer here
			// would only fire when Start returns, keeping every round's
			// reader open for the whole exchange.
			pr.Close()
			if err != nil {
				return nil, err
			}
			if doneReceiving {
				break
			}
		}
		err = n.q.RemoveAncestors(acks)
		if err != nil {
			return nil, err
		}
	}
	return n.receiver.ReceivedCommits, nil
}