-
Notifications
You must be signed in to change notification settings - Fork 18
/
director.go
305 lines (271 loc) · 10.5 KB
/
director.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/
package client
import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"path"
"sort"
"strconv"
"strings"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/pelicanplatform/pelican/config"
namespaces "github.com/pelicanplatform/pelican/namespaces"
"github.com/pelicanplatform/pelican/utils"
)
type directorResponse struct {
Error string `json:"error"`
}
// Given the Director response, create the ordered list of caches
// and store it as namespace.SortedDirectorCaches
func CreateNsFromDirectorResp(dirResp *http.Response) (namespace namespaces.Namespace, err error) {
pelicanNamespaceHdr := dirResp.Header.Values("X-Pelican-Namespace")
if len(pelicanNamespaceHdr) == 0 {
err = errors.New("Pelican director did not include mandatory X-Pelican-Namespace header in response")
return
}
xPelicanNamespace := utils.HeaderParser(pelicanNamespaceHdr[0])
namespace.Path = xPelicanNamespace["namespace"]
namespace.UseTokenOnRead, _ = strconv.ParseBool(xPelicanNamespace["require-token"])
namespace.ReadHTTPS, _ = strconv.ParseBool(xPelicanNamespace["readhttps"])
namespace.DirListHost = xPelicanNamespace["collections-url"]
xPelicanAuthorization := []string{} // map of header to x - single entry - want to create an array for issuer
if len(dirResp.Header.Values("X-Pelican-Authorization")) > 0 {
//For each entry,(which is an array of issuer=0)
//So it's a map entry - HeaderParser returns a max entry
//We want to appen the value
for _, authEntry := range dirResp.Header.Values("X-Pelican-Authorization") {
parsedEntry := utils.HeaderParser(authEntry)
xPelicanAuthorization = append(xPelicanAuthorization, parsedEntry["issuer"])
}
namespace.Issuer = xPelicanAuthorization
}
var xPelicanTokenGeneration map[string]string
if len(dirResp.Header.Values("X-Pelican-Token-Generation")) > 0 {
xPelicanTokenGeneration = utils.HeaderParser(dirResp.Header.Values("X-Pelican-Token-Generation")[0])
// Instantiate the cred gen struct
namespace.CredentialGen = &namespaces.CredentialGeneration{}
// We wind up with a duplicate issuer here as the encapsulating ns also encodes this
issuer := xPelicanTokenGeneration["issuer"]
namespace.CredentialGen.Issuer = &issuer
base_path := xPelicanTokenGeneration["base-path"]
namespace.CredentialGen.BasePath = &base_path
if max_scope_depth, exists := xPelicanTokenGeneration["max-scope-depth"]; exists {
max_scope_depth_int, err := strconv.Atoi(max_scope_depth)
if err != nil {
log.Debugln("Server sent an invalid max scope depth; ignoring:", max_scope_depth)
} else {
namespace.CredentialGen.MaxScopeDepth = &max_scope_depth_int
}
}
strategy := xPelicanTokenGeneration["strategy"]
namespace.CredentialGen.Strategy = &strategy
// The Director only returns a vault server if the strategy is vault.
if vs, exists := xPelicanTokenGeneration["vault-server"]; exists {
namespace.CredentialGen.VaultServer = &vs
}
}
// Create the caches slice
namespace.SortedDirectorCaches, err = getCachesFromDirectorResponse(dirResp, namespace.UseTokenOnRead || namespace.ReadHTTPS)
if err != nil {
log.Errorln("Unable to construct ordered cache list:", err)
return
}
log.Debugln("Namespace path constructed from Director:", namespace.Path)
return
}
// Make a request to the director for a given verb/resource; return the
// HTTP response object only if a 307 is returned.
func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (resp *http.Response, err error) {
resourceUrl := directorUrl + sourcePath
// Here we use http.Transport to prevent the client from following the director's
// redirect. We use the Location url elsewhere (plus we still need to do the token
// dance!)
var client *http.Client
tr := config.GetTransport()
client = &http.Client{
Transport: tr,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
req, err := http.NewRequestWithContext(ctx, verb, resourceUrl, nil)
if err != nil {
log.Errorln("Failed to create an HTTP request:", err)
return nil, err
}
// Include the Client's version as a User-Agent header. The Director will decide
// if it supports the version, and provide an error message in the case that it
// cannot.
req.Header.Set("User-Agent", getUserAgent(""))
// Perform the HTTP request
resp, err = client.Do(req)
if err != nil {
log.Errorln("Failed to get response from the director:", err)
return
}
defer resp.Body.Close()
log.Debugln("Director's response:", resp)
// Check HTTP response -- should be 307 (redirect), else something went wrong
body, _ := io.ReadAll(resp.Body)
// If we get a 404, the director will hopefully tell us why. It might be that the namespace doesn't exist
if resp.StatusCode == 404 && verb == "PROPFIND" {
// If we get a 404 response from a PROPFIND, we are likely working with an old director so we should return a response
return resp, errors.New("404: " + string(body))
} else if resp.StatusCode == 404 {
// If we get a 404 response when we are not doing a PROPFIND, just return the 404 error without a response
return nil, errors.New("404: " + string(body))
} else if resp.StatusCode == http.StatusMethodNotAllowed && verb == "PROPFIND" {
// If we get a 405 with a PROPFIND, the client will handle it
return
} else if resp.StatusCode != 307 {
var respErr directorResponse
if unmarshalErr := json.Unmarshal(body, &respErr); unmarshalErr != nil { // Error creating json
return nil, errors.Wrap(unmarshalErr, "Could not unmarshall the director's response")
}
return resp, errors.New(respErr.Error)
}
return
}
func getCachesFromDirectorResponse(resp *http.Response, needsToken bool) (caches []namespaces.DirectorCache, err error) {
// Get the Link header
linkHeader := resp.Header.Values("Link")
if len(linkHeader) == 0 {
return []namespaces.DirectorCache{}, nil
}
for _, linksStr := range strings.Split(linkHeader[0], ",") {
links := strings.Split(strings.ReplaceAll(linksStr, " ", ""), ";")
var endpoint string
// var rel string // "rel", as defined in the Metalink/HTTP RFC. Currently not being used by
// the OSDF Client, but is provided by the director. Will be useful in the future when
// we start looking at cases where we want to duplicate from caches if we're throttling
// connections to the origin.
var pri int
for _, val := range links {
if strings.HasPrefix(val, "<") {
endpoint = val[1 : len(val)-1]
} else if strings.HasPrefix(val, "pri") {
pri, _ = strconv.Atoi(val[4:])
}
// } else if strings.HasPrefix(val, "rel") {
// rel = val[5 : len(val)-1]
// }
}
// Construct the cache objects, getting endpoint and auth requirements from
// Director
var cache namespaces.DirectorCache
cache.AuthedReq = needsToken
cache.EndpointUrl = endpoint
cache.Priority = pri
caches = append(caches, cache)
}
// Making the assumption that the Link header doesn't already provide the caches
// in order (even though it probably does). This sorts the caches and ensures
// we're using the "pri" tag to order them
sort.Slice(caches, func(i, j int) bool {
val1 := caches[i].Priority
val2 := caches[j].Priority
return val1 < val2
})
return caches, err
}
// NewTransferDetails creates the TransferDetails struct with the given cache
func newTransferDetailsUsingDirector(cache namespaces.DirectorCache, opts transferDetailsOptions) []transferAttemptDetails {
details := make([]transferAttemptDetails, 0)
cacheEndpoint := cache.EndpointUrl
// Form the URL
cacheURL, err := url.Parse(cacheEndpoint)
if err != nil {
log.Errorln("Failed to parse cache:", cache, "error:", err)
return nil
}
if cacheURL.Scheme == "unix" && cacheURL.Host != "" {
cacheURL.Path = path.Clean("/" + path.Join(cacheURL.Host, cacheURL.Path))
} else if cacheURL.Scheme != "unix" && cacheURL.Host == "" {
// Assume the cache is just a hostname
cacheURL.Host = cacheEndpoint
cacheURL.Path = ""
cacheURL.Scheme = ""
cacheURL.Opaque = ""
}
if opts.NeedsToken {
// Unless we're using the local Unix domain socket cache, force HTTPS
if cacheURL.Scheme != "unix" {
cacheURL.Scheme = "https"
if !hasPort(cacheURL.Host) {
// Add port 8444 and 8443
urlCopy := *cacheURL
urlCopy.Host += ":8444"
details = append(details, transferAttemptDetails{
Url: &urlCopy,
Proxy: false,
PackOption: opts.PackOption,
})
// Strip the port off and add 8443
cacheURL.Host = cacheURL.Host + ":8443"
}
}
det := transferAttemptDetails{
Url: cacheURL,
Proxy: false,
PackOption: opts.PackOption,
}
if cacheURL.Scheme == "unix" {
det.UnixSocket = cacheURL.Path
}
// Whether port is specified or not, add a transfer without proxy
details = append(details, det)
} else if cacheURL.Scheme == "" || cacheURL.Scheme == "http" {
// Assume a transfer not needing a token and not specifying a scheme is HTTP
// WARNING: This is legacy code; we should always specify a scheme
cacheURL.Scheme = "http"
if !hasPort(cacheURL.Host) {
cacheURL.Host += ":8000"
}
isProxyEnabled := isProxyEnabled()
details = append(details, transferAttemptDetails{
Url: cacheURL,
Proxy: isProxyEnabled,
PackOption: opts.PackOption,
})
if isProxyEnabled && CanDisableProxy() {
details = append(details, transferAttemptDetails{
Url: cacheURL,
Proxy: false,
PackOption: opts.PackOption,
})
}
} else {
// A non-HTTP scheme is specified and a token is not needed; this wasn't possible
// in the legacy cases. Simply use the provided config
det := transferAttemptDetails{
Url: cacheURL,
Proxy: false,
PackOption: opts.PackOption,
}
if cacheURL.Scheme == "unix" {
det.UnixSocket = cacheURL.Path
}
details = append(details, det)
}
return details
}