Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SortedIndex baseline implementation #75

Merged
merged 11 commits into from
Dec 17, 2022
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,31 @@ players.Query(func(txn *column.Txn) error {
})
```

## Sorted Indexes

Along with bitmap indexing, collections support consistently sorted indexes. These indexes are transient, and must be recreated when a collection is loading a snapshot.

In the example below, we create a SortedIndex object and use it to sort filtered records in a transaction.

```go
// Create the sorted index "sortedNames" in advance
out.CreateSortIndex("richest", "balance")

// This filters the transaction with the `rouge` index before
// ranging through the remaining balances in ascending order
players.Query(func(txn *column.Txn) error {
name := txn.String("name")
balance := txn.Float64("balance")

txn.With("rogue").SortedRange("richest", func (i uint32) {
// save or do something with sorted record
curName, _ := name.Get()
balance.Set(newBalance(curName))
})
return nil
})
```

## Updating Values

In order to update certain items in the collection, you can simply call `Range()` method and use column accessor's `Set()` or `Add()` methods to update a value of a certain column atomically. The updates won't be instantly reflected given that our store supports transactions. Only when transaction is commited, then the update will be applied to the collection, allowing for isolation and rollbacks.
Expand Down
52 changes: 52 additions & 0 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,46 @@ func (c *Collection) CreateIndex(indexName, columnName string, fn func(r Reader)
return nil
}

func (c *Collection) CreateSortIndex(indexName, columnName string) error {
if columnName == "" || indexName == "" {
return fmt.Errorf("column: create index must specify name & column")
}

// Prior to creating an index, we should have a column
column, ok := c.cols.Load(columnName)
if !ok {
return fmt.Errorf("column: unable to create index, column '%v' does not exist", columnName)
}

// Check to make sure index does not already exist
_, ok = c.cols.Load(indexName)
if ok {
return fmt.Errorf("column: unable to create index, index '%v' already exist", indexName)
}

// Create and add the index column,
index := newSortIndex(indexName, columnName)
c.lock.Lock()
// index.Grow(uint32(c.opts.Capacity))
c.cols.Store(indexName, index)
c.cols.Store(columnName, column, index)
c.lock.Unlock()

// Iterate over all of the values of the target column, chunk by chunk and fill
// the index accordingly.
chunks := c.chunks()
buffer := commit.NewBuffer(c.Count())
reader := commit.NewReader()
for chunk := commit.Chunk(0); int(chunk) < chunks; chunk++ {
if column.Snapshot(chunk, buffer) {
reader.Seek(buffer)
index.Apply(chunk, reader)
}
}

return nil
}

// DropIndex removes the index column with the specified name. If the index with this
// name does not exist, this operation is a no-op.
func (c *Collection) DropIndex(indexName string) error {
Expand Down Expand Up @@ -435,6 +475,18 @@ func (c *columns) LoadWithIndex(columnName string) ([]*column, bool) {
return nil, false
}

// LoadIndex loads an index column by its name.
func (c *columns) LoadIndex(indexName string) (*column, bool) {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
if v.name == indexName {
col := v.cols[0]
return col, col != nil
}
}
return nil, false
}

// Store stores a column into the registry.
func (c *columns) Store(columnName string, main *column, index ...*column) {

Expand Down
114 changes: 109 additions & 5 deletions column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package column

import (
"strings"
"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"

"github.com/tidwall/btree"
)

// --------------------------- Reader ---------------------------
Expand Down Expand Up @@ -119,16 +122,16 @@ func newTrigger(indexName, columnName string, callback func(r Reader)) *column {
})
}

// Grow grows the size of the column until we have enough to store
func (c *columnTrigger) Grow(idx uint32) {
// Noop
}

// Column returns the target name of the column on which this index should apply.
func (c *columnTrigger) Column() string {
return c.name
}

// Grow grows the size of the column until we have enough to store
func (c *columnTrigger) Grow(idx uint32) {
// Noop
}

// Apply applies a set of operations to the column.
func (c *columnTrigger) Apply(chunk commit.Chunk, r *commit.Reader) {
for r.Next() {
Expand Down Expand Up @@ -157,3 +160,104 @@ func (c *columnTrigger) Index(chunk commit.Chunk) bitmap.Bitmap {
func (c *columnTrigger) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
// Noop
}

// ----------------------- Sorted Index --------------------------

// SortIndexItem represents an offset sorted in a generic BTree
type SortIndexItem struct {
Dreeseaw marked this conversation as resolved.
Show resolved Hide resolved
Key string
Value uint32
}

// columnSortIndex implements a constantly sorted column via BTree
type columnSortIndex struct {
btree *btree.BTreeG[SortIndexItem] // 1 constantly sorted data structure
backMap map[uint32]string // for constant key lookups
name string // The name of the target column
}

// newSortIndex creates a new bitmap index column.
func newSortIndex(indexName, columnName string) *column {
byKeys := func (a, b SortIndexItem) bool {
return a.Key < b.Key
}
return columnFor(indexName, &columnSortIndex{
btree: btree.NewBTreeG[SortIndexItem](byKeys),
backMap: make(map[uint32]string),
name: columnName,
})
}

// Grow grows the size of the column until we have enough to store
func (c *columnSortIndex) Grow(idx uint32) {
return
}

// Column returns the target name of the column on which this index should apply.
func (c *columnSortIndex) Column() string {
return c.name
}


// Apply applies a set of operations to the column.
func (c *columnSortIndex) Apply(chunk commit.Chunk, r *commit.Reader) {

// Index can only be updated based on the final stored value, so we can only work
// with put, merge, & delete operations here.
for r.Next() {
switch r.Type {
case commit.Put:
/*delItem := SortIndexItem{"", 0}
c.btree.Scan(func (item SortIndexItem) bool {
if item.Value == r.Index() {
delItem.Key = item.Key
delItem.Value = item.Value
return false
}
return true
})
if delItem.Key != "" {
c.btree.Delete(delItem)
}*/
if delKey, exists := c.backMap[r.Index()]; exists {
c.btree.Delete(SortIndexItem{
Key: delKey,
Value: r.Index(),
})
}
upsertKey := strings.Clone(r.String()) // alloc required
c.backMap[r.Index()] = upsertKey
c.btree.Set(SortIndexItem{
Key: upsertKey,
Value: r.Index(),
})
case commit.Delete:
delKey, _ := c.backMap[r.Index()]
c.btree.Delete(SortIndexItem{
Key: delKey,
Value: r.Index(),
})
}
}
}


// Value retrieves a value at a specified index.
func (c *columnSortIndex) Value(idx uint32) (v interface{}, ok bool) {
return nil, false
}

// Contains checks whether the column has a value at a specified index.
func (c *columnSortIndex) Contains(idx uint32) bool {
return false
}

// Index returns the fill list for the column
func (c *columnSortIndex) Index(chunk commit.Chunk) bitmap.Bitmap {
return nil
}

// Snapshot writes the entire column into the specified destination buffer
func (c *columnSortIndex) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
// No op
}
2 changes: 1 addition & 1 deletion commit/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Reader) Rewind() {
r.Offset = r.start
}

// Use sets the buffer and resets the reader.
// use sets the buffer and resets the reader.
func (r *Reader) use(buffer []byte) {
r.buffer = buffer
r.headString = 0
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kelindar/column
go 1.19

require (
github.com/imdario/mergo v0.3.13
github.com/kelindar/bitmap v1.4.1
github.com/kelindar/intmap v1.1.0
github.com/kelindar/iostream v1.3.0
Expand All @@ -13,6 +14,8 @@ require (
github.com/zeebo/xxh3 v1.0.2
)

require github.com/tidwall/btree v1.5.2 // indirect

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/btree v1.5.2 h1:5eA83Gfki799V3d3bJo9sWk+yL2LRoTEah3O/SA6/8w=
github.com/tidwall/btree v1.5.2/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
Expand Down
35 changes: 35 additions & 0 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,41 @@ func (txn *Txn) Range(fn func(idx uint32)) error {
return nil
}

// SortedRange ascends through a given SortedIndex and returns each offset
// remaining in the transaction's index
func (txn *Txn) SortedRange(sortIndexName string, fn func(idx uint32)) error {
Dreeseaw marked this conversation as resolved.
Show resolved Hide resolved
txn.initialize()

sortIndex, ok := txn.owner.cols.Load(sortIndexName)
if !ok {
return fmt.Errorf("column: no sorted index named '%v'", sortIndexName)
}

// TODO - better solution for linear txn index check
sortIndexCol, _ := sortIndex.Column.(*columnSortIndex)
sortIndexCol.btree.Scan(func (item SortIndexItem) bool {
/*
if txn.index.Contains(item.Value) {
fn(item.Value)
}
*/
// For each btree key, check if the offset is still in
// the txn's index & return if true

txn.rangeRead(func (chunk commit.Chunk, index bitmap.Bitmap) {
offset := chunk.Min()
index.Range(func(x uint32) {
if item.Value == offset + x {
txn.cursor = item.Value
fn(item.Value)
}
})
})
return true
})
return nil
}

// DeleteAll marks all of the items currently selected by this transaction for deletion. The
// actual delete will take place once the transaction is committed.
func (txn *Txn) DeleteAll() {
Expand Down
Loading