Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add version_id transport parameter for fetching a specific S3 object version #325

Merged
merged 47 commits into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ceac5e8
Added file receipt by id.
Jun 16, 2019
7dacd24
Correction after the review of PR 325 06/16/2019.
Jun 16, 2019
b83cbd1
Into the _get() function added exception handling botocore.exceptions…
Jun 17, 2019
21793b9
Check for missing version moved outside exception block
Jun 22, 2019
282c27d
Added new test_s3_version.py test file.
Jun 22, 2019
0f9ecf6
Added description of new parameter version_id.
Jun 22, 2019
bb9275e
Added example to use version_id.
Jun 22, 2019
355856c
Change indent.
Jun 22, 2019
0a92ee6
Rename test data
Jun 25, 2019
09783e7
Test data initialization moved to method setUp
Jun 25, 2019
3ceaaac
PEP8 code editing
Jun 25, 2019
5145974
Update README.rst
interpolatio Jun 25, 2019
6a9bbd1
Update README.rst
interpolatio Jun 25, 2019
397b38d
Update README.rst
interpolatio Jun 25, 2019
d471e7f
Update test_bad_id
Jun 25, 2019
9a8bc96
Merge branch 'version_id' of https://github.com/interpolatio/smart_op…
Jun 25, 2019
a0bef04
Added creation of an exception when passing the "write" parameter and…
Jun 26, 2019
27343cf
Added top level function.
Jun 27, 2019
f418170
Update README.rst
interpolatio Jun 27, 2019
34665f0
Merge branch 'version_id' of https://github.com/interpolatio/smart_op…
Jun 27, 2019
345d8c6
Update help s3 object.
Jun 27, 2019
294d4cf
Fixed incorrect BufferedInputBase operation detected by the test "tes…
Jun 27, 2019
d006386
Updated test descriptions
Jun 28, 2019
486b2b8
Added test:
Jun 30, 2019
0ee62be
Update tests. Delete old test_top_level.
Jun 30, 2019
0617638
In high function open, support version_id added.
Jun 30, 2019
4218458
Tests updated to test the function of the top-level open.
Jul 4, 2019
317f692
Merge remote-tracking branch 'origin/version_id' into version_id
Jul 4, 2019
3c9158e
Delete version_id param in function open top-level.
Jul 7, 2019
83b6257
Add example for use parametr 'version_id'.
Jul 7, 2019
a98a626
Add example version_id for readmi.rst.
Jul 7, 2019
20eb205
Merge remote-tracking branch 'origin/version_id' into version_id
Jul 7, 2019
78881b0
Merge branch 'master' into version_id
mpenkov Jul 8, 2019
eefb086
Merge remote-tracking branch 'origin/version_id' into version_id
Jul 8, 2019
202cd9a
Update READMI.rst:
Jul 8, 2019
040b87c
Update example version_id.
Jul 9, 2019
b9e20a9
Update README.rst
interpolatio Jul 9, 2019
ff004fc
Update README.rst
interpolatio Jul 9, 2019
e5eb307
Update README.rst
interpolatio Jul 9, 2019
bd05e93
fix README.rst to work with doctests
mpenkov Jul 12, 2019
a3b4ef2
Update test for PEP8 - length string < 110
Jul 14, 2019
662568b
Update example for PEP8 - length string < 110
Jul 14, 2019
e2e6435
git add integration-tests/test_version_id.py
mpenkov Jul 14, 2019
1fc2ceb
minor fixes to documentation
mpenkov Jul 14, 2019
8b36b4f
python -c 'help(smart_open)' > help.txt
mpenkov Jul 14, 2019
7d94968
clean up unit tests, remove duplication
mpenkov Jul 14, 2019
1dda9ee
fix wording
mpenkov Jul 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 29 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ How?
... break
'<!doctype html>\n'

Other examples of URIs that ``smart_open`` accepts::
Other examples of URLs that ``smart_open`` accepts::

s3://my_bucket/my_key
s3://my_key:my_secret@my_bucket/my_key
Expand Down Expand Up @@ -274,6 +274,31 @@ Since going over all (or select) keys in an S3 bucket is a very common operation
annual/monthly_rain/2011.monthly_rain.nc 13
annual/monthly_rain/2012.monthly_rain.nc 13

Specific S3 object version
--------------------------

The ``version_id`` transport parameter enables you to get the desired version of the object from an S3 bucket.

.. Important::
S3 disables version control by default.
Before using the ``version_id`` parameter, you must explicitly enable version control for your S3 bucket.
Read https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html for details.

.. code-block:: python

>>> # Read previous versions of an object in a versioned bucket
>>> bucket, key = 'smart-open-versioned', 'demo.txt'
>>> versions = [v.id for v in boto3.resource('s3').Bucket(bucket).object_versions.filter(Prefix=key)]
>>> for v in versions:
... with open('s3://%s/%s' % (bucket, key), transport_params={'version_id': v}) as fin:
... print(v, repr(fin.read()))
KiQpZPsKI5Dm2oJZy_RzskTOtl2snjBg 'second version\n'
N0GJcE3TQCKtkaS.gF.MUBZS85Gs3hzn 'first version\n'

>>> # If you don't specify a version, smart_open will read the most recent one
>>> with open('s3://%s/%s' % (bucket, key)) as fin:
... print(repr(fin.read()))
'second version\n'

File-like Binary Streams
------------------------
Expand All @@ -287,11 +312,11 @@ This is useful when you already have a `binary file <https://docs.python.org/3/g
>>> import io, gzip
>>>
>>> # Prepare some gzipped binary data in memory, as an example.
>>> # Note that any binary file will do; we're using BytesIO here for simplicity.
>>> # Any binary file will do; we're using BytesIO here for simplicity.
>>> buf = io.BytesIO()
>>> with gzip.GzipFile(fileobj=buf, mode='w') as fout:
... fout.write(b'this is a bytestring')
>>> buf.seek(0)
... _ = fout.write(b'this is a bytestring')
>>> _ = buf.seek(0)
>>>
>>> # Use case starts here.
>>> buf.name = 'file.gz' # add a .name attribute so smart_open knows what compressor to use
Expand Down
4 changes: 3 additions & 1 deletion help.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ FUNCTIONS
multipart_upload_kwargs: dict, optional
Additional parameters to pass to boto3's initiate_multipart_upload function.
For writing only.
version_id: str, optional
Version of the object, used when reading object. If None, will fetch the most recent version.

HTTP (for details, see :mod:`smart_open.http` and :func:`smart_open.http.open`):

Expand Down Expand Up @@ -264,6 +266,6 @@ VERSION
1.8.4

FILE
/home/misha/git/smart_open/smart_open/__init__.py
/Users/misha/git/smart_open/smart_open/__init__.py


41 changes: 41 additions & 0 deletions integration-tests/test_version_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Tests the version_id transport parameter for S3 against real S3."""

import boto3
from smart_open import open

BUCKET, KEY = 'smart-open-versioned', 'demo.txt'
"""Our have a public-readable bucket with a versioned object."""

URL = 's3://%s/%s' % (BUCKET, KEY)


def assert_equal(a, b):
assert a == b, '%r != %r' % (a, b)


def main():
versions = [
v.id for v in boto3.resource('s3').Bucket(BUCKET).object_versions.filter(Prefix=KEY)
]
expected_versions = [
'KiQpZPsKI5Dm2oJZy_RzskTOtl2snjBg',
'N0GJcE3TQCKtkaS.gF.MUBZS85Gs3hzn',
]
assert_equal(versions, expected_versions)

contents = [
open(URL, transport_params={'version_id': v}).read()
for v in versions
]
expected_contents = ['second version\n', 'first version\n']
assert_equal(contents, expected_contents)

with open(URL) as fin:
most_recent_contents = fin.read()
assert_equal(most_recent_contents, expected_contents[0])

print('OK')


if __name__ == '__main__':
main()
44 changes: 30 additions & 14 deletions smart_open/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def open(
bucket_id,
key_id,
mode,
version_id=None,
buffer_size=DEFAULT_BUFFER_SIZE,
min_part_size=DEFAULT_MIN_PART_SIZE,
session=None,
Expand Down Expand Up @@ -97,6 +98,8 @@ def open(
multipart_upload_kwargs: dict, optional
Additional parameters to pass to boto3's initiate_multipart_upload function.
For writing only.
version_id: str, optional
Version of the object, used when reading object. If None, will fetch the most recent version.

"""
logger.debug('%r', locals())
Expand All @@ -108,10 +111,14 @@ def open(
if multipart_upload_kwargs is None:
multipart_upload_kwargs = {}

if (mode == WRITE_BINARY) and (version_id is not None):
raise ValueError("version_id must be None when writing")

if mode == READ_BINARY:
fileobj = SeekableBufferedInputBase(
bucket_id,
key_id,
version_id=version_id,
buffer_size=buffer_size,
session=session,
resource_kwargs=resource_kwargs,
Expand All @@ -127,10 +134,22 @@ def open(
)
else:
assert False, 'unexpected mode: %r' % mode

return fileobj


def _get(s3_object, version=None, **kwargs):
if version is not None:
kwargs['VersionId'] = version
try:
return s3_object.get(**kwargs)
except botocore.client.ClientError as error:
raise IOError(
mpenkov marked this conversation as resolved.
Show resolved Hide resolved
'unable to access bucket: %r key: %r version: %r error: %s' % (
s3_object.bucket_name, s3_object.key, version, error
)
)


class RawReader(object):
"""Read an S3 object."""
def __init__(self, s3_object):
Expand All @@ -147,9 +166,10 @@ def read(self, size=-1):
class SeekableRawReader(object):
"""Read an S3 object."""

def __init__(self, s3_object, content_length):
def __init__(self, s3_object, content_length, version_id=None):
self._object = s3_object
self._content_length = content_length
self._version_id = version_id
self.seek(0)

def seek(self, position):
Expand Down Expand Up @@ -177,7 +197,7 @@ def seek(self, position):
#
self._body = io.BytesIO()
else:
self._body = self._object.get(Range=range_string)['Body']
self._body = _get(self._object, self._version_id, Range=range_string)['Body']
mpenkov marked this conversation as resolved.
Show resolved Hide resolved

def read(self, size=-1):
if self._position >= self._content_length:
Expand All @@ -191,7 +211,7 @@ def read(self, size=-1):


class BufferedInputBase(io.BufferedIOBase):
def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
def __init__(self, bucket, key, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE, session=None, resource_kwargs=None):
if session is None:
session = boto3.Session()
Expand All @@ -200,8 +220,10 @@ def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,

s3 = session.resource('s3', **resource_kwargs)
self._object = s3.Object(bucket, key)
self._version_id = version_id
self._raw_reader = RawReader(self._object)
self._content_length = self._object.content_length
self._content_length = _get(self._object, self._version_id)['ContentLength']
self._current_pos = 0
self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
self._eof = False
Expand Down Expand Up @@ -328,24 +350,18 @@ class SeekableBufferedInputBase(BufferedInputBase):

Implements the io.BufferedIOBase interface of the standard library."""

def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
def __init__(self, bucket, key, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE, session=None, resource_kwargs=None):
if session is None:
session = boto3.Session()
if resource_kwargs is None:
resource_kwargs = {}
s3 = session.resource('s3', **resource_kwargs)
self._object = s3.Object(bucket, key)
self._version_id = version_id
self._content_length = _get(self._object, self._version_id)['ContentLength']

try:
self._content_length = self._object.content_length
except botocore.client.ClientError:
raise ValueError(
'%r does not exist in the bucket %r, '
'or is forbidden for access' % (key, bucket)
)

self._raw_reader = SeekableRawReader(self._object, self._content_length)
self._raw_reader = SeekableRawReader(self._object, self._content_length, self._version_id)
self._current_pos = 0
self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
self._eof = False
Expand Down
2 changes: 1 addition & 1 deletion smart_open/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def test_nonexisting_bucket(self):
fout.write(expected)

def test_read_nonexisting_key(self):
with self.assertRaises(ValueError):
with self.assertRaises(IOError):
with smart_open.s3.open(BUCKET_NAME, 'my_nonexisting_key', 'rb') as fin:
fin.read()

Expand Down
127 changes: 127 additions & 0 deletions smart_open/tests/test_s3_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
import logging
import os
import time
import unittest
import uuid

import boto3
import botocore.client
import moto

from smart_open import open


BUCKET_NAME = 'test-smartopen-{}'.format(uuid.uuid4().hex)
KEY_NAME = 'test-key'
DISABLE_MOCKS = os.environ.get('SO_DISABLE_MOCKS') == "1"


logger = logging.getLogger(__name__)


def maybe_mock_s3(func):
if DISABLE_MOCKS:
return func
else:
return moto.mock_s3(func)


@maybe_mock_s3
def setUpModule():
'''Called once by unittest when initializing this module. Sets up the
test S3 bucket.

'''
boto3.resource('s3').create_bucket(Bucket=BUCKET_NAME)
boto3.resource('s3').BucketVersioning(BUCKET_NAME).enable()


def put_to_bucket(contents, num_attempts=12, sleep_time=5):
# fake (or not) connection, bucket and key
logger.debug('%r', locals())

#
# In real life, it can take a few seconds for the bucket to become ready.
# If we try to write to the key while the bucket while it isn't ready, we
# will get a ClientError: NoSuchBucket.
#
for attempt in range(num_attempts):
try:
boto3.resource('s3').Object(BUCKET_NAME, KEY_NAME).put(Body=contents)
return
except botocore.exceptions.ClientError as err:
logger.error('caught %r, retrying', err)
time.sleep(sleep_time)

assert False, 'failed to create bucket %s after %d attempts' % (BUCKET_NAME, num_attempts)


@maybe_mock_s3
class TestVersionId(unittest.TestCase):

def setUp(self):
mpenkov marked this conversation as resolved.
Show resolved Hide resolved
#
# Each run of this test reuses the BUCKET_NAME, but works with a
# different key for isolation.
#
self.key = 'test-write-key-{}'.format(uuid.uuid4().hex)
self.url = "s3://%s/%s" % (BUCKET_NAME, self.key)
self.test_ver1 = u"String version 1.0".encode('utf8')
self.test_ver2 = u"String version 2.0".encode('utf8')

with open(self.url, 'wb') as fout:
fout.write(self.test_ver1)
with open(self.url, 'wb') as fout:
fout.write(self.test_ver2)

#
# Earlist versions will be first in the list.
#
self.versions = [
v.id
for v in boto3.resource('s3').Bucket(BUCKET_NAME).object_versions.filter(Prefix=self.key)
]

def test_good_id(self):
"""Does passing the version_id parameter into the s3 submodule work correctly when reading?"""
params = {'version_id': self.versions[0]}
with open(self.url, mode='rb', transport_params=params) as fin:
expected = fin.read()
self.assertEqual(expected, self.test_ver1)

def test_bad_id(self):
"""Does passing an invalid version_id exception into the s3 submodule get handled correctly?"""
params = {'version_id': 'bad-version-does-not-exist'}
with self.assertRaises(IOError):
open(self.url, 'rb', transport_params=params)

def test_bad_mode(self):
"""Do we correctly handle non-None version when writing?"""
params = {'version_id': self.versions[0]}
with self.assertRaises(ValueError):
open(self.url, 'wb', transport_params=params)

def test_no_version(self):
"""Passing in no version at all gives the newest version of the file?"""
with open(self.url, 'rb') as fin:
expected = fin.read()
mpenkov marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(expected, self.test_ver2)

def test_newest_version(self):
"""Passing in the newest version explicitly gives the most recent content?"""
params = {'version_id': self.versions[1]}
with open(self.url, mode='rb', transport_params=params) as fin:
expected = fin.read()
self.assertEqual(expected, self.test_ver2)

def test_oldset_version(self):
"""Passing in the oldest version gives the oldest content?"""
params = {'version_id': self.versions[0]}
with open(self.url, mode='rb', transport_params=params) as fin:
expected = fin.read()
self.assertEqual(expected, self.test_ver1)


if __name__ == '__main__':
unittest.main()