/
storagemgr.go
82 lines (71 loc) · 2.09 KB
/
storagemgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package gmcache
import (
"hash/crc32"
"sync/atomic"
"time"
)
// StorageManager is the implementation of IStorage. It spreads keys over
// bucketNum Storage buckets, chosen by a CRC-32 hash of the key, and keeps a
// running count of the bytes in use so that Set can check it against
// memoryLimit.
//
// NOTE(review): memoryUsed is read and written with the 64-bit sync/atomic
// functions, which require 64-bit alignment on 32-bit platforms; keep that in
// mind before reordering or embedding these fields.
type StorageManager struct {
// bucketNum is the number of buckets and the modulus used when hashing a key.
bucketNum int
// buckets holds one Storage per bucket, indexed by the hashed key.
buckets []*Storage
// memoryLimit is the limit, in bytes, that Set compares memoryUsed against.
memoryLimit int64
// memoryUsed is the running byte count; Run updates it atomically and Set
// reads it atomically.
memoryUsed int64
// cleanInterval is the period of the ticker that drives expired-key cleanup
// in Run.
cleanInterval time.Duration
// memChangedChan carries byte deltas; it is passed to every bucket through
// NewStorage and drained by Run.
memChangedChan chan int64
// stop is closed by Stop to make Run return.
stop chan struct{}
}
// NewStorageManager builds a StorageManager with bucketNum buckets.
//
// memoryLimit is expressed in bytes. cleanInterval is the period of the
// cleanup ticker created by Run. Every bucket is created with the same
// buffered channel (capacity 1000), which is the channel Run reads byte
// deltas from.
func NewStorageManager(bucketNum int, memoryLimit int64, cleanInterval time.Duration) *StorageManager {
	deltas := make(chan int64, 1000)

	shards := make([]*Storage, bucketNum)
	for idx := range shards {
		shards[idx] = NewStorage(deltas)
	}

	// memoryUsed is left at its zero value.
	return &StorageManager{
		bucketNum:      bucketNum,
		buckets:        shards,
		memoryLimit:    memoryLimit,
		cleanInterval:  cleanInterval,
		memChangedChan: deltas,
		stop:           make(chan struct{}),
	}
}
// mapToIndex hashes key with CRC-32 (IEEE) and reduces the checksum to a
// bucket index in [0, bucketNum).
//
// The modulo is taken in unsigned arithmetic before converting to int. Where
// int is 32 bits wide, converting a checksum with its top bit set to int first
// yields a negative number, and the remainder of a negative dividend is
// negative in Go, which would produce an out-of-range bucket index.
//
// It panics with a division by zero if bucketNum is 0.
func (m *StorageManager) mapToIndex(key string) int {
	sum := crc32.ChecksumIEEE([]byte(key))
	return int(uint64(sum) % uint64(m.bucketNum))
}
// findStorage returns the bucket that key hashes to.
func (m *StorageManager) findStorage(key string) *Storage {
	idx := m.mapToIndex(key)
	return m.buckets[idx]
}
// Set stores value under key with the given ttl in the bucket that owns key.
//
// Before delegating, it checks that twice the length of value plus the
// currently recorded memory usage does not exceed memoryLimit; if it does, Set
// returns OUT_OF_MEMORY_LIMIT_ERROR without touching the bucket. Otherwise it
// returns whatever the bucket's Set returns.
//
// NOTE(review): the factor of two is presumably headroom for per-item
// overhead — confirm against the Storage implementation.
func (m *StorageManager) Set(key string, value []byte, ttl time.Duration) error {
	// Widen to int64 before doubling so the product cannot wrap where int is
	// only 32 bits wide.
	needed := int64(len(value)) * 2
	if needed+atomic.LoadInt64(&m.memoryUsed) > m.memoryLimit {
		return OUT_OF_MEMORY_LIMIT_ERROR
	}
	return m.findStorage(key).Set(key, value, ttl)
}
// Get looks key up in the bucket that owns it and returns that bucket's
// result unchanged.
func (m *StorageManager) Get(key string) (*KVItem, error) {
	bucket := m.findStorage(key)
	return bucket.Get(key)
}
// Delete removes key from the bucket that owns it and returns that bucket's
// result unchanged.
func (m *StorageManager) Delete(key string) error {
	bucket := m.findStorage(key)
	return bucket.Delete(key)
}
// Run is the manager's maintenance loop. It blocks until Stop closes the stop
// channel.
//
// On every cleanInterval tick it calls DeleteExpiredKeyRandom on each bucket
// and subtracts the returned byte count from memoryUsed. Every value received
// on memChangedChan is added to memoryUsed.
//
// time.NewTicker panics if cleanInterval is not positive.
func (m *StorageManager) Run() {
	cleanTicker := time.NewTicker(m.cleanInterval)
	// Release the ticker's resources once the loop exits; without this the
	// ticker outlives Run.
	defer cleanTicker.Stop()

	for {
		select {
		case <-cleanTicker.C:
			for i := 0; i < m.bucketNum; i++ {
				deletedBytes := m.buckets[i].DeleteExpiredKeyRandom()
				atomic.AddInt64(&m.memoryUsed, -deletedBytes)
				//logger.Debug("used bytes:", atomic.LoadInt64(&m.memoryUsed))
			}
		case deltaBytes := <-m.memChangedChan:
			atomic.AddInt64(&m.memoryUsed, deltaBytes)
			//logger.Debug("used bytes:", atomic.LoadInt64(&m.memoryUsed))
		case <-m.stop:
			return
		}
	}
}
// Stop makes Run return by closing the stop channel. Because the channel is
// closed rather than sent on, calling Stop a second time panics.
func (m *StorageManager) Stop() {
	close(m.stop)
}