Skip to content

Commit

Permalink
Fix parsing of AWS secret access keys that contain slash (/)
Browse files Browse the repository at this point in the history
  • Loading branch information
frederickjansen committed Nov 14, 2021
1 parent 381747a commit ca7cf89
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
51 changes: 44 additions & 7 deletions b3u/b3u.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from __future__ import annotations
import doctest
from urllib.parse import urlparse, parse_qs
from urllib.parse import urlparse, parse_qs, quote, unquote, ParseResult

def credentials(uri: str) -> dict:
"""
Expand All @@ -20,19 +20,32 @@ def credentials(uri: str) -> dict:
aws_access_key_id abc
aws_secret_access_key xyz
aws_session_token 123
>>> cs = credentials('s3://abc:/abcdef/ghijklmnopqrstuvwxyz/1234567890/:123@bucket/object.data')
>>> for (k, v) in sorted(cs.items()):
... print(k, v)
aws_access_key_id abc
aws_secret_access_key /abcdef/ghijklmnopqrstuvwxyz/1234567890/
aws_session_token 123
>>> cs = credentials('s3://abc:/abcdef/ghijklmnopqrstuvwxyz/1234567890/@bucket/object.data')
>>> for (k, v) in sorted(cs.items()):
... print(k, v)
aws_access_key_id abc
aws_secret_access_key /abcdef/ghijklmnopqrstuvwxyz/1234567890/
"""
params = {}
result = urlparse(uri)
result = _make_url_safe(uri)

if result.username is not None and result.username != '':
params['aws_access_key_id'] = result.username

if result.password is not None and result.password != '':
if not ':' in result.password:
params['aws_secret_access_key'] = result.password
params['aws_secret_access_key'] = unquote(result.password)
else:
(secret, token) = result.password.split(':')
params['aws_secret_access_key'] = secret
params['aws_secret_access_key'] = unquote(secret)
params['aws_session_token'] = token

return params
Expand Down Expand Up @@ -67,9 +80,13 @@ def configuration(uri: str, safe: bool = True) -> dict:
>>> for (k, v) in sorted(cs.items()):
... print(k, v)
other_param other_value
>>> cs = configuration('s3://bucket/object.data?other_param=other:value', False)
>>> for (k, v) in sorted(cs.items()):
... print(k, v)
other_param other:value
"""
params = credentials(uri)
result = parse_qs(urlparse(uri).query)
result = parse_qs(_make_url_safe(uri).query)

for (key, values) in result.items():
if len(values) == 1:
Expand Down Expand Up @@ -106,7 +123,7 @@ def for_client(uri: str, safe: bool = True) -> dict:
other_param other_value
service_name s3
"""
result = urlparse(uri)
result = _make_url_safe(uri)
params = configuration(uri, False)
params['service_name'] = result.scheme

Expand Down Expand Up @@ -149,7 +166,7 @@ def for_get(uri: str) -> dict:
{'Name': '/path/to/parameter'}
"""
params = {}
result = urlparse(uri)
result = _make_url_safe(uri)

if result.scheme == 's3':
if result.hostname is not None and result.hostname != '':
Expand All @@ -162,6 +179,26 @@ def for_get(uri: str) -> dict:

return params

def _make_url_safe(uri: str) -> ParseResult:
"""
URL encode slashes in aws_secret_access_key to make it compatible with
urlparse()
:param uri: AWS resource URI
:return: Url parsed URI with encoded slashes for aws_secret_access_key
"""
parts = uri.split(':')
if len(parts) >= 3:
key_and_bucket = parts[2].split('@')
if len(key_and_bucket[0]) == 40:
key_and_bucket[0] = quote(key_and_bucket[0], safe='')

if len(parts) >= 4:
uri = ':'.join(parts[:2]) + ':' + ''.join(key_and_bucket) + ':' + ':'.join(parts[3:])
else:
uri = ':'.join(parts[:2]) + ':' + '@'.join(key_and_bucket)
return urlparse(uri)


# Succinct synonyms.
cred = credentials
conf = configuration
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name="b3u",
version="1.0.0",
version="1.0.1",
packages=["b3u",],
install_requires=[],
license="MIT",
Expand Down

0 comments on commit ca7cf89

Please sign in to comment.