Skip to content

Commit

Permalink
clean up s3 submodule interface
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenkov committed Sep 15, 2018
1 parent 7f51488 commit 4ce1dfb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 65 deletions.
63 changes: 16 additions & 47 deletions smart_open/s3.py
Expand Up @@ -56,26 +56,21 @@ def _clamp(value, minval, maxval):


def open(bucket_id, key_id, mode,
resource=None,
min_part_size=DEFAULT_MIN_PART_SIZE,
session=None,
profile_name=None,
endpoint_url=None,
aws_access_key_id=None,
aws_secret_access_key=None,
multipart_upload_kwargs=None,
):
"""
Open s3://{bucket_id}/{key_id}
Use the resource object to override the default session (e.g. profile name,
access keys, and endpoint URL).
:param str bucket_id:
:param str key_id:
:param str mode: must be one of rb or wb
:param boto3.Session session:
:param str profile_name: The boto3 profile name to use when connecting to S3.
:param str endpoint_url:
:param str aws_access_key_id:
:param str aws_secret_access_key:
:param int min_part_size: For writing only.
:param boto3.s3.ServiceResource resource: The resource for accessing S3.
:param dict multipart_upload_kwargs: For writing only.
"""
logger.debug('%r', locals())
Expand All @@ -86,22 +81,14 @@ def open(bucket_id, key_id, mode,
fileobj = SeekableBufferedInputBase(
bucket_id,
key_id,
session=session,
profile_name=profile_name,
endpoint_url=endpoint_url,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
resource=resource
)
elif mode == WRITE_BINARY:
fileobj = BufferedOutputBase(
bucket_id,
key_id,
session=session,
profile_name=profile_name,
resource=resource,
min_part_size=min_part_size,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
endpoint_url=endpoint_url,
multipart_upload_kwargs=multipart_upload_kwargs,
)
else:
Expand Down Expand Up @@ -323,20 +310,11 @@ def __init__(self,
key,
buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE,
session=None,
profile_name=None,
aws_access_key_id=None,
aws_secret_access_key=None,
endpoint_url=None,
resource=None,
):
if session is None:
session = boto3.Session(
profile_name=profile_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
s3 = session.resource('s3', endpoint_url=endpoint_url)
self._object = s3.Object(bucket, key)
if resource is None:
resource = boto3.Session().resource('s3')
self._object = resource.Object(bucket, key)
self._raw_reader = SeekableRawReader(self._object)
self._content_length = self._object.content_length
self._current_pos = 0
Expand Down Expand Up @@ -399,34 +377,25 @@ class BufferedOutputBase(io.BufferedIOBase):
def __init__(self,
bucket,
key,
session=None,
profile_name=None,
aws_access_key_id=None,
aws_secret_access_key=None,
endpoint_url=None,
resource=None,
min_part_size=DEFAULT_MIN_PART_SIZE,
multipart_upload_kwargs=None,
):
if min_part_size < MIN_MIN_PART_SIZE:
logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")

if session is None:
session = boto3.Session(
profile_name=profile_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
s3 = session.resource('s3', endpoint_url=endpoint_url)
if resource is None:
resource = boto3.Session().resource('s3')

#
# https://stackoverflow.com/questions/26871884/how-can-i-easily-determine-if-a-boto-3-s3-bucket-resource-exists
#
try:
s3.meta.client.head_bucket(Bucket=bucket)
resource.meta.client.head_bucket(Bucket=bucket)
except botocore.client.ClientError:
raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket)
self._object = s3.Object(bucket, key)
self._object = resource.Object(bucket, key)
self._min_part_size = min_part_size

if multipart_upload_kwargs is None:
Expand Down
23 changes: 18 additions & 5 deletions smart_open/smart_open_lib.py
Expand Up @@ -374,15 +374,28 @@ def _open_binary_stream(uri, mode, kwargs):
return fobj, filename
elif parsed_uri.scheme in ("s3", "s3n", 's3u'):
endpoint_url = None if kwargs.host is None else 'http://' + kwargs.host

#
# TODO: this shouldn't be smart_open's responsibility.
#
if kwargs.s3_session is None:
import boto3
session = boto3.Session(
profile_name=kwargs.profile_name,
aws_access_key_id=parsed_uri.access_id,
aws_secret_access_key=parsed_uri.access_secret,
)
resource = session.resource('s3', endpoint_url=endpoint_url)
else:
assert parsed_uri.access_id is None, 'aws_access_key_id conflicts with session'
assert parsed_uri.access_secret is None, 'aws_secret_access_key conflicts with session'
resource = kwargs.s3_session.resource('s3', endpoint_url=endpoint_url)

fobj = smart_open_s3.open(
parsed_uri.bucket_id,
parsed_uri.key_id,
mode,
session=kwargs.s3_session,
profile_name=kwargs.profile_name,
aws_access_key_id=parsed_uri.access_id,
aws_secret_access_key=parsed_uri.access_secret,
endpoint_url=endpoint_url,
resource=resource,
min_part_size=kwargs.s3_min_part_size,
multipart_upload_kwargs=kwargs.s3_upload,
)
Expand Down
24 changes: 11 additions & 13 deletions smart_open/tests/test_smart_open.py
Expand Up @@ -1029,15 +1029,14 @@ def test_gzip_write_mode(self):
s3.create_bucket(Bucket='bucket')
uri = smart_open_lib._parse_uri("s3://bucket/key.gz")

resource = mock.Mock()
session = mock.Mock(resource=mock.Mock(return_value=resource))

with mock.patch('smart_open.smart_open_s3.open') as mock_open:
smart_open.smart_open("s3://bucket/key.gz", "wb")
smart_open.smart_open("s3://bucket/key.gz", "wb", s3_session=session)
mock_open.assert_called_with(
'bucket', 'key.gz', 'wb',
aws_access_key_id=None,
aws_secret_access_key=None,
session=None,
profile_name=None,
endpoint_url=None,
resource=resource,
min_part_size=smart_open.smart_open_s3.DEFAULT_MIN_PART_SIZE,
multipart_upload_kwargs=None,
)
Expand All @@ -1053,17 +1052,16 @@ def test_gzip_read_mode(self):
with smart_open.smart_open(key, "wb") as fout:
fout.write(text.encode("utf-8"))

resource = mock.Mock()
session = mock.Mock(resource=mock.Mock(return_value=resource))

with mock.patch('smart_open.smart_open_s3.open') as mock_open:
smart_open.smart_open(key, "r")
smart_open.smart_open(key, "r", s3_session=session)
mock_open.assert_called_with(
'bucket', 'key.gz', 'rb',
aws_access_key_id=None,
aws_secret_access_key=None,
session=None,
profile_name=None,
endpoint_url=None,
multipart_upload_kwargs=None,
resource=resource,
min_part_size=smart_open.smart_open_s3.DEFAULT_MIN_PART_SIZE,
multipart_upload_kwargs=None,
)

@mock_s3
Expand Down

0 comments on commit 4ce1dfb

Please sign in to comment.