forked from motiv-labs/janus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongodb_repository.go
160 lines (137 loc) · 4.13 KB
/
mongodb_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package api
import (
"context"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
const (
collectionName string = "api_specs"
)
// MongoRepository represents a mongodb repository
type MongoRepository struct {
// TODO: we need to expose this so the plugins can use the same session. We should abstract the session and provide
// the plugins with a simple interface to search, insert, update and remove data from whatever backend implementation
Session *mgo.Session
refreshTime time.Duration
}
// NewMongoAppRepository creates a mongo API definition repo
func NewMongoAppRepository(dsn string, refreshTime time.Duration) (*MongoRepository, error) {
log.WithField("dsn", dsn).Debug("Trying to connect to MongoDB...")
session, err := mgo.Dial(dsn)
if err != nil {
return nil, errors.Wrap(err, "could not connect to mongodb")
}
log.Debug("Connected to MongoDB")
session.SetMode(mgo.Monotonic, true)
return &MongoRepository{Session: session, refreshTime: refreshTime}, nil
}
// Close terminates the session. It's a runtime error to use a session
// after it has been closed.
func (r *MongoRepository) Close() error {
r.Session.Close()
return nil
}
// Listen watches for changes on the configuration
func (r *MongoRepository) Listen(ctx context.Context, cfgChan <-chan ConfigurationMessage) {
go func() {
log.Debug("Listening for changes on the provider...")
for {
select {
case cfg := <-cfgChan:
switch cfg.Operation {
case AddedOperation:
err := r.add(cfg.Configuration)
if err != nil {
log.WithError(err).Error("Could not add the configuration on the provider")
}
case UpdatedOperation:
err := r.add(cfg.Configuration)
if err != nil {
log.WithError(err).Error("Could not update the configuration on the provider")
}
case RemovedOperation:
err := r.remove(cfg.Configuration.Name)
if err != nil {
log.WithError(err).Error("Could not remove the configuration from the provider")
}
}
case <-ctx.Done():
return
}
}
}()
}
// Watch watches for changes on the database
func (r *MongoRepository) Watch(ctx context.Context, cfgChan chan<- ConfigurationChanged) {
t := time.NewTicker(r.refreshTime)
go func(refreshTicker *time.Ticker) {
defer refreshTicker.Stop()
log.Debug("Watching Provider...")
for {
select {
case <-refreshTicker.C:
defs, err := r.FindAll()
if err != nil {
log.WithError(err).Error("Failed to get configurations on watch")
continue
}
cfgChan <- ConfigurationChanged{
Configurations: &Configuration{Definitions: defs},
}
case <-ctx.Done():
return
}
}
}(t)
}
// FindAll fetches all the API definitions available
func (r *MongoRepository) FindAll() ([]*Definition, error) {
var result []*Definition
session, coll := r.getSession()
defer session.Close()
err := coll.Find(nil).All(&result)
if err != nil {
return nil, err
}
return result, nil
}
// Add adds an API definition to the repository
func (r *MongoRepository) add(definition *Definition) error {
session, coll := r.getSession()
defer session.Close()
isValid, err := definition.Validate()
if false == isValid && err != nil {
log.WithError(err).Error("Validation errors")
return err
}
_, err = coll.Upsert(bson.M{"name": definition.Name}, definition)
if err != nil {
log.WithField("name", definition.Name).Error("There was an error adding the resource")
return err
}
log.WithField("name", definition.Name).Debug("Resource added")
return nil
}
// Remove removes an API definition from the repository
func (r *MongoRepository) remove(name string) error {
session, coll := r.getSession()
defer session.Close()
err := coll.Remove(bson.M{"name": name})
if err != nil {
if err == mgo.ErrNotFound {
return ErrAPIDefinitionNotFound
}
log.WithField("name", name).Error("There was an error removing the resource")
return err
}
log.WithField("name", name).Debug("Resource removed")
return nil
}
func (r *MongoRepository) getSession() (*mgo.Session, *mgo.Collection) {
session := r.Session.Copy()
coll := session.DB("").C(collectionName)
return session, coll
}