-
Notifications
You must be signed in to change notification settings - Fork 10
/
cachedsql.go
162 lines (135 loc) · 5.11 KB
/
cachedsql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package sqlc
import (
"context"
"database/sql"
"time"
"github.com/teamgram/marmota/pkg/stores/sqlx"
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/syncx"
)
// cacheSafeGapBetweenIndexAndPrimary is the extra duration QueryRowIndex adds
// to the expire value it receives for the index entry when it caches the row
// under its primary key. See doc/sql-cache.md.
const cacheSafeGapBetweenIndexAndPrimary = 5 * time.Second
// ErrNotFound is an alias of sqlx.ErrNotFound.
var ErrNotFound = sqlx.ErrNotFound

// Package-wide state handed to every cache built by NewConn and NewNodeConn.
var (
	// singleFlights is shared by all connections rather than created per
	// connection, because multiple conns may share the same cache key.
	singleFlights = syncx.NewSingleFlight()

	// stats is the cache statistics collector registered under the name "sqlc".
	stats = cache.NewStat("sqlc")
)
// Callback signatures accepted by the CachedConn methods.
type (
	// ExecFn defines the sql exec method. CachedConn.Exec returns its two
	// int64 results as lastInsertId and rowsAffected, in that order.
	ExecFn func(ctx context.Context, conn *sqlx.DB) (int64, int64, error)

	// IndexQueryFn defines the query method that is based on unique indexes.
	// QueryRowIndex uses the returned value as the row's primary key.
	IndexQueryFn func(ctx context.Context, conn *sqlx.DB, v interface{}) (interface{}, error)

	// PrimaryQueryFn defines the query method that is based on primary keys.
	PrimaryQueryFn func(ctx context.Context, conn *sqlx.DB, v, primary interface{}) error

	// QueryFn defines the query method.
	QueryFn func(ctx context.Context, conn *sqlx.DB, v interface{}) error

	// KeysQueryFn defines the query method that takes a batch of keys and
	// returns the results as a map with string keys.
	KeysQueryFn func(ctx context.Context, conn *sqlx.DB, keys ...string) (map[string]interface{}, error)

	// CacheFn is forwarded unchanged by QueryRows to the cache.
	// NOTE(review): presumably it converts a cached key/value string pair
	// into a result value; confirm against the cache implementation.
	CacheFn func(k, v string) (interface{}, error)
)

// CachedConn is a DB connection with cache capability.
type CachedConn struct {
	db    *sqlx.DB
	cache cache.Cache
}
// NewConn returns a CachedConn with a redis cluster cache.
//
// The cache is built from c and opts together with the package-level
// singleFlights and stats, and is given sql.ErrNoRows as the error argument
// of cache.New.
func NewConn(db *sqlx.DB, c cache.CacheConf, opts ...cache.Option) CachedConn {
	clusterCache := cache.New(c, singleFlights, stats, sql.ErrNoRows, opts...)

	return NewConnWithCache(db, clusterCache)
}
// NewConnWithCache returns a CachedConn that pairs db with the caller-supplied
// cache c. Neither argument is validated here.
func NewConnWithCache(db *sqlx.DB, c cache.Cache) CachedConn {
	conn := CachedConn{db: db, cache: c}

	return conn
}
// NewNodeConn returns a CachedConn with a redis node cache.
//
// The cache is built on rds with the package-level singleFlights and stats,
// and is given sql.ErrNoRows as the error argument of cache.NewNode.
func NewNodeConn(db *sqlx.DB, rds *redis.Redis, opts ...cache.Option) CachedConn {
	nodeCache := cache.NewNode(rds, singleFlights, stats, sql.ErrNoRows, opts...)

	return NewConnWithCache(db, nodeCache)
}
// DelCache deletes the cache entries stored under keys and returns whatever
// error the cache reports.
func (cc CachedConn) DelCache(ctx context.Context, keys ...string) error {
	err := cc.cache.DelCtx(ctx, keys...)

	return err
}
// GetCache reads the cache entry stored under key and unmarshals it into v.
// The database is not consulted.
func (cc CachedConn) GetCache(ctx context.Context, key string, v interface{}) error {
	store := cc.cache

	return store.GetCtx(ctx, key, v)
}
// Exec runs exec against the underlying DB and, only if it succeeds, deletes
// the cache entries stored under keys.
//
// The two int64 values produced by exec are returned as lastInsertId and
// rowsAffected on every path. When exec fails, its error is returned and the
// cache is left untouched. When exec succeeds, the returned error is the
// result of the cache deletion.
func (cc CachedConn) Exec(ctx context.Context, exec ExecFn, keys ...string) (lastInsertId, rowsAffected int64, err error) {
	if lastInsertId, rowsAffected, err = exec(ctx, cc.db); err != nil {
		return lastInsertId, rowsAffected, err
	}

	// The statement has been applied; invalidate the affected entries.
	return lastInsertId, rowsAffected, cc.DelCache(ctx, keys...)
}
// ExecNoCache runs the sql statement q with args directly on the underlying
// DB, without affecting cache.
func (cc CachedConn) ExecNoCache(ctx context.Context, q string, args ...interface{}) (sql.Result, error) {
	result, err := cc.db.Exec(ctx, q, args...)

	return result, err
}
// QueryRow unmarshals into v the value cached under key, handing the cache a
// loader that runs query against the underlying DB.
// NOTE(review): the loader is presumably invoked only on a cache miss; that
// behavior lives in the cache implementation.
func (cc CachedConn) QueryRow(ctx context.Context, v interface{}, key string, query QueryFn) error {
	loadFromDB := func(dest interface{}) error {
		return query(ctx, cc.db, dest)
	}

	return cc.cache.TakeCtx(ctx, v, key, loadFromDB)
}
// QueryRows resolves a batch of keys through the cache's TakesCtx.
//
// The cache is given a loader that runs query against the underlying DB for
// the keys it is called with, and calVal is forwarded to the cache unchanged.
// Only an error is returned; results are delivered through the callbacks.
func (cc CachedConn) QueryRows(ctx context.Context, query KeysQueryFn, calVal CacheFn, keys ...string) error {
	loadFromDB := func(ks ...string) (map[string]interface{}, error) {
		return query(ctx, cc.db, ks...)
	}

	return cc.cache.TakesCtx(ctx, loadFromDB, calVal, keys...)
}
// QueryRowIndex unmarshals into v the row identified by a unique-index cache
// key.
//
// key is the cache key of the index entry, whose value is the row's primary
// key. keyer turns a primary key into the cache key of the row itself.
//
// The index entry is taken from the cache with a loader that runs indexQuery.
// When that loader runs, it records the primary key returned by indexQuery,
// caches v under keyer(primaryKey) with the received expire value plus
// cacheSafeGapBetweenIndexAndPrimary, and the method returns without a second
// lookup. indexQuery is therefore expected to have filled v. Otherwise the row
// is taken from the cache under keyer(primaryKey), with primaryQuery as the
// loader.
func (cc CachedConn) QueryRowIndex(ctx context.Context, v interface{}, key string,
	keyer func(primary interface{}) string, indexQuery IndexQueryFn,
	primaryQuery PrimaryQueryFn) error {
	var (
		primaryKey interface{}
		loaded     bool
	)

	// Loader for the index entry: resolves the primary key via the index
	// query and stores the row under its primary cache key.
	loadByIndex := func(_ interface{}, ttl time.Duration) error {
		var err error
		if primaryKey, err = indexQuery(ctx, cc.db, v); err != nil {
			return err
		}

		loaded = true
		return cc.cache.SetWithExpireCtx(ctx, keyer(primaryKey), v,
			ttl+cacheSafeGapBetweenIndexAndPrimary)
	}

	err := cc.cache.TakeWithExpireCtx(ctx, &primaryKey, key, loadByIndex)
	switch {
	case err != nil:
		return err
	case loaded:
		// The index loader ran in this call, so no second lookup is needed.
		return nil
	}

	// Only the primary key is known at this point; fetch the row itself.
	loadByPrimary := func(dest interface{}) error {
		return primaryQuery(ctx, cc.db, dest, primaryKey)
	}

	return cc.cache.TakeCtx(ctx, v, keyer(primaryKey), loadByPrimary)
}
// QueryRowNoCache runs the statement q with args directly on the underlying DB
// and unmarshals the result into v. The cache is neither read nor written.
func (cc CachedConn) QueryRowNoCache(ctx context.Context, v interface{}, q string,
	args ...interface{}) error {
	err := cc.db.QueryRow(ctx, v, q, args...)

	return err
}
// QueryRowsNoCache runs the statement q with args directly on the underlying
// DB and unmarshals the result into v.
// It doesn't use cache, because it might cause consistency problem.
func (cc CachedConn) QueryRowsNoCache(ctx context.Context, v interface{}, q string,
	args ...interface{}) error {
	err := cc.db.QueryRows(ctx, v, q, args...)

	return err
}
// SetCache stores val in the cache under key and returns whatever error the
// cache reports. The database is not touched.
func (cc CachedConn) SetCache(ctx context.Context, key string, val interface{}) error {
	store := cc.cache

	return store.SetCtx(ctx, key, val)
}
// Transact runs fn in transaction mode by delegating to the underlying DB's
// Transact. This method does not touch the cache itself; callers that modify
// cached rows inside fn must invalidate the affected keys themselves.
func (cc CachedConn) Transact(ctx context.Context, fn func(c *sqlx.Tx) error) error {
	db := cc.db

	return db.Transact(ctx, fn)
}