diff --git a/v2/block_reader.go b/v2/block_reader.go index 1ceefc5..c5be096 100644 --- a/v2/block_reader.go +++ b/v2/block_reader.go @@ -1,6 +1,7 @@ package car import ( + "errors" "fmt" "io" @@ -23,6 +24,7 @@ type BlockReader struct { // Used internally only, by BlockReader.Next during iteration over blocks. r io.Reader offset uint64 + v1offset uint64 readerSize int64 opts Options } @@ -80,7 +82,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) { if _, err := rs.Seek(int64(v2h.DataOffset)-PragmaSize-HeaderSize, io.SeekCurrent); err != nil { return nil, err } - br.offset = uint64(v2h.DataOffset) + br.v1offset = uint64(v2h.DataOffset) + br.offset = br.v1offset br.readerSize = int64(v2h.DataOffset + v2h.DataSize) // Set br.r to a LimitReader reading from r limited to dataSize. @@ -96,6 +99,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) { return nil, fmt.Errorf("invalid data payload header version; expected 1, got %v", header.Version) } br.Roots = header.Roots + hs, _ := carv1.HeaderSize(header) + br.offset += hs default: // Otherwise, error out with invalid version since only versions 1 or 2 are expected. return nil, fmt.Errorf("invalid car version: %d", br.Version) @@ -136,10 +141,22 @@ func (br *BlockReader) Next() (blocks.Block, error) { return blocks.NewBlockWithCid(data, c) } +// BlockMetadata contains metadata about a block's section in a CAR file/stream. +// +// There are two offsets for the block data which will be the same if the +// original CAR is a CARv1, but will differ if the original CAR is a CARv2. In +// the case of a CARv2, SourceOffset will be the offset from the beginning of +// the file/stream, and Offset will be the offset from the beginning of the CARv1 +// payload container within the CARv2. 
+// +// Offset is useful for index generation which requires an offset from the CARv1 +// payload; while SourceOffset is useful for direct block reads out of the +// source file/stream regardless of version. type BlockMetadata struct { cid.Cid - Offset uint64 - Size uint64 + Offset uint64 // Offset of the block data in the container CARv1 + SourceOffset uint64 // SourceOffset is the offset of block data in the source file/stream + Size uint64 } // SkipNext jumps over the next block, returning metadata about what it is (the CID, offset, and size). @@ -148,24 +165,33 @@ type BlockMetadata struct { // If the underlying reader used by the BlockReader is actually a ReadSeeker, this method will attempt to // seek over the underlying data rather than reading it into memory. func (br *BlockReader) SkipNext() (*BlockMetadata, error) { - sctSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize) + sectionSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize) if err != nil { return nil, err } - - if sctSize == 0 { - _, _, err := cid.CidFromBytes([]byte{}) + if sectionSize == 0 { + _, _, err := cid.CidFromBytes([]byte{}) // generate zero-byte CID error + if err == nil { + panic("expected zero-byte CID error") + } return nil, err } - cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sctSize))) + lenSize := uint64(varint.UvarintSize(sectionSize)) + + cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sectionSize))) if err != nil { return nil, err } - blkSize := sctSize - uint64(cidSize) + blockSize := sectionSize - uint64(cidSize) + blockOffset := br.offset + lenSize + uint64(cidSize) + + // move our reader forward; either by seeking or slurping + if brs, ok := br.r.(io.ReadSeeker); ok { - // carv1 and we don't know the size, so work it out and cache it + // carv1 and we don't know the size, so work it out and cache it so we + // can use it to determine over-reads if 
br.readerSize == -1 { cur, err := brs.Seek(0, io.SeekCurrent) if err != nil { @@ -180,42 +206,37 @@ func (br *BlockReader) SkipNext() (*BlockMetadata, error) { return nil, err } } - // seek. - finalOffset, err := brs.Seek(int64(blkSize), io.SeekCurrent) + + // seek forward past the block data + finalOffset, err := brs.Seek(int64(blockSize), io.SeekCurrent) if err != nil { return nil, err } - if finalOffset != int64(br.offset)+int64(sctSize)+int64(varint.UvarintSize(sctSize)) { - return nil, fmt.Errorf("unexpected length") + if finalOffset != int64(br.offset)+int64(lenSize)+int64(sectionSize) { + return nil, errors.New("unexpected length") } if finalOffset > br.readerSize { return nil, io.ErrUnexpectedEOF } - br.offset = uint64(finalOffset) - return &BlockMetadata{ - c, - uint64(finalOffset) - sctSize - uint64(varint.UvarintSize(sctSize)), - blkSize, - }, nil - } - - // read to end. - readCnt, err := io.CopyN(io.Discard, br.r, int64(blkSize)) - if err != nil { - if err == io.EOF { - return nil, io.ErrUnexpectedEOF + } else { // just a reader, we need to slurp the block bytes + readCnt, err := io.CopyN(io.Discard, br.r, int64(blockSize)) + if err != nil { + if err == io.EOF { + return nil, io.ErrUnexpectedEOF + } + return nil, err + } + if readCnt != int64(blockSize) { + return nil, errors.New("unexpected length") } - return nil, err - } - if readCnt != int64(blkSize) { - return nil, fmt.Errorf("unexpected length") } - origOffset := br.offset - br.offset += uint64(varint.UvarintSize(sctSize)) + sctSize + + br.offset = blockOffset + blockSize return &BlockMetadata{ - c, - origOffset, - blkSize, + Cid: c, + Offset: blockOffset - br.v1offset, + SourceOffset: blockOffset, + Size: blockSize, }, nil } diff --git a/v2/block_reader_test.go b/v2/block_reader_test.go index ca6c240..c4f6c7b 100644 --- a/v2/block_reader_test.go +++ b/v2/block_reader_test.go @@ -2,12 +2,14 @@ package car_test import ( "bytes" + "crypto/rand" "encoding/hex" "fmt" "io" "os" "testing" + blocks 
"github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/internal/carv1" @@ -229,6 +231,153 @@ func TestMaxHeaderLength(t *testing.T) { require.EqualError(t, err, "invalid header data, length of read beyond allowable maximum") } +func TestBlockReader(t *testing.T) { + req := require.New(t) + + // prepare a CARv1 with 100 blocks + roots := []cid.Cid{cid.MustParse("bafyrgqhai26anf3i7pips7q22coa4sz2fr4gk4q4sqdtymvvjyginfzaqewveaeqdh524nsktaq43j65v22xxrybrtertmcfxufdam3da3hbk")} + blks := make([]struct { + block blocks.Block + dataOffset uint64 + }, 100) + v1buf := new(bytes.Buffer) + carv1.WriteHeader(&carv1.CarHeader{Roots: roots, Version: 1}, v1buf) + vb := make([]byte, 2) + for i := 0; i < 100; i++ { + blk := randBlock(100 + i) // we should cross the varint two-byte boundary in here somewhere + vn := varint.PutUvarint(vb, uint64(len(blk.Cid().Bytes())+len(blk.RawData()))) + n, err := v1buf.Write(vb[:vn]) + req.NoError(err) + req.Equal(n, vn) + n, err = v1buf.Write(blk.Cid().Bytes()) + req.NoError(err) + req.Equal(len(blk.Cid().Bytes()), n) + blks[i] = struct { + block blocks.Block + dataOffset uint64 + }{block: blk, dataOffset: uint64(v1buf.Len())} + n, err = v1buf.Write(blk.RawData()) + req.NoError(err) + req.Equal(len(blk.RawData()), n) + } + + v2buf := new(bytes.Buffer) + n, err := v2buf.Write(carv2.Pragma) + req.NoError(err) + req.Equal(len(carv2.Pragma), n) + v2Header := carv2.NewHeader(uint64(v1buf.Len())) + ni, err := v2Header.WriteTo(v2buf) + req.NoError(err) + req.Equal(carv2.HeaderSize, int(ni)) + n, err = v2buf.Write(v1buf.Bytes()) + req.NoError(err) + req.Equal(v1buf.Len(), n) + + v2padbuf := new(bytes.Buffer) + n, err = v2padbuf.Write(carv2.Pragma) + req.NoError(err) + req.Equal(len(carv2.Pragma), n) + v2Header = carv2.NewHeader(uint64(v1buf.Len())) + // pad with 100 bytes + v2Header.DataOffset += 100 + ni, err = v2Header.WriteTo(v2padbuf) + req.NoError(err) + req.Equal(carv2.HeaderSize, 
int(ni)) + v2padbuf.Write(make([]byte, 100)) + n, err = v2padbuf.Write(v1buf.Bytes()) + req.NoError(err) + req.Equal(v1buf.Len(), n) + + for _, testCase := range []struct { + name string + reader func() io.Reader + v1offset uint64 + }{ + { + name: "v1", + reader: func() io.Reader { return &readerOnly{bytes.NewReader(v1buf.Bytes())} }, + }, + { + name: "v2", + reader: func() io.Reader { return &readerOnly{bytes.NewReader(v2buf.Bytes())} }, + v1offset: uint64(carv2.PragmaSize + carv2.HeaderSize), + }, + { + name: "v2 padded", + reader: func() io.Reader { return &readerOnly{bytes.NewReader(v2padbuf.Bytes())} }, + v1offset: uint64(carv2.PragmaSize+carv2.HeaderSize) + 100, + }, + { + name: "v1 w/ReadSeeker", + reader: func() io.Reader { return bytes.NewReader(v1buf.Bytes()) }, + }, + { + name: "v2 w/ReadSeeker", + reader: func() io.Reader { return bytes.NewReader(v2buf.Bytes()) }, + v1offset: uint64(carv2.PragmaSize + carv2.HeaderSize), + }, + { + name: "v2 padded w/ReadSeeker", + reader: func() io.Reader { return bytes.NewReader(v2padbuf.Bytes()) }, + v1offset: uint64(carv2.PragmaSize+carv2.HeaderSize) + 100, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + req := require.New(t) + + car, err := carv2.NewBlockReader(testCase.reader()) + req.NoError(err) + req.ElementsMatch(roots, car.Roots) + + for i := 0; i < 100; i++ { + blk, err := car.Next() + req.NoError(err) + req.Equal(blks[i].block.Cid(), blk.Cid()) + req.Equal(blks[i].block.RawData(), blk.RawData()) + } + _, err = car.Next() + req.ErrorIs(err, io.EOF) + + car, err = carv2.NewBlockReader(testCase.reader()) + req.NoError(err) + req.ElementsMatch(roots, car.Roots) + + for i := 0; i < 100; i++ { + blk, err := car.SkipNext() + req.NoError(err) + req.Equal(blks[i].block.Cid(), blk.Cid) + req.Equal(uint64(len(blks[i].block.RawData())), blk.Size) + req.Equal(blks[i].dataOffset, blk.Offset, "block #%d", i) + req.Equal(blks[i].dataOffset+testCase.v1offset, blk.SourceOffset) + } + _, err = car.Next() + 
req.ErrorIs(err, io.EOF) + }) + } +} + +type readerOnly struct { + r io.Reader +} + +func (r readerOnly) Read(b []byte) (int, error) { + return r.r.Read(b) +} + +func randBlock(l int) blocks.Block { + data := make([]byte, l) + rand.Read(data) + h, err := mh.Sum(data, mh.SHA2_512, -1) + if err != nil { + panic(err) + } + blk, err := blocks.NewBlockWithCid(data, cid.NewCidV1(cid.Raw, h)) + if err != nil { + panic(err) + } + return blk +} + func requireReaderFromPath(t *testing.T, path string) io.Reader { f, err := os.Open(path) require.NoError(t, err)