/
store.go
181 lines (161 loc) · 5.16 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*******************************************************************************
*
* Copyright 2019 Stefan Majewsky <majewsky@gmx.net>
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* this program. If not, see <http://www.gnu.org/licenses/>.
*
*******************************************************************************/
package core
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"time"
"github.com/fsnotify/fsnotify"
"github.com/sapcc/go-bits/logg"
)
//Database contains the contents of Portunus' database. This is what gets
//persisted into the database file.
type Database struct {
Users []User `json:"users,keepempty"`
Groups []Group `json:"groups,keepempty"`
SchemaVersion uint `json:"schema_version,keepempty"`
}
//FileStore is responsible for loading Portunus' database from
//PORTUNUS_SERVER_STATE_DIR, and persisting it when changes are made to it.
//
//The Initializer function is called at most once, only when there is no
//existing database file at the given Path.
type FileStore struct {
Path string
Initializer func() Database
running bool
}
//FileStoreAPI is the interface that the engine uses to interact with the
//FileStore.
type FileStoreAPI struct {
//Whenever the FileStore reads an updated version of the config file, it
//sends the database contents into this channel.
LoadEvents <-chan Database
//Whenever the FileStore reads an updated version of the database from this
//channel, it will persist that state into the database file.
SaveRequests chan<- Database
}
//RunAsync spawns the goroutine for the FileStore, and returns the API that the
//engine uses to interact with it.
func (s *FileStore) RunAsync() *FileStoreAPI {
if s.running {
panic("cannot call FileStore.Run() twice")
}
s.running = true
loadChan := make(chan Database, 1)
saveChan := make(chan Database, 1)
go s.run(loadChan, saveChan)
return &FileStoreAPI{LoadEvents: loadChan, SaveRequests: saveChan}
}
func (s *FileStore) run(loadChan chan<- Database, saveChan <-chan Database) {
//perform initial read of the database
loadChan <- s.loadDB(true)
watcher := s.makeWatcher()
for {
select {
case <-watcher.Events:
//wait for whatever is updating the file to complete
time.Sleep(25 * time.Millisecond)
//load updated version of database from file
loadChan <- s.loadDB(false)
//recreate the watcher (the original file might be gone if it was updated
//by an atomic rename() like we do in saveDB())
s.cleanupWatcher(watcher)
watcher = s.makeWatcher()
case err := <-watcher.Errors:
logg.Error(err.Error())
case db := <-saveChan:
//stop watching while we modify the database file, so as not to pick up
//our own change
s.cleanupWatcher(watcher)
s.saveDB(db)
watcher = s.makeWatcher()
}
}
}
func (s *FileStore) cleanupWatcher(watcher *fsnotify.Watcher) {
err := watcher.Close()
if err != nil {
logg.Fatal("cannot cleanup filesystem watcher: " + err.Error())
}
}
func (s *FileStore) makeWatcher() *fsnotify.Watcher {
watcher, err := fsnotify.NewWatcher()
if err != nil {
logg.Fatal("cannot initialize filesystem watcher: " + err.Error())
}
err = watcher.Add(s.Path)
if err != nil {
logg.Fatal("cannot setup filesystem watch on %s: %s", s.Path, err.Error())
}
return watcher
}
func (s *FileStore) loadDB(allowEmpty bool) (db Database) {
dbContents, err := ioutil.ReadFile(s.Path)
if err != nil {
//initialize empty DB on first run
if os.IsNotExist(err) && allowEmpty {
s.saveDB(s.Initializer())
return s.loadDB(false)
}
logg.Fatal(err.Error())
}
err = json.Unmarshal(dbContents, &db)
if err != nil {
logg.Fatal("cannot load main database: " + err.Error())
}
if db.SchemaVersion != 1 {
logg.Fatal("found DB with schema version %d, but this Portunus only understands schema version 1", db.SchemaVersion)
}
//TODO validate DB (e.g. groups should only contain users that actually exist)
return
}
func (s *FileStore) saveDB(db Database) {
tmpPath := filepath.Join(
filepath.Dir(s.Path),
fmt.Sprintf(".%s.%d", filepath.Base(s.Path), os.Getpid()),
)
//serialize with predictable order to minimize diffs
sort.Slice(db.Groups, func(i, j int) bool {
return db.Groups[i].Name < db.Groups[j].Name
})
sort.Slice(db.Users, func(i, j int) bool {
return db.Users[i].LoginName < db.Users[j].LoginName
})
db.SchemaVersion = 1
dbContents, err := json.Marshal(db)
if err == nil {
var buf bytes.Buffer
err = json.Indent(&buf, dbContents, "", "\t")
dbContents = buf.Bytes()
}
if err == nil {
err = ioutil.WriteFile(tmpPath, dbContents, 0644)
}
if err == nil {
err = os.Rename(tmpPath, s.Path)
}
if err != nil {
logg.Fatal(err.Error())
}
}