forked from necaris/python3-openid
/
discover.py
460 lines (360 loc) · 15.3 KB
/
discover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
# -*- test-case-name: openid.test.test_discover -*-
"""Functions to discover OpenID endpoints from identifiers.
"""
__all__ = [
'DiscoveryFailure',
'OPENID_1_0_NS',
'OPENID_1_0_TYPE',
'OPENID_1_1_TYPE',
'OPENID_2_0_TYPE',
'OPENID_IDP_2_0_TYPE',
'OpenIDServiceEndpoint',
'discover',
]
import urllib.parse
import logging
from openid import fetchers, urinorm
from openid import yadis
from openid.yadis.etxrd import nsTag, XRDSError, XRD_NS_2_0
from openid.yadis.services import applyFilter as extractServices
from openid.yadis.discover import discover as yadisDiscover
from openid.yadis.discover import DiscoveryFailure
from openid.yadis import xrires, filters
from openid.yadis import xri
from openid.consumer import html_parse
# XML namespace used below to build the qualified name of the
# openid:Delegate tag (see findOPLocalIdentifier).
OPENID_1_0_NS = 'http://openid.net/xmlns/1.0'
# Service type URIs compared against the type_uris of a discovered
# service.  OPENID_IDP_2_0_TYPE is the type that
# OpenIDServiceEndpoint.isOPIdentifier() checks for.
OPENID_IDP_2_0_TYPE = 'http://specs.openid.net/auth/2.0/server'
OPENID_2_0_TYPE = 'http://specs.openid.net/auth/2.0/signon'
OPENID_1_1_TYPE = 'http://openid.net/signon/1.1'
OPENID_1_0_TYPE = 'http://openid.net/signon/1.0'
from openid.message import OPENID1_NS as OPENID_1_0_MESSAGE_NS
from openid.message import OPENID2_NS as OPENID_2_0_MESSAGE_NS
class OpenIDServiceEndpoint(object):
    """A single OpenID service endpoint produced by discovery.

    @ivar claimed_id: the identifier this endpoint was discovered for;
        parseService and fromOPEndpointURL leave it as None for OP
        Identifier endpoints.
    @ivar server_url: URL of the OP endpoint.
    @ivar type_uris: the service type URIs attached to this endpoint.
    @ivar local_id: the OP-Local Identifier, when one was found.
    @ivar canonicalID: For XRI, the persistent identifier.
    @ivar used_yadis: True once the endpoint was filled in by
        parseService (i.e. it came from an XRDS service element).
    @ivar display_identifier: identifier preferred by
        getDisplayIdentifier when it is set.
    """

    # OpenID service type URIs, most preferred first.  The order of
    # this list affects yadis and XRI service discovery.
    openid_type_uris = [
        OPENID_IDP_2_0_TYPE,
        OPENID_2_0_TYPE,
        OPENID_1_1_TYPE,
        OPENID_1_0_TYPE,
    ]

    def __init__(self):
        """Create an empty endpoint; discovery fills in the fields."""
        self.claimed_id = None
        self.server_url = None
        self.type_uris = []
        self.local_id = None
        self.canonicalID = None
        self.used_yadis = False  # set to True by parseService (XRDS)
        self.display_identifier = None

    def usesExtension(self, extension_uri):
        """Return whether extension_uri is among this endpoint's types."""
        return extension_uri in self.type_uris

    def preferredNamespace(self):
        """Return the message namespace to use with this endpoint.

        The OpenID 2 namespace is chosen when either OpenID 2.0 type
        URI is present; otherwise the OpenID 1 namespace is returned.
        """
        speaks_openid2 = (OPENID_IDP_2_0_TYPE in self.type_uris or
                          OPENID_2_0_TYPE in self.type_uris)
        if speaks_openid2:
            return OPENID_2_0_MESSAGE_NS
        return OPENID_1_0_MESSAGE_NS

    def supportsType(self, type_uri):
        """Return whether this endpoint supports type_uri.

        C{/server} (OP Identifier) endpoints are treated as implicitly
        supporting C{/signon}.
        """
        if type_uri in self.type_uris:
            return True
        return type_uri == OPENID_2_0_TYPE and self.isOPIdentifier()

    def getDisplayIdentifier(self):
        """Return display_identifier if set, else the claimed_id with
        its fragment removed (None when there is no claimed_id).
        """
        shown = self.display_identifier
        if shown is None and self.claimed_id is not None:
            shown = urllib.parse.urldefrag(self.claimed_id)[0]
        return shown

    def compatibilityMode(self):
        """Return True when the preferred namespace is not OpenID 2."""
        return self.preferredNamespace() != OPENID_2_0_MESSAGE_NS

    def isOPIdentifier(self):
        """Return whether the OP Identifier type URI is present."""
        return OPENID_IDP_2_0_TYPE in self.type_uris

    def parseService(self, yadis_url, uri, type_uris, service_element):
        """Populate this endpoint from the contents of a service
        element.

        For OP Identifier endpoints only the types and server URL are
        recorded; local_id and claimed_id are left untouched.
        """
        self.type_uris = type_uris
        self.server_url = uri
        self.used_yadis = True

        if self.isOPIdentifier():
            # A Service element carrying both 'server' and 'signon'
            # types is handled as an OP Identifier here, so its
            # LocalID/Delegate is ignored.  That configuration is
            # pathological, so it is not treated specially.
            return

        self.local_id = findOPLocalIdentifier(service_element,
                                              self.type_uris)
        self.claimed_id = yadis_url

    def getLocalID(self):
        """Return the identifier that should be sent as the
        openid.identity parameter to the server.

        Falls back to claimed_id only when both local_id and
        canonicalID are None.
        """
        if self.local_id is None and self.canonicalID is None:
            return self.claimed_id
        return self.local_id or self.canonicalID

    @classmethod
    def fromBasicServiceEndpoint(cls, endpoint):
        """Build an instance of this class from a basic service
        endpoint object.

        @return: None or OpenIDServiceEndpoint for this endpoint object
        """
        matched_types = endpoint.matchTypes(cls.openid_type_uris)

        # Only an endpoint with at least one matching Type URI and an
        # endpoint URI counts as an OpenID endpoint.
        if not matched_types or endpoint.uri is None:
            return None

        openid_endpoint = cls()
        openid_endpoint.parseService(
            endpoint.yadis_url,
            endpoint.uri,
            endpoint.type_uris,
            endpoint.service_element)
        return openid_endpoint

    @classmethod
    def fromHTML(cls, uri, html):
        """Parse the given document as HTML looking for an OpenID <link
        rel=...>

        @rtype: [OpenIDServiceEndpoint]
        """
        # (service type, rel of the OP endpoint link, rel of the
        # local-id link), in the order they are tried.
        discovery_types = (
            (OPENID_2_0_TYPE, 'openid2.provider', 'openid2.local_id'),
            (OPENID_1_1_TYPE, 'openid.server', 'openid.delegate'),
        )

        link_attrs = html_parse.parseLinkAttrs(html)
        found = []
        for type_uri, provider_rel, local_rel in discovery_types:
            provider_url = html_parse.findFirstHref(link_attrs, provider_rel)
            if provider_url is not None:
                service = cls()
                service.claimed_id = uri
                service.local_id = html_parse.findFirstHref(
                    link_attrs, local_rel)
                service.server_url = provider_url
                service.type_uris = [type_uri]
                found.append(service)
        return found

    @classmethod
    def fromXRDS(cls, uri, xrds):
        """Parse the given document as XRDS looking for OpenID services.

        @rtype: [OpenIDServiceEndpoint]
        @raises XRDSError: When the XRDS does not parse.
        @since: 2.1.0
        """
        return extractServices(uri, xrds, cls)

    @classmethod
    def fromDiscoveryResult(cls, discoveryResult):
        """Create endpoints from a DiscoveryResult.

        The response is parsed as XRDS when discoveryResult.isXRDS()
        is true, and as HTML otherwise.

        @type discoveryResult: L{DiscoveryResult}
        @rtype: list of L{OpenIDServiceEndpoint}
        @raises XRDSError: When the XRDS does not parse.
        @since: 2.1.0
        """
        parse = cls.fromXRDS if discoveryResult.isXRDS() else cls.fromHTML
        return parse(discoveryResult.normalized_uri,
                     discoveryResult.response_text)

    @classmethod
    def fromOPEndpointURL(cls, op_endpoint_url):
        """Construct an OP-Identifier OpenIDServiceEndpoint object for
        a given OP Endpoint URL

        @param op_endpoint_url: The URL of the endpoint
        @rtype: OpenIDServiceEndpoint
        """
        service = cls()
        service.server_url = op_endpoint_url
        service.type_uris = [OPENID_IDP_2_0_TYPE]
        return service

    def __str__(self):
        """Return a debugging summary of the endpoint's main fields."""
        template = ("<%s.%s server_url=%r claimed_id=%r local_id=%r "
                    "canonicalID=%r used_yadis=%s >")
        klass = self.__class__
        return template % (
            klass.__module__,
            klass.__name__,
            self.server_url,
            self.claimed_id,
            self.local_id,
            self.canonicalID,
            self.used_yadis,
        )
def findOPLocalIdentifier(service_element, type_uris):
    """Find the OP-Local Identifier for this xrd:Service element.

    openid:Delegate is consulted when an OpenID 1.X type is present
    and xrd:LocalID when the OpenID 2.0 signon type is present; when
    both are present the two tags are treated as synonyms.  All
    matching tags must carry the same value, otherwise a
    DiscoveryFailure is raised (this includes a Delegate and a
    LocalID that disagree).

    @param service_element: The xrd:Service element
    @type service_element: ElementTree.Node

    @param type_uris: The xrd:Type values present in this service
        element.  They are passed in because higher level code has to
        extract them anyway.
    @type type_uris: [str]

    @raises DiscoveryFailure: when the candidate tags hold differing
        values.

    @returns: The OP-Local Identifier for this service element, if one
        is present, or None otherwise.
    @rtype: str or unicode or NoneType
    """
    # XXX: Test this function on its own!

    # Collect the qualified tag names that may hold the identifier.
    candidate_tags = []
    if OPENID_1_1_TYPE in type_uris or OPENID_1_0_TYPE in type_uris:
        candidate_tags.append(nsTag(OPENID_1_0_NS, 'Delegate'))
    if OPENID_2_0_TYPE in type_uris:
        candidate_tags.append(nsTag(XRD_NS_2_0, 'LocalID'))

    # The first value seen wins; every later one has to agree with it.
    local_id = None
    for tag in candidate_tags:
        for element in service_element.findall(tag):
            value = element.text
            if local_id is None:
                local_id = value
            elif local_id != value:
                template = 'More than one %r tag found in one service element'
                raise DiscoveryFailure(template % (tag,), None)
    return local_id
def normalizeURL(url):
    """Return url normalized by urinorm with its fragment removed.

    @raises DiscoveryFailure: when urinorm rejects the URL with a
        ValueError.
    """
    try:
        normalized = urinorm.urinorm(url)
    except ValueError as why:
        raise DiscoveryFailure('Normalizing identifier: %s' % (why,), None)

    # Only the normalization step is guarded; the defrag happens
    # outside the try block.
    without_fragment, _fragment = urllib.parse.urldefrag(normalized)
    return without_fragment
def normalizeXRI(xri):
    """Return the XRI with a leading "xri://" scheme removed, if any."""
    scheme = "xri://"
    return xri[len(scheme):] if xri.startswith(scheme) else xri
def arrangeByType(service_list, preferred_types):
    """Return a new list of the services ordered by preferred type.

    A service ranks by the position of the first entry of
    preferred_types found in its type_uris, so a service with several
    types is ranked by its most preferred one.  Services matching no
    preferred type sort last.  Services with equal rank keep their
    relative order from service_list.  No service is dropped and
    service_list itself is not modified.

    @param service_list: objects exposing a type_uris attribute.
    @param preferred_types: type URIs, most preferred first.
    @return: a new list holding every element of service_list.
    """
    unmatched_rank = len(preferred_types)

    def bestMatchingService(service):
        """Return the index of the first preferred type present on
        the service, or len(preferred_types) when none is present."""
        for rank, type_uri in enumerate(preferred_types):
            if type_uri in service.type_uris:
                return rank
        return unmatched_rank

    # sorted() is stable, so equal ranks stay in input order -- the
    # same result as sorting (rank, original_index, service) tuples.
    return sorted(service_list, key=bestMatchingService)
def getOPOrUserServices(openid_services):
    """Return the services ordered with OP Identifier endpoints first.

    The services are first arranged with OPENID_IDP_2_0_TYPE as the
    only preferred type, and that arrangement is returned when it is
    non-empty.  Otherwise the arrangement by
    OpenIDServiceEndpoint.openid_type_uris is returned.

    NOTE(review): arrangeByType reorders but does not filter, so the
    first arrangement is empty only when the input is empty.  Confirm
    whether restricting the result to OP Identifier services (and
    ordering the remainder by openid_type_uris) was the intent.

    openid_services is a list of OpenIDServiceEndpoint objects.

    Returns a list of OpenIDServiceEndpoint objects.
    """
    op_first = arrangeByType(openid_services, [OPENID_IDP_2_0_TYPE])
    if op_first:
        return op_first
    return arrangeByType(openid_services,
                         OpenIDServiceEndpoint.openid_type_uris)
def discoverYadis(uri):
    """Discover OpenID services for a URI.  Yadis is tried first, with
    a fall back to old-style <link rel='...'> discovery when it yields
    no OpenID services.

    @param uri: normalized identity URL
    @type uri: str

    @return: (claimed_id, services)
    @rtype: (str, list(OpenIDServiceEndpoint))
    """
    response = yadisDiscover(uri)
    yadis_url = response.normalized_uri
    body = response.response_text

    try:
        services = OpenIDServiceEndpoint.fromXRDS(yadis_url, body)
    except XRDSError:
        # The body is not a parseable Yadis XRDS document.
        services = []

    if services:
        return (yadis_url, getOPOrUserServices(services))

    # Either not an XRDS or an XRDS without OpenID services.
    if response.isXRDS():
        # We got the Yadis content-type or followed the Yadis header:
        # fetch the document again without following the Yadis header
        # and with no Accept header.
        return discoverNoYadis(uri)

    # Otherwise treat the body we already have as HTML and look for
    # <link rel="..."> tags.
    html_services = OpenIDServiceEndpoint.fromHTML(yadis_url, body)
    return (yadis_url, getOPOrUserServices(html_services))
def discoverXRI(iname):
    """Discover OpenID services for an XRI through the proxy resolver.

    An XRDSError raised during resolution (including the one raised
    here when no CanonicalID comes back) is logged and not
    propagated; the endpoints collected up to that point are used.

    @param iname: the XRI, with or without a leading "xri://".
    @return: (iname without scheme, list of OpenIDServiceEndpoint)
    """
    iname = normalizeXRI(iname)
    endpoints = []
    try:
        resolver = xrires.ProxyResolver()
        canonicalID, services = resolver.query(
            iname, OpenIDServiceEndpoint.openid_type_uris)

        if canonicalID is None:
            raise XRDSError('No CanonicalID found for XRI %r' % (iname,))

        endpoint_filter = filters.mkFilter(OpenIDServiceEndpoint)
        # Extend one service element at a time so endpoints gathered
        # before a failure are kept.
        for service_element in services:
            endpoints.extend(
                endpoint_filter.getServiceEndpoints(iname, service_element))
    except XRDSError:
        logging.exception('xrds error on ' + iname)

    # The filter does not pass these through to the endpoint
    # constructor, so they are attached afterwards.
    for found in endpoints:
        found.canonicalID = canonicalID
        found.claimed_id = canonicalID
        found.display_identifier = iname

    # FIXME: returned xri should probably be in some normal form
    return iname, getOPOrUserServices(endpoints)
def discoverNoYadis(uri):
    """Fetch uri and run HTML <link> discovery on the response body.

    The URL reported by the HTTP response is used as the claimed
    identifier.  The services are returned in the order fromHTML
    produces them.

    @return: (claimed_id, list of OpenIDServiceEndpoint)
    """
    response = fetchers.fetch(uri)
    final_url = response.url
    body = response.read()  # MAX_RESPONSE
    return final_url, OpenIDServiceEndpoint.fromHTML(final_url, body)
def discoverURI(uri):
    """Run URL-based discovery for uri.

    When the input lacks a scheme or a network location, "http://" is
    prepended.  The URL is normalized before Yadis discovery, and the
    resulting claimed identifier is normalized again.

    @raises DiscoveryFailure: when the URI has both a scheme and a
        network location and the scheme is neither http nor https.
    @return: (claimed_id, list of OpenIDServiceEndpoint)
    """
    parts = urllib.parse.urlparse(uri)
    scheme, netloc = parts[0], parts[1]

    if not (scheme and netloc):
        uri = 'http://' + uri
    elif scheme not in ('http', 'https'):
        raise DiscoveryFailure('URI scheme is not HTTP or HTTPS', None)

    normalized_uri = normalizeURL(uri)
    claimed_id, openid_services = discoverYadis(normalized_uri)
    return normalizeURL(claimed_id), openid_services
def discover(identifier):
    """Discover OpenID services for identifier.

    Identifiers whose scheme is reported as "XRI" go through
    discoverXRI; everything else goes through discoverURI.

    @return: whatever the selected discovery function returns, i.e.
        (identifier, list of OpenIDServiceEndpoint).
    """
    is_xri = xri.identifierScheme(identifier) == "XRI"
    resolve = discoverXRI if is_xri else discoverURI
    return resolve(identifier)