Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: blockstore: GetMany blockstore method #492

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added

* `boxo/blockstore`:
* [GetMany blockstore implementation](https://github.com/ipfs/boxo/pull/492)
* `boxo/gateway`:
* A new `WithResolver(...)` option can be used with `NewBlocksBackend(...)` allowing the user to pass their custom `Resolver` implementation.
* `boxo/bitswap/client`:
Expand Down
125 changes: 125 additions & 0 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -64,6 +65,13 @@
HashOnRead(enabled bool)
}

// GetManyBlockstore is a blockstore interface that supports GetMany and PutMany methods using ds.TxnDatastore
type GetManyBlockstore interface {
Blockstore
PutMany(ctx context.Context, blocks []blocks.Block) error
GetMany(context.Context, []cid.Cid) ([]blocks.Block, []cid.Cid, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two recommendations:

  1. It seems like there's no need for []blocks.Block and []cid.Cid since Block contains a .Cid() method https://github.com/ipfs/go-block-format/blob/v0.2.0/blocks.go#L19-L25
  2. You may want to consider a streaming interface so that you don't have to buffer all the blocks in memory

If returning an asynchronous object (e.g. channel or iterator) might be worth taking a look at ipfs/kubo#4592 to make sure you don't run into some common pitfalls. With Go generics now iterators may also make this easier than it used to be.

}

// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
Expand Down Expand Up @@ -310,6 +318,123 @@
return output, nil
}

// NewGetManyBlockstore returns a default GetManyBlockstore implementation
// using the provided datastore.TxnDatastore backend.
func NewGetManyBlockstore(d ds.TxnDatastore, opts ...Option) GetManyBlockstore {
bs := &blockstore{
datastore: batchingTxnDatastoreStub{TxnDatastore: d},
}

for _, o := range opts {
o.f(bs)
}

Check warning on line 330 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L329-L330

Added lines #L329 - L330 were not covered by tests

if !bs.noPrefix {
bs.datastore = dsns.Wrap(bs.datastore, BlockPrefix)
d = dsns.WrapTxnDatastore(d, BlockPrefix)
}

gmbs := &getManyBlockstore{
blockstore: bs,
datastore: d,
}

return gmbs
}

type getManyBlockstore struct {
*blockstore
datastore ds.TxnDatastore
}

type batchingTxnDatastoreStub struct {
ds.BatchingFeature
ds.TxnDatastore
}

func (bs *getManyBlockstore) GetMany(ctx context.Context, cs []cid.Cid) ([]blocks.Block, []cid.Cid, error) {
if len(cs) == 1 {
// performance fast-path
block, err := bs.Get(ctx, cs[0])
return []blocks.Block{block}, nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't make sense to not return the CID here given it's in the signature, but also it doesn't seem like []cid.Cid needs to be in the return signature

}

Check warning on line 360 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L357-L360

Added lines #L357 - L360 were not covered by tests

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return nil, nil, err
}

Check warning on line 365 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L364-L365

Added lines #L364 - L365 were not covered by tests
blks := make([]blocks.Block, 0, len(cs))
missingCIDs := make([]cid.Cid, 0, len(cs))
for _, c := range cs {
if !c.Defined() {
logger.Error("undefined cid in blockstore")
return nil, nil, ipld.ErrNotFound{Cid: c}
}
bdata, err := t.Get(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err != nil {
if err == ds.ErrNotFound {
missingCIDs = append(missingCIDs, c)
} else {
return nil, nil, err
}

Check warning on line 379 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L378-L379

Added lines #L378 - L379 were not covered by tests
} else {
if bs.blockstore.rehash.Load() {
rbcid, err := c.Prefix().Sum(bdata)
if err != nil {
return nil, nil, err
}

Check warning on line 385 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L382-L385

Added lines #L382 - L385 were not covered by tests

if !rbcid.Equals(c) {
return nil, nil, fmt.Errorf("block in storage has different hash (%x) than requested (%x)", rbcid.Hash(), c.Hash())
}

Check warning on line 389 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L387-L389

Added lines #L387 - L389 were not covered by tests

blk, err := blocks.NewBlockWithCid(bdata, rbcid)
if err != nil {
return nil, nil, err
}

Check warning on line 394 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L391-L394

Added lines #L391 - L394 were not covered by tests

blks = append(blks, blk)

Check warning on line 396 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L396

Added line #L396 was not covered by tests
} else {
blk, err := blocks.NewBlockWithCid(bdata, c)
if err != nil {
return nil, nil, err
}

Check warning on line 401 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L400-L401

Added lines #L400 - L401 were not covered by tests

blks = append(blks, blk)
}
}
}
return blks, missingCIDs, t.Commit(ctx)
}

func (bs *getManyBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if len(blocks) == 1 {
// performance fast-path
return bs.Put(ctx, blocks[0])
}

Check warning on line 414 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L412-L414

Added lines #L412 - L414 were not covered by tests

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return err
}

Check warning on line 419 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L418-L419

Added lines #L418 - L419 were not covered by tests
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())

if !bs.blockstore.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
continue

Check warning on line 426 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L426

Added line #L426 was not covered by tests
}
}

err = t.Put(ctx, k, b.RawData())
if err != nil {
return err
}

Check warning on line 433 in blockstore/blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockstore/blockstore.go#L432-L433

Added lines #L432 - L433 were not covered by tests
}
return t.Commit(ctx)
}

// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
Expand Down
122 changes: 122 additions & 0 deletions blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"testing"

dstest "github.com/ipfs/go-datastore/test"

u "github.com/ipfs/boxo/util"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -72,6 +74,126 @@ func TestCidv0v1(t *testing.T) {
}
}

func TestGetManyWhenKeyNotPresent(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
c1 := cid.NewCidV0(u.Hash([]byte("stuff")))
c2 := cid.NewCidV0(u.Hash([]byte("stuff2")))

blks, missingCIDs, err := bs.GetMany(bg, []cid.Cid{c1, c2})

if len(blks) != 0 {
t.Error("no blocks expected")
}
if len(missingCIDs) != 2 {
t.Error("2 missing cids expected")
}
if err != nil {
t.Error("no error expected")
}
}

func TestGetManyWhenKeyIsNil(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
_, _, err := bs.GetMany(bg, []cid.Cid{{}, {}})
if !ipld.IsNotFound(err) {
t.Fail()
}
}

func TestGetManyBlockstorePutThenGetBlock(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block := blocks.NewBlock([]byte("some data"))

err := bs.Put(bg, block)
if err != nil {
t.Fatal(err)
}

blockFromBlockstore, err := bs.Get(bg, block.Cid())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) {
t.Fail()
}
}

func TestPutsThenGetManyBlock(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg, []cid.Cid{block1.Cid(), block2.Cid(), block3.Cid(), block4.Cid()})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), block3.Cid().Bytes()) {
t.Fail()
}
}

func TestCidv0v1Many(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg,
[]cid.Cid{cid.NewCidV1(cid.DagProtobuf, block1.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block2.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block4.Cid().Hash())})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()).Bytes()) {
t.Fail()
}
}

func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
Expand Down
32 changes: 17 additions & 15 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ require (
github.com/multiformats/go-multicodec v0.9.0
github.com/prometheus/client_golang v1.16.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0
go.opentelemetry.io/contrib/propagators/autoprop v0.40.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/sdk v1.19.0
)

require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
Expand All @@ -52,7 +52,7 @@ require (
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
Expand Down Expand Up @@ -139,15 +139,14 @@ require (
go.opentelemetry.io/contrib/propagators/jaeger v1.15.0 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.15.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.20.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand All @@ -157,16 +156,19 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

replace github.com/ipfs/boxo => ../

replace github.com/ipfs/go-datastore => github.com/vulcanize/go-datastore v0.6.1-internal-0.0.1
Loading