Skip to content

Commit

Permalink
Fixed point read latch
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Sep 10, 2021
1 parent 6389779 commit 5d626ec
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 50 deletions.
14 changes: 7 additions & 7 deletions collection.go
Expand Up @@ -141,18 +141,18 @@ func (c *Collection) UpdateAt(idx uint32, columnName string, fn func(v Cursor) e
})
}

// SelectAt performs a selection on a specific row specified by its index.
// SelectAt performs a selection on a specific row specified by its index. It returns
// a boolean value indicating whether an element is present at the index or not.
func (c *Collection) SelectAt(idx uint32, fn func(v Selector)) bool {
c.lock.RLock()
contains := c.fill.Contains(idx)
c.lock.RUnlock()

// If it's empty or over the sequence, not found
if idx >= uint32(len(c.fill))<<6 || !contains {
chunk := uint(idx >> chunkShift)
if idx >= uint32(len(c.fill))<<6 || !c.fill.Contains(idx) {
return false
}

// Lock the chunk which we are about to read and call the selector delegate
c.slock.RLock(chunk)
fn(Selector{idx: idx, col: c})
c.slock.RUnlock(chunk)
return true
}

Expand Down
66 changes: 57 additions & 9 deletions collection_test.go
Expand Up @@ -256,15 +256,15 @@ func runReplication(t *testing.T, updates, inserts, concurrency int) {
return txn.Range("float64", func(v Cursor) {
v1, v2 := v.FloatAt("float64"), v.IntAt("int32")
if v1 != 0 {
clone, ok := txn.ReadAt(v.idx)
assert.True(t, ok)
assert.Equal(t, v.FloatAt("float64"), clone.FloatAt("float64"))
assert.True(t, txn.SelectAt(v.idx, func(s Selector) {
assert.Equal(t, v.FloatAt("float64"), s.FloatAt("float64"))
}))
}

if v2 != 0 {
clone, ok := txn.ReadAt(v.idx)
assert.True(t, ok)
assert.Equal(t, v.IntAt("int32"), clone.IntAt("int32"))
assert.True(t, txn.SelectAt(v.idx, func(s Selector) {
assert.Equal(t, v.IntAt("int32"), s.IntAt("int32"))
}))
}
})
})
Expand Down Expand Up @@ -341,9 +341,9 @@ func TestInsertObject(t *testing.T) {

assert.Equal(t, 2, col.Count())
assert.NoError(t, col.Query(func(txn *Txn) error {
selector, ok := txn.ReadAt(0)
assert.True(t, ok)
assert.Equal(t, "A", selector.StringAt("name"))
assert.True(t, txn.SelectAt(0, func(v Selector) {
assert.Equal(t, "A", v.StringAt("name"))
}))
return nil
}))
}
Expand Down Expand Up @@ -491,6 +491,54 @@ func TestInsertParallel(t *testing.T) {
}))
}

func TestConcurrentPointReads(t *testing.T) {
obj := Object{
"name": "Roman",
"age": 35,
"wallet": 50.99,
"health": 100,
"mana": 200,
}

col := NewCollection()
col.CreateColumnsOf(obj)
for i := 0; i < 1000; i++ {
col.Insert(obj)
}

var ops int64
var wg sync.WaitGroup
wg.Add(2)

// Reader
go func() {
for i := 0; i < 10000; i++ {
col.SelectAt(99, func(v Selector) {
_ = v.StringAt("name")
})
atomic.AddInt64(&ops, 1)
runtime.Gosched()
}
wg.Done()
}()

// Writer
go func() {
for i := 0; i < 10000; i++ {
col.UpdateAt(99, "name", func(v Cursor) error {
v.SetString("test")
return nil
})
atomic.AddInt64(&ops, 1)
runtime.Gosched()
}
wg.Done()
}()

wg.Wait()
assert.Equal(t, 20000, int(atomic.LoadInt64(&ops)))
}

// loadPlayers loads a list of players from the fixture
func loadPlayers(amount int) *Collection {
out := NewCollection(Options{
Expand Down
39 changes: 21 additions & 18 deletions examples/bench/bench.go
Expand Up @@ -7,7 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"hash/crc32"
"math/bits"
"os"
"sync"
"sync/atomic"
Expand All @@ -31,13 +31,13 @@ func main() {
createCollection(players, amount)

// This runs point query benchmarks
runBenchmark("Point Reads/Writes", func(writeTxn bool) (reads int, writes int) {
runBenchmark("Point Reads/Writes", func(v uint32, writeTxn bool) (reads int, writes int) {

// To avoid task granuarity problem, load up a bit more work on each
// of the goroutines, a few hundred reads should be enough to amortize
// the cost of scheduling goroutines, so we can actually test our code.
for i := 0; i < 1000; i++ {
offset := randN(amount - 1)
offset := randN(v, amount-1)
if writeTxn {
players.UpdateAt(offset, "balance", func(v column.Cursor) error {
v.SetFloat64(0)
Expand All @@ -56,7 +56,7 @@ func main() {
}

// runBenchmark runs a benchmark
func runBenchmark(name string, fn func(bool) (int, int)) {
func runBenchmark(name string, fn func(uint32, bool) (int, int)) {
fmt.Printf("Benchmarking %v ...\n", name)
fmt.Printf("%7v\t%6v\t%17v\t%13v\n", "WORK", "PROCS", "READ RATE", "WRITE RATE")
for _, workload := range []int{0, 10, 50, 90, 100} {
Expand All @@ -69,12 +69,12 @@ func runBenchmark(name string, fn func(bool) (int, int)) {
var reads, writes int64
var wg sync.WaitGroup
start := time.Now()
for time.Since(start) < time.Second {
for i := uint32(0); time.Since(start) < time.Second; i++ {
wg.Add(1)
work <- async.NewTask(func(ctx context.Context) (interface{}, error) {
defer wg.Done()

r, w := fn(chanceOf(workload))
r, w := fn(i, chanceOf(i, workload))
atomic.AddInt64(&reads, int64(r))
atomic.AddInt64(&writes, int64(w))
return nil, nil
Expand Down Expand Up @@ -145,21 +145,24 @@ func createCollection(out *column.Collection, amount int) *column.Collection {
return out
}

var epoch uint32

// This random number generator not the most amazing one, but much better
// than using math.Rand for our benchmarks, since it would create a lock
// contention and bias the results.
func randN(n int) uint32 {
v := atomic.AddUint32(&epoch, 1)
return crc32.ChecksumIEEE([]byte{
byte(v >> 24),
byte(v >> 16),
byte(v >> 8),
byte(v),
}) % uint32(n)
func randN(v uint32, n int) uint32 {
return uint32(xxhash(v) % uint64(n))
}

func chanceOf(v uint32, chance int) bool {
return randN(v, 100) < uint32(chance)
}

func chanceOf(chance int) bool {
return randN(100) < uint32(chance)
func xxhash(v uint32) uint64 {
packed := uint64(v) + uint64(v)<<32
x := packed ^ (0x1cad21f72c81017c ^ 0xdb979083e96dd4de)
x ^= bits.RotateLeft64(x, 49) ^ bits.RotateLeft64(x, 24)
x *= 0x9fb21c651e98df25
x ^= (x >> 35) + 4 // len
x *= 0x9fb21c651e98df25
x ^= (x >> 28)
return x
}
15 changes: 4 additions & 11 deletions txn.go
Expand Up @@ -257,17 +257,10 @@ func (txn *Txn) UpdateAt(index uint32, columnName string, fn func(v Cursor) erro
return fn(cursor)
}

// ReadAt returns a selector for a specified index together with a boolean value that indicates
// whether an element is present at the specified index or not.
func (txn *Txn) ReadAt(index uint32) (Selector, bool) {
if !txn.index.Contains(index) {
return Selector{}, false
}

return Selector{
idx: index,
txn: txn,
}, true
// SelectAt performs a selection on a specific row specified by its index. It returns
// a boolean value indicating whether an element is present at the index or not.
func (txn *Txn) SelectAt(index uint32, fn func(v Selector)) bool {
return txn.owner.SelectAt(index, fn)
}

// DeleteAt attempts to delete an item at the specified index for this transaction. If the item
Expand Down
7 changes: 2 additions & 5 deletions txn_test.go
Expand Up @@ -130,11 +130,8 @@ func TestIndexInvalid(t *testing.T) {
}))

players.Query(func(txn *Txn) error {
_, ok := txn.ReadAt(999999)
assert.False(t, ok)

_, ok = txn.ReadAt(0)
assert.True(t, ok)
assert.False(t, txn.SelectAt(999999, func(v Selector) {}))
assert.True(t, txn.SelectAt(0, func(v Selector) {}))
return nil
})

Expand Down

0 comments on commit 5d626ec

Please sign in to comment.