/
blob.py
277 lines (233 loc) · 9.07 KB
/
blob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import binascii
import collections
import datetime
import enum
import hashlib
import io
from typing import Any
from typing import Dict
from typing import Optional
from typing import TYPE_CHECKING
from urllib.parse import quote
import rsa
from gcloud.aio.auth import BUILD_GCLOUD_REST # pylint: disable=no-name-in-module
from gcloud.aio.auth import decode # pylint: disable=no-name-in-module
from gcloud.aio.auth import IamClient # pylint: disable=no-name-in-module
from gcloud.aio.auth import Token # pylint: disable=no-name-in-module
from pyasn1.codec.der import decoder
from pyasn1_modules import pem
from pyasn1_modules.rfc5208 import PrivateKeyInfo
from .constants import DEFAULT_TIMEOUT
# Selectively load libraries based on the package
if BUILD_GCLOUD_REST:
from requests import Session
else:
from aiohttp import ClientSession as Session # type: ignore[assignment]
if TYPE_CHECKING:
from .bucket import Bucket # pylint: disable=cyclic-import
# Hostname signed into, and used as the base of, every generated URL.
HOST = 'storage.googleapis.com'

# (begin, end) PEM armour lines for each private-key encoding that can be
# handed to ``pem.readPemBlocksFromFile``; the position in which a marker is
# passed to that call is the ``marker_id`` it reports back.
PKCS1_MARKER = ('-----BEGIN RSA PRIVATE KEY-----', '-----END RSA PRIVATE KEY-----')
PKCS8_MARKER = ('-----BEGIN PRIVATE KEY-----', '-----END PRIVATE KEY-----')
# Module-level ``PrivateKeyInfo`` (pyasn1_modules.rfc5208) instance, passed as
# the ``asn1Spec`` when ``Blob.get_pem_signature`` DER-decodes a PKCS#8 key.
PKCS8_SPEC = PrivateKeyInfo()
class PemKind(enum.Enum):
    """
    Names the ``marker_id`` values reported by
    ``pem.readPemBlocksFromFile(key, *markers)``.

    That call returns ``(marker_id, key_bytes)``, where ``marker_id`` is the
    integer index of the marker argument that matched (or -1 if no match was
    found).

    For example::

        (marker_id, _) = pem.readPemBlocksFromFile(key, PKCS1_MARKER,
                                                   PKCS8_MARKER)
        if marker_id == -1:
            # "key" did not match either marker or was invalid
        if marker_id == 0:
            # "key" matched the marker at index 0, ie. PKCS1_MARKER
        if marker_id == 1:
            # "key" matched the marker at index 1, ie. PKCS8_MARKER
    """

    # No marker matched.
    INVALID = -1
    # Matched the marker passed first.
    PKCS1 = 0
    # Matched the marker passed second.
    PKCS8 = 1
class _SignatureMethod(enum.Enum):
    """
    Indicates whether URL signing is done through Google's IAM API or
    locally with a PEM private key.

    Local signing is faster, but requires that the provided token contains
    both ``client_email`` and ``private_key`` data.
    """

    # Sign locally with the private key carried by the token.
    PEM = 0
    # Ask Google's IAM API to sign.
    IAM_API = 1
class Blob:
    """
    A single named object inside a bucket.

    Every key of the ``metadata`` mapping given at construction becomes an
    instance attribute. Transfers are delegated to ``bucket.storage`` and
    signed URLs are produced either locally (PEM key) or via the IAM API.
    """

    def __init__(
        self, bucket: 'Bucket', name: str,
        metadata: Dict[str, Any],
    ) -> None:
        """
        Copy ``metadata`` onto the instance and bind it to ``bucket``.

        ``metadata`` must provide a ``size`` entry that ``int()`` accepts;
        the final conversion raises otherwise.
        """
        # Metadata is applied first so the explicit arguments below take
        # precedence over any ``bucket`` / ``name`` keys it may carry.
        self.__dict__.update(**metadata)
        self.bucket = bucket
        self.name = name
        self.size: int = int(self.size)

    @property
    def chunk_size(self) -> int:
        """
        Return ``size`` rounded up to the next multiple of 262144 (256 KiB).

        The result is always strictly greater than ``size``: a size that is
        already an exact multiple (0 included) gains one further 262144.
        """
        return self.size + (262144 - (self.size % 262144))

    async def download(
        self, timeout: int = DEFAULT_TIMEOUT,
        session: Optional[Session] = None,
        auto_decompress: bool = True,
    ) -> Any:
        """
        Fetch this blob through ``bucket.storage.download`` and return
        whatever that call returns.

        When ``auto_decompress`` is False an ``accept-encoding: gzip``
        request header is passed along; otherwise no extra headers are sent.
        """
        headers = None if auto_decompress else {'accept-encoding': 'gzip'}

        return await self.bucket.storage.download(
            self.bucket.name,
            self.name,
            timeout=timeout,
            session=session,
            headers=headers,
        )

    async def upload(
        self, data: Any,
        content_type: Optional[str] = None,
        session: Optional[Session] = None,
    ) -> Dict[str, Any]:
        """
        Upload ``data`` under this blob's name via ``bucket.storage.upload``.

        The metadata returned by the upload is merged into this instance's
        attributes and also returned to the caller.
        """
        metadata = await self.bucket.storage.upload(
            self.bucket.name,
            self.name,
            data,
            content_type=content_type,
            session=session,
        )

        self.__dict__.update(metadata)
        return metadata

    async def get_signed_url(  # pylint: disable=too-many-locals
        self, expiration: int, headers: Optional[Dict[str, str]] = None,
        query_params: Optional[Dict[str, Any]] = None,
        http_method: str = 'GET', iam_client: Optional[IamClient] = None,
        service_account_email: Optional[str] = None,
        token: Optional[Token] = None, session: Optional[Session] = None,
    ) -> str:
        """
        Create a temporary access URL for Storage Blob accessible by anyone
        with the link.

        Adapted from Google Documentation:
        https://cloud.google.com/storage/docs/access-control/signing-urls-manually#python-sample

        ``expiration`` is the lifetime in seconds (at most 604800).
        ``headers`` and ``query_params`` are extra values to include in the
        signature; the caller's mappings are copied, never modified.
        ``token`` defaults to ``bucket.storage.token``. If its
        ``service_data`` holds both ``client_email`` and ``private_key`` the
        URL is signed locally; otherwise signing goes through ``iam_client``
        (constructed on demand) on behalf of ``service_account_email``.

        Raises ``ValueError`` if ``expiration`` exceeds 604800 and
        ``TypeError`` if an ``IamClient`` cannot be built from the token.
        """
        if expiration > 604800:
            raise ValueError(
                "expiration time can't be longer than 604800 "
                'seconds (7 days)',
            )

        quoted_name = quote(self.name, safe=b'/~')
        canonical_uri = f'/{self.bucket.name}/{quoted_name}'

        # One timestamp feeds both the full request time and the date part
        # of the credential scope, so the two can never disagree.
        datetime_now = datetime.datetime.now(datetime.timezone.utc)
        request_timestamp = datetime_now.strftime('%Y%m%dT%H%M%SZ')
        datestamp = datetime_now.strftime('%Y%m%d')

        token = token or self.bucket.storage.token
        credential_scope = f'{datestamp}/auto/storage/goog4_request'

        # Try to sign locally if available
        client_email = token.service_data.get('client_email')
        private_key = token.service_data.get('private_key')
        if not client_email or not private_key:
            # Cannot sign locally, so we'll have to use Google's IAM API
            signature_method = _SignatureMethod.IAM_API
            # Fall back to the token's own email so that an omitted
            # ``service_account_email`` does not put the literal text "None"
            # into the credential.
            # NOTE(review): assumes the IAM client signs as the token's
            # ``client_email`` when no explicit email is given -- confirm.
            signer_email = service_account_email or client_email
            credential = f'{signer_email}/{credential_scope}'
        else:
            signature_method = _SignatureMethod.PEM
            credential = f'{client_email}/{credential_scope}'

        # Work on a copy: the caller's dict must not gain a ``host`` entry.
        headers = dict(headers or {})
        headers['host'] = HOST

        ordered_headers = collections.OrderedDict(
            sorted(headers.items(), key=lambda x: x[0].lower()))

        # Both header names and header values are lowercased here.
        canonical_headers = ''.join(
            f'{str(k).lower()}:{str(v).lower()}\n'
            for k, v in ordered_headers.items()
        )
        signed_headers = ';'.join(
            f'{str(k).lower()}' for k in ordered_headers
        )

        # Work on a copy: the caller's dict must not gain ``X-Goog-*`` keys.
        query_params = dict(query_params or {})
        query_params['X-Goog-Algorithm'] = 'GOOG4-RSA-SHA256'
        query_params['X-Goog-Credential'] = credential
        query_params['X-Goog-Date'] = request_timestamp
        query_params['X-Goog-Expires'] = expiration
        query_params['X-Goog-SignedHeaders'] = signed_headers

        ordered_query_params = collections.OrderedDict(
            sorted(query_params.items()),
        )

        canonical_query_str = '&'.join(
            f'{quote(str(k), safe="")}={quote(str(v), safe="")}'
            for k, v in ordered_query_params.items()
        )

        canonical_req = '\n'.join([
            http_method, canonical_uri,
            canonical_query_str, canonical_headers,
            signed_headers, 'UNSIGNED-PAYLOAD',
        ])
        canonical_req_hash = hashlib.sha256(canonical_req.encode()).hexdigest()

        str_to_sign = '\n'.join([
            'GOOG4-RSA-SHA256', request_timestamp,
            credential_scope, canonical_req_hash,
        ])

        # A non-string private key cannot be parsed as PEM text, so that
        # case is routed to the IAM API as well.
        if (signature_method == _SignatureMethod.PEM and private_key
                and isinstance(private_key, str)):
            signed_blob = self.get_pem_signature(str_to_sign, private_key)
        else:
            try:
                iam_client = iam_client or IamClient(
                    token=token, session=session)
            except TypeError as e:
                raise TypeError('Blob signing is not yet supported'
                                ' for AUTHORIZED_USER tokens') from e
            signed_blob = await self.get_iam_api_signature(
                str_to_sign,
                iam_client,
                service_account_email,
                session,
            )

        signature = binascii.hexlify(signed_blob).decode()

        return (
            f'https://{HOST}{canonical_uri}?'
            f'{canonical_query_str}&X-Goog-Signature={signature}'
        )

    @staticmethod
    def get_pem_signature(str_to_sign: str, private_key: str) -> bytes:
        """
        Sign ``str_to_sign`` with a PEM-encoded RSA private key using
        SHA-256 and return the raw signature bytes.

        Both PKCS#1 and PKCS#8 PEM blocks are accepted; a PKCS#8 block is
        unwrapped to its inner key first. Raises ``ValueError`` if neither
        marker matches or if the PKCS#8 structure has trailing bytes.
        """
        # N.B. see the ``PemKind`` enum
        marker_id, key_bytes = pem.readPemBlocksFromFile(
            io.StringIO(private_key), PKCS1_MARKER, PKCS8_MARKER,
        )
        if marker_id == PemKind.INVALID.value:
            raise ValueError('private key is invalid or unsupported')

        if marker_id == PemKind.PKCS8.value:
            # convert from pkcs8 to pkcs1
            key_info, remaining = decoder.decode(
                key_bytes,
                asn1Spec=PKCS8_SPEC,
            )
            if remaining != b'':
                raise ValueError(
                    'could not read PKCS8 key: found extra bytes',
                    remaining,
                )

            private_key_info = key_info.getComponentByName('privateKey')
            key_bytes = private_key_info.asOctets()

        key = rsa.key.PrivateKey.load_pkcs1(key_bytes, format='DER')
        signed_blob = rsa.pkcs1.sign(
            str_to_sign.encode(),
            key,
            'SHA-256',
        )

        return signed_blob

    @staticmethod
    async def get_iam_api_signature(
        str_to_sign: str, iam_client: IamClient,
        service_account_email: Optional[str], session: Optional[Session],
    ) -> bytes:
        """
        Have ``iam_client.sign_blob`` sign ``str_to_sign`` and return the
        decoded ``signedBlob`` field of its response.
        """
        signed_resp = await iam_client.sign_blob(
            str_to_sign,
            service_account_email=service_account_email,
            session=session,
        )
        return decode(signed_resp['signedBlob'])