Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix – sorted limited offset mount queries #124

Merged
merged 15 commits into from Mar 23, 2019
Merged
212 changes: 149 additions & 63 deletions mount/mount.go
Expand Up @@ -3,6 +3,7 @@
package mount

import (
"container/heap"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -49,6 +50,105 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
return nil, ds.NewKey("/"), key
}

type queryResults struct {
mount ds.Key
results query.Results
next query.Result
}

func (qr *queryResults) advance() bool {
if qr.results == nil {
return false
}

qr.next = query.Result{}
r, more := qr.results.NextSync()
if !more {
err := qr.results.Close()
qr.results = nil
if err != nil {
// One more result, the error.
qr.next = query.Result{Error: err}
return true
}
return false
}

r.Key = qr.mount.Child(ds.RawKey(r.Key)).String()
qr.next = r
return true
}

type querySet struct {
query query.Query
heads []*queryResults
}

func (h *querySet) Len() int {
return len(h.heads)
}

func (h *querySet) Less(i, j int) bool {
return query.Less(h.query.Orders, h.heads[i].next.Entry, h.heads[j].next.Entry)
}

func (h *querySet) Swap(i, j int) {
h.heads[i], h.heads[j] = h.heads[j], h.heads[i]
}

func (h *querySet) Push(x interface{}) {
h.heads = append(h.heads, x.(*queryResults))
}

func (h *querySet) Pop() interface{} {
i := len(h.heads) - 1
last := h.heads[i]
h.heads[i] = nil
h.heads = h.heads[:i]
return last
}

func (h *querySet) close() error {
var errs []error
for _, qr := range h.heads {
err := qr.results.Close()
if err != nil {
errs = append(errs, err)
}
}
h.heads = nil
if len(errs) > 0 {
return errs[0]
}
return nil
}

func (h *querySet) addResults(mount ds.Key, results query.Results) {
r := &queryResults{
results: results,
mount: mount,
}
if r.advance() {
heap.Push(h, r)
}
}

func (h *querySet) next() (query.Result, bool) {
if len(h.heads) == 0 {
return query.Result{}, false
}
head := h.heads[0]
next := head.next

if head.advance() {
heap.Fix(h, 0)
} else {
heap.Remove(h, 0)
}

return next, true
}

// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
Expand Down Expand Up @@ -121,73 +221,59 @@ func (d *Datastore) Delete(key ds.Key) error {
return cds.Delete(k)
}

func (d *Datastore) Query(q query.Query) (query.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
func (d *Datastore) Query(master query.Query) (query.Results, error) {
childQuery := query.Query{
Prefix: master.Prefix,
Limit: master.Limit,
Orders: master.Orders,
KeysOnly: master.KeysOnly,
ReturnExpirations: master.ReturnExpirations,
}
prefix := ds.NewKey(q.Prefix)

prefix := ds.NewKey(childQuery.Prefix)
dses, mounts, rests := d.lookupAll(prefix)

// current itorator state
var res query.Results
var mount ds.Key
i := 0

return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool

for try := true; try; try = len(dses) > i {
if res == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}

dst := dses[i]
mount = mounts[i]
rest := rests[i]

q2 := q
q2.Prefix = rest.String()
r, err := dst.Query(q2)
if err != nil {
return query.Result{Error: err}, false
}
res = r
}

r, more = res.NextSync()
if !more {
err := res.Close()
if err != nil {
return query.Result{Error: err}, false
}
res = nil

i++
more = len(dses) > i
} else {
break
}
}
queries := &querySet{
query: childQuery,
heads: make([]*queryResults, 0, len(dses)),
}

r.Key = mount.Child(ds.RawKey(r.Key)).String()
return r, more
},
Close: func() error {
if len(mounts) > i && res != nil {
return res.Close()
}
return nil
},
}), nil
for i := range dses {
mount := mounts[i]
dstore := dses[i]
rest := rests[i]

qi := childQuery
qi.Prefix = rest.String()
results, err := dstore.Query(qi)

if err != nil {
_ = queries.close()
return nil, err
}
queries.addResults(mount, results)
}

qr := query.ResultsFromIterator(childQuery, query.Iterator{
Next: queries.next,
Close: queries.close,
})

if len(master.Filters) > 0 {
for _, f := range master.Filters {
qr = query.NaiveFilter(qr, f)
}
}

if master.Offset > 0 {
qr = query.NaiveOffset(qr, master.Offset)
}

if childQuery.Limit > 0 {
qr = query.NaiveLimit(qr, childQuery.Limit)
}

return qr, nil
}

func (d *Datastore) Close() error {
Expand Down