Permalink
Browse files

Add test coverage to streaming

Prevents future issues like #16
  • Loading branch information...
1 parent 68fe137 commit d88bfeca8a21d4ed1816ee885a0da49085259b1a @lericson committed Dec 24, 2012
Showing with 80 additions and 32 deletions.
  1. +3 −9 tests/__init__.py
  2. +0 −23 tests/test_bucket.py
  3. +77 −0 tests/test_streaming.py
View
@@ -11,7 +11,6 @@
from cStringIO import StringIO as BytesIO
import simples3
-import simples3.streaming
from simples3.utils import rfc822_fmtdate
# httplib.HTTPMessage is useless for mocking contexts, use own
@@ -69,11 +68,11 @@ def http_request(self, req):
# TODO Will need a second mock handler when adding support for HTTPS
-class MockBucket(simples3.S3Bucket):
+class MockBucketMixin(object):
def __init__(self, *a, **k):
self.mock_responses = []
self.mock_requests = []
- super(MockBucket, self).__init__(*a, **k)
+ super(MockBucketMixin, self).__init__(*a, **k)
def build_opener(self):
mockhttp = MockHTTPHandler(self.mock_responses, self.mock_requests)
@@ -95,7 +94,7 @@ def mock_reset(self):
self.mock_responses[:] = []
self.mock_requests[:] = []
-class MockStreamingBucket(MockBucket, simples3.streaming.StreamingS3Bucket):
+class MockBucket(MockBucketMixin, simples3.S3Bucket):
pass
g = type("Globals", (object,), {})()
@@ -106,14 +105,9 @@ def setup_package():
access_key="0PN5J17HBGZHT7JJ3X82",
secret_key="uV3F3YluFJax1cknvbcGwgjvx4QpvB+leU8dUj2o",
base_url="http://johnsmith.s3.amazonaws.com")
- g.streaming_bucket = MockStreamingBucket("johnsmith",
- access_key="0PN5J17HBGZHT7JJ3X82",
- secret_key="uV3F3YluFJax1cknvbcGwgjvx4QpvB+leU8dUj2o",
- base_url="http://johnsmith.s3.amazonaws.com")
def teardown_package():
del g.bucket
- del g.streaming_bucket
def H(ctype, *hpairs):
n = datetime.datetime.now()
View
@@ -23,13 +23,10 @@ def test_rfc822():
class S3BucketTestCase(unittest.TestCase):
def setUp(self):
g.bucket.mock_reset()
- g.streaming_bucket.mock_reset()
def tearDown(self):
if g.bucket.mock_responses:
raise RuntimeError("test run without exhausting mock_responses")
- if g.streaming_bucket.mock_responses:
- raise RuntimeError("test run without exhausting mock_responses")
class MiscTests(S3BucketTestCase):
def test_str(self):
@@ -148,33 +145,13 @@ def _put_contents(self, key, contents):
g.bucket[key] = contents
self._verify_headers(contents, g.bucket.mock_requests[-1].headers)
- def _put_file_contents(self, key, contents):
- g.streaming_bucket.add_resp("/%s" % key, g.H("application/xml"), "OK!")
-
- try:
- fp = StringIO.StringIO()
- fp.write(contents)
- g.streaming_bucket.put_file(key, fp, size=len(contents))
- finally:
- fp.close()
-
- self._verify_headers(contents,
- g.streaming_bucket.mock_requests[-1].headers)
-
def test_put(self):
self._put_contents("foo.txt", "hello")
def test_put_multiple(self):
self._put_contents("bar.txt", "hi mom, how are you")
self._put_contents("foo.txt", "hello")
- def test_put_file(self):
- self._put_file_contents("foo.txt", "hello")
-
- def test_put_file_multiple(self):
- self._put_file_contents("bar.txt", "hi mom, how are you")
- self._put_file_contents("foo.txt", "hello")
-
def test_put_s3file(self):
g.bucket.add_resp("/foo.txt", g.H("application/xml"), "OK!")
g.bucket["foo.txt"] = simples3.S3File("hello")
View
@@ -0,0 +1,77 @@
+from __future__ import with_statement
+
+import StringIO
+
+from nose.tools import eq_
+
+from simples3 import streaming
+from simples3.utils import aws_md5
+from tests import MockBucketMixin, H
+
+class StreamingMockBucket(MockBucketMixin, streaming.StreamingS3Bucket):
+ pass
+
+def _verify_headers(headers, contents):
+ assert "Content-length" in headers
+ assert str(len(contents)) == headers['Content-length']
+ assert "Content-type" in headers
+ assert "Content-md5" in headers
+ content_md5 = aws_md5(contents.encode("ascii"))
+ assert content_md5 == headers['Content-md5']
+ assert "Authorization" in headers
+
+def _put_contents(bucket, key, contents):
+ bucket.add_resp("/%s" % key, H("application/xml"), "OK!")
+ bucket[key] = contents
+ _verify_headers(bucket.mock_requests[-1].headers, contents)
+
+def _put_file_contents(bucket, key, contents):
+ bucket.add_resp("/%s" % key, H("application/xml"), "OK!")
+
+ fp = StringIO.StringIO(contents)
+ try:
+ bucket.put_file(key, fp, size=len(contents))
+ finally:
+ fp.close()
+
+ _verify_headers(bucket.mock_requests[-1].headers, contents)
+
+def _test_put(bucket):
+ L = []
+ bucket.add_resp("/test.py", H("application/xml"), "OK!")
+ with open(__file__, 'rb') as fp:
+ bucket.put_file("test.py", fp, progress=lambda *a: L.append(a))
+ for total, curr, read in L:
+ if read == 0:
+ eq_(total, curr)
+ else:
+ assert curr >= read
+ assert L[-1][2] == 0, 'Last progress() has read=0'
+
+def _test_put_multiple(bucket):
+ _put_contents(bucket, "bar.txt", "hi mom, how are you")
+ _put_contents(bucket, "foo.txt", "hello")
+
+def _test_put_file(bucket):
+ _put_file_contents(bucket, "foo.txt", "hello")
+
+def _test_put_file_multiple(bucket):
+ _put_file_contents(bucket, "bar.txt", "hi mom, how are you")
+ _put_file_contents(bucket, "foo.txt", "hello")
+
+def _streaming_test_iter(bucket):
+ # yield lambda: _test_put(bucket)
+ yield lambda: _test_put_multiple(bucket)
+ yield lambda: _test_put_file(bucket)
+ yield lambda: _test_put_file_multiple(bucket)
+
+def test_streaming():
+ bucket = StreamingMockBucket("johnsmith",
+ access_key="0PN5J17HBGZHT7JJ3X82",
+ secret_key="uV3F3YluFJax1cknvbcGwgjvx4QpvB+leU8dUj2o",
+ base_url="http://johnsmith.s3.amazonaws.com")
+ for f in _streaming_test_iter(bucket):
+ bucket.mock_reset()
+ yield f
+ if bucket.mock_responses:
+ raise RuntimeError("test run without exhausting mock_responses")

0 comments on commit d88bfec

Please sign in to comment.