diff --git a/.golangci.sourceinc.yaml b/.golangci.sourceinc.yaml index 8620778af9..d51640452e 100644 --- a/.golangci.sourceinc.yaml +++ b/.golangci.sourceinc.yaml @@ -447,7 +447,7 @@ linters-settings: govet: # report about shadowed variables - check-shadowing: true + check-shadowing: false # settings per analyzer settings: diff --git a/core/store.go b/core/store.go index 7f45afc049..81a391c573 100644 --- a/core/store.go +++ b/core/store.go @@ -17,8 +17,19 @@ import ( // MultiStore is an interface wrapper around the 3 main types of stores needed for // MerkleCRDTs type MultiStore interface { + Rootstore() DSReaderWriter + + // Datastore is a wrapped root DSReaderWriter + // under the /data namespace Datastore() DSReaderWriter + + // Headstore is a wrapped root DSReaderWriter + // under the /head namespace Headstore() DSReaderWriter + + // DAGstore is a wrapped root DSReaderWriter + // as a Blockstore, embedded into a DAGStore + // under the /blocks namespace DAGstore() DAGStore } diff --git a/core/txn.go b/core/txn.go index c25c01434b..16f7cd2848 100644 --- a/core/txn.go +++ b/core/txn.go @@ -18,4 +18,5 @@ type Txn interface { iterable.IterableTxn MultiStore Systemstore() DSReaderWriter + IsBatch() bool } diff --git a/core/type.go b/core/type.go index a4a7a93007..8c62120c18 100644 --- a/core/type.go +++ b/core/type.go @@ -33,3 +33,9 @@ var ( byte(3): COMPOSITE, } ) + +// reserved names +const ( + HEAD = "_head" + COMPOSITE_ID = "C" +) diff --git a/db/base/descriptions.go b/db/base/descriptions.go index a8899f89e3..4299b51c93 100644 --- a/db/base/descriptions.go +++ b/db/base/descriptions.go @@ -10,8 +10,10 @@ package base import ( + "errors" "fmt" + ds "github.com/ipfs/go-datastore" "github.com/sourcenetwork/defradb/core" ) @@ -19,13 +21,25 @@ const ( ObjectMarker = byte(0xff) // @todo: Investigate object marker values ) +// Reserved system index identifiers. Any index that defra requires +// to enable any internal feature, requires a reserved index ID. +// Changing an existing system index ID is A BREAKING CHANGE. +// Adding new IDs incrementally is not breaking. +const ( + PrimaryIndex int32 = 0 + + // VersionIndex is a Reserved secondary index that represents a + // versioned instance of the PrimaryIndex + VersionIndex int32 = -1 * iota +) + // CollectionDescription describes a Collection and // all its associated metadata type CollectionDescription struct { Name string ID uint32 Schema SchemaDescription - Indexes []IndexDescription + Indexes []IndexDescription // @todo: New system reserved indexes are NEGATIVE. Maybe we need a map here } // IDString returns the collection ID as a string @@ -44,6 +58,32 @@ func (col CollectionDescription) GetField(name string) (FieldDescription, bool) return FieldDescription{}, false } +func (c CollectionDescription) GetIndexDocKey(key ds.Key, indexID uint32) ds.Key { + return ds.NewKey(c.IDString()).ChildString(fmt.Sprint(indexID)).Child(key) +} + +func (c CollectionDescription) GetPrimaryIndexDocKey(key ds.Key) ds.Key { + return c.GetIndexDocKey(key, c.Indexes[0].ID) +} + +func (c CollectionDescription) GetFieldKey(key ds.Key, fieldName string) ds.Key { + if !c.Schema.IsEmpty() { + return key.ChildString(fmt.Sprint(c.Schema.GetFieldKey(fieldName))) + } + return key.ChildString(fieldName) +} + +func (c CollectionDescription) GetPrimaryIndexDocKeyForCRDT(ctype core.CType, key ds.Key, fieldName string) (ds.Key, error) { + switch ctype { + case core.COMPOSITE: + return c.GetPrimaryIndexDocKey(key).ChildString(core.COMPOSITE_ID), nil + case core.LWW_REGISTER: + fieldKey := c.GetFieldKey(key, fieldName) + return c.GetPrimaryIndexDocKey(fieldKey), nil + } + return ds.Key{}, errors.New("Invalid CRDT type") +} + // IndexDescription describes an Index on a Collection // and its assocatied metadata. type IndexDescription struct { @@ -99,6 +139,15 @@ func (sd SchemaDescription) IsEmpty() bool { return len(sd.Fields) == 0 } +func (sd SchemaDescription) GetFieldKey(fieldName string) uint32 { + for _, field := range sd.Fields { + if field.Name == fieldName { + return uint32(field.ID) + } + } + return uint32(0) +} + type FieldKind uint8 // Note: These values are serialized and persisted in the database, avoid modifying existing values diff --git a/db/base/keys.go b/db/base/keys.go new file mode 100644 index 0000000000..fcd6a6811c --- /dev/null +++ b/db/base/keys.go @@ -0,0 +1,14 @@ +package base + +import ( + ds "github.com/ipfs/go-datastore" +) + +var ( + // Individual Store Keys + RootStoreKey = ds.NewKey("/db") + SystemStoreKey = RootStoreKey.ChildString("/system") + DataStoreKey = RootStoreKey.ChildString("/data") + HeadStoreKey = RootStoreKey.ChildString("/heads") + BlockStoreKey = RootStoreKey.ChildString("/blocks") +) diff --git a/db/base/maker.go b/db/base/maker.go index 636dca8754..6475d1348d 100644 --- a/db/base/maker.go +++ b/db/base/maker.go @@ -15,6 +15,7 @@ import ( ) var ( + ROOT = "/db" SYSTEM = "/db/system" DATA = "/db/data" BLOCK = "/db/block" @@ -49,3 +50,11 @@ func MakeCollectionSystemKey(name string) core.Key { func MakeSchemaSystemKey(name string) core.Key { return core.Key{Key: schemaNs.ChildString(name)} } + +// MakeIndexPrefixKeyRaw is the same as MakeIndexPrefixKey but it takes as inputs +// the raw datastore keys, instead of the collection and index objects respectively. +func MakeIndexPrefixKeyRaw(collectionID ds.Key, indexID ds.Key) core.Key { + return core.Key{Key: core.NewKey(DATA). + Child(collectionID). + Child(indexID)} +} diff --git a/db/collection.go b/db/collection.go index 7db4ae8e78..6d864fc66b 100644 --- a/db/collection.go +++ b/db/collection.go @@ -426,10 +426,12 @@ func (c *Collection) save(ctx context.Context, txn *Txn, doc *document.Document) doc.Clean() }) - links = append(links, core.DAGLink{ + link := core.DAGLink{ Name: k, Cid: c, - }) + } + links = append(links, link) + // fmt.Println("links:", link) } } // Update CompositeDAG @@ -441,8 +443,17 @@ func (c *Collection) save(ctx context.Context, txn *Txn, doc *document.Document) if err != nil { return nil } - _, err = c.saveValueToMerkleCRDT(ctx, txn, c.getPrimaryIndexDocKey(dockey), core.COMPOSITE, buf, links) - return err + + headCID, err := c.saveValueToMerkleCRDT(ctx, txn, c.getPrimaryIndexDocKey(dockey), core.COMPOSITE, buf, links) + if err != nil { + return nil + } + + txn.OnSuccess(func() { + doc.SetHead(headCID) + }) + // fmt.Printf("final: %s\n\n", docCid) + return nil } // Delete will attempt to delete a document by key @@ -627,10 +638,18 @@ func (c *Collection) commitImplicitTxn(ctx context.Context, txn *Txn) error { return nil } +func (c *Collection) GetIndexDocKey(key ds.Key, indexID uint32) ds.Key { + return c.getIndexDocKey(key, indexID) +} + func (c *Collection) getIndexDocKey(key ds.Key, indexID uint32) ds.Key { return c.colIDKey.ChildString(fmt.Sprint(indexID)).Child(key) } +func (c *Collection) GetPrimaryIndexDocKey(key ds.Key) ds.Key { + return c.getPrimaryIndexDocKey(key) +} + func (c *Collection) getPrimaryIndexDocKey(key ds.Key) ds.Key { return c.getIndexDocKey(key, c.PrimaryIndex().ID) } diff --git a/db/collection_update.go b/db/collection_update.go index d500f713be..9e1ed912fd 100644 --- a/db/collection_update.go +++ b/db/collection_update.go @@ -25,7 +25,7 @@ import ( "github.com/sourcenetwork/defradb/query/graphql/parser" "github.com/sourcenetwork/defradb/query/graphql/planner" - "github.com/fxamacker/cbor/v2" + cbor "github.com/fxamacker/cbor/v2" ds "github.com/ipfs/go-datastore" ) @@ -185,7 +185,6 @@ func (c *Collection) updateWithKey(ctx context.Context, txn *Txn, key key.DocKey } func (c *Collection) updateWithKeys(ctx context.Context, txn *Txn, keys []key.DocKey, updater interface{}, opts ...client.UpdateOpt) (*client.UpdateResult, error) { - fmt.Println("updating keys:", keys) patch, err := parseUpdater(updater) if err != nil { return nil, err @@ -384,6 +383,18 @@ func (c *Collection) applyMerge(ctx context.Context, txn *Txn, doc map[string]in return err } + // handle Int/Float case + // JSON is annoying in that it represents all numbers + // as Float64s. So our merge object contains float64s + // even for fields defined as Ints, which causes issues + // when we serialize that in CBOR. To generate the delta + // payload. + // So lets just make sure ints are ints + // ref: https://play.golang.org/p/djThEqGXtvR + if fd.Kind == base.FieldKind_INT { + merge[mfield] = int64(mval.(float64)) + } + val := document.NewCBORValue(fd.Typ, cval) fieldKey := c.getFieldKey(key, mfield) c, err := c.saveDocValue(ctx, txn, c.getPrimaryIndexDocKey(fieldKey), val) diff --git a/db/db.go b/db/db.go index 63e9539297..b89ec5c9f8 100644 --- a/db/db.go +++ b/db/db.go @@ -17,6 +17,7 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" + "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/merkle/crdt" "github.com/sourcenetwork/defradb/query/graphql/planner" "github.com/sourcenetwork/defradb/query/graphql/schema" @@ -37,13 +38,6 @@ var ( ErrOptionsEmpty = errors.New("Empty options configuration provided") - // Individual Store Keys - rootStoreKey = ds.NewKey("/db") - systemStoreKey = rootStoreKey.ChildString("/system") - dataStoreKey = rootStoreKey.ChildString("/data") - headStoreKey = rootStoreKey.ChildString("/heads") - blockStoreKey = rootStoreKey.ChildString("/blocks") - log = logging.Logger("defra.db") ) @@ -90,10 +84,10 @@ type DB struct { // NewDB creates a new instance of the DB using the given options func NewDB(rootstore ds.Batching, options interface{}) (*DB, error) { log.Debug("loading: internal datastores") - systemstore := namespace.Wrap(rootstore, systemStoreKey) - datastore := namespace.Wrap(rootstore, dataStoreKey) - headstore := namespace.Wrap(rootstore, headStoreKey) - blockstore := namespace.Wrap(rootstore, blockStoreKey) + systemstore := namespace.Wrap(rootstore, base.SystemStoreKey) + datastore := namespace.Wrap(rootstore, base.DataStoreKey) + headstore := namespace.Wrap(rootstore, base.HeadStoreKey) + blockstore := namespace.Wrap(rootstore, base.BlockStoreKey) dagstore := store.NewDAGStore(blockstore) crdtFactory := crdt.DefaultFactory.WithStores(datastore, headstore, dagstore) diff --git a/db/fetcher/dag.go b/db/fetcher/dag.go index 36f164528c..58aef0007e 100644 --- a/db/fetcher/dag.go +++ b/db/fetcher/dag.go @@ -68,6 +68,11 @@ func (hf *HeadFetcher) Start(ctx context.Context, txn core.Txn, spans core.Spans } var err error + if hf.kvIter != nil { + if err := hf.kvIter.Close(); err != nil { + return err + } + } hf.kvIter, err = txn.Headstore().Query(ctx, q) if err != nil { return err @@ -142,6 +147,14 @@ func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) { return hf.cid, nil } +func (hf *HeadFetcher) Close() error { + if hf.kvIter == nil { + return nil + } + + return hf.kvIter.Close() +} + /* // List returns the list of current heads plus the max height. // @todo Document Heads.List function diff --git a/db/fetcher/fetcher.go b/db/fetcher/fetcher.go index f60f49b38b..805d0bdab4 100644 --- a/db/fetcher/fetcher.go +++ b/db/fetcher/fetcher.go @@ -22,20 +22,22 @@ import ( "github.com/sourcenetwork/defradb/document" ) -/* -var DocumentFetcher DocumentFetcher = &Fetcher{} -DocumentFetcher.Init() -*/ -// type DocumentFetcher interface { -// Init(col *base.CollectionDescription, index *base.IndexDescription, fields []*base.FieldDescription, reverse bool) error -// Start(txn core.Txn, spans core.Spans) error -// FetchNext() (*document.EncodedDocument, error) -// FetchNextDecoded() (*document.Document, error) -// } - -// var ( -// _ DocumentFetcher = &DocFetcher{} -// ) +// Fetcher is the interface for collecting documents +// from the underlying data store. It handles all +// the key/value scanning, aggregation, and document +// encoding. +type Fetcher interface { + Init(col *base.CollectionDescription, index *base.IndexDescription, fields []*base.FieldDescription, reverse bool) error + Start(ctx context.Context, txn core.Txn, spans core.Spans) error + FetchNext(ctx context.Context) (*document.EncodedDocument, error) + FetchNextDecoded(ctx context.Context) (*document.Document, error) + FetchNextMap(ctx context.Context) ([]byte, map[string]interface{}, error) + Close() error +} + +var ( + _ Fetcher = (*DocumentFetcher)(nil) +) type DocumentFetcher struct { col *base.CollectionDescription @@ -76,7 +78,18 @@ func (df *DocumentFetcher) Init(col *base.CollectionDescription, index *base.Ind df.initialized = true df.doc = new(document.EncodedDocument) df.doc.Schema = &col.Schema + + if df.kvResultsIter != nil { + if err := df.kvResultsIter.Close(); err != nil { + return err + } + } df.kvResultsIter = nil + if df.kvIter != nil { + if err := df.kvIter.Close(); err != nil { + return err + } + } df.kvIter = nil df.schemaFields = make(map[uint32]base.FieldDescription) @@ -112,7 +125,6 @@ func (df *DocumentFetcher) Start(ctx context.Context, txn core.Txn, spans core.S } } } - df.indexKey = nil df.spans = uniqueSpans df.curSpanIndex = -1 diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go new file mode 100644 index 0000000000..e4ce0a4883 --- /dev/null +++ b/db/fetcher/versioned.go @@ -0,0 +1,434 @@ +// Copyright 2021 Source Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package fetcher + +import ( + "container/list" + "context" + "fmt" + + "github.com/sourcenetwork/defradb/core" + "github.com/sourcenetwork/defradb/db/base" + "github.com/sourcenetwork/defradb/merkle/crdt" + "github.com/sourcenetwork/defradb/store" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + format "github.com/ipfs/go-ipld-format" + dag "github.com/ipfs/go-merkledag" + "github.com/pkg/errors" +) + +var ( + // interface check + _ Fetcher = (*VersionedFetcher)(nil) +) + +// HistoryFetcher is like the normal DocumentFetcher, except it is able to traverse +// to a specific version in the documents history graph, and return the fetched +// state at that point exactly. +// +// Given the following Document state graph +// +// {} --> V1 --> V2 --> V3 --> V4 +// ^ ^ +// | | +// Target Version Current State +// +// +// A regular DocumentFetcher fetches and returns the state at V4, but the +// VersionsedFetcher would step backwards through the update graph, recompose +// the state at the "Target Version" V1, and return the state at that point. +// +// This is achieved by reconstructing the target state using the given MerkleCRDT +// DAG. Given the Target Version CID, we collect all the individual delta nodes +// in the MerkleDAG, until we reach the initial (genesis) state. +// +// Transient/Ephemeral datastores are intanciated for the lifetime of the +// traversal query, on a per object basis. This should be a basic map based +// ds.Datastore, abstracted into a DSReaderWriter. +// +// The goal of the VersionedFetcher is to implement the same external API/Interface as +// the DocumentFetcher, and to have it return the encoded/decoded document as +// defined in the version, so that it can be used as a drop in replacement within +// the scanNode query planner system. +// +// Current limitations: +// - We can only return a single record from an VersionedFetcher +// instance. +// - We can't query into related sub objects (at the moment, as related objects +// ids aren't in the state graphs. +// - Probably more... +// +// Future optimizations: +// - Incremental checkpoint/snapshotting +// - Reverse traversal (starting from the current state, and working backwards) +// - Create a effecient memory store for in-order traversal (BTree, etc) +// +// Note: Should we transition this state traversal into the CRDT objects themselves, and not +// within a new fetcher? +type VersionedFetcher struct { + // embed the regular doc fetcher + *DocumentFetcher + + txn core.Txn + ctx context.Context + + // Transient version store + root ds.Datastore + store core.Txn + + key core.Key + version cid.Cid + + queuedCids *list.List + + col *base.CollectionDescription + // @todo index *base.IndexDescription + mCRDTs map[uint32]crdt.MerkleCRDT +} + +// Init + +// Start + +func (vf *VersionedFetcher) Init(col *base.CollectionDescription, index *base.IndexDescription, fields []*base.FieldDescription, reverse bool) error { + vf.col = col + vf.queuedCids = list.New() + vf.mCRDTs = make(map[uint32]crdt.MerkleCRDT) + + // run the DF init, VersionedFetchers only supports the Primary (0) index + vf.DocumentFetcher = new(DocumentFetcher) + return vf.DocumentFetcher.Init(col, &col.Indexes[0], fields, reverse) + +} + +// Start serializes the correct state accoriding to the Key and CID +func (vf *VersionedFetcher) Start(ctx context.Context, txn core.Txn, spans core.Spans) error { + if vf.col == nil { + return errors.New("VersionedFetcher cannot be started without a CollectionDescription") + } + + if len(spans) != 1 { + return errors.New("spans must contain only a single entry") + } + + // For the VersionedFetcher, the spans needs to be in the format + // Span{Start: DocKey, End: CID} + dk := spans[0].Start() + cidRaw := spans[0].End() + if len(dk.String()) == 0 { + return errors.New("spans missing start DocKey") + } else if len(cidRaw.String()) == 0 { + return errors.New("span missing end CID") + } + + // decode cidRaw from core.Key to cid.Cid + // need to remove '/' prefix from the core.Key + + c, err := cid.Decode(cidRaw.String()[1:]) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("Failed to decode CID for VersionedFetcher: %s", cidRaw.String())) + } + + vf.txn = txn + vf.ctx = ctx + vf.key = dk + vf.version = c + + // create store + root := ds.NewMapDatastore() + vf.root = root + vf.store, err = store.NewTxnFrom(ctx, root, false) // were going to discard and nuke this later + if err != nil { + return err + } + + if err := vf.seekTo(vf.version); err != nil { + return fmt.Errorf("Failed seeking state to %v: %w", c, err) + } + + return vf.DocumentFetcher.Start(ctx, vf.store, nil) +} + +func (vf *VersionedFetcher) Rootstore() ds.Datastore { + return vf.root +} + +// Start a fetcher with the needed info (cid embedded in a span) + +/* +1. Init with DocKey (VersionedFetched is scoped to a single doc) +2. - Create transient stores (head, data, block) +3. Start with a given Txn and CID span set (length 1 for now) +4. call traverse with the target cid +5. + +err := VersionFetcher.Start(txn, spans) { + vf.traverse(cid) +} +*/ + +// SeekTo exposes the private seekTo +func (vf *VersionedFetcher) SeekTo(ctx context.Context, c cid.Cid) error { + err := vf.seekTo(c) + if err != nil { + return err + } + + return vf.DocumentFetcher.Start(ctx, vf.store, nil) +} + +// seekTo seeks to the given CID version by steping through the CRDT +// state graph from the beginning to the target state, creating the +// serialized state at the given version. It starts by seeking to the +// closest existing state snapshot in the transient Versioned stores, +// which on the first run is 0. It seeks by iteratively jumping through +// the state graph via the `_head` link. +func (vf *VersionedFetcher) seekTo(c cid.Cid) error { + // reinit the queued cids list + vf.queuedCids = list.New() + + // recursive step through the graph + err := vf.seekNext(c, true) + if err != nil { + return err + } + + // after seekNext is completed, we have a populated + // queuedCIDs list, and all the necessary + // blocks in our local store + // If we are using a batch store, then we need to commit + if vf.store.IsBatch() { + if err := vf.store.Commit(vf.ctx); err != nil { + return err + } + } + + // if we have a queuedCIDs length of 0, means we don't need + // to do any more state serialization + + // for cid in CIDs { + /// + /// vf.merge(cid) + /// // Note: we need to determine what state we are "Merging" + /// // into. This isn't necessary for the base case where we only + /// // are concerned with generating the Versioned state for a single + /// // CID, but for multiple CIDs, or if we reuse the transient store + /// // as a cache, we need to swap out states to the parent of the current + /// // CID. + // } + for ccv := vf.queuedCids.Front(); ccv != nil; ccv = ccv.Next() { + cc, ok := ccv.Value.(cid.Cid) + if !ok { + return errors.New("queueudCids contains an invalid CID value") + } + err := vf.merge(cc) + if err != nil { + return fmt.Errorf("Failed merging state: %w", err) + } + } + + // If we are using a batch store, then we need to commit + if vf.store.IsBatch() { + if err := vf.store.Commit(vf.ctx); err != nil { + return err + } + } + + // we now have all the the required state stored + // in our transient local Version_Index, we now need to + // transfer it to the Primary_Index. + + // Once all values are transferred, exit with no errors + // Any future operation can resume using the current PrimaryIndex + // which is actually the serialized state of the CRDT graph at + // the exact version + + return nil +} + +// seekNext is the recursive iteration step of seekTo, its goal is +// to build the queuedCids list, and to transfer the required +// blocks from the global to the local store. +func (vf *VersionedFetcher) seekNext(c cid.Cid, topParent bool) error { + // check if cid block exists in the global store, handle err + + // @todo: Find an effecient way to determine if a CID is a member of a + // DocKey State graph + // @body: We could possibly append the DocKey to the CID either as a + // child key, or an instance on the CID key. + + hasLocalBlock, err := vf.store.DAGstore().Has(vf.ctx, c) + if err != nil { + return fmt.Errorf("(version fetcher) failed to find block in blockstore: %w", err) + } + // skip if we already have it locally + if hasLocalBlock { + return nil + } + + blk, err := vf.txn.DAGstore().Get(vf.ctx, c) + if err != nil { + return fmt.Errorf("(version fetcher) failed to get block in blockstore: %w", err) + } + + // store the block in the local (transient store) + if err := vf.store.DAGstore().Put(vf.ctx, blk); err != nil { + return fmt.Errorf("(version fetcher) failed to write block to blockstore : %w", err) + } + + // add the CID to the queuedCIDs list + if topParent { + vf.queuedCids.PushFront(c) + } + + // decode the block + nd, err := dag.DecodeProtobuf(blk.RawData()) + if err != nil { + return fmt.Errorf("(version fetcher) failed to decode protobuf: %w", err) + } + + // subDAGLinks := make([]cid.Cid, 0) // @todo: set slice size + l, err := nd.GetNodeLink(core.HEAD) + // ErrLinkNotFound is fine, it just means we have no more head links + if err != nil && err != dag.ErrLinkNotFound { + return fmt.Errorf("(version fetcher) failed to get node link from DAG: %w", err) + } + + // only seekNext on parent if we have a HEAD link + if err != dag.ErrLinkNotFound { + err := vf.seekNext(l.Cid, true) + if err != nil { + return err + } + } + + // loop over links and ignore head links + for _, l := range nd.Links() { + if l.Name == core.HEAD { + continue + } + + err := vf.seekNext(l.Cid, false) + if err != nil { + return err + } + } + + return nil +} + +// merge in the state of the IPLD Block identified by CID c into the +// VersionedFetcher state. +// Requires the CID to already exists in the DAGStore. +// This function only works for merging Composite MerkleCRDT objects. +// +// First it checks for the existence of the block, +// then extracts the delta object and priority from the block +// gets the existing MerkleClock instance, or creates one. +// +// Currently we assume the CID is a CompositeDAG CRDT node +func (vf *VersionedFetcher) merge(c cid.Cid) error { + // get node + nd, err := vf.getDAGNode(c) + if err != nil { + return err + } + + // first arg 0 is the index for the composite DAG in the mCRDTs cache + if err := vf.processNode(0, nd, core.COMPOSITE, ""); err != nil { + return err + } + + // handle subgraphs + // loop over links and ignore head links + for _, l := range nd.Links() { + if l.Name == core.HEAD { + continue + } + + // get node + subNd, err := vf.getDAGNode(l.Cid) + if err != nil { + return err + } + + fieldID := vf.col.Schema.GetFieldKey(l.Name) + if fieldID == uint32(0) { + return fmt.Errorf("Invalid sub graph field name: %s", l.Name) + } + // @todo: Right now we ONLY handle LWW_REGISTER, need to swith on this and get CType from descriptions + if err := vf.processNode(fieldID, subNd, core.LWW_REGISTER, l.Name); err != nil { + return err + } + } + + return nil +} + +func (vf *VersionedFetcher) processNode(crdtIndex uint32, nd format.Node, ctype core.CType, fieldName string) (err error) { + // handle CompositeDAG + mcrdt, exists := vf.mCRDTs[crdtIndex] + if !exists { + key, err := vf.col.GetPrimaryIndexDocKeyForCRDT(ctype, vf.key.Key, fieldName) + if err != nil { + return err + } + mcrdt, err = crdt.DefaultFactory.InstanceWithStores(vf.store, ctype, key) + if err != nil { + return err + } + vf.mCRDTs[crdtIndex] = mcrdt + // compositeClock = compMCRDT + } + + delta, err := mcrdt.DeltaDecode(nd) + if err != nil { + return err + } + + height := delta.GetPriority() + _, err = mcrdt.Clock().ProcessNode(vf.ctx, nil, nd.Cid(), height, delta, nd) + return err +} + +func (vf *VersionedFetcher) getDAGNode(c cid.Cid) (*dag.ProtoNode, error) { + // get Block + blk, err := vf.store.DAGstore().Get(vf.ctx, c) + if err != nil { + return nil, fmt.Errorf("Failed to get DAG Node: %w", err) + } + + // get node + // decode the block + return dag.DecodeProtobuf(blk.RawData()) +} + +func (vf *VersionedFetcher) Close() error { + vf.store.Discard(vf.ctx) + if err := vf.root.Close(); err != nil { + return err + } + + return vf.DocumentFetcher.Close() +} + +func NewVersionedSpan(dockey core.Key, version cid.Cid) core.Spans { + return core.Spans{core.NewSpan(dockey, core.NewKey(version.String()))} +} + +// func createMerkleCRDT(ctype core.CType, key ds.Key, store core.MultiStore) (crdt.MerkleCRDT, error) { +// key := vf.col.GetPrimaryIndexDocKey(vf.key.Key).ChildString(core.COMPOSITE_ID) +// compCRDT, err = crdt.DefaultFactory.InstanceWithStores(vf.store, core.COMPOSITE, key) +// if err != nil { +// return err +// } +// } diff --git a/db/fetcher/versioned_test.go b/db/fetcher/versioned_test.go new file mode 100644 index 0000000000..1b7b0e8cea --- /dev/null +++ b/db/fetcher/versioned_test.go @@ -0,0 +1,515 @@ +package fetcher_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/sourcenetwork/defradb/core" + "github.com/sourcenetwork/defradb/db" + "github.com/sourcenetwork/defradb/db/base" + "github.com/sourcenetwork/defradb/db/fetcher" + "github.com/sourcenetwork/defradb/document" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/assert" +) + +type update struct { + payload []byte + diffOps map[string]interface{} + cid string +} + +var ( + testStates = []update{ + { + payload: []byte(`{ + "name": "Alice", + "age": 31, + "points": 100, + "verified": true + }`), + // cid: "Qmcv2iU3myUBwuFCHe3w97sBMMER2FTY2rpbNBP6cqWb4S", + cid: "bafybeicxbqhopabjap7f6vvjx3yhv3rpwmhg6i424c4wzifvjm2tva4pam", + }, + { + payload: []byte(`{ + "name": "Pete", + "age": 31, + "points": 99.9, + "verified": true + }`), + diffOps: map[string]interface{}{ + "name": "Pete", + "points": 99.9, + }, + // cid: "QmPgnQvhPuLGwVU4ZEcbRy7RNCxSkeS72eKwXusUrAEEXR", + cid: "bafybeihzfah32hooz7enj77wzav2exnzkrwloy3ktucc4kqjo6xjgubtvm", + }, + { + payload: []byte(`{ + "name": "Pete", + "age": 22, + "points": 99.9, + "verified": false + }`), + diffOps: map[string]interface{}{ + "verified": false, + "age": 22, + }, + // cid: "QmRpMfTzExGrXat5W9uCAEtnSpRTvWBcd1hBYNWVPdN9Xh", + cid: "bafybeicvdptyfvhjznlwnp6mki46evmavrv7hhdyv4aegk6gcmtfvxizri", + }, + { + payload: []byte(`{ + "name": "Pete", + "age": 22, + "points": 129.99, + "verified": false + }`), + diffOps: map[string]interface{}{ + "points": 129.99, + }, + // cid: "QmRWYwKadjWqHLrzPKd7MdS4EoQuT2RzWVTaBxxVkeSjFH", + cid: "bafybeiefvibtjye66scc2nrnhvjpl7hhf4hvqrk2gyswnxvdwivrww42ly", + }, + } +) + +func newMemoryDB() (*db.DB, error) { + rootstore := ds.NewMapDatastore() + return db.NewDB(rootstore, struct{}{}) +} + +func TestVersionedFetcherInit(t *testing.T) { + db, err := newMemoryDB() + assert.NoError(t, err) + + col, err := newTestCollectionWithSchema(db) + assert.NoError(t, err) + + vf := &fetcher.VersionedFetcher{} + desc := col.Description() + err = vf.Init(&desc, nil, nil, false) + assert.NoError(t, err) +} + +func TestVersionedFetcherStart(t *testing.T) { + ctx := context.Background() + db, err := newMemoryDB() + assert.NoError(t, err) + + col, err := newTestCollectionWithSchema(db) + assert.NoError(t, err) + + err = createDocUpdates(col) + assert.NoError(t, err) + + // db.PrintDump() + // assert.True(t, false) // force printing dump + + // c, err := cid.Decode(testStates[3].cid) + // require.NoError(t, err) + + // require.NoError(t, err) + // fmt.Println(bl) + + vf := &fetcher.VersionedFetcher{} + desc := col.Description() + err = vf.Init(&desc, nil, nil, false) + assert.NoError(t, err) + + txn, err := db.NewTxn(ctx, false) + assert.NoError(t, err) + + key := core.NewKey("bae-ed7f0bd5-3f5b-5e93-9310-4b2e71ac460d") + version, err := cid.Decode(testStates[3].cid) + assert.NoError(t, err) + + span := fetcher.NewVersionedSpan(key, version) + err = vf.Start(ctx, txn, span) + assert.NoError(t, err) + + // err = vf.SeekTo(version) + // assert.NoError(t, err) + + // store.PrintStore(vf.Rootstore()) + // assert.True(t, false) +} + +func TestVersionedFetcherNextMap(t *testing.T) { + ctx := context.Background() + db, err := newMemoryDB() + assert.NoError(t, err) + + col, err := newTestCollectionWithSchema(db) + assert.NoError(t, err) + + err = createDocUpdates(col) + assert.NoError(t, err) + + // assert.True(t, false) // force printing dump + + vf := &fetcher.VersionedFetcher{} + desc := col.Description() + err = vf.Init(&desc, nil, nil, false) + assert.NoError(t, err) + + txn, err := db.NewTxn(ctx, false) + assert.NoError(t, err) + + key := core.NewKey("bae-ed7f0bd5-3f5b-5e93-9310-4b2e71ac460d") + version, err := cid.Decode(testStates[0].cid) + assert.NoError(t, err) + + span := fetcher.NewVersionedSpan(key, version) + err = vf.Start(ctx, txn, span) + assert.NoError(t, err) + + _, doc, err := vf.FetchNextMap(ctx) + assert.NoError(t, err) + + var state map[string]interface{} + err = json.Unmarshal(testStates[0].payload, &state) + assert.NoError(t, err) + + compareVersionedDocs(t, doc, state) + + // fmt.Println(doc) + // assert.True(t, false) + +} + +func TestVersionedFetcherNextMapV1(t *testing.T) { + ctx := context.Background() + db, err := newMemoryDB() + assert.NoError(t, err) + + col, err := newTestCollectionWithSchema(db) + assert.NoError(t, err) + + err = createDocUpdates(col) + assert.NoError(t, err) + + // assert.True(t, false) // force printing dump + + vf := &fetcher.VersionedFetcher{} + desc := col.Description() + err = vf.Init(&desc, nil, nil, false) + assert.NoError(t, err) + + txn, err := db.NewTxn(ctx, false) + assert.NoError(t, err) + + key := core.NewKey("bae-ed7f0bd5-3f5b-5e93-9310-4b2e71ac460d") + version, err := cid.Decode(testStates[1].cid) + assert.NoError(t, err) + + span := fetcher.NewVersionedSpan(key, version) + err = vf.Start(ctx, txn, span) + assert.NoError(t, err) + + _, doc, err := vf.FetchNextMap(ctx) + assert.NoError(t, err) + + var state map[string]interface{} + err = json.Unmarshal(testStates[1].payload, &state) + assert.NoError(t, err) + + compareVersionedDocs(t, doc, state) + + // fmt.Println(doc) + // assert.True(t, false) + +} + +func TestVersionedFetcherNextMapV2(t *testing.T) { + ctx := context.Background() + db, err := newMemoryDB() + assert.NoError(t, err) + + col, err := newTestCollectionWithSchema(db) + assert.NoError(t, err) + + err = createDocUpdates(col) + assert.NoError(t, err) + + // assert.True(t, false) // force printing dump + + vf := &fetcher.VersionedFetcher{} + desc := col.Description() + err = vf.Init(&desc, nil, nil, false) + assert.NoError(t, err) + + txn, err := db.NewTxn(ctx, false) + assert.NoError(t, err) + + key := core.NewKey("bae-ed7f0bd5-3f5b-5e93-9310-4b2e71ac460d") + version, err := cid.Decode(testStates[2].cid) + assert.NoError(t, err) + + span := fetcher.NewVersionedSpan(key, version) + err = vf.Start(ctx, txn, span) + assert.NoError(t, err) + + _, doc, err := vf.FetchNextMap(ctx) + assert.NoError(t, err) + + var state map[string]interface{} + err = json.Unmarshal(testStates[2].payload, &state) + assert.NoError(t, err) + + compareVersionedDocs(t, doc, state) + + // fmt.Println(doc) + // assert.True(t, false) + +} + +func TestVersionedFetcherNextMapV3(t *testing.T) { + ctx := context.Background() + db, err := newMemoryDB() + assert.NoError(t, err) + + col, err := newTestCollectionWithSchema(db) + assert.NoError(t, err) + + err = createDocUpdates(col) + assert.NoError(t, err) + + // assert.True(t, false) // force printing dump + + vf := &fetcher.VersionedFetcher{} + desc := col.Description() + err = vf.Init(&desc, nil, nil, false) + assert.NoError(t, err) + + txn, err := db.NewTxn(ctx, false) + assert.NoError(t, err) + + key := core.NewKey("bae-ed7f0bd5-3f5b-5e93-9310-4b2e71ac460d") + version, err := cid.Decode(testStates[3].cid) + assert.NoError(t, err) + + span := fetcher.NewVersionedSpan(key, version) + err = vf.Start(ctx, txn, span) + assert.NoError(t, err) + + _, doc, err := vf.FetchNextMap(ctx) + assert.NoError(t, err) + + var state map[string]interface{} + err = json.Unmarshal(testStates[3].payload, &state) + assert.NoError(t, err) + + compareVersionedDocs(t, doc, state) + + // fmt.Println(doc) + // assert.True(t, false) +} + +func TestVersionedFetcherIncrementalSeekTo(t *testing.T) { + ctx := context.Background() + db, err := newMemoryDB() + assert.NoError(t, err) + + col, err := newTestCollectionWithSchema(db) + assert.NoError(t, err) + + err = createDocUpdates(col) + assert.NoError(t, err) + + // assert.True(t, false) // force printing dump + + vf := &fetcher.VersionedFetcher{} + desc := col.Description() + err = vf.Init(&desc, nil, nil, false) + assert.NoError(t, err) + + txn, err := db.NewTxn(ctx, false) + assert.NoError(t, err) + + key := core.NewKey("bae-ed7f0bd5-3f5b-5e93-9310-4b2e71ac460d") + version, err := cid.Decode(testStates[0].cid) + assert.NoError(t, err) + + span := fetcher.NewVersionedSpan(key, version) + err = vf.Start(ctx, txn, span) + assert.NoError(t, err) + + // loop over updates so we can seek to them + // skip first (create) + for _, update := range testStates[1:] { + fmt.Println("Seeking to:", update.cid) + c, err := cid.Decode(update.cid) + assert.NoError(t, err) + + err = vf.SeekTo(ctx, c) + assert.NoError(t, err) + + _, doc, err := vf.FetchNextMap(ctx) + assert.NoError(t, err) + + fmt.Println("fetched doc:", doc) + + var state map[string]interface{} + err = json.Unmarshal(update.payload, &state) + assert.NoError(t, err) + + compareVersionedDocs(t, doc, state) + } +} + +// func buildTestState() (*db.DB, *db.Collection, error) { + +// } + +func compareVersionedDocs(t *testing.T, doc, expected map[string]interface{}) { + for k, v := range doc { + if k == "_key" { + continue + } + // make sure our floats are converted + if f, ok := expected[k].(float64); ok { + if f == float64(int64(f)) { + expected[k] = int64(f) + } + } + + if i, ok := v.(uint64); ok { + if i == uint64(int64(i)) { + v = int64(i) + } + } + assert.Equal(t, expected[k], v) + } +} + +func createDocUpdates(col *db.Collection) error { + // col, err := newTestCollectionWithSchema(db) + // if err != ni + + // dockey: bae-ed7f0bd5-3f5b-5e93-9310-4b2e71ac460d + // cid: Qmcv2iU3myUBwuFCHe3w97sBMMER2FTY2rpbNBP6cqWb4S + // sub: + // -age: QmSom35RYVzYTE7nGsudvomv1pi9ffjEfSFsPZgQRM92v1 + // -name: QmeKjH2iuNjbWqZ5Lx9hSCiZDeCQvb4tHNyGm99dvB69M9 + // -points: Qmd7mvZJkL9uQoC2YZsQE3ijmyGAaHgSnZMvLY4H71Vmaz + // -verified: QmNRQwWjTBTDfAFUHkG8yuKmtbprYQtGs4jYxGJ5fCfXtn + // testJSONObj := []byte(`{ + // "name": "Alice", + // "age": 31, + // "points": 100, + // "verified": true + // }`) + + // doc, err := document.NewFromJSON(testJSONObj) + // if err != nil { + // return err + // } + + // if err := col.Save(doc); err != nil { + // return err + // } + + // // update #1 + // // cid: QmPgnQvhPuLGwVU4ZEcbRy7RNCxSkeS72eKwXusUrAEEXR + // // sub: + // // - name: QmZzL7AUq1L9whhHvVfbBJho6uAJQnAZWEFWYsTD2PgCKM + // // - points: Qmejouu71QPjTue2P1gLnrzqApa8cU6NPdBoGrCQdpSC1Q + // doc.Set("name", "Pete") + // doc.Set("points", 99.9) + // if err := col.Update(doc); err != nil { + // return err + // } + + // // update #2 + // // cid: QmRpMfTzExGrXat5W9uCAEtnSpRTvWBcd1hBYNWVPdN9Xh + // // sub: + // // - verified: QmNTLb5ChDx3HjeAMuWVm7wmgjbXPzDRdPNnzwRqG71T2Q + // // - age: QmfJTRSXy1x4VxaVDqSa35b3sXQkCAppPSwfhwKGkV2zez + // doc.Set("verified", false) + // doc.Set("age", 22) + // if err := col.Update(doc); err != nil { + // return err + // } + + // // update #3 + // // cid: QmRWYwKadjWqHLrzPKd7MdS4EoQuT2RzWVTaBxxVkeSjFH + // // sub: + // // - points: QmQGkkF1xpLkMFWtG5fNTGs6VwbNXESrtG2Mj35epLU8do + // doc.Set("points", 129.99) + // err = col.Update(doc) + + var doc *document.Document + var err error + ctx := context.Background() + for i, update := range testStates { + if i == 0 { // create + doc, err = document.NewFromJSON(update.payload) + if err != nil { + return err + } + if err := col.Save(ctx, doc); err != nil { + return err + } + } else { + if update.diffOps == nil { + return errors.New("Expecting diffOp for update") + } + + for k, v := range update.diffOps { + doc.Set(k, v) + } + err = col.Update(ctx, doc) + if err != nil { + return err + } + } + fmt.Printf("Update #%v cid %v\n", i+1, doc.Head()) + } + + return err +} + +func newTestCollectionWithSchema(d *db.DB) (*db.Collection, error) { + desc := base.CollectionDescription{ + Name: "users", + Schema: base.SchemaDescription{ + Fields: []base.FieldDescription{ + { + Name: "_key", + Kind: base.FieldKind_DocKey, + }, + { + Name: "name", + Kind: base.FieldKind_STRING, + Typ: core.LWW_REGISTER, + }, + { + Name: "age", + Kind: base.FieldKind_INT, + Typ: core.LWW_REGISTER, + }, + { + Name: "verified", + Kind: base.FieldKind_BOOL, + Typ: core.LWW_REGISTER, + }, + { + Name: "points", + Kind: base.FieldKind_FLOAT, + Typ: core.LWW_REGISTER, + }, + }, + }, + } + + ctx := context.Background() + col, err := d.CreateCollection(ctx, desc) + return col.(*db.Collection), err +} diff --git a/db/tests/utils.go b/db/tests/utils.go index 6d2ecb927d..eb816741e3 100644 --- a/db/tests/utils.go +++ b/db/tests/utils.go @@ -192,6 +192,7 @@ func ExecuteQueryTestCase(t *testing.T, schema string, collectionNames []string, return } collections = append(collections, col) + fmt.Printf("Collection name:%s id%v\n", col.Name(), col.ID()) } // insert docs diff --git a/db/txn.go b/db/txn.go index 3ba88aa403..35b71b7a9d 100644 --- a/db/txn.go +++ b/db/txn.go @@ -96,7 +96,7 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) { } // hide a ds.Batching store as a ds.Txn - rb := shimBatcherTxn{ + rb := store.ShimBatcherTxn{ Read: db.rootstore, Batch: batcher, } @@ -115,7 +115,7 @@ func (db *DB) newTxn(ctx context.Context, readonly bool) (*Txn, error) { // txn.headstore = ds.NewLogDatastore(ktds.Wrap(shimStore, db.hsKeyTransform), fmt.Sprintf("%s:headstore", txnid)) // batchstore := ds.NewLogDatastore(ktds.Wrap(shimStore, db.dagKeyTransform), fmt.Sprintf("%s:dagstore", txnid)) - shimStore := shimTxnStore{txn.IterableTxn} + shimStore := store.ShimTxnStore{Txn: txn.IterableTxn} txn.systemstore = ktds.Wrap(shimStore, db.ssKeyTransform) txn.datastore = ktds.Wrap(shimStore, db.dsKeyTransform) txn.headstore = ktds.Wrap(shimStore, db.hsKeyTransform) @@ -146,6 +146,12 @@ func (txn *Txn) DAGstore() core.DAGStore { return txn.dagstore } +// Rootstore returns the underlying txn as a DSReaderWriter to implement +// the MultiStore interface +func (txn *Txn) Rootstore() core.DSReaderWriter { + return txn.IterableTxn +} + func (txn *Txn) IsBatch() bool { return txn.isBatch } @@ -185,30 +191,6 @@ func (txn *Txn) runSuccessFns(ctx context.Context) { } } -// Shim to make ds.Txn support ds.Datastore -type shimTxnStore struct { - ds.Txn -} - -func (ts shimTxnStore) Sync(ctx context.Context, prefix ds.Key) error { - return ts.Txn.Commit(ctx) -} - -func (ts shimTxnStore) Close() error { - ts.Discard(context.TODO()) - return nil -} - -// shim to make ds.Batch implement ds.Datastore -type shimBatcherTxn struct { - ds.Read - ds.Batch -} - -func (shimBatcherTxn) Discard(_ context.Context) { - // noop -} - // txn := db.NewTxn() // users := db.GetCollection("users") // usersTxn := users.WithTxn(txn) diff --git a/document/document.go b/document/document.go index 1e7c5429b4..d90405ff6f 100644 --- a/document/document.go +++ b/document/document.go @@ -68,6 +68,8 @@ type Document struct { values map[Field]Value // @TODO: schemaInfo schema.Info + head cid.Cid + // marks if document has unsaved changes isDirty bool } @@ -156,6 +158,14 @@ func NewFromJSON(obj []byte, schema ...base.SchemaDescription) (*Document, error return NewFromMap(data, schema...) } +func (doc *Document) Head() cid.Cid { + return doc.head +} + +func (doc *Document) SetHead(head cid.Cid) { + doc.head = head +} + // Key returns the generated DocKey for this document func (doc *Document) Key() key.DocKey { return doc.key @@ -296,6 +306,7 @@ func (doc *Document) setObject(t core.CType, field string, val *Document) error return doc.set(t, field, &value) } +// @todo: Update with document schemas func (doc *Document) setAndParseType(field string, value interface{}) error { if value == nil { return nil @@ -304,6 +315,11 @@ func (doc *Document) setAndParseType(field string, value interface{}) error { switch val := value.(type) { // int (any number) + case int: + err := doc.setCBOR(core.LWW_REGISTER, field, int64(val)) + if err != nil { + return err + } case float64: // case int64: diff --git a/go.mod b/go.mod index 95ed06d284..8a2dbc51fe 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multibase v0.0.3 github.com/multiformats/go-multihash v0.0.15 + github.com/pkg/errors v0.9.1 github.com/satori/go.uuid v1.2.0 github.com/spf13/cobra v1.1.3 github.com/spf13/viper v1.7.0 @@ -73,7 +74,6 @@ require ( github.com/multiformats/go-varint v0.0.6 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml v1.2.0 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect diff --git a/merkle/clock/ipld.go b/merkle/clock/ipld.go index b5d89a2d38..87ea853223 100644 --- a/merkle/clock/ipld.go +++ b/merkle/clock/ipld.go @@ -31,10 +31,12 @@ func init() { ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock) } +type deltaExtractorFn func(ipld.Node) (core.Delta, error) + // crdtNodeGetter wraps an ipld.NodeGetter with some additional utility methods type crdtNodeGetter struct { ipld.NodeGetter - deltaExtractor func(ipld.Node) (core.Delta, error) + deltaExtractor deltaExtractorFn } func (ng *crdtNodeGetter) GetDelta(ctx context.Context, c cid.Cid) (ipld.Node, core.Delta, error) { @@ -146,3 +148,5 @@ func makeNode(delta core.Delta, heads []cid.Cid) (ipld.Node, error) { } return nd, nil } + +// type LocalNodeGetter diff --git a/merkle/crdt/composite.go b/merkle/crdt/composite.go index a503878764..cce8167f52 100644 --- a/merkle/crdt/composite.go +++ b/merkle/crdt/composite.go @@ -54,7 +54,7 @@ func NewMerkleCompositeDAG(datastore core.DSReaderWriter, headstore core.DSReade // strip collection/index identifier from docKey headsetKey := ds.KeyWithNamespaces(dockey.List()[2:]) clock := clock.NewMerkleClock(headstore, dagstore, headsetKey.String(), compositeDag) - base := &baseMerkleCRDT{clock, compositeDag} + base := &baseMerkleCRDT{clock: clock, crdt: compositeDag} return &MerkleCompositeDAG{ baseMerkleCRDT: base, diff --git a/merkle/crdt/factory.go b/merkle/crdt/factory.go index 8974a9e789..0e1f8f4277 100644 --- a/merkle/crdt/factory.go +++ b/merkle/crdt/factory.go @@ -147,6 +147,11 @@ func (factory Factory) WithDagstore(dagstore core.DAGStore) Factory { return factory } +// Rootstore impements MultiStore +func (factory Factory) Rootstore() core.DSReaderWriter { + return nil +} + // SetLogger sets the current logger // func (factory *Factory) SetLogger(l logging.StandardLogger) error { // factory.log = l diff --git a/merkle/crdt/lwwreg.go b/merkle/crdt/lwwreg.go index b57f607e2f..583e7c1e98 100644 --- a/merkle/crdt/lwwreg.go +++ b/merkle/crdt/lwwreg.go @@ -11,10 +11,12 @@ package crdt import ( "context" + "fmt" "log" "github.com/sourcenetwork/defradb/core" corecrdt "github.com/sourcenetwork/defradb/core/crdt" + "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/merkle/clock" // "github.com/sourcenetwork/defradb/store" @@ -50,16 +52,41 @@ type MerkleLWWRegister struct { // NewMerkleLWWRegister creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a LWWRegister CRDT func NewMerkleLWWRegister(datastore core.DSReaderWriter, headstore core.DSReaderWriter, dagstore core.DAGStore, ns, dockey ds.Key) *MerkleLWWRegister { - register := corecrdt.NewLWWRegister(datastore, ns, dockey.String() /* stuff like namespace and ID */) + // New Register + reg := corecrdt.NewLWWRegister(datastore, ns, dockey.String() /* stuff like namespace and ID */) - // strip collection/index identifier from docKey - headsetKey := ds.KeyWithNamespaces(dockey.List()[2:]) - clock := clock.NewMerkleClock(headstore, dagstore, headsetKey.String(), register) - base := &baseMerkleCRDT{clock, register} + // New Clock + // two possible cases here + // 1) Primary index + // 2) Versioned Index + var headsetKey ds.Key + list := dockey.List()[1:] // remove collection identifier + if list[0] == fmt.Sprint(base.PrimaryIndex) { + // strip collection/index identifier from docKey, and any trailing + // data AFTER the docKey. + headsetKey = ds.KeyWithNamespaces(list[1:]) + } else if list[0] == fmt.Sprint(base.VersionIndex) { + // splice out the Version CID component of the + // VersionIndex compound index key. + // Currently, the key is in the following format + // /VersionIndexID/DocKey/VersionCID/.../FieldIdentifer + // + // We want to remove the VersionIndexID and the VersionCID, but keep the rest. + headsetKey = ds.KeyWithNamespaces(append(list[1:2], list[3:]...)) + } else { + // error, lets panic for now. TODO: FIX THIS + panic("invalid index identifier for Merkle CRDT") + } + + clk := clock.NewMerkleClock(headstore, dagstore, headsetKey.String(), reg) + // newBaseMerkleCRDT(clock, register) + base := &baseMerkleCRDT{clock: clk, crdt: reg} + // instatiate MerkleLWWRegister + // return return &MerkleLWWRegister{ baseMerkleCRDT: base, - reg: register, + reg: reg, } } diff --git a/merkle/crdt/merklecrdt.go b/merkle/crdt/merklecrdt.go index ebfa712700..9911b7e309 100644 --- a/merkle/crdt/merklecrdt.go +++ b/merkle/crdt/merklecrdt.go @@ -27,6 +27,7 @@ import ( // so it can be merged with any given semantics. type MerkleCRDT interface { core.ReplicatedData + Clock() core.MerkleClock } // type MerkleCRDTInitFn func(ds.Key) MerkleCRDT @@ -68,6 +69,20 @@ func (base *baseMerkleCRDT) Value(ctx context.Context) ([]byte, error) { return base.crdt.Value(ctx) } +func (base *baseMerkleCRDT) Clock() core.MerkleClock { + return base.clock +} + +// func (base *baseMerkleCRDT) ProcessNode(ng core.NodeGetter, root cid.Cid, rootPrio uint64, delta core.Delta, node ipld.Node) ([]cid.Cid, error) { +// current := node.Cid() +// err := base.Merge(delta, dshelp.CidToDsKey(current).String()) +// if err != nil { +// return nil, fmt.Errorf("error merging delta from %s : %w", current, err) +// } + +// return base.clock.ProcessNode(ng, root, rootPrio, delta, node) +// } + // Publishes the delta to state func (base *baseMerkleCRDT) Publish(ctx context.Context, delta core.Delta) (cid.Cid, error) { return base.clock.AddDAGNode(ctx, delta) diff --git a/merkle/crdt/merklecrdt_test.go b/merkle/crdt/merklecrdt_test.go index 8200b9bd8d..4d53b9bd24 100644 --- a/merkle/crdt/merklecrdt_test.go +++ b/merkle/crdt/merklecrdt_test.go @@ -45,7 +45,7 @@ func newTestBaseMerkleCRDT() (*baseMerkleCRDT, core.DSReaderWriter) { id := "MyKey" reg := corecrdt.NewLWWRegister(datastore, ds.NewKey(""), id) clk := clock.NewMerkleClock(headstore, dagstore, id, reg) - return &baseMerkleCRDT{clk, reg}, s + return &baseMerkleCRDT{clock: clk, crdt: reg}, s } func TestMerkleCRDTPublish(t *testing.T) { diff --git a/query/graphql/parser/query.go b/query/graphql/parser/query.go index ada54deb25..02967fe5d9 100644 --- a/query/graphql/parser/query.go +++ b/query/graphql/parser/query.go @@ -32,6 +32,14 @@ const ( HiddenFieldName = "_hidden" ) +// Enum for different types of read Select queries +type SelectQueryType int + +const ( + ScanQuery = iota + VersionedScanQuery +) + var dbAPIQueryNames = map[string]bool{ "latestCommits": true, "allCommits": true, @@ -94,6 +102,10 @@ type Select struct { Alias string CollectionName string + // QueryType indicates what kind of query this is + // Currently supports: ScanQuery, VersionedScanQuery + QueryType SelectQueryType + // Root is the top level query parsed type Root SelectionType @@ -365,6 +377,12 @@ func parseSelect(rootType SelectionType, field *ast.Field) (*Select, error) { Fields: fields, } } + + if len(slct.DocKeys) != 0 && len(slct.CID) != 0 { + slct.QueryType = VersionedScanQuery + } else { + slct.QueryType = ScanQuery + } } // if theres no field selections, just return diff --git a/query/graphql/planner/commit.go b/query/graphql/planner/commit.go index c7b8691f33..a378d5b979 100644 --- a/query/graphql/planner/commit.go +++ b/query/graphql/planner/commit.go @@ -11,7 +11,6 @@ package planner import ( "errors" - "fmt" "math" cid "github.com/ipfs/go-cid" @@ -118,6 +117,9 @@ func (p *Planner) commitSelectLatest(parsed *parser.CommitSelect) (*commitSelect return commit, nil } +// commitSelectBlock is a CommitSelect node intialized witout a headsetScanNode, and is +// expected to be given a target CID in the parser.CommitSelect object. It returns +// a single commit if found func (p *Planner) commitSelectBlock(parsed *parser.CommitSelect) (*commitSelectNode, error) { dag := p.DAGScan() if parsed.Cid != "" { @@ -126,8 +128,8 @@ func (p *Planner) commitSelectBlock(parsed *parser.CommitSelect) (*commitSelectN return nil, err } dag.cid = &c - fmt.Println("got cid:", c) - } + // fmt.Println("got cid:", c) + } // @todo: handle error if no CID is given return &commitSelectNode{ p: p, @@ -136,6 +138,8 @@ func (p *Planner) commitSelectBlock(parsed *parser.CommitSelect) (*commitSelectN }, nil } +// commitSelectAll is a CommitSelect initialized with a headsetScanNode, and will +// recursively return all graph commits in order. func (p *Planner) commitSelectAll(parsed *parser.CommitSelect) (*commitSelectNode, error) { dag := p.DAGScan() headset := p.HeadScan() diff --git a/query/graphql/planner/dagscan.go b/query/graphql/planner/dagscan.go index 471d35e508..a083424008 100644 --- a/query/graphql/planner/dagscan.go +++ b/query/graphql/planner/dagscan.go @@ -74,7 +74,6 @@ func (h *headsetScanNode) initScan() error { h.spans = append(h.spans, core.NewSpan(h.key, h.key.PrefixEnd())) } - // fmt.Println("startin fetcher with spans:", h.spans[0].Start()) err := h.fetcher.Start(h.p.ctx, h.p.txn, h.spans) if err != nil { return err @@ -108,7 +107,9 @@ func (h *headsetScanNode) Values() map[string]interface{} { } } -func (h *headsetScanNode) Close() error { return nil } +func (h *headsetScanNode) Close() error { + return h.fetcher.Close() +} func (h *headsetScanNode) Source() planNode { return nil } @@ -273,6 +274,25 @@ func (n *dagScanNode) Next() (bool, error) { return true, nil } +// -> D1 -> E1 -> F1 +// A -> B -> C | +// -> D2 -> E2 -> F2 + +/* + +/db/blocks/QmKJHSDLFKJHSLDFKJHSFLDFDJKSDF => IPLD_BLOCK_BYTE_ARRAY +/db/blocks/QmJSDHGFKJSHGDKKSDGHJKFGHKSD => IPLD_BLOCK_BYTE_ARRAY +/db/blocks/QmHLSHDFLHJSDFLHJFSLDKSH => IPLD_BLOCK_BYTE_ARRAY => []byte("hello") +/db/blocks/QmSFHLSDHLHJSDLFHJLSD => IPLD_BLOCK_BYTE_ARRAY => []byte("goodbye") +/db/data/1/0/bae-ALICE/1:v => "hello" +/db/data/1/0/bae-ALICE/C:v => []byte... +/db/heads/bae-ALICE/C/QmJSDHGFKJSHGDKKSDGHJKFGHKSD => [priority=1] +/db/heads/bae-ALICE/C/QmKJHSDLFKJHSLDFKJHSFLDFDJKSDF => [priority=1] +/db/heads/bae-ALICE/1/QmHLSHDFLHJSDFLHJFSLDKSH => [priority=1] +/db/heads/bae-ALICE/1/QmSFHLSDHLHJSDLFHJLSD => [priority=1] + +*/ + // func (n *dagScanNode) nextHead() (cid.Cid, error) { // } diff --git a/query/graphql/planner/datasource.go b/query/graphql/planner/datasource.go index 24bb477cdb..bf4f3a4d10 100644 --- a/query/graphql/planner/datasource.go +++ b/query/graphql/planner/datasource.go @@ -30,13 +30,13 @@ type planSource struct { // datasource is a set of utilities for constructing scan/index/join nodes // from a given query statement -func (p *Planner) getSource(collection string) (planSource, error) { +func (p *Planner) getSource(collection string, versioned bool) (planSource, error) { // for now, we only handle simple collection scannodes - return p.getCollectionScanPlan(collection) + return p.getCollectionScanPlan(collection, versioned) } // @todo: Add field selection -func (p *Planner) getCollectionScanPlan(collection string) (planSource, error) { +func (p *Planner) getCollectionScanPlan(collection string, versioned bool) (planSource, error) { if collection == "" { return planSource{}, errors.New("collection name cannot be empty") } @@ -45,7 +45,7 @@ func (p *Planner) getCollectionScanPlan(collection string) (planSource, error) { return planSource{}, err } - scan := p.Scan() + scan := p.Scan(versioned) err = scan.initCollection(colDesc) if err != nil { return planSource{}, err diff --git a/query/graphql/planner/multi.go b/query/graphql/planner/multi.go index f7d24ceedf..2d713d4d03 100644 --- a/query/graphql/planner/multi.go +++ b/query/graphql/planner/multi.go @@ -33,7 +33,6 @@ type MultiNode interface { planNode Children() []planNode AddChild(string, planNode) error - ReplaceChildAt(int, string, planNode) error SetMultiScanner(*multiScanNode) } @@ -173,6 +172,51 @@ func (p *parallelNode) nextMerge(index int, plan mergeNode) (bool, error) { return true, nil } +/* + +scan node +========= +{ + _key: bae-ALICE, + name: Alice, + points: 124, + verified: false +} + +typeJoin node(merge) +============= +{ + friends: [ + { + _key: bae-BOB, + name: bob, + points: 99.9, + verified: true, + } + ] +} + +output +====== + +{ + _key: bae-ALICE, + name: Alice, + points: 124, + verified: false, + + friends: [ + { + _key: bae-BOB, + name: bob, + points: 99.9, + verified: true, + } + ] +} + +*/ + func (p *parallelNode) nextAppend(index int, plan appendNode) (bool, error) { if key, ok := p.doc["_key"].(string); ok { // pass the doc key as a reference through the spans interface @@ -203,6 +247,45 @@ func (p *parallelNode) nextAppend(index int, plan appendNode) (bool, error) { return true, nil } +/* + +query { + user { + _key + name + points + verified + + _version { + cid + } + } +} + +scan node +========= +{ + _key: bae-ALICE, + name: Alice, + points: 124, + verified: false +} + +_version: commitSelectTopNode(append) +=================== +[ + { + cid: QmABC + }, + { + cid: QmDEF + } + ... +] + + +*/ + func (p *parallelNode) Values() map[string]interface{} { // result := make(map[string]interface{}) // for i, plan := range p.children { @@ -233,16 +316,6 @@ func (p *parallelNode) AddChild(field string, node planNode) error { return nil } -func (p *parallelNode) ReplaceChildAt(i int, field string, node planNode) error { - if i >= len(p.children) { - return errors.New("Index to replace child node at doesn't exist (out of bounds)") - } - - p.children[i] = node - p.childFields[i] = field - return nil -} - func (p *parallelNode) SetMultiScanner(ms *multiScanNode) { p.multiscan = ms } diff --git a/query/graphql/planner/planner.go b/query/graphql/planner/planner.go index cc24629806..3247614248 100644 --- a/query/graphql/planner/planner.go +++ b/query/graphql/planner/planner.go @@ -156,7 +156,9 @@ func (p *Planner) makePlan(stmt parser.Statement) (planNode, error) { if err != nil { return nil, err } - return plan, nil + + err = plan.Init() + return plan, err } // plan optimization. Includes plan expansion and wiring diff --git a/query/graphql/planner/scan.go b/query/graphql/planner/scan.go index 62636ef6d6..175a91c82a 100644 --- a/query/graphql/planner/scan.go +++ b/query/graphql/planner/scan.go @@ -41,10 +41,14 @@ type scanNode struct { scanInitialized bool - fetcher fetcher.DocumentFetcher + fetcher fetcher.Fetcher } func (n *scanNode) Init() error { + // init the fetcher + if err := n.fetcher.Init(&n.desc, n.index, n.fields, n.reverse); err != nil { + return err + } return n.initScan() } @@ -57,8 +61,7 @@ func (n *scanNode) initCollection(desc base.CollectionDescription) error { // Start starts the internal logic of the scanner // like the DocumentFetcher, and more. func (n *scanNode) Start() error { - // init the fetcher - return n.fetcher.Init(&n.desc, n.index, n.fields, n.reverse) + return nil // noop } func (n *scanNode) initScan() error { @@ -67,7 +70,6 @@ func (n *scanNode) initScan() error { n.spans = append(n.spans, core.NewSpan(start, start.PrefixEnd())) } - // fmt.Println("Initializing scan with the following spans:", n.spans) err := n.fetcher.Start(n.p.ctx, n.p.txn, n.spans) if err != nil { return err @@ -81,12 +83,6 @@ func (n *scanNode) initScan() error { // Returns true, if there is a result, // and false otherwise. func (n *scanNode) Next() (bool, error) { - if !n.scanInitialized { - if err := n.initScan(); err != nil { - return false, err - } - } - // keep scanning until we find a doc that passes the filter for { var err error @@ -126,8 +122,14 @@ func (n *scanNode) Source() planNode { return nil } // Merge implements mergeNode func (n *scanNode) Merge() bool { return true } -func (p *Planner) Scan() *scanNode { - return &scanNode{p: p} +func (p *Planner) Scan(versioned bool) *scanNode { + var f fetcher.Fetcher + if versioned { + f = new(fetcher.VersionedFetcher) + } else { + f = new(fetcher.DocumentFetcher) + } + return &scanNode{p: p, fetcher: f} } // multiScanNode is a buffered scanNode that has diff --git a/query/graphql/planner/select.go b/query/graphql/planner/select.go index 56f6afc6f7..fee4cffbb9 100644 --- a/query/graphql/planner/select.go +++ b/query/graphql/planner/select.go @@ -15,9 +15,35 @@ import ( "github.com/sourcenetwork/defradb/core" "github.com/sourcenetwork/defradb/db/base" + "github.com/sourcenetwork/defradb/db/fetcher" "github.com/sourcenetwork/defradb/query/graphql/parser" + + "github.com/ipfs/go-cid" + "github.com/pkg/errors" ) +/* + +SELECT * From TableA as A JOIN TableB as B ON a.id = b.friend_id + +{ + query { + user { + age + + friend { + name + } + + address { + street + } + } + } +} + +*/ + // wraps a selectNode and all the logic of a plan // graph into a single struct for proper plan // expansion @@ -32,6 +58,10 @@ type selectTopNode struct { // top of the plan graph plan planNode + + // plan -> limit -> sort -> sort.plan = (values -> container | SORT_STRADEGY) -> render -> source + + // ... source -> MultiNode -> TypeJoinNode.plan = (typeJoinOne | typeJoinMany) -> scanNode } func (n *selectTopNode) Init() error { return n.plan.Init() } @@ -134,7 +164,7 @@ func (n *selectNode) initSource(parsed *parser.Select) ([]aggregateNode, error) if parsed.CollectionName == "" { parsed.CollectionName = parsed.Name } - sourcePlan, err := n.p.getSource(parsed.CollectionName) + sourcePlan, err := n.p.getSource(parsed.CollectionName, parsed.QueryType == parser.VersionedScanQuery) if err != nil { return nil, err } @@ -151,12 +181,22 @@ func (n *selectNode) initSource(parsed *parser.Select) ([]aggregateNode, error) origScan.filter = n.filter n.filter = nil - // if we have a FindByDockey filter, create a span for it - // and propagate it to the scanNode - // @todo: When running the optimizer, check if the filter object - // contains a _key equality condition, and upgrade it to a point lookup - // instead of a prefix scan + filter via the Primary Index (0), like here: - if parsed.DocKeys != nil { + // If we have both a DocKey and a CID, then we need to run + // a TimeTravel (History-Traversing Versioned) query, which means + // we need to propogate the values to the underlying VersionedFetcher + if parsed.QueryType == parser.VersionedScanQuery { + c, err := cid.Decode(parsed.CID) + if err != nil { + return nil, errors.Wrap(err, "Failed to propagate VersionFetcher span, invalid CID") + } + spans := fetcher.NewVersionedSpan(core.NewKey(parsed.DocKeys[0]), c) // @todo check len + origScan.Spans(spans) + } else if parsed.DocKeys != nil { // If we *just* have a DocKey(s), run a FindByDocKey(s) optimization + // if we have a FindByDockey filter, create a span for it + // and propogate it to the scanNode + // @todo: When running the optimizer, check if the filter object + // contains a _key equality condition, and upgrade it to a point lookup + // instead of a prefix scan + filter via the Primary Index (0), like here: spans := make(core.Spans, len(parsed.DocKeys)) for i, docKey := range parsed.DocKeys { dockeyIndexKey := base.MakeIndexKey(&sourcePlan.info.collectionDescription, @@ -190,11 +230,26 @@ func (n *selectNode) initFields(parsed *parser.Select) ([]aggregateNode, error) // - commitScan if f.Name == parser.VersionFieldName { // reserved sub type for object queries commitSlct := &parser.CommitSelect{ - Name: f.Name, - Alias: f.Alias, - Type: parser.LatestCommits, + Name: f.Name, + Alias: f.Alias, + // Type: parser.LatestCommits, Fields: f.Fields, } + // handle _version sub selection query differently + // if we are executing a regular Scan query + // or a TimeTravel query. + if parsed.QueryType == parser.VersionedScanQuery { + // for a TimeTravel query, we don't need the Latest + // commit. Instead, _version references the CID + // of that Target version we are querying. + // So instead of a LatestCommit subquery, we need + // a OneCommit subquery, with the supplied parameters. + commitSlct.DocKey = parsed.DocKeys[0] // @todo check length + commitSlct.Cid = parsed.CID + commitSlct.Type = parser.OneCommit + } else { + commitSlct.Type = parser.LatestCommits + } commitPlan, err := n.p.CommitSelect(commitSlct) if err != nil { return nil, err diff --git a/query/graphql/planner/type_join.go b/query/graphql/planner/type_join.go index 9df0e77f1c..1b6e948ced 100644 --- a/query/graphql/planner/type_join.go +++ b/query/graphql/planner/type_join.go @@ -19,6 +19,167 @@ import ( "github.com/sourcenetwork/defradb/query/graphql/schema" ) +/* + +type User { + name: String + age: Int + friends: [Friend] +} + +type Friend { + name: String + friendsDate: DateTime + user_id: DocKey +} + +- > + +/graphql +/explain + + +{ + query { + user { selectTopNode -> (source) selectNode -> (source) scanNode(user) -> filter: NIL + [_key] + name + + // key = bae-KHDFLGHJFLDG + friends selectNode -> (source) scanNode(friend) -> filter: {user_id: {_eq: "bae-KHDFLGHJFLDG"}} { + name + date: friendsDate + } + } + } +} + +selectTopNode - > selectNode -> MultiNode.children: []planNode -> multiScanNode(scanNode(user)**) -> } -> scanNode(user).Next() -> FETCHER_STUFF + FILTER_STUFF + OTHER_STUFF + -> TypeJoinNode(merge**) -> TypeJoinOneMany -> (one) multiScanNode(scanNode(user)**) -> } -> scanNode(user).Value() -> doc + -> (many) selectNode - > scanNode(friend) + +1. NEXT/VALUES MultiNode.doc = {_key: bae-KHDFLGHJFLDG, name: "BOB"} +2. NEXT/VALUES TypeJoinOneMany.one {_key: bae-KHDFLGHJFLDG, name: "BOB"} +3. NEXT/VALUES (many).selectNode.doc = {name: "Eric", date: Oct29} +LOOP +4. NEXT/VALUES TypeJoinNode {_key: bae-KHDFLGHJFLDG, name: "BOB"} + {friends: [{{name: "Eric", date: Oct29}}]} +5. NEXT/VALUES (many).selectNode.doc = {name: "Jimmy", date: Oct21} +6. NEXT/VALUES TypeJoinNode {_key: bae-KHDFLGHJFLDG, name: "BOB"} + {friends: [{name: "Eric", date: Oct29}, {name: "Jimmy", date: Oct21}]} +GOTO LOOP + +// SPLIT FILTER +query { + user { + age + name + points + + friends { + name + points + } + } +} + +{ + data: [ + { + _key: bae-ALICE + age: 22, + name: "Alice", + points: 45, + + friends: [ + { + name: "Bob", + points: 11 + user_id: "bae-ALICE" + }, + ] + }, + + { + _key: bae-CHARLIE + age: 22, + name: "Charlie", + points: 45, + + friends: [ + // { + // name: "Mickey", + // points: 6 + // } + ] + }, + ] +} + +ALL EMPTY +PLAN -> selectTopNode.plan -> limit (optional) -> order (optional) -> selectNode.filter = NIL -> ... -> scanNode.filter = NIL + +ROOT EMPTY / SUB FULL +{friends: {points: {_gt: 10}}} +PLAN -> selectTopNode.plan -> limit (optional) -> order (optional) -> selectNode.filter = {friends: {points: {_gt: 10}}} -> ... -> scanNode.filter = NIL + +ROOT FULL / SUB EMPTY +{age: {_gte: 21}} +PLAN -> selectTopNode.plan -> limit (optional) -> order (optional) -> selectNode.filter = NIL -> ... -> scanNode(user).filter = {age: {_gte: 21}} + +ROOT FULL / SUB FULL +{age: {_gte: 21}, friends: {points: {_gt: 10}}} +PLAN -> selectTopNode.plan -> limit (optional) -> order (optional) -> selectNode.filter = {friends: {points: {_gt: 10}}} -> ... -> scanNode(user).filter = {age: {_gte: 21}} + -> scanNode(friends).filter = NIL + +ROOT FULL / SUB EMPTY / SUB SUB FULL +{age: {_gte: 21}} +friends: {points: {_gt: 10}} +PLAN -> selectTopNode.plan -> limit (optional) -> order (optional) -> selectNode.filter = NIL -> ... -> scanNode(user).filter = {age: {_gte: 21}} + -> scanNode(friends).filter = {points: {_gt: 10}} + +ROOT FULL / SUB FULL / SUB SUB FULL +{age: {_gte: 21}} +friends: {points: {_gt: 10}} +PLAN -> selectTopNode.plan -> limit (optional) -> order (optional) -> selectNode.filter = {friends: {points: {_gt: 10}}} -> ... -> scanNode(user).filter = {age: {_gte: 21}} + -> scanNode(friends).filter = {points: {_gt: 10}} + + +ONE-TO-ONE EXAMPLE WITH FILTER TRACKING +type user { + age: Int + points: Float + name: String + + address: Address @primary + address_id: bae-address-VALUE +} + +type Address: { + street_name: String + house_number: Int + city: String + country: String + ... + + user: user + # user_id: DocKey +} + +query { + user { + age + points + name + + address { + street_name + city + country + } + } +} + +*/ + // typeIndexJoin provides the needed join functionality // for querying relationship based sub types. // It constructs a new plan node, which queries the @@ -293,7 +454,6 @@ func (n *typeJoinOne) valuesPrimary(doc map[string]interface{}) map[string]inter // re-initialize the sub type plan if err := n.subType.Init(); err != nil { // @todo pair up on the error handling / logging properly. - fmt.Println("sub-type initalization error with re-initalizing : %w", err) return doc } @@ -304,7 +464,6 @@ func (n *typeJoinOne) valuesPrimary(doc map[string]interface{}) map[string]inter // @todo pair up on the error handling / logging properly. if err != nil { - fmt.Println("Internal primary value error : %w", err) return doc } @@ -415,7 +574,7 @@ func (n *typeJoinMany) Values() map[string]interface{} { } else { docKey := doc["_key"].(string) filter := map[string]interface{}{ - n.rootName + "_id": docKey, + n.rootName + "_id": docKey, // user_id: "bae-ALICE" | user_id: "bae-CHARLIE" } // using the doc._key as a filter err := appendFilterToScanNode(n.subType, filter) @@ -445,10 +604,10 @@ func (n *typeJoinMany) Values() map[string]interface{} { } func (n *typeJoinMany) Close() error { - err := n.root.Close() - if err != nil { + if err := n.root.Close(); err != nil { return err } + return n.subType.Close() } diff --git a/query/graphql/planner/update.go b/query/graphql/planner/update.go index 2a25e6859f..fc6bc380c4 100644 --- a/query/graphql/planner/update.go +++ b/query/graphql/planner/update.go @@ -73,13 +73,11 @@ func (n *updateNode) Next() (bool, error) { results, err = n.collection.UpdateWithFilter(n.p.ctx, n.filter, n.patch) } - fmt.Println("update node error : ", err) if err != nil { return false, err } // consume the updates into our valuesNode - fmt.Println(results) for _, resKey := range results.DocKeys { err := n.updateIter.docs.AddDoc(map[string]interface{}{"_key": resKey}) if err != nil { diff --git a/query/graphql/planner/versionedscan.go b/query/graphql/planner/versionedscan.go new file mode 100644 index 0000000000..c937ad9ae8 --- /dev/null +++ b/query/graphql/planner/versionedscan.go @@ -0,0 +1,128 @@ +package planner + +import ( + "errors" + + "github.com/sourcenetwork/defradb/core" + "github.com/sourcenetwork/defradb/db/base" + "github.com/sourcenetwork/defradb/db/fetcher" + "github.com/sourcenetwork/defradb/query/graphql/parser" + + "github.com/ipfs/go-cid" +) + +var ( + emptyCID = cid.Cid{} +) + +// scans an index for records +type versionedScanNode struct { + p *Planner + + // versioned data + key core.Key + version cid.Cid + + desc base.CollectionDescription + + fields []*base.FieldDescription + doc map[string]interface{} + docKey []byte + + reverse bool + + // filter data + filter *parser.Filter + + scanInitialized bool + + fetcher fetcher.VersionedFetcher +} + +func (n *versionedScanNode) Init() error { + // init the fetcher + if err := n.fetcher.Init(&n.desc, nil, n.fields, n.reverse); err != nil { + return err + } + return n.initScan() +} + +// Start starts the internal logic of the scanner +// like the DocumentFetcher, and more. +func (n *versionedScanNode) Start() error { + return nil // noop +} + +func (n *versionedScanNode) initScan() error { + if len(n.key.String()) == 0 || n.version.Equals(emptyCID) { + return errors.New("VersionedScan is missing either a DocKey or VersionCID") + } + + // create a span of the form {DocKey, VersionCID} + // spans := core.Spans{core.NewSpan(n.key, core.NewKey(n.version.String()))} + spans := fetcher.NewVersionedSpan(n.key, n.version) + err := n.fetcher.Start(n.p.ctx, n.p.txn, spans) + if err != nil { + return err + } + + n.scanInitialized = true + return nil +} + +// Next gets the next result. +// Returns true, if there is a result, +// and false otherwise. +func (n *versionedScanNode) Next() (bool, error) { + // if !n.scanInitialized { + // if err := n.initScan(); err != nil { + // return false, err + // } + // } + + // keep scanning until we find a doc that passes the filter + for { + var err error + n.docKey, n.doc, err = n.fetcher.FetchNextMap(n.p.ctx) + if err != nil { + return false, err + } + if n.doc == nil { + return false, nil + } + + passed, err := parser.RunFilter(n.doc, n.filter, n.p.evalCtx) + if err != nil { + return false, err + } + if passed { + return true, nil + } + } +} + +func (n *versionedScanNode) Spans(spans core.Spans) { + // n.spans = spans + // we expect 1 span that includes both the DocKey and VersionCID + // if len(spans) != 1 { + // return + // } +} + +// Values returns the most recent result from Next() +func (n *versionedScanNode) Values() map[string]interface{} { + return n.doc +} + +func (n *versionedScanNode) Close() error { + return n.fetcher.Close() +} + +func (n *versionedScanNode) Source() planNode { return nil } + +// Merge implements mergeNode +func (n *versionedScanNode) Merge() bool { return true } + +func (p *Planner) VersionedScan() *versionedScanNode { + return &versionedScanNode{p: p} +} diff --git a/store/multi.go b/store/multi.go new file mode 100644 index 0000000000..441ec21a1d --- /dev/null +++ b/store/multi.go @@ -0,0 +1,74 @@ +// Copyright 2021 Source Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package store + +import ( + "github.com/sourcenetwork/defradb/core" + "github.com/sourcenetwork/defradb/db/base" + + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" +) + +type multistore struct { + root core.DSReaderWriter + data core.DSReaderWriter + head core.DSReaderWriter + // block core.DSReaderWriter + dag core.DAGStore +} + +func MultiStoreFrom(rootstore ds.Datastore) core.MultiStore { + ms := &multistore{root: rootstore} + ms.data = namespace.Wrap(rootstore, base.DataStoreKey) + ms.head = namespace.Wrap(rootstore, base.HeadStoreKey) + block := namespace.Wrap(rootstore, base.BlockStoreKey) + ms.dag = NewDAGStore(block) + + return ms +} + +// Datastore implements core.MultiStore +func (ms multistore) Datastore() core.DSReaderWriter { + return ms.data +} + +// Headstore implements core.MultiStore +func (ms multistore) Headstore() core.DSReaderWriter { + return ms.head +} + +// DAGstore implements core.MultiStore +func (ms multistore) DAGstore() core.DAGStore { + return ms.dag +} + +// Rootstore implements core.MultiStore +func (ms multistore) Rootstore() core.DSReaderWriter { + return ms.root +} + +// func PrintStore(ctx context.Context, store core.DSReaderWriter) { +// q := dsq.Query{ +// Prefix: "", +// KeysOnly: false, +// Orders: []dsq.Order{dsq.OrderByKey{}}, +// } + +// results, err := store.Query(q) +// defer results.Close() +// if err != nil { +// panic(err) +// } + +// for r := range results.Next() { +// fmt.Println(r.Key, ": ", r.Value) +// } +// } diff --git a/store/txn.go b/store/txn.go new file mode 100644 index 0000000000..30a1a1a37c --- /dev/null +++ b/store/txn.go @@ -0,0 +1,146 @@ +package store + +import ( + "context" + + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + + "github.com/sourcenetwork/defradb/core" + "github.com/sourcenetwork/defradb/datastores/iterable" + "github.com/sourcenetwork/defradb/db/base" +) + +type txn struct { + iterable.IterableTxn + isBatch bool + + // wrapped DS + rootstore core.DSReaderWriter + systemstore core.DSReaderWriter // wrapped txn /system namespace + datastore core.DSReaderWriter // wrapped txn /data namespace + headstore core.DSReaderWriter // wrapped txn /heads namespace + dagstore core.DAGStore // wrapped txn /blocks namespace + + // @todo once we move all Txn creation from db to here + // successFns []func() + // errorFns []func() +} + +func NewTxnFrom(ctx context.Context, rootstore ds.Batching, readonly bool) (core.Txn, error) { + Txn := new(txn) + + // check if our datastore natively supports iterable transaction, transactions or batching + if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok { + dstxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly) + if err != nil { + return nil, err + } + Txn.IterableTxn = dstxn + } else if txnStore, ok := rootstore.(ds.TxnDatastore); ok { + dstxn, err := txnStore.NewTransaction(ctx, readonly) + if err != nil { + return nil, err + } + Txn.IterableTxn = iterable.NewIterableTransaction(dstxn) + // Note: db.rootstore now has type `ds.Batching`. + } else { + batcher, err := rootstore.Batch(ctx) + if err != nil { + return nil, err + } + + // hide a ds.Batching store as a ds.Txn + rb := ShimBatcherTxn{ + Read: rootstore, + Batch: batcher, + } + Txn.IterableTxn = iterable.NewIterableTransaction(rb) + Txn.isBatch = true + } + + // add the wrapped datastores using the existing KeyTransform functions from the db + // @todo Check if KeyTransforms are nil beforehand + + // debug stuff... ignore + // + // txnid := RandStringRunes(5) + // txn.systemstore = ds.NewLogDatastore(ktds.Wrap(shimStore, db.ssKeyTransform), fmt.Sprintf("%s:systemstore", txnid)) + // txn.datastore = ds.NewLogDatastore(ktds.Wrap(shimStore, db.dsKeyTransform), fmt.Sprintf("%s:datastore", txnid)) + // txn.headstore = ds.NewLogDatastore(ktds.Wrap(shimStore, db.hsKeyTransform), fmt.Sprintf("%s:headstore", txnid)) + // batchstore := ds.NewLogDatastore(ktds.Wrap(shimStore, db.dagKeyTransform), fmt.Sprintf("%s:dagstore", txnid)) + + shimStore := ShimTxnStore{Txn.IterableTxn} + Txn.rootstore = shimStore + Txn.systemstore = namespace.Wrap(shimStore, base.SystemStoreKey) + Txn.datastore = namespace.Wrap(shimStore, base.DataStoreKey) + Txn.headstore = namespace.Wrap(shimStore, base.HeadStoreKey) + batchstore := namespace.Wrap(shimStore, base.BlockStoreKey) + + Txn.dagstore = NewDAGStore(batchstore) + + return Txn, nil +} + +// Systemstore returns the txn wrapped as a systemstore under the /system namespace +func (t *txn) Systemstore() core.DSReaderWriter { + return t.systemstore +} + +// Datastore returns the txn wrapped as a datastore under the /data namespace +func (t *txn) Datastore() core.DSReaderWriter { + return t.datastore +} + +// Headstore returns the txn wrapped as a headstore under the /heads namespace +func (t *txn) Headstore() core.DSReaderWriter { + return t.headstore +} + +// DAGstore returns the txn wrapped as a blockstore for a DAGStore under the /blocks namespace +func (t *txn) DAGstore() core.DAGStore { + return t.dagstore +} + +// Rootstore returns the underlying txn as a DSReaderWriter to implement +// the MultiStore interface +func (t *txn) Rootstore() core.DSReaderWriter { + return t.IterableTxn +} + +func (txn *txn) IsBatch() bool { + return txn.isBatch +} + +// func (txn *txn) Commit(ctx context.Context) error { +// if err := txn.IterableTxn.Commit(ctx); err != nil { +// txn.runErrorFns(ctx) +// return err +// } +// txn.runSuccessFns(ctx) +// return nil +// } + +// Shim to make ds.Txn support ds.Datastore +type ShimTxnStore struct { + ds.Txn +} + +func (ts ShimTxnStore) Sync(ctx context.Context, prefix ds.Key) error { + return ts.Txn.Commit(ctx) +} + +func (ts ShimTxnStore) Close() error { + ts.Discard(context.TODO()) + return nil +} + +// shim to make ds.Batch implement ds.Datastore +type ShimBatcherTxn struct { + ds.Read + ds.Batch +} + +func (ShimBatcherTxn) Discard(_ context.Context) { + // noop +}