Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the keytransform datastore's query implementation #127

Merged
merged 7 commits into from Apr 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 0 additions & 21 deletions keytransform/interface.go
Expand Up @@ -11,24 +11,3 @@ type KeyTransform interface {
ConvertKey(ds.Key) ds.Key
InvertKey(ds.Key) ds.Key
}

// Datastore is a keytransform.Datastore
type Datastore interface {
ds.Shim
KeyTransform
}

// Wrap wraps a given datastore with a KeyTransform function.
// The resulting wrapped datastore will use the transform on all Datastore
// operations.
func Wrap(child ds.Datastore, t KeyTransform) *ktds {
if t == nil {
panic("t (KeyTransform) is nil")
}

if child == nil {
panic("child (ds.Datastore) is nil")
}

return &ktds{child: child, KeyTransform: t}
}
167 changes: 138 additions & 29 deletions keytransform/keytransform.go
Expand Up @@ -5,68 +5,72 @@ import (
dsq "github.com/ipfs/go-datastore/query"
)

type Pair struct {
Convert KeyMapping
Invert KeyMapping
}
// Wrap wraps a given datastore with a KeyTransform function.
// The resulting wrapped datastore will use the transform on all Datastore
// operations.
func Wrap(child ds.Datastore, t KeyTransform) *Datastore {
if t == nil {
panic("t (KeyTransform) is nil")
}

func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}
if child == nil {
panic("child (ds.Datastore) is nil")
}

func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
return &Datastore{child: child, KeyTransform: t}
}

// ktds keeps a KeyTransform function
type ktds struct {
// Datastore keeps a KeyTransform function
type Datastore struct {
child ds.Datastore

KeyTransform
}

// Children implements ds.Shim
func (d *ktds) Children() []ds.Datastore {
func (d *Datastore) Children() []ds.Datastore {
return []ds.Datastore{d.child}
}

// Put stores the given value, transforming the key first.
func (d *ktds) Put(key ds.Key, value []byte) (err error) {
func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return d.child.Put(d.ConvertKey(key), value)
}

// Get returns the value for given key, transforming the key first.
func (d *ktds) Get(key ds.Key) (value []byte, err error) {
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return d.child.Get(d.ConvertKey(key))
}

// Has returns whether the datastore has a value for a given key, transforming
// the key first.
func (d *ktds) Has(key ds.Key) (exists bool, err error) {
func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
return d.child.Has(d.ConvertKey(key))
}

// GetSize returns the size of the value named by the given key, transforming
// the key first.
func (d *ktds) GetSize(key ds.Key) (size int, err error) {
func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
return d.child.GetSize(d.ConvertKey(key))
}

// Delete removes the value for given key
func (d *ktds) Delete(key ds.Key) (err error) {
func (d *Datastore) Delete(key ds.Key) (err error) {
return d.child.Delete(d.ConvertKey(key))
}

// Query implements Query, inverting keys on the way back out.
func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
qr, err := d.child.Query(q)
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
nq, cq := d.prepareQuery(q)

cqr, err := d.child.Query(cq)
if err != nil {
return nil, err
}

return dsq.ResultsFromIterator(q, dsq.Iterator{
qr := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
r, ok := qr.NextSync()
r, ok := cqr.NextSync()
if !ok {
return r, false
}
Expand All @@ -76,21 +80,120 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return r, true
},
Close: func() error {
return qr.Close()
return cqr.Close()
},
}), nil
})
return dsq.NaiveQueryApply(nq, qr), nil
}

func (d *ktds) Close() error {
// Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (d *Datastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {

// First, put everything in the child query. Then, start taking things
// out.
child = q

// Always let the child handle the key prefix.
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()

// Check if the key transform is order-preserving so we can use the
// child datastore's built-in ordering.
orderPreserving := false
switch d.KeyTransform.(type) {
case PrefixTransform, *PrefixTransform:
orderPreserving = true
}

// Try to let the child handle ordering.
orders:
for i, o := range child.Orders {
switch o.(type) {
case dsq.OrderByValue, *dsq.OrderByValue,
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
// Key doesn't matter.
continue
case dsq.OrderByKey, *dsq.OrderByKey,
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
// if the key transform preserves order, we can delegate
// to the child datastore.
if orderPreserving {
// When sorting, we compare with the first
// Order, then, if equal, we compare with the
// second Order, etc. However, keys are _unique_
// so we'll never apply any additional orders
// after ordering by key.
child.Orders = child.Orders[:i+1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like I'm missing something here. Why drop other orders here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We sort hierarchically. That is, when comparing two entries, we compare with the first "order", then the second order if equal, then the third order if still equal, etc. Given that keys are unique, "order by key" will never return "equal" so there's no point in applying further orders.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment and improved the test coverage more to get codecov to stop complaining.

break orders
}
}

// Can't handle this order under transform, punt it to a naive
// ordering.
naive.Orders = q.Orders
child.Orders = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}

// Try to let the child handle the filters.

// don't modify the original filters.
child.Filters = append([]dsq.Filter(nil), child.Filters...)

for i, f := range child.Filters {
switch f := f.(type) {
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
continue
case dsq.FilterKeyCompare:
child.Filters[i] = dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case *dsq.FilterKeyCompare:
child.Filters[i] = &dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case dsq.FilterKeyPrefix:
child.Filters[i] = dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
case *dsq.FilterKeyPrefix:
child.Filters[i] = &dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
}

// Not a known filter, defer to the naive implementation.
naive.Filters = q.Filters
child.Filters = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
return
}

func (d *Datastore) Close() error {
return d.child.Close()
}

// DiskUsage implements the PersistentDatastore interface.
func (d *ktds) DiskUsage() (uint64, error) {
func (d *Datastore) DiskUsage() (uint64, error) {
return ds.DiskUsage(d.child)
}

func (d *ktds) Batch() (ds.Batch, error) {
func (d *Datastore) Batch() (ds.Batch, error) {
bds, ok := d.child.(ds.Batching)
if !ok {
return nil, ds.ErrBatchUnsupported
Expand Down Expand Up @@ -124,23 +227,29 @@ func (t *transformBatch) Commit() error {
return t.dst.Commit()
}

func (d *ktds) Check() error {
func (d *Datastore) Check() error {
if c, ok := d.child.(ds.CheckedDatastore); ok {
return c.Check()
}
return nil
}

func (d *ktds) Scrub() error {
func (d *Datastore) Scrub() error {
if c, ok := d.child.(ds.ScrubbedDatastore); ok {
return c.Scrub()
}
return nil
}

func (d *ktds) CollectGarbage() error {
func (d *Datastore) CollectGarbage() error {
if c, ok := d.child.(ds.GCDatastore); ok {
return c.CollectGarbage()
}
return nil
}

var _ ds.Datastore = (*Datastore)(nil)
var _ ds.GCDatastore = (*Datastore)(nil)
var _ ds.Batching = (*Datastore)(nil)
var _ ds.PersistentDatastore = (*Datastore)(nil)
var _ ds.ScrubbedDatastore = (*Datastore)(nil)
42 changes: 28 additions & 14 deletions keytransform/keytransform_test.go
Expand Up @@ -20,22 +20,24 @@ type DSSuite struct{}

var _ = Suite(&DSSuite{})

func (ks *DSSuite) TestBasic(c *C) {
func testDatastore() {
}

pair := &kt.Pair{
Convert: func(k ds.Key) ds.Key {
return ds.NewKey("/abc").Child(k)
},
Invert: func(k ds.Key) ds.Key {
// remove abc prefix
l := k.List()
if l[0] != "abc" {
panic("key does not have prefix. convert failed?")
}
return ds.KeyWithNamespaces(l[1:])
},
}
var pair = &kt.Pair{
Convert: func(k ds.Key) ds.Key {
return ds.NewKey("/abc").Child(k)
},
Invert: func(k ds.Key) ds.Key {
// remove abc prefix
l := k.List()
if l[0] != "abc" {
panic("key does not have prefix. convert failed?")
}
return ds.KeyWithNamespaces(l[1:])
},
}

func (ks *DSSuite) TestBasic(c *C) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, pair)

Expand Down Expand Up @@ -110,3 +112,15 @@ func strsToKeys(strs []string) []ds.Key {
}
return keys
}

func TestSuiteDefaultPair(t *testing.T) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, pair)
dstest.SubtestAll(t, ktds)
}

func TestSuitePrefixTransform(t *testing.T) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, kt.PrefixTransform{Prefix: ds.NewKey("/foo")})
dstest.SubtestAll(t, ktds)
}
49 changes: 49 additions & 0 deletions keytransform/transforms.go
@@ -0,0 +1,49 @@
package keytransform

import ds "github.com/ipfs/go-datastore"

// Pair is a convince struct for constructing a key transform.
type Pair struct {
Convert KeyMapping
Invert KeyMapping
}

func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}

func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
}

var _ KeyTransform = (*Pair)(nil)

// PrefixTransform constructs a KeyTransform with a pair of functions that
// add or remove the given prefix key.
//
// Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors.
type PrefixTransform struct {
Prefix ds.Key
}

// ConvertKey adds the prefix.
func (p PrefixTransform) ConvertKey(k ds.Key) ds.Key {
return p.Prefix.Child(k)
}

// InvertKey removes the prefix. panics if prefix not found.
func (p PrefixTransform) InvertKey(k ds.Key) ds.Key {
if p.Prefix.String() == "/" {
return k
}

if !p.Prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}

s := k.String()[len(p.Prefix.String()):]
return ds.RawKey(s)
}

var _ KeyTransform = (*PrefixTransform)(nil)