From 10cf841a7e793a89fc62df2ed56cfcb7bb46984b Mon Sep 17 00:00:00 2001 From: Tim Shannon Date: Mon, 28 Jan 2019 19:13:46 -0600 Subject: [PATCH 1/5] WIP getting subqueries to work with a single iterator --- README.md | 9 +++++---- delete_test.go | 4 ---- find_test.go | 13 +++++-------- index.go | 26 +++++++++++++++++++------- put.go | 2 +- put_test.go | 4 ---- query.go | 37 ++++++++++++++++--------------------- 7 files changed, 46 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index 3ef5d5f..9c6fe3f 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,13 @@ Badger DB for customizing as you wish. By default the encoding used is Gob, so interface for faster serialization. Or, alternately, you can use any serialization you want by supplying encode / decode funcs to the `Options` struct on Open. -One Go Type will have one bucket, and multiple index buckets in a BadgerDB file, so you can store multiple Go Types in the -same database. +One Go Type will be prefixed with its type name, so you can store multiple types in a single Badger database without +conflicts. This project is a rewrite of the [BoltHold](https://github.com/timshannon/bolthold) project on the Badger KV database -instead of [Bolt](https://github.com/etcd-io/bbolt). For a comparison between bolt and badger, see -https://blog.dgraph.io/post/badger-lmdb-boltdb/. +instead of [Bolt](https://github.com/etcd-io/bbolt). For a performance comparison between bolt and badger, see +https://blog.dgraph.io/post/badger-lmdb-boltdb/. I've written up my own comparison of the two focusing on +characteristics *other* than performance here: https://tech.townsourced.com/post/bolddb-vs-badger/. ## Indexes Indexes allow you to skip checking any records that don't meet your index criteria. 
If you have 1000 records and only diff --git a/delete_test.go b/delete_test.go index eedd921..177115a 100644 --- a/delete_test.go +++ b/delete_test.go @@ -49,10 +49,6 @@ func TestDeleteMatching(t *testing.T) { err := store.DeleteMatching(&ItemTest{}, tst.query) if err != nil { - if tst.writeError { - // error is expected on this test - return - } t.Fatalf("Error deleting data from badgerhold: %s", err) } diff --git a/find_test.go b/find_test.go index be90ad6..9378c99 100644 --- a/find_test.go +++ b/find_test.go @@ -181,10 +181,9 @@ var testData = []ItemTest{ } type test struct { - name string - query *badgerhold.Query - result []int // indices of test data to be found - writeError bool // if the query will error on writable transactions + name string + query *badgerhold.Query + result []int // indices of test data to be found } var testResults = []test{ @@ -347,8 +346,7 @@ var testResults = []test{ return false, nil }), - result: []int{14, 15}, - writeError: true, + result: []int{14, 15}, }, test{ name: "Time Comparison", @@ -471,8 +469,7 @@ var testResults = []test{ grp[0].Max("ID", max) return ra.Field().(int) == max.ID, nil }), - result: []int{11, 14, 15}, - writeError: true, + result: []int{11, 14, 15}, }, test{ name: "Indexed in", diff --git a/index.go b/index.go index d2d97d5..7cd51bb 100644 --- a/index.go +++ b/index.go @@ -164,18 +164,28 @@ func indexExists(it *badger.Iterator, typeName, indexName string) bool { } type iterator struct { - keyCache [][]byte - nextKeys func(*badger.Iterator) ([][]byte, error) - iter *badger.Iterator - tx *badger.Txn - err error + keyCache [][]byte + nextKeys func(*badger.Iterator) ([][]byte, error) + iter *badger.Iterator + lastSeek []byte + subQueryLastSeek []byte + tx *badger.Txn + err error } func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { i := &iterator{ - tx: tx, - iter: tx.NewIterator(badger.DefaultIteratorOptions), + tx: tx, } + + if query.iterator == nil { + query.iterator = 
tx.NewIterator(badger.DefaultIteratorOptions) + } else { + + } + + i.iter = query.iterator + var prefix []byte if query.index != "" { @@ -203,6 +213,7 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { item := iter.Item() key := item.KeyCopy(nil) + i.lastSeek = key ok := false if len(criteria) == 0 { // nothing to check return key for value testing @@ -248,6 +259,7 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { } item := iter.Item() + i.lastSeek = item.KeyCopy(nil) // no currentRow on indexes as it refers to multiple rows // remove index prefix for matching diff --git a/put.go b/put.go index b520167..188e63e 100644 --- a/put.go +++ b/put.go @@ -12,7 +12,7 @@ import ( ) // ErrKeyExists is the error returned when data is being Inserted for a Key that already exists -var ErrKeyExists = errors.New("This Key already exists in this badgerhold for this type") +var ErrKeyExists = errors.New("This Key already exists in badgerhold for this type") // sequence tells badgerhold to insert the key as the next sequence in the bucket type sequence struct{} diff --git a/put_test.go b/put_test.go index 602ab6d..e6d1bdb 100644 --- a/put_test.go +++ b/put_test.go @@ -236,10 +236,6 @@ func TestUpdateMatching(t *testing.T) { }) if err != nil { - if tst.writeError { - // error is expected - return - } t.Fatalf("Error updating data from badgerhold: %s", err) } diff --git a/query.go b/query.go index 6244c55..ae1a751 100644 --- a/query.go +++ b/query.go @@ -47,7 +47,6 @@ type Query struct { dataType reflect.Type tx *badger.Txn iterator *badger.Iterator - writable bool // if the query is part of a writable transaction limit int skip int @@ -333,10 +332,10 @@ type MatchFunc func(ra *RecordAccess) (bool, error) // RecordAccess allows access to the current record, field or allows running a subquery within a // MatchFunc type RecordAccess struct { - tx *badger.Txn - record interface{} - field interface{} - writeable bool + tx *badger.Txn 
+ record interface{} + field interface{} + iterator *badger.Iterator } // Field is the current field being queried @@ -352,18 +351,14 @@ func (r *RecordAccess) Record() interface{} { // SubQuery allows you to run another query in the same transaction for each // record in a parent query func (r *RecordAccess) SubQuery(result interface{}, query *Query) error { - if r.writeable { - return fmt.Errorf("Subqueries are currently not supported from within writable transactions") - } + query.iterator = r.iterator return findQuery(r.tx, result, query) } // SubAggregateQuery allows you to run another aggregate query in the same transaction for each // record in a parent query func (r *RecordAccess) SubAggregateQuery(query *Query, groupBy ...string) ([]*AggregateResult, error) { - if r.writeable { - return nil, fmt.Errorf("Subqueries are currently not supported from within writable transactions") - } + query.iterator = r.iterator return aggregateQuery(r.tx, r.record, query, groupBy...) } @@ -428,10 +423,10 @@ func (c *Criterion) test(testValue interface{}, encoded bool, keyType string, cu return c.value.(*regexp.Regexp).Match([]byte(fmt.Sprintf("%s", value))), nil case fn: return c.value.(MatchFunc)(&RecordAccess{ - field: value, - record: currentRow, - tx: c.query.tx, - writeable: c.query.writable, + field: value, + record: currentRow, + tx: c.query.tx, + iterator: c.query.iterator, }) case isnil: return reflect.ValueOf(value).IsNil(), nil @@ -463,6 +458,7 @@ func (c *Criterion) test(testValue interface{}, encoded bool, keyType string, cu func matchesAllCriteria(criteria []*Criterion, value interface{}, encoded bool, keyType string, currentRow interface{}) (bool, error) { + for i := range criteria { ok, err := criteria[i].test(value, encoded, keyType, currentRow) if err != nil { @@ -564,7 +560,11 @@ func runQuery(tx *badger.Txn, dataType interface{}, query *Query, retrievedKeys } iter := newIterator(tx, storer.Type(), query) - defer iter.Close() + query.iterator = iter.iter 
+ defer func() { + iter.Close() + query.iterator = nil + }() if query.index != "" && query.badIndex { return fmt.Errorf("The index %s does not exist", query.index) @@ -762,7 +762,6 @@ func findQuery(tx *badger.Txn, result interface{}, query *Query) error { query = &Query{} } - query.writable = false resultVal := reflect.ValueOf(result) if resultVal.Kind() != reflect.Ptr || resultVal.Elem().Kind() != reflect.Slice { panic("result argument must be a slice address") @@ -826,7 +825,6 @@ func deleteQuery(tx *badger.Txn, dataType interface{}, query *Query) error { if query == nil { query = &Query{} } - query.writable = true var records []*record @@ -864,8 +862,6 @@ func updateQuery(tx *badger.Txn, dataType interface{}, query *Query, update func query = &Query{} } - query.writable = true - var records []*record err := runQuery(tx, dataType, query, nil, query.skip, @@ -920,7 +916,6 @@ func aggregateQuery(tx *badger.Txn, dataType interface{}, query *Query, groupBy query = &Query{} } - query.writable = false var result []*AggregateResult if len(groupBy) == 0 { From 65f8fa6e51c043da47bd1c8a32654c0d20edc95f Mon Sep 17 00:00:00 2001 From: Tim Shannon Date: Mon, 28 Jan 2019 19:34:20 -0600 Subject: [PATCH 2/5] Go Report Card fixes --- index.go | 2 +- query.go | 2 +- store.go | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/index.go b/index.go index 7cd51bb..b2f5db8 100644 --- a/index.go +++ b/index.go @@ -214,7 +214,7 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { item := iter.Item() key := item.KeyCopy(nil) i.lastSeek = key - ok := false + var ok bool if len(criteria) == 0 { // nothing to check return key for value testing ok = true diff --git a/query.go b/query.go index ae1a751..e3e8fd5 100644 --- a/query.go +++ b/query.go @@ -165,7 +165,7 @@ func (q *Query) SortBy(fields ...string) *Query { if fields[i] == Key { panic("Cannot sort by Key.") } - found := false + var found bool for k := range q.sort { if q.sort[k] == 
fields[i] { found = true diff --git a/store.go b/store.go index ccb5a03..ede0b99 100644 --- a/store.go +++ b/store.go @@ -28,6 +28,8 @@ type Options struct { badger.Options } +// DefaultOptions is the default set of options for opening a BadgerHold database. +// It includes Badger's own default options. var DefaultOptions = Options{ Options: badger.DefaultOptions, Encoder: DefaultEncode, @@ -76,7 +78,7 @@ func (s *Store) Close() error { /* NOTE: Not going to implement ReIndex and Remove index - I had originally created these to make the transision from a plain bolt or badger DB easier + I had originally created these to make the transition from a plain bolt or badger DB easier but there is too much chance for lost data, and it's probably better that any conversion be done by the developer so they can directly manage how they want data to be migrated. If you disagree, feel free to open an issue and we can revisit this. From b931499d38103794b2488a5fe68f766102f55d35 Mon Sep 17 00:00:00 2001 From: Tim Shannon Date: Mon, 28 Jan 2019 20:28:07 -0600 Subject: [PATCH 3/5] Trying out different options for tracking subqueries --- index.go | 33 ++++++++++++++------------------- query.go | 24 +++++++----------------- 2 files changed, 21 insertions(+), 36 deletions(-) diff --git a/index.go b/index.go index b2f5db8..4552d10 100644 --- a/index.go +++ b/index.go @@ -164,28 +164,26 @@ func indexExists(it *badger.Iterator, typeName, indexName string) bool { } type iterator struct { - keyCache [][]byte - nextKeys func(*badger.Iterator) ([][]byte, error) - iter *badger.Iterator - lastSeek []byte - subQueryLastSeek []byte - tx *badger.Txn - err error + keyCache [][]byte + nextKeys func(*badger.Iterator) ([][]byte, error) + iter *badger.Iterator + tx *badger.Txn + err error +} + +// seekTree is for keeping track of an iterator's last seek point, since there can only be one iterator in a RW +// transaction +type seekTree struct { + parent *seekTree + currentKey []byte } func newIterator(tx 
*badger.Txn, typeName string, query *Query) *iterator { i := &iterator{ - tx: tx, - } - - if query.iterator == nil { - query.iterator = tx.NewIterator(badger.DefaultIteratorOptions) - } else { - + tx: tx, + iter: tx.NewIterator(badger.DefaultIteratorOptions), } - i.iter = query.iterator - var prefix []byte if query.index != "" { @@ -213,7 +211,6 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { item := iter.Item() key := item.KeyCopy(nil) - i.lastSeek = key var ok bool if len(criteria) == 0 { // nothing to check return key for value testing @@ -259,8 +256,6 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { } item := iter.Item() - i.lastSeek = item.KeyCopy(nil) - // no currentRow on indexes as it refers to multiple rows // remove index prefix for matching ok, err := matchesAllCriteria(criteria, item.Key()[len(prefix):], true, "", nil) diff --git a/query.go b/query.go index e3e8fd5..1e8e7bf 100644 --- a/query.go +++ b/query.go @@ -46,7 +46,6 @@ type Query struct { badIndex bool dataType reflect.Type tx *badger.Txn - iterator *badger.Iterator limit int skip int @@ -332,10 +331,9 @@ type MatchFunc func(ra *RecordAccess) (bool, error) // RecordAccess allows access to the current record, field or allows running a subquery within a // MatchFunc type RecordAccess struct { - tx *badger.Txn - record interface{} - field interface{} - iterator *badger.Iterator + tx *badger.Txn + record interface{} + field interface{} } // Field is the current field being queried @@ -351,15 +349,12 @@ func (r *RecordAccess) Record() interface{} { // SubQuery allows you to run another query in the same transaction for each // record in a parent query func (r *RecordAccess) SubQuery(result interface{}, query *Query) error { - query.iterator = r.iterator return findQuery(r.tx, result, query) } // SubAggregateQuery allows you to run another aggregate query in the same transaction for each // record in a parent query func (r *RecordAccess) 
SubAggregateQuery(query *Query, groupBy ...string) ([]*AggregateResult, error) { - query.iterator = r.iterator - return aggregateQuery(r.tx, r.record, query, groupBy...) } @@ -423,10 +418,9 @@ func (c *Criterion) test(testValue interface{}, encoded bool, keyType string, cu return c.value.(*regexp.Regexp).Match([]byte(fmt.Sprintf("%s", value))), nil case fn: return c.value.(MatchFunc)(&RecordAccess{ - field: value, - record: currentRow, - tx: c.query.tx, - iterator: c.query.iterator, + field: value, + record: currentRow, + tx: c.query.tx, }) case isnil: return reflect.ValueOf(value).IsNil(), nil @@ -560,11 +554,7 @@ func runQuery(tx *badger.Txn, dataType interface{}, query *Query, retrievedKeys } iter := newIterator(tx, storer.Type(), query) - query.iterator = iter.iter - defer func() { - iter.Close() - query.iterator = nil - }() + defer iter.Close() if query.index != "" && query.badIndex { return fmt.Errorf("The index %s does not exist", query.index) From c95d48809ca55269c2c0120dc56f8fd40065dd40 Mon Sep 17 00:00:00 2001 From: Tim Shannon Date: Wed, 30 Jan 2019 21:05:05 -0600 Subject: [PATCH 4/5] Added concept of a bookmark on an iterator Changed so that when iterating on an update/insert/delete transaction, a bookmark gets created on the RW iterator to track where that iterator last left off. 
--- index.go | 41 ++++++++++++++++++++++++++++++++--------- query.go | 35 +++++++++++++++++++++++++++++------ 2 files changed, 61 insertions(+), 15 deletions(-) diff --git a/index.go b/index.go index 4552d10..d5ab617 100644 --- a/index.go +++ b/index.go @@ -167,21 +167,28 @@ type iterator struct { keyCache [][]byte nextKeys func(*badger.Iterator) ([][]byte, error) iter *badger.Iterator + bookmark *iterBookmark + lastSeek []byte tx *badger.Txn err error } -// seekTree is for keeping track of an iterator's last seek point, since there can only be one iterator in a RW -// transaction -type seekTree struct { - parent *seekTree - currentKey []byte +// iterBookmark stores a seek location in a specific iterator +// so that a single RW iterator can be shared within a single transaction +type iterBookmark struct { + iter *badger.Iterator + seekKey []byte } -func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { +func newIterator(tx *badger.Txn, typeName string, query *Query, bookmark *iterBookmark) *iterator { i := &iterator{ - tx: tx, - iter: tx.NewIterator(badger.DefaultIteratorOptions), + tx: tx, + } + + if bookmark != nil { + i.iter = bookmark.iter + } else { + i.iter = tx.NewIterator(badger.DefaultIteratorOptions) } var prefix []byte @@ -236,6 +243,7 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { nKeys = append(nKeys, key) } + i.lastSeek = key iter.Next() } return nKeys, nil @@ -256,9 +264,10 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { } item := iter.Item() + key := item.KeyCopy(nil) // no currentRow on indexes as it refers to multiple rows // remove index prefix for matching - ok, err := matchesAllCriteria(criteria, item.Key()[len(prefix):], true, "", nil) + ok, err := matchesAllCriteria(criteria, key[len(prefix):], true, "", nil) if err != nil { return nil, err } @@ -276,6 +285,8 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { return nil }) } + + 
i.lastSeek = key iter.Next() } @@ -286,6 +297,13 @@ func newIterator(tx *badger.Txn, typeName string, query *Query) *iterator { return i } +func (i *iterator) createBookmark() *iterBookmark { + return &iterBookmark{ + iter: i.iter, + seekKey: i.lastSeek, + } +} + // Next returns the next key value that matches the iterators criteria // If no more kv's are available the return nil, if there is an error, they return nil // and iterator.Error() will return the error @@ -335,5 +353,10 @@ func (i *iterator) Error() error { } func (i *iterator) Close() { + if i.bookmark != nil { + i.iter.Seek(i.bookmark.seekKey) + return + } + i.iter.Close() } diff --git a/query.go b/query.go index 1e8e7bf..ba2c0d1 100644 --- a/query.go +++ b/query.go @@ -46,6 +46,9 @@ type Query struct { badIndex bool dataType reflect.Type tx *badger.Txn + writable bool + subquery bool + bookmark *iterBookmark limit int skip int @@ -331,9 +334,9 @@ type MatchFunc func(ra *RecordAccess) (bool, error) // RecordAccess allows access to the current record, field or allows running a subquery within a // MatchFunc type RecordAccess struct { - tx *badger.Txn record interface{} field interface{} + query *Query } // Field is the current field being queried @@ -349,13 +352,21 @@ func (r *RecordAccess) Record() interface{} { // SubQuery allows you to run another query in the same transaction for each // record in a parent query func (r *RecordAccess) SubQuery(result interface{}, query *Query) error { - return findQuery(r.tx, result, query) + if r.query.writable { + query.bookmark = r.query.bookmark + } + query.subquery = true + return findQuery(r.query.tx, result, query) } // SubAggregateQuery allows you to run another aggregate query in the same transaction for each // record in a parent query func (r *RecordAccess) SubAggregateQuery(query *Query, groupBy ...string) ([]*AggregateResult, error) { - return aggregateQuery(r.tx, r.record, query, groupBy...) 
+ if r.query.writable { + query.bookmark = r.query.bookmark + } + query.subquery = true + return aggregateQuery(r.query.tx, r.record, query, groupBy...) } // MatchFunc will test if a field matches the passed in function @@ -420,7 +431,7 @@ func (c *Criterion) test(testValue interface{}, encoded bool, keyType string, cu return c.value.(MatchFunc)(&RecordAccess{ field: value, record: currentRow, - tx: c.query.tx, + query: c.query, }) case isnil: return reflect.ValueOf(value).IsNil(), nil @@ -553,8 +564,15 @@ func runQuery(tx *badger.Txn, dataType interface{}, query *Query, retrievedKeys return runQuerySort(tx, dataType, query, action) } - iter := newIterator(tx, storer.Type(), query) - defer iter.Close() + iter := newIterator(tx, storer.Type(), query, query.bookmark) + if query.writable || query.subquery { + query.bookmark = iter.createBookmark() + } + + defer func() { + iter.Close() + query.bookmark = nil + }() if query.index != "" && query.badIndex { return fmt.Errorf("The index %s does not exist", query.index) @@ -752,6 +770,8 @@ func findQuery(tx *badger.Txn, result interface{}, query *Query) error { query = &Query{} } + query.writable = false + resultVal := reflect.ValueOf(result) if resultVal.Kind() != reflect.Ptr || resultVal.Elem().Kind() != reflect.Slice { panic("result argument must be a slice address") @@ -815,6 +835,7 @@ func deleteQuery(tx *badger.Txn, dataType interface{}, query *Query) error { if query == nil { query = &Query{} } + query.writable = true var records []*record @@ -852,6 +873,7 @@ func updateQuery(tx *badger.Txn, dataType interface{}, query *Query, update func query = &Query{} } + query.writable = true var records []*record err := runQuery(tx, dataType, query, nil, query.skip, @@ -906,6 +928,7 @@ func aggregateQuery(tx *badger.Txn, dataType interface{}, query *Query, groupBy query = &Query{} } + query.writable = false var result []*AggregateResult if len(groupBy) == 0 { From 81afa11016b70fe723c4b79f62c769e041ef9760 Mon Sep 17 00:00:00 
2001 From: Tim Shannon Date: Thu, 31 Jan 2019 00:26:49 -0600 Subject: [PATCH 5/5] Fixed a few issues with multiple levels of subqueries --- query.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/query.go b/query.go index ba2c0d1..3b51ba6 100644 --- a/query.go +++ b/query.go @@ -352,20 +352,16 @@ func (r *RecordAccess) Record() interface{} { // SubQuery allows you to run another query in the same transaction for each // record in a parent query func (r *RecordAccess) SubQuery(result interface{}, query *Query) error { - if r.query.writable { - query.bookmark = r.query.bookmark - } query.subquery = true + query.bookmark = r.query.bookmark return findQuery(r.query.tx, result, query) } // SubAggregateQuery allows you to run another aggregate query in the same transaction for each // record in a parent query func (r *RecordAccess) SubAggregateQuery(query *Query, groupBy ...string) ([]*AggregateResult, error) { - if r.query.writable { - query.bookmark = r.query.bookmark - } query.subquery = true + query.bookmark = r.query.bookmark return aggregateQuery(r.query.tx, r.record, query, groupBy...) } @@ -565,7 +561,7 @@ func runQuery(tx *badger.Txn, dataType interface{}, query *Query, retrievedKeys } iter := newIterator(tx, storer.Type(), query, query.bookmark) - if query.writable || query.subquery { + if (query.writable || query.subquery) && query.bookmark == nil { query.bookmark = iter.createBookmark() }