From 42a00bc01ca385dbcdf9f11519eb24aa367f8f46 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Mar 2019 12:46:55 -0700 Subject: [PATCH 01/15] mount: add sorting support --- mount/mount.go | 171 ++++++++++++++++++++++++++++++-------------- mount/mount_test.go | 12 ++-- query/order.go | 18 +++++ query/query_impl.go | 17 +---- 4 files changed, 141 insertions(+), 77 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index 9d26f46..5d51b8e 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -3,6 +3,7 @@ package mount import ( + "container/heap" "errors" "fmt" "sort" @@ -49,6 +50,102 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) { return nil, ds.NewKey("/"), key } +type queryResults struct { + mount ds.Key + results query.Results + next query.Result +} + +func (qr *queryResults) advance() bool { + if qr.results == nil { + return false + } + + qr.next = query.Result{} + r, more := qr.results.NextSync() + if !more { + err := qr.results.Close() + qr.results = nil + if err != nil { + // One more result, the error. + qr.next = query.Result{Error: err} + return true + } + return false + } + + r.Key = qr.mount.Child(ds.RawKey(r.Key)).String() + qr.next = r + return true +} + +type querySet struct { + order []query.Order + heads []*queryResults +} + +func (h *querySet) Len() int { + return len(h.heads) +} + +func (h *querySet) Less(i, j int) bool { + return query.Less(h.order, h.heads[i].next.Entry, h.heads[j].next.Entry) +} + +func (h *querySet) Swap(i, j int) { + h.heads[i], h.heads[j] = h.heads[j], h.heads[i] +} + +func (h *querySet) Push(x interface{}) { + h.heads = append(h.heads, x.(*queryResults)) +} + +func (h *querySet) Pop() interface{} { + i := len(h.heads) - 1 + last := h.heads[i] + h.heads[i] = nil + h.heads = h.heads[:i] + return last +} + +func (h *querySet) close() error { + var errs []error + for _, qr := range h.heads { + err := qr.results.Close() + if err != nil { + errs = append(errs, err) + } + } + h.heads = nil + if len(errs) > 0 { + return errs[0] + } + return nil +} + +func (h *querySet) addResults(mount ds.Key, results query.Results) { + r := &queryResults{ + results: results, + mount: mount, + } + if r.advance() { + heap.Push(h, r) + } +} + +func (h *querySet) next() (query.Result, bool) { + if len(h.heads) == 0 { + return query.Result{}, false + } + next := h.heads[0].next + if h.heads[0].advance() { + heap.Fix(h, 0) + } else { + heap.Remove(h, 0) + } + return next, true +} + // lookupAll returns all mounts that might contain keys that are descendant of // // Matching: /ao/e @@ -123,7 +220,6 @@ func (d *Datastore) Delete(key ds.Key) error { func (d *Datastore) Query(q query.Query) (query.Results, error) { if len(q.Filters) > 0 || - len(q.Orders) > 0 || q.Limit > 0 || q.Offset > 0 { // TODO this is still overly simplistic, but the only callers are @@ -133,60 +229,29 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { prefix := ds.NewKey(q.Prefix) dses, mounts, rests := d.lookupAll(prefix) - // current itorator state - var res query.Results - var mount ds.Key - i := 0 + queries := &querySet{ + order: q.Orders, + heads: make([]*queryResults, 0, len(dses)), + } - return query.ResultsFromIterator(q, query.Iterator{ - Next: func() (query.Result, bool) { - var r query.Result - var more bool - - for try := true; try; try = len(dses) > i { - if res == nil { - if len(dses) <= i { - //This should not happen normally - return query.Result{}, false - } - - dst := dses[i] - mount = mounts[i] - rest := rests[i] - - q2 := q - q2.Prefix = rest.String() - r, err := dst.Query(q2) - if err != nil { - return query.Result{Error: err}, false - } - res = r - } - - r, more = res.NextSync() - if !more { - err := res.Close() - if err != nil { - return query.Result{Error: err}, false - } - res = nil - - i++ - more = len(dses) > i - } else { - break - } - } + for i := range dses { + mount := mounts[i] + dstore := dses[i] + rest := rests[i] - r.Key = mount.Child(ds.RawKey(r.Key)).String() - return r, more - }, - Close: func() error { - if len(mounts) > i && res != nil { - return res.Close() - } - return nil - }, + qi := q + qi.Prefix = rest.String() + results, err := dstore.Query(qi) + if err != nil { + _ = queries.close() + return nil, err + } + queries.addResults(mount, results) + } + + return query.ResultsFromIterator(q, query.Iterator{ + Next: queries.next, + Close: queries.close, }), nil } diff --git a/mount/mount_test.go b/mount/mount_test.go index 9f63676..618e1e0 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -353,14 +353,10 @@ func TestErrQueryClose(t *testing.T) { m.Put(datastore.NewKey("/baz"), []byte("123")) - qr, err := m.Query(query.Query{}) - if err != nil { - t.Fatalf("Query error: %v", err) - } - - e, ok := qr.NextSync() - if ok != false || e.Error == nil { - t.Errorf("Query was ok or q.Error was nil") + _, err := m.Query(query.Query{}) + if err == nil { + t.Fatal("expected query to fail") + return } } diff --git a/query/order.go b/query/order.go index 2ad1c99..f661554 100644 --- a/query/order.go +++ b/query/order.go @@ -46,3 +46,21 @@ type OrderByKeyDescending struct{} func (o OrderByKeyDescending) Compare(a, b Entry) int { return -strings.Compare(a.Key, b.Key) } + +// Less returns true if a comes before b with the requested orderings. +func Less(orders []Order, a, b Entry) bool { + for _, cmp := range orders { + switch cmp.Compare(a, b) { + case 0: + case -1: + return true + case 1: + return false + } + } + + // This gives us a *stable* sort for free. We don't care + // preserving the order from the underlying datastore + // because it's undefined. + return a.Key < b.Key +} diff --git a/query/query_impl.go b/query/query_impl.go index d29b0e3..5ea27cc 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -97,22 +97,7 @@ func NaiveOrder(qr Results, orders ...Order) Results { entries = append(entries, e.Entry) } sort.Slice(entries, func(i int, j int) bool { - a, b := entries[i], entries[j] - - for _, cmp := range orders { - switch cmp.Compare(a, b) { - case 0: - case -1: - return true - case 1: - return false - } - } - - // This gives us a *stable* sort for free. We don't care - // preserving the order from the underlying datastore - // because it's undefined. - return a.Key < b.Key + return Less(orders, entries[i], entries[j]) }) for _, e := range entries { From 0ab21bf75b1353aec978df54de586f5ca801fbe9 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Mar 2019 12:56:06 -0700 Subject: [PATCH 02/15] mount: add filter support --- mount/mount.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index 5d51b8e..bfb4c1c 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -80,7 +80,7 @@ func (qr *queryResults) advance() bool { } type querySet struct { - order []query.Order + query query.Query heads []*queryResults } @@ -89,7 +89,7 @@ func (h *querySet) Len() int { } func (h *querySet) Less(i, j int) bool { - return query.Less(h.order, h.heads[i].next.Entry, h.heads[j].next.Entry) + return query.Less(h.query.Orders, h.heads[i].next.Entry, h.heads[j].next.Entry) } func (h *querySet) Swap(i, j int) { @@ -137,12 +137,20 @@ func (h *querySet) next() (query.Result, bool) { if len(h.heads) == 0 { return query.Result{}, false } - next := h.heads[0].next - if h.heads[0].advance() { + head := h.heads[0] + next := head.next + for head.advance() { + if head.next.Error == nil { + for _, f := range h.query.Filters { + if !f.Filter(head.next.Entry) { + continue + } + } + } heap.Fix(h, 0) - } else { - heap.Remove(h, 0) + return next, true } + heap.Remove(h, 0) return next, true } @@ -219,8 +227,7 @@ func (d *Datastore) Delete(key ds.Key) error { } func (d *Datastore) Query(q query.Query) (query.Results, error) { - if len(q.Filters) > 0 || - q.Limit > 0 || + if q.Limit > 0 || q.Offset > 0 { // TODO this is still overly simplistic, but the only callers are // `ipfs refs local` and ipfs-ds-convert. @@ -230,7 +237,7 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { dses, mounts, rests := d.lookupAll(prefix) queries := &querySet{ - order: q.Orders, + query: q, heads: make([]*queryResults, 0, len(dses)), } @@ -242,6 +249,7 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { qi := q qi.Prefix = rest.String() results, err := dstore.Query(qi) + if err != nil { _ = queries.close() return nil, err From 32214c5d2aa14735c921f8e1b5ebca0ac18d2599 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Tue, 19 Mar 2019 08:45:23 -0700 Subject: [PATCH 03/15] Update test to account for nested mounts --- mount/mount_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/mount/mount_test.go b/mount/mount_test.go index 618e1e0..3f44912 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -297,6 +297,55 @@ func TestQueryCross(t *testing.T) { } } +func TestQueryCrossWithSort(t *testing.T) { + mapds0 := datastore.NewMapDatastore() + mapds1 := datastore.NewMapDatastore() + mapds2 := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/zoo"), Datastore: mapds1}, + {Prefix: datastore.NewKey("/boo/5"), Datastore: mapds2}, + {Prefix: datastore.NewKey("/boo"), Datastore: mapds0}, + }) + + m.Put(datastore.NewKey("/zoo/0"), []byte("123")) + m.Put(datastore.NewKey("/zoo/1"), []byte("234")) + m.Put(datastore.NewKey("/boo/9"), []byte("345")) + m.Put(datastore.NewKey("/boo/3"), []byte("456")) + m.Put(datastore.NewKey("/boo/5/hello"), []byte("789")) + + res, err := m.Query(query.Query{Orders: []query.Order{query.OrderByKey{}}}) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + + expect := []string{ + "/boo/3", + "/boo/5/hello", + "/boo/9", + "/zoo/0", + "/zoo/1", + } + + if len(entries) != len(expect) { + t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) + } + + for i, e := range expect { + if e != entries[i].Key { + t.Errorf("expected key %s, but got %s", e, entries[i].Key) + } + } + + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } +} + func TestLookupPrio(t *testing.T) { mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() From 28e5b7b14d78ce72e102912e9ccb80da2bf979a1 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Thu, 21 Mar 2019 15:41:25 -0700 Subject: [PATCH 04/15] Add NaiveLimit solution --- mount/mount.go | 12 +++------ mount/mount_test.go | 59 +++++++++++++++++++++++++++++++++++++++++++-- query/query.go | 2 +- query/query_impl.go | 7 ++++-- 4 files changed, 67 insertions(+), 13 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index bfb4c1c..9e2c6b8 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -227,12 +227,6 @@ func (d *Datastore) Delete(key ds.Key) error { } func (d *Datastore) Query(q query.Query) (query.Results, error) { - if q.Limit > 0 || - q.Offset > 0 { - // TODO this is still overly simplistic, but the only callers are - // `ipfs refs local` and ipfs-ds-convert. - return nil, errors.New("mount only supports listing all prefixed keys in random order") - } prefix := ds.NewKey(q.Prefix) dses, mounts, rests := d.lookupAll(prefix) @@ -257,10 +251,12 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { queries.addResults(mount, results) } - return query.ResultsFromIterator(q, query.Iterator{ + qr := query.ResultsFromIterator(q, query.Iterator{ Next: queries.next, Close: queries.close, - }), nil + }) + + return query.NaiveLimit(qr, q.Limit), nil } func (d *Datastore) Close() error { diff --git a/mount/mount_test.go b/mount/mount_test.go index 3f44912..7770444 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -2,12 +2,13 @@ package mount_test import ( "errors" + "github.com/ipfs/go-datastore/sync" "testing" "github.com/ipfs/go-datastore" - mount "github.com/ipfs/go-datastore/mount" + "github.com/ipfs/go-datastore/mount" "github.com/ipfs/go-datastore/query" - dstest "github.com/ipfs/go-datastore/test" + "github.com/ipfs/go-datastore/test" ) func TestPutBadNothing(t *testing.T) { @@ -262,6 +263,10 @@ func TestQueryCross(t *testing.T) { } entries, err := res.Rest() if err != nil { + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } t.Fatalf("Query Results.Rest fail: %v\n", err) } seen := 0 @@ -346,6 +351,56 @@ func TestQueryCrossWithSort(t *testing.T) { } } +func TestQueryLimitCrossWithSort(t *testing.T) { + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/rok"), Datastore: mapds1}, + {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, + {Prefix: datastore.NewKey("/noop"), Datastore: mapds3}, + }) + + m.Put(datastore.NewKey("/rok/0"), []byte("ghi")) + m.Put(datastore.NewKey("/zoo/0"), []byte("123")) + m.Put(datastore.NewKey("/rok/1"), []byte("def")) + m.Put(datastore.NewKey("/zoo/1"), []byte("167")) + m.Put(datastore.NewKey("/zoo/2"), []byte("345")) + m.Put(datastore.NewKey("/rok/3"), []byte("abc")) + m.Put(datastore.NewKey("/zoo/3"), []byte("456")) + + q := query.Query{Limit: 2, Orders: []query.Order{query.OrderByKeyDescending{}}} + res, err := m.Query(q) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + + expect := []string{ + "/zoo/3", + "/zoo/2", + } + + if len(entries) != len(expect) { + t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) + } + + for i, e := range expect { + if e != entries[i].Key { + t.Errorf("expected key %s, but got %s", e, entries[i].Key) + } + } + + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } +} + func TestLookupPrio(t *testing.T) { mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() diff --git a/query/query.go b/query/query.go index 691fbce..135fa9e 100644 --- a/query/query.go +++ b/query/query.go @@ -3,7 +3,7 @@ package query import ( "time" - goprocess "github.com/jbenet/goprocess" + "github.com/jbenet/goprocess" ) /* diff --git a/query/query_impl.go b/query/query_impl.go index 5ea27cc..f962474 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -1,6 +1,8 @@ package query -import "sort" +import ( + "sort" +) func DerivedResults(qr Results, ch <-chan Result) Results { return &results{ @@ -48,7 +50,7 @@ func NaiveLimit(qr Results, limit int) Results { } }() - return DerivedResults(qr, ch) + return ResultsWithChan(qr.Query(), ch) } // NaiveOffset skips a given number of results @@ -122,6 +124,7 @@ func NaiveQueryApply(q Query, qr Results) Results { qr = NaiveOffset(qr, q.Offset) } if q.Limit != 0 { + // TODO: Offset? qr = NaiveLimit(qr, q.Offset) } return qr From 7a8c70e586ca817a7b08e25ddeb4bdbc7be20823 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Thu, 21 Mar 2019 16:12:37 -0700 Subject: [PATCH 05/15] Add another naive solution for offset --- mount/mount.go | 6 +++++- mount/mount_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++ query/query_impl.go | 2 +- 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index 9e2c6b8..900a635 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -230,6 +230,10 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { prefix := ds.NewKey(q.Prefix) dses, mounts, rests := d.lookupAll(prefix) + // offset needs to be applied after the results are aggregated + offset := q.Offset + q.Offset = 0 + queries := &querySet{ query: q, heads: make([]*queryResults, 0, len(dses)), @@ -256,7 +260,7 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { Close: queries.close, }) - return query.NaiveLimit(qr, q.Limit), nil + return query.NaiveLimit(query.NaiveOffset(qr, offset), q.Limit), nil } func (d *Datastore) Close() error { diff --git a/mount/mount_test.go b/mount/mount_test.go index 7770444..d02a016 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -401,6 +401,57 @@ func TestQueryLimitCrossWithSort(t *testing.T) { } } +func TestQueryLimitAndOffsetCrossWithSort(t *testing.T) { + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/rok"), Datastore: mapds1}, + {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, + {Prefix: datastore.NewKey("/noop"), Datastore: mapds3}, + }) + + m.Put(datastore.NewKey("/rok/0"), []byte("ghi")) + m.Put(datastore.NewKey("/zoo/0"), []byte("123")) + m.Put(datastore.NewKey("/rok/1"), []byte("def")) + m.Put(datastore.NewKey("/zoo/1"), []byte("167")) + m.Put(datastore.NewKey("/zoo/2"), []byte("345")) + m.Put(datastore.NewKey("/rok/3"), []byte("abc")) + m.Put(datastore.NewKey("/zoo/3"), []byte("456")) + + q := query.Query{Limit: 3, Offset: 2, Orders: []query.Order{query.OrderByKey{}}} + res, err := m.Query(q) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + + expect := []string{ + "/rok/3", + "/zoo/0", + "/zoo/1", + } + + if len(entries) != len(expect) { + t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) + } + + for i, e := range expect { + if e != entries[i].Key { + t.Errorf("expected key %s, but got %s", e, entries[i].Key) + } + } + + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } +} + func TestLookupPrio(t *testing.T) { mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() diff --git a/query/query_impl.go b/query/query_impl.go index f962474..d4804ab 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -74,7 +74,7 @@ func NaiveOffset(qr Results, offset int) Results { } }() - return DerivedResults(qr, ch) + return ResultsWithChan(qr.Query(), ch) } // NaiveOrder reorders results according to given orders. From 2cec99fa3b5cde5055a3834e33cb77fa4c33a4ac Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 08:17:29 -0700 Subject: [PATCH 06/15] Add note for where implementation may need to go --- mount/mount.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mount/mount.go b/mount/mount.go index 900a635..a46118b 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -139,6 +139,7 @@ func (h *querySet) next() (query.Result, bool) { } head := h.heads[0] next := head.next + for head.advance() { if head.next.Error == nil { for _, f := range h.query.Filters { @@ -146,6 +147,7 @@ func (h *querySet) next() (query.Result, bool) { continue } } + // can limit and offset be implemented here? } heap.Fix(h, 0) return next, true From 9550a82b0c2ab6b719b9eaa919574827b048437d Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 09:44:13 -0700 Subject: [PATCH 07/15] Only apply NaiveOffset and NaiveLimit when appropriate --- mount/mount.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mount/mount.go b/mount/mount.go index a46118b..5653604 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -262,7 +262,15 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { Close: queries.close, }) - return query.NaiveLimit(query.NaiveOffset(qr, offset), q.Limit), nil + if offset > 0 { + qr = query.NaiveOffset(qr, offset) + } + + if q.Limit > 0 { + qr = query.NaiveLimit(qr, q.Limit) + } + + return qr, nil } func (d *Datastore) Close() error { From 5b4de5b291b89f2c73d5e391f931acc556f0b2e6 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 10:40:23 -0700 Subject: [PATCH 08/15] Fix filters by using NaiveFilter --- mount/mount.go | 16 ++++---- mount/mount_test.go | 91 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 8 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index 5653604..0ef103d 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -141,14 +141,6 @@ func (h *querySet) next() (query.Result, bool) { next := head.next for head.advance() { - if head.next.Error == nil { - for _, f := range h.query.Filters { - if !f.Filter(head.next.Entry) { - continue - } - } - // can limit and offset be implemented here? - } heap.Fix(h, 0) return next, true } @@ -235,6 +227,8 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { // offset needs to be applied after the results are aggregated offset := q.Offset q.Offset = 0 + filters := q.Filters + q.Filters = []query.Filter{} queries := &querySet{ query: q, @@ -262,6 +256,12 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { Close: queries.close, }) + if len(filters) > 0 { + for _, f := range filters { + qr = query.NaiveFilter(qr, f) + } + } + if offset > 0 { qr = query.NaiveOffset(qr, offset) } diff --git a/mount/mount_test.go b/mount/mount_test.go index d02a016..9d19813 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -452,6 +452,97 @@ func TestQueryLimitAndOffsetCrossWithSort(t *testing.T) { } } +func TestQueryFilterAcrossMountsWithSort(t *testing.T) { + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/rok"), Datastore: mapds1}, + {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, + {Prefix: datastore.NewKey("/noop"), Datastore: mapds3}, + }) + + m.Put(datastore.NewKey("/rok/0"), []byte("ghi")) + m.Put(datastore.NewKey("/zoo/0"), []byte("123")) + m.Put(datastore.NewKey("/rok/1"), []byte("def")) + m.Put(datastore.NewKey("/zoo/1"), []byte("167")) + m.Put(datastore.NewKey("/zoo/2"), []byte("345")) + m.Put(datastore.NewKey("/rok/3"), []byte("abc")) + m.Put(datastore.NewKey("/zoo/3"), []byte("456")) + + f := &query.FilterKeyCompare{Op: query.Equal, Key: "/rok/3"} + q := query.Query{Filters: []query.Filter{f}} + res, err := m.Query(q) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + + expect := []string{ + "/rok/3", + } + + if len(entries) != len(expect) { + t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) + } + + for i, e := range expect { + if e != entries[i].Key { + t.Errorf("expected key %s, but got %s", e, entries[i].Key) + } + } + + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } +} + +func TestQueryLimit(t *testing.T) { + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/rok"), Datastore: mapds1}, + }) + + m.Put(datastore.NewKey("/rok/0"), []byte("ghi")) + m.Put(datastore.NewKey("/rok/1"), []byte("def")) + m.Put(datastore.NewKey("/rok/3"), []byte("abc")) + + q := query.Query{Limit: 1, Orders: []query.Order{query.OrderByKeyDescending{}}} + res, err := m.Query(q) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + + expect := []string{ + "/rok/3", + } + + if len(entries) != len(expect) { + t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) + } + + for i, e := range expect { + if e != entries[i].Key { + t.Errorf("expected key %s, but got %s", e, entries[i].Key) + } + } + + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } +} + func TestLookupPrio(t *testing.T) { mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() From 37e058e7cbf5c76c79e41f788962756344948731 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 11:03:28 -0700 Subject: [PATCH 09/15] Update mount query test names --- mount/mount_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mount/mount_test.go b/mount/mount_test.go index 9d19813..af776e4 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -239,7 +239,7 @@ func TestQuerySimple(t *testing.T) { } } -func TestQueryCross(t *testing.T) { +func TestQueryAcrossMounts(t *testing.T) { mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() mapds2 := datastore.NewMapDatastore() @@ -302,7 +302,7 @@ func TestQueryCross(t *testing.T) { } } -func TestQueryCrossWithSort(t *testing.T) { +func TestQueryAcrossMountsWithSort(t *testing.T) { mapds0 := datastore.NewMapDatastore() mapds1 := datastore.NewMapDatastore() mapds2 := datastore.NewMapDatastore() @@ -351,7 +351,7 @@ func TestQueryCrossWithSort(t *testing.T) { } } -func TestQueryLimitCrossWithSort(t *testing.T) { +func TestQueryLimitAcrossMountsWithSort(t *testing.T) { mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) @@ -401,7 +401,7 @@ func TestQueryLimitCrossWithSort(t *testing.T) { } } -func TestQueryLimitAndOffsetCrossWithSort(t *testing.T) { +func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) mapds3 := sync.MutexWrap(datastore.NewMapDatastore()) From 1633052084eec0ca950bfc8907cd55495e339150 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 11:03:52 -0700 Subject: [PATCH 10/15] Fix bug with NaiveQueryApply and add corresponding tests --- query/query_impl.go | 4 +-- query/query_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/query/query_impl.go b/query/query_impl.go index d4804ab..b0dcfaf 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -26,7 +26,7 @@ func NaiveFilter(qr Results, filter Filter) Results { } }() - return DerivedResults(qr, ch) + return ResultsWithChan(qr.Query(), ch) } // NaiveLimit truncates the results to a given int limit @@ -125,7 +125,7 @@ func NaiveQueryApply(q Query, qr Results) Results { } if q.Limit != 0 { // TODO: Offset? - qr = NaiveLimit(qr, q.Offset) + qr = NaiveLimit(qr, q.Limit) } return qr } diff --git a/query/query_test.go b/query/query_test.go index 28e5301..228c69a 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -40,6 +40,70 @@ func testResults(t *testing.T, res Results, expect []string) { } } +func TestNaiveQueryApply(t *testing.T) { + testNaiveQueryApply := func(t *testing.T, query Query, keys []string, expect []string) { + e := make([]Entry, len(keys)) + for i, k := range keys { + e[i] = Entry{Key: k} + } + + res := ResultsWithEntries(query, e) + res = NaiveQueryApply(query, res) + + testResults(t, res, expect) + } + + q := Query{Limit: 2} + + testNaiveQueryApply(t, q, sampleKeys, []string{ + "/ab/c", + "/ab/cd", + }) + + q = Query{Offset: 3, Limit: 2} + testNaiveQueryApply(t, q, sampleKeys, []string{ + "/abce", + "/abcf", + }) + + f := &FilterKeyCompare{Op: Equal, Key: "/ab"} + q = Query{Filters: []Filter{f}} + testNaiveQueryApply(t, q, sampleKeys, []string{ + "/ab", + }) + + q = Query{Prefix: "/ab"} + testNaiveQueryApply(t, q, sampleKeys, []string{ + "/ab/c", + "/ab/cd", + "/abce", + "/abcf", + "/ab", + }) + + q = Query{Orders: []Order{OrderByKeyDescending{}}} + testNaiveQueryApply(t, q, sampleKeys, []string{ + "/abcf", + "/abce", + "/ab/cd", + "/ab/c", + "/ab", + "/a", + }) + + q = Query{ + Limit: 3, + Offset: 2, + Prefix: "/ab", + Orders: []Order{OrderByKey{}}, + } + testNaiveQueryApply(t, q, sampleKeys, []string{ + "/ab/cd", + "/abce", + "/abcf", + }) +} + func TestLimit(t *testing.T) { testKeyLimit := func(t *testing.T, limit int, keys []string, expect []string) { e := make([]Entry, len(keys)) From d37201991a05a3806d08f311de5cc2002f7d97bd Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 14:03:37 -0700 Subject: [PATCH 11/15] Make new query for children --- mount/mount.go | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index 0ef103d..62787bf 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -220,18 +220,20 @@ func (d *Datastore) Delete(key ds.Key) error { return cds.Delete(k) } -func (d *Datastore) Query(q query.Query) (query.Results, error) { - prefix := ds.NewKey(q.Prefix) - dses, mounts, rests := d.lookupAll(prefix) +func (d *Datastore) Query(master query.Query) (query.Results, error) { + childQuery := query.Query{ + Prefix: master.Prefix, + Limit: master.Limit, + Orders: master.Orders, + KeysOnly: master.KeysOnly, + ReturnExpirations: master.ReturnExpirations, + } - // offset needs to be applied after the results are aggregated - offset := q.Offset - q.Offset = 0 - filters := q.Filters - q.Filters = []query.Filter{} + prefix := ds.NewKey(childQuery.Prefix) + dses, mounts, rests := d.lookupAll(prefix) queries := &querySet{ - query: q, + query: childQuery, heads: make([]*queryResults, 0, len(dses)), } @@ -240,7 +242,7 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { dstore := dses[i] rest := rests[i] - qi := q + qi := childQuery qi.Prefix = rest.String() results, err := dstore.Query(qi) @@ -251,23 +253,23 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { queries.addResults(mount, results) } - qr := query.ResultsFromIterator(q, query.Iterator{ + qr := query.ResultsFromIterator(childQuery, query.Iterator{ Next: queries.next, Close: queries.close, }) - if len(filters) > 0 { - for _, f := range filters { + if len(master.Filters) > 0 { + for _, f := range master.Filters { qr = query.NaiveFilter(qr, f) } } - if offset > 0 { - qr = query.NaiveOffset(qr, offset) + if master.Offset > 0 { + qr = query.NaiveOffset(qr, master.Offset) } - if q.Limit > 0 { - qr = query.NaiveLimit(qr, q.Limit) + if childQuery.Limit > 0 { + qr = query.NaiveLimit(qr, childQuery.Limit) } return qr, nil From f50f59c1fbbed31756847f639af209c126799d3e Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 14:03:48 -0700 Subject: [PATCH 12/15] Add more tests --- mount/mount_test.go | 81 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 10 deletions(-) diff --git a/mount/mount_test.go b/mount/mount_test.go index af776e4..0b2f087 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -502,17 +502,49 @@ func TestQueryFilterAcrossMountsWithSort(t *testing.T) { } } -func TestQueryLimit(t *testing.T) { +func TestQueryLimitAndOffsetWithNoData(t *testing.T) { mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) m := mount.New([]mount.Mount{ {Prefix: datastore.NewKey("/rok"), Datastore: mapds1}, + {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, }) - m.Put(datastore.NewKey("/rok/0"), []byte("ghi")) - m.Put(datastore.NewKey("/rok/1"), []byte("def")) - m.Put(datastore.NewKey("/rok/3"), []byte("abc")) + q := query.Query{Limit: 4, Offset: 3} + res, err := m.Query(q) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + + expect := []string{} + + if len(entries) != len(expect) { + t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) + } + + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } +} + +func TestQueryLimitWithNotEnoughData(t *testing.T) { + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/rok"), Datastore: mapds1}, + {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, + }) + + m.Put(datastore.NewKey("/zoo/0"), []byte("123")) + m.Put(datastore.NewKey("/rok/1"), []byte("167")) - q := query.Query{Limit: 1, Orders: []query.Order{query.OrderByKeyDescending{}}} + q := query.Query{Limit: 4} res, err := m.Query(q) if err != nil { t.Fatalf("Query fail: %v\n", err) @@ -524,17 +556,46 @@ func TestQueryLimit(t *testing.T) { } expect := []string{ - "/rok/3", + "/zoo/0", + "/rok/1", } if len(entries) != len(expect) { t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } - for i, e := range expect { - if e != entries[i].Key { - t.Errorf("expected key %s, but got %s", e, entries[i].Key) - } + err = res.Close() + if err != nil { + t.Errorf("result.Close failed %d", err) + } +} + +func TestQueryOffsetWithNotEnoughData(t *testing.T) { + mapds1 := sync.MutexWrap(datastore.NewMapDatastore()) + mapds2 := sync.MutexWrap(datastore.NewMapDatastore()) + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/rok"), Datastore: mapds1}, + {Prefix: datastore.NewKey("/zoo"), Datastore: mapds2}, + }) + + m.Put(datastore.NewKey("/zoo/0"), []byte("123")) + m.Put(datastore.NewKey("/rok/1"), []byte("167")) + + q := query.Query{Offset: 4} + res, err := m.Query(q) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + + expect := []string{} + + if len(entries) != len(expect) { + t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } err = res.Close() From b94a6236710ceb086b0a3401af9060b9b5bbc0f8 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 14:06:37 -0700 Subject: [PATCH 13/15] Run go fmt on mount.go --- mount/mount.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index 62787bf..0e7011b 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -222,10 +222,10 @@ func (d *Datastore) Delete(key ds.Key) error { func (d *Datastore) Query(master query.Query) (query.Results, error) { childQuery := query.Query{ - Prefix: master.Prefix, - Limit: master.Limit, - Orders: master.Orders, - KeysOnly: master.KeysOnly, + Prefix: master.Prefix, + Limit: master.Limit, + Orders: master.Orders, + KeysOnly: master.KeysOnly, ReturnExpirations: master.ReturnExpirations, } From e192b45b22850d5fec2b7ac707f51624ccd7a38e Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 16:30:50 -0700 Subject: [PATCH 14/15] Cleanup --- mount/mount_test.go | 10 +++++----- query/query.go | 2 +- query/query_impl.go | 5 +---- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/mount/mount_test.go b/mount/mount_test.go index 0b2f087..03a854e 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -2,13 +2,13 @@ package mount_test import ( "errors" - "github.com/ipfs/go-datastore/sync" "testing" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/mount" - "github.com/ipfs/go-datastore/query" - "github.com/ipfs/go-datastore/test" + datastore "github.com/ipfs/go-datastore" + mount "github.com/ipfs/go-datastore/mount" + query "github.com/ipfs/go-datastore/query" + sync "github.com/ipfs/go-datastore/sync" + dstest "github.com/ipfs/go-datastore/test" ) func TestPutBadNothing(t *testing.T) { diff --git a/query/query.go b/query/query.go index 135fa9e..691fbce 100644 --- a/query/query.go +++ b/query/query.go @@ -3,7 +3,7 @@ package query import ( "time" - "github.com/jbenet/goprocess" + goprocess "github.com/jbenet/goprocess" ) /* diff --git a/query/query_impl.go b/query/query_impl.go index b0dcfaf..440513c 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -1,8 +1,6 @@ package query -import ( - "sort" -) +import "sort" func DerivedResults(qr Results, ch <-chan Result) Results { return &results{ @@ -124,7 +122,6 @@ func NaiveQueryApply(q Query, qr Results) Results { qr = NaiveOffset(qr, q.Offset) } if q.Limit != 0 { - // TODO: Offset? qr = NaiveLimit(qr, q.Limit) } return qr From 18ef6442361166e9e4f543d51cd298246b0113c4 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 22 Mar 2019 16:36:30 -0700 Subject: [PATCH 15/15] Remove unnecessary for --- mount/mount.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mount/mount.go b/mount/mount.go index 0e7011b..326277a 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -140,11 +140,12 @@ func (h *querySet) next() (query.Result, bool) { head := h.heads[0] next := head.next - for head.advance() { + if head.advance() { heap.Fix(h, 0) - return next, true + } else { + heap.Remove(h, 0) } - heap.Remove(h, 0) + return next, true }