Skip to content

Commit

Permalink
Adding record column (#73)
Browse files Browse the repository at this point in the history
This PR contains some refactoring and a new `record` column that allows
you to use a `BinaryMarshaler` and `BinaryUnmarshaler` type to be
stored. As such, it supports types that implement this standard way of
encoding, for example `time.Time`.


```go
col := NewCollection()
col.CreateColumn("timestamp", ForRecord(func() *time.Time {
	return new(time.Time)
}, nil))

// Insert the time, it implements binary marshaler
idx, _ := col.Insert(func(r Row) error {
	now := time.Unix(1667745766, 0)
	r.SetRecord("timestamp", &now)
	return nil
})

// We should be able to read back the time
col.QueryAt(idx, func(r Row) error {
	ts, ok := r.Record("timestamp")
	assert.True(t, ok)
	assert.Equal(t, "November", ts.(*time.Time).UTC().Month().String())
	return nil
})
```
  • Loading branch information
kelindar committed Nov 26, 2022
1 parent dbb0148 commit a761108
Show file tree
Hide file tree
Showing 28 changed files with 1,133 additions and 590 deletions.
142 changes: 63 additions & 79 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,53 +40,44 @@ The general idea is to leverage cache-friendly ways of organizing data in [struc
- [Expiring Values](#expiring-values)
- [Transaction Commit and Rollback](#transaction-commit-and-rollback)
- [Using Primary Keys](#using-primary-keys)
- [Storing Binary Records](#storing-binary-records)
- [Streaming Changes](#streaming-changes)
- [Snapshot and Restore](#snapshot-and-restore)
- [Complete Example](#complete-example)
- [Examples](#examples)
- [Benchmarks](#benchmarks)
- [Contributing](#contributing)

## Collection and Columns

In order to get data into the store, you'll need to first create a `Collection` by calling `NewCollection()` method. Each collection requires a schema, which can be either specified manually by calling `CreateColumn()` multiple times or automatically inferred from an object by calling `CreateColumnsOf()` function.

In the example below we're loading some `JSON` data by using `json.Unmarshal()` and auto-creating colums based on the first element on the loaded slice. After this is done, we can then load our data by inserting the objects one by one into the collection. This is accomplished by calling `InsertObject()` method on the collection itself repeatedly.
In order to get data into the store, you'll need to first create a `Collection` by calling `NewCollection()` method. Each collection requires a schema, which needs to be specified by calling `CreateColumn()` multiple times or automatically inferred from an object by calling `CreateColumnsOf()` function. In the example below we create a new collection with several columns.

```go
data := loadFromJson("players.json")

// Create a new columnar collection
players := column.NewCollection()
players.CreateColumnsOf(data[0])

// Insert every item from our loaded data
for _, v := range data {
players.InsertObject(v)
}
```

Now, let's say we only want specific columns to be added. We can do this by calling `CreateColumn()` method on the collection manually to create the required columns.

```go
// Create a new columnar collection with pre-defined columns
// Create a new collection with some columns
players := column.NewCollection()
players.CreateColumn("name", column.ForString())
players.CreateColumn("class", column.ForString())
players.CreateColumn("balance", column.ForFloat64())
players.CreateColumn("age", column.ForInt16())
```

// Insert every item from our loaded data
for _, v := range loadFromJson("players.json") {
players.InsertObject(v)
}
Now that we have created a collection, we can insert a single record by using `Insert()` method on the collection. In this example we're inserting a single row and manually specifying values. Note that this function returns an `index` that indicates the row index for the inserted row.

```go
index, err := players.Insert(func(r column.Row) error {
r.SetString("name", "merlin")
r.SetString("class", "mage")
r.SetFloat64("balance", 99.95)
r.SetInt16("age", 107)
return nil
})
```

While the previous example demonstrated how to insert many objects, it was doing it one by one and is rather inefficient. This is due to the fact that each `InsertObject()` call directly on the collection initiates a separate transacion and there's a small performance cost associated with it. If you want to do a bulk insert and insert many values, faster, that can be done by calling `Insert()` on a transaction, as demonstrated in the example below. Note that the only difference is instantiating a transaction by calling the `Query()` method and calling the `txn.Insert()` method on the transaction instead the one on the collection.
While the previous example demonstrated how to insert a single row, inserting multiple rows this way is rather inefficient. This is due to the fact that each `Insert()` call directly on the collection initiates a separate transacion and there's a small performance cost associated with it. If you want to do a bulk insert and insert many values, faster, that can be done by calling `Insert()` on a transaction, as demonstrated in the example below. Note that the only difference is instantiating a transaction by calling the `Query()` method and calling the `txn.Insert()` method on the transaction instead the one on the collection.

```go
players.Query(func(txn *column.Txn) error {
for _, v := range loadFromJson("players.json") {
txn.InsertObject(v)
for _, v := range myRawData {
txn.Insert(...)
}
return nil // Commit
})
Expand Down Expand Up @@ -356,6 +347,49 @@ players.QueryKey("merlin", func(r column.Row) error {
})
```

## Storing Binary Records

If you find yourself in need of encoding a more complex structure as a single column, you may do so by using `column.ForRecord()` function. This allows you to specify a `BinaryMarshaler` / `BinaryUnmarshaler` type that will get automatically encoded as a single column. In th example below we are creating a `Location` type that implements the required methods.

```go
type Location struct {
X float64 `json:"x"`
Y float64 `json:"y"`
}

func (l Location) MarshalBinary() ([]byte, error) {
return json.Marshal(l)
}

func (l *Location) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, l)
}
```

Now that we have a record implementation, we can create a column for this struct by using `ForRecord()` function as shown below.

```go
players.CreateColumn("location", ForRecord(func() *Location {
return new(Location)
}, nil)) // no merging
```

In order to manipulate the record, we can use the appropriate `Record()`, `SetRecord()` methods of the `Row`, similarly to other column types.

```go
// Insert a new location
idx, _ := players.Insert(func(r Row) error {
r.SetRecord("location", &Location{X: 1, Y: 2})
return nil
})

// Read the location back
players.QueryAt(idx, func(r Row) error {
location, ok := r.Record("location")
return nil
})
```

## Streaming Changes

This library also supports streaming out all transaction commits consistently, as they happen. This allows you to implement your own change data capture (CDC) listeners, stream data into kafka or into a remote database for durability. In order to enable it, you can simply provide an implementation of a `commit.Logger` interface during the creation of the collection.
Expand Down Expand Up @@ -429,59 +463,9 @@ if err != nil {
err := players.Restore(src)
```

## Complete Example
## Examples

```go
func main(){

// Create a new columnar collection
players := column.NewCollection()
players.CreateColumn("serial", column.ForKey())
players.CreateColumn("name", column.ForEnum())
players.CreateColumn("active", column.ForBool())
players.CreateColumn("class", column.ForEnum())
players.CreateColumn("race", column.ForEnum())
players.CreateColumn("age", column.ForFloat64())
players.CreateColumn("hp", column.ForFloat64())
players.CreateColumn("mp", column.ForFloat64())
players.CreateColumn("balance", column.ForFloat64())
players.CreateColumn("gender", column.ForEnum())
players.CreateColumn("guild", column.ForEnum())

// index on humans
players.CreateIndex("human", "race", func(r column.Reader) bool {
return r.String() == "human"
})

// index for mages
players.CreateIndex("mage", "class", func(r column.Reader) bool {
return r.String() == "mage"
})

// index for old
players.CreateIndex("old", "age", func(r column.Reader) bool {
return r.Float() >= 30
})

// Load the items into the collection
loaded := loadFixture("players.json")
players.Query(func(txn *column.Txn) error {
for _, v := range loaded {
txn.InsertObject(v)
}
return nil
})

// Run an indexed query
players.Query(func(txn *column.Txn) error {
name := txn.Enum("name")
return txn.With("human", "mage", "old").Range(func(idx uint32) {
value, _ := name.Get()
println("old mage, human:", value)
})
})
}
```
Multiple complete usage examples of this library can be found in the [examples](https://github.com/kelindar/column/tree/main/examples) directory in this repository.

## Benchmarks

Expand Down
2 changes: 1 addition & 1 deletion codegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
panic(err)
}

dst, err := os.OpenFile("column_numbers.go", os.O_RDWR|os.O_CREATE, os.ModePerm)
dst, err := os.OpenFile("column_numbers.go", os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
defer dst.Close()
if err != nil {
panic(err)
Expand Down
18 changes: 9 additions & 9 deletions codegen/numbers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,27 @@ func make{{.Name}}s(opts ...func(*option[{{.Type}}])) Column {
)
}

// {{.Type}}Writer represents a read-write accessor for {{.Type}}
type {{.Type}}Writer struct {
numericReader[{{.Type}}]
// rw{{.Name}} represents a read-write cursor for {{.Type}}
type rw{{.Name}} struct {
rdNumber[{{.Type}}]
writer *commit.Buffer
}

// Set sets the value at the current transaction cursor
func (s {{.Type}}Writer) Set(value {{.Type}}) {
func (s rw{{.Name}}) Set(value {{.Type}}) {
s.writer.Put{{.Name}}(commit.Put, s.txn.cursor, value)
}

// Merge atomically merges a delta to the value at the current transaction cursor
func (s {{.Type}}Writer) Merge(delta {{.Type}}) {
func (s rw{{.Name}}) Merge(delta {{.Type}}) {
s.writer.Put{{.Name}}(commit.Merge, s.txn.cursor, delta)
}

// {{.Name}} returns a read-write accessor for {{.Type}} column
func (txn *Txn) {{.Name}}(columnName string) {{.Type}}Writer {
return {{.Type}}Writer{
numericReader: numericReaderFor[{{.Type}}](txn, columnName),
writer: txn.bufferFor(columnName),
func (txn *Txn) {{.Name}}(columnName string) rw{{.Name}} {
return rw{{.Name}}{
rdNumber: readNumberOf[{{.Type}}](txn, columnName),
writer: txn.bufferFor(columnName),
}
}

Expand Down
28 changes: 3 additions & 25 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
"github.com/kelindar/smutex"
)

// Object represents a single object
type Object = map[string]interface{}

const (
expireColumn = "expire"
rowColumn = "row"
Expand Down Expand Up @@ -127,25 +124,6 @@ func (c *Collection) findFreeIndex(count uint64) uint32 {
return idx
}

// InsertObject adds an object to a collection and returns the allocated index.
func (c *Collection) InsertObject(obj Object) (index uint32) {
c.Query(func(txn *Txn) error {
index, _ = txn.InsertObject(obj)
return nil
})
return
}

// InsertObjectWithTTL adds an object to a collection, sets the expiration time
// based on the specified time-to-live and returns the allocated index.
func (c *Collection) InsertObjectWithTTL(obj Object, ttl time.Duration) (index uint32) {
c.Query(func(txn *Txn) error {
index, _ = txn.InsertObjectWithTTL(obj, ttl)
return nil
})
return
}

// Insert executes a mutable cursor transactionally at a new offset.
func (c *Collection) Insert(fn func(Row) error) (index uint32, err error) {
err = c.Query(func(txn *Txn) (innerErr error) {
Expand Down Expand Up @@ -181,9 +159,9 @@ func (c *Collection) createColumnKey(columnName string, column *columnKey) error
return nil
}

// CreateColumnsOf registers a set of columns that are present in the target object.
func (c *Collection) CreateColumnsOf(object Object) error {
for k, v := range object {
// CreateColumnsOf registers a set of columns that are present in the target map.
func (c *Collection) CreateColumnsOf(value map[string]any) error {
for k, v := range value {
column, err := ForKind(reflect.TypeOf(v).Kind())
if err != nil {
return err
Expand Down
Loading

0 comments on commit a761108

Please sign in to comment.