-
Notifications
You must be signed in to change notification settings - Fork 18
/
advertise.go
261 lines (232 loc) · 9.41 KB
/
advertise.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/
// Package launcher_utils contains utility functions for the [github.com/pelicanplatform/pelican/launcher] package.
//
// It should only be imported by the launchers package.
// It should NOT be imported into any server packages (origin/cache/registry) or other lower-level packages (config/utils/etc).
package launcher_utils
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/token"
"github.com/pelicanplatform/pelican/token_scopes"
"github.com/pelicanplatform/pelican/utils"
)
// directorResponse is the JSON payload decoded from the director's reply when
// an advertisement request comes back with a status code above 299.
type directorResponse struct {
// Error is the error message supplied by the director; it is embedded in the
// error returned to the caller of the advertise routine.
Error string `json:"error"`
// ApprovalError, when true, causes the failure to be reported as the director
// rejecting the server advertisement, with a hint to contact the administrators.
ApprovalError bool `json:"approval_error"`
}
// doAdvertise runs a single advertisement round for the given servers and
// records the outcome in the OriginCache_Federation component health status:
// critical (with the error text) on failure, OK on success.
func doAdvertise(ctx context.Context, servers []server_structs.XRootDServer) {
	log.Debugf("About to advertise %d XRootD servers", len(servers))

	if advErr := Advertise(ctx, servers); advErr != nil {
		log.Warningln("XRootD server advertise failed:", advErr)
		failureMsg := fmt.Sprintf("XRootD server advertise failed: %v", advErr)
		metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusCritical, failureMsg)
		return
	}

	metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusOK, "")
}
// LaunchPeriodicAdvertise advertises the xrootd servers (origin and cache) to
// the director once synchronously, then starts a goroutine in the errgroup
// that repeats the advertisement every minute until ctx is cancelled.
//
// The component health status is set to a warning before the first attempt
// and is updated by every attempt afterwards (see doAdvertise). The function
// itself always returns nil; the background goroutine also returns nil when
// the context is done, so it never causes the errgroup to fail.
func LaunchPeriodicAdvertise(ctx context.Context, egrp *errgroup.Group, servers []server_structs.XRootDServer) error {
	metrics.SetComponentHealthStatus(metrics.OriginCache_Federation, metrics.StatusWarning, "First attempt to advertise to the director...")
	doAdvertise(ctx, servers)

	ticker := time.NewTicker(1 * time.Minute)
	egrp.Go(func() error {
		// Release the ticker's resources once the loop exits.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Advertise exactly once per tick; doAdvertise logs the
				// failure and updates the health status for us.
				doAdvertise(ctx, servers)
			case <-ctx.Done():
				log.Infoln("Periodic advertisement loop has been terminated")
				return nil
			}
		}
	})

	return nil
}
// Advertise performs ONE advertisement round: every xrootd server (origin and
// cache) in servers is advertised to the director, even if an earlier one
// fails. The error of the first failing server is returned, or nil when all
// of them succeed.
func Advertise(ctx context.Context, servers []server_structs.XRootDServer) error {
	var result error
	for idx := range servers {
		advErr := advertiseInternal(ctx, servers[idx])
		if advErr != nil && result == nil {
			result = advErr
		}
	}
	return result
}
// getSitenameFromReg looks up the site name recorded in the registry for the
// given namespace prefix.
//
// It resolves the federation's namespace registration endpoint, issues a GET
// request to <endpoint>/api/v1.0/registry/<prefix>, decodes the response as a
// server_structs.Namespace and returns its AdminMetadata.SiteName.
//
// An error is returned when the federation information cannot be obtained,
// the registration endpoint is unset, the request URL cannot be built, the
// request fails, or the response is not valid JSON. On error the returned
// sitename is empty.
func getSitenameFromReg(ctx context.Context, prefix string) (sitename string, err error) {
	fed, err := config.GetFederation(ctx)
	if err != nil {
		return "", err
	}
	if fed.NamespaceRegistrationEndpoint == "" {
		return "", fmt.Errorf("unable to fetch site name from the registry. Federation.RegistryUrl or Federation.DiscoveryUrl is unset")
	}

	requestUrl, err := url.JoinPath(fed.NamespaceRegistrationEndpoint, "api/v1.0/registry", prefix)
	if err != nil {
		return "", err
	}

	// Use the caller's context so the lookup is abandoned when the caller is cancelled.
	res, err := utils.MakeRequest(ctx, requestUrl, http.MethodGet, nil, nil)
	if err != nil {
		return "", err
	}

	ns := server_structs.Namespace{}
	if err = json.Unmarshal(res, &ns); err != nil {
		return "", err
	}
	return ns.AdminMetadata.SiteName, nil
}
// advertiseInternal advertises a single xrootd server to the director.
//
// It proceeds in these steps:
//  1. Determine the site name: ask the registry first and fall back to
//     Xrootd.Sitename when the registry yields nothing. Registry lookup
//     failures are logged but are not fatal.
//  2. Refresh the server's namespace ads from the director.
//  3. Build the advertisement and marshal it to JSON.
//  4. Create a one-minute advertise-scoped token issued by this server, with
//     the director endpoint as audience.
//  5. POST the advertisement to the director's register endpoint for this
//     server type and interpret the response.
//
// It returns nil when the director answers with a status code of 299 or
// lower; otherwise it returns an error describing which step failed or what
// the director reported.
func advertiseInternal(ctx context.Context, server server_structs.XRootDServer) error {
	name := ""

	// Fetch site name from the registry, if not, fall back to Xrootd.Sitename.
	// However, currently the registry only supports registering namespaces, not origins.
	// If multiple namespaces fall under the same origin, we will use the first namespace to fetch the site name
	if server.GetServerType().IsEnabled(config.OriginType) {
		originExports, exportsErr := server_utils.GetOriginExports()
		if exportsErr != nil {
			log.Errorf("Failed to get request sitename from the registry. Failed to get exports from the origin server. Will fall back to use Xrootd.Sitename: %v", exportsErr)
		}
		if len(originExports) != 0 {
			if len(originExports) > 1 {
				log.Warningf("The origin has multiple exports. The sitename will be fetched from the registry using the FederationPrefix of the first export: %s.", originExports[0].FederationPrefix)
			}
			prefix := originExports[0].FederationPrefix
			var lookupErr error
			name, lookupErr = getSitenameFromReg(ctx, prefix)
			if lookupErr != nil {
				log.Errorf("Failed to get sitename from the registry. Will fallback to use Xrootd.Sitename: %v", lookupErr)
			}
		} else {
			log.Errorf("Failed to get sitename from the registry. The namespace is empty for the %s server. Will fallback to use Xrootd.Sitename", server.GetServerType().String())
		}
	} else if server.GetServerType().IsEnabled(config.CacheType) {
		cachePrefix := "/caches/" + param.Xrootd_Sitename.GetString()
		var lookupErr error
		name, lookupErr = getSitenameFromReg(ctx, cachePrefix)
		if lookupErr != nil {
			log.Errorf("Failed to get sitename from the registry for the cache. Will fallback to use Xrootd.Sitename: %v", lookupErr)
		}
	}

	if name == "" {
		log.Infof("Sitename from the registry is empty, fall back to Xrootd.Sitename: %s", param.Xrootd_Sitename.GetString())
		name = param.Xrootd_Sitename.GetString()
	}
	if name == "" {
		return errors.Errorf("%s name isn't set. Please set the name via Xrootd.Sitename", server.GetServerType())
	}

	if err := server.GetNamespaceAdsFromDirector(); err != nil {
		return errors.Wrapf(err, "%s failed to get namespaceAds from the director", server.GetServerType())
	}

	// Origins advertise Origin.Url, caches advertise Cache.Url; both share
	// the same external web URL.
	serverUrl := param.Origin_Url.GetString()
	webUrl := param.Server_ExternalWebUrl.GetString()

	serverIssuer, err := config.GetServerIssuerURL()
	if err != nil {
		return errors.Wrap(err, "failed to get server issuer URL")
	}

	if server.GetServerType().IsEnabled(config.CacheType) {
		serverUrl = param.Cache_Url.GetString()
	}

	ad, err := server.CreateAdvertisement(name, serverUrl, webUrl)
	if err != nil {
		return err
	}

	body, err := json.Marshal(*ad)
	if err != nil {
		return errors.Wrapf(err, "failed to generate JSON description of %s", server.GetServerType())
	}

	fedInfo, err := config.GetFederation(ctx)
	if err != nil {
		return err
	}
	directorUrlStr := fedInfo.DirectorEndpoint
	if directorUrlStr == "" {
		return errors.New("Director endpoint URL is not known")
	}
	directorUrl, err := url.Parse(directorUrlStr)
	if err != nil {
		return errors.Wrap(err, "failed to parse Federation.DirectorURL")
	}
	// The register endpoint is suffixed with the server type's string form.
	directorUrl.Path = "/api/v1.0/director/register" + server.GetServerType().String()

	advTokenCfg := token.NewWLCGToken()
	advTokenCfg.Lifetime = time.Minute
	advTokenCfg.Issuer = serverIssuer
	advTokenCfg.AddAudiences(fedInfo.DirectorEndpoint)
	advTokenCfg.Subject = "origin"
	advTokenCfg.AddScopes(token_scopes.Pelican_Advertise)

	// CreateToken also handles validation for us
	tok, err := advTokenCfg.CreateToken()
	if err != nil {
		return errors.Wrap(err, "failed to create director advertisement token")
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, directorUrl.String(), bytes.NewBuffer(body))
	if err != nil {
		return errors.Wrap(err, "failed to create a POST request for director advertisement")
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+tok)
	userAgent := "pelican-" + strings.ToLower(server.GetServerType().String()) + "/" + config.GetVersion()
	req.Header.Set("User-Agent", userAgent)

	// We should switch this over to use the common transport, but for that to happen
	// that function needs to be exported from pelican
	tr := config.GetTransport()
	client := http.Client{Transport: tr}

	resp, err := client.Do(req)
	if err != nil {
		return errors.Wrap(err, "failed to start the request for director advertisement")
	}
	defer resp.Body.Close()

	// Always drain the body. Its content only matters on failure, so a read
	// error is ignored when the director reported success.
	respBody, readErr := io.ReadAll(resp.Body)
	if resp.StatusCode > 299 {
		if readErr != nil {
			return errors.Wrapf(readErr, "failed to read the director's response, which responded %v from director advertisement", resp.StatusCode)
		}
		var respErr directorResponse
		if unmarshalErr := json.Unmarshal(respBody, &respErr); unmarshalErr != nil { // Error decoding json
			return errors.Wrapf(unmarshalErr, "could not decode the director's response, which responded %v from director advertisement: %s", resp.StatusCode, string(respBody))
		}
		if respErr.ApprovalError {
			return fmt.Errorf("the director rejected the server advertisement with error: %s. Please contact the administrators of %s for more information.", respErr.Error, fedInfo.NamespaceRegistrationEndpoint)
		}
		return errors.Errorf("error during director registration: %v", respErr.Error)
	}

	return nil
}