Skip to content

Commit

Permalink
SortedIndex baseline implementation (#75)
Browse files Browse the repository at this point in the history
This PR introduces a new Sorted Index that keeps an actively sorted
b-tree (github.com/tidwall/btree) for a column of the user's choosing
(currently limited to string-type only). The index holds one b-tree that
is not copied between transactions (mutexed).

Future work would consider other type columns being sorted (currently
only string columns), PK sorting, and custom `Less()` functionality for
users.
  • Loading branch information
Dreeseaw committed Dec 17, 2022
1 parent 6cf7de3 commit 3e795a1
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 6 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,31 @@ players.Query(func(txn *column.Txn) error {
})
```

## Sorted Indexes

Along with bitmap indexing, collections support consistently sorted indexes. These indexes are transient, and must be recreated when a collection is loading a snapshot.

In the example below, we create a SortedIndex object and use it to sort filtered records in a transaction.

```go
// Create the sorted index "sortedNames" in advance
out.CreateSortIndex("richest", "balance")

// This filters the transaction with the `rouge` index before
// ranging through the remaining balances in ascending order
players.Query(func(txn *column.Txn) error {
name := txn.String("name")
balance := txn.Float64("balance")

txn.With("rogue").Ascend("richest", func (i uint32) {
// save or do something with sorted record
curName, _ := name.Get()
balance.Set(newBalance(curName))
})
return nil
})
```

## Updating Values

In order to update certain items in the collection, you can simply call `Range()` method and use column accessor's `Set()` or `Add()` methods to update a value of a certain column atomically. The updates won't be instantly reflected given that our store supports transactions. Only when transaction is commited, then the update will be applied to the collection, allowing for isolation and rollbacks.
Expand Down
40 changes: 40 additions & 0 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,46 @@ func (c *Collection) CreateIndex(indexName, columnName string, fn func(r Reader)
return nil
}

func (c *Collection) CreateSortIndex(indexName, columnName string) error {
if columnName == "" || indexName == "" {
return fmt.Errorf("column: create index must specify name & column")
}

// Prior to creating an index, we should have a column
column, ok := c.cols.Load(columnName)
if !ok {
return fmt.Errorf("column: unable to create index, column '%v' does not exist", columnName)
}

// Check to make sure index does not already exist
_, ok = c.cols.Load(indexName)
if ok {
return fmt.Errorf("column: unable to create index, index '%v' already exist", indexName)
}

// Create and add the index column,
index := newSortIndex(indexName, columnName)
c.lock.Lock()
// index.Grow(uint32(c.opts.Capacity))
c.cols.Store(indexName, index)
c.cols.Store(columnName, column, index)
c.lock.Unlock()

// Iterate over all of the values of the target column, chunk by chunk and fill
// the index accordingly.
chunks := c.chunks()
buffer := commit.NewBuffer(c.Count())
reader := commit.NewReader()
for chunk := commit.Chunk(0); int(chunk) < chunks; chunk++ {
if column.Snapshot(chunk, buffer) {
reader.Seek(buffer)
index.Apply(chunk, reader)
}
}

return nil
}

// DropIndex removes the index column with the specified name. If the index with this
// name does not exist, this operation is a no-op.
func (c *Collection) DropIndex(indexName string) error {
Expand Down
101 changes: 96 additions & 5 deletions column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package column

import (
"strings"
"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"

"github.com/tidwall/btree"
)

// --------------------------- Reader ---------------------------
Expand Down Expand Up @@ -119,16 +122,16 @@ func newTrigger(indexName, columnName string, callback func(r Reader)) *column {
})
}

// Grow grows the size of the column until we have enough to store
func (c *columnTrigger) Grow(idx uint32) {
// Noop
}

// Column returns the target name of the column on which this index should apply.
func (c *columnTrigger) Column() string {
return c.name
}

// Grow grows the size of the column until we have enough to store
func (c *columnTrigger) Grow(idx uint32) {
// Noop
}

// Apply applies a set of operations to the column.
func (c *columnTrigger) Apply(chunk commit.Chunk, r *commit.Reader) {
for r.Next() {
Expand Down Expand Up @@ -157,3 +160,91 @@ func (c *columnTrigger) Index(chunk commit.Chunk) bitmap.Bitmap {
func (c *columnTrigger) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
// Noop
}

// ----------------------- Sorted Index --------------------------

type SortIndexItem struct {
Key string
Value uint32
}

// columnSortIndex implements a constantly sorted column via BTree
type columnSortIndex struct {
btree *btree.BTreeG[SortIndexItem] // 1 constantly sorted data structure
backMap map[uint32]string // for constant key lookups
name string // The name of the target column
}

// newSortIndex creates a new bitmap index column.
func newSortIndex(indexName, columnName string) *column {
byKeys := func (a, b SortIndexItem) bool {
return a.Key < b.Key
}
return columnFor(indexName, &columnSortIndex{
btree: btree.NewBTreeG[SortIndexItem](byKeys),
backMap: make(map[uint32]string),
name: columnName,
})
}

// Grow grows the size of the column until we have enough to store
func (c *columnSortIndex) Grow(idx uint32) {
return
}

// Column returns the target name of the column on which this index should apply.
func (c *columnSortIndex) Column() string {
return c.name
}


// Apply applies a set of operations to the column.
func (c *columnSortIndex) Apply(chunk commit.Chunk, r *commit.Reader) {

// Index can only be updated based on the final stored value, so we can only work
// with put, merge, & delete operations here.
for r.Next() {
switch r.Type {
case commit.Put:
if delKey, exists := c.backMap[r.Index()]; exists {
c.btree.Delete(SortIndexItem{
Key: delKey,
Value: r.Index(),
})
}
upsertKey := strings.Clone(r.String()) // alloc required
c.backMap[r.Index()] = upsertKey
c.btree.Set(SortIndexItem{
Key: upsertKey,
Value: r.Index(),
})
case commit.Delete:
delKey, _ := c.backMap[r.Index()]
c.btree.Delete(SortIndexItem{
Key: delKey,
Value: r.Index(),
})
}
}
}


// Value retrieves a value at a specified index.
func (c *columnSortIndex) Value(idx uint32) (v interface{}, ok bool) {
return nil, false
}

// Contains checks whether the column has a value at a specified index.
func (c *columnSortIndex) Contains(idx uint32) bool {
return false
}

// Index returns the fill list for the column
func (c *columnSortIndex) Index(chunk commit.Chunk) bitmap.Bitmap {
return nil
}

// Snapshot writes the entire column into the specified destination buffer
func (c *columnSortIndex) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
// No-op
}
2 changes: 1 addition & 1 deletion commit/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Reader) Rewind() {
r.Offset = r.start
}

// Use sets the buffer and resets the reader.
// use sets the buffer and resets the reader.
func (r *Reader) use(buffer []byte) {
r.buffer = buffer
r.headString = 0
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kelindar/column
go 1.19

require (
github.com/imdario/mergo v0.3.13
github.com/kelindar/bitmap v1.4.1
github.com/kelindar/intmap v1.1.0
github.com/kelindar/iostream v1.3.0
Expand All @@ -13,6 +14,8 @@ require (
github.com/zeebo/xxh3 v1.0.2
)

require github.com/tidwall/btree v1.6.0 // indirect

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/kelindar/async v1.0.0 h1:oJiFAt3fVB/b5zVZKPBU+pP9lR3JVyeox9pYlpdnIK8=
github.com/kelindar/async v1.0.0/go.mod h1:bJRlwaRiqdHi+4dpVDNHdwgyRyk6TxpA21fByLf7hIY=
github.com/kelindar/bitmap v1.4.1 h1:Ih0BWMYXkkZxPMU536DsQKRhdvqFl7tuNjImfLJWC6E=
Expand Down Expand Up @@ -31,6 +32,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/btree v1.5.2 h1:5eA83Gfki799V3d3bJo9sWk+yL2LRoTEah3O/SA6/8w=
github.com/tidwall/btree v1.5.2/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg=
github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
Expand All @@ -43,5 +48,6 @@ golang.org/x/time v0.2.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
29 changes: 29 additions & 0 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,35 @@ func (txn *Txn) Range(fn func(idx uint32)) error {
return nil
}

// Ascend through a given SortedIndex and returns each offset
// remaining in the transaction's index
func (txn *Txn) Ascend(sortIndexName string, fn func(idx uint32)) error {
txn.initialize()
txn.owner.lock.RLock() // protect against writes on btree
defer txn.owner.lock.RUnlock()
// lock := txn.owner.slock

sortIndex, ok := txn.owner.cols.Load(sortIndexName)
if !ok {
return fmt.Errorf("column: no sorted index named '%v'", sortIndexName)
}

// For each btree key, check if the offset is still in
// the txn's index & return if true
sortIndexCol, _ := sortIndex.Column.(*columnSortIndex)
sortIndexCol.btree.Scan(func (item SortIndexItem) bool {
if txn.index.Contains(item.Value) {
// chunk := commit.ChunkAt(item.Value)
// lock.RLock(uint(chunk))
txn.cursor = item.Value
fn(item.Value)
// lock.RUnlock(uint(chunk))
}
return true
})
return nil
}

// DeleteAll marks all of the items currently selected by this transaction for deletion. The
// actual delete will take place once the transaction is committed.
func (txn *Txn) DeleteAll() {
Expand Down

0 comments on commit 3e795a1

Please sign in to comment.