-
Notifications
You must be signed in to change notification settings - Fork 156
/
handleProfile.go
287 lines (261 loc) · 10.3 KB
/
handleProfile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
// Copyright (c) 2021 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0
package zedagent
import (
"fmt"
"net/http"
"net/url"
"path/filepath"
"strings"
"time"
zconfig "github.com/lf-edge/eve-api/go/config"
"github.com/lf-edge/eve-api/go/profile"
"github.com/lf-edge/eve/pkg/pillar/flextimer"
"github.com/lf-edge/eve/pkg/pillar/types"
"github.com/lf-edge/eve/pkg/pillar/zedcloud"
"google.golang.org/protobuf/proto"
)
const (
defaultLocalServerPort = "8888"
profileURLPath = "/api/v1/local_profile"
savedLocalProfileFile = "lastlocalprofile"
)
// makeLocalServerBaseURL constructs local server URL without path.
func makeLocalServerBaseURL(localServerAddr string) (string, error) {
localServerURL := fmt.Sprintf("http://%s", localServerAddr)
u, err := url.Parse(localServerURL)
if err != nil {
return "", fmt.Errorf("url.Parse: %s", err)
}
if u.Port() == "" {
localServerURL = fmt.Sprintf("%s:%s", localServerURL, defaultLocalServerPort)
}
return localServerURL, nil
}
// Run a periodic fetch of the currentProfile from localServer
func localProfileTimerTask(handleChannel chan interface{}, getconfigCtx *getconfigContext) {
ctx := getconfigCtx.zedagentCtx
// use ConfigInterval as localProfileInterval
localProfileInterval := ctx.globalConfig.GlobalValueInt(types.ConfigInterval)
interval := time.Duration(localProfileInterval) * time.Second
max := float64(interval)
min := max * 0.3
ticker := flextimer.NewRangeTicker(time.Duration(min),
time.Duration(max))
// Return handle to caller
handleChannel <- ticker
log.Functionf("localProfileTimerTask: waiting for localProfileTrigger")
//wait for the first trigger comes from parseProfile to have information about localProfileServer
<-getconfigCtx.sideController.localProfileTrigger
log.Functionf("localProfileTimerTask: waiting for localProfileTrigger done")
//trigger again to pass into loop
triggerGetLocalProfile(getconfigCtx)
wdName := agentName + "currentProfile"
// Run a periodic timer so we always update StillRunning
stillRunning := time.NewTicker(25 * time.Second)
ctx.ps.StillRunning(wdName, warningTime, errorTime)
ctx.ps.RegisterFileWatchdog(wdName)
for {
select {
case <-getconfigCtx.sideController.localProfileTrigger:
start := time.Now()
profileStateMachine(getconfigCtx, false)
ctx.ps.CheckMaxTimeTopic(wdName, "getLocalProfileConfigTrigger", start,
warningTime, errorTime)
case <-ticker.C:
start := time.Now()
profileStateMachine(getconfigCtx, false)
ctx.ps.CheckMaxTimeTopic(wdName, "getLocalProfileConfigTimer", start,
warningTime, errorTime)
case <-stillRunning.C:
}
ctx.ps.StillRunning(wdName, warningTime, errorTime)
}
}
func parseLocalProfile(localProfileBytes []byte) (*profile.LocalProfile, error) {
var localProfile = &profile.LocalProfile{}
err := proto.Unmarshal(localProfileBytes, localProfile)
if err != nil {
return nil, fmt.Errorf("unmarshalling failed: %v", err)
}
return localProfile, nil
}
// read saved local profile in case of particular reboot reason
func readSavedLocalProfile(getconfigCtx *getconfigContext) (*profile.LocalProfile, error) {
localProfileMessage, ts, err := readSavedConfig(
filepath.Join(checkpointDirname, savedLocalProfileFile))
if err != nil {
return nil, fmt.Errorf("readSavedLocalProfile: %v", err)
}
if localProfileMessage != nil {
log.Noticef("Using saved local profile dated %s",
ts.Format(time.RFC3339Nano))
return parseLocalProfile(localProfileMessage)
}
return nil, nil
}
// getLocalProfileConfig connects to local profile server to fetch the current profile
func getLocalProfileConfig(getconfigCtx *getconfigContext, localServerURL string) (*profile.LocalProfile, error) {
log.Functionf("getLocalProfileConfig(%s)", localServerURL)
if !getconfigCtx.sideController.localServerMap.upToDate {
err := updateLocalServerMap(getconfigCtx, localServerURL)
if err != nil {
return nil, fmt.Errorf("getLocalProfileConfig: updateLocalServerMap: %v", err)
}
// Make sure HasLocalServer is set correctly for the AppInstanceConfig
updateHasLocalServer(getconfigCtx)
}
srvMap := getconfigCtx.sideController.localServerMap.servers
if len(srvMap) == 0 {
return nil, fmt.Errorf(
"getLocalProfileConfig: cannot find any configured apps for localServerURL: %s",
localServerURL)
}
var errList []string
for bridgeName, servers := range srvMap {
for _, srv := range servers {
fullURL := srv.localServerAddr + profileURLPath
localProfile := &profile.LocalProfile{}
resp, err := zedcloud.SendLocalProto(
zedcloudCtx, fullURL, bridgeName, srv.bridgeIP, nil, localProfile)
if err != nil {
errList = append(errList, fmt.Sprintf("SendLocal: %s", err))
continue
}
if resp.StatusCode != http.StatusOK {
errList = append(errList, fmt.Sprintf("SendLocalProto: wrong response status code: %d",
resp.StatusCode))
continue
}
if localProfile.GetServerToken() != getconfigCtx.sideController.profileServerToken {
errList = append(errList,
fmt.Sprintf("invalid token submitted by local server (%s)", localProfile.GetServerToken()))
continue
}
return localProfile, nil
}
}
return nil, fmt.Errorf("getLocalProfileConfig: all attempts failed: %s", strings.Join(errList, ";"))
}
// saveOrTouchReceivedLocalProfile updates modification time of received LocalProfile in case of no changes
// or updates content of received LocalProfile in case of changes or no checkpoint file
func saveOrTouchReceivedLocalProfile(getconfigCtx *getconfigContext, localProfile *profile.LocalProfile) {
if getconfigCtx.sideController.localProfile == localProfile.GetLocalProfile() &&
getconfigCtx.sideController.profileServerToken == localProfile.GetServerToken() &&
existsSavedConfig(savedLocalProfileFile) {
touchSavedConfig(savedLocalProfileFile)
return
}
contents, err := proto.Marshal(localProfile)
if err != nil {
log.Errorf("saveOrTouchReceivedLocalProfile Marshalling failed: %s", err)
return
}
saveConfig(savedLocalProfileFile, contents)
return
}
// parseProfile process local and global profile configuration
// must be called before processing of app instances from config
func parseProfile(ctx *getconfigContext, config *zconfig.EdgeDevConfig) {
log.Functionf("parseProfile start: globalProfile: %s localProfile: %s",
ctx.sideController.globalProfile, ctx.sideController.localProfile)
if ctx.sideController.globalProfile != config.GlobalProfile {
log.Noticef("parseProfile: GlobalProfile changed from %s to %s",
ctx.sideController.globalProfile, config.GlobalProfile)
ctx.sideController.globalProfile = config.GlobalProfile
}
ctx.sideController.profileServerToken = config.ProfileServerToken
if ctx.sideController.localProfileServer != config.LocalProfileServer {
log.Noticef("parseProfile: LocalProfileServer changed from %s to %s",
ctx.sideController.localProfileServer, config.LocalProfileServer)
ctx.sideController.localProfileServer = config.LocalProfileServer
triggerGetLocalProfile(ctx)
triggerRadioPOST(ctx)
updateLocalAppInfoTicker(ctx, false)
triggerLocalAppInfoPOST(ctx)
updateLocalDevInfoTicker(ctx, false)
triggerLocalDevInfoPOST(ctx)
ctx.sideController.lpsThrottledLocation = false
}
profileStateMachine(ctx, true)
log.Functionf("parseProfile done globalProfile: %s currentProfile: %s",
ctx.sideController.globalProfile, ctx.sideController.currentProfile)
}
// determineCurrentProfile return current profile based on localProfile, globalProfile
func determineCurrentProfile(ctx *getconfigContext) string {
if ctx.sideController.localProfile == "" {
return ctx.sideController.globalProfile
}
return ctx.sideController.localProfile
}
// triggerGetLocalProfile notifies task to reload local profile from profileServer
func triggerGetLocalProfile(ctx *getconfigContext) {
log.Functionf("triggerGetLocalProfile")
select {
case ctx.sideController.localProfileTrigger <- Notify{}:
default:
}
}
// run state machine to handle changes to globalProfile, localProfileServer,
// or to do periodic fetch of the local profile
// If skipFetch is set we do not look for an update from a localProfileServer
// but keep the current localProfile
func profileStateMachine(ctx *getconfigContext, skipFetch bool) {
localProfile := getLocalProfile(ctx, skipFetch)
if ctx.sideController.localProfile != localProfile {
log.Noticef("local profile changed from %s to %s",
ctx.sideController.localProfile, localProfile)
ctx.sideController.localProfile = localProfile
}
currentProfile := determineCurrentProfile(ctx)
if ctx.sideController.currentProfile != currentProfile {
log.Noticef("current profile changed from %s to %s",
ctx.sideController.currentProfile, currentProfile)
ctx.sideController.currentProfile = currentProfile
publishZedAgentStatus(ctx)
}
}
// getLocalProfile returns the local profile to use, and cleans up ctx and
// checkpoint when the local profile server has been removed. If skipCheck
// is not set it will query the local profile server.
// It returns the last known value until it gets a response from the server
// or localProfileServer is cleared.
func getLocalProfile(ctx *getconfigContext, skipFetch bool) string {
localProfileServer := ctx.sideController.localProfileServer
if localProfileServer == "" {
if ctx.sideController.localProfile != "" {
log.Noticef("clearing localProfile checkpoint since no server")
cleanSavedConfig(savedLocalProfileFile)
}
return ""
}
if skipFetch {
return ctx.sideController.localProfile
}
localServerURL, err := makeLocalServerBaseURL(localProfileServer)
if err != nil {
log.Errorf("getLocalProfile: makeLocalServerBaseURL: %s", err)
return ""
}
localProfileConfig, err := getLocalProfileConfig(ctx, localServerURL)
if err != nil {
log.Errorf("getLocalProfile: getLocalProfileConfig: %s", err)
// Return last known value
return ctx.sideController.localProfile
}
localProfile := localProfileConfig.GetLocalProfile()
saveOrTouchReceivedLocalProfile(ctx, localProfileConfig)
return localProfile
}
// processSavedProfile reads saved local profile and set it
func processSavedProfile(ctx *getconfigContext) {
localProfile, err := readSavedLocalProfile(ctx)
if err != nil {
log.Functionf("processSavedProfile: readSavedLocalProfile %s", err)
return
}
if localProfile != nil {
log.Noticef("starting with localProfile %s", localProfile.LocalProfile)
ctx.sideController.localProfile = localProfile.LocalProfile
}
}