Skip to content

Commit

Permalink
Save cid lists in datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Feb 4, 2021
1 parent 765e173 commit e2032e1
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 2 deletions.
98 changes: 98 additions & 0 deletions core/coreunix/bigfile.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package coreunix

import (
"bytes"
"context"
"io"
"strings"

"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
multibase "github.com/multiformats/go-multibase"
"github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"
)

// ChunkingManifest describes how a chunked file is split into chunks.
type ChunkingManifest struct {
// StreamCid is the stream cid (aka "SID"). extractChunkingManifest
// currently leaves it unset (see the TODO there).
StreamCid cid.Cid
// ChunkedCid is the cid of the chunked file; extractChunkingManifest sets
// it to the cid it was asked to extract a manifest for.
ChunkedCid cid.Cid
// Chunks holds one entry per chunk.
// NOTE(review): presumably ordered by offset within the stream — confirm.
Chunks []*ChunkingManifestChunk
}
Expand All @@ -19,9 +26,100 @@ type ChunkingManifestChunk struct {
Size uint64
}

// chunkAtl is the refmt atlas used to encode and decode
// ChunkingManifestChunk values. It is populated once by init.
var chunkAtl atlas.Atlas

// init builds chunkAtl. atlas.MustBuild is used, so an invalid atlas
// definition surfaces as a panic during package initialization.
func init() {
	// A chunk is mapped as a struct whose fields are serialized under the
	// short names "cid", "offset" and "size".
	chunkEntry := atlas.BuildEntry(ChunkingManifestChunk{}).StructMap().
		AddField("ChunkCid", atlas.StructMapEntry{SerialName: "cid"}).
		AddField("Offset", atlas.StructMapEntry{SerialName: "offset"}).
		AddField("Size", atlas.StructMapEntry{SerialName: "size"}).
		Complete()

	// cid.Cid values are transformed to and from their binary form.
	cidToBytes := atlas.MakeMarshalTransformFunc(func(live cid.Cid) ([]byte, error) {
		return live.MarshalBinary()
	})
	bytesToCid := atlas.MakeUnmarshalTransformFunc(func(serializable []byte) (cid.Cid, error) {
		var decoded cid.Cid
		if err := decoded.UnmarshalBinary(serializable); err != nil {
			return cid.Cid{}, err
		}
		return decoded, nil
	})
	cidEntry := atlas.BuildEntry(cid.Cid{}).Transform().
		TransformMarshal(cidToBytes).
		TransformUnmarshal(bytesToCid).
		Complete()

	chunkAtl = atlas.MustBuild(chunkEntry, cidEntry)
}

// Serialize encodes the chunk as CBOR using chunkAtl. It returns the encoded
// bytes, or a nil slice and the marshalling error if encoding fails.
func (c ChunkingManifestChunk) Serialize() ([]byte, error) {
	encoded, err := cbor.MarshalAtlased(c, chunkAtl)
	if err == nil {
		return encoded, nil
	}
	return nil, err
}

// Deserialize decodes CBOR data into the receiver using chunkAtl and default
// decode options. It returns any error reported by the CBOR decoder.
func (c *ChunkingManifestChunk) Deserialize(data []byte) error {
	var opts cbor.DecodeOptions
	return cbor.UnmarshalAtlased(opts, data, c, chunkAtl)
}

// serializeChunks encodes a list of chunks into a single byte slice. Each
// chunk is CBOR-serialized, then multibase-encoded as base64url, and the
// encoded strings are joined with single spaces. An empty or nil list yields
// a nil slice and no error.
func serializeChunks(chunks []*ChunkingManifestChunk) ([]byte, error) {
	if len(chunks) == 0 {
		return nil, nil
	}

	encoded := make([]string, 0, len(chunks))
	for _, chunk := range chunks {
		raw, err := chunk.Serialize()
		if err != nil {
			return nil, err
		}
		enc, err := multibase.Encode(multibase.Base64url, raw)
		if err != nil {
			// programming error; using unsupported encoding
			panic(err.Error())
		}
		encoded = append(encoded, enc)
	}

	// Space is the record separator; there is no trailing separator.
	return []byte(strings.Join(encoded, " ")), nil
}

// deserializeChunks decodes data produced by serializeChunks: a sequence of
// space-separated multibase strings, each wrapping one CBOR-encoded chunk.
// Empty input yields a nil slice and no error. Any read, multibase, or CBOR
// decoding error aborts decoding and is returned with a nil slice.
func deserializeChunks(data []byte) ([]*ChunkingManifestChunk, error) {
	var result []*ChunkingManifestChunk
	buf := bytes.NewBuffer(data)

	for {
		token, readErr := buf.ReadString(' ')
		atEOF := readErr == io.EOF
		switch {
		case readErr == nil:
			// Drop the trailing space delimiter.
			token = token[:len(token)-1]
		case !atEOF:
			return nil, readErr
		case token == "":
			// Nothing left after the last delimiter (or input was empty).
			return result, nil
		}

		_, raw, err := multibase.Decode(token)
		if err != nil {
			return nil, err
		}

		chunk := new(ChunkingManifestChunk)
		if err := chunk.Deserialize(raw); err != nil {
			return nil, err
		}
		result = append(result, chunk)

		if atEOF {
			// The final record had no trailing delimiter; we are done.
			return result, nil
		}
	}
}

func extractChunkingManifest(ctx context.Context, dagSvc ipld.DAGService, chunkedFileCid cid.Cid) (*ChunkingManifest, error) {
getLinks := dag.GetLinksWithDAG(dagSvc)
chunking := &ChunkingManifest{
// TODO: compute and set stream cid (aka "SID")
ChunkedCid: chunkedFileCid,
}
var verr error
Expand Down
41 changes: 41 additions & 0 deletions core/coreunix/bigfilestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package coreunix

import (
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsns "github.com/ipfs/go-datastore/namespace"
dshelp "github.com/ipfs/go-ipfs-ds-help"
)

// bigFileStore stores and retrieves lists of chunks in a datastore, keyed by
// stream cid.
type bigFileStore struct {
// dstore is the backing datastore; NewBigFileStore sets it to the caller's
// datastore wrapped in the bigFilePrefix namespace.
dstore ds.Datastore
}

// bigFilePrefix is the datastore key prefix ("bigfiles") under which
// NewBigFileStore namespaces the datastore it is given.
var bigFilePrefix = ds.NewKey("bigfiles")

// NewBigFileStore creates a new bigFileStore backed by dstore. The given
// datastore is wrapped so that all keys are namespaced under bigFilePrefix.
func NewBigFileStore(dstore ds.Datastore) *bigFileStore {
	namespaced := dsns.Wrap(dstore, bigFilePrefix)
	return &bigFileStore{dstore: namespaced}
}

// Put serializes chunks and stores the result in the datastore under the key
// derived from streamCid. It returns the serialization error, if any,
// otherwise the result of the datastore write.
func (b *bigFileStore) Put(streamCid cid.Cid, chunks []*ChunkingManifestChunk) error {
	encoded, err := serializeChunks(chunks)
	if err != nil {
		return err
	}
	return b.dstore.Put(dshelp.CidToDsKey(streamCid), encoded)
}

// Get loads the value stored under the key derived from streamCid and decodes
// it into a list of chunks. Datastore and decoding errors are returned as-is.
func (b *bigFileStore) Get(streamCid cid.Cid) ([]*ChunkingManifestChunk, error) {
	key := dshelp.CidToDsKey(streamCid)
	raw, err := b.dstore.Get(key)
	if err != nil {
		return nil, err
	}
	return deserializeChunks(raw)
}
56 changes: 56 additions & 0 deletions core/coreunix/bigfilestore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package coreunix

import (
"testing"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
)

// TestBigFileStorePutThenGet stores a list of chunks under a stream cid and
// verifies that reading it back yields the same chunks, comparing every
// serialized field (cid, offset and size) rather than the cid alone.
func TestBigFileStorePutThenGet(t *testing.T) {
	bfs := NewBigFileStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	streamCid, err := cid.Parse("QmWQadpxHe1UgAMdkZ5tm7znzqiixwo5u9XLKCtPGLtdDs")
	if err != nil {
		// Report setup failures through the test framework instead of
		// panicking, so the failure is attributed to this test.
		t.Fatal(err)
	}

	cidStrs := []string{
		"QmQPeNsJPyVWPFDVHb77w8G42Fvo15z4bG2X8D2GhfbSXc",
		"QmYff9iHR1Hz6wufVeJodzXqQm4pkK4QNS9ms8tyPKVWm1",
		"QmcvcJRuxFUsM1deMwMzDL7fWB2A7rXhFRNrBAf81KyFuN",
	}
	chunks := make([]*ChunkingManifestChunk, len(cidStrs))

	for i := range cidStrs {
		c, err := cid.Parse(cidStrs[i])
		if err != nil {
			t.Fatal(err)
		}
		chunks[i] = &ChunkingManifestChunk{
			ChunkCid: c,
			Offset:   uint64(i * 1024),
			Size:     4096,
		}
	}

	err = bfs.Put(streamCid, chunks)
	if err != nil {
		t.Fatal(err)
	}

	chunks2, err := bfs.Get(streamCid)
	if err != nil {
		t.Fatal(err)
	}

	if len(chunks2) != len(chunks) {
		t.Fatal("wrong number of chunks returned")
	}
	for i := range chunks {
		want, got := chunks[i], chunks2[i]
		if !got.ChunkCid.Equals(want.ChunkCid) {
			t.Errorf("chunks2[%d] != chunks[%d]", i, i)
		}
		// Offset and Size are part of the serialized form too; a broken
		// field mapping would otherwise go unnoticed.
		if got.Offset != want.Offset {
			t.Errorf("chunks2[%d].Offset = %d, want %d", i, got.Offset, want.Offset)
		}
		if got.Size != want.Size {
			t.Errorf("chunks2[%d].Size = %d, want %d", i, got.Size, want.Size)
		}
	}
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-bitswap v0.3.4-0.20210203213610-ec4d6a9f0120
github.com/ipfs/go-bitswap v0.3.3
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.4
github.com/ipfs/go-cid v0.0.7
Expand Down Expand Up @@ -55,7 +55,7 @@ require (
github.com/ipfs/go-pinning-service-http-client v0.1.0
github.com/ipfs/go-unixfs v0.2.4
github.com/ipfs/go-verifcid v0.0.1
github.com/ipfs/interface-go-ipfs-core v0.4.1-0.20210203224035-283af70309ec
github.com/ipfs/interface-go-ipfs-core v0.4.0
github.com/ipld/go-car v0.1.1-0.20201015032735-ff6ccdc46acc
github.com/jbenet/go-is-domain v1.0.5
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
Expand Down Expand Up @@ -95,6 +95,7 @@ require (
github.com/multiformats/go-multihash v0.0.14
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
github.com/prometheus/client_golang v1.7.1
github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ github.com/ipfs/go-bitswap v0.1.0/go.mod h1:FFJEf18E9izuCqUtHxbWEvq+reg7o4CW5wSA
github.com/ipfs/go-bitswap v0.1.2/go.mod h1:qxSWS4NXGs7jQ6zQvoPY3+NmOfHHG47mhkiLzBpJQIs=
github.com/ipfs/go-bitswap v0.1.3/go.mod h1:YEQlFy0kkxops5Vy+OxWdRSEZIoS7I7KDIwoa5Chkps=
github.com/ipfs/go-bitswap v0.1.8/go.mod h1:TOWoxllhccevbWFUR2N7B1MTSVVge1s6XSMiCSA4MzM=
github.com/ipfs/go-bitswap v0.3.3 h1:CrTO3OiOYFBcdliw074/C7T2QYHEOsPClgvR6RIYcO4=
github.com/ipfs/go-bitswap v0.3.3/go.mod h1:AyWWfN3moBzQX0banEtfKOfbXb3ZeoOeXnZGNPV9S6w=
github.com/ipfs/go-bitswap v0.3.4-0.20210203213610-ec4d6a9f0120 h1:EH6dTmz867wv1V2rwPyOsQ4SDRxyGbikX/PTlvMPBsM=
github.com/ipfs/go-bitswap v0.3.4-0.20210203213610-ec4d6a9f0120/go.mod h1:fjCW0ajzfqPml6HQGSiR+8NVwFPpgIItCyjG70BvCV0=
github.com/ipfs/go-block-format v0.0.1/go.mod h1:DK/YYcsSUIVAFNwo/KZCdIIbpN0ROH/baNLgayt4pFc=
Expand Down

0 comments on commit e2032e1

Please sign in to comment.