Skip to content

Commit

Permalink
removed selector
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Jan 16, 2022
1 parent 700880e commit 6fdb003
Show file tree
Hide file tree
Showing 15 changed files with 918 additions and 803 deletions.
132 changes: 69 additions & 63 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ The general idea is to leverage cache-friendly ways of organizing data in [struc

In order to get data into the store, you'll need to first create a `Collection` by calling `NewCollection()` method. Each collection requires a schema, which can be either specified manually by calling `CreateColumn()` multiple times or automatically inferred from an object by calling `CreateColumnsOf()` function.

In the example below we're loading some `JSON` data by using `json.Unmarshal()` and auto-creating colums based on the first element on the loaded slice. After this is done, we can then load our data by inserting the objects one by one into the collection. This is accomplished by calling `Insert()` method on the collection itself repeatedly.
In the example below we're loading some `JSON` data by using `json.Unmarshal()` and auto-creating colums based on the first element on the loaded slice. After this is done, we can then load our data by inserting the objects one by one into the collection. This is accomplished by calling `InsertObject()` method on the collection itself repeatedly.

```go
data := loadFromJson("players.json")
Expand All @@ -57,7 +57,7 @@ players.CreateColumnsOf(data[0])

// Insert every item from our loaded data
for _, v := range data {
players.Insert(v)
players.InsertObject(v)
}
```

Expand All @@ -73,16 +73,16 @@ players.CreateColumn("age", column.ForInt16())

// Insert every item from our loaded data
for _, v := range loadFromJson("players.json") {
players.Insert(v)
players.InsertObject(v)
}
```

While the previous example demonstrated how to insert many objects, it was doing it one by one and is rather inefficient. This is due to the fact that each `Insert()` call directly on the collection initiates a separate transacion and there's a small performance cost associated with it. If you want to do a bulk insert and insert many values, faster, that can be done by calling `Insert()` on a transaction, as demonstrated in the example below. Note that the only difference is instantiating a transaction by calling the `Query()` method and calling the `txn.Insert()` method on the transaction instead the one on the collection.
While the previous example demonstrated how to insert many objects, it was doing it one by one and is rather inefficient. This is due to the fact that each `InsertObject()` call directly on the collection initiates a separate transacion and there's a small performance cost associated with it. If you want to do a bulk insert and insert many values, faster, that can be done by calling `Insert()` on a transaction, as demonstrated in the example below. Note that the only difference is instantiating a transaction by calling the `Query()` method and calling the `txn.Insert()` method on the transaction instead the one on the collection.

```go
players.Query(func(txn *Txn) error {
for _, v := range loadFromJson("players.json") {
txn.Insert(v)
txn.InsertObject(v)
}
return nil // Commit
})
Expand Down Expand Up @@ -164,22 +164,29 @@ Let's first examine the `Range()` method. In the example below we select all of

```go
players.Query(func(txn *Txn) error {
txn.With("rogue").Range("name", func(v column.Cursor) {
println("rogue name ", v.String()) // Prints the name
names := txn.String("name")

return txn.With("rogue").Range(func(i uint32) {
name, _ := names.Get()
println("rogue name", name)
})
return nil
})
```

Now, what if you need two columns? The range only allows you to quickly select a single column, but you can still retrieve other columns by their name during the iteration. This can be accomplished by corresponding `StringAt()`, `FloatAt()`, `IntAt()`, `UintAt()` or `ValueAt()` methods as shown below.

```go
players.Query(func(txn *Txn) error {
txn.With("rogue").Range("name", func(v column.Cursor) {
println("rogue name ", v.String()) // Prints the name
println("rogue age ", v.IntAt("age")) // Prints the age
names := txn.String("name")
ages := txn.Int64("age")

return txn.With("rogue").Range(func(i uint32) {
name, _ := names.Get()
age, _ := ages.Get()

println("rogue name", name)
println("rogue age", age)
})
return nil
})
```

Expand Down Expand Up @@ -212,22 +219,25 @@ In the example below we're selecting all of the rogues and updating both their b

```go
players.Query(func(txn *Txn) error {
txn.With("rogue").Range("balance", func(v column.Cursor) {
v.SetFloat64(10.0) // Update the "balance" to 10.0
v.SetInt64At("age", 50) // Update the "age" to 50
}) // Select the balance
return nil
balance := txn.Float64("balance")
age := txn.Int64("age")

return txn.With("rogue").Range(func(i uint32) {
balance.Set(10.0) // Update the "balance" to 10.0
age.Set(50) // Update the "age" to 50
})
})
```

In certain cases, you might want to atomically increment or decrement numerical values. In order to accomplish this you can use the provided `Add..()` or `Add..At()` operations of the `Cursor` or `Selector`. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the below example we're incrementing the balance of all our rogues by _500_ atomically.

```go
players.Query(func(txn *Txn) error {
txn.With("rogue").Range("balance", func(v column.Cursor) {
v.AddFloat64(500.0) // Increment the "balance" by 500
balance := txn.Float64("balance")

return txn.With("rogue").Range(func(i uint32) {
balance.Add(500.0) // Increment the "balance" by 500
})
return nil
})
```

Expand All @@ -238,7 +248,7 @@ Sometimes, it is useful to automatically delete certain rows when you do not nee
In the example below we are inserting an object to the collection and setting the time-to-live to _5 seconds_ from the current time. After this time, the object will be automatically evicted from the collection and its space can be reclaimed.

```go
players.InsertWithTTL(map[string]interface{}{
players.InsertObjectWithTTL(map[string]interface{}{
"name": "Merlin",
"class": "mage",
"age": 55,
Expand All @@ -250,10 +260,14 @@ On an interestig node, since `expire` column which is automatically added to eac

```go
players.Query(func(txn *column.Txn) error {
return txn.Range("expire", func(v column.Cursor) {
oldExpire := time.Unix(0, v.Int()) // Convert expiration to time.Time
newExpire := expireAt.Add(1 * time.Hour).UnixNano() // Add some time
v.Set(newExpire)
expire := txn.Int64("expire")

return txn.Range(func(i uint32) {
if v, ok := expire.Get(); ok && v > 0 {
oldExpire := time.Unix(0, v) // Convert expiration to time.Time
newExpire := expireAt.Add(1 * time.Hour).UnixNano() // Add some time
expire.Set(newExpire)
}
})
})
```
Expand All @@ -265,7 +279,8 @@ Transactions allow for isolation between two concurrent operations. In fact, all
```go
// Range over all of the players and update (successfully their balance)
players.Query(func(txn *column.Txn) error {
txn.Range("balance", func(v column.Cursor) {
balance := txn.Float64("balance")
txn.Range(func(i uint32) {
v.Set(10.0) // Update the "balance" to 10.0
})

Expand All @@ -279,7 +294,8 @@ Now, in this example, we try to update balance but a query callback returns an e
```go
// Range over all of the players and update (successfully their balance)
players.Query(func(txn *column.Txn) error {
txn.Range("balance", func(v column.Cursor) {
balance := txn.Float64("balance")
txn.Range(func(i uint32) {
v.Set(10.0) // Update the "balance" to 10.0
})

Expand Down Expand Up @@ -340,59 +356,49 @@ func main(){

// Create a new columnar collection
players := column.NewCollection()
players.CreateColumn("serial", column.ForKey())
players.CreateColumn("name", column.ForEnum())
players.CreateColumn("active", column.ForBool())
players.CreateColumn("class", column.ForEnum())
players.CreateColumn("race", column.ForEnum())
players.CreateColumn("age", column.ForFloat64())
players.CreateColumn("hp", column.ForFloat64())
players.CreateColumn("mp", column.ForFloat64())
players.CreateColumn("balance", column.ForFloat64())
players.CreateColumn("gender", column.ForEnum())
players.CreateColumn("guild", column.ForEnum())

// index on humans
players.CreateIndex("human", "race", func(v interface{}) bool {
return v == "human"
players.CreateIndex("human", "race", func(r column.Reader) bool {
return r.String() == "human"
})

// index for mages
players.CreateIndex("mage", "class", func(v interface{}) bool {
return v == "mage"
players.CreateIndex("mage", "class", func(r column.Reader) bool {
return r.String() == "mage"
})

// index for old
players.CreateIndex("old", "age", func(v interface{}) bool {
return v.(float64) >= 30
players.CreateIndex("old", "age", func(r column.Reader) bool {
return r.Float() >= 30
})

// Load the items into the collection
loaded := loadFixture("players.json")
players.CreateColumnsOf(loaded[0])
for _, v := range loaded {
players.Insert(v)
}

// This performs a full scan on 3 different columns and compares them given the
// specified predicates. This is not indexed, but does a columnar scan which is
// cache-friendly.
players.Query(func(txn *column.Txn) error {
println(txn.WithString("race", func(v string) bool {
return v == "human"
}).WithString("class", func(v string) bool {
return v == "mage"
}).WithFloat("age", func(v float64) bool {
return v >= 30
}).Count()) // prints the count
return nil
})

// This performs a count, but instead of scanning through the entire dataset, it scans
// over pre-built indexes and combines them using a logical AND operation. The result
// will be the same as the query above but the performance of the query is 10x-100x
// faster depending on the size of the underlying data.
players.Query(func(txn *column.Txn) error {
println(txn.With("human", "mage", "old").Count()) // prints the count
for _, v := range loaded {
txn.InsertObject(v)
}
return nil
})

// Same condition as above, but we also select the actual names of those
// players and iterate through them.
// Run an indexed query
players.Query(func(txn *column.Txn) error {
txn.With("human", "mage", "old").Range("name", func(v column.Cursor) {
println(v.String()) // prints the name
}) // The column to select
return nil
name := txn.Enum("name")
return txn.With("human", "mage", "old").Range(func(idx uint32) {
value, _ := name.Get()
println("old mage, human:", value)
})
})
}
```
Expand Down
61 changes: 30 additions & 31 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,34 +171,6 @@ func (c *Collection) UpdateAtKey(key string, fn func(txn *Txn) error) error {
})
}

// SelectAt performs a selection on a specific row specified by its index. It returns
// a boolean value indicating whether an element is present at the index or not.
func (c *Collection) SelectAt(idx uint32, fn func(v Selector)) bool {
chunk := uint(idx >> chunkShift)
if idx >= uint32(len(c.fill))<<6 || !c.fill.Contains(idx) {
return false
}

// Lock the chunk which we are about to read and call the selector delegate
c.slock.RLock(chunk)
fn(Selector{idx: idx, col: c})
c.slock.RUnlock(chunk)
return true
}

// SelectAtKey performs a selection on a specific row specified by its key. It returns
// a boolean value indicating whether an element is present at the key or not.
func (c *Collection) SelectAtKey(key string, fn func(v Selector)) (found bool) {
if c.pk == nil {
return false
}

if idx, ok := c.pk.OffsetOf(key); ok {
found = c.SelectAt(idx, fn)
}
return
}

// DeleteAt attempts to delete an item at the specified index for this collection. If the item
// exists, it marks at as deleted and returns true, otherwise it returns false.
func (c *Collection) DeleteAt(idx uint32) (deleted bool) {
Expand Down Expand Up @@ -323,9 +295,7 @@ func (c *Collection) DropIndex(indexName string) error {
// deleted during iteration (range), but the actual operations will be queued and
// executed after the iteration.
func (c *Collection) Query(fn func(txn *Txn) error) error {
c.lock.RLock()
txn := c.txns.acquire(c)
c.lock.RUnlock()

// Execute the query and keep the error for later
if err := fn(txn); err != nil {
Expand All @@ -341,10 +311,39 @@ func (c *Collection) Query(fn func(txn *Txn) error) error {
return nil
}

// Select ...
/*
func (c *Collection) Select(fn func(s Selector) error) error {
txn := c.txns.acquire(c)
err := fn(Selector{txn: txn})
txn.rollback()
c.txns.release(txn)
return err
}
// SelectAt performs a selection on a specific row specified by its index.
func (c *Collection) SelectAt(idx uint32, fn func(s Selector) error) error {
return c.Select(func(s Selector) error {
return s.SelectAt(idx, fn)
})
}
// SelectAtKey performs a selection on a specific row specified by its key.
func (c *Collection) SelectAtKey(key string, fn func(vs Selector) error) error {
if c.pk == nil {
return nil
}
if idx, ok := c.pk.OffsetOf(key); ok {
return c.SelectAt(idx, fn)
}
return nil
}
*/

// Close closes the collection and clears up all of the resources.
func (c *Collection) Close() error {
c.cancel()

return nil
}

Expand Down

0 comments on commit 6fdb003

Please sign in to comment.