-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkv.go
123 lines (106 loc) · 3.15 KB
/
kv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2024 geebytes. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package golayeredcache
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
)
type LayeredKeyValueCacheImpl struct {
*BaseLayeredCacheImpl
log *logrus.Logger
keyPrefix string
}
func (lb *LayeredKeyValueCacheImpl) OnMessage(ctx context.Context, from string, message interface{}) error {
XMessage, ok := message.(redis.XMessage)
if !ok {
return fmt.Errorf("type except redis.XMessage,but got %T", message)
}
values := XMessage.Values
if len(values) == 0 {
return nil
}
key, ok := values["key"].(string)
if !ok {
return fmt.Errorf("key is not string, got %T", values["key"])
}
// lb.log.Infof("onMessage:%v", values)
if op, ok := values["op"].(string); ok && op == "delete" {
return lb.DelOnLocal(ctx, key)
}
value := values["value"].(string)
if !ok {
return fmt.Errorf("value is not string, got %T", values["value"])
}
exp, _ := values["expire"].(int64)
expVal := time.Duration(exp) * time.Second
return lb.SetToLocal(ctx, key, []byte(value), expVal)
}
func (lb *LayeredKeyValueCacheImpl) onScan(ctx context.Context, key interface{}) error {
keyStr, ok := key.(string)
if !ok {
return fmt.Errorf("key is not string, got %T", key)
}
ch := lb.Dump(ctx, keyStr)
for v := range ch {
if err, ok := v.(error); ok {
lb.log.Errorf("dump:%v", err)
continue
}
vals, ok := v.([]interface{})
if !ok {
lb.log.Errorf("dump: error type%v", v)
continue
}
if err := lb.Load(ctx, keyStr, vals...); err != nil {
lb.log.Errorf("load:%v", err)
continue
}
}
return nil
}
func (lb *LayeredKeyValueCacheImpl) LoadDump(ctx context.Context) error {
ch := lb.Scan(ctx, lb.keyPrefix, lb.onScan)
for err := range ch {
if err != nil {
lb.log.Error("scan", err)
}
}
return nil
}
func (lc *LayeredKeyValueCacheImpl) Watch(ctx context.Context) <-chan error {
return lc.BaseLayeredCacheImpl.Watch(ctx, lc.OnMessage)
}
func (lc *LayeredKeyValueCacheImpl) UnWatch() error {
return lc.BaseLayeredCacheImpl.UnWatch()
}
func newLayeredKeyValueCacheImpl(layered *BaseLayeredCacheImpl, keyPrefix string, log *logrus.Logger) LayeredKeyValueCache {
return &LayeredKeyValueCacheImpl{
BaseLayeredCacheImpl: layered,
keyPrefix: keyPrefix,
log: log,
}
}
func (lc *LayeredKeyValueCacheImpl) Get(ctx context.Context, key string) ([]byte, error) {
values, err := lc.BaseLayeredCacheImpl.Get(ctx, key)
if err != nil || len(values) == 0 {
return nil, err
}
if val, ok := values[0].([]byte); ok&&len(val)!=0 {
return val, nil
}
if val, ok := values[0].(string); ok&&val!="" {
return []byte(val), nil
}
return nil, fmt.Errorf("type except []byte or string, but got %T", values[0])
}
func (lc *LayeredKeyValueCacheImpl) Set(ctx context.Context, key string, value []byte, exp time.Duration) error {
return lc.BaseLayeredCacheImpl.Set(ctx, key, value, exp)
}
func (lc *LayeredKeyValueCacheImpl) Del(ctx context.Context, key string) error {
return lc.BaseLayeredCacheImpl.Del(ctx, key)
}
// func (lc *LayeredKeyValueCacheImpl)Del()