-
Notifications
You must be signed in to change notification settings - Fork 0
/
gsip.go
182 lines (144 loc) · 3.99 KB
/
gsip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package gsip
import (
"bufio"
"encoding/json"
"fmt"
"io"
"sync"
"github.com/jonjohnsonjr/targz/gsip/internal/flate"
"github.com/jonjohnsonjr/targz/gsip/internal/gzip"
)
// Index contains the metadata used by [Reader] to skip around a gzip stream.
// The layout will absolutely change and break you if you depend on it.
//
// An Index is what [Reader.Encode] writes as JSON and what [Decode] reads back.
type Index struct {
// Checkpoints are the resumption points in the stream. A checkpoint's In is
// used as the offset into the compressed input, and its Out is compared
// against uncompressed ReadAt offsets. The lookup in acquireReader stops at the
// first checkpoint whose Out exceeds the requested offset, so it assumes they
// are ordered by ascending Out.
Checkpoints []*flate.Checkpoint
}
// Reader provides random access, via ReadAt, to the decompressed contents of a
// gzip stream stored in an io.ReaderAt. Instead of decompressing from the start
// on every read, it resumes decompression from recorded checkpoints.
type Reader struct {
// ra holds the compressed gzip stream; size is its length in bytes.
ra io.ReaderAt
size int64
// updates receives checkpoints from the frontier reader created by NewReader.
// It is nil for Readers created by Decode.
updates chan *flate.Checkpoint
// checkpoints are the known resumption points. The goroutine started by
// NewReader appends to this slice while Encode and acquireReader read it.
// NOTE(review): all access should be synchronized with mu.
checkpoints []*flate.Checkpoint
// mu guards readers.
mu sync.Mutex
// readers maps each decompressor to whether it is available (true) or
// currently checked out by a ReadAt call (false).
readers map[*gzip.Reader]bool
}
// Encode writes the checkpoints collected so far to w as a JSON-encoded
// [Index]. Pass the result to [Decode] later to reconstruct a Reader without
// re-indexing the stream.
//
// While the stream is still being indexed, the encoded Index holds only the
// checkpoints received up to this call.
func (r *Reader) Encode(w io.Writer) error {
	// r.checkpoints may be appended to concurrently by the indexing goroutine
	// started in NewReader, so read the slice header under mu. Appending never
	// modifies elements below the snapshot's length, so the snapshot stays
	// valid after unlocking.
	r.mu.Lock()
	checkpoints := r.checkpoints
	r.mu.Unlock()

	idx := Index{
		Checkpoints: checkpoints,
	}
	return json.NewEncoder(w).Encode(&idx)
}
// Decode returns a Reader for the gzip stream in ra, which is size bytes long,
// using an index previously written by [Reader.Encode].
//
// Unlike NewReader, Decode starts no frontier reader and no indexing
// goroutine. ReadAt resumes decompression from the decoded checkpoints, or
// reuses readers previously resumed from them.
func Decode(ra io.ReaderAt, size int64, index io.Reader) (*Reader, error) {
	var idx Index
	dec := json.NewDecoder(index)
	if err := dec.Decode(&idx); err != nil {
		return nil, err
	}

	r := &Reader{
		ra:          ra,
		size:        size,
		checkpoints: idx.Checkpoints,
		readers:     make(map[*gzip.Reader]bool),
	}
	return r, nil
}
// NewReader returns a Reader for the gzip stream stored in ra, which is size
// bytes long.
//
// The Reader starts with one "frontier" decompressor that reads the stream
// sequentially from the beginning. It sends checkpoints on an updates channel,
// and a background goroutine records them so later ReadAt calls can resume
// from them.
func NewReader(ra io.ReaderAt, size int64) (*Reader, error) {
	updates := make(chan *flate.Checkpoint, 10)

	// This is our first pass frontier reader that sends us updates.
	// We probably need to do something special to make this work in the face of concurrent ReadAt.
	sr := io.NewSectionReader(ra, 0, size)

	// Add a buffered reader to the "frontier" to make sure we read at least 1MB at a time.
	// This avoids sending a ton of tiny http requests when using ranger.
	// TODO: Give callers control over this. Does io.SectionReader.Outer help here?
	// Should we implement an optional bufio.ReaderAt?
	br := bufio.NewReaderSize(sr, 1<<20)
	zr, err := gzip.NewReader(br, updates)
	if err != nil {
		return nil, err
	}

	r := &Reader{
		ra:      ra,
		size:    size,
		updates: updates,
		// Non-nil so that Encode emits [] rather than null before any
		// checkpoint arrives.
		checkpoints: []*flate.Checkpoint{},
		readers:     map[*gzip.Reader]bool{zr: true},
	}

	// Record checkpoints as the frontier reader produces them. Encode and
	// acquireReader read r.checkpoints concurrently, so append under mu.
	//
	// TODO: Make sure we don't leak this goroutine. It exits only when updates
	// is closed, and NewReader never closes it.
	// NOTE(review): confirm whether gzip.Reader closes updates at end of stream.
	go func() {
		for checkpoint := range updates {
			r.mu.Lock()
			r.checkpoints = append(r.checkpoints, checkpoint)
			r.mu.Unlock()
		}
	}()

	return r, nil
}
// acquireReader returns a decompressor positioned at uncompressed offset off
// and marks it in use in r.readers. The caller must mark it available again
// under r.mu when done; ReadAt does this in a deferred func.
//
// It tries, in order:
//  1. an available reader already positioned exactly at off;
//  2. a new reader resumed from the last checkpoint at or before off, then
//     fast-forwarded to off;
//  3. if no such checkpoint exists, any available reader positioned at or
//     before off (typically the frontier reader), fast-forwarded to off.
func (r *Reader) acquireReader(off int64) (*gzip.Reader, error) {
	r.mu.Lock()
	// TODO: Even if we don't find an exact match, one of these might be reusable.
	// TODO: Consider a fixed size pool of these that signal they're done via Close().
	for zr, available := range r.readers {
		if available && zr.Offset() == off {
			r.readers[zr] = false
			r.mu.Unlock()
			return zr, nil
		}
	}
	// Snapshot the checkpoints while we hold the lock, because NewReader's
	// indexing goroutine may be appending to r.checkpoints concurrently.
	// Appending never modifies elements below the snapshot's length, so the
	// snapshot is safe to scan after unlocking.
	checkpoints := r.checkpoints
	r.mu.Unlock()

	// Find the last checkpoint at or before off. This assumes checkpoints are
	// ordered by ascending Out, as they are when appended in stream order.
	var highest *flate.Checkpoint
	for _, checkpoint := range checkpoints {
		if checkpoint.Out > off {
			break
		}
		highest = checkpoint
	}

	if highest == nil {
		// No checkpoints probably means we are trying to ReadAt before we index.
		// Just try to find any reader that isn't already in use (probably the first one).
		r.mu.Lock()
		for zr, available := range r.readers {
			if !available || zr.Offset() > off {
				continue
			}
			r.readers[zr] = false
			r.mu.Unlock()
			if _, err := io.CopyN(io.Discard, zr, off-zr.Offset()); err != nil {
				// Return the reader to the pool. Otherwise a failed fast-forward
				// (e.g. off past the end of the data) would remove it for good.
				// ReadAt likewise returns readers after a failed read.
				r.mu.Lock()
				r.readers[zr] = true
				r.mu.Unlock()
				return nil, err
			}
			return zr, nil
		}
		r.mu.Unlock()
		return nil, fmt.Errorf("could not find any checkpoints or readers for offset %d", off)
	}

	// The third argument of io.NewSectionReader is a byte count, not an end
	// offset, so bound the section to the rest of the compressed stream.
	sr := io.NewSectionReader(r.ra, highest.In, r.size-highest.In)
	zr, err := gzip.Continue(sr, 0, highest, nil)
	if err != nil {
		return nil, fmt.Errorf("continue: %w", err)
	}

	// TODO: Make sure this doesn't send a bunch of tiny ReadAts.
	if _, err := io.CopyN(io.Discard, zr, off-highest.Out); err != nil {
		return nil, err
	}

	// Register the new reader as in use so it can be reused once released.
	r.mu.Lock()
	r.readers[zr] = false
	r.mu.Unlock()
	return zr, nil
}
// ReadAt implements [io.ReaderAt] over the decompressed contents of the gzip
// stream. It reads len(p) bytes starting at uncompressed offset off.
//
// As io.ReaderAt requires, a short read returns a non-nil error. Reaching the
// end of the decompressed data is reported as io.EOF, and any other error from
// the decompressor is returned unchanged.
func (r *Reader) ReadAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, fmt.Errorf("negative offset %d", off)
	}
	zr, err := r.acquireReader(off)
	if err != nil {
		return 0, err
	}
	// Return the reader to the pool wherever it stopped. A later ReadAt at that
	// offset can pick it up through acquireReader's exact-offset match.
	defer func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.readers[zr] = true
	}()

	var n int
	for n < len(p) {
		var m int
		m, err = zr.Read(p[n:])
		n += m
		if err != nil {
			break
		}
	}
	if n == len(p) {
		// Every requested byte was delivered. Like io.ReadFull, don't report a
		// trailing error.
		return n, nil
	}
	// Short read: pass the decompressor's error through. In particular, io.EOF
	// stays io.EOF, which io.ReadFull would turn into io.ErrUnexpectedEOF.
	// ReaderAt callers such as io.SectionReader and io.Copy rely on io.EOF to
	// detect the end of data. A genuine io.ErrUnexpectedEOF (truncated input)
	// is still reported as such.
	return n, err
}
// reader embeds gzip.Reader, so it exposes gzip.Reader's methods directly.
// NOTE(review): this type is not referenced anywhere in this file; check
// whether other files in the package use it before removing it.
type reader struct {
gzip.Reader
}