Skip to content

Commit

Permalink
Refactor API (#47)
Browse files Browse the repository at this point in the history
This PR introduces major breaking change in the API by removing the existing cursor and changing the way how the columns are accessed during transactions. Since columnar storage is intended to work on ranges of values, this speeds up range queries by around **~20-40%**.

In order to access the results of the iteration, prior to calling `Range()` method, we need to **first load column reader(s)** we are going to need, using methods such as `txn.String()`, `txn.Float64()`, etc. These prepare read/write buffers necessary to perform efficient lookups while iterating.

In the example below we select all of the rogues from our collection and print out their name by using the `Range()` method and accessing the "name" column using a column reader which is created by calling `txn.String("name")` method.

```go
players.Query(func(txn *Txn) error {
	names := txn.String("name") // Create a column reader

	return txn.With("rogue").Range(func(i uint32) {
		name, _ := names.Get()
		println("rogue name", name)
	})
})
```

Similarly, if you need to access more columns, you can simply create the appropriate column reader(s) and use them as shown in the example before.

```go
players.Query(func(txn *Txn) error {
	names := txn.String("name")
	ages  := txn.Int64("age")

	return txn.With("rogue").Range(func(i uint32) {
		name, _ := names.Get()
		age,  _ := ages.Get()

		println("rogue name", name)
		println("rogue age", age)
	})
})
```
  • Loading branch information
kelindar committed Jan 16, 2022
1 parent 45c34f5 commit 02b2c66
Show file tree
Hide file tree
Showing 25 changed files with 2,317 additions and 1,371 deletions.
228 changes: 123 additions & 105 deletions README.md

Large diffs are not rendered by default.

78 changes: 25 additions & 53 deletions collection.go
Expand Up @@ -139,66 +139,24 @@ func (c *Collection) InsertObjectWithTTL(obj Object, ttl time.Duration) (index u
}

// Insert executes a mutable cursor trasactionally at a new offset.
func (c *Collection) Insert(columnName string, fn func(v Cursor) error) (index uint32, err error) {
func (c *Collection) Insert(fn func(Row) error) (index uint32, err error) {
err = c.Query(func(txn *Txn) (innerErr error) {
index, innerErr = txn.Insert(columnName, fn)
index, innerErr = txn.Insert(fn)
return
})
return
}

// InsertWithTTL executes a mutable cursor trasactionally at a new offset and sets the expiration time
// based on the specified time-to-live and returns the allocated index.
func (c *Collection) InsertWithTTL(columnName string, ttl time.Duration, fn func(v Cursor) error) (index uint32, err error) {
func (c *Collection) InsertWithTTL(ttl time.Duration, fn func(Row) error) (index uint32, err error) {
err = c.Query(func(txn *Txn) (innerErr error) {
index, innerErr = txn.InsertWithTTL(columnName, ttl, fn)
index, innerErr = txn.InsertWithTTL(ttl, fn)
return
})
return
}

// UpdateAt updates a specific row by initiating a separate transaction for the update.
func (c *Collection) UpdateAt(idx uint32, columnName string, fn func(v Cursor) error) error {
return c.Query(func(txn *Txn) error {
return txn.UpdateAt(idx, columnName, fn)
})
}

// UpdateAtKey updates a specific row by initiating a separate transaction for the update.
func (c *Collection) UpdateAtKey(key, columnName string, fn func(v Cursor) error) error {
return c.Query(func(txn *Txn) error {
return txn.UpdateAtKey(key, columnName, fn)
})
}

// SelectAt performs a selection on a specific row specified by its index. It returns
// a boolean value indicating whether an element is present at the index or not.
func (c *Collection) SelectAt(idx uint32, fn func(v Selector)) bool {
chunk := uint(idx >> chunkShift)
if idx >= uint32(len(c.fill))<<6 || !c.fill.Contains(idx) {
return false
}

// Lock the chunk which we are about to read and call the selector delegate
c.slock.RLock(chunk)
fn(Selector{idx: idx, col: c})
c.slock.RUnlock(chunk)
return true
}

// SelectAtKey performs a selection on a specific row specified by its key. It returns
// a boolean value indicating whether an element is present at the key or not.
func (c *Collection) SelectAtKey(key string, fn func(v Selector)) (found bool) {
if c.pk == nil {
return false
}

if idx, ok := c.pk.OffsetOf(key); ok {
found = c.SelectAt(idx, fn)
}
return
}

// DeleteAt attempts to delete an item at the specified index for this collection. If the item
// exists, it marks at as deleted and returns true, otherwise it returns false.
func (c *Collection) DeleteAt(idx uint32) (deleted bool) {
Expand Down Expand Up @@ -318,14 +276,28 @@ func (c *Collection) DropIndex(indexName string) error {
return nil
}

// QueryAt jumps at a particular offset in the collection, sets the cursor to the
// provided position and executes given callback fn.
func (c *Collection) QueryAt(idx uint32, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.QueryAt(idx, fn)
})
}

// QueryAt jumps at a particular key in the collection, sets the cursor to the
// provided position and executes given callback fn.
func (c *Collection) QueryKey(key string, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.QueryKey(key, fn)
})
}

// Query creates a transaction which allows for filtering and iteration over the
// columns in this collection. It also allows for individual rows to be modified or
// deleted during iteration (range), but the actual operations will be queued and
// executed after the iteration.
func (c *Collection) Query(fn func(txn *Txn) error) error {
c.lock.RLock()
txn := c.txns.acquire(c)
c.lock.RUnlock()

// Execute the query and keep the error for later
if err := fn(txn); err != nil {
Expand All @@ -344,7 +316,6 @@ func (c *Collection) Query(fn func(txn *Txn) error) error {
// Close closes the collection and clears up all of the resources.
func (c *Collection) Close() error {
c.cancel()

return nil
}

Expand All @@ -357,11 +328,12 @@ func (c *Collection) vacuum(ctx context.Context, interval time.Duration) {
ticker.Stop()
return
case <-ticker.C:
now := int(time.Now().UnixNano())
now := time.Now().UnixNano()
c.Query(func(txn *Txn) error {
return txn.With(expireColumn).Range(expireColumn, func(v Cursor) {
if expirateAt := v.Int(); expirateAt != 0 && now >= v.Int() {
v.Delete()
expire := txn.Int64(expireColumn)
return txn.With(expireColumn).Range(func(idx uint32) {
if expirateAt, ok := expire.Get(); ok && expirateAt != 0 && now >= expirateAt {
txn.DeleteAt(idx)
}
})
})
Expand Down

0 comments on commit 02b2c66

Please sign in to comment.