-
Notifications
You must be signed in to change notification settings - Fork 7
/
rados.go
130 lines (109 loc) · 2.6 KB
/
rados.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package wugui
import (
"fmt"
"io"
"strconv"
"strings"
"sync"
"github.com/golang/groupcache"
"github.com/thesues/radoshttpd/rados"
)
// chunkPool hands out the scratch buffers that radosGetter fills on each
// cache load. It is (re)assigned by InitRadosCache, which fixes the buffer
// length to the configured chunk size.
var chunkPool sync.Pool

// radosCache is the groupcache group that serves every RadosReaderAt.
// It stays nil until InitRadosCache has been called.
var radosCache *groupcache.Group

// cacheMade records that InitRadosCache has already run, so that a second
// call can be rejected.
var cacheMade bool
// GetRadosCacheStats returns a copy of the statistics of the package-level
// rados cache group.
//
// If InitRadosCache has not been called yet there is no group to inspect, so
// zero-value statistics are returned instead of dereferencing a nil group.
func GetRadosCacheStats() groupcache.Stats {
	if radosCache == nil {
		return groupcache.Stats{}
	}
	return radosCache.Stats
}
// InitRadosCache creates the package-level groupcache group and the buffer
// pool used to load chunks into it.
//
// name and size are forwarded to groupcache.NewGroup; chunkSize is the length
// of every buffer handed out by the pool, and therefore the maximum number of
// bytes stored under one cache key.
//
// It panics when called more than once.
func InitRadosCache(name string, size int64, chunkSize int) {
	if cacheMade {
		panic("groupcache: InitRadosCache must be called only once")
	}
	cacheMade = true

	getter := groupcache.GetterFunc(radosGetter)
	radosCache = groupcache.NewGroup(name, size, getter)

	// Each pooled buffer is exactly one chunk long.
	newChunk := func() interface{} {
		return make([]byte, chunkSize)
	}
	chunkPool = sync.Pool{New: newChunk}
}
// Key identifies one cached chunk: the pool and object it belongs to and the
// byte offset at which the chunk starts. Its string form, produced by String
// and parsed by FromString, is "<poolname>/<filename>-<offset>".
type Key struct {
	poolname string
	filename string
	offset   int64
}

// FromString parses a cache key of the form "<poolname>/<filename>-<offset>"
// into k.
//
// The offset is taken from the text after the last '-', and the pool name is
// the text before the first '/', so the file name itself may contain both
// characters. Offsets are expected to be non-negative: a negative offset
// rendered by String would not parse back to the same value.
//
// A key that lacks the '-' or '/' separator, or whose offset is not a base-10
// int64, yields an error. k is only modified when parsing succeeds.
func (k *Key) FromString(key string) (err error) {
	i := strings.LastIndex(key, "-")
	if i < 0 {
		return fmt.Errorf("malformed cache key %q: missing '-' before offset", key)
	}
	offset, err := strconv.ParseInt(key[i+1:], 10, 64)
	if err != nil {
		return err
	}

	blobref := key[:i]
	slash := strings.Index(blobref, "/")
	if slash < 0 {
		return fmt.Errorf("malformed cache key %q: missing '/' after pool name", key)
	}

	k.poolname = blobref[:slash]
	k.filename = blobref[slash+1:]
	k.offset = offset
	return nil
}

// String renders the key as "<poolname>/<filename>-<offset>".
func (k *Key) String() string {
	// Bucket names cannot contain slash.
	// See http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
	return fmt.Sprintf("%s/%s-%d", k.poolname, k.filename, k.offset)
}
// RadosReaderAt provides offset-based reads of a single object, served
// through the package-level groupcache group (see ReadAt).
type RadosReaderAt struct {
// striper is the handle radosGetter calls Read on when groupcache asks for
// a chunk to be loaded.
striper *rados.StriperPool
// poolname and filename identify the object; together with the read offset
// they make up the cache key built in ReadAt.
poolname string
filename string
// size is the value reported by Size.
size int64
}
// Size returns the size stored in the reader, i.e. the size argument that
// was passed to NewRadosReaderAt.
func (r *RadosReaderAt) Size() int64 {
return r.size
}
// ReadAt fills p with the cached chunk that starts at byte offset off of the
// object, loading it through the package-level groupcache group.
//
// It follows the io.ReaderAt contract: n is always in the range
// 0 <= n <= len(p). If the cache lookup fails, it returns 0 and the error.
// If fewer than len(p) bytes are available, it returns the number of bytes
// actually copied into p together with io.ErrUnexpectedEOF.
func (r *RadosReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
	wantN := len(p)
	key := &Key{poolname: r.poolname, filename: r.filename, offset: off}

	// The truncating sink copies into p's backing array (so the caller sees
	// the data) and shrinks the local slice header p to the number of bytes
	// that were available.
	err = radosCache.Get(r, key.String(),
		groupcache.TruncatingByteSliceSink(&p))
	if err != nil {
		return 0, err
	}
	if len(p) < wantN {
		return len(p), io.ErrUnexpectedEOF
	}
	return len(p), nil
}
// NewRadosReaderAt assembles a RadosReaderAt for the object filename in pool
// poolname, reading through striper and reporting size from Size.
//
// The result is returned by value, while the type's methods use pointer
// receivers and radosGetter expects a *RadosReaderAt as its context.
//
// TODO(wenjianhn):
//  1. func NewRadosReaderAt(poolname string, filename string)
//  2. reuse rados.NewConn
func NewRadosReaderAt(striper *rados.StriperPool, poolname string, filename string, size int64) RadosReaderAt {
	var reader RadosReaderAt
	reader.striper = striper
	reader.poolname = poolname
	reader.filename = filename
	reader.size = size
	return reader
}
// radosGetter is the groupcache getter for the rados cache: it loads the
// chunk named by key from the object store and hands it to dest.
//
// ctx must be the *RadosReaderAt that issued the lookup; key must be in the
// format produced by Key.String. Up to one pooled buffer's worth of bytes is
// read, starting at the offset encoded in the key; reading stops early when
// the striper reports no more data.
//
// It returns an error if ctx has the wrong type, the key cannot be parsed,
// a read fails, or dest rejects the bytes.
func radosGetter(ctx groupcache.Context, key string, dest groupcache.Sink) error {
	rr, ok := ctx.(*RadosReaderAt)
	if !ok || rr == nil {
		return fmt.Errorf("radosGetter: unexpected context %T", ctx)
	}

	k := &Key{}
	if err := k.FromString(key); err != nil {
		return err
	}

	// The buffer goes back to the pool on return; SetBytes leaves ownership
	// of the slice with the caller, so the sink keeps no reference to it.
	buf := chunkPool.Get().([]byte)
	defer chunkPool.Put(buf)

	readN := 0
	off := k.offset
	for readN < len(buf) {
		// Always continue at the running total, not at the size of the last
		// read, so that consecutive short reads stay contiguous in buf.
		count, err := rr.striper.Read(rr.filename, buf[readN:], uint64(off))
		if err != nil {
			return fmt.Errorf("Timeout or Read Error: %s", err.Error())
		}
		if count <= 0 {
			// No more data available for this chunk.
			break
		}
		readN += count
		off += int64(count)
	}

	return dest.SetBytes(buf[:readN])
}