Skip to content

Commit

Permalink
feat: TimeTraveling (History Traversing) query engine and doc fetcher (
Browse files Browse the repository at this point in the history
…#59)

* Implemented versionFetcher

* Update Fetcher interface

* Created Txn utility

* Added versionedScan to query planner

* Added tests for versionFetcher
  • Loading branch information
jsimnz committed Feb 7, 2022
1 parent 48fa15b commit dcf28e9
Show file tree
Hide file tree
Showing 37 changed files with 1,937 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .golangci.sourceinc.yaml
Expand Up @@ -447,7 +447,7 @@ linters-settings:

govet:
# report about shadowed variables
check-shadowing: true
check-shadowing: false

# settings per analyzer
settings:
Expand Down
11 changes: 11 additions & 0 deletions core/store.go
Expand Up @@ -17,8 +17,19 @@ import (
// MultiStore is an interface wrapper around the 3 main types of stores needed for
// MerkleCRDTs
type MultiStore interface {
Rootstore() DSReaderWriter

// Datastore is a wrapped root DSReaderWriter
// under the /data namespace
Datastore() DSReaderWriter

// Headstore is a wrapped root DSReaderWriter
// under the /head namespace
Headstore() DSReaderWriter

// DAGstore is a wrapped root DSReaderWriter
// as a Blockstore, embedded into a DAGStore
// under the /blocks namespace
DAGstore() DAGStore
}

Expand Down
1 change: 1 addition & 0 deletions core/txn.go
Expand Up @@ -18,4 +18,5 @@ type Txn interface {
iterable.IterableTxn
MultiStore
Systemstore() DSReaderWriter
IsBatch() bool
}
6 changes: 6 additions & 0 deletions core/type.go
Expand Up @@ -33,3 +33,9 @@ var (
byte(3): COMPOSITE,
}
)

// reserved names
const (
HEAD = "_head"
COMPOSITE_ID = "C"
)
51 changes: 50 additions & 1 deletion db/base/descriptions.go
Expand Up @@ -10,22 +10,36 @@
package base

import (
"errors"
"fmt"

ds "github.com/ipfs/go-datastore"
"github.com/sourcenetwork/defradb/core"
)

const (
ObjectMarker = byte(0xff) // @todo: Investigate object marker values
)

// Reserved system index identifiers. Any index that defra requires
// to enable any internal feature, requires a reserved index ID.
// Changing an existing system index ID is A BREAKING CHANGE.
// Adding new IDs incrementally is not breaking.
const (
PrimaryIndex int32 = 0

// VersionIndex is a Reserved secondary index that represents a
// versioned instance of the PrimaryIndex
VersionIndex int32 = -1 * iota
)

// CollectionDescription describes a Collection and
// all its associated metadata
type CollectionDescription struct {
Name string
ID uint32
Schema SchemaDescription
Indexes []IndexDescription
Indexes []IndexDescription // @todo: New system reserved indexes are NEGATIVE. Maybe we need a map here
}

// IDString returns the collection ID as a string
Expand All @@ -44,6 +58,32 @@ func (col CollectionDescription) GetField(name string) (FieldDescription, bool)
return FieldDescription{}, false
}

func (c CollectionDescription) GetIndexDocKey(key ds.Key, indexID uint32) ds.Key {
return ds.NewKey(c.IDString()).ChildString(fmt.Sprint(indexID)).Child(key)
}

func (c CollectionDescription) GetPrimaryIndexDocKey(key ds.Key) ds.Key {
return c.GetIndexDocKey(key, c.Indexes[0].ID)
}

func (c CollectionDescription) GetFieldKey(key ds.Key, fieldName string) ds.Key {
if !c.Schema.IsEmpty() {
return key.ChildString(fmt.Sprint(c.Schema.GetFieldKey(fieldName)))
}
return key.ChildString(fieldName)
}

func (c CollectionDescription) GetPrimaryIndexDocKeyForCRDT(ctype core.CType, key ds.Key, fieldName string) (ds.Key, error) {
switch ctype {
case core.COMPOSITE:
return c.GetPrimaryIndexDocKey(key).ChildString(core.COMPOSITE_ID), nil
case core.LWW_REGISTER:
fieldKey := c.GetFieldKey(key, fieldName)
return c.GetPrimaryIndexDocKey(fieldKey), nil
}
return ds.Key{}, errors.New("Invalid CRDT type")
}

// IndexDescription describes an Index on a Collection
// and its assocatied metadata.
type IndexDescription struct {
Expand Down Expand Up @@ -99,6 +139,15 @@ func (sd SchemaDescription) IsEmpty() bool {
return len(sd.Fields) == 0
}

func (sd SchemaDescription) GetFieldKey(fieldName string) uint32 {
for _, field := range sd.Fields {
if field.Name == fieldName {
return uint32(field.ID)
}
}
return uint32(0)
}

type FieldKind uint8

// Note: These values are serialized and persisted in the database, avoid modifying existing values
Expand Down
14 changes: 14 additions & 0 deletions db/base/keys.go
@@ -0,0 +1,14 @@
package base

import (
ds "github.com/ipfs/go-datastore"
)

var (
// Individual Store Keys
RootStoreKey = ds.NewKey("/db")
SystemStoreKey = RootStoreKey.ChildString("/system")
DataStoreKey = RootStoreKey.ChildString("/data")
HeadStoreKey = RootStoreKey.ChildString("/heads")
BlockStoreKey = RootStoreKey.ChildString("/blocks")
)
9 changes: 9 additions & 0 deletions db/base/maker.go
Expand Up @@ -15,6 +15,7 @@ import (
)

var (
ROOT = "/db"
SYSTEM = "/db/system"
DATA = "/db/data"
BLOCK = "/db/block"
Expand Down Expand Up @@ -49,3 +50,11 @@ func MakeCollectionSystemKey(name string) core.Key {
func MakeSchemaSystemKey(name string) core.Key {
return core.Key{Key: schemaNs.ChildString(name)}
}

// MakeIndexPrefixKeyRaw is the same as MakeIndexPrefixKey but it takes as inputs
// the raw datastore keys, instead of the collection and index objects respectively.
func MakeIndexPrefixKeyRaw(collectionID ds.Key, indexID ds.Key) core.Key {
return core.Key{Key: core.NewKey(DATA).
Child(collectionID).
Child(indexID)}
}
27 changes: 23 additions & 4 deletions db/collection.go
Expand Up @@ -426,10 +426,12 @@ func (c *Collection) save(ctx context.Context, txn *Txn, doc *document.Document)
doc.Clean()
})

links = append(links, core.DAGLink{
link := core.DAGLink{
Name: k,
Cid: c,
})
}
links = append(links, link)
// fmt.Println("links:", link)
}
}
// Update CompositeDAG
Expand All @@ -441,8 +443,17 @@ func (c *Collection) save(ctx context.Context, txn *Txn, doc *document.Document)
if err != nil {
return nil
}
_, err = c.saveValueToMerkleCRDT(ctx, txn, c.getPrimaryIndexDocKey(dockey), core.COMPOSITE, buf, links)
return err

headCID, err := c.saveValueToMerkleCRDT(ctx, txn, c.getPrimaryIndexDocKey(dockey), core.COMPOSITE, buf, links)
if err != nil {
return nil
}

txn.OnSuccess(func() {
doc.SetHead(headCID)
})
// fmt.Printf("final: %s\n\n", docCid)
return nil
}

// Delete will attempt to delete a document by key
Expand Down Expand Up @@ -627,10 +638,18 @@ func (c *Collection) commitImplicitTxn(ctx context.Context, txn *Txn) error {
return nil
}

func (c *Collection) GetIndexDocKey(key ds.Key, indexID uint32) ds.Key {
return c.getIndexDocKey(key, indexID)
}

func (c *Collection) getIndexDocKey(key ds.Key, indexID uint32) ds.Key {
return c.colIDKey.ChildString(fmt.Sprint(indexID)).Child(key)
}

func (c *Collection) GetPrimaryIndexDocKey(key ds.Key) ds.Key {
return c.getPrimaryIndexDocKey(key)
}

func (c *Collection) getPrimaryIndexDocKey(key ds.Key) ds.Key {
return c.getIndexDocKey(key, c.PrimaryIndex().ID)
}
Expand Down
15 changes: 13 additions & 2 deletions db/collection_update.go
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/sourcenetwork/defradb/query/graphql/parser"
"github.com/sourcenetwork/defradb/query/graphql/planner"

"github.com/fxamacker/cbor/v2"
cbor "github.com/fxamacker/cbor/v2"
ds "github.com/ipfs/go-datastore"
)

Expand Down Expand Up @@ -185,7 +185,6 @@ func (c *Collection) updateWithKey(ctx context.Context, txn *Txn, key key.DocKey
}

func (c *Collection) updateWithKeys(ctx context.Context, txn *Txn, keys []key.DocKey, updater interface{}, opts ...client.UpdateOpt) (*client.UpdateResult, error) {
fmt.Println("updating keys:", keys)
patch, err := parseUpdater(updater)
if err != nil {
return nil, err
Expand Down Expand Up @@ -384,6 +383,18 @@ func (c *Collection) applyMerge(ctx context.Context, txn *Txn, doc map[string]in
return err
}

// handle Int/Float case
// JSON is annoying in that it represents all numbers
// as Float64s. So our merge object contains float64s
// even for fields defined as Ints, which causes issues
// when we serialize that in CBOR. To generate the delta
// payload.
// So lets just make sure ints are ints
// ref: https://play.golang.org/p/djThEqGXtvR
if fd.Kind == base.FieldKind_INT {
merge[mfield] = int64(mval.(float64))
}

val := document.NewCBORValue(fd.Typ, cval)
fieldKey := c.getFieldKey(key, mfield)
c, err := c.saveDocValue(ctx, txn, c.getPrimaryIndexDocKey(fieldKey), val)
Expand Down
16 changes: 5 additions & 11 deletions db/db.go
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/merkle/crdt"
"github.com/sourcenetwork/defradb/query/graphql/planner"
"github.com/sourcenetwork/defradb/query/graphql/schema"
Expand All @@ -37,13 +38,6 @@ var (

ErrOptionsEmpty = errors.New("Empty options configuration provided")

// Individual Store Keys
rootStoreKey = ds.NewKey("/db")
systemStoreKey = rootStoreKey.ChildString("/system")
dataStoreKey = rootStoreKey.ChildString("/data")
headStoreKey = rootStoreKey.ChildString("/heads")
blockStoreKey = rootStoreKey.ChildString("/blocks")

log = logging.Logger("defra.db")
)

Expand Down Expand Up @@ -90,10 +84,10 @@ type DB struct {
// NewDB creates a new instance of the DB using the given options
func NewDB(rootstore ds.Batching, options interface{}) (*DB, error) {
log.Debug("loading: internal datastores")
systemstore := namespace.Wrap(rootstore, systemStoreKey)
datastore := namespace.Wrap(rootstore, dataStoreKey)
headstore := namespace.Wrap(rootstore, headStoreKey)
blockstore := namespace.Wrap(rootstore, blockStoreKey)
systemstore := namespace.Wrap(rootstore, base.SystemStoreKey)
datastore := namespace.Wrap(rootstore, base.DataStoreKey)
headstore := namespace.Wrap(rootstore, base.HeadStoreKey)
blockstore := namespace.Wrap(rootstore, base.BlockStoreKey)
dagstore := store.NewDAGStore(blockstore)
crdtFactory := crdt.DefaultFactory.WithStores(datastore, headstore, dagstore)

Expand Down
13 changes: 13 additions & 0 deletions db/fetcher/dag.go
Expand Up @@ -68,6 +68,11 @@ func (hf *HeadFetcher) Start(ctx context.Context, txn core.Txn, spans core.Spans
}

var err error
if hf.kvIter != nil {
if err := hf.kvIter.Close(); err != nil {
return err
}
}
hf.kvIter, err = txn.Headstore().Query(ctx, q)
if err != nil {
return err
Expand Down Expand Up @@ -142,6 +147,14 @@ func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
return hf.cid, nil
}

func (hf *HeadFetcher) Close() error {
if hf.kvIter == nil {
return nil
}

return hf.kvIter.Close()
}

/*
// List returns the list of current heads plus the max height.
// @todo Document Heads.List function
Expand Down
42 changes: 27 additions & 15 deletions db/fetcher/fetcher.go
Expand Up @@ -22,20 +22,22 @@ import (
"github.com/sourcenetwork/defradb/document"
)

/*
var DocumentFetcher DocumentFetcher = &Fetcher{}
DocumentFetcher.Init()
*/
// type DocumentFetcher interface {
// Init(col *base.CollectionDescription, index *base.IndexDescription, fields []*base.FieldDescription, reverse bool) error
// Start(txn core.Txn, spans core.Spans) error
// FetchNext() (*document.EncodedDocument, error)
// FetchNextDecoded() (*document.Document, error)
// }

// var (
// _ DocumentFetcher = &DocFetcher{}
// )
// Fetcher is the interface for collecting documents
// from the underlying data store. It handles all
// the key/value scanning, aggregation, and document
// encoding.
type Fetcher interface {
Init(col *base.CollectionDescription, index *base.IndexDescription, fields []*base.FieldDescription, reverse bool) error
Start(ctx context.Context, txn core.Txn, spans core.Spans) error
FetchNext(ctx context.Context) (*document.EncodedDocument, error)
FetchNextDecoded(ctx context.Context) (*document.Document, error)
FetchNextMap(ctx context.Context) ([]byte, map[string]interface{}, error)
Close() error
}

var (
_ Fetcher = (*DocumentFetcher)(nil)
)

type DocumentFetcher struct {
col *base.CollectionDescription
Expand Down Expand Up @@ -76,7 +78,18 @@ func (df *DocumentFetcher) Init(col *base.CollectionDescription, index *base.Ind
df.initialized = true
df.doc = new(document.EncodedDocument)
df.doc.Schema = &col.Schema

if df.kvResultsIter != nil {
if err := df.kvResultsIter.Close(); err != nil {
return err
}
}
df.kvResultsIter = nil
if df.kvIter != nil {
if err := df.kvIter.Close(); err != nil {
return err
}
}
df.kvIter = nil

df.schemaFields = make(map[uint32]base.FieldDescription)
Expand Down Expand Up @@ -112,7 +125,6 @@ func (df *DocumentFetcher) Start(ctx context.Context, txn core.Txn, spans core.S
}
}
}

df.indexKey = nil
df.spans = uniqueSpans
df.curSpanIndex = -1
Expand Down

0 comments on commit dcf28e9

Please sign in to comment.