/
adapter.go
127 lines (106 loc) · 2.17 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package frio
import (
"io"
"os"
"time"
)
type AdapterCache interface {
Add(key string, data []byte) error
Get(key string) ([]byte, bool)
AddWithExpire(key string, data []byte, exp time.Duration) error
}
type KeyStreamerAt interface {
StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error)
}
type Adapter struct {
cache AdapterCache
keyStreamer KeyStreamerAt
}
type Options interface {
adapterOpt(adapter *Adapter) error
}
type cache struct {
numEntries int
}
func (c *cache) adapterOpt(adapter *Adapter) error {
adapter.cache = NewLRUCache(c.numEntries)
return nil
}
func WithDataCache(numEntries int) Options {
return &cache{numEntries: numEntries}
}
func NewAdapter(keyStreamer KeyStreamerAt, options ...Options) *Adapter {
ada := &Adapter{
keyStreamer: keyStreamer,
}
for _, option := range options {
option.adapterOpt(ada)
}
return ada
}
func (a *Adapter) Reader(key string) *Reader {
return &Reader{
a: a,
key: key,
off: 0,
}
}
func (a *Adapter) ReadAt(key string) *Reader {
return &Reader{
a: a,
key: key,
}
}
func (a *Adapter) srcRead(key string, off, n int64) ([]byte, int, error) {
if bytes, ok := a.cache.Get(key); ok && bytes != nil {
return bytes, len(bytes), nil
}
r, _, err := a.keyStreamer.StreamAt(key, off, n)
if err != nil {
return nil, 0, err
}
p, err := io.ReadAll(r)
if p != nil && err == nil {
a.cache.Add(key, p)
}
return p, len(p), nil
}
type Reader struct {
a *Adapter
key string
off, size int64
}
func (r *Reader) Read() ([]byte, int, error) {
bt, n, err := r.a.srcRead(r.key, r.off, 0)
if err != nil {
return nil, 0, err
}
return bt, n, err
}
func (r *Reader) ReadAt(off int64, size int64) ([]byte, int, error) {
r.off += off
r.size = size
bt, n, err := r.a.srcRead(r.key, r.off, size)
if err != nil {
return nil, 0, err
}
return bt, n, nil
}
func (r *Reader) Seek(off int64, nWhence int) (int64, error) {
coff := r.off
switch nWhence {
case io.SeekCurrent:
coff += off
case io.SeekStart:
coff = off
case io.SeekEnd:
coff = r.size + off
default:
return 0, os.ErrInvalid
}
if coff < 0 {
return r.off, os.ErrInvalid
}
r.off = coff
return r.off, nil
}