forked from motiv-labs/janus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
file_repository.go
123 lines (103 loc) · 3.14 KB
/
file_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package api
import (
"context"
"encoding/json"
"io/ioutil"
"path/filepath"
"strings"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// FileSystemRepository is an API definition repository backed by JSON files on disk.
// It embeds InMemoryRepository for storage and keeps the fsnotify watcher that
// is registered on the loaded definition files.
type FileSystemRepository struct {
*InMemoryRepository
// watcher reports file system events for the definition files added to it.
watcher *fsnotify.Watcher
}
// definitionList wraps a slice of definitions so that a JSON array can be
// decoded into it through its UnmarshalJSON method (json.Unmarshaler).
type definitionList struct {
// defs holds the decoded definitions.
defs []*Definition
}
// NewFileSystemRepository builds a repository from the JSON definition files
// found directly inside dir and registers a file system watch on each of them.
//
// Only non-directory entries whose name ends in ".json" are loaded. A failure
// to read a file, to watch it, or to add one of its definitions aborts
// construction; in that case the watcher created here is closed before
// returning so it is not leaked.
func NewFileSystemRepository(dir string) (*FileSystemRepository, error) {
	repo := FileSystemRepository{InMemoryRepository: NewInMemoryRepository()}

	// Grab json files from directory
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, errors.Wrap(err, "failed to create a file system watcher")
	}
	repo.watcher = watcher

	// Release the watcher on every early return below; only a fully loaded
	// repository hands ownership of it to the caller.
	loaded := false
	defer func() {
		if loaded {
			return
		}
		if closeErr := watcher.Close(); closeErr != nil {
			log.WithError(closeErr).Warn("Couldn't close the file system watcher")
		}
	}()

	for _, f := range files {
		// Match on the suffix so that backups such as "api.json.bak" or
		// "api.json~" are not picked up, and never try to read a directory.
		if f.IsDir() || !strings.HasSuffix(f.Name(), ".json") {
			continue
		}

		filePath := filepath.Join(dir, f.Name())
		logger := log.WithField("path", filePath)

		appConfigBody, err := ioutil.ReadFile(filePath)
		if err != nil {
			logger.WithError(err).Error("Couldn't load the api definition file")
			return nil, err
		}

		if err = repo.watcher.Add(filePath); err != nil {
			logger.WithError(err).Error("Couldn't watch the api definition file")
			return nil, err
		}

		definition := repo.parseDefinition(appConfigBody)
		for _, v := range definition.defs {
			if err = repo.add(v); err != nil {
				logger.WithField("name", v.Name).WithError(err).Error("Failed during add definition to the repository")
				return nil, err
			}
		}
	}

	loaded = true
	return &repo, nil
}
// Close closes the file system watcher held by the repository and returns the
// error, if any, reported by the watcher.
func (r *FileSystemRepository) Close() error {
	w := r.watcher
	return w.Close()
}
// Watch starts a goroutine that listens for write events on the watched
// definition files and publishes the re-parsed definitions on cfgChan.
//
// The goroutine stops when ctx is cancelled, when the watcher reports an
// error, or when the watcher's channels are closed. Files that cannot be read
// after a write event are logged and skipped.
func (r *FileSystemRepository) Watch(ctx context.Context, cfgChan chan<- ConfigurationChanged) {
	go func() {
		for {
			select {
			case event, ok := <-r.watcher.Events:
				if !ok {
					// A closed channel would otherwise yield zero-value
					// events forever and spin this loop.
					return
				}
				if event.Op&fsnotify.Write != fsnotify.Write {
					continue
				}

				body, err := ioutil.ReadFile(event.Name)
				if err != nil {
					log.WithError(err).Error("Couldn't load the api definition file")
					continue
				}

				change := ConfigurationChanged{
					Configurations: &Configuration{Definitions: r.parseDefinition(body).defs},
				}

				// Give up on the send once the context is cancelled so the
				// goroutine is not stuck when nobody reads cfgChan anymore.
				select {
				case cfgChan <- change:
				case <-ctx.Done():
					return
				}
			case err, ok := <-r.watcher.Errors:
				if !ok {
					// Closed channel, not a real error: stop quietly.
					return
				}
				log.WithError(err).Error("error received from file system notify")
				return
			case <-ctx.Done():
				return
			}
		}
	}()
}
// parseDefinition decodes the raw contents of a definition file.
//
// The payload is first decoded as a JSON array of definitions. If that fails
// it is decoded as a single definition into a value obtained from
// NewDefinition. When the second attempt fails as well the error is logged and
// the returned list still holds that single definition, in whatever state the
// failed decode left it.
func (r *FileSystemRepository) parseDefinition(apiDef []byte) definitionList {
	appConfigs := definitionList{}

	// Try unmarshalling as if json is an unnamed Array of multiple definitions
	if err := json.Unmarshal(apiDef, &appConfigs); err == nil {
		return appConfigs
	}

	// Try unmarshalling as if json is a single Definition. The list is reset
	// first: a failed array decode may leave partially decoded entries behind,
	// and index 0 must refer to the fresh definition, not to such a leftover.
	appConfigs.defs = []*Definition{NewDefinition()}
	if err := json.Unmarshal(apiDef, &appConfigs.defs[0]); err != nil {
		log.WithError(err).Error("[RPC] --> Couldn't unmarshal api configuration")
	}

	return appConfigs
}
// UnmarshalJSON implements json.Unmarshaler by decoding b directly into the
// wrapped slice of definitions.
func (d *definitionList) UnmarshalJSON(b []byte) error {
	target := &d.defs
	return json.Unmarshal(b, target)
}