Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix – sorted limited offset mount queries #124

Merged
merged 15 commits into from Mar 23, 2019
Merged
205 changes: 144 additions & 61 deletions mount/mount.go
Expand Up @@ -3,6 +3,7 @@
package mount

import (
"container/heap"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -49,6 +50,112 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
return nil, ds.NewKey("/"), key
}

// queryResults tracks the result stream of one mounted datastore while the
// streams of several mounts are being merged into a single result sequence.
type queryResults struct {
// mount is the key under which result keys are re-rooted (see advance).
mount ds.Key
// results is the underlying stream. advance sets it to nil once the stream
// has been drained and closed.
results query.Results
// next is the most recently fetched result that has not been handed out yet.
next query.Result
}

// advance fetches the following result from the underlying stream into
// qr.next and reports whether qr.next now holds something to deliver.
//
// Once the stream is drained it is closed and qr.results is set to nil. If
// that Close fails, the error is surfaced as one final result (and true is
// returned); every later call returns false.
func (qr *queryResults) advance() bool {
	if qr.results == nil {
		// Already drained and closed.
		return false
	}

	qr.next = query.Result{}
	if res, ok := qr.results.NextSync(); ok {
		// Re-root the key under this mount before exposing it.
		res.Key = qr.mount.Child(ds.RawKey(res.Key)).String()
		qr.next = res
		return true
	}

	closeErr := qr.results.Close()
	qr.results = nil
	if closeErr == nil {
		return false
	}

	// One more result to hand out: the error from closing the stream.
	qr.next = query.Result{Error: closeErr}
	return true
}

// querySet is the collection of per-mount result streams for a single query.
// Together with Len, Less, Swap, Push and Pop it is used with container/heap
// (see addResults and next).
type querySet struct {
// query is the query being answered; Less reads its Orders and next reads
// its Filters.
query query.Query
// heads holds the streams that still have a pending result in their next
// field.
heads []*queryResults
}

// Len reports how many result streams are currently held in h.heads.
func (h *querySet) Len() int {
	count := len(h.heads)
	return count
}

// Less compares the pending entries of heads i and j by passing them, along
// with the query's Orders, to query.Less.
func (h *querySet) Less(i, j int) bool {
	left, right := h.heads[i].next.Entry, h.heads[j].next.Entry
	return query.Less(h.query.Orders, left, right)
}

// Swap exchanges the streams stored at positions i and j of h.heads.
func (h *querySet) Swap(i, j int) {
	heads := h.heads // shares the backing array, so the swap is visible via h
	heads[i], heads[j] = heads[j], heads[i]
}

// Push appends x to h.heads. x must be a *queryResults; any other dynamic
// type makes the type assertion panic.
func (h *querySet) Push(x interface{}) {
	head := x.(*queryResults)
	h.heads = append(h.heads, head)
}

// Pop removes the last element of h.heads and returns it.
func (h *querySet) Pop() interface{} {
	n := len(h.heads)
	tail := h.heads[n-1]
	// Clear the vacated slot so the backing array no longer references it.
	h.heads[n-1] = nil
	h.heads = h.heads[:n-1]
	return tail
}

// close closes every result stream still held by the set and then drops all
// heads. Every stream is closed even if an earlier one fails; the first error
// encountered is returned, or nil if all closes succeed.
func (h *querySet) close() error {
	var firstErr error
	for _, qr := range h.heads {
		// advance sets results to nil after draining and closing a stream,
		// yet the head can remain queued while it still carries the Close
		// error as its pending result. Calling Close on that nil interface
		// would panic, and the stream is already closed anyway.
		if qr.results == nil {
			continue
		}
		if err := qr.results.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	h.heads = nil
	return firstErr
}

// addResults wraps results in a head for the given mount and primes it with
// its first result. The head is pushed onto the heap only if advance reports
// that it has something to deliver.
func (h *querySet) addResults(mount ds.Key, results query.Results) {
	head := &queryResults{mount: mount, results: results}
	if !head.advance() {
		return
	}
	heap.Push(h, head)
}

func (h *querySet) next() (query.Result, bool) {
if len(h.heads) == 0 {
return query.Result{}, false
}
head := h.heads[0]
next := head.next

for head.advance() {
if head.next.Error == nil {
for _, f := range h.query.Filters {
if !f.Filter(head.next.Entry) {
continue
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, we could probably switch to the naive filter as well. I'm not sure why I did it this way (well, it'll be slightly faster but not by much).

Copy link

@eingenito eingenito Mar 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien Actually we're finding that we might have to use the NaiveFilter implementation because if the underlying datastores support filters, they're going to be filtering only on the subkey (if they filter on key) meaning they'll exclude their mount prefix. So we probably have to treat filters in the same way we do offsets by stashing them and then removing them from the Query that is passed to each datastore, and then applying them at the end when the keys are full keys. So we're going to do as you suggest here and use NaiveFilter. It reads a little clearer too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a commit that fixes filtering by using NaiveFilter: 5b4de5b

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch. We definitely need to remove the filters from the query we're forwarding. Actually, we should probably just create a new query.

// can limit and offset be implemented here?
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
}
heap.Fix(h, 0)
return next, true
}
heap.Remove(h, 0)
return next, true
}

// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
Expand Down Expand Up @@ -122,72 +229,48 @@ func (d *Datastore) Delete(key ds.Key) error {
}

func (d *Datastore) Query(q query.Query) (query.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
}
prefix := ds.NewKey(q.Prefix)
dses, mounts, rests := d.lookupAll(prefix)

// current itorator state
var res query.Results
var mount ds.Key
i := 0

return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool

for try := true; try; try = len(dses) > i {
if res == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}

dst := dses[i]
mount = mounts[i]
rest := rests[i]

q2 := q
q2.Prefix = rest.String()
r, err := dst.Query(q2)
if err != nil {
return query.Result{Error: err}, false
}
res = r
}
// offset needs to be applied after the results are aggregated
offset := q.Offset
q.Offset = 0

r, more = res.NextSync()
if !more {
err := res.Close()
if err != nil {
return query.Result{Error: err}, false
}
res = nil

i++
more = len(dses) > i
} else {
break
}
}
queries := &querySet{
query: q,
heads: make([]*queryResults, 0, len(dses)),
}

r.Key = mount.Child(ds.RawKey(r.Key)).String()
return r, more
},
Close: func() error {
if len(mounts) > i && res != nil {
return res.Close()
}
return nil
},
}), nil
for i := range dses {
mount := mounts[i]
dstore := dses[i]
rest := rests[i]

qi := q
qi.Prefix = rest.String()
results, err := dstore.Query(qi)

if err != nil {
_ = queries.close()
return nil, err
}
queries.addResults(mount, results)
}

qr := query.ResultsFromIterator(q, query.Iterator{
Next: queries.next,
Close: queries.close,
})

if offset > 0 {
qr = query.NaiveOffset(qr, offset)
}

if q.Limit > 0 {
qr = query.NaiveLimit(qr, q.Limit)
}

return qr, nil
}

func (d *Datastore) Close() error {
Expand Down