/
auth.py
82 lines (65 loc) · 2.45 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import base64
from abc import ABC
from email.utils import formatdate
from urllib.parse import urlparse
from Crypto.Hash import SHA256
from httpsig.requests_auth import HTTPSignatureAuth
from httpsig.utils import HttpSigException
from requests.auth import AuthBase
class StrateosAuthBase(AuthBase, ABC):
    """Shared base for auth handlers that are tied to one API root URL.

    Subclasses use :meth:`is_internal_request` to decide whether an outgoing
    request is addressed to the same network location as ``api_root``.
    """

    def __init__(self, api_root):
        """Store the API root URL that outgoing requests are compared with.

        Args:
            api_root: URL of the API; only its network location is used by
                :meth:`is_internal_request`.
        """
        self.api_root = api_root

    def is_internal_request(self, request):
        """Tell whether ``request`` targets the configured API root.

        Args:
            request: Object exposing a ``url`` attribute.

        Returns:
            True when the ``netloc`` component of ``request.url`` equals the
            ``netloc`` component of ``self.api_root``; False otherwise.
        """
        request_netloc = urlparse(request.url).netloc
        root_netloc = urlparse(self.api_root).netloc
        return request_netloc == root_netloc
class StrateosSign(StrateosAuthBase):
    """Signs requests addressed to the API root with an RSA-SHA256 signature.

    Two signers are prepared up front: one covering the request target,
    ``Date`` and ``Host`` headers, and one that additionally covers the
    ``Digest`` and ``Content-Length`` headers for requests that carry a body.
    """

    def __init__(self, email, secret, api_root):
        """Build the header signers.

        Args:
            email: Used as the signature ``key_id``.
            secret: Key material handed to ``HTTPSignatureAuth``.
            api_root: URL of the API; only requests to its network location
                are signed.

        Raises:
            ValueError: If ``HTTPSignatureAuth`` rejects ``secret`` with an
                ``HttpSigException``.
        """
        super().__init__(api_root)
        self.email = email
        self.secret = secret

        headers = ["(request-target)", "Date", "Host"]
        body_headers = ["Digest", "Content-Length"]
        try:
            self.auth = HTTPSignatureAuth(
                key_id=self.email,
                algorithm="rsa-sha256",
                headers=headers,
                secret=self.secret,
            )
            self.body_auth = HTTPSignatureAuth(
                key_id=self.email,
                algorithm="rsa-sha256",
                headers=headers + body_headers,
                secret=self.secret,
            )
        except HttpSigException as err:
            # Chain explicitly so the underlying parser error stays visible
            # as the cause of the ValueError.
            raise ValueError(
                "Could not parse the specified RSA Key, ensure it "
                "is a PRIVATE key in PEM format"
            ) from err

    def __call__(self, request):
        """Sign ``request`` if it targets the API root.

        Requests to any other network location are returned untouched. A
        ``Date`` header is added when missing. For PUT, POST and PATCH a
        ``Digest: SHA-256=<base64>`` header is computed over the body and the
        body-aware signer is used; all other methods use the plain signer.

        Args:
            request: The outgoing request; must expose ``url``, ``headers``,
                ``method`` and ``body``.

        Returns:
            The request, signed when it is internal.
        """
        if not self.is_internal_request(request):
            return request

        if "Date" not in request.headers:
            request.headers["Date"] = formatdate(
                timeval=None, localtime=False, usegmt=True
            )

        if request.method.upper() in ("PUT", "POST", "PATCH"):
            body = request.body
            if body is None:
                # A body-less PUT/POST/PATCH has ``body`` set to None; hash an
                # empty payload instead of failing on ``None.encode()``.
                encoded_body = b""
            elif isinstance(body, bytes):
                encoded_body = body
            else:
                encoded_body = body.encode()
            digest = SHA256.new(encoded_body).digest()
            sha = base64.b64encode(digest).decode("ascii")
            request.headers["Digest"] = f"SHA-256={sha}"
            # NOTE(review): the body signer also covers Content-Length, which
            # is not set here and is presumably already on the request - confirm.
            return self.body_auth(request)

        return self.auth(request)
class StrateosBearerAuth(StrateosAuthBase):
    """Attaches a token as the ``authorization`` header on internal requests."""

    def __init__(self, token, api_root):
        """Store the token and the API root it applies to.

        Args:
            token: Value written verbatim into the ``authorization`` header;
                no prefix is added by this class.
            api_root: URL of the API; only requests to its network location
                receive the header.
        """
        super().__init__(api_root)
        self.token = token

    def __call__(self, request):
        """Set the ``authorization`` header when ``request`` is internal.

        Requests addressed to any other network location are returned
        unchanged, so the token is never attached to them.

        Returns:
            The same request object that was passed in.
        """
        if not self.is_internal_request(request):
            return request
        request.headers["authorization"] = self.token
        return request