/
session.go
167 lines (144 loc) · 3.78 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package dsync
import (
"context"
"fmt"
"io"
"math/rand"
"sync"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/qri-io/dag"
)
// session tracks the state of a transfer on the receiving side: the handles
// used to store incoming blocks, the manifest being received, and per-block
// progress for that manifest.
type session struct {
// ctx is the context given to newSession; ReceiveBlock passes it to bapi.Put
ctx context.Context
// lng is the node getter newSession hands to dag.Missing when a block diff
// is requested
lng ipld.NodeGetter
// bapi is the block API that received blocks are written to
bapi coreiface.BlockAPI
// id identifies the session; newSession sets it to a random 10-letter string
id string
// pin records the pinOnComplete argument given to newSession
pin bool
// meta is the caller-supplied metadata map given to newSession
meta map[string]string
// info describes the DAG being received; incoming block hashes are matched
// against info.Manifest.Nodes to find the progress entry to update
info *dag.Info
// diff is the result of dag.Missing when a block diff was requested,
// otherwise it is info.Manifest itself
diff *dag.Manifest
// prog holds completion values indexed like info.Manifest.Nodes; an entry is
// set to 100 when the matching block has been received
prog dag.Completion
// progCh is the channel completionChanged sends the current prog on
progCh chan dag.Completion
// lock guards fin
lock sync.Mutex
// fin is set by IsFinalizedOnce the first time it returns true
fin bool
}
// newSession builds the receive-side state machine for the transfer described
// by info.
//
// When calcBlockDiff is true the set of blocks to transfer is obtained from
// dag.Missing(ctx, lng, info.Manifest); an error from that call is returned
// as-is with a nil session. Otherwise the full info.Manifest is used. The
// session receives a random 10-letter id, and an initial progress value is
// published on its progress channel from a separate goroutine before the
// session is returned.
func newSession(ctx context.Context, lng ipld.NodeGetter, bapi coreiface.BlockAPI, info *dag.Info, calcBlockDiff, pinOnComplete bool, meta map[string]string) (s *session, err error) {
	// without a diff request, the whole manifest is treated as pending
	pending := info.Manifest
	if calcBlockDiff {
		log.Debug("calculating block diff")
		pending, err = dag.Missing(ctx, lng, info.Manifest)
		if err != nil {
			log.Debugf("error calculating diff err=%q", err)
			return nil, err
		}
	}

	s = new(session)
	s.id = randStringBytesMask(10)
	s.ctx = ctx
	s.lng = lng
	s.bapi = bapi
	s.info = info
	s.diff = pending
	s.pin = pinOnComplete
	s.meta = meta
	s.prog = dag.NewCompletion(info.Manifest, pending)
	s.progCh = make(chan dag.Completion)

	// progCh is unbuffered, so the initial progress is sent from its own
	// goroutine to avoid blocking the constructor
	go s.completionChanged()

	log.Debugf("created session: %s", s.id)
	return s, nil
}
// ReceiveBlock stores one block sent by the remote in the session's block API
// and records it as received.
//
// hash is the CID string the sender claims for the block and data is the raw
// block content. The response status is:
//   - StatusRetry when writing the block fails (Err carries the write error)
//   - StatusErrored when the CID of the stored block differs from hash
//   - StatusOk otherwise
//
// On success every manifest entry equal to hash is marked 100 and the updated
// progress is published from a separate goroutine.
func (s *session) ReceiveBlock(hash string, data io.Reader) ReceiveResponse {
	stat, err := s.bapi.Put(s.ctx, data)
	if err != nil {
		return ReceiveResponse{Hash: hash, Status: StatusRetry, Err: err}
	}

	// the block API reports the CID it computed; it must match the sender's claim
	if got := stat.Path().Cid().String(); got != hash {
		mismatch := fmt.Errorf("hash mismatch. expected: '%s', got: '%s'", hash, got)
		return ReceiveResponse{Hash: hash, Status: StatusErrored, Err: mismatch}
	}

	// this should be the only place that modifies progress
	nodes := s.info.Manifest.Nodes
	for idx := range nodes {
		if nodes[idx] == hash {
			s.prog[idx] = 100
		}
	}
	go s.completionChanged()

	return ReceiveResponse{Hash: hash, Status: StatusOk}
}
// ReceiveBlocks hands the stream r to AddAllFromCARReader, which is given the
// session's block API and a channel of CIDs. For each CID that arrives on that
// channel, every manifest entry with the same string form is marked 100 and
// the updated progress is published from a separate goroutine. The error
// from AddAllFromCARReader is returned unchanged.
//
// NOTE(review): the CID channel is not closed in this function, so the
// tracking goroutine only exits if AddAllFromCARReader closes it — confirm
// against that function.
func (s *session) ReceiveBlocks(ctx context.Context, r io.Reader) error {
	added := make(chan cid.Cid)

	go func() {
		for c := range added {
			key := c.String()
			for idx, node := range s.info.Manifest.Nodes {
				if node == key {
					s.prog[idx] = 100
				}
			}
			go s.completionChanged()
		}
	}()

	if _, err := AddAllFromCARReader(ctx, s.bapi, r, added); err != nil {
		return err
	}
	return nil
}
// Complete reports whether the session's progress value considers the
// transfer finished, as decided by the progress value's own Complete method.
func (s *session) Complete() bool {
	finished := s.prog.Complete()
	return finished
}
// completionChanged sends the session's current progress value on progCh.
// The send blocks until it can proceed; every call site in this file invokes
// it with the go keyword.
func (s *session) completionChanged() {
	current := s.prog
	s.progCh <- current
}
// IsFinalizedOnce returns true if the session is complete, but only for the
// first call that observes completion. Every later call returns false, even
// when several goroutines call it at the same time.
//
// It returns false without touching the lock while Complete reports false.
func (s *session) IsFinalizedOnce() bool {
	if !s.Complete() {
		return false
	}

	// register the unlock immediately after locking so that no return path
	// can leave the mutex held
	s.lock.Lock()
	defer s.lock.Unlock()

	if s.fin {
		// another caller already claimed finalization
		return false
	}
	s.fin = true
	return true
}
// Random-letter string generation, adapted from this Stack Overflow answer:
// https://stackoverflow.com/a/22892986/9416066
const (
	// letterBytes is the alphabet random strings are drawn from
	letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

	letterIdxBits = 6                    // 6 bits to represent a letter index
	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
)

// randStringBytesMask returns a string of n letters picked from letterBytes
// using the package-level math/rand source.
//
// Each rand.Int63() call supplies 63 random bits, which are consumed
// letterIdxBits at a time. A chunk whose value falls outside letterBytes is
// discarded and the next chunk is tried.
func randStringBytesMask(n int) string {
	out := make([]byte, n)

	// bits is the pool of unread random bits; left counts the chunks still in it
	bits, left := rand.Int63(), letterIdxMax

	// fill from the last position backwards; pos only moves on an accepted chunk
	for pos := n - 1; pos >= 0; {
		if left == 0 {
			bits, left = rand.Int63(), letterIdxMax
		}

		idx := int(bits & letterIdxMask)
		bits >>= letterIdxBits
		left--

		if idx >= len(letterBytes) {
			// chunk does not map to a letter; draw another
			continue
		}
		out[pos] = letterBytes[idx]
		pos--
	}

	return string(out)
}