Skip to content

Commit

Permalink
[sort index] baseline implementation + simple test
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreeseaw committed Nov 27, 2022
1 parent 4057bea commit c72d725
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 44 deletions.
10 changes: 6 additions & 4 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func (c *Collection) CreateIndex(indexName, columnName string, fn func(r Reader)
}

func (c *Collection) CreateSortIndex(indexName, columnName string) error {
if fn == nil || columnName == "" || indexName == "" {
return fmt.Errorf("column: create index must specify name, column and function")
if columnName == "" || indexName == "" {
return fmt.Errorf("column: create index must specify name & column")
}

// Prior to creating an index, we should have a column
Expand All @@ -244,17 +244,18 @@ func (c *Collection) CreateSortIndex(indexName, columnName string) error {
return fmt.Errorf("column: unable to create index, column '%v' does not exist", columnName)
}

// TODO
// Create and add the index column,
index := newSortIndex(indexName, columnName)
c.lock.Lock()
index.Grow(uint32(c.opts.Capacity))
// index.Grow(uint32(c.opts.Capacity))
c.cols.Store(indexName, index)
c.cols.Store(columnName, column, index)
c.lock.Unlock()

// TODO - utilize btree's Load method
// Iterate over all of the values of the target column, chunk by chunk and fill
// the index accordingly.
/*
chunks := c.chunks()
buffer := commit.NewBuffer(c.Count())
reader := commit.NewReader()
Expand All @@ -264,6 +265,7 @@ func (c *Collection) CreateSortIndex(indexName, columnName string) error {
index.Apply(chunk, reader)
}
}
*/

return nil
}
Expand Down
69 changes: 40 additions & 29 deletions column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package column

import (
"strings"
"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"

"github.com/tidwall/btree"
)

// --------------------------- Reader ---------------------------
Expand Down Expand Up @@ -100,76 +103,84 @@ func (c *columnIndex) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
dst.PutBitmap(commit.PutTrue, chunk, c.fill)
}

// ----------------------- Sorted Index --------------------------

// TODO
// columnSortIndex represents a constantly sorted column
// SortIndexItem is a single entry of a sort index, stored in a generic B-tree.
// It pairs the string the index sorts by with the row offset that string
// belongs to.
type SortIndexItem struct {
Key string // String value of the target column; the index is ordered by this
Value uint32 // Row offset of the entry
}

// columnSortIndex implements a constantly sorted index over a target column.
// All entries live in one generic B-tree of SortIndexItem.
type columnSortIndex struct {
	btree *btree.BTreeG[SortIndexItem] // 1 constantly sorted data structure
	// TODO - look into a list of btree, 1 per chunk
	name string // The name of the target column
}

// TODO
// newSortIndex creates a new bitmap index column.
func newSortIndex(indexName, columnName string) *column {
return columnFor(indexName, &columnIndex{
fill: make(bitmap.Bitmap, 0, 4),
name: columnName,
rule: rule,
byKeys := func (a, b SortIndexItem) bool {
return a.Key < b.Key
}
return columnFor(indexName, &columnSortIndex{
btree: btree.NewBTreeG[SortIndexItem](byKeys),
name: columnName,
})
}

// TODO
// Grow grows the size of the column until we have enough to store idx. The
// sort index holds only a B-tree and the target column name, so there is no
// per-offset storage to pre-size and this is intentionally a no-op.
func (c *columnSortIndex) Grow(idx uint32) {}

// Column returns the name of the target column on which this index applies,
// i.e. the column name the sort index was created with.
func (c *columnSortIndex) Column() string {
return c.name
}

// TODO
// Apply applies a set of operations to the sort index. For every put or merge
// the entry {string value, row offset} is set in the B-tree; for every delete
// the matching entry is removed. Any other operation type is ignored.
func (c *columnSortIndex) Apply(chunk commit.Chunk, r *commit.Reader) {

	// Index can only be updated based on the final stored value, so we can only work
	// with put, merge, & delete operations here.

	// TODO - account for updates to target column
	// will require Delete @ old key + Set @ new key
	for r.Next() {
		switch r.Type {
		case commit.Put, commit.Merge:
			// The key is retained by the tree, so it is copied here.
			// NOTE(review): presumably r.String() aliases the reader's buffer,
			// which is why the original marks this "alloc required" — confirm.
			c.btree.Set(SortIndexItem{
				Key:   strings.Clone(r.String()), // alloc required
				Value: uint32(r.Offset),
			})
		case commit.Delete:
			// The item is only used as a lookup key and is not retained, so no
			// copy is needed.
			// NOTE(review): this assumes r.String() yields the indexed value
			// for a delete operation — verify against commit.Reader; if it does
			// not, the entry will never be found and removed.
			c.btree.Delete(SortIndexItem{
				Key:   r.String(),
				Value: uint32(r.Offset),
			})
		}
	}
}

// TODO
// Value retrieves a value at a specified index.
func (c *columnIndex) Value(idx uint32) (v interface{}, ok bool) {
if idx < uint32(len(c.fill))<<6 {
v, ok = c.fill.Contains(idx), true
}
func (c *columnSortIndex) Value(idx uint32) (v interface{}, ok bool) {
return
}

// TODO
// Contains checks whether the column has a value at a specified index.
func (c *columnIndex) Contains(idx uint32) bool {
return c.fill.Contains(idx)
func (c *columnSortIndex) Contains(idx uint32) bool {
return true
}

// TODO
// Index returns the fill list for the column
func (c *columnIndex) Index(chunk commit.Chunk) bitmap.Bitmap {
return chunk.OfBitmap(c.fill)
func (c *columnSortIndex) Index(chunk commit.Chunk) bitmap.Bitmap {
return nil
}

// TODO
// Snapshot writes the entire column into the specified destination buffer
func (c *columnIndex) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
func (c *columnSortIndex) Snapshot(chunk commit.Chunk, dst *commit.Buffer) {
return
}
2 changes: 1 addition & 1 deletion commit/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (r *Reader) Rewind() {
r.Offset = r.start
}

// Use sets the buffer and resets the reader.
// use sets the buffer and resets the reader.
func (r *Reader) use(buffer []byte) {
r.buffer = buffer
r.head = 0
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kelindar/column
go 1.19

require (
github.com/imdario/mergo v0.3.13
github.com/kelindar/bitmap v1.4.1
github.com/kelindar/intmap v1.1.0
github.com/kelindar/iostream v1.3.0
Expand All @@ -13,6 +14,8 @@ require (
github.com/zeebo/xxh3 v1.0.2
)

require github.com/tidwall/btree v1.5.2

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tidwall/btree v1.5.2 h1:5eA83Gfki799V3d3bJo9sWk+yL2LRoTEah3O/SA6/8w=
github.com/tidwall/btree v1.5.2/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
Expand Down
30 changes: 23 additions & 7 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,15 +388,31 @@ func (txn *Txn) Range(fn func(idx uint32)) error {
return nil
}

// TODO
func (txn *Txn) SortedRange(sortIndex string, fn func(idx uint32)) error {
// SortedRange ascends through a given SortedIndex and returns each offset
// remaining in the transaction's index
func (txn *Txn) SortedRange(sortIndexName string, fn func(idx uint32)) error {
txn.initialize()
txn.rangeRead(func(chunk commit.Chunk, index bitmap.Bitmap) {
offset := chunk.Min()
index.Range(func(x uint32) {
txn.cursor = offset + x
fn(offset + x)

sortIndex, ok := txn.owner.cols.Load(sortIndexName)
if !ok {
return fmt.Errorf("column: no sorted index named '%v'", sortIndexName)
}

// TODO - better solution for linear txn index check
sortIndexCol, _ := sortIndex.Column.(*columnSortIndex)
sortIndexCol.btree.Scan(func (item SortIndexItem) bool {
// For each btree key, check if the offset is still in
// the txn's index & return if true
txn.rangeRead(func (chunk commit.Chunk, index bitmap.Bitmap) {
offset := chunk.Min()
index.Range(func(x uint32) {
if item.Value == offset + x {
txn.cursor = item.Value
fn(item.Value)
}
})
})
return true
})
return nil
}
Expand Down
11 changes: 8 additions & 3 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,26 +262,31 @@ func TestIndexed(t *testing.T) {
func TestSortIndex(t *testing.T) {
c := NewCollection()
c.CreateColumn("col1", ForString())
c.CreateSortIndex("col1")
c.CreateSortIndex("sortedCol1", "col1")

c.InsertObject(map[string]interface{}{
"col1": "bob",
})
c.InsertObject(map[string]interface{}{
"col1": "carter",
})
c.InsertObject(map[string]interface{}{
"col1": "alice",
})

var res []string
c.Query(func (txn *Txn) {
c.Query(func (txn *Txn) error {
col1 := txn.String("col1")
txn.SortRange(func (i uint32) {
txn.SortedRange("sortedCol1", func (i uint32) {
name, _ := col1.Get()
res = append(res, name)
})
return nil
})

assert.Equal(t, "alice", res[0])
assert.Equal(t, "bob", res[1])
assert.Equal(t, "carter", res[2])
}

func TestDeleteAll(t *testing.T) {
Expand Down

0 comments on commit c72d725

Please sign in to comment.