From 206309dde9a9a0ac850b2532ef2508f2f1a672de Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Sat, 2 Sep 2023 20:14:53 +1000 Subject: [PATCH] feat: add DeferredCarWriter extracted from Lassie --- v2/storage/deferred/deferredcarwriter.go | 177 ++++++++++++++ v2/storage/deferred/deferredcarwriter_test.go | 231 ++++++++++++++++++ 2 files changed, 408 insertions(+) create mode 100644 v2/storage/deferred/deferredcarwriter.go create mode 100644 v2/storage/deferred/deferredcarwriter_test.go diff --git a/v2/storage/deferred/deferredcarwriter.go b/v2/storage/deferred/deferredcarwriter.go new file mode 100644 index 00000000..dfcd9527 --- /dev/null +++ b/v2/storage/deferred/deferredcarwriter.go @@ -0,0 +1,177 @@ +package deferred + +import ( + "context" + "io" + "os" + "sync" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + carstorage "github.com/ipld/go-car/v2/storage" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/linking" + ipldstorage "github.com/ipld/go-ipld-prime/storage" +) + +type putCb struct { + cb func(int) + once bool +} + +var _ ipldstorage.WritableStorage = (*DeferredCarWriter)(nil) +var _ io.Closer = (*DeferredCarWriter)(nil) + +// DeferredCarWriter creates a write-only CAR either to an existing stream or +// to a file designated by a supplied path. CAR content (including header) +// only begins when the first Put() operation is performed. If the output is a +// file, it will be created when the first Put() operation is performed. +// DeferredCarWriter is threadsafe, and can be used concurrently. +// Closing the writer will close, but not delete, the underlying file. +// +// This utility is useful for cases where a CAR will be streamed but an error +// may occur before any content is written. In this case, the CAR file will not +// be created, and the output stream will not be written to. In the case of an +// HTTP server, this means that the client will not receive a CAR header only, +// instead there will be an opportunity to return a proper HTTP error to the +// client. +// +// The OnPut listener can be used to either track each Put() operation, or to +// just track the first Put() operation, which can be useful for setting +// HTTP headers in the assumption that the beginning of a valid CAR is about to +// be streamed. +type DeferredCarWriter struct { + root cid.Cid + outPath string + outStream io.Writer + + lk sync.Mutex + f *os.File + w carstorage.WritableCar + putCb []putCb + opts []carv2.Option +} + +// NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a +// file designated by the supplied path. The file will only be created on the +// first Put() operation. +// +// No options are supplied to carstorage.NewWritable by default, add +// the car.WriteAsCarV1(true) option to write a CARv1 file. +func NewDeferredCarWriterForPath(root cid.Cid, outPath string, opts ...carv2.Option) *DeferredCarWriter { + return &DeferredCarWriter{root: root, outPath: outPath, opts: opts} +} + +// NewDeferredCarWriterForStream creates a DeferredCarWriter that will write to +// the supplied stream. The stream will only be written to on the first Put() +// operation. +// +// The car.WriteAsCarV1(true) option will be supplied by default to +// carstorage.NewWritable as CARv2 is not a valid streaming format due to the +// header. +func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer, opts ...carv2.Option) *DeferredCarWriter { + opts = append([]carv2.Option{carv2.WriteAsCarV1(true)}, opts...) + return &DeferredCarWriter{root: root, outStream: outStream, opts: opts} +} + +// OnPut will call a callback when each Put() operation is started. The argument +// to the callback is the number of bytes being written. If once is true, the +// callback will be removed after the first call. +func (dcw *DeferredCarWriter) OnPut(cb func(int), once bool) { + if dcw.putCb == nil { + dcw.putCb = make([]putCb, 0) + } + dcw.putCb = append(dcw.putCb, putCb{cb: cb, once: once}) +} + +// Has returns false if the key was not already written to the CAR output. +func (dcw *DeferredCarWriter) Has(ctx context.Context, key string) (bool, error) { + dcw.lk.Lock() + defer dcw.lk.Unlock() + + if dcw.w == nil { // shortcut, haven't written anything, don't even initialise + return false, nil + } + + writer, err := dcw.writer() + if err != nil { + return false, err + } + + return writer.Has(ctx, key) +} + +// Put writes the given content to the CAR output stream, creating it if it +// doesn't exist yet. +func (dcw *DeferredCarWriter) Put(ctx context.Context, key string, content []byte) error { + dcw.lk.Lock() + defer dcw.lk.Unlock() + + if dcw.putCb != nil { + // call all callbacks, remove those that were only needed once + for i := 0; i < len(dcw.putCb); i++ { + cb := dcw.putCb[i] + cb.cb(len(content)) + if cb.once { + dcw.putCb = append(dcw.putCb[:i], dcw.putCb[i+1:]...) + i-- + } + } + } + + // first Put() call, initialise writer, which will write a CAR header + writer, err := dcw.writer() + if err != nil { + return err + } + + return writer.Put(ctx, key, content) +} + +// writer() +func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { + if dcw.w == nil { + outStream := dcw.outStream + if outStream == nil { + openedFile, err := os.OpenFile(dcw.outPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + dcw.f = openedFile + outStream = openedFile + } + w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, dcw.opts...) + if err != nil { + return nil, err + } + dcw.w = w + } + return dcw.w, nil +} + +// Close closes the underlying file, if one was created. +func (dcw *DeferredCarWriter) Close() error { + dcw.lk.Lock() + defer dcw.lk.Unlock() + + err := dcw.w.Finalize() + + if dcw.f != nil { + defer func() { dcw.f = nil }() + err2 := dcw.f.Close() + if err == nil { + err = err2 + } + } + return err +} + +// BlockWriteOpener returns a BlockWriteOpener that operates on this storage. +func (dcw *DeferredCarWriter) BlockWriteOpener() linking.BlockWriteOpener { + return func(lctx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { + wr, wrcommit, err := ipldstorage.PutStream(lctx.Ctx, dcw) + return wr, func(lnk ipld.Link) error { + return wrcommit(lnk.Binary()) + }, err + } +} diff --git a/v2/storage/deferred/deferredcarwriter_test.go b/v2/storage/deferred/deferredcarwriter_test.go new file mode 100644 index 00000000..790cde45 --- /dev/null +++ b/v2/storage/deferred/deferredcarwriter_test.go @@ -0,0 +1,231 @@ +package deferred_test + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + "os" + "sync" + "testing" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + deferred "github.com/ipld/go-car/v2/storage/deferred" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +var rng = rand.New(rand.NewSource(3333)) +var rngLk sync.Mutex + +func TestDeferredCarWriterForPath(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + + for version := 1; version <= 2; version++ { + t.Run(fmt.Sprintf("version=%d", version), func(t *testing.T) { + tmpFile := t.TempDir() + "/test.car" + + opts := []carv2.Option{} + if version == 1 { + opts = append(opts, carv2.WriteAsCarV1(true)) + } + cw := deferred.NewDeferredCarWriterForPath(testCid1, tmpFile, opts...) + + _, err := os.Stat(tmpFile) + req.True(os.IsNotExist(err)) + + req.NoError(cw.Put(ctx, testCid1.KeyString(), testData1)) + req.NoError(cw.Put(ctx, testCid2.KeyString(), testData2)) + + stat, err := os.Stat(tmpFile) + req.NoError(err) + req.True(stat.Size() > int64(len(testData1)+len(testData2))) + + req.NoError(cw.Close()) + + // shouldn't be deleted + _, err = os.Stat(tmpFile) + req.NoError(err) + + r, err := os.Open(tmpFile) + req.NoError(err) + t.Cleanup(func() { r.Close() }) + carv2, err := carv2.NewBlockReader(r) + req.NoError(err) + + // compare CAR contents to what we wrote + req.Equal([]cid.Cid{testCid1}, carv2.Roots) + req.Equal(uint64(version), carv2.Version) + + blk, err := carv2.Next() + req.NoError(err) + req.Equal(testCid1, blk.Cid()) + req.Equal(testData1, blk.RawData()) + + blk, err = carv2.Next() + req.NoError(err) + req.Equal(testCid2, blk.Cid()) + req.Equal(testData2, blk.RawData()) + + _, err = carv2.Next() + req.ErrorIs(io.EOF, err) + }) + } +} + +func TestDeferredCarWriter(t *testing.T) { + for _, tc := range []string{"path", "stream"} { + tc := tc + t.Run(tc, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + testCid3, _ := randBlock() + + var cw *deferred.DeferredCarWriter + var buf bytes.Buffer + tmpFile := t.TempDir() + "/test.car" + + if tc == "path" { + cw = deferred.NewDeferredCarWriterForPath(testCid1, tmpFile, carv2.WriteAsCarV1(true)) + _, err := os.Stat(tmpFile) + require.True(t, os.IsNotExist(err)) + } else { + cw = deferred.NewDeferredCarWriterForStream(testCid1, &buf) + require.Equal(t, buf.Len(), 0) + } + + has, err := cw.Has(ctx, testCid3.KeyString()) + require.NoError(t, err) + require.False(t, has) + + require.NoError(t, cw.Put(ctx, testCid1.KeyString(), testData1)) + has, err = cw.Has(ctx, testCid1.KeyString()) + require.NoError(t, err) + require.True(t, has) + require.NoError(t, cw.Put(ctx, testCid2.KeyString(), testData2)) + has, err = cw.Has(ctx, testCid1.KeyString()) + require.NoError(t, err) + require.True(t, has) + has, err = cw.Has(ctx, testCid2.KeyString()) + require.NoError(t, err) + require.True(t, has) + has, err = cw.Has(ctx, testCid3.KeyString()) + require.NoError(t, err) + require.False(t, has) + + if tc == "path" { + stat, err := os.Stat(tmpFile) + require.NoError(t, err) + require.True(t, stat.Size() > int64(len(testData1)+len(testData2))) + } else { + require.True(t, buf.Len() > len(testData1)+len(testData2)) + } + + require.NoError(t, cw.Close()) + + var rdr *carv2.BlockReader + if tc == "path" { + r, err := os.Open(tmpFile) + require.NoError(t, err) + rdr, err = carv2.NewBlockReader(r) + require.NoError(t, err) + t.Cleanup(func() { r.Close() }) + } else { + rdr, err = carv2.NewBlockReader(&buf) + require.NoError(t, err) + } + + // compare CAR contents to what we wrote + require.Equal(t, rdr.Roots, []cid.Cid{testCid1}) + require.Equal(t, rdr.Version, uint64(1)) + + blk, err := rdr.Next() + require.NoError(t, err) + require.Equal(t, blk.Cid(), testCid1) + require.Equal(t, blk.RawData(), testData1) + + blk, err = rdr.Next() + require.NoError(t, err) + require.Equal(t, blk.Cid(), testCid2) + require.Equal(t, blk.RawData(), testData2) + + _, err = rdr.Next() + require.ErrorIs(t, err, io.EOF) + }) + } +} + +func TestDeferredCarWriterPutCb(t *testing.T) { + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + + var buf bytes.Buffer + cw := deferred.NewDeferredCarWriterForStream(testCid1, &buf) + + var pc1 int + cw.OnPut(func(ii int) { + switch pc1 { + case 0: + require.Equal(t, buf.Len(), 0) // called before first write + require.Equal(t, len(testData1), ii) + case 1: + require.Equal(t, len(testData2), ii) + default: + require.Fail(t, "unexpected put callback") + } + pc1++ + }, false) + var pc2 int + cw.OnPut(func(ii int) { + switch pc2 { + case 0: + require.Equal(t, buf.Len(), 0) // called before first write + require.Equal(t, len(testData1), ii) + case 1: + require.Equal(t, len(testData2), ii) + default: + require.Fail(t, "unexpected put callback") + } + pc2++ + }, false) + var pc3 int + cw.OnPut(func(ii int) { + switch pc3 { + case 0: + require.Equal(t, buf.Len(), 0) // called before first write + require.Equal(t, len(testData1), ii) + default: + require.Fail(t, "unexpected put callback") + } + pc3++ + }, true) + + require.NoError(t, cw.Put(ctx, testCid1.KeyString(), testData1)) + require.NoError(t, cw.Put(ctx, testCid2.KeyString(), testData2)) + require.NoError(t, cw.Close()) + + require.Equal(t, 2, pc1) + require.Equal(t, 2, pc2) + require.Equal(t, 1, pc3) +} + +func randBlock() (cid.Cid, []byte) { + data := make([]byte, 1024) + rngLk.Lock() + rng.Read(data) + rngLk.Unlock() + h, err := mh.Sum(data, mh.SHA2_512, -1) + if err != nil { + panic(err) + } + return cid.NewCidV1(cid.Raw, h), data +}