fix: correct chunk position starting a block boundary (#3782)
schroederc committed Jun 5, 2019
1 parent e9b0724 commit 341fd2c
Showing 5 changed files with 72 additions and 56 deletions.
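
Context for the change: a Riegeli file is a sequence of fixed-size blocks, each beginning with a block header, and chunks are laid out back-to-back, interrupted by a block header whenever they cross a block boundary. When a chunk begins immediately after a block header, the format addresses that chunk by the block's starting offset rather than by the first byte past the header; the reader's bookkeeping disagreed with that convention, which is what this commit corrects. A minimal runnable sketch of the convention, assuming the spec's 64KiB blocks and 24-byte block headers (these constants do not appear in this diff):

package main

import "fmt"

const (
  blockSize       = 1 << 16 // assumed from the Riegeli spec
  blockHeaderSize = 24      // assumed from the Riegeli spec
)

// canonicalChunkPosition mirrors the convention this commit fixes: a chunk
// whose first byte sits immediately after a block header is addressed by the
// block's own starting offset.
func canonicalChunkPosition(pos int64) int64 {
  if pos%blockSize == blockHeaderSize {
    return pos - blockHeaderSize
  }
  return pos
}

func main() {
  fmt.Println(canonicalChunkPosition(24))    // 0: chunk right after the first block header
  fmt.Println(canonicalChunkPosition(65560)) // 65536: chunk right after the second block header
  fmt.Println(canonicalChunkPosition(100))   // 100: a mid-block chunk is unaffected
}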
1 change: 1 addition & 0 deletions kythe/go/util/riegeli/BUILD
@@ -28,6 +28,7 @@ go_test(
     name = "riegeli_test",
     srcs = ["riegeli_test.go"],
     library = ":riegeli",
+    shard_count = 10,
     deps = [
         ":riegeli_test_go_proto",
         "//kythe/go/util/compare",
75 changes: 45 additions & 30 deletions kythe/go/util/riegeli/reader.go
@@ -103,14 +103,9 @@ func (r *reader) Seek(pos int64) error {
return fmt.Errorf("error verifying file: %v", err)
}

blockStart := (pos / blockSize) * blockSize
if pos-blockStart < blockHeaderSize {
pos += blockHeaderSize
}

if pos < r.r.Position() || pos >= r.r.Position()+r.chunkSize {
// We're seeking outside of the current chunk.
if err := r.r.SeekToChunkContaining(pos); err != nil {
if err := r.r.SeekToChunkContaining(pos); err != nil && err != io.EOF {
return fmt.Errorf("failed to seek to enclosing chunk: %v", err)
}
r.recordReader = nil
@@ -135,21 +130,18 @@ func (r *reader) nextRecord() ([]byte, error) {

 func (r *reader) ensureRecordReader() error {
   if r.recordReader != nil && r.recordReader.Len() == 0 {
-    if err := r.recordReader.Close(); err != nil {
-      return fmt.Errorf("error closing record reader: %v", err)
-    }
     r.recordReader = nil
   }
 
   for r.recordReader == nil {
-    c, size, err := r.r.Next()
+    c, chunkSize, err := r.r.Next()
     if err != nil {
       return err
     } else if c.Header.NumRecords == 0 && c.Header.ChunkType != fileSignatureChunkType && c.Header.ChunkType != fileMetadataChunkType {
       // ignore chunks with no records; even for unknown chunk types
       continue
     }
-    r.chunkSize = size
+    r.chunkSize = chunkSize
 
     switch c.Header.ChunkType {
     case fileSignatureChunkType:
@@ -165,11 +157,8 @@ func (r *reader) ensureRecordReader() error {
return fmt.Errorf("didn't find single RecordsMetadata record: found %d", rd.Len())
}
rec, err := rd.Next()
cErr := rd.Close()
if err != nil {
return fmt.Errorf("reading RecordsMetadata: %v", err)
} else if cErr != nil {
return fmt.Errorf("closing RecordsMetadata reader: %v", err)
}
r.metadata = new(rmpb.RecordsMetadata)
if err := proto.Unmarshal(rec, r.metadata); err != nil {
@@ -207,8 +196,6 @@ func verifySignature(c *chunk) error {

 // A recordReader reads a finite stream of records.
 type recordReader interface {
-  io.Closer
-
   // Next reads and returns the next record. io.EOF is returned if no further
   // records exist.
   Next() ([]byte, error)
@@ -229,9 +216,6 @@ type fixedRecordReader struct {
   index int
 }
 
-// Close implements part of the recordReader interface.
-func (r *fixedRecordReader) Close() error { return nil }
-
 // Len implements part of the recordReader interface.
 func (r *fixedRecordReader) Len() int { return len(r.records) - r.index }
 
@@ -253,6 +237,8 @@ func (r *fixedRecordReader) Index() int { return r.index }
 func (r *fixedRecordReader) Seek(index int) {
   if index < 0 {
     index = 0
+  } else if index >= len(r.records) {
+    index = len(r.records)
   }
   r.index = index
 }
@@ -361,14 +347,25 @@ func (b *blockReader) Next() ([]byte, error) {
 }
 
 // Position returns the current position within the underlying ReadSeeker.
-func (b *blockReader) Position() int64 { return b.position }
+func (b *blockReader) Position() int64 {
+  // The blockReader bookkeeping only tracks positions outside of block headers.
+  // However, Riegeli considers the starting position of a chunk that begins
+  // immediately after a block header to be the start of the block.
+  if b.position%blockSize == blockHeaderSize {
+    return b.position - blockHeaderSize
+  }
+  return b.position
+}
 
 // Seek seeks to the given position within the underlying ReadSeeker.
 func (b *blockReader) Seek(pos int64) error {
   blockStart := (pos / blockSize) * blockSize
-  if pos-blockStart < blockHeaderSize {
+  if pos == blockStart {
+    pos = blockStart + blockHeaderSize
+  } else if pos-blockStart < blockHeaderSize {
     return fmt.Errorf("attempting to seek into block header: %d", pos)
-  } else if err := b.readBlock(blockStart); err != nil {
+  }
+  if err := b.readBlock(blockStart); err != nil {
     return err
   } else if _, err := b.buf.Seek(pos-(blockStart+blockHeaderSize), io.SeekStart); err != nil {
     return err
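
The two halves of the fix must agree: Position maps an internal offset sitting just past a block header back to the block start, while Seek maps a block-start position forward past the header. A sketch of that pair under the same assumed constants as above (this mirrors the diff's blockReader but is not its actual implementation):

// externalToInternal is what Seek now does with a caller-facing position.
func externalToInternal(pos int64) (int64, error) {
  blockStart := (pos / blockSize) * blockSize
  switch {
  case pos == blockStart:
    // A block-start position names the chunk just past the block header.
    return blockStart + blockHeaderSize, nil
  case pos-blockStart < blockHeaderSize:
    return 0, fmt.Errorf("attempting to seek into block header: %d", pos)
  default:
    return pos, nil
  }
}

// internalToExternal is what Position now reports for an internal offset.
func internalToExternal(pos int64) int64 {
  if pos%blockSize == blockHeaderSize {
    return pos - blockHeaderSize
  }
  return pos
}

Round-tripping shows the agreement: externalToInternal(65536) yields 65560, and internalToExternal(65560) yields 65536 again.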
Expand All @@ -394,15 +391,27 @@ func (b *blockReader) readBlock(blockStart int64) error {
   return nil
 }
 
-// SeekToNextChunkInBlock seeks to the first chunk starting within the block
+// SeekToNextChunkInBlock seeks to the first chunk starting from the block
 // starting at the given offset.
 func (b *blockReader) SeekToNextChunkInBlock(blockStart int64) error {
-  if err := b.readBlock(blockStart); err != nil {
-    return err
-  }
   var offset int64
-  if b.header.PreviousChunk != 0 {
-    offset = int64(b.header.NextChunk) - blockHeaderSize
+  for {
+    if err := b.readBlock(blockStart); err != nil {
+      return err
+    }
+    if b.header.PreviousChunk == 0 {
+      // Block starts with a chunk
+      offset = 0
+    } else {
+      // Block interrupts a chunk
+      offset = int64(b.header.NextChunk) - blockHeaderSize
+    }
+
+    if offset < usableBlockSize {
+      break
+    }
+
+    blockStart += blockSize
   }
   if _, err := b.buf.Seek(offset, io.SeekStart); err != nil {
     return err
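
The new loop handles a chunk so large that it swallows one or more whole blocks: the interrupted block's header then points at a next-chunk offset at or beyond the block's usable bytes, meaning no chunk starts in this block at all, so the scan advances to the following block. A sketch of that decision, reading the header fields the way the diff uses them (NextChunk measured from the block's start) and assuming usableBlockSize = blockSize - blockHeaderSize:

// firstChunkOffsetInBlock reports where the first chunk in a block begins,
// relative to the end of the block header, and whether any chunk starts
// within this block at all.
func firstChunkOffsetInBlock(previousChunk, nextChunk uint64) (offset int64, ok bool) {
  const usableBlockSize = blockSize - blockHeaderSize // 65512 under the assumed constants
  if previousChunk == 0 {
    return 0, true // the block starts with a chunk
  }
  offset = int64(nextChunk) - blockHeaderSize
  return offset, offset < usableBlockSize // otherwise the chunk runs past this block
}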
@@ -443,6 +452,8 @@ func (c *chunkReader) Next() (*chunk, int64, error) {
     }
     chunkSize += int64(padding)
   }
+  blockHeaders := blockHeaderSize * int64(interveningBlockHeaders(int(c.position), int(chunkSize)))
+  chunkSize += blockHeaders
   return &chunk{Header: *h, Data: data}, chunkSize, err
 }
 
@@ -471,10 +482,14 @@ func (c *chunkReader) SeekToChunkContaining(pos int64) error {
     if err == io.EOF {
       return io.EOF
     } else if err != nil {
-      return fmt.Errorf("reading chunk header: %v", err)
+      return fmt.Errorf("reading chunk header at %d: %v", c.position, err)
     }
 
-    nextChunk := c.position + chunkHeaderSize + int64(h.DataSize) + int64(paddingSize(int(c.position), h))
+    chunkSize := chunkHeaderSize + int64(h.DataSize) + int64(paddingSize(int(c.position), h))
+    blockHeaders := blockHeaderSize * int64(interveningBlockHeaders(int(c.position), int(chunkSize)))
+    chunkSize += blockHeaders
+
+    nextChunk := c.position + chunkSize
     if pos < nextChunk {
       // We're at the chunk containing the desired position.
       break
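
Both this hunk and the one above now account for a chunk's full on-disk footprint: chunk header, data, padding, and any block headers that interrupt it. A worked example under the assumed constants (blockSize 65536, blockHeaderSize 24, chunkHeaderSize 40; the position and DataSize values are illustrative, not from the diff):

func exampleNextChunk() int64 {
  pos := int64(65000)     // hypothetical chunk header position, mid-block
  dataSize := int64(1000) // hypothetical h.DataSize, with no padding assumed
  chunkSize := int64(chunkHeaderSize) + dataSize // 1040 bytes of header plus data
  chunkSize += blockHeaderSize * 1               // the chunk crosses the block header at 65536
  return pos + chunkSize                         // nextChunk = 66064
}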
19 changes: 11 additions & 8 deletions kythe/go/util/riegeli/riegeli_test.go
@@ -313,29 +313,32 @@ func TestReaderSeekKnownPositions(t *testing.T) {

 func TestReaderSeekAllPositions(t *testing.T) {
   const N = 1e4
-  buf := writeStrings(t, &WriterOptions{}, N)
-
-  rd := NewReadSeeker(bytes.NewReader(buf.Bytes()))
+  buf := writeStrings(t, &WriterOptions{}, N).Bytes()
+  rd := NewReadSeeker(bytes.NewReader(buf))
 
   // Ensure every byte position is seekable
   var expected int
-  for i := 0; i < buf.Len(); i++ {
+  for i := 0; i < len(buf); i++ {
     if err := rd.Seek(int64(i)); err != nil {
-      t.Fatalf("Error seeking to %d/%d: %v", i, buf.Len(), err)
+      t.Fatalf("Error seeking to %d/%d for %d: %v", i, len(buf), expected, err)
     }
+    pos, err := rd.Position()
+    if err != nil {
+      t.Fatalf("Position error: %v", err)
+    }
     rec, err := rd.Next()
     if expected == N-1 {
       if err != io.EOF {
-        t.Fatalf("Read past end of file at %d: %v %v", i, rec, err)
+        t.Fatalf("Read past end of file at %d (%v): %v %v", i, pos, rec, err)
       }
     } else if err != nil {
-      t.Fatalf("Read error at %d/%d: %v; expected: %d", i, buf.Len(), err, expected)
+      t.Fatalf("Read error at %d/%d: %v; expected: %d", i, len(buf), err, expected)
     }
 
     if expected != N-1 && string(rec) != fmt.Sprintf("%d", expected) {
       expected++
       if string(rec) != fmt.Sprintf("%d", expected) {
-        t.Fatalf("At %d/%d found: %s; expected: %d;", i, buf.Len(), hex.EncodeToString(rec), expected)
+        t.Fatalf("At %d/%d found: %s; expected: %d;", i, len(buf), string(rec), expected)
       }
     }
   }
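
writeStrings is a test helper defined outside this diff; a plausible sketch of such a helper, writing the records "0" through fmt.Sprintf("%d", n-1) and returning the encoded buffer (the NewWriter, Put, and Close names are assumptions about the package API):

func writeStrings(t *testing.T, opts *WriterOptions, n int) *bytes.Buffer {
  var buf bytes.Buffer
  wr := NewWriter(&buf, opts)
  for i := 0; i < n; i++ {
    if err := wr.Put([]byte(fmt.Sprintf("%d", i))); err != nil {
      t.Fatalf("Put error: %v", err)
    }
  }
  if err := wr.Close(); err != nil {
    t.Fatalf("Close error: %v", err)
  }
  return &buf
}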
6 changes: 5 additions & 1 deletion kythe/go/util/riegeli/util.go
@@ -17,6 +17,8 @@
 package riegeli
 
 import (
+  "fmt"
+
   "github.com/minio/highwayhash"
 )
 
@@ -96,12 +98,14 @@
 )
 
 func interveningBlockHeaders(pos, size int) int {
+  if pos%blockSize == blockHeaderSize {
+    panic(fmt.Errorf("invalid chunk boundary: %d", pos))
+  }
   return (size + (pos+usableBlockSize-1)%blockSize) / usableBlockSize
 }
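
The new panic guards against the one position the reader now normalizes away: a chunk is never considered to start at the first byte past a block header. For intuition, the formula counts how many block headers fall within a span of size bytes starting at pos; a quick check under the assumed constants (blockSize 65536, blockHeaderSize 24, usableBlockSize 65512):

func exampleInterveningBlockHeaders() {
  fmt.Println(interveningBlockHeaders(100, 1000))   // 0: the span fits inside the first block
  fmt.Println(interveningBlockHeaders(65000, 1040)) // 1: the span crosses the header at 65536
  fmt.Println(interveningBlockHeaders(0, 70040))    // 2: the span covers the headers at 0 and at 65536
}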

 func paddingSize(pos int, h *chunkHeader) int {
   size := chunkHeaderSize + int(h.DataSize)
   size += interveningBlockHeaders(pos, size)
   if int(h.NumRecords) <= size {
     return 0
   }
27 changes: 10 additions & 17 deletions kythe/go/util/riegeli/writer.go
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"log"

"github.com/golang/protobuf/proto"

@@ -313,15 +314,17 @@ func newRecordChunkWriter(opts *WriterOptions) (*recordChunkWriter, error) {
   }, nil
 }
 
-// WriteTo implements the io.WriterTo interface for chunkHeaders.
-func (h *chunkHeader) WriteTo(w io.Writer) (int, error) {
+// Marshal writes the chunk header to the given buffer.
+func (h *chunkHeader) Marshal(buf []byte) {
+  if len(buf) != chunkHeaderSize {
+    log.Panicf("wrong chunk header buffer size: %d != %d", len(buf), chunkHeaderSize)
+  }
   // header_hash (8 bytes) — hash of the rest of the header
   // data_size (8 bytes) — size of data
   // data_hash (8 bytes) — hash of data
   // chunk_type (1 byte) — determines how to interpret data
   // num_records (7 bytes) — number of records after decoding
   // decoded_data_size (8 bytes) — sum of record sizes after decoding
-  var buf [chunkHeaderSize]byte
   binary.LittleEndian.PutUint64(buf[8:16], h.DataSize)
   copy(buf[16:], h.DataHash[:])
   buf[24] = byte(h.ChunkType)
@@ -332,24 +335,14 @@ func (h *chunkHeader) WriteTo(w io.Writer) (int, error) {
   binary.LittleEndian.PutUint64(buf[32:40], h.DecodedDataSize)
   hash := hashBytes(buf[8:])
   binary.LittleEndian.PutUint64(buf[:8], hash)
-  return w.Write(buf[:])
 }
 
 // WriteTo writes the chunk to w, given its starting position within w.
 func (c *chunk) WriteTo(w *blockWriter, pos int) (int, error) {
   binary.LittleEndian.PutUint64(c.Header.DataHash[:], hashBytes(c.Data))
   // TODO(schroederc): reuse buffers
-  var buf bytes.Buffer
-  if _, err := c.Header.WriteTo(&buf); err != nil {
-    return 0, err
-  }
-  (&buf).Write(c.Data)
-  padding := paddingSize(pos, &c.Header)
-  for i := 0; i < padding; i++ {
-    (&buf).WriteByte(0)
-  }
-  if buf.Len() != chunkHeaderSize+len(c.Data)+padding {
-    return 0, fmt.Errorf("bad chunk size: %v", buf.Len())
-  }
-  return w.WriteChunk(buf.Bytes())
+  buf := make([]byte, chunkHeaderSize+len(c.Data)+paddingSize(pos, &c.Header))
+  c.Header.Marshal(buf[:chunkHeaderSize])
+  copy(buf[chunkHeaderSize:], c.Data)
+  return w.WriteChunk(buf)
 }

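For reference, a self-contained sketch of the 40-byte chunk header layout that Marshal writes above; the 7-byte num_records encoding is elided from this diff, so it is only noted in a comment, and hashBytes stands in for the package's HighwayHash helper:

import "encoding/binary"

func marshalChunkHeader(dataSize, decodedDataSize uint64, dataHash [8]byte, chunkType byte) []byte {
  buf := make([]byte, 40) // chunkHeaderSize
  binary.LittleEndian.PutUint64(buf[8:16], dataSize) // data_size
  copy(buf[16:24], dataHash[:])                      // data_hash
  buf[24] = chunkType                                // chunk_type
  // buf[25:32]: num_records (7 bytes; its encoding is not shown in this diff)
  binary.LittleEndian.PutUint64(buf[32:40], decodedDataSize)  // decoded_data_size
  binary.LittleEndian.PutUint64(buf[0:8], hashBytes(buf[8:])) // header_hash covers bytes 8..39
  return buf
}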