-
-
Notifications
You must be signed in to change notification settings - Fork 41
/
abc.py
713 lines (433 loc) · 25.6 KB
/
abc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
from abc import ABC, abstractmethod
import inspect
from ..models import Request
class AbstractOAuth2Manager(ABC):
"""
OAuth2 manager that only supports Authorization Code Flow (https://tools.ietf.org/html/rfc6749#section-1.3.1)
Arguments:
session_factory (aiogoogle.sessions.AbstractSession): A session implementation
verify (bool): whether or not to verify tokens fetched
Note:
For a flow similar to Client Credentials Flow (https://tools.ietf.org/html/rfc6749#section-1.3.4) use an ``api_key``
"""
def __new__(cls, *args, **kwargs):
# Get all coros of this the abstract class
parent_abstract_coros = inspect.getmembers(
AbstractOAuth2Manager, predicate=inspect.iscoroutinefunction
)
# Ensure all relevant child methods are implemented as coros
for coro in parent_abstract_coros:
coro_name = coro[0]
child_method = getattr(cls, coro_name)
if not inspect.iscoroutinefunction(child_method):
raise RuntimeError(f"{child_method} must be a coroutine")
# Resume with normal behavior of a Python constructor
return super(AbstractOAuth2Manager, cls).__new__(cls)
@abstractmethod
def __init__(self, session_factory):
raise NotImplementedError
@abstractmethod
def __aenter__(self):
raise NotImplementedError
@abstractmethod
def __aexit__(self, *args, **kwargs):
raise NotImplementedError
@abstractmethod
def authorize(self, request, user_creds) -> Request:
"""
Adds OAuth2 authorization headers to requests given user creds
Arguments:
request (aiogoogle.models.Request):
Request to authorize
user_creds (aiogoogle.auth.creds.UserCreds):
user_creds to refresh with
Returns:
aiogoogle.models.Request: Request with OAuth2 authorization header
"""
raise NotImplementedError
@abstractmethod
def authorization_url(
self,
client_creds=None,
state=None,
access_type=None,
include_granted_scopes=None,
login_hint=None,
prompt=None,
response_type=None,
scopes=None,
) -> str:
"""
First step of OAuth2 authoriztion code flow. Creates an OAuth2 authorization URI.
Arguments:
client_creds (aiogoogle.auth.creds.ClientCreds): A client_creds object/dictionary containing the following items:
* client_id
* scopes
* redirect_uri
scopes (list): List of OAuth2 scopes to ask for
* Optional
* Overrides the list of scopes specified in client creds
state (str): A CSRF token
* Optional
* Specifies any string value that your application uses to maintain state between your authorization request and the authorization server's response.
* The server returns the exact value that you send as a name=value pair in the hash (#) fragment of the redirect_uri after the user consents to or denies your application's access request.
* You can use this parameter for several purposes, such as:
* Directing the user to the correct resource in your application
* Sending nonces
* Mitigating cross-site request forgery.
* If no state is passed, this method will generate and add a secret token to ``user_creds['state']``.
* Since your redirect_uri can be guessed, using a state value can increase your assurance that an incoming connection is the result of an authentication request.
* If you generate a random string or encode the hash of a cookie or another value that captures the client's state, you can validate the response to additionally ensure that the request and response originated in the same browser, providing protection against attacks such as cross-site request forgery.
access_type (str): Indicates whether your application can refresh access tokens when the user is not present at the browser. Options:
* Optional
* ``"online"`` *Default*
* ``"offline"`` Choose this for a refresheable/long-term access token
include_granted_scopes (bool):
* Optional
* Enables applications to use incremental authorization to request access to additional scopes in context.
* If you set this parameter's value to ``True`` and the authorization request is granted, then the new access token will also cover any scopes to which the user previously granted the application access.
login_hint (str):
* Optional
* If your application knows which user is trying to authenticate, it can use this parameter to provide a hint to the Google Authentication Server.
* The server uses the hint to simplify the login flow either by prefilling the email field in the sign-in form or by selecting the appropriate multi-login session.
* Set the parameter value to an email address or sub identifier, which is equivalent to the user's Google ID.
* This can help you avoid problems that occur if your app logs in the wrong user account.
prompt (str):
* Optional
* A space-delimited, case-sensitive list of prompts to present the user.
* If you don't specify this parameter, the user will be prompted only the first time your app requests access.
* Possible values are:
* ``None`` : Default: Do not display any authentication or consent screens. Must not be specified with other values.
* ``'consent'`` : Prompt the user for consent.
* ``'select_account'`` : Prompt the user to select an account.
response_type (str):
* OAuth2 response type
* Defaults to Authorization Code Flow response type
Note:
* It is highly recommended that you don't leave ``state`` as ``None`` in production.
* To effortlessly create a random secret to pass it as a state token, you can use ``aiogoogle.auth.utils.create_secret()``
Note:
A Note About Scopes:
* For a list of all of Google's available scopes: https://developers.google.com/identity/protocols/googlescopes
* It is recommended that your application requests access to authorization scopes in context whenever possible.
* By requesting access to user data in context, via incremental authorization, you help users to more easily understand why your application needs the access it is requesting.
Warning:
* When listening for a callback after redirecting a user to the URL returned from this method, take the following into consideration:
* If your response endpoint renders an HTML page, any resources on that page will be able to see the authorization code in the URL.
* Scripts can read the URL directly, and the URL in the Referer HTTP header may be sent to any or all resources on the page.
* Carefully consider whether you want to send authorization credentials to all resources on that page (especially third-party scripts such as social plugins and analytics).
* To avoid this issue, it's recommend that the server first handle the request, then redirect to another URL that doesn't include the response parameters.
Example:
::
from aiogoogle.auth.utils import create_secret
from aiogoogle import ClinetCreds
client_creds = ClientCreds(
client_id='a_client_id',
scopes=['first.scope', 'second.scope'],
redirect_uri='http://localhost:8080'
)
state = create_secret()
auth_uri = oauth2.authorization_url(
client_creds=client_creds,
state=state,
access_type='offline',
include_granted_scopes=True,
login_hint='example@gmail.com',
prompt='select_account'
)
Returns:
(str): An Authorization URI
"""
raise NotImplementedError
@abstractmethod
async def build_user_creds(self, grant, client_creds, grant_type=None):
"""
Second step of Oauth2 authrization code flow. Creates a User Creds object with access and refresh token
Arguments:
grant (str):
* Aka: "code".
* The code received at your redirect URI from the auth callback
client_creds (aiogoogle.auth.creds.ClientCreds):
* Dict with client_id and client_secret items
grant_type (str):
* OAuth2 grant type
* defaults to ``code`` (Authorization code flow)
Returns:
aiogoogle.auth.creds.UserCreds: User Credentials with the following items:
* ``access_token``
* ``refresh_token``
* ``expires_in`` (JSON format ISO 8601)
* ``token_type`` always set to bearer
* ``scopes``
Raises:
aiogoogle.excs.AuthError: Auth Error
"""
raise NotImplementedError
@abstractmethod
def is_expired(self, user_creds):
"""
Checks if user_creds expired
Arguments:
user_creds (aiogoogle.auth.creds.UserCreds): User Credentials
Returns:
bool:
"""
raise NotImplementedError
@abstractmethod
async def refresh(self, user_creds, client_creds):
"""
Refreshes user_creds
Arguments:
user_creds (aiogoogle.auth.creds.UserCreds): User Credentials with ``refresh_token`` item
client_creds (aiogoogle.auth.creds.ClientCreds): Client Credentials
Returns:
aiogoogle.creds.UserCreds: Refreshed user credentials
Raises:
aiogoogle.excs.AuthError: Auth Error
"""
raise NotImplementedError
@abstractmethod
async def revoke(self, user_creds):
"""
Revokes user_creds
In some cases a user may wish to revoke access given to an application. A user can revoke access by visiting Account Settings.
It is also possible for an application to programmatically revoke the access given to it.
Programmatic revocation is important in instances where a user unsubscribes or removes an application.
In other words, part of the removal process can include an API request to ensure the permissions granted to the application are removed.
Arguments:
user_creds (aiogoogle.auth.Creds): UserCreds with an ``access_token`` item
Returns:
None:
Raises:
aiogoogle.excs.AuthError:
"""
raise NotImplementedError
class AbstractOpenIdConnectManager(AbstractOAuth2Manager):
def __new__(cls, *args, **kwargs):
# Get all coros of this the abstract class
parent_abstract_coros = inspect.getmembers(
AbstractOpenIdConnectManager, predicate=inspect.iscoroutinefunction
)
# Ensure all relevant child methods are implemented as coros
for coro in parent_abstract_coros:
coro_name = coro[0]
child_method = getattr(cls, coro_name)
if not inspect.iscoroutinefunction(child_method):
raise RuntimeError(f"{child_method} must be a coroutine")
# Resume with normal behavior of a Python constructor
return super(AbstractOpenIdConnectManager, cls).__new__(cls)
@abstractmethod
def authorization_url(
self,
client_creds,
nonce,
state=None,
prompt=None,
display=None,
login_hint=None,
access_type=None,
include_granted_scopes=None,
openid_realm=None,
hd=None,
response_type=None,
scopes=None,
):
"""
First step of OAuth2 authoriztion code flow. Creates an OAuth2 authorization URI.
Arguments:
client_creds (aiogoogle.auth.creds.ClientCreds): A client_creds object/dictionary containing the following items:
* client_id
* scopes
* The scope value must begin with the string openid and then include profile or email or both.
* redirect_uri
nonce (str): A random value generated by your app that enables replay protection.
scopes (list): List of OAuth2 scopes to ask for
* Optional
* Overrides the list of scopes specified in client creds
* Some OpenID scopes that you can include: ['email', 'profile', 'openid']
display (str):
* An ASCII string value for specifying how the authorization server displays the authentication and consent user interface pages.
* The following values are specified, and accepted by the Google servers, but do not have any effect on its behavior:
* ``page``
* ``popup``
* ``touch``
* ``wap``
state (str): A CSRF token
* Optional
* Specifies any string value that your application uses to maintain state between your authorization request and the authorization server's response.
* The server returns the exact value that you send as a name=value pair in the hash (#) fragment of the redirect_uri after the user consents to or denies your application's access request.
* You can use this parameter for several purposes, such as:
* Directing the user to the correct resource in your application
* Sending nonces
* Mitigating cross-site request forgery.
* If no state is passed, this method will generate and add a secret token to ``user_creds['state']``.
* Since your redirect_uri can be guessed, using a state value can increase your assurance that an incoming connection is the result of an authentication request.
* If you generate a random string or encode the hash of a cookie or another value that captures the client's state, you can validate the response to additionally ensure that the request and response originated in the same browser, providing protection against attacks such as cross-site request forgery.
access_type (str): Indicates whether your application can refresh access tokens when the user is not present at the browser. Options:
* Optional
* ``"online"`` *Default*
* ``"offline"`` Choose this for a refresheable/long-term access token
include_granted_scopes (bool):
* Optional
* Enables applications to use incremental authorization to request access to additional scopes in context.
* If you set this parameter's value to ``True`` and the authorization request is granted, then the new access token will also cover any scopes to which the user previously granted the application access.
login_hint (str):
* Optional
* If your application knows which user is trying to authenticate, it can use this parameter to provide a hint to the Google Authentication Server.
* The server uses the hint to simplify the login flow either by prefilling the email field in the sign-in form or by selecting the appropriate multi-login session.
* Set the parameter value to an email address or sub identifier, which is equivalent to the user's Google ID.
* This can help you avoid problems that occur if your app logs in the wrong user account.
prompt (str):
* Optional
* A space-delimited, case-sensitive list of prompts to present the user.
* If you don't specify this parameter, the user will be prompted only the first time your app requests access.
* Possible values are:
* ``None`` : Default: Do not display any authentication or consent screens. Must not be specified with other values.
* ``'consent'`` : Prompt the user for consent.
* ``'select_account'`` : Prompt the user to select an account.
openid_realm (str):
* openid.realm is a parameter from the OpenID 2.0 protocol.
* It is used in OpenID 2.0 requests to signify the URL-space for which an authentication request is valid.
* Use openid.realm if you are migrating an existing application from OpenID 2.0 to OpenID Connect.
* For more details, see Migrating off of OpenID 2.0. https://developers.google.com/identity/protocols/OpenID2Migration
hd (str):
* The hd (hosted domain) parameter streamlines the login process for G Suite hosted accounts.
* By including the domain of the G Suite user (for example, mycollege.edu), you can indicate that the account selection UI should be optimized for accounts at that domain.
* To optimize for G Suite accounts generally instead of just one domain, use an asterisk: hd=*.
* Don't rely on this UI optimization to control who can access your app, as client-side requests can be modified.
* Be sure to validate that the returned ID token has an hd claim value that matches what you expect (e.g. mycolledge.edu).
* Unlike the request parameter, the ID token claim is contained within a security token from Google, so the value can be trusted.
response_type (str):
* OAuth2 response type
* Defaults to Authorization Code Flow response type
Note:
It is highly recommended that you don't leave ``state`` as ``None`` in production
To effortlessly create a random secret to pass it as a state token, you can use ``aiogoogle.auth.utils.create_secret()``
Note:
A Note About Scopes:
* You can mix OAuth2 scopes with OpenID connect scopes. e.g.: ``openid email https://www.googleapis.com/auth/urlshortener``
* For a list of all of Google's available scopes: https://developers.google.com/identity/protocols/googlescopes
* It is recommended that your application requests access to authorization scopes in context whenever possible.
* By requesting access to user data in context, via incremental authorization, you help users to more easily understand why your application needs the access it is requesting.
Warning:
* When listening for a callback after redirecting a user to the URL returned from this method, take the following into consideration:
* If your response endpoint renders an HTML page, any resources on that page will be able to see the authorization code in the URL.
* Scripts can read the URL directly, and the URL in the Referer HTTP header may be sent to any or all resources on the page.
* Carefully consider whether you want to send authorization credentials to all resources on that page (especially third-party scripts such as social plugins and analytics).
* To avoid this issue, it's recommend that the server first handle the request, then redirect to another URL that doesn't include the response parameters.
Example:
::
from aiogoogle.auth.utils import create_secret
from aiogoogle import ClinetCreds
client_creds = ClientCreds(
client_id='a_client_id',
scopes=['first.scope', 'second.scope'],
redirect_uri='http://localhost:8080'
)
state = create_secret()
nonce = create_secret()
auth_uri = openid_connect.authorization_url(
client_creds=client_creds,
nonce=nonce,
state=state,
access_type='offline',
include_granted_scopes=True,
login_hint='example@gmail.com',
prompt='select_account'
)
Returns:
(str): An Authorization URI
"""
raise NotImplementedError
@abstractmethod
def build_user_creds(
self, grant, client_creds, grant_type=None, nonce=None, hd=None, verify=True
):
"""
Second step of Oauth2 authrization code flow and OpenID connect. Creates a User Creds object with access and refresh token
Arguments:
grant (str):
* Aka: "code".
* The code received at your redirect URI from the auth callback
client_creds (aiogoogle.auth.creds.ClientCreds):
* Dict with client_id and client_secret items
grant_type (str):
* OAuth2 grant type
* defaults to ``code`` (Authorization code flow)
nonce (str):
* Random value that prevents replay attacks
* pass the one you used with ``self.authorization_url()`` method
hd (str):
* hosted domain for G-suite
* used for id_token verification
verify (str):
* Whether or not to verify the received id_token
* Unless you're building a speed critical application AND you're receiving your tokens directly from Google, you should leave this as True.
Returns:
aiogoogle.auth.creds.UserCreds: User Credentials with the following items:
* ``access_token``
* ``refresh_token``
* ``expires_in`` (JSON format ISO 8601)
* ``token_type`` always set to bearer
* ``scopes``
Raises:
aiogoogle.excs.AuthError: Auth Error
"""
raise NotImplementedError
@abstractmethod
def decode_and_validate(self, id_token_jwt, client_id=None, nonce=None, hd=None):
"""
Decodes then validates an openid_connect jwt with Google's oaauth2 certificates
Arguments:
id_token_jwt (str): Found in :class:aiogoogle.auth.creds.UserCreds
client_id (str): If provided will validate token's audience ('aud')
nonce (str): If provided, will validate the nonce provided at authorization
hd (str): If provided, will validate client's hosted domain
Returns:
dict: Decoded OpenID connect JWT
"""
raise NotImplementedError
@abstractmethod
def get_user_info(self, user_creds):
"""
https://developers.google.com/+/web/api/rest/openidconnect/getOpenIdConnect
People: getOpenIdConnect
Get user information after performing an OpenID connect flow.
Use this method instead of people.get (Google+ API) when you need the OpenID Connect format.
This method is not discoverable nor is it in the Google API client libraries.
To learn more, see OpenID Connect for sign-in. https://developers.google.com/+/web/api/rest/openidconnect/index.html
Example:
::
>>> await get_user_info(user_creds)
{
"kind": "plus#personOpenIdConnect",
"gender": string,
"sub": string,
"name": string,
"given_name": string,
"family_name": string,
"profile": string,
"picture": string,
"email": string,
"email_verified": "true",
"locale": string,
"hd": string
}
Arguments:
user_creds (aiogoogle.auth.creds.UserCreds): User credentials
"""
raise NotImplementedError
class AbstractAPIKeyManager(ABC):
@abstractmethod
def authorize(self, request, api_key):
"""
Adds API Key authorization query argument to URL of a request given an API key
Arguments:
request (aiogoogle.models.Request):
Request to authorize
creds (aiogoogle.auth.creds.ApiKey):
ApiKey to refresh with
Returns:
aiogoogle.models.Request: Request with API key in URL
"""
raise NotImplementedError