forked from ryandotsmith/l2met
/
mem_store.go
65 lines (57 loc) · 1.18 KB
/
mem_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package store
import (
"errors"
"github.com/ryandotsmith/l2met/bucket"
"sync"
"time"
)
// MemStore is a bucket store backed by an in-process map.
//
// The embedded sync.Mutex guards m; the methods in this file that read or
// write the map take the lock before touching it.
type MemStore struct {
sync.Mutex
// m holds the stored buckets, keyed by bucket.Id value.
m map[bucket.Id]*bucket.Bucket
}
// NewMemStore returns an empty MemStore whose bucket map is already
// allocated, so it can be written to immediately.
func NewMemStore() *MemStore {
	store := new(MemStore)
	store.m = map[bucket.Id]*bucket.Bucket{}
	return store
}
// Health reports whether the store is usable. It unconditionally returns
// true and does not inspect any state.
func (m *MemStore) Health() bool {
	return true
}
// MaxPartitions returns the number of partitions this store exposes,
// which is always 1.
func (m *MemStore) MaxPartitions() uint64 {
	const partitions uint64 = 1
	return partitions
}
// Scan removes every ready bucket from the store and returns them on a
// channel that is already filled and closed when Scan returns.
//
// A bucket is ready when Id.Time + Id.Resolution + one second is not after
// schedule. The returned error is always nil.
//
// The buckets are collected while holding the store lock, and the lock is
// released before the channel is populated. The channel is buffered to
// exactly the number of ready buckets, so Scan never blocks on the consumer
// and the store stays usable even if the consumer is slow or never drains
// the channel.
func (m *MemStore) Scan(schedule time.Time) (<-chan *bucket.Bucket, error) {
	m.Lock()
	var ready []*bucket.Bucket
	for k, v := range m.m {
		readyAt := v.Id.Time.Add(v.Id.Resolution).Add(time.Second)
		if !readyAt.After(schedule) {
			// Deleting the current key while ranging over a map is safe in Go.
			delete(m.m, k)
			ready = append(ready, v)
		}
	}
	m.Unlock()

	// Sized to fit every result, so the sends below cannot block.
	out := make(chan *bucket.Bucket, len(ready))
	for _, b := range ready {
		out <- b
	}
	close(out)
	return out, nil
}
// Get looks up the bucket stored under b.Id and copies its contents into
// the bucket b points to, so the caller sees the stored data.
//
// It returns an error when no bucket with that Id is in the store; b is
// left untouched in that case.
func (m *MemStore) Get(b *bucket.Bucket) error {
	m.Lock()
	defer m.Unlock()
	stored, present := m.m[*b.Id]
	if !present {
		return errors.New("Bucket not in MemStore.")
	}
	// Write through the pointer: assigning to b itself would only rebind the
	// local parameter and the caller would never observe the result.
	// NOTE(review): this is a shallow struct copy; if bucket.Bucket carries a
	// lock or other no-copy state, replace it with a field-wise copy.
	*b = *stored
	return nil
}
// Put stores b under its Id. When a bucket with the same Id is already
// present, b is merged into it via Add instead of replacing it. The
// returned error is always nil.
func (m *MemStore) Put(b *bucket.Bucket) error {
	m.Lock()
	defer m.Unlock()
	existing, found := m.m[*b.Id]
	if found {
		existing.Add(b)
		return nil
	}
	m.m[*b.Id] = b
	return nil
}