From 5d626ecba688715108c70efb37d3de0a25a5c048 Mon Sep 17 00:00:00 2001 From: Roman Atachiants Date: Sat, 11 Sep 2021 02:39:49 +0400 Subject: [PATCH] Fixed point read latch --- collection.go | 14 ++++----- collection_test.go | 66 +++++++++++++++++++++++++++++++++++------ examples/bench/bench.go | 39 +++++++++++++----------- txn.go | 15 +++------- txn_test.go | 7 ++--- 5 files changed, 91 insertions(+), 50 deletions(-) diff --git a/collection.go b/collection.go index e31fdf1..e4d033e 100644 --- a/collection.go +++ b/collection.go @@ -141,18 +141,18 @@ func (c *Collection) UpdateAt(idx uint32, columnName string, fn func(v Cursor) e }) } -// SelectAt performs a selection on a specific row specified by its index. +// SelectAt performs a selection on a specific row specified by its index. It returns +// a boolean value indicating whether an element is present at the index or not. func (c *Collection) SelectAt(idx uint32, fn func(v Selector)) bool { - c.lock.RLock() - contains := c.fill.Contains(idx) - c.lock.RUnlock() - - // If it's empty or over the sequence, not found - if idx >= uint32(len(c.fill))<<6 || !contains { + chunk := uint(idx >> chunkShift) + if idx >= uint32(len(c.fill))<<6 || !c.fill.Contains(idx) { return false } + // Lock the chunk which we are about to read and call the selector delegate + c.slock.RLock(chunk) fn(Selector{idx: idx, col: c}) + c.slock.RUnlock(chunk) return true } diff --git a/collection_test.go b/collection_test.go index afe9efd..4a5eabc 100644 --- a/collection_test.go +++ b/collection_test.go @@ -256,15 +256,15 @@ func runReplication(t *testing.T, updates, inserts, concurrency int) { return txn.Range("float64", func(v Cursor) { v1, v2 := v.FloatAt("float64"), v.IntAt("int32") if v1 != 0 { - clone, ok := txn.ReadAt(v.idx) - assert.True(t, ok) - assert.Equal(t, v.FloatAt("float64"), clone.FloatAt("float64")) + assert.True(t, txn.SelectAt(v.idx, func(s Selector) { + assert.Equal(t, v.FloatAt("float64"), s.FloatAt("float64")) + })) } if v2 != 0 { - clone, ok := txn.ReadAt(v.idx) - assert.True(t, ok) - assert.Equal(t, v.IntAt("int32"), clone.IntAt("int32")) + assert.True(t, txn.SelectAt(v.idx, func(s Selector) { + assert.Equal(t, v.IntAt("int32"), s.IntAt("int32")) + })) } }) }) @@ -341,9 +341,9 @@ func TestInsertObject(t *testing.T) { assert.Equal(t, 2, col.Count()) assert.NoError(t, col.Query(func(txn *Txn) error { - selector, ok := txn.ReadAt(0) - assert.True(t, ok) - assert.Equal(t, "A", selector.StringAt("name")) + assert.True(t, txn.SelectAt(0, func(v Selector) { + assert.Equal(t, "A", v.StringAt("name")) + })) return nil })) } @@ -491,6 +491,54 @@ func TestInsertParallel(t *testing.T) { })) } +func TestConcurrentPointReads(t *testing.T) { + obj := Object{ + "name": "Roman", + "age": 35, + "wallet": 50.99, + "health": 100, + "mana": 200, + } + + col := NewCollection() + col.CreateColumnsOf(obj) + for i := 0; i < 1000; i++ { + col.Insert(obj) + } + + var ops int64 + var wg sync.WaitGroup + wg.Add(2) + + // Reader + go func() { + for i := 0; i < 10000; i++ { + col.SelectAt(99, func(v Selector) { + _ = v.StringAt("name") + }) + atomic.AddInt64(&ops, 1) + runtime.Gosched() + } + wg.Done() + }() + + // Writer + go func() { + for i := 0; i < 10000; i++ { + col.UpdateAt(99, "name", func(v Cursor) error { + v.SetString("test") + return nil + }) + atomic.AddInt64(&ops, 1) + runtime.Gosched() + } + wg.Done() + }() + + wg.Wait() + assert.Equal(t, 20000, int(atomic.LoadInt64(&ops))) +} + // loadPlayers loads a list of players from the fixture func loadPlayers(amount int) *Collection { out := NewCollection(Options{ diff --git a/examples/bench/bench.go b/examples/bench/bench.go index 4b1e2f5..dfa3471 100644 --- a/examples/bench/bench.go +++ b/examples/bench/bench.go @@ -7,7 +7,7 @@ import ( "context" "encoding/json" "fmt" - "hash/crc32" + "math/bits" "os" "sync" "sync/atomic" @@ -31,13 +31,13 @@ func main() { createCollection(players, amount) // This runs point query benchmarks - runBenchmark("Point Reads/Writes", func(writeTxn bool) (reads int, writes int) { + runBenchmark("Point Reads/Writes", func(v uint32, writeTxn bool) (reads int, writes int) { // To avoid task granuarity problem, load up a bit more work on each // of the goroutines, a few hundred reads should be enough to amortize // the cost of scheduling goroutines, so we can actually test our code. for i := 0; i < 1000; i++ { - offset := randN(amount - 1) + offset := randN(v, amount-1) if writeTxn { players.UpdateAt(offset, "balance", func(v column.Cursor) error { v.SetFloat64(0) @@ -56,7 +56,7 @@ func main() { } // runBenchmark runs a benchmark -func runBenchmark(name string, fn func(bool) (int, int)) { +func runBenchmark(name string, fn func(uint32, bool) (int, int)) { fmt.Printf("Benchmarking %v ...\n", name) fmt.Printf("%7v\t%6v\t%17v\t%13v\n", "WORK", "PROCS", "READ RATE", "WRITE RATE") for _, workload := range []int{0, 10, 50, 90, 100} { @@ -69,12 +69,12 @@ func runBenchmark(name string, fn func(bool) (int, int)) { var reads, writes int64 var wg sync.WaitGroup start := time.Now() - for time.Since(start) < time.Second { + for i := uint32(0); time.Since(start) < time.Second; i++ { wg.Add(1) work <- async.NewTask(func(ctx context.Context) (interface{}, error) { defer wg.Done() - r, w := fn(chanceOf(workload)) + r, w := fn(i, chanceOf(i, workload)) atomic.AddInt64(&reads, int64(r)) atomic.AddInt64(&writes, int64(w)) return nil, nil @@ -145,21 +145,24 @@ func createCollection(out *column.Collection, amount int) *column.Collection { return out } -var epoch uint32 - // This random number generator not the most amazing one, but much better // than using math.Rand for our benchmarks, since it would create a lock // contention and bias the results. -func randN(n int) uint32 { - v := atomic.AddUint32(&epoch, 1) - return crc32.ChecksumIEEE([]byte{ - byte(v >> 24), - byte(v >> 16), - byte(v >> 8), - byte(v), - }) % uint32(n) +func randN(v uint32, n int) uint32 { + return uint32(xxhash(v) % uint64(n)) +} + +func chanceOf(v uint32, chance int) bool { + return randN(v, 100) < uint32(chance) } -func chanceOf(chance int) bool { - return randN(100) < uint32(chance) +func xxhash(v uint32) uint64 { + packed := uint64(v) + uint64(v)<<32 + x := packed ^ (0x1cad21f72c81017c ^ 0xdb979083e96dd4de) + x ^= bits.RotateLeft64(x, 49) ^ bits.RotateLeft64(x, 24) + x *= 0x9fb21c651e98df25 + x ^= (x >> 35) + 4 // len + x *= 0x9fb21c651e98df25 + x ^= (x >> 28) + return x } diff --git a/txn.go b/txn.go index dca63e2..25bff68 100644 --- a/txn.go +++ b/txn.go @@ -257,17 +257,10 @@ func (txn *Txn) UpdateAt(index uint32, columnName string, fn func(v Cursor) erro return fn(cursor) } -// ReadAt returns a selector for a specified index together with a boolean value that indicates -// whether an element is present at the specified index or not. -func (txn *Txn) ReadAt(index uint32) (Selector, bool) { - if !txn.index.Contains(index) { - return Selector{}, false - } - - return Selector{ - idx: index, - txn: txn, - }, true +// SelectAt performs a selection on a specific row specified by its index. It returns +// a boolean value indicating whether an element is present at the index or not. +func (txn *Txn) SelectAt(index uint32, fn func(v Selector)) bool { + return txn.owner.SelectAt(index, fn) } // DeleteAt attempts to delete an item at the specified index for this transaction. If the item diff --git a/txn_test.go b/txn_test.go index 4778a79..39bf60c 100644 --- a/txn_test.go +++ b/txn_test.go @@ -130,11 +130,8 @@ func TestIndexInvalid(t *testing.T) { })) players.Query(func(txn *Txn) error { - _, ok := txn.ReadAt(999999) - assert.False(t, ok) - - _, ok = txn.ReadAt(0) - assert.True(t, ok) + assert.False(t, txn.SelectAt(999999, func(v Selector) {})) + assert.True(t, txn.SelectAt(0, func(v Selector) {})) return nil })