forked from fnproject/fn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_access.go
169 lines (139 loc) · 5.19 KB
/
data_access.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package agent
import (
"context"
"io"
"time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/common/singleflight"
"github.com/fnproject/fn/api/models"
"github.com/patrickmn/go-cache"
)
// DataAccess abstracts the datastore and message queue operations done by the
// agent, so that API nodes and runner nodes can work with the same interface
// but actually operate on the data in different ways (by direct access or by
// mediation through an API node).
type DataAccess interface {
// GetApp abstracts querying the datastore for an app.
GetApp(ctx context.Context, appName string) (*models.App, error)
// GetRoute abstracts querying the datastore for a route within an app.
GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error)
// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
Enqueue(ctx context.Context, mCall *models.Call) error
// Dequeue will query the queue for the next available Call that can be run
// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
Dequeue(ctx context.Context) (*models.Call, error)
// Start will attempt to start the provided Call within an appropriate
// context.
Start(ctx context.Context, mCall *models.Call) error
// Finish will notify the system that the Call has been processed, and
// fulfill the reservation in the queue if the call came from a queue.
Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}
// CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.
type CachedDataAccess struct {
DataAccess
cache *cache.Cache
singleflight singleflight.SingleFlight
}
func NewCachedDataAccess(da DataAccess) DataAccess {
cda := &CachedDataAccess{
DataAccess: da,
cache: cache.New(5*time.Second, 1*time.Minute),
}
return cda
}
func routeCacheKey(appname, path string) string {
return "r:" + appname + "\x00" + path
}
func appCacheKey(appname string) string {
return "a:" + appname
}
func (da *CachedDataAccess) GetApp(ctx context.Context, appName string) (*models.App, error) {
key := appCacheKey(appName)
app, ok := da.cache.Get(key)
if ok {
return app.(*models.App), nil
}
resp, err := da.singleflight.Do(key,
func() (interface{}, error) {
return da.DataAccess.GetApp(ctx, appName)
})
if err != nil {
return nil, err
}
app = resp.(*models.App)
da.cache.Set(key, app, cache.DefaultExpiration)
return app.(*models.App), nil
}
func (da *CachedDataAccess) GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error) {
key := routeCacheKey(appName, routePath)
r, ok := da.cache.Get(key)
if ok {
return r.(*models.Route), nil
}
resp, err := da.singleflight.Do(key,
func() (interface{}, error) {
return da.DataAccess.GetRoute(ctx, appName, routePath)
})
if err != nil {
return nil, err
}
r = resp.(*models.Route)
da.cache.Set(key, r, cache.DefaultExpiration)
return r.(*models.Route), nil
}
type directDataAccess struct {
mq models.MessageQueue
ds models.Datastore
ls models.LogStore
}
func NewDirectDataAccess(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) DataAccess {
da := &directDataAccess{
mq: mq,
ds: ds,
ls: ls,
}
return da
}
func (da *directDataAccess) GetApp(ctx context.Context, appName string) (*models.App, error) {
return da.ds.GetApp(ctx, appName)
}
func (da *directDataAccess) GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error) {
return da.ds.GetRoute(ctx, appName, routePath)
}
func (da *directDataAccess) Enqueue(ctx context.Context, mCall *models.Call) error {
_, err := da.mq.Push(ctx, mCall)
return err
// TODO: Insert a call in the datastore with the 'queued' state
}
func (da *directDataAccess) Dequeue(ctx context.Context) (*models.Call, error) {
return da.mq.Reserve(ctx)
}
func (da *directDataAccess) Start(ctx context.Context, mCall *models.Call) error {
// TODO Access datastore and try a Compare-And-Swap to set the call to
// 'running'. If it fails, delete the message from the MQ and return an
// error. If it is successful, don't do anything - the message will be
// removed when the call Finish'es.
// At the moment we don't have the queued/running/finished mechanics so we
// remove the message here.
return da.mq.Delete(ctx, mCall)
}
func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error {
// this means that we could potentially store an error / timeout status for a
// call that ran successfully [by a user's perspective]
// TODO: this should be update, really
if err := da.ds.InsertCall(ctx, mCall); err != nil {
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
// note: Not returning err here since the job could have already finished successfully.
}
if err := da.ls.InsertLog(ctx, mCall.AppName, mCall.ID, stderr); err != nil {
common.Logger(ctx).WithError(err).Error("error uploading log")
// note: Not returning err here since the job could have already finished successfully.
}
if async {
// XXX (reed): delete MQ message, eventually
// YYY (hhexo): yes, once we have the queued/running/finished mechanics
// return da.mq.Delete(ctx, mCall)
}
return nil
}