Skip to content

Commit

Permalink
transaction commit
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Jun 15, 2021
1 parent 1799385 commit bd56e3f
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 353 deletions.
143 changes: 59 additions & 84 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@ type Object = map[string]interface{}

// Collection represents a collection of objects in a columnar format
type Collection struct {
lock sync.RWMutex // The collection lock
qlock sync.Mutex // The lock for updates & delete queues
fill bitmap.Bitmap // The fill-list
cols columns // The map of columns
updates map[string][]Update // The update queue
deletes bitmap.Bitmap // The delete queue
lock sync.RWMutex // The lock for fill list
sort uint64 // Optimization flag
cols columns // The map of columns
fill bitmap.Bitmap // The fill-list
}

// NewCollection creates a new columnar collection.
func NewCollection() *Collection {
return &Collection{
fill: make(bitmap.Bitmap, 0, 4),
cols: makeColumns(8),
updates: make(map[string][]Update, 8),
deletes: make(bitmap.Bitmap, 0, 4),
cols: makeColumns(8),
fill: make(bitmap.Bitmap, 0, 4),
}
}

Expand Down Expand Up @@ -67,9 +63,8 @@ func (c *Collection) Insert(obj Object) uint32 {
// UpdateAt updates a specific row/column combination and sets the value. It is also
// possible to update during the query, which is much more convenient to use.
func (c *Collection) UpdateAt(idx uint32, columnName string, value interface{}) {
if column, computed, ok := c.cols.LoadWithIndex(columnName); ok {
column.Update(idx, value)
for _, v := range computed {
if columns, ok := c.cols.LoadWithIndex(columnName); ok {
for _, v := range columns {
v.Update(idx, value)
}
}
Expand Down Expand Up @@ -165,75 +160,33 @@ func (c *Collection) DropIndex(indexName string) {
// columns in this collection. It also allows for individual rows to be modified or
// deleted during iteration (range), but the actual operations will be queued and
// executed after the iteration.
func (c *Collection) Query(fn func(txn Txn) error) error {
func (c *Collection) Query(fn func(txn *Txn) error) error {
c.lock.RLock()
r := aquireBitmap(&c.fill)
txn := aquireTxn(c)
c.lock.RUnlock()

// Execute the query and keep the error for later
err := fn(Txn{
owner: c,
index: r,
})
err := fn(txn)

// TODO: should we have txn.Commit() ?

// Now that the iteration has finished, we can range over the pending action
// queue and apply all of the actions that were requested by the cursor.
c.updatePending()
c.deletePending()
releaseBitmap(r)
return err
}
txn.Commit()

// updatePending updates the pending entries that were modified during the query
func (c *Collection) updatePending() {
c.qlock.Lock()
defer c.qlock.Unlock()
releaseTxn(txn)

// Process the pending updates column by column
for columnName, updates := range c.updates {
if len(updates) == 0 {
continue // No updates for this column
}

// Get the column that needs to be updated
column, computed, exists := c.cols.LoadWithIndex(columnName)
if !exists {
continue
}

// Range through all of the pending updates and apply them to the column
// and its associated computed columns.
column.UpdateMany(updates)
for _, v := range computed {
v.UpdateMany(updates)
}

// Reset the update queue but keep the key
c.updates[columnName] = c.updates[columnName][:0]
}
//c.optimize()
return err
}

// deletePending removes all of the entries marked as to be deleted
func (c *Collection) deletePending() {
c.qlock.Lock()
defer c.qlock.Unlock()
if len(c.deletes) == 0 {
return // Nothing to delete
/*
func (c *Collection) optimize() {
if v := atomic.AddUint64(&c.sort, 1); v%1000 == 0 {
c.cols.Optimize()
}

// Apply a batch delete on all of the columns
c.lock.Lock()
defer c.lock.Unlock()
c.cols.Range(func(column Column) {
column.DeleteMany(&c.deletes)
})

// Clear the items in the collection and reinitialize the purge list
c.fill.AndNot(c.deletes)
c.deletes.Clear()
}
*/

// Fetch retrieves an object by its handle and returns a selector for it.
func (c *Collection) Fetch(idx uint32) (Cursor, bool) {
Expand Down Expand Up @@ -270,24 +223,23 @@ func makeColumns(capacity int) columns {

// columnEntry represents a column entry in the registry.
type columnEntry struct {
name string // The column name
col Column // The column data
index []Column // The computed columns
name string // The column name
cols []Column // The columns and its computed
}

// Range iterates over columns in the registry.
func (c *columns) Range(fn func(column Column)) {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
fn(v.col)
fn(v.cols[0])
}
}

// RangeName iterates over columns in the registry together with their names.
func (c *columns) RangeName(fn func(columnName string, column Column)) {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
fn(v.name, v.col)
fn(v.name, v.cols[0])
}
}

Expand All @@ -296,21 +248,28 @@ func (c *columns) Load(columnName string) (Column, bool) {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
if v.name == columnName {
return v.col, true
if col := v.cols[0]; col != nil {
col.Hits()
return col, true
}

return nil, false
//return v.cols[0], true
}
}
return nil, false
}

// LoadWithIndex loads a column by its name along with their computed indices.
func (c *columns) LoadWithIndex(columnName string) (Column, []Column, bool) {
func (c *columns) LoadWithIndex(columnName string) ([]Column, bool) {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
if v.name == columnName {
return v.col, v.index, true
//v.cols[0].Hits()
return v.cols, true
}
}
return nil, nil, false
return nil, false
}

// Store stores a column into the registry.
Expand All @@ -325,21 +284,22 @@ func (c *columns) Store(columnName string, column Column, index ...Column) {

// If we found an existing entry, update it and we're done
if column != nil {
columns[i].col = column
columns[i].cols[0] = column
}
if index != nil {
columns[i].index = append(columns[i].index, index...)
columns[i].cols = append(columns[i].cols, index...)
}
c.cols.Store(columns)

return
}

// No entry found, create a new one
value := []Column{column}
value = append(value, index...)
columns = append(columns, columnEntry{
name: columnName,
col: column,
index: index,
name: columnName,
cols: value,
})
c.cols.Store(columns)
return
Expand Down Expand Up @@ -371,14 +331,29 @@ func (c *columns) DeleteIndex(columnName, indexName string) {
}

// If this is the target column, update its computed columns
filtered := make([]Column, 0, cap(columns[i].index))
for _, idx := range v.index {
filtered := make([]Column, 0, cap(columns[i].cols))
filtered = append(filtered, columns[i].cols[0])
for _, idx := range v.cols[1:] {
if idx != index {
filtered = append(filtered, idx)
}
}
columns[i].index = filtered
columns[i].cols = filtered
}

c.cols.Store(columns)
}

// Optimize sorts the columns for faster access
/*func (c *columns) Optimize() {
columns := c.cols.Load().([]columnEntry)
sort.Slice(columns, func(i, j int) bool {
return columns[i].cols[0].Hits() > columns[j].cols[0].Hits()
})
for _, c := range columns {
c.cols[0].HitsReset()
}
c.cols.Store(columns)
}
*/
46 changes: 27 additions & 19 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
"github.com/stretchr/testify/assert"
)

// BenchmarkCollection/insert-8 30649539 46.93 ns/op 1 B/op 0 allocs/op
// BenchmarkCollection/fetch-8 29002880 36.59 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/count-slow-8 105439 11065 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/count-8 7963664 142.3 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/range-8 1539128 791.5 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/select-8 2198622 541.3 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/select-many-8 2053243 577.9 ns/op 32 B/op 1 allocs/op
// BenchmarkCollection/update-at-8 23730843 47.92 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/update-all-8 92749 12609 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/delete-at-8 2281185 527.3 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/delete-all-8 168591 7405 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/insert-8 28124318 42.16 ns/op 1 B/op 0 allocs/op
// BenchmarkCollection/fetch-8 25258159 48.32 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/count-slow-8 106300 10820 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/count-8 9076928 132.0 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/range-8 1000000 1046 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/select-8 2005491 603.2 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/select-many-8 2006061 597.5 ns/op 48 B/op 1 allocs/op
// BenchmarkCollection/update-at-8 29553374 41.97 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/update-all-8 146821 8500 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/delete-at-8 2629766 448.8 ns/op 0 B/op 0 allocs/op
// BenchmarkCollection/delete-all-8 297520 3497 ns/op 0 B/op 0 allocs/op
func BenchmarkCollection(b *testing.B) {
players := loadPlayers()
obj := Object{
Expand Down Expand Up @@ -61,7 +61,7 @@ func BenchmarkCollection(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
players.Query(func(txn Txn) error {
players.Query(func(txn *Txn) error {
txn.WithString("race", func(v string) bool {
return v == "human"
}).WithString("class", func(v string) bool {
Expand All @@ -78,7 +78,7 @@ func BenchmarkCollection(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
players.Query(func(txn Txn) error {
players.Query(func(txn *Txn) error {
txn.With("human", "mage", "old").Count()
return nil
})
Expand All @@ -90,7 +90,7 @@ func BenchmarkCollection(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
players.Query(func(txn Txn) error {
players.Query(func(txn *Txn) error {
txn.With("human", "mage", "old").Range(func(v Cursor) bool {
count++
name = v.StringOf("name")
Expand All @@ -107,7 +107,7 @@ func BenchmarkCollection(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
players.Query(func(txn Txn) error {
players.Query(func(txn *Txn) error {
txn.With("human", "mage", "old").Select(func(v Selector) bool {
count++
name = v.String()
Expand All @@ -124,7 +124,7 @@ func BenchmarkCollection(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
players.Query(func(txn Txn) error {
players.Query(func(txn *Txn) error {
txn.With("human", "mage", "old").SelectMany(func(v []Selector) bool {
count++
name = v[0].String()
Expand All @@ -145,12 +145,20 @@ func BenchmarkCollection(b *testing.B) {
})

b.Run("update-all", func(b *testing.B) {
var columns []string
players.cols.RangeName(func(columnName string, c Column) {
if _, ok := c.(numerical); ok {
columns = append(columns, columnName)
}
})

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
players.Query(func(txn Txn) error {
columnName := columns[n%len(columns)]
players.Query(func(txn *Txn) error {
txn.Range(func(v Cursor) bool {
v.Update("balance", 1.0)
v.Update(columnName, 1.0)
return true
})
return nil
Expand All @@ -175,7 +183,7 @@ func BenchmarkCollection(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
fill.Clone(&c.fill) // Restore
c.Query(func(txn Txn) error {
c.Query(func(txn *Txn) error {
txn.Range(func(v Cursor) bool {
v.Delete()
return true
Expand Down

0 comments on commit bd56e3f

Please sign in to comment.