Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ordering by key on mount query #121

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 62 additions & 65 deletions mount/mount.go
Expand Up @@ -49,6 +49,12 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
return nil, ds.NewKey("/"), key
}

type lookupResult struct {
ds ds.Datastore
mount ds.Key
rest ds.Key
}

// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
Expand All @@ -59,26 +65,32 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
// /ao/e/ A /
// /ao/e/uh/ A /
// /aoe/ not matching
func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest []ds.Key) {
func (d *Datastore) lookupAll(key ds.Key) (results []*lookupResult) {
for _, m := range d.mounts {
p := m.Prefix.String()
if len(p) > 1 {
p = p + "/"
}

if strings.HasPrefix(p, key.String()) {
dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey("/"))
result := &lookupResult{
m.Datastore,
m.Prefix,
ds.NewKey("/"),
}
results = append(results, result)
} else if strings.HasPrefix(key.String(), p) {
r := strings.TrimPrefix(key.String(), m.Prefix.String())

dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey(r))
result := &lookupResult{
m.Datastore,
m.Prefix,
ds.NewKey(r),
}
results = append(results, result)
}
}
return dst, mountpoint, rest
return results
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
Expand Down Expand Up @@ -123,71 +135,56 @@ func (d *Datastore) Delete(key ds.Key) error {

func (d *Datastore) Query(q query.Query) (query.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
return nil, errors.New("mount does not support filters, limit, or offset")
}
prefix := ds.NewKey(q.Prefix)
dses, mounts, rests := d.lookupAll(prefix)

// current itorator state
var res query.Results
var mount ds.Key
i := 0

return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool

for try := true; try; try = len(dses) > i {
if res == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}

dst := dses[i]
mount = mounts[i]
rest := rests[i]

q2 := q
q2.Prefix = rest.String()
r, err := dst.Query(q2)
if err != nil {
return query.Result{Error: err}, false
}
res = r
}

r, more = res.NextSync()
if !more {
err := res.Close()
if err != nil {
return query.Result{Error: err}, false
}
res = nil

i++
more = len(dses) > i
} else {
break
}
lrs := d.lookupAll(prefix)

results := make(chan query.Result)
wg := sync.WaitGroup{}
for _, lr := range lrs {
wg.Add(1)
go func(lr *lookupResult) {
defer wg.Done()
rs, err := lr.ds.Query(q)
if err != nil {
results <- query.Result{Error: err}
return
}

r.Key = mount.Child(ds.RawKey(r.Key)).String()
return r, more
},
Close: func() error {
if len(mounts) > i && res != nil {
return res.Close()
es, err := rs.Rest()
if err != nil {
results <- query.Result{Error: err}
return
}
return nil
},
}), nil
for _, e := range es {
e.Key = lr.mount.Child(ds.RawKey(e.Key)).String()
results <- query.Result{Entry: e, Error: nil}
}
}(lr)
}
rs := make([]query.Result, 0)
go func() {
for result := range results {
rs = append(rs, result)
}
}()
wg.Wait()
close(results)
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case query.OrderByKey:
sort.SliceStable(rs, func(i, j int) bool { return strings.Compare(rs[i].Key, rs[j].Key) < 0 })
case query.OrderByKeyDescending:
sort.SliceStable(rs, func(i, j int) bool { return strings.Compare(rs[i].Key, rs[j].Key) >= 0 })
default:
return nil, errors.New("mount only supports sorting keys in lexical order")
}
}
return query.ResultsWithResults(q, rs), nil
}

func (d *Datastore) Close() error {
Expand Down
49 changes: 49 additions & 0 deletions mount/mount_test.go
Expand Up @@ -297,6 +297,55 @@ func TestQueryCross(t *testing.T) {
}
}

func TestQueryCrossWithStableSort(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
mapds2 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/zoo"), Datastore: mapds1},
{Prefix: datastore.NewKey("/boo/5"), Datastore: mapds2},
{Prefix: datastore.NewKey("/boo"), Datastore: mapds0},
})

m.Put(datastore.NewKey("/zoo/0"), []byte("123"))
m.Put(datastore.NewKey("/zoo/1"), []byte("234"))
m.Put(datastore.NewKey("/boo/9"), []byte("345"))
m.Put(datastore.NewKey("/boo/3"), []byte("456"))
m.Put(datastore.NewKey("/boo/5/hello"), []byte("789"))

res, err := m.Query(query.Query{Orders: []query.Order{query.OrderByKey{}}})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
t.Fatalf("Query Results.Rest fail: %v\n", err)
}

expect := []string{
"/boo/3",
"/boo/5/hello",
"/boo/9",
"/zoo/0",
"/zoo/1",
}

if len(entries) != len(expect) {
t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
}

for i, e := range expect {
if e != entries[i].Key {
t.Errorf("expected key %s, but got %s", e, entries[i].Key)
}
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
}

func TestLookupPrio(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
Expand Down
20 changes: 20 additions & 0 deletions query/query.go
Expand Up @@ -251,6 +251,26 @@ func ResultsWithEntries(q Query, res []Entry) Results {
return b.Results()
}

// ResultsWithResults returns a Results object from a list of Result
func ResultsWithResults(q Query, res []Result) Results {
b := NewResultBuilder(q)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
for _, r := range res {
select {
case b.Output <- r:
case <-worker.Closing(): // client told us to close early
return
}
}
return
})

go b.Process.CloseAfterChildren()
return b.Results()
}

func ResultsReplaceQuery(r Results, q Query) Results {
switch r := r.(type) {
case *results:
Expand Down