Skip to content

Commit

Permalink
Update hook expiration logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tidwall committed Sep 12, 2021
1 parent c686b87 commit 83094b2
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 266 deletions.
114 changes: 0 additions & 114 deletions internal/expire/expire.go

This file was deleted.

88 changes: 0 additions & 88 deletions internal/expire/expire_test.go

This file was deleted.

1 change: 1 addition & 0 deletions internal/server/crud.go
Expand Up @@ -508,6 +508,7 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails
server.cols = btree.NewNonConcurrent(byCollectionKey)
server.groupHooks = btree.NewNonConcurrent(byGroupHook)
server.groupObjects = btree.NewNonConcurrent(byGroupObject)
server.hookExpires = btree.NewNonConcurrent(byHookExpires)
server.hooks = make(map[string]*Hook)
server.hooksOut = make(map[string]*Hook)
server.hookTree = &rtree.RTree{}
Expand Down
88 changes: 63 additions & 25 deletions internal/server/expire.go
Expand Up @@ -18,32 +18,70 @@ func (s *Server) backgroundExpiring() {
func() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now().UnixNano()
var ids []string
var msgs []*Message
s.cols.Ascend(nil, func(v interface{}) bool {
col := v.(*collectionKeyContainer)
ids = col.col.Expired(now, ids[:0])
for _, id := range ids {
msgs = append(msgs, &Message{
Args: []string{"del", col.key, id},
})
}
return true
})
for _, msg := range msgs {
_, d, err := s.cmdDel(msg)
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
if len(msgs) > 0 {
log.Debugf("Expired %d items\n", len(msgs))
}
now := time.Now()
s.backgroundExpireObjects(now)
s.backgroundExpireHooks(now)
}()
time.Sleep(bgExpireDelay)
}
}

func (s *Server) backgroundExpireObjects(now time.Time) {
nano := now.UnixNano()
var ids []string
var msgs []*Message
s.cols.Ascend(nil, func(v interface{}) bool {
col := v.(*collectionKeyContainer)
ids = col.col.Expired(nano, ids[:0])
for _, id := range ids {
msgs = append(msgs, &Message{
Args: []string{"del", col.key, id},
})
}
return true
})
for _, msg := range msgs {
_, d, err := s.cmdDel(msg)
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
if len(msgs) > 0 {
log.Debugf("Expired %d objects\n", len(msgs))
}

}

func (s *Server) backgroundExpireHooks(now time.Time) {
var msgs []*Message
s.hookExpires.Ascend(nil, func(v interface{}) bool {
h := v.(*Hook)
if h.expires.After(now) {
return false
}
msg := &Message{}
if h.channel {
msg.Args = []string{"delchan", h.Name}
} else {
msg.Args = []string{"delhook", h.Name}
}
msgs = append(msgs, msg)
return true
})

for _, msg := range msgs {
_, d, err := s.cmdDelHook(msg, msg.Args[0] == "delchan")
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
if len(msgs) > 0 {
log.Debugf("Expired %d hooks\n", len(msgs))
}
}

0 comments on commit 83094b2

Please sign in to comment.