-
Notifications
You must be signed in to change notification settings - Fork 2
/
authorizer.py
62 lines (49 loc) · 1.85 KB
/
authorizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import datetime
import hmac
import json

import boto3
# Effects that a generated policy statement can carry.
EFFECT_ALLOW = 'Allow'
EFFECT_DENY = 'Deny'

# Token cache state, read and updated by retrieve_auth_token().
TOKEN_CACHE_TIME = 5 * 60  # seconds a fetched token is reused before re-fetching
CACHED_AUTH_TOKEN = None  # most recently fetched token; None until the first lookup
LAST_CACHE_TIME = None  # datetime of the most recent fetch; None until the first lookup
def retrieve_auth_token():
    """Return the expected auth token, re-fetching it when the cache is stale.

    The token is read from the ``auth_token_for_okta`` key of the JSON
    secret ``app/okta-to-aws-sso`` in Secrets Manager. The fetched value and
    the fetch time are kept in the module globals ``CACHED_AUTH_TOKEN`` and
    ``LAST_CACHE_TIME``; a cached token is reused until it is more than
    ``TOKEN_CACHE_TIME`` seconds old.
    """
    global CACHED_AUTH_TOKEN, LAST_CACHE_TIME

    # With no recorded fetch time, fall back to the earliest representable
    # datetime so the age computed below is far beyond the cache window.
    if not LAST_CACHE_TIME:
        LAST_CACHE_TIME = datetime.datetime.min

    age_in_seconds = (datetime.datetime.now() - LAST_CACHE_TIME).total_seconds()

    # Fast path: a token is cached and still inside the cache window.
    if CACHED_AUTH_TOKEN and age_in_seconds <= TOKEN_CACHE_TIME:
        return CACHED_AUTH_TOKEN

    response = boto3.client('secretsmanager').get_secret_value(
        SecretId="app/okta-to-aws-sso"
    )
    CACHED_AUTH_TOKEN = json.loads(response['SecretString'])['auth_token_for_okta']
    LAST_CACHE_TIME = datetime.datetime.now()
    return CACHED_AUTH_TOKEN
def _tokens_match(provided, expected):
    """Return True when both tokens are equal strings, compared in constant time."""
    # Non-string values can never equal a header string, so treat them as a
    # mismatch instead of letting compare_digest raise TypeError.
    if not isinstance(provided, str) or not isinstance(expected, str):
        return False
    # compare_digest only accepts ASCII-only str, so compare encoded bytes;
    # 'surrogatepass' keeps the encoding total and one-to-one for any str.
    return hmac.compare_digest(
        provided.encode('utf-8', 'surrogatepass'),
        expected.encode('utf-8', 'surrogatepass'),
    )


def lambda_handler(event, context):
    """Authorize a request by comparing its Authorization header to the stored token.

    Args:
        event: Authorizer event. Must contain ``headers`` with the keys
            ``Authorization`` (the presented token) and ``x-api-key`` (used
            as the policy's principal id), and ``methodArn`` (the resource
            the policy applies to).
        context: Lambda context object; unused.

    Returns:
        The dict built by ``generate_policy``: effect ``Allow`` when the
        presented token equals the expected one, ``Deny`` otherwise.

    Raises:
        KeyError: If ``headers``, either header, or ``methodArn`` is missing
            from ``event``.
    """
    provided_token = event['headers']['Authorization']
    principal_id = event['headers']['x-api-key']
    resource = event['methodArn']
    expected_token = retrieve_auth_token()

    # The presented token is attacker-controlled; a plain ``==`` can leak how
    # many leading characters matched through response timing.
    if _tokens_match(provided_token, expected_token):
        effect = EFFECT_ALLOW
    else:
        effect = EFFECT_DENY

    policy = generate_policy(principal_id, effect, resource)
    # NOTE(review): the logged policy includes principalId, which is the
    # caller's x-api-key value - confirm that is acceptable to log.
    print(json.dumps(policy))
    return policy
def generate_policy(principal_id, effect, resource):
    """Build an authorizer response with a single-statement policy document.

    Args:
        principal_id: Value placed under ``principalId``.
        effect: Value placed under the statement's ``Effect`` (the module
            defines ``EFFECT_ALLOW`` and ``EFFECT_DENY`` for this).
        resource: Value placed under the statement's ``Resource``.

    Returns:
        A dict with ``principalId`` and a ``policyDocument`` whose only
        statement applies ``effect`` to ``execute-api:Invoke`` on ``resource``.
    """
    invoke_statement = {
        "Action": "execute-api:Invoke",
        "Effect": effect,
        "Resource": resource,
    }
    policy_document = {
        "Version": "2012-10-17",
        "Statement": [invoke_statement],
    }
    return {
        "principalId": principal_id,
        "policyDocument": policy_document,
    }