/
redis.go
128 lines (118 loc) · 3.06 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package dao
import (
"encoding/json"
"fmt"
"regexp"
"strings"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/txchat/dtalk/service/record/pusher/logH"
)
// _prefixConnSeq is the format string for the Redis key of a connection's
// sequence index; keyConnection fills the %v verb with the connection ID.
const _prefixConnSeq = "conn-seq:%v"
// keyConnection builds the Redis key for the connection identified by cid by
// formatting cid into _prefixConnSeq.
func keyConnection(cid string) string {
	key := fmt.Sprintf(_prefixConnSeq, cid)
	return key
}
// connSeqConvertTypeRe matches the "type=<name>;" header that every encoded
// value must start with. It is compiled once at package initialisation rather
// than on every call.
var connSeqConvertTypeRe = regexp.MustCompile(`^type=(\S+);`)

// connSeqIndexValConvert encodes an event type and a list of log entries as
// "type=<tp>;<log1>,<log2>,...".
//
// The encoded string is then validated: it must start with "type=", followed
// by one or more non-whitespace characters and a ';'. If it does not (for
// example when tp is empty and logs contain no ';'), an error is returned.
// The encoded string is returned alongside the error, exactly as it was built.
//
// NOTE(review): a ';' or ',' inside tp or inside a log entry is not escaped or
// rejected here — verify that callers never pass such values.
func connSeqIndexValConvert(tp string, logs []string) (string, error) {
	val := fmt.Sprintf("type=%s;%s", tp, strings.Join(logs, ","))
	if !connSeqConvertTypeRe.MatchString(val) {
		return val, errors.New("conn seq index convert err")
	}
	return val, nil
}
// connSeqParseTypeRe matches the leading "type=<name>;" header of an encoded
// value and captures <name>. It is compiled once at package initialisation
// rather than on every call.
var connSeqParseTypeRe = regexp.MustCompile(`^type=(\S+);`)

// connSeqIndexValParse decodes a string of the form
// "type=<tp>;<log1>,<log2>,..." into a slice whose first element is <tp> and
// whose remaining elements are the comma-separated entries after the header.
//
// An error (and a nil slice) is returned when val does not start with a
// "type=<non-whitespace>;" header.
//
// Because the captured type is matched greedily, it extends up to the last
// ';' of the leading run of non-whitespace characters. An empty remainder
// yields a single empty entry, so "type=a;" parses to ["a", ""].
func connSeqIndexValParse(val string) ([]string, error) {
	// One submatch-index lookup gives both the captured type (loc[2]:loc[3])
	// and the offset just past the terminating ';' (loc[1]).
	loc := connSeqParseTypeRe.FindStringSubmatchIndex(val)
	if loc == nil {
		return nil, errors.New("conn seq index parse err")
	}
	tp := val[loc[2]:loc[3]]
	logs := strings.Split(val[loc[1]:], ",")

	ret := make([]string, 0, len(logs)+1)
	ret = append(ret, tp)
	ret = append(ret, logs...)
	return ret, nil
}
// AddConnSeqIndex stores item, JSON-encoded, in the Redis hash whose key is
// keyConnection(cid), under the hash field seq (HSET key seq json).
//
// It returns the json.Marshal error if item cannot be encoded, or the error
// reported by the Redis command; Redis failures are also logged through d.log.
func (d *Dao) AddConnSeqIndex(cid string, seq int32, item *logH.ConnSeqItem) error {
	// Encode first so that no connection is requested from d.redis when the
	// item cannot be serialised.
	val, err := json.Marshal(item)
	if err != nil {
		return err
	}

	key := keyConnection(cid)
	conn := d.redis.Get()
	defer conn.Close()

	// Do sends the command, flushes the output buffer and reads the reply in
	// a single call, matching how GetConnSeqIndex issues its command.
	if _, err := conn.Do("HSET", key, seq, val); err != nil {
		d.log.Error().Err(err).Msg(fmt.Sprintf("conn.Do(HSET %s,%d,%s)", key, seq, val))
		return err
	}
	return nil
}
// GetConnSeqIndex reads the hash field seq from the Redis hash keyed by
// keyConnection(cid) and JSON-decodes it into a logH.ConnSeqItem.
//
// When the reply is redis.ErrNil, it returns (nil, nil), so callers must check
// the item for nil as well as the error. Any other Redis error is logged
// through d.log and returned. A JSON decoding failure is returned as-is.
func (d *Dao) GetConnSeqIndex(cid string, seq int32) (*logH.ConnSeqItem, error) {
	hashKey := keyConnection(cid)
	c := d.redis.Get()
	defer c.Close()

	raw, err := redis.String(c.Do("HGET", hashKey, seq))
	switch {
	case err == redis.ErrNil:
		// Nothing stored for this field: not an error.
		return nil, nil
	case err != nil:
		d.log.Error().Err(err).Msg(fmt.Sprintf("conn.DO(HGET %s, %d)", hashKey, seq))
		return nil, err
	}

	item := new(logH.ConnSeqItem)
	if err := json.Unmarshal([]byte(raw), item); err != nil {
		return nil, err
	}
	return item, nil
}
// ClearConnSeq deletes the Redis key keyConnection(cid) with a DEL command.
//
// It returns the error reported by the Redis command, if any; failures are
// also logged through d.log.
func (d *Dao) ClearConnSeq(cid string) error {
	key := keyConnection(cid)
	conn := d.redis.Get()
	defer conn.Close()

	// Do sends the command, flushes the output buffer and reads the reply in
	// a single call, matching how GetConnSeqIndex issues its command.
	if _, err := conn.Do("DEL", key); err != nil {
		d.log.Error().Err(err).Msg(fmt.Sprintf("conn.Do(DEL %s)", key))
		return err
	}
	return nil
}