Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add version_id transport parameter for fetching a specific S3 object version #325

Merged
merged 47 commits into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ceac5e8
Added file receipt by id.
Jun 16, 2019
7dacd24
Correction after the review of PR 325 06/16/2019.
Jun 16, 2019
b83cbd1
Into the _get() function added exception handling botocore.exceptions…
Jun 17, 2019
21793b9
Check for missing version moved outside exception block
Jun 22, 2019
282c27d
Added new test_s3_version.py test file.
Jun 22, 2019
0f9ecf6
Added description of new parameter version_id.
Jun 22, 2019
bb9275e
Added example to use version_id.
Jun 22, 2019
355856c
Change indent.
Jun 22, 2019
0a92ee6
Rename test data
Jun 25, 2019
09783e7
Test data initialization moved to method setUp
Jun 25, 2019
3ceaaac
PEP8 code editing
Jun 25, 2019
5145974
Update README.rst
interpolatio Jun 25, 2019
6a9bbd1
Update README.rst
interpolatio Jun 25, 2019
397b38d
Update README.rst
interpolatio Jun 25, 2019
d471e7f
Update test_bad_id
Jun 25, 2019
9a8bc96
Merge branch 'version_id' of https://github.com/interpolatio/smart_op…
Jun 25, 2019
a0bef04
Added creation of an exception when passing the "write" parameter and…
Jun 26, 2019
27343cf
Added top level function.
Jun 27, 2019
f418170
Update README.rst
interpolatio Jun 27, 2019
34665f0
Merge branch 'version_id' of https://github.com/interpolatio/smart_op…
Jun 27, 2019
345d8c6
Update help s3 object.
Jun 27, 2019
294d4cf
Fixed incorrect BufferedInputBase operation detected by the test "tes…
Jun 27, 2019
d006386
Updated test descriptions
Jun 28, 2019
486b2b8
Added test:
Jun 30, 2019
0ee62be
Update tests. Delete old test_top_level.
Jun 30, 2019
0617638
In high function open, support version_id added.
Jun 30, 2019
4218458
Tests updated to test the function of the top-level open.
Jul 4, 2019
317f692
Merge remote-tracking branch 'origin/version_id' into version_id
Jul 4, 2019
3c9158e
Delete version_id param in function open top-level.
Jul 7, 2019
83b6257
Add example for use parametr 'version_id'.
Jul 7, 2019
a98a626
Add example version_id for readmi.rst.
Jul 7, 2019
20eb205
Merge remote-tracking branch 'origin/version_id' into version_id
Jul 7, 2019
78881b0
Merge branch 'master' into version_id
mpenkov Jul 8, 2019
eefb086
Merge remote-tracking branch 'origin/version_id' into version_id
Jul 8, 2019
202cd9a
Update READMI.rst:
Jul 8, 2019
040b87c
Update example version_id.
Jul 9, 2019
b9e20a9
Update README.rst
interpolatio Jul 9, 2019
ff004fc
Update README.rst
interpolatio Jul 9, 2019
e5eb307
Update README.rst
interpolatio Jul 9, 2019
bd05e93
fix README.rst to work with doctests
mpenkov Jul 12, 2019
a3b4ef2
Update test for PEP8 - length string < 110
Jul 14, 2019
662568b
Update example for PEP8 - length string < 110
Jul 14, 2019
e2e6435
git add integration-tests/test_version_id.py
mpenkov Jul 14, 2019
1fc2ceb
minor fixes to documentation
mpenkov Jul 14, 2019
8b36b4f
python -c 'help(smart_open)' > help.txt
mpenkov Jul 14, 2019
7d94968
clean up unit tests, remove duplication
mpenkov Jul 14, 2019
1dda9ee
fix wording
mpenkov Jul 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 25 additions & 14 deletions smart_open/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def open(
bucket_id,
key_id,
mode,
version_id=None,
buffer_size=DEFAULT_BUFFER_SIZE,
min_part_size=DEFAULT_MIN_PART_SIZE,
session=None,
Expand Down Expand Up @@ -97,6 +98,8 @@ def open(
multipart_upload_kwargs: dict, optional
Additional parameters to pass to boto3's initiate_multipart_upload function.
For writing only.
version_id: str, optional
Version of the object. If None, will fetch the most recent version.
mpenkov marked this conversation as resolved.
Show resolved Hide resolved

"""
logger.debug('%r', locals())
Expand All @@ -112,6 +115,7 @@ def open(
fileobj = SeekableBufferedInputBase(
bucket_id,
key_id,
version_id=version_id,
buffer_size=buffer_size,
session=session,
resource_kwargs=resource_kwargs,
Expand All @@ -127,10 +131,21 @@ def open(
)
else:
assert False, 'unexpected mode: %r' % mode

return fileobj


def _get(s3_object, version=None, **kwargs):
if version is not None:
kwargs['VersionId'] = version
try:
return s3_object.get(**kwargs)
except botocore.client.ClientError as error:
raise IOError(str(error) + "\n" +
'%r does not exist in the bucket %r, version_id %r'
mpenkov marked this conversation as resolved.
Show resolved Hide resolved
'or is forbidden for access' % (s3_object.key, s3_object.bucket_name, version)
)


class RawReader(object):
"""Read an S3 object."""
def __init__(self, s3_object):
Expand All @@ -147,9 +162,10 @@ def read(self, size=-1):
class SeekableRawReader(object):
"""Read an S3 object."""

def __init__(self, s3_object, content_length):
def __init__(self, s3_object, content_length, version_id=None):
self._object = s3_object
self._content_length = content_length
self._version_id = version_id
self.seek(0)

def seek(self, position):
Expand Down Expand Up @@ -177,7 +193,7 @@ def seek(self, position):
#
self._body = io.BytesIO()
else:
self._body = self._object.get(Range=range_string)['Body']
self._body = _get(self._object, self._version_id, Range=range_string)['Body']
mpenkov marked this conversation as resolved.
Show resolved Hide resolved

def read(self, size=-1):
if self._position >= self._content_length:
Expand All @@ -191,7 +207,7 @@ def read(self, size=-1):


class BufferedInputBase(io.BufferedIOBase):
def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
def __init__(self, bucket, key, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE, session=None, resource_kwargs=None):
if session is None:
session = boto3.Session()
Expand All @@ -202,6 +218,7 @@ def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
self._object = s3.Object(bucket, key)
self._raw_reader = RawReader(self._object)
self._content_length = self._object.content_length
self._content_length = _get(self._object, self._version_id)['ContentLength']
self._current_pos = 0
self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
self._eof = False
Expand Down Expand Up @@ -328,24 +345,18 @@ class SeekableBufferedInputBase(BufferedInputBase):

Implements the io.BufferedIOBase interface of the standard library."""

def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
def __init__(self, bucket, key, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE, session=None, resource_kwargs=None):
if session is None:
session = boto3.Session()
if resource_kwargs is None:
resource_kwargs = {}
s3 = session.resource('s3', **resource_kwargs)
self._object = s3.Object(bucket, key)
self._version_id = version_id
self._content_length = _get(self._object, self._version_id)['ContentLength']

try:
self._content_length = self._object.content_length
except botocore.client.ClientError:
raise ValueError(
'%r does not exist in the bucket %r, '
'or is forbidden for access' % (key, bucket)
)

self._raw_reader = SeekableRawReader(self._object, self._content_length)
self._raw_reader = SeekableRawReader(self._object, self._content_length, self._version_id)
self._current_pos = 0
self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
self._eof = False
Expand Down
101 changes: 101 additions & 0 deletions smart_open/tests/test_s3_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
import gzip
import io
import logging
import os
import time
import unittest
import uuid
import warnings

import boto.s3.bucket
import boto3
import botocore.client
import mock
import moto
import six

import smart_open

BUCKET_NAME = 'test-smartopen-{}'.format(uuid.uuid4().hex)
KEY_NAME = 'test-key'
WRITE_KEY_NAME = 'test-write-key'
DISABLE_MOCKS = os.environ.get('SO_DISABLE_MOCKS') == "1"


logger = logging.getLogger(__name__)


def maybe_mock_s3(func):
if DISABLE_MOCKS:
return func
else:
return moto.mock_s3(func)


@maybe_mock_s3
def setUpModule():
'''Called once by unittest when initializing this module. Sets up the
test S3 bucket.

'''
boto3.resource('s3').create_bucket(Bucket=BUCKET_NAME)
boto3.resource('s3').BucketVersioning(BUCKET_NAME).enable()


def put_to_bucket(contents, num_attempts=12, sleep_time=5):
# fake (or not) connection, bucket and key
logger.debug('%r', locals())

#
# In real life, it can take a few seconds for the bucket to become ready.
# If we try to write to the key while the bucket while it isn't ready, we
# will get a ClientError: NoSuchBucket.
#
for attempt in range(num_attempts):
try:
boto3.resource('s3').Object(BUCKET_NAME, KEY_NAME).put(Body=contents)
return
except botocore.exceptions.ClientError as err:
logger.error('caught %r, retrying', err)
time.sleep(sleep_time)

assert False, 'failed to create bucket %s after %d attempts' % (BUCKET_NAME, num_attempts)


@maybe_mock_s3
class TestVersionId(unittest.TestCase):

def test_good_id(self):
"""Does version_id into s3 work correctly?"""
test_string_ver0 = u"String version 1.0".encode('utf8')
mpenkov marked this conversation as resolved.
Show resolved Hide resolved
test_string_ver1 = u"String version 2.0".encode('utf8')
# write into key
with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout:
mpenkov marked this conversation as resolved.
Show resolved Hide resolved
fout.write(test_string_ver0)
with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout:
fout.write(test_string_ver1)

versions = boto3.resource('s3').Bucket(BUCKET_NAME).object_versions.filter(Prefix=WRITE_KEY_NAME)
check_version = list(versions)[0].get()['VersionId']
with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, WRITE_KEY_NAME,check_version) as fin:
mpenkov marked this conversation as resolved.
Show resolved Hide resolved
expected = fin.read()
self.assertEqual(expected, test_string_ver0)

def test_bad_id(self):
"""Does version_id exception into s3 work correctly?"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Does version_id exception into s3 work correctly?"""
"""Does passing an invalid version_id exception into the s3 submodule get handled correctly?"""

test_string_ver0 = u"String version 1.0".encode('utf8')
test_string_ver1 = u"String version 2.0".encode('utf8')
# write into key
with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout:
fout.write(test_string_ver0)
with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout:
fout.write(test_string_ver1)
versions = boto3.resource('s3').Bucket(BUCKET_NAME).object_versions.filter(Prefix=WRITE_KEY_NAME)
check_version = list(versions)[0].get()['VersionId']

with self.assertRaises(IOError):
smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb', version_id=check_version+check_version)
mpenkov marked this conversation as resolved.
Show resolved Hide resolved

if __name__ == '__main__':
unittest.main()