forked from firedancer-io/radiance
/
build.go
271 lines (244 loc) · 7.15 KB
/
build.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
//go:build unix
package compactindex
import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
)
// Builder creates new compactindex files.
type Builder struct {
Header
// buckets holds one on-disk temp buffer per hash bucket; entries are
// spilled here during Insert and mined/sealed during Seal.
buckets []tempBucket
// dir is the scratch directory holding the per-bucket "keys-N" files;
// removed by Close.
dir string
}
// NewBuilder creates a new index builder.
//
// If dir is an empty string, a random temporary directory is used.
//
// numItems refers to the number of items in the index.
//
// targetFileSize is the size of the file that index entries point to.
// Can be set to zero if unknown, which results in a less efficient (larger) index.
func NewBuilder(dir string, numItems uint, targetFileSize uint64) (*Builder, error) {
if dir == "" {
var err error
dir, err = os.MkdirTemp("", "compactindex-")
if err != nil {
return nil, fmt.Errorf("failed to create temp dir: %w", err)
}
}
if targetFileSize == 0 {
// Unknown target size: assume the worst case so offsets always fit.
targetFileSize = math.MaxUint64
}
// Round up so every item fits in some bucket.
numBuckets := (numItems + targetEntriesPerBucket - 1) / targetEntriesPerBucket
buckets := make([]tempBucket, numBuckets)
for i := range buckets {
name := filepath.Join(dir, fmt.Sprintf("keys-%d", i))
f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
// Close the files opened so far to avoid leaking descriptors
// on partial failure.
for j := 0; j < i; j++ {
buckets[j].file.Close()
}
return nil, err
}
buckets[i].file = f
buckets[i].writer = bufio.NewWriter(f)
}
return &Builder{
Header: Header{
FileSize:   targetFileSize,
NumBuckets: uint32(numBuckets),
},
buckets: buckets,
dir:     dir,
}, nil
}
// Insert writes a key-value mapping to the index.
//
// Index generation will fail if the same key is inserted twice.
// The writer must not pass a value greater than targetFileSize.
func (b *Builder) Insert(key []byte, value uint64) error {
// Route the tuple to the temp bucket selected by the key's hash.
bucket := &b.buckets[b.Header.BucketHash(key)]
return bucket.writeTuple(key, value)
}
// Seal writes the final index to the provided file.
// This process is CPU-intensive, use context to abort prematurely.
//
// The file should be opened with access mode os.O_RDWR.
// Passing a non-empty file will result in a corrupted index.
func (b *Builder) Seal(ctx context.Context, f *os.File) error {
// TODO support in-place writing.

// Serialize and emit the fixed-size file header.
var hdr [headerSize]byte
b.Header.Store(&hdr)
if _, err := f.Write(hdr[:]); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
// Reserve room for the bucket header table by punching a hole;
// sealBucket fills the table in as each bucket is finalized.
tableLen := int64(b.NumBuckets) * bucketHdrLen
if err := fallocate(f, headerSize, tableLen); err != nil {
return fmt.Errorf("failed to fallocate() bucket table: %w", err)
}
// Mine and append each bucket's entries in order.
for i := 0; i < len(b.buckets); i++ {
if err := b.sealBucket(ctx, i, f); err != nil {
return err
}
}
return nil
}
// sealBucket will mine a perfect hash table for one bucket, append its
// entries to the index file, and fill in the bucket's header slot.
//
// The entries are appended at the current end of f; the bucket header
// (written last) records that offset so readers can locate them.
func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
// Produce perfect hash table for bucket.
bucket := &b.buckets[i]
// Flush buffered tuples so mine() reads a complete temp file.
if err := bucket.flush(); err != nil {
return err
}
const mineAttempts uint32 = 1000
entries, domain, err := bucket.mine(ctx, mineAttempts)
if err != nil {
return fmt.Errorf("failed to mine bucket %d: %w", i, err)
}
// Find current file length.
offset, err := f.Seek(0, io.SeekEnd)
if err != nil {
return fmt.Errorf("failed to seek to EOF: %w", err)
}
if offset < 0 {
panic("os.File.Seek() < 0")
}
// Write header to file.
desc := BucketDescriptor{
BucketHeader: BucketHeader{
HashDomain: domain,
NumEntries: uint32(bucket.records),
HashLen:    3, // TODO remove hardcoded constant
FileOffset: uint64(offset),
},
// Each entry is a 3-byte hash followed by an offset sized to FileSize.
Stride:      3 + intWidth(b.FileSize), // TODO remove hardcoded constant
OffsetWidth: intWidth(b.FileSize),
}
// Write entries to file.
wr := bufio.NewWriter(f)
entryBuf := make([]byte, desc.HashLen+intWidth(b.FileSize)) // TODO remove hardcoded constant
for _, entry := range entries {
desc.marshalEntry(entryBuf, entry)
if _, err := wr.Write(entryBuf[:]); err != nil {
return fmt.Errorf("failed to write record to index: %w", err)
}
}
if err := wr.Flush(); err != nil {
return fmt.Errorf("failed to flush bucket to index: %w", err)
}
// Write header to file.
if err := desc.BucketHeader.writeTo(f, uint(i)); err != nil {
return fmt.Errorf("failed to write bucket header %d: %w", i, err)
}
return nil
}
// Close releases the builder's temporary resources: it closes the open
// per-bucket scratch files and then removes the scratch directory.
//
// Previously the bucket files were left open; os.RemoveAll unlinks them
// but the descriptors would stay allocated until process exit.
func (b *Builder) Close() error {
for i := range b.buckets {
if f := b.buckets[i].file; f != nil {
// Best-effort close; the file is being deleted regardless.
f.Close()
}
}
return os.RemoveAll(b.dir)
}
// tempBucket represents the "temporary bucket" file,
// a disk buffer containing a vector of key-value-tuples.
type tempBucket struct {
// records counts the tuples written so far; mine() sizes its entry
// slice from this, so it must match the file contents exactly.
records uint
// file is the backing scratch file ("keys-N" under Builder.dir).
file *os.File
// writer buffers tuple writes; set to nil once flush() succeeds.
writer *bufio.Writer
}
// writeTuple performs a buffered write of a KV-tuple: a 2-byte
// little-endian key length, an 8-byte little-endian value, then the raw
// key bytes (the format hashBucket reads back).
//
// The record count is incremented only after both writes succeed, so a
// failed write cannot leave b.records out of sync with the file — an
// overcount would make mine() expect more tuples than exist and fail
// with an unexpected EOF.
func (b *tempBucket) writeTuple(key []byte, value uint64) (err error) {
var static [10]byte
binary.LittleEndian.PutUint16(static[0:2], uint16(len(key)))
binary.LittleEndian.PutUint64(static[2:10], value)
if _, err = b.writer.Write(static[:]); err != nil {
return err
}
if _, err = b.writer.Write(key); err != nil {
return err
}
b.records++
return nil
}
// flush empties the in-memory write buffer to the file.
// On success the buffered writer is discarded; the bucket is then
// only read from (by mine).
func (b *tempBucket) flush() error {
err := b.writer.Flush()
if err != nil {
return fmt.Errorf("failed to flush writer: %w", err)
}
b.writer = nil
return nil
}
// mine repeatedly hashes the set of entries with different nonces.
//
// Returns a sorted list of hashtable entries upon finding a set of hashes without collisions.
// If a number of attempts was made without success, returns ErrCollision instead.
func (b *tempBucket) mine(ctx context.Context, attempts uint32) (entries []Entry, domain uint32, err error) {
entries = make([]Entry, b.records)
// 2^21 bytes = 2^24 bits, matching the 24-bit hash mask in hashBucket.
bitmap := make([]byte, 1<<21)
rd := bufio.NewReader(b.file)
for domain = uint32(0); domain < attempts; domain++ {
// Allow the caller to abort this CPU-intensive search.
if err = ctx.Err(); err != nil {
return
}
// Reset bitmap
for i := range bitmap {
bitmap[i] = 0
}
// Reset reader
if _, err = b.file.Seek(0, io.SeekStart); err != nil {
return
}
rd.Reset(b.file)
// Retry with the next nonce on collision; fail hard on any other error.
if hashErr := hashBucket(rd, entries, bitmap, domain); errors.Is(hashErr, ErrCollision) {
continue
} else if hashErr != nil {
return nil, 0, hashErr
}
return // ok
}
// All nonces exhausted without a collision-free assignment.
return nil, domain, ErrCollision
}
// hashBucket reads and hashes entries from a temporary bucket file.
//
// Uses a 2^24 wide bitmap to detect collisions.
// On success, entries is filled with the (hash, value) pairs sorted by
// hash; on a duplicate 24-bit hash it returns ErrCollision so the caller
// can retry with a different nonce.
func hashBucket(rd *bufio.Reader, entries []Entry, bitmap []byte, nonce uint32) error {
// TODO Don't hardcode this, choose hash depth dynamically
mask := uint64(0xffffff)
// Scan provided reader for entries and hash along the way.
for i := range entries {
// Read next key from file (as defined by writeTuple)
var static [10]byte
if _, err := io.ReadFull(rd, static[:]); err != nil {
return err
}
keyLen := binary.LittleEndian.Uint16(static[0:2])
value := binary.LittleEndian.Uint64(static[2:10])
key := make([]byte, keyLen)
if _, err := io.ReadFull(rd, key); err != nil {
return err
}
// Hash to entry
hash := EntryHash64(nonce, key) & mask
// Check for collision in bitmap
bi, bj := hash/8, hash%8
chunk := bitmap[bi]
if (chunk>>bj)&1 == 1 {
return ErrCollision
}
bitmap[bi] = chunk | (1 << bj)
// Export entry
entries[i] = Entry{
Hash:  hash,
Value: value,
}
}
// Sort entries.
sort.Slice(entries, func(i, j int) bool {
return entries[i].Hash < entries[j].Hash
})
return nil
}
// ErrCollision is returned when two keys map to the same hash slot,
// signaling that mining should retry with a different nonce.
var ErrCollision = errors.New("hash collision")