-
Notifications
You must be signed in to change notification settings - Fork 567
/
util.go
130 lines (117 loc) · 2.95 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package chunk
import (
"bytes"
"context"
"math/rand"
"os"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/pachyderm/pachyderm/src/client/pkg/require"
"github.com/pachyderm/pachyderm/src/server/pkg/obj"
)
const (
// KB is Kilobyte.
KB = 1024
// MB is Megabyte.
MB = 1024 * KB
)
// LocalStorage creates a local chunk storage instance.
// Useful for storage layer tests.
func LocalStorage(tb testing.TB) (obj.Client, *Storage) {
wd, err := os.Getwd()
require.NoError(tb, err)
objC, err := obj.NewLocalClient(wd)
require.NoError(tb, err)
return objC, NewStorage(objC)
}
// Cleanup cleans up a local chunk storage instance.
func Cleanup(objC obj.Client, chunks *Storage) {
chunks.DeleteAll(context.Background())
objC.Delete(context.Background(), prefix)
}
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
// RandSeq generates a random sequence of data (n is number of bytes)
func RandSeq(n int) []byte {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return []byte(string(b))
}
// Reference creates a data reference for the full chunk referenced by a data reference.
func Reference(dataRef *DataRef) *DataRef {
chunkRef := &DataRef{}
chunkRef.ChunkInfo = dataRef.ChunkInfo
chunkRef.SizeBytes = dataRef.ChunkInfo.SizeBytes
return chunkRef
}
func joinAnnotations(as []*Annotation, a *Annotation) []*Annotation {
if as != nil {
lastA := as[len(as)-1]
// If the annotation being added is the same as the
// last, then they are merged.
if lastA.Data == a.Data {
if lastA.tags != nil && a.tags != nil {
lastA.buf.Write(a.buf.Bytes())
if lastA.tags != nil {
lastA.tags = joinTags(lastA.tags, a.tags)
}
return as
} else if lastA.drs != nil && a.drs != nil {
return as
}
}
}
return append(as, a)
}
func joinTags(ts1, ts2 []*Tag) []*Tag {
if ts1 != nil {
lastT := ts1[len(ts1)-1]
if lastT.Id == ts2[0].Id {
lastT.SizeBytes += ts2[0].SizeBytes
ts2 = ts2[1:]
}
}
return append(ts1, ts2...)
}
func splitAnnotation(a *Annotation, size int) (*Annotation, *Annotation) {
a1 := copyAnnotation(a)
a2 := copyAnnotation(a)
if a.buf != nil {
a1.buf = bytes.NewBuffer(a.buf.Bytes()[:size])
a2.buf = bytes.NewBuffer(a.buf.Bytes()[size:])
}
if a.tags != nil {
a1.tags, a2.tags = splitTags(a.tags, size)
}
return a1, a2
}
func copyAnnotation(a *Annotation) *Annotation {
copyA := &Annotation{Data: a.Data}
if a.NextDataRef != nil {
copyA.NextDataRef = &DataRef{}
}
if a.buf != nil {
copyA.buf = &bytes.Buffer{}
}
return copyA
}
func splitTags(ts []*Tag, size int) ([]*Tag, []*Tag) {
var ts1, ts2 []*Tag
for _, t := range ts {
ts2 = append(ts2, proto.Clone(t).(*Tag))
}
for {
if int(ts2[0].SizeBytes) >= size {
t := proto.Clone(ts2[0]).(*Tag)
t.SizeBytes = int64(size)
ts1 = append(ts1, t)
ts2[0].SizeBytes -= int64(size)
break
}
size -= int(ts2[0].SizeBytes)
ts1 = append(ts1, ts2[0])
ts2 = ts2[1:]
}
return ts1, ts2
}