Skip to content

Commit

Permalink
Change hooks collection type from hashmap to btree
Browse files Browse the repository at this point in the history
This commit changes the collection type that holds all of the
hooks from a hashmap to a btree. This allows for better
flexibility for operations that need to perform range searches
and scanning of the collection.
  • Loading branch information
tidwall committed Sep 13, 2021
1 parent 83094b2 commit 8829b8f
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 72 deletions.
6 changes: 4 additions & 2 deletions internal/server/aof.go
Expand Up @@ -224,11 +224,13 @@ func (s *Server) writeAOF(args []string, d *commandDetails) error {
func (s *Server) getQueueCandidates(d *commandDetails) []*Hook {
candidates := make(map[*Hook]bool)
// add the hooks with "outside" detection
for _, hook := range s.hooksOut {
s.hooksOut.Ascend(nil, func(v interface{}) bool {
hook := v.(*Hook)
if hook.Key == d.key {
candidates[hook] = true
}
}
return true
})
// look for candidates that might "cross" geofences
if d.oldObj != nil && d.obj != nil && s.hookCross.Len() > 0 {
r1, r2 := d.oldObj.Rect(), d.obj.Rect()
Expand Down
16 changes: 9 additions & 7 deletions internal/server/aofshrink.go
Expand Up @@ -3,11 +3,11 @@ package server
import (
"math"
"os"
"sort"
"strconv"
"strings"
"time"

"github.com/tidwall/btree"
"github.com/tidwall/geojson"
"github.com/tidwall/tile38/core"
"github.com/tidwall/tile38/internal/collection"
Expand Down Expand Up @@ -169,17 +169,19 @@ func (server *Server) aofshrink() {
func() {
server.mu.Lock()
defer server.mu.Unlock()
for name := range server.hooks {
hnames = append(hnames, name)
}
hnames = make([]string, 0, server.hooks.Len())
server.hooks.Walk(func(v []interface{}) {
for _, v := range v {
hnames = append(hnames, v.(*Hook).Name)
}
})
}()
// sort the names for consistency
sort.Strings(hnames)
var hookHint btree.PathHint
for _, name := range hnames {
func() {
server.mu.Lock()
defer server.mu.Unlock()
hook := server.hooks[name]
hook, _ := server.hooks.GetHint(name, &hookHint).(*Hook)
if hook == nil {
return
}
Expand Down
15 changes: 10 additions & 5 deletions internal/server/crud.go
Expand Up @@ -462,12 +462,14 @@ func (server *Server) cmdRename(msg *Message, nx bool) (res resp.Value, d comman
err = errKeyNotFound
return
}
for _, h := range server.hooks {
server.hooks.Ascend(nil, func(v interface{}) bool {
h := v.(*Hook)
if h.Key == d.key || h.Key == d.newKey {
err = errKeyHasHooksSet
return
return false
}
}
return true
})
d.command = "rename"
newCol := server.getCol(d.newKey)
if newCol == nil {
Expand Down Expand Up @@ -505,14 +507,17 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails
err = errInvalidNumberOfArguments
return
}

// clear the entire database
server.cols = btree.NewNonConcurrent(byCollectionKey)
server.groupHooks = btree.NewNonConcurrent(byGroupHook)
server.groupObjects = btree.NewNonConcurrent(byGroupObject)
server.hookExpires = btree.NewNonConcurrent(byHookExpires)
server.hooks = make(map[string]*Hook)
server.hooksOut = make(map[string]*Hook)
server.hooks = btree.NewNonConcurrent(byHookName)
server.hooksOut = btree.NewNonConcurrent(byHookName)
server.hookTree = &rtree.RTree{}
server.hookCross = &rtree.RTree{}

d.command = "flushdb"
d.updated = true
d.timestamp = time.Now()
Expand Down
91 changes: 48 additions & 43 deletions internal/server/hooks.go
Expand Up @@ -22,18 +22,8 @@ var hookLogSetDefaults = &buntdb.SetOptions{
TTL: time.Second * 30,
}

type hooksByName []*Hook

func (a hooksByName) Len() int {
return len(a)
}

func (a hooksByName) Less(i, j int) bool {
return a[i].Name < a[j].Name
}

func (a hooksByName) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
func byHookName(a, b interface{}) bool {
return a.(*Hook).Name < b.(*Hook).Name
}

func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
Expand Down Expand Up @@ -159,7 +149,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (

return NOMessage, d, err
}
prevHook := s.hooks[name]
prevHook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook)
if prevHook != nil {
if prevHook.channel != chanCmd {
return NOMessage, d,
Expand All @@ -180,8 +170,8 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
}
}
prevHook.Close()
delete(s.hooks, name)
delete(s.hooksOut, name)
s.hooks.Delete(prevHook)
s.hooksOut.Delete(prevHook)
if !prevHook.expires.IsZero() {
s.hookExpires.Delete(prevHook)
}
Expand All @@ -191,9 +181,9 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
d.updated = true
d.timestamp = time.Now()

s.hooks[name] = hook
s.hooks.Set(hook)
if hook.Fence.detect == nil || hook.Fence.detect["outside"] {
s.hooksOut[name] = hook
s.hooksOut.Set(hook)
}

// remove previous hook from spatial index
Expand Down Expand Up @@ -264,11 +254,12 @@ func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
if len(vs) != 0 {
return NOMessage, d, errInvalidNumberOfArguments
}
if hook, ok := s.hooks[name]; ok && hook.channel == chanCmd {
hook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook)
if hook != nil && hook.channel == chanCmd {
hook.Close()
// remove hook from maps
delete(s.hooks, hook.Name)
delete(s.hooksOut, hook.Name)
s.hooks.Delete(hook)
s.hooksOut.Delete(hook)
if !hook.expires.IsZero() {
s.hookExpires.Delete(hook)
}
Expand Down Expand Up @@ -320,18 +311,20 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
}

count := 0
for name, hook := range s.hooks {
var hooks []*Hook
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
hooks = append(hooks, hook)
return true
})

for _, hook := range hooks {
if hook.channel != channel {
continue
}
match, _ := glob.Match(pattern, name)
if !match {
continue
}
hook.Close()
// remove hook from maps
delete(s.hooks, hook.Name)
delete(s.hooksOut, hook.Name)
s.hooks.Delete(hook)
s.hooksOut.Delete(hook)
if !hook.expires.IsZero() {
s.hookExpires.Delete(hook)
}
Expand Down Expand Up @@ -365,6 +358,26 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
return
}

func (s *Server) forEachHookByPattern(
pattern string, channel bool, iter func(hook *Hook) bool,
) {
g := glob.Parse(pattern, false)
hasUpperLimit := g.Limits[1] != ""
s.hooks.Ascend(&Hook{Name: g.Limits[0]}, func(v interface{}) bool {
hook := v.(*Hook)
if hasUpperLimit && hook.Name > g.Limits[1] {
return false
}
if hook.channel == channel {
match, _ := glob.Match(pattern, hook.Name)
if match {
return iter(hook)
}
}
return true
})
}

func (s *Server) cmdHooks(msg *Message, channel bool) (
res resp.Value, err error,
) {
Expand All @@ -381,18 +394,6 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
return NOMessage, errInvalidNumberOfArguments
}

var hooks []*Hook
for name, hook := range s.hooks {
if hook.channel != channel {
continue
}
match, _ := glob.Match(pattern, name)
if match {
hooks = append(hooks, hook)
}
}
sort.Sort(hooksByName(hooks))

switch msg.OutputType {
case JSON:
buf := &bytes.Buffer{}
Expand All @@ -402,7 +403,8 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
} else {
buf.WriteString(`"hooks":[`)
}
for i, hook := range hooks {
var i int
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
var ttl = -1
if !hook.expires.IsZero() {
ttl = int(hook.expires.Sub(start).Seconds())
Expand Down Expand Up @@ -444,13 +446,15 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
buf.WriteString(jsonString(meta.Value))
}
buf.WriteString(`}}`)
}
i++
return true
})
buf.WriteString(`],"elapsed":"` +
time.Since(start).String() + "\"}")
return resp.StringValue(buf.String()), nil
case RESP:
var vals []resp.Value
for _, hook := range hooks {
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
var hvals []resp.Value
hvals = append(hvals, resp.StringValue(hook.Name))
hvals = append(hvals, resp.StringValue(hook.Key))
Expand All @@ -471,7 +475,8 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
}
hvals = append(hvals, resp.ArrayValue(metas))
vals = append(vals, resp.ArrayValue(hvals))
}
return true
})
return resp.ArrayValue(vals), nil
}
return resp.SimpleStringValue(""), nil
Expand Down
26 changes: 13 additions & 13 deletions internal/server/server.go
Expand Up @@ -112,17 +112,17 @@ type Server struct {
lstack []*commandDetails
lives map[*liveBuffer]bool
lcond *sync.Cond
fcup bool // follow caught up
fcuponce bool // follow caught up once
shrinking bool // aof shrinking flag
shrinklog [][]string // aof shrinking log
hooks map[string]*Hook // hook name
hookCross *rtree.RTree // hook spatial tree for "cross" geofences
hookTree *rtree.RTree // hook spatial tree for all
hooksOut map[string]*Hook // hooks with "outside" detection
groupHooks *btree.BTree // hooks that are connected to objects
groupObjects *btree.BTree // objects that are connected to hooks
hookExpires *btree.BTree // queue of all hooks marked for expiration
fcup bool // follow caught up
fcuponce bool // follow caught up once
shrinking bool // aof shrinking flag
shrinklog [][]string // aof shrinking log
hooks *btree.BTree // hook name -- [string]*Hook
hookCross *rtree.RTree // hook spatial tree for "cross" geofences
hookTree *rtree.RTree // hook spatial tree for all
hooksOut *btree.BTree // hooks with "outside" detection -- [string]*Hook
groupHooks *btree.BTree // hooks that are connected to objects
groupObjects *btree.BTree // objects that are connected to hooks
hookExpires *btree.BTree // queue of all hooks marked for expiration

aofconnM map[net.Conn]io.Closer
luascripts *lScriptMap
Expand Down Expand Up @@ -164,8 +164,8 @@ func Serve(opts Options) error {
fcond: sync.NewCond(&sync.Mutex{}),
lives: make(map[*liveBuffer]bool),
lcond: sync.NewCond(&sync.Mutex{}),
hooks: make(map[string]*Hook),
hooksOut: make(map[string]*Hook),
hooks: btree.NewNonConcurrent(byHookName),
hooksOut: btree.NewNonConcurrent(byHookName),
hookCross: &rtree.RTree{},
hookTree: &rtree.RTree{},
aofconnM: make(map[net.Conn]io.Closer),
Expand Down
4 changes: 2 additions & 2 deletions internal/server/stats.go
Expand Up @@ -157,7 +157,7 @@ func (s *Server) basicStats(m map[string]interface{}) {
m["pid"] = os.Getpid()
m["aof_size"] = s.aofsz
m["num_collections"] = s.cols.Len()
m["num_hooks"] = len(s.hooks)
m["num_hooks"] = s.hooks.Len()
sz := 0
s.cols.Ascend(nil, func(v interface{}) bool {
col := v.(*collectionKeyContainer).col
Expand Down Expand Up @@ -337,7 +337,7 @@ func (s *Server) extStats(m map[string]interface{}) {
// Number of collections in the database
m["tile38_num_collections"] = s.cols.Len()
// Number of hooks in the database
m["tile38_num_hooks"] = len(s.hooks)
m["tile38_num_hooks"] = s.hooks.Len()
// Number of hook groups in the database
m["tile38_num_hook_groups"] = s.groupHooks.Len()
// Number of object groups in the database
Expand Down

0 comments on commit 8829b8f

Please sign in to comment.