Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: blockstore: GetMany blockstore method #492

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added

* `boxo/blockstore`:
* [GetMany blockstore implementation](https://github.com/ipfs/boxo/pull/492)
* `boxo/gateway`:
* A new `WithResolver(...)` option can be used with `NewBlocksBackend(...)` allowing the user to pass their custom `Resolver` implementation.
* `boxo/bitswap/client`:
Expand Down
229 changes: 229 additions & 0 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -64,6 +65,13 @@
HashOnRead(enabled bool)
}

// TxnBlockstore is a blockstore interface that supports GetMany and PutMany methods using ds.TxnDatastore
type TxnBlockstore interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think TxnBlockstore is wrong because the blockstore does not provide transactions, GetManyBlockstore was fine mb.

Blockstore
PutMany(ctx context.Context, blocks []blocks.Block) error
GetMany(context.Context, []cid.Cid) ([]blocks.Block, []cid.Cid, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two recommendations:

  1. It seems like there's no need for []blocks.Block and []cid.Cid since Block contains a .Cid() method https://github.com/ipfs/go-block-format/blob/v0.2.0/blocks.go#L19-L25
  2. You may want to consider a streaming interface so that you don't have to buffer all the blocks in memory

If returning an asynchronous object (e.g. channel or iterator) might be worth taking a look at ipfs/kubo#4592 to make sure you don't run into some common pitfalls. With Go generics now iterators may also make this easier than it used to be.

}

// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
Expand Down Expand Up @@ -310,6 +318,227 @@
return output, nil
}

// TxnBlockstoreOption is a txnBlockstore option implementation
type TxnBlockstoreOption struct {
f func(bs *txnBlockstore)
}

// NewTxnBlockstore returns a default TxnBlockstore implementation
// using the provided datastore.TxnDatastore backend.
func NewTxnBlockstore(d ds.TxnDatastore, opts ...TxnBlockstoreOption) TxnBlockstore {
bs := &txnBlockstore{
datastore: d,
}

for _, o := range opts {
o.f(bs)
}

Check warning on line 335 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L334-L335

Added lines #L334 - L335 were not covered by tests

if !bs.noPrefix {
bs.datastore = dsns.WrapTxnDatastore(bs.datastore, BlockPrefix)
}
return bs
}

type txnBlockstore struct {
datastore ds.TxnDatastore

rehash atomic.Bool
writeThrough bool
noPrefix bool
}

func (bs *txnBlockstore) HashOnRead(enabled bool) {
bs.rehash.Store(enabled)

Check warning on line 352 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L351-L352

Added lines #L351 - L352 were not covered by tests
}

func (bs *txnBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
if !k.Defined() {
logger.Error("undefined cid in blockstore")
return nil, ipld.ErrNotFound{Cid: k}
}
bdata, err := bs.datastore.Get(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return nil, ipld.ErrNotFound{Cid: k}
}
if err != nil {
return nil, err
}
if bs.rehash.Load() {
rbcid, err := k.Prefix().Sum(bdata)
if err != nil {
return nil, err
}

Check warning on line 371 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L355-L371

Added lines #L355 - L371 were not covered by tests

if !rbcid.Equals(k) {
return nil, ErrHashMismatch
}

Check warning on line 375 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L373-L375

Added lines #L373 - L375 were not covered by tests

return blocks.NewBlockWithCid(bdata, rbcid)

Check warning on line 377 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L377

Added line #L377 was not covered by tests
}
return blocks.NewBlockWithCid(bdata, k)

Check warning on line 379 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L379

Added line #L379 was not covered by tests
}

func (bs *txnBlockstore) GetMany(ctx context.Context, cs []cid.Cid) ([]blocks.Block, []cid.Cid, error) {
if len(cs) == 1 {
// performance fast-path
block, err := bs.Get(ctx, cs[0])
return []blocks.Block{block}, nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't make sense to not return the CID here given it's in the signature, but also it doesn't seem like []cid.Cid needs to be in the return signature

}

Check warning on line 387 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L384-L387

Added lines #L384 - L387 were not covered by tests

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return nil, nil, err
}

Check warning on line 392 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L391-L392

Added lines #L391 - L392 were not covered by tests
blks := make([]blocks.Block, 0, len(cs))
missingCIDs := make([]cid.Cid, 0, len(cs))
for _, c := range cs {
if !c.Defined() {
logger.Error("undefined cid in blockstore")
return nil, nil, ipld.ErrNotFound{Cid: c}
}
bdata, err := t.Get(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err != nil {
if err == ds.ErrNotFound {
missingCIDs = append(missingCIDs, c)
} else {
return nil, nil, err
}

Check warning on line 406 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L405-L406

Added lines #L405 - L406 were not covered by tests
} else {
if bs.rehash.Load() {
rbcid, err := c.Prefix().Sum(bdata)
if err != nil {
return nil, nil, err
}

Check warning on line 412 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L409-L412

Added lines #L409 - L412 were not covered by tests

if !rbcid.Equals(c) {
return nil, nil, fmt.Errorf("block in storage has different hash (%x) than requested (%x)", rbcid.Hash(), c.Hash())
}

Check warning on line 416 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L414-L416

Added lines #L414 - L416 were not covered by tests

blk, err := blocks.NewBlockWithCid(bdata, rbcid)
if err != nil {
return nil, nil, err
}

Check warning on line 421 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L418-L421

Added lines #L418 - L421 were not covered by tests

blks = append(blks, blk)

Check warning on line 423 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L423

Added line #L423 was not covered by tests
} else {
blk, err := blocks.NewBlockWithCid(bdata, c)
if err != nil {
return nil, nil, err
}

Check warning on line 428 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L427-L428

Added lines #L427 - L428 were not covered by tests

blks = append(blks, blk)
}
}
}
return blks, missingCIDs, t.Commit(ctx)
}

func (bs *txnBlockstore) Put(ctx context.Context, block blocks.Block) error {
k := dshelp.MultihashToDsKey(block.Cid().Hash())

// Has is cheaper than Put, so see if we already have it
if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
return nil // already stored.
}

Check warning on line 445 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L437-L445

Added lines #L437 - L445 were not covered by tests
}
return bs.datastore.Put(ctx, k, block.RawData())

Check warning on line 447 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L447

Added line #L447 was not covered by tests
}

func (bs *txnBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if len(blocks) == 1 {
// performance fast-path
return bs.Put(ctx, blocks[0])
}

Check warning on line 454 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L452-L454

Added lines #L452 - L454 were not covered by tests

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return err
}

Check warning on line 459 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L458-L459

Added lines #L458 - L459 were not covered by tests
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())

if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
continue

Check warning on line 466 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L466

Added line #L466 was not covered by tests
}
}

err = t.Put(ctx, k, b.RawData())
if err != nil {
return err
}

Check warning on line 473 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L472-L473

Added lines #L472 - L473 were not covered by tests
}
return t.Commit(ctx)
}

func (bs *txnBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
return bs.datastore.Has(ctx, dshelp.MultihashToDsKey(k.Hash()))

Check warning on line 479 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L478-L479

Added lines #L478 - L479 were not covered by tests
}

func (bs *txnBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
size, err := bs.datastore.GetSize(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return -1, ipld.ErrNotFound{Cid: k}
}
return size, err

Check warning on line 487 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L482-L487

Added lines #L482 - L487 were not covered by tests
}

func (bs *txnBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
return bs.datastore.Delete(ctx, dshelp.MultihashToDsKey(k.Hash()))

Check warning on line 491 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L490-L491

Added lines #L490 - L491 were not covered by tests
}

// AllKeysChan runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context.
func (bs *txnBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
res, err := bs.datastore.Query(ctx, q)
if err != nil {
return nil, err
}

Check warning on line 505 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L498-L505

Added lines #L498 - L505 were not covered by tests

output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer func() {
res.Close() // ensure exit (signals early exit, too)
close(output)
}()

Check warning on line 512 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L507-L512

Added lines #L507 - L512 were not covered by tests

for {
e, ok := res.NextSync()
if !ok {
return
}
if e.Error != nil {
logger.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
return
}

Check warning on line 522 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L514-L522

Added lines #L514 - L522 were not covered by tests

// need to convert to key.Key using key.KeyFromDsKey.
bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key))
if err != nil {
logger.Warnf("error parsing key from binary: %s", err)
continue

Check warning on line 528 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L525-L528

Added lines #L525 - L528 were not covered by tests
}
k := cid.NewCidV1(cid.Raw, bk)
select {
case <-ctx.Done():
return
case output <- k:

Check warning on line 534 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L530-L534

Added lines #L530 - L534 were not covered by tests
}
}
}()

return output, nil

Check warning on line 539 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L539

Added line #L539 was not covered by tests
}

// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
Expand Down
104 changes: 104 additions & 0 deletions blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"testing"

dstest "github.com/ipfs/go-datastore/test"

u "github.com/ipfs/boxo/util"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -72,6 +74,108 @@ func TestCidv0v1(t *testing.T) {
}
}

func TestGetManyWhenKeyNotPresent(t *testing.T) {
bs := NewTxnBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
c1 := cid.NewCidV0(u.Hash([]byte("stuff")))
c2 := cid.NewCidV0(u.Hash([]byte("stuff2")))

blks, missingCIDs, err := bs.GetMany(bg, []cid.Cid{c1, c2})

if len(blks) != 0 {
t.Error("no blocks expected")
}
if len(missingCIDs) != 2 {
t.Error("2 missing cids expected")
}
if err != nil {
t.Error("no error expected")
}
}

func TestGetManyWhenKeyIsNil(t *testing.T) {
bs := NewTxnBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
_, _, err := bs.GetMany(bg, []cid.Cid{{}, {}})
if !ipld.IsNotFound(err) {
t.Fail()
}
}

func TestPutsThenGetManyBlock(t *testing.T) {
bs := NewTxnBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg, []cid.Cid{block1.Cid(), block2.Cid(), block3.Cid(), block4.Cid()})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), block3.Cid().Bytes()) {
t.Fail()
}
}

func TestCidv0v1Many(t *testing.T) {
bs := NewTxnBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg,
[]cid.Cid{cid.NewCidV1(cid.DagProtobuf, block1.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block2.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block4.Cid().Hash())})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()).Bytes()) {
t.Fail()
}
}

func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
Expand Down
Loading