-
Notifications
You must be signed in to change notification settings - Fork 21
/
reaper.go
139 lines (114 loc) · 3.13 KB
/
reaper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package reaper
import (
"log"
"time"
"github.com/boltdb/bolt"
"github.com/yosssi/boltstore/shared"
)
//##############//
//### Public ###//
//##############//
// Run invokes a reap function as a goroutine.
func Run(db *bolt.DB, options Options) (chan<- struct{}, <-chan struct{}) {
options.setDefault()
quitC, doneC := make(chan struct{}), make(chan struct{})
go reap(db, options, quitC, doneC)
return quitC, doneC
}
// Quit terminates the reap goroutine.
func Quit(quitC chan<- struct{}, doneC <-chan struct{}) {
quitC <- struct{}{}
<-doneC
}
//###############//
//### Private ###//
//###############//
func reap(db *bolt.DB, options Options, quitC <-chan struct{}, doneC chan<- struct{}) {
// Create a new ticker
ticker := time.NewTicker(options.CheckInterval)
defer func() {
// Stop the ticker
ticker.Stop()
}()
var prevKey []byte
for {
select {
case <-quitC: // Check if a quit signal is sent.
doneC <- struct{}{}
return
case <-ticker.C: // Check if the ticker fires a signal.
// This slice is a buffer to save all expired session keys.
expiredSessionKeys := make([][]byte, 0)
// Start a bolt read transaction.
err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(options.BucketName)
if bucket == nil {
return nil
}
c := bucket.Cursor()
var i int
var isExpired bool
for k, v := c.Seek(prevKey); ; k, v = c.Next() {
// If we hit the end of our sessions then
// exit and start over next time.
if k == nil {
prevKey = nil
return nil
}
i++
// The flag if the session is expired
isExpired = false
session, err := shared.Session(v)
if err != nil {
// Just remove the session with the invalid session data.
// Log the error first.
log.Printf("boltstore: removing session from database with invalid value: %v", err)
isExpired = true
} else if shared.Expired(session) {
isExpired = true
}
if isExpired {
// Copy the byte slice key, because this data is
// not safe outside of this transaction.
temp := make([]byte, len(k))
copy(temp, k)
// Add it to the expired sessios keys slice
expiredSessionKeys = append(expiredSessionKeys, temp)
}
if options.BatchSize == i {
// Store the current key to the previous key.
// Copy the byte slice key, because this data is
// not safe outside of this transaction.
prevKey = make([]byte, len(k))
copy(prevKey, k)
return nil
}
}
})
if err != nil {
log.Printf("boltstore: obtain expired sessions error: %v", err)
}
if len(expiredSessionKeys) > 0 {
// Remove the expired sessions from the database
err = db.Update(func(txu *bolt.Tx) error {
// Get the bucket
b := txu.Bucket(options.BucketName)
if b == nil {
return nil
}
// Remove all expired sessions in the slice
for _, key := range expiredSessionKeys {
err = b.Delete(key)
if err != nil {
return err
}
}
return nil
})
if err != nil {
log.Printf("boltstore: remove expired sessions error: %v", err)
}
}
}
}
}