forked from fluxcd/flux
/
db.go
127 lines (112 loc) · 2.66 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package sql
import (
"database/sql"
"encoding/json"
_ "github.com/cznic/ql/driver"
_ "github.com/lib/pq"
"github.com/pkg/errors"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/instance"
)
type DB struct {
conn *sql.DB
}
// New opens the database identified by driver and datasource and verifies
// that the config table can be queried before handing the DB back.
//
// On any failure the returned *DB is nil. If the sanity check fails, the
// handle that was just opened is closed so that it is not leaked.
func New(driver, datasource string) (*DB, error) {
	conn, err := sql.Open(driver, datasource)
	if err != nil {
		return nil, err
	}
	db := &DB{conn: conn}
	if err := db.sanityCheck(); err != nil {
		// The sanity-check failure is the error worth reporting; a failure
		// to close an unusable handle would only obscure it.
		_ = conn.Close()
		return nil, err
	}
	return db, nil
}
func (db *DB) UpdateConfig(inst flux.InstanceID, update instance.UpdateFunc) error {
tx, err := db.conn.Begin()
if err != nil {
return err
}
var (
currentConfig instance.Config
confString string
)
switch tx.QueryRow(`SELECT config FROM config WHERE instance = $1`, string(inst)).Scan(&confString) {
case sql.ErrNoRows:
currentConfig = instance.MakeConfig()
case nil:
if err = json.Unmarshal([]byte(confString), ¤tConfig); err != nil {
return err
}
default:
return err
}
newConfig, err := update(currentConfig)
if err != nil {
err2 := tx.Rollback()
if err2 != nil {
return errors.Wrapf(err, "transaction rollback failed: %s", err2)
}
return err
}
newConfigBytes, err := json.Marshal(newConfig)
if err != nil {
return err
}
_, err = tx.Exec(`DELETE FROM config WHERE instance = $1`, string(inst))
if err == nil {
_, err = tx.Exec(`INSERT INTO config (instance, config, stamp) VALUES
($1, $2, now())`, string(inst), string(newConfigBytes))
}
if err == nil {
err = tx.Commit()
}
return err
}
// GetConfig returns the stored config for inst.
//
// If there is no row for inst it returns instance.MakeConfig() and a nil
// error. Query and JSON decoding failures are returned together with a
// zero-valued Config.
func (db *DB) GetConfig(inst flux.InstanceID) (instance.Config, error) {
	var raw string
	err := db.conn.QueryRow(`SELECT config FROM config WHERE instance = $1`, string(inst)).Scan(&raw)
	switch err {
	case nil:
		// Row found; decode it below.
	case sql.ErrNoRows:
		return instance.MakeConfig(), nil
	default:
		return instance.Config{}, err
	}

	// Decode in a separate statement before returning: in
	// `return conf, json.Unmarshal(..., &conf)` the order in which conf is
	// read and Unmarshal is called is not specified by the language.
	var conf instance.Config
	if err := json.Unmarshal([]byte(raw), &conf); err != nil {
		return instance.Config{}, err
	}
	return conf, nil
}
// All returns the config of every instance that has a row in the config
// table, paired with its instance ID.
//
// A failure to scan or decode any row aborts the listing and returns a nil
// slice with that error. An iteration error reported by rows.Err() is
// returned along with the entries collected so far.
func (db *DB) All() ([]instance.NamedConfig, error) {
	rows, err := db.conn.Query(`SELECT instance, config FROM config`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := []instance.NamedConfig{}
	for rows.Next() {
		var name, raw string
		if scanErr := rows.Scan(&name, &raw); scanErr != nil {
			return nil, scanErr
		}

		var parsed instance.Config
		if jsonErr := json.Unmarshal([]byte(raw), &parsed); jsonErr != nil {
			return nil, jsonErr
		}

		entry := instance.NamedConfig{
			ID:     flux.InstanceID(name),
			Config: parsed,
		}
		result = append(result, entry)
	}
	return result, rows.Err()
}
// ---
// sanityCheck verifies that the config table exists with the columns this
// package uses (instance, config, stamp) by selecting at most one row.
//
// The result set is closed before returning so that the probe does not keep
// a connection checked out of the pool.
func (db *DB) sanityCheck() error {
	rows, err := db.conn.Query(`SELECT instance, config, stamp FROM config LIMIT 1`)
	if err != nil {
		return errors.Wrap(err, "failed sanity check for config table")
	}
	if err := rows.Close(); err != nil {
		return errors.Wrap(err, "failed sanity check for config table")
	}
	return nil
}