Skip to content

Commit

Permalink
embed keytransform.Datastore to support more methods
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Oct 19, 2023
1 parent fd1232c commit 9d4e4ce
Showing 1 changed file with 15 additions and 155 deletions.
170 changes: 15 additions & 155 deletions keytransform/txndatastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ func WrapTxnDatastore(child ds.TxnDatastore, t KeyTransform) *TxnDatastore {
panic("child (ds.TxnDatastore) is nil")
}

return &TxnDatastore{child: child, KeyTransform: t}
return &TxnDatastore{ds: Wrap(child, t), child: child, KeyTransform: t}
}

// TxnDatastore keeps a KeyTransform function
type TxnDatastore struct {
ds *Datastore
child ds.TxnDatastore

KeyTransform
Expand All @@ -45,205 +46,64 @@ func (d *TxnDatastore) Children() []ds.Datastore {

// Put stores the given value, transforming the key first.
func (d *TxnDatastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) {
return d.child.Put(ctx, d.ConvertKey(key), value)
return d.ds.Put(ctx, d.ConvertKey(key), value)
}

// Sync implements Datastore.Sync
func (d *TxnDatastore) Sync(ctx context.Context, prefix ds.Key) error {
return d.child.Sync(ctx, d.ConvertKey(prefix))
return d.ds.Sync(ctx, d.ConvertKey(prefix))
}

// Get returns the value for given key, transforming the key first.
func (d *TxnDatastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
return d.child.Get(ctx, d.ConvertKey(key))
return d.ds.Get(ctx, d.ConvertKey(key))
}

// Has returns whether the datastore has a value for a given key, transforming
// the key first.
func (d *TxnDatastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) {
return d.child.Has(ctx, d.ConvertKey(key))
return d.ds.Has(ctx, d.ConvertKey(key))
}

// GetSize returns the size of the value named by the given key, transforming
// the key first.
func (d *TxnDatastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
return d.child.GetSize(ctx, d.ConvertKey(key))
return d.ds.GetSize(ctx, d.ConvertKey(key))
}

// Delete removes the value for given key
func (d *TxnDatastore) Delete(ctx context.Context, key ds.Key) (err error) {
return d.child.Delete(ctx, d.ConvertKey(key))
return d.ds.Delete(ctx, d.ConvertKey(key))
}

// Query implements Query, inverting keys on the way back out.
func (d *TxnDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
nq, cq := d.prepareQuery(q)

cqr, err := d.child.Query(ctx, cq)
if err != nil {
return nil, err
}

qr := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
r, ok := cqr.NextSync()
if !ok {
return r, false
}
if r.Error == nil {
r.Entry.Key = d.InvertKey(ds.RawKey(r.Entry.Key)).String()
}
return r, true
},
Close: func() error {
return cqr.Close()
},
})
return dsq.NaiveQueryApply(nq, qr), nil
}

// Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (d *TxnDatastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {

// First, put everything in the child query. Then, start taking things
// out.
child = q

// Always let the child handle the key prefix.
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()

// Check if the key transform is order-preserving so we can use the
// child datastore's built-in ordering.
orderPreserving := false
switch d.KeyTransform.(type) {
case PrefixTransform, *PrefixTransform:
orderPreserving = true
}

// Try to let the child handle ordering.
orders:
for i, o := range child.Orders {
switch o.(type) {
case dsq.OrderByValue, *dsq.OrderByValue,
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
// Key doesn't matter.
continue
case dsq.OrderByKey, *dsq.OrderByKey,
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
// if the key transform preserves order, we can delegate
// to the child datastore.
if orderPreserving {
// When sorting, we compare with the first
// Order, then, if equal, we compare with the
// second Order, etc. However, keys are _unique_
// so we'll never apply any additional orders
// after ordering by key.
child.Orders = child.Orders[:i+1]
break orders
}
}

// Can't handle this order under transform, punt it to a naive
// ordering.
naive.Orders = q.Orders
child.Orders = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}

// Try to let the child handle the filters.

// don't modify the original filters.
child.Filters = append([]dsq.Filter(nil), child.Filters...)

for i, f := range child.Filters {
switch f := f.(type) {
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
continue
case dsq.FilterKeyCompare:
child.Filters[i] = dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case *dsq.FilterKeyCompare:
child.Filters[i] = &dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case dsq.FilterKeyPrefix:
child.Filters[i] = dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
case *dsq.FilterKeyPrefix:
child.Filters[i] = &dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
}

// Not a known filter, defer to the naive implementation.
naive.Filters = q.Filters
child.Filters = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
return
return d.ds.Query(ctx, q)
}

func (d *TxnDatastore) Close() error {
return d.child.Close()
return d.ds.Close()
}

// DiskUsage implements the PersistentTxnDatastore interface.
func (d *TxnDatastore) DiskUsage(ctx context.Context) (uint64, error) {
return ds.DiskUsage(ctx, d.child)
return d.DiskUsage(ctx)
}

func (d *TxnDatastore) Batch(ctx context.Context) (ds.Batch, error) {
bds, ok := d.child.(ds.Batching)
if !ok {
return nil, ds.ErrBatchUnsupported
}

childbatch, err := bds.Batch(ctx)
if err != nil {
return nil, err
}
return &transformBatch{
dst: childbatch,
f: d.ConvertKey,
}, nil
return d.ds.Batch(ctx)
}

func (d *TxnDatastore) Check(ctx context.Context) error {
if c, ok := d.child.(ds.CheckedDatastore); ok {
return c.Check(ctx)
}
return nil
return d.ds.Check(ctx)
}

func (d *TxnDatastore) Scrub(ctx context.Context) error {
if c, ok := d.child.(ds.ScrubbedDatastore); ok {
return c.Scrub(ctx)
}
return nil
return d.ds.Scrub(ctx)
}

func (d *TxnDatastore) CollectGarbage(ctx context.Context) error {
if c, ok := d.child.(ds.GCDatastore); ok {
return c.CollectGarbage(ctx)
}
return nil
return d.ds.CollectGarbage(ctx)
}

func (d *TxnDatastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {
Expand Down

0 comments on commit 9d4e4ce

Please sign in to comment.