/
postgresql.go
251 lines (232 loc) · 6.07 KB
/
postgresql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// Package postgresql implements a micro Store backed by sql
package postgresql
import (
"database/sql"
"fmt"
"strings"
"time"
"unicode"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
)
// DefaultNamespace is the namespace that the sql store
// will use if no namespace is provided.
const DefaultNamespace = "micro"
type sqlStore struct {
db *sql.DB
table string
options.Options
}
// List all the known records
func (s *sqlStore) List() ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table))
if err != nil {
return nil, err
}
var records []*store.Record
var timehelper pq.NullTime
rows, err := q.Query()
if err != nil {
if err == sql.ErrNoRows {
return records, nil
}
return nil, err
}
defer rows.Close()
for rows.Next() {
record := &store.Record{}
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(record.Key)
} else {
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
}
} else {
records = append(records, record)
}
}
rowErr := rows.Close()
if rowErr != nil {
// transaction rollback or something
return records, rowErr
}
if err := rows.Err(); err != nil {
return records, err
}
return records, nil
}
// Read all records with keys
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table))
if err != nil {
return nil, err
}
var records []*store.Record
var timehelper pq.NullTime
for _, key := range keys {
row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
}
}
return records, nil
}
// Write records
func (s *sqlStore) Write(rec ...*store.Record) error {
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry)
VALUES ($1, $2::bytea, $3)
ON CONFLICT (key)
DO UPDATE
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table))
if err != nil {
return err
}
for _, r := range rec {
var err error
if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}
if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
}
}
return nil
}
// Delete records with keys
func (s *sqlStore) Delete(keys ...string) error {
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table))
if err != nil {
return err
}
for _, key := range keys {
result, err := q.Exec(key)
if err != nil {
return err
}
_, err = result.RowsAffected()
if err != nil {
return err
}
}
return nil
}
func (s *sqlStore) initDB(options options.Options) error {
// Get the store.namespace option, or use sql.DefaultNamespace
namespaceOpt, found := options.Values().Get("store.namespace")
if !found {
s.table = DefaultNamespace
} else {
if namespace, ok := namespaceOpt.(string); ok {
s.table = namespace
} else {
return errors.New("store.namespace option must be a string")
}
}
// Create "micro" schema
schema, err := s.db.Prepare("CREATE SCHEMA IF NOT EXISTS micro ;")
if err != nil {
return err
}
_, err = schema.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create Schema")
}
// Create a table for the Store namespace
tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s
(
key text COLLATE "default" NOT NULL,
value bytea,
expiry timestamp with time zone,
CONSTRAINT %s_pkey PRIMARY KEY (key)
);`, s.table, s.table))
if err != nil {
return errors.Wrap(err, "SQL statement preparation failed")
}
_, err = tableq.Exec()
if err != nil {
return errors.Wrap(err, "Couldn't create table")
}
return nil
}
// New returns a new micro Store backed by sql
func New(opts ...options.Option) (store.Store, error) {
options := options.NewOptions(opts...)
driver, dataSourceName, err := validateOptions(options)
if err != nil {
return nil, err
}
if !strings.Contains(dataSourceName, " ") {
dataSourceName = fmt.Sprintf("host=%s", dataSourceName)
}
db, err := sql.Open(driver, dataSourceName)
if err != nil {
return nil, err
}
if err := db.Ping(); err != nil {
return nil, err
}
s := &sqlStore{
db: db,
}
return s, s.initDB(options)
}
// validateOptions checks whether the provided options are valid, then returns the driver
// and data source name.
func validateOptions(options options.Options) (driver, dataSourceName string, err error) {
driverOpt, found := options.Values().Get("store.sql.driver")
if !found {
return "", "", errors.New("No store.sql.driver option specified")
}
nodesOpt, found := options.Values().Get("store.nodes")
if !found {
return "", "", errors.New("No store.nodes option specified (expected a database connection string)")
}
driver, ok := driverOpt.(string)
if !ok {
return "", "", errors.New("store.sql.driver option must be a string")
}
nodes, ok := nodesOpt.([]string)
if !ok {
return "", "", errors.New("store.nodes option must be a []string")
}
if len(nodes) != 1 {
return "", "", errors.New("expected only 1 store.nodes option")
}
namespaceOpt, found := options.Values().Get("store.namespace")
if found {
namespace, ok := namespaceOpt.(string)
if !ok {
return "", "", errors.New("store.namespace must me a string")
}
for _, r := range namespace {
if !unicode.IsLetter(r) {
return "", "", errors.New("store.namespace must only contain letters")
}
}
}
return driver, nodes[0], nil
}