/
prefixed.go
84 lines (70 loc) · 1.94 KB
/
prefixed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package kv
import (
"bytes"
"context"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/stream"
)
var _ Store = &Prefixed{}
// Prefixed is a Store whichs stores entries beneath a prefix in a backing Store.
type Prefixed struct {
inner Store
prefix []byte
}
func NewPrefixed(inner Store, prefix []byte) *Prefixed {
if len(prefix) == 0 {
panic("NewPrefix with empty prefix")
}
return &Prefixed{
inner: inner,
prefix: slices.Clone(prefix),
}
}
func (s *Prefixed) Get(ctx context.Context, key []byte, buf []byte) (int, error) {
return s.inner.Get(ctx, s.addPrefix(key), buf)
}
func (s *Prefixed) Put(ctx context.Context, key []byte, value []byte) error {
return s.inner.Put(ctx, s.addPrefix(key), value)
}
func (s *Prefixed) Exists(ctx context.Context, key []byte) (bool, error) {
return s.inner.Exists(ctx, s.addPrefix(key))
}
func (s *Prefixed) Delete(ctx context.Context, key []byte) error {
return s.inner.Delete(ctx, s.addPrefix(key))
}
func (s *Prefixed) NewKeyIterator(span Span) stream.Iterator[[]byte] {
span2 := Span{
Begin: s.prefix,
End: PrefixEnd(s.prefix),
}
if span.Begin != nil {
span2.Begin = s.addPrefix(span.Begin)
}
if span.End != nil {
span2.End = s.addPrefix(span.End)
}
return &prefixedIter{inner: s.inner.NewKeyIterator(span2), prefix: s.prefix}
}
type prefixedIter struct {
inner stream.Iterator[[]byte]
prefix []byte
}
func (it *prefixedIter) Next(ctx context.Context, dst *[]byte) error {
for {
if err := it.inner.Next(ctx, dst); err != nil {
return err
}
if bytes.HasPrefix(*dst, it.prefix) {
*dst = bytes.TrimPrefix(*dst, it.prefix)
return nil
}
log.Error(ctx, "key does not have prefix", zap.ByteString("key", *dst), zap.ByteString("prefix", it.prefix))
}
}
func (s *Prefixed) addPrefix(key []byte) (ret []byte) {
ret = append(ret, s.prefix...)
ret = append(ret, key...)
return ret
}