Add Snapshot and Restore
kelindar committed Dec 22, 2021
1 parent 46b180b commit bd09c3f
Showing 10 changed files with 301 additions and 111 deletions.
52 changes: 25 additions & 27 deletions README.md
@@ -14,21 +14,22 @@ This package contains a **high-performance, columnar, in-memory storage engine**

## Features

- Optimized, cache-friendly **columnar data layout** that minimizes cache misses.
- Optimized for **zero heap allocation** during querying (see benchmarks below).
- Optimized **batch updates/deletes**; an update within a transaction takes around `12ns`.
- Support for **SIMD-enabled filtering** (i.e. "where" clause) by leveraging [bitmap indexing](https://github.com/kelindar/bitmap).
- Support for **columnar projection** (i.e. "select" clause) for fast retrieval.
- Support for **computed indexes** that are dynamically calculated based on a provided predicate.
- Support for **concurrent updates** using sharded latches to keep things fast.
- Support for **transaction isolation**, allowing you to create transactions and commit/rollback.
- Support for **expiration** of rows based on time-to-live or expiration column.
- Support for **atomic increment/decrement** of numerical values, transactionally.
- Support for **change data stream** that streams all commits consistently.
- Support for **concurrent snapshotting**, allowing you to store the entire collection into a file (see the sketch below).
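
Since the snapshot API itself is not visible in this diff, here is a minimal usage sketch; the `Snapshot(io.Writer)` and `Restore(io.Reader)` method names are assumptions inferred from the commit title, not confirmed by the visible changes.

```go
// Hypothetical round-trip through the new snapshot feature; the
// method names here are assumed, not confirmed by this diff.
var buf bytes.Buffer

// Persist the entire collection into the buffer.
if err := players.Snapshot(&buf); err != nil {
	panic(err)
}

// Later, load the collection back from the buffer.
if err := players.Restore(&buf); err != nil {
	panic(err)
}
```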

## Documentation

The general idea is to leverage cache-friendly ways of organizing data in [structures of arrays (SoA)](https://en.wikipedia.org/wiki/AoS_and_SoA), otherwise known as "columnar" storage in database design. This, in turn, allows us to iterate and filter over columns very efficiently. On top of that, this package also adds [bitmap indexing](https://en.wikipedia.org/wiki/Bitmap_index) to the columnar storage, allowing us to build filter queries using binary `and`, `and not`, `or` and `xor` (see [kelindar/bitmap](https://github.com/kelindar/bitmap) with SIMD support).
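
As a quick illustration of the layout difference (a sketch, not code from this library):

```go
// Array of structures (AoS): the fields of each record are interleaved
// in memory, so scanning one field drags unrelated bytes into cache.
type PlayerAoS struct {
	Name    string
	Balance float64
}

// Structure of arrays (SoA): each column is one contiguous slice, so a
// scan over balances touches a single, dense, cache-friendly array.
type PlayersSoA struct {
	names    []string
	balances []float64
}
```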

- [Collection and Columns](#collection-and-columns)
- [Querying and Indexing](#querying-and-indexing)
@@ -43,7 +44,7 @@ The general idea is to leverage [struc

## Collection and Columns

In order to get data into the store, you'll need to first create a `Collection` by calling the `NewCollection()` method. Each collection requires a schema, which can be either specified manually by calling `CreateColumn()` multiple times or automatically inferred from an object by calling the `CreateColumnsOf()` function.

In the example below we're loading some `JSON` data by using `json.Unmarshal()` and auto-creating columns based on the first element of the loaded slice. After this is done, we can then load our data by inserting the objects one by one into the collection. This is accomplished by calling the `Insert()` method on the collection itself repeatedly.
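
The corresponding code is collapsed in this diff; a minimal sketch of the flow (the fixture path is an assumption) would be:

```go
// Read and unmarshal the JSON fixture into a slice of objects.
b, err := os.ReadFile("players.json")
if err != nil {
	panic(err)
}
var data []map[string]interface{}
if err := json.Unmarshal(b, &data); err != nil {
	panic(err)
}

// Infer the schema from the first element, then insert one by one.
players := column.NewCollection()
players.CreateColumnsOf(data[0])
for _, v := range data {
	players.Insert(v)
}
```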

@@ -89,7 +90,7 @@ players.Query(func(txn *Txn) error {

## Querying and Indexing

The store allows you to query the data based on the presence of certain attributes or their values. In the example below we are querying our collection and applying a _filtering_ operation by using the `WithValue()` method on the transaction. This method scans the values and checks whether a certain predicate evaluates to `true`. In this case, we're scanning through all of the players and looking up their `class`; if their class is equal to "rogue", we'll take it. At the end, we're calling the `Count()` method that simply counts the result set.

```go
// This query performs a full scan of "class" column
@@ -101,7 +102,7 @@ players.Query(func(txn *column.Txn) error {
})
```

Now, what if we need to run this query very often? It is possible to simply _create an index_ with the same predicate and have this computation applied every time (a) an object is inserted into the collection and (b) a value of the dependent column is updated. Let's look at the example below: we're first creating a `rogue` index which depends on the "class" column. This index applies the same predicate, which only returns `true` if a class is "rogue". We then can query this by simply calling the `With()` method and providing the index name.

An index is essentially akin to a boolean column, so you could technically also select its value when querying it. Now, in this example the query would be around `10-100x` faster to execute, as behind the scenes it uses [bitmap indexing](https://github.com/kelindar/bitmap) for the "rogue" index and performs a simple logical `AND` operation on two bitmaps when querying. This avoids scanning and applying a predicate during the `Query`.
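
The index-creation code is collapsed in this diff; a sketch of it might look as follows. The exact `CreateIndex` signature, including the `column.Reader` predicate argument, is an assumption based on the predicates visible in this commit's tests:

```go
// Create a "rogue" index on the "class" column; the predicate is
// re-evaluated whenever a value of that column changes.
players.CreateIndex("rogue", "class", func(r column.Reader) bool {
	return r.String() == "rogue"
})

players.Query(func(txn *column.Txn) error {
	// The index behaves like a boolean column, so With() selects on it.
	println("rogues:", txn.With("rogue").Count())
	return nil
})
```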

```go
@@ -140,7 +141,7 @@ players.Query(func(txn *Txn) error {
})
```

Now, you can combine all of these methods and keep building more complex queries. When querying indexed and non-indexed fields together, it is important to know that every scan applies only to the current selection, speeding up the query. So if you have a filter on a specific index that selects 50% of players and then you perform a scan on that selection (e.g. `WithValue()`), it will only scan 50% of the players and hence will be 2x faster.

```go
// How many rogues that are over 30 years old?
@@ -156,8 +157,8 @@
```

In all of the previous examples, we've only been doing the `Count()` operation, which counts the number of elements in the result set. In this section we'll look at how we can iterate over the result set. In short, there are two main methods that allow us to do it:

1. `Range()` method which takes in a column name as an argument and allows faster get/set of the values for that column.
2. `Select()` method which doesn't pre-select any specific column, so it's usually a bit slower and it also does not allow any updates.

Let's first examine the `Range()` method. In the example below we select all of the rogues from our collection and print out their names by using the `Range()` method and providing the "name" column to it. The callback containing the `Cursor` allows us to quickly get the value of the column by calling the `String()` method to retrieve a string value. It also contains methods such as `Int()`, `Uint()`, `Float()` or the more generic `Value()` to pull data of different types.
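
Most of that example is collapsed in this diff; a sketch mirroring the complete example further down would be:

```go
players.Query(func(txn *column.Txn) error {
	txn.With("rogue").Range("name", func(v column.Cursor) {
		println("rogue:", v.String()) // read the "name" column via the cursor
	})
	return nil
})
```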

```go
@@ -194,7 +195,6 @@ players.Query(func(txn *Txn) error {
})
```


Now, what if you need to quickly delete all or some of the data in the collection? In this case the `DeleteAll()` or `DeleteIf()` methods come in handy. These methods are very fast (especially `DeleteAll()`) and allow you to quickly delete the appropriate results, transactionally. In the example below we delete all of the rogues from the collection by simply selecting them in the transaction and calling the `DeleteAll()` method.

```go
@@ -220,7 +220,7 @@
})
```
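
The body of that example is collapsed above; a plausible reconstruction (a sketch, not the verbatim README code):

```go
players.Query(func(txn *column.Txn) error {
	txn.With("rogue").DeleteAll() // transactionally delete every selected row
	return nil
})
```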

In certain cases, you might want to atomically increment or decrement numerical values. In order to accomplish this you can use the provided `Add..()` or `Add..At()` operations of the `Cursor` or `Selector`. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the example below we're incrementing the balance of all our rogues by _500_ atomically.

```go
players.Query(func(txn *Txn) error {
@@ -235,7 +235,7 @@
```
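
The body of that block is collapsed above; a hedged completion, assuming `AddFloat64()` is one of the `Add..()` variants mentioned in the text:

```go
players.Query(func(txn *Txn) error {
	txn.With("rogue").Range("balance", func(v Cursor) {
		v.AddFloat64(500) // assumed Add..() variant for a float column
	})
	return nil
})
```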

Sometimes, it is useful to automatically delete certain rows when you do not need them anymore. In order to do this, the library automatically adds an `expire` column to each new collection and asynchronously starts a cleanup goroutine that runs periodically and cleans up the expired objects. In order to set this up, you can simply use the `InsertWithTTL()` method on the collection, which allows you to insert an object with a time-to-live duration defined.

In the example below we are inserting an object into the collection and setting the time-to-live to _5 seconds_ from the current time. After this time, the object will be automatically evicted from the collection and its space can be reclaimed.

```go
players.InsertWithTTL(map[string]interface{}{
@@ -284,11 +284,10 @@ players.Query(func(txn *column.Txn) error {
})

// Returns an error, transaction will be rolled back
return fmt.Errorf("bug")
})
```


## Streaming Changes

This library also supports streaming out all transaction commits consistently, as they happen. This allows you to implement your own change data capture (CDC) listeners, stream data into Kafka or into a remote database for durability. In order to enable it, you can simply provide an implementation of the `commit.Writer` interface during the creation of the collection.
```go
@@ -334,7 +333,6 @@ go func() {
}()
```
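
The full example is collapsed above; below is a sketch of plugging a writer in. Only the `commit.Writer` name and the `Writer` option appear in this diff; the `Write(commit.Commit) error` signature is an assumption.

```go
// myWriter is a hypothetical commit.Writer implementation; the single
// Write(commit.Commit) error method is an assumed signature.
type myWriter struct{}

func (w *myWriter) Write(c commit.Commit) error {
	// Forward the commit to Kafka, a replica, or a durable log here.
	return nil
}

// Provide the writer when creating the collection.
players := column.NewCollection(column.Options{
	Writer: new(myWriter),
})
```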


## Complete Example

```go
@@ -365,7 +363,7 @@ func main(){
players.Insert(v)
}

// This performs a full scan on 3 different columns and compares them given the
// specified predicates. This is not indexed, but does a columnar scan which is
// cache-friendly.
players.Query(func(txn *column.Txn) error {
@@ -388,7 +386,7 @@ func main(){
return nil
})

// Same condition as above, but we also select the actual names of those
// players and iterate through them.
players.Query(func(txn *column.Txn) error {
txn.With("human", "mage", "old").Range("name", func(v column.Cursor) {
@@ -444,7 +442,7 @@ running update of balance of everyone...
running update of age of mages...
-> updated 6040000 rows
-> update took 85.669422ms
```

## Contributing
53 changes: 31 additions & 22 deletions collection.go
@@ -27,17 +27,19 @@ const (

// Collection represents a collection of objects in a columnar format
type Collection struct {
	count   uint64             // The current count of elements
	txns    *txnPool           // The transaction pool
	lock    sync.RWMutex       // The mutex to guard the fill-list
	slock   *smutex.SMutex128  // The sharded mutex for the collection
	cols    columns            // The map of columns
	fill    bitmap.Bitmap      // The fill-list
	opts    Options            // The options configured
	codec   codec              // The compression codec
	logger  commit.Logger      // The commit logger for CDC
	record  *commit.Log        // The commit log used for snapshots
	pk      *columnKey         // The primary key column
	cancel  context.CancelFunc // The cancellation function for the context
	commits []uint64           // The array of commit IDs, one per chunk
}

// Options represents the options for a collection.
@@ -71,14 +73,15 @@ func NewCollection(opts ...Options) *Collection {
// Create a new collection
ctx, cancel := context.WithCancel(context.Background())
store := &Collection{
		cols:    makeColumns(8),
		txns:    newTxnPool(),
		opts:    options,
		slock:   new(smutex.SMutex128),
		fill:    make(bitmap.Bitmap, 0, options.Capacity>>6),
		logger:  options.Writer,
		codec:   newCodec(&options),
		cancel:  cancel,
		commits: make([]uint64, 128),
}

// Create an expiration column and start the cleanup goroutine
@@ -327,6 +330,7 @@ func (c *Collection) Query(fn func(txn *Txn) error) error {
// Close closes the collection and clears up all of the resources.
func (c *Collection) Close() error {
	c.cancel()

	return nil
}

@@ -373,10 +377,15 @@ type columnEntry struct {
cols []*column // The columns and its computed
}

// Count returns the number of columns, excluding indexes.
func (c *columns) Count() (count int) {
	cols := c.cols.Load().([]columnEntry)
	for _, v := range cols {
		if !v.cols[0].IsIndex() {
			count++
		}
	}
	return
}

// Range iterates over columns in the registry. This is faster than RangeUntil
29 changes: 18 additions & 11 deletions collection_test.go
@@ -454,8 +454,25 @@ func TestInsertWithTTL(t *testing.T) {

// loadPlayers loads a list of players from the fixture
func loadPlayers(amount int) *Collection {
	out := newEmpty(amount)

	// Load and copy until we reach the amount required
	data := loadFixture("players.json")
	for i := 0; i < amount/len(data); i++ {
		out.Query(func(txn *Txn) error {
			for _, p := range data {
				txn.InsertObject(p)
			}
			return nil
		})
	}
	return out
}

// newEmpty creates a new empty collection for the fixture
func newEmpty(capacity int) *Collection {
	out := NewCollection(Options{
		Capacity: capacity,
		Vacuum:   500 * time.Millisecond,
		Writer:   new(noopWriter),
	})
@@ -504,16 +521,6 @@ func loadPlayers(amount int) *Collection {
return r.Float() >= 30
})

	return out
}

6 changes: 6 additions & 0 deletions column.go
@@ -135,6 +135,12 @@ func columnFor(name string, v Column) *column {
}
}

// IsIndex returns whether the column is an index
func (c *column) IsIndex() bool {
	_, ok := c.Column.(*columnIndex)
	return ok
}

// IsNumeric checks whether a column type supports certain numerical operations.
func (c *column) IsNumeric() bool {
	return (c.kind & typeNumeric) == typeNumeric
11 changes: 10 additions & 1 deletion commit/commit.go
@@ -17,7 +17,7 @@ import (
var id uint64 = uint64(time.Now().UnixNano())

// next returns the next commit ID
func next() uint64 {
	return atomic.AddUint64(&id, 1)
}

@@ -80,6 +80,15 @@ type Commit struct {
Updates []*Buffer // The update buffers
}

// New creates a new commit for a chunk and an array of buffers
func New(chunk Chunk, buffers []*Buffer) Commit {
	return Commit{
		ID:      next(),
		Chunk:   chunk,
		Updates: buffers,
	}
}

// Clone clones a commit into a new one
func (c *Commit) Clone() (clone Commit) {
clone.Chunk = c.Chunk
12 changes: 4 additions & 8 deletions commit/commit_test.go
@@ -140,14 +140,10 @@ func TestMin(t *testing.T) {

func TestCommitCodec(t *testing.T) {
	buffer := bytes.NewBuffer(nil)
	input := New(0, []*Buffer{
		newInterleaved("a"),
		newInterleaved("b"),
	})

// Write into the buffer
n, err := input.WriteTo(buffer)
5 changes: 4 additions & 1 deletion commit/log.go
@@ -60,7 +60,7 @@ func OpenFile(filename string) (*Log, error) {

// OpenTemp opens a temporary commit log file with read/write permissions
func OpenTemp() (*Log, error) {
	return openFile(os.CreateTemp("", "column_*.log"))
}

// openFile opens a file or returns the error provided
@@ -120,6 +120,9 @@ func (l *Log) Name() (name string) {

// Close closes the source log file
func (l *Log) Close() (err error) {
	l.lock.Lock()
	defer l.lock.Unlock()

	if closer, ok := l.source.(io.Closer); ok {
		err = closer.Close()
	}
