Skip to content

Commit

Permalink
kvdb+etcd: assert on bucket/value key when putting value/bucket
Browse files Browse the repository at this point in the history
This commit extends compatibility with the bbolt kvdb implementation,
which returns ErrIncompatibleValue in case of a bucket/value key
collision. Furthermore the commit also adds an extra precondition to the
transaction when a key doesn't exist. This is needed as we fix reads to
a snapshot revision and other writers may commit the key otherwise.
  • Loading branch information
bhandras committed Jul 21, 2020
1 parent ac688d9 commit 9c62a42
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 22 deletions.
49 changes: 40 additions & 9 deletions channeldb/kvdb/etcd/readwrite_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ func (b *readWriteBucket) NestedReadWriteBucket(key []byte) walletdb.ReadWriteBu
return newReadWriteBucket(b.tx, bucketKey, bucketVal)
}

// assertNoValue checks if the value for the passed key exits.
func (b *readWriteBucket) assertNoValue(key []byte) error {
val, err := b.tx.stm.Get(string(makeValueKey(b.id, key)))
if err != nil {
return err
}

if val != nil {
return walletdb.ErrIncompatibleValue
}

return nil
}

// CreateBucket creates and returns a new nested bucket with the given
// key. Returns ErrBucketExists if the bucket already exists,
// ErrBucketNameRequired if the key is empty, or ErrIncompatibleValue
Expand All @@ -151,11 +165,15 @@ func (b *readWriteBucket) CreateBucket(key []byte) (
return nil, walletdb.ErrBucketExists
}

if err := b.assertNoValue(key); err != nil {
return nil, err
}

// Create a deterministic bucket id from the bucket key.
newID := makeBucketID(bucketKey)

// Create the bucket.
b.tx.put(string(bucketKey), string(newID[:]))
b.tx.stm.Put(string(bucketKey), string(newID[:]))

return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil
}
Expand All @@ -181,8 +199,12 @@ func (b *readWriteBucket) CreateBucketIfNotExists(key []byte) (
}

if !isValidBucketID(bucketVal) {
if err := b.assertNoValue(key); err != nil {
return nil, err
}

newID := makeBucketID(bucketKey)
b.tx.put(string(bucketKey), string(newID[:]))
b.tx.stm.Put(string(bucketKey), string(newID[:]))

return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil
}
Expand Down Expand Up @@ -230,7 +252,7 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error {
}

for kv != nil {
b.tx.del(kv.key)
b.tx.stm.Del(kv.key)

if isBucketKey(kv.key) {
queue = append(queue, []byte(kv.val))
Expand All @@ -243,12 +265,12 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error {
}

// Finally delete the sequence key for the bucket.
b.tx.del(string(makeSequenceKey(id)))
b.tx.stm.Del(string(makeSequenceKey(id)))
}

// Delete the top level bucket and sequence key.
b.tx.del(bucketKey)
b.tx.del(string(makeSequenceKey(bucketVal)))
b.tx.stm.Del(bucketKey)
b.tx.stm.Del(string(makeSequenceKey(bucketVal)))

return nil
}
Expand All @@ -260,8 +282,17 @@ func (b *readWriteBucket) Put(key, value []byte) error {
return walletdb.ErrKeyRequired
}

val, err := b.tx.stm.Get(string(makeBucketKey(b.id, key)))
if err != nil {
return err
}

if val != nil {
return walletdb.ErrIncompatibleValue
}

// Update the transaction with the new value.
b.tx.put(string(makeValueKey(b.id, key)), string(value))
b.tx.stm.Put(string(makeValueKey(b.id, key)), string(value))

return nil
}
Expand All @@ -274,7 +305,7 @@ func (b *readWriteBucket) Delete(key []byte) error {
}

// Update the transaction to delete the key/value.
b.tx.del(string(makeValueKey(b.id, key)))
b.tx.stm.Del(string(makeValueKey(b.id, key)))

return nil
}
Expand Down Expand Up @@ -304,7 +335,7 @@ func (b *readWriteBucket) SetSequence(v uint64) error {
val := strconv.FormatUint(v, 10)

// Update the transaction with the new value for the sequence key.
b.tx.put(string(makeSequenceKey(b.id)), val)
b.tx.stm.Put(string(makeSequenceKey(b.id)), val)

return nil
}
Expand Down
57 changes: 57 additions & 0 deletions channeldb/kvdb/etcd/readwrite_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,60 @@ func TestBucketSequence(t *testing.T) {

require.Nil(t, err)
}

func TestKeyClash(t *testing.T) {
t.Parallel()

f := NewEtcdTestFixture(t)
defer f.Cleanup()

db, err := newEtcdBackend(f.BackendConfig())
require.NoError(t, err)

err = db.Update(func(tx walletdb.ReadWriteTx) error {
apple, err := tx.CreateTopLevelBucket([]byte("apple"))
require.Nil(t, err)
require.NotNil(t, apple)

require.NoError(t, apple.Put([]byte("key"), []byte("val")))

banana, err := apple.CreateBucket([]byte("banana"))
require.Nil(t, err)
require.NotNil(t, banana)

return nil
})

require.Nil(t, err)

err = db.Update(func(tx walletdb.ReadWriteTx) error {
apple, err := tx.CreateTopLevelBucket([]byte("apple"))
require.Nil(t, err)
require.NotNil(t, apple)

require.Error(t,
walletdb.ErrIncompatibleValue,
apple.Put([]byte("banana"), []byte("val")),
)

b, err := apple.CreateBucket([]byte("key"))
require.Nil(t, b)
require.Error(t, walletdb.ErrIncompatibleValue, b)

b, err = apple.CreateBucketIfNotExists([]byte("key"))
require.Nil(t, b)
require.Error(t, walletdb.ErrIncompatibleValue, b)

return nil
})

require.Nil(t, err)

expected := map[string]string{
bkey("apple"): bval("apple"),
bkey("apple", "banana"): bval("apple", "banana"),
vkey("key", "apple"): "val",
}
require.Equal(t, expected, f.Dump())

}
10 changes: 0 additions & 10 deletions channeldb/kvdb/etcd/readwrite_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,6 @@ func rootBucket(tx *readWriteTx) *readWriteBucket {
return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:])
}

// put updates the passed key/value.
func (tx *readWriteTx) put(key, val string) {
tx.stm.Put(key, val)
}

// del marks the passed key deleted.
func (tx *readWriteTx) del(key string) {
tx.stm.Del(key)
}

// ReadBucket opens the root bucket for read only access. If the bucket
// described by the key does not exist, nil is returned.
func (tx *readWriteTx) ReadBucket(key []byte) walletdb.ReadBucket {
Expand Down
22 changes: 19 additions & 3 deletions channeldb/kvdb/etcd/stm.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,23 @@ func (s *stm) Get(key string) ([]byte, error) {
// the prefetch set.
if getValue, ok := s.prefetch[key]; ok {
delete(s.prefetch, key)
s.rset[key] = getValue

// Use the prefetched value only if it is for
// an existing key.
if getValue.rev != 0 {
s.rset[key] = getValue
}
}

// Return value if alread in read set.
if getVal, ok := s.rset[key]; ok {
return []byte(getVal.val), nil
if getValue, ok := s.rset[key]; ok {
// Return the value if the rset contains
// an existing key.
if getValue.rev != 0 {
return []byte(getValue.val), nil
} else {
return nil, nil
}
}

// Fetch and return value.
Expand All @@ -419,6 +430,11 @@ func (s *stm) Get(key string) ([]byte, error) {

if len(kvs) > 0 {
return []byte(kvs[0].val), nil
} else {
// Add assertion to the read set.
s.rset[key] = stmGet{
rev: 0,
}
}

// Return empty result if key not in DB.
Expand Down

0 comments on commit 9c62a42

Please sign in to comment.