Skip to content

Commit

Permalink
feat: defer car writing until after first block
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Sep 2, 2023
1 parent 9e417e4 commit 6f04cff
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 14 deletions.
11 changes: 3 additions & 8 deletions carstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package frisbii
import (
"bytes"
"context"
"fmt"
"io"

// codecs we care about
Expand All @@ -17,7 +16,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
carstorage "github.com/ipld/go-car/v2/storage"
"github.com/ipld/go-car/v2/storage/deferred"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand All @@ -32,11 +31,7 @@ func StreamCar(
out io.Writer,
request trustlessutils.Request,
) error {
carWriter, err := carstorage.NewWritable(out, []cid.Cid{request.Root}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(request.Duplicates))
if err != nil {
return fmt.Errorf("failed to create car writer: %w", err)
}

carWriter := deferred.NewDeferredCarWriterForStream(out, []cid.Cid{request.Root}, car.AllowDuplicatePuts(request.Duplicates))
requestLsys.StorageReadOpener = carPipe(requestLsys.StorageReadOpener, carWriter)

cfg := traversal.Config{Root: request.Root, Selector: request.Selector()}
Expand All @@ -52,7 +47,7 @@ func StreamCar(
return nil
}

func carPipe(orig linking.BlockReadOpener, car carstorage.WritableCar) linking.BlockReadOpener {
func carPipe(orig linking.BlockReadOpener, car *deferred.DeferredCarWriter) linking.BlockReadOpener {
return func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
r, err := orig(lc, lnk)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-unixfsnode v1.8.0
github.com/ipld/go-car/v2 v2.12.0
github.com/ipld/go-car/v2 v2.12.1-0.20230902103537-b12674b3b055
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-trustless-utils v0.0.0
github.com/ipld/ipld/specs v0.0.0-20230826120441-91918996e8eb
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ github.com/ipfs/go-unixfs v0.4.4 h1:D/dLBOJgny5ZLIur2vIXVQVW0EyDHdOMBDEhgHrt6rY=
github.com/ipfs/go-unixfsnode v1.8.0 h1:yCkakzuE365glu+YkgzZt6p38CSVEBPgngL9ZkfnyQU=
github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8=
github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs=
github.com/ipld/go-car/v2 v2.12.0 h1:4wpZwCEK2Th7lrVhkAio7fnxZb6COrSHxSz9xCR6FOo=
github.com/ipld/go-car/v2 v2.12.0/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo=
github.com/ipld/go-car/v2 v2.12.1-0.20230902103537-b12674b3b055 h1:XU67HCQO3g/l/f3aey6LdqlE93ALRe/sbGj0T5yviXU=
github.com/ipld/go-car/v2 v2.12.1-0.20230902103537-b12674b3b055/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo=
github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc=
github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s=
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 h1:QAI/Ridj0+foHD6epbxmB4ugxz9B4vmNdYSmQLGa05E=
Expand Down
16 changes: 15 additions & 1 deletion httpipfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"github.com/ipld/frisbii"
"github.com/ipld/go-car/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/memstore"
trustlesshttp "github.com/ipld/go-trustless-utils/http"
trustlesspathing "github.com/ipld/ipld/specs/pkg-go/trustless-pathing"
"github.com/stretchr/testify/require"
)

func TestHttpIpfsHandler(t *testing.T) {
handler := frisbii.NewHttpIpfs(context.Background(), cidlink.DefaultLinkSystem())
lsys := cidlink.DefaultLinkSystem()
lsys.SetReadStorage(&CorrectedMemStore{Store: &memstore.Store{}})
handler := frisbii.NewHttpIpfs(context.Background(), lsys)
testServer := httptest.NewServer(handler)
defer testServer.Close()

Expand Down Expand Up @@ -63,6 +66,17 @@ func TestHttpIpfsHandler(t *testing.T) {
expectedStatusCode: http.StatusBadRequest,
expectedBody: "invalid Accept header; unsupported: \"applicaiton/json\"",
},
{
// special case where we get to start the request because everything
// is valid, but the block isn't in our blockstore; passing this
// depends on deferring writing the CAR output until after we've
// at least loaded the first block.
name: "block not found",
path: "/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi",
accept: trustlesshttp.RequestAcceptHeader(true),
expectedStatusCode: http.StatusInternalServerError,
expectedBody: "failed to load root node: failed to load root CID: ipld: could not find bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", // the 404 is from memstore, we won't see that with a proper linksys
},
} {
t.Run(testCase.name, func(t *testing.T) {
req := require.New(t)
Expand Down
6 changes: 4 additions & 2 deletions multireadablestorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ type CorrectedMemStore struct {

func (cms *CorrectedMemStore) Get(ctx context.Context, key string) ([]byte, error) {
data, err := cms.Store.Get(ctx, key)
cid, _ := cid.Cast([]byte(key))
if err != nil && err.Error() == "404" {
err = format.ErrNotFound{}
err = format.ErrNotFound{Cid: cid}
}
return data, err
}

func (cms *CorrectedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
rc, err := cms.Store.GetStream(ctx, key)
cid, _ := cid.Cast([]byte(key))
if err != nil && err.Error() == "404" {
err = format.ErrNotFound{}
err = format.ErrNotFound{Cid: cid}
}
return rc, err
}
Expand Down

0 comments on commit 6f04cff

Please sign in to comment.