/
partition_memory.go
109 lines (85 loc) · 2.33 KB
/
partition_memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package memory
import (
"github.com/tryfix/kstream/backend"
"github.com/tryfix/log"
"github.com/tryfix/metrics"
"strconv"
"time"
)
// PartitionMemory is a backend.Backend that additionally exposes its
// underlying partitions as a list of iterators.
type PartitionMemory interface {
backend.Backend
// Partitions returns one backend.Iterator per partition.
Partitions() []backend.Iterator
}
// partitionMemory spreads keys over several backends; each operation is
// forwarded to the backend selected by a partition index derived from the key.
type partitionMemory struct {
// partitionCount is the modulus applied to a key's integer value to pick a partition.
partitionCount int
// partitions maps a partition index to the backend holding that partition's data.
partitions map[int]backend.Backend
}
// NewPartitionMemoryBackend builds a PartitionMemory backed by `partitions`
// stores created with NewMemoryBackend, registered under the indexes
// 0..partitions-1. Each store gets its own config from NewConfig carrying the
// supplied logger and metrics reporter.
//
// partitions is expected to be positive: the key-routing methods take the
// key's integer value modulo this count, so a zero count cannot route any key.
func NewPartitionMemoryBackend(partitions int, logger log.Logger, reporter metrics.Reporter) PartitionMemory {
	pm := &partitionMemory{
		partitionCount: partitions,
		partitions:     make(map[int]backend.Backend),
	}

	for i := 0; i < partitions; i++ {
		conf := NewConfig()
		conf.Logger = logger
		conf.MetricsReporter = reporter

		// Assigned directly so that no local named "backend" shadows the
		// imported backend package.
		pm.partitions[i] = NewMemoryBackend(conf)
	}

	return pm
}
// Name returns the fixed identifier of this backend implementation.
func (pm *partitionMemory) Name() string {
	const name = "partitioned_memory_backend"
	return name
}
// Set stores value under key, with the given expiry, in the partition selected
// by the key.
//
// key must be the base-10 text of an integer (it is parsed with strconv.Atoi);
// the partition index is that integer modulo the partition count. A parse
// failure is returned unchanged, and so is any error reported by the
// partition's own Set.
func (pm *partitionMemory) Set(key []byte, value []byte, expiry time.Duration) error {
	k, err := strconv.Atoi(string(key))
	if err != nil {
		return err
	}

	partitionID := k % pm.partitionCount
	if partitionID < 0 {
		// Go's % keeps the sign of the dividend, so a negative key would
		// otherwise select a partition index that does not exist.
		partitionID += pm.partitionCount
	}

	// Propagate the partition's result instead of discarding it.
	return pm.partitions[partitionID].Set(key, value, expiry)
}
// Get looks key up in the partition selected by the key and returns whatever
// that partition's Get returns.
//
// key must be the base-10 text of an integer (it is parsed with strconv.Atoi);
// the partition index is that integer modulo the partition count. A parse
// failure yields a nil value and the parse error.
func (pm *partitionMemory) Get(key []byte) ([]byte, error) {
	k, err := strconv.Atoi(string(key))
	if err != nil {
		return nil, err
	}

	partitionID := k % pm.partitionCount
	if partitionID < 0 {
		// Go's % keeps the sign of the dividend, so a negative key would
		// otherwise select a partition index that does not exist.
		partitionID += pm.partitionCount
	}

	return pm.partitions[partitionID].Get(key)
}
// RangeIterator is not implemented for the partitioned backend: calling it
// always panics with "implement me". Both arguments are ignored.
func (pm *partitionMemory) RangeIterator(fromKey []byte, toKey []byte) backend.Iterator {
	panic("implement me")
}
// Iterator is not implemented for the partitioned backend: calling it always
// panics with "implement me".
func (pm *partitionMemory) Iterator() backend.Iterator {
	const notImplemented = "implement me"
	panic(notImplemented)
}
// Delete removes key from the partition selected by the key and returns
// whatever that partition's Delete returns.
//
// key must be the base-10 text of an integer (it is parsed with strconv.Atoi);
// the partition index is that integer modulo the partition count. A parse
// failure is returned unchanged.
func (pm *partitionMemory) Delete(key []byte) error {
	k, err := strconv.Atoi(string(key))
	if err != nil {
		return err
	}

	partitionID := k % pm.partitionCount
	if partitionID < 0 {
		// Go's % keeps the sign of the dividend, so a negative key would
		// otherwise select a partition index that does not exist.
		partitionID += pm.partitionCount
	}

	return pm.partitions[partitionID].Delete(key)
}
// Destroy is a no-op for this backend and always returns nil.
func (pm *partitionMemory) Destroy() error {
	return nil
}
// SetExpiry is a no-op: the supplied duration is ignored and is not forwarded
// to the partitions.
func (pm *partitionMemory) SetExpiry(expiry time.Duration) {}
// String returns a fixed human-readable label for this backend.
func (pm *partitionMemory) String() string {
	const label = "partition memory"
	return label
}
// Persistent always reports false for this backend.
func (pm *partitionMemory) Persistent() bool {
	const persistent = false
	return persistent
}
// Close closes every partition in index order (0..partitionCount-1).
//
// All partitions are closed even when one of them fails; the first error
// encountered is returned, or nil when every partition closed cleanly.
func (pm *partitionMemory) Close() error {
	var firstErr error
	for i := 0; i < pm.partitionCount; i++ {
		// Keep going after a failure so the remaining partitions are still
		// closed, but remember the first error rather than dropping it.
		if err := pm.partitions[i].Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// Partitions returns one iterator per partition, obtained from each
// partition's Iterator method.
//
// The iterators are returned in partition-index order (0..partitionCount-1),
// so element i of the result belongs to partition i. A nil slice is returned
// when there are no partitions.
func (pm *partitionMemory) Partitions() []backend.Iterator {
	if len(pm.partitions) == 0 {
		return nil
	}

	iterators := make([]backend.Iterator, 0, len(pm.partitions))
	// Walk by index rather than ranging over the map: map iteration order is
	// randomized, which would shuffle the result on every call.
	for i := 0; i < pm.partitionCount; i++ {
		iterators = append(iterators, pm.partitions[i].Iterator())
	}
	return iterators
}