Permalink
Browse files

Add tests, now at 100% test coverage \o/

  • Loading branch information...
1 parent f1d07fc commit e4a0edbe77f5824914531d6d4c298562e3b5d7e1 @lericson committed Sep 6, 2010
Showing with 177 additions and 38 deletions.
  1. +4 −0 setup.cfg
  2. +4 −6 simples3/bucket.py
  3. +9 −7 simples3/utils.py
  4. +4 −1 tests/__init__.py
  5. +145 −20 tests/test_bucket.py
  6. +11 −4 tests/test_urls.py
View
4 setup.cfg
@@ -1,2 +1,6 @@
[nosetests]
verbosity=2
+tests=tests,simples3/utils.py
+with-doctest=1
+#with-coverage=1
+#cover-package=simples3
View
10 simples3/bucket.py
@@ -331,15 +331,13 @@ def make_url_authed(self, key, expire=datetime.timedelta(minutes=5)):
def url_for(self, key, authenticated=False,
expire=datetime.timedelta(minutes=5)):
+ msg = "use %s instead of url_for(authenticated=%r)"
+ dep_cls = DeprecationWarning
if authenticated:
- warnings.warn(DeprecationWarning,
- "use make_url_authed instead "
- "of url_for(authenticated=True)")
+ warnings.warn(dep_cls(msg % ("make_url_authed", True)))
return self.make_url_authed(key, expire=expire)
else:
- warnings.warn(DeprecationWarning,
- "use make_url instead of "
- "url_for(authenticated=False)")
+ warnings.warn(dep_cls(msg % ("make_url", False)))
return self.make_url(key)
def put_bucket(self, config_xml=None, acl=None):
View
16 simples3/utils.py
@@ -81,10 +81,10 @@ def aws_md5(data):
"""Make an AWS-style MD5 hash (digest in base64).
>>> aws_md5("Hello!")
- 'lS0sVtBIWVgzZ0e83ZhZDQ=='
+ u'lS0sVtBIWVgzZ0e83ZhZDQ=='
>>> from StringIO import StringIO
>>> aws_md5(StringIO("Hello world!"))
- 'hvsmnRkNLIX24EaM7KQqIA=='
+ u'hvsmnRkNLIX24EaM7KQqIA=='
"""
hasher = hashlib.new("md5")
if hasattr(data, "read"):
@@ -112,7 +112,13 @@ def aws_urlquote(value):
return quote(value, "/")
def guess_mimetype(fn, default="application/octet-stream"):
- """Guess a mimetype from filename *fn*."""
+ """Guess a mimetype from filename *fn*.
+
+ >>> guess_mimetype("foo.txt")
+ 'text/plain'
+ >>> guess_mimetype("foo")
+ 'application/octet-stream'
+ """
if "." not in fn:
return default
bfn, ext = fn.lower().rsplit(".", 1)
@@ -177,7 +183,3 @@ def name(o):
if rv is not None:
break
return rv
-
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
View
5 tests/__init__.py
@@ -37,7 +37,7 @@ def __init__(self, fp, headers, url, code=None):
self.read = fp.read
self.readline = fp.readline
self.readlines = fp.readlines
- self.headers = headers
+ self.headers = MockHTTPMessage(headers)
self.url = url
self.code = code
@@ -83,6 +83,9 @@ def add_resp(self, path, headers, data, status="200 OK"):
msg = MockHTTPMessage(headers)
url = self.base_url + path
resp = MockHTTPResponse(fp, msg, url)
+ return self.add_resp_obj(resp, status=status)
+
+ def add_resp_obj(self, resp, status="200 OK"):
resp.code, resp.msg = status.split(" ", 1)
resp.code = int(resp.code)
self.mock_responses.append(resp)
View
165 tests/test_bucket.py
@@ -1,19 +1,25 @@
from __future__ import with_statement
+import urllib2
import unittest
import datetime
from nose.tools import eq_
import simples3
from simples3.utils import rfc822_fmt
-from tests import setup_package, teardown_package, g
+from tests import setup_package, teardown_package, MockHTTPResponse, g
setup_package, teardown_package
-class S3Tests(unittest.TestCase):
+class S3BucketTestCase(unittest.TestCase):
def setUp(self):
g.bucket.mock_reset()
+ def tearDown(self):
+ if g.bucket.mock_responses:
+ raise RuntimeError("test run without exhausting mock_responses")
+
+class MiscTests(S3BucketTestCase):
def test_str(self):
eq_(str(g.bucket), "<MockBucket johnsmith at "
"'http://johnsmith.s3.amazonaws.com'>")
@@ -24,6 +30,27 @@ def test_repr(self):
"access_key='0PN5J17HBGZHT7JJ3X82', "
"base_url='http://johnsmith.s3.amazonaws.com')")
+ def test_timeout_disabled(self):
+ g.bucket.timeout = 10.0
+ with g.bucket.timeout_disabled():
+ eq_(g.bucket.timeout, None)
+ eq_(g.bucket.timeout, 10.0)
+
+ def test_error_in_error(self):
+ # a hairy situation: an error arising during the parsing of an error.
+ def read(bs=4096):
+ raise urllib2.URLError("something something dark side")
+ FP = type("ErringFP", (object,),
+ {"read": read, "readline": read, "readlines": read})
+ url = g.bucket.base_url + "/foo.txt"
+ resp = MockHTTPResponse(FP(), {}, url, code=401)
+ g.bucket.add_resp_obj(resp, status="401 Something something")
+ try:
+ g.bucket.get("foo.txt")
+ except simples3.S3Error, e:
+ assert "read_error" in e.extra
+
+class GetTests(S3BucketTestCase):
def test_get(self):
dt = datetime.datetime(1990, 1, 31, 12, 34, 56)
headers = g.H("text/plain",
@@ -50,7 +77,31 @@ def test_get_not_found(self):
g.bucket.get("foo.txt")
except simples3.KeyNotFound, e:
eq_(e.key, "foo.txt")
+ eq_(str(e), "The specified key does not exist. (code=404, "
+ "key='foo.txt', filename='http://johnsmith.s3."
+ "amazonaws.com/foo.txt')")
+
+class InfoTests(S3BucketTestCase):
+ headers = g.H("text/plain",
+ ("x-amz-meta-foo", "bar"),
+ ("last-modified", "Mon, 06 Sep 2010 19:34:18 GMT"),
+ ("content-length", "1234"))
+
+ def test_info(self):
+ g.bucket.add_resp("/foo.txt", self.headers, "")
+ info = g.bucket.info("foo.txt")
+ eq_(info["mimetype"], "text/plain")
+ eq_(info["metadata"], {"foo": "bar"})
+
+ def test_mapping(self):
+ g.bucket.add_resp("/foo.txt", self.headers, "")
+ assert "foo.txt" in g.bucket
+
+ def test_mapping_not(self):
+ g.bucket.add_resp("/foobar.txt", self.headers, "", status="404 Blah")
+ assert "foobar.txt" not in g.bucket
+class PutTests(S3BucketTestCase):
def test_put(self):
g.bucket.add_resp("/foo.txt", g.H("application/xml"), "OK!")
g.bucket["foo.txt"] = "hello"
@@ -79,23 +130,97 @@ def test_put_retry(self):
eq_(req.get_selector(), "/foo.txt")
eq_(g.bucket.mock_responses, [])
- def test_info(self):
- headers = g.H("text/plain", ("x-amz-meta-foo", "bar"))
- g.bucket.add_resp("/foo.txt", headers, "")
- info = g.bucket.info("foo.txt")
- eq_(info["mimetype"], "text/plain")
- eq_(info["metadata"], {"foo": "bar"})
- g.bucket.add_resp("/foo.txt", headers, "")
- assert "foo.txt" in g.bucket
- g.bucket.add_resp("/foobar.txt", headers, "", status="404 Blah")
- assert "foobar.txt" not in g.bucket
+class DeleteTests(S3BucketTestCase):
+ def test_delete(self):
+ g.bucket.add_resp("/foo.txt", g.H("application/xml"), "<ok />")
+ assert g.bucket.delete("foo.txt")
+ req = g.bucket.mock_requests[-1]
+ eq_(req.get_method(), "DELETE")
- # TODO def test_delete(self): ...
- # TODO def test_copy(self): ...
- # TODO def test_listdir(self): ...
+ def test_delete_not_found(self):
+ g.bucket.add_resp("/foo.txt", g.H("application/xml"),
+ "<notfound />", status="404 Not Found")
+ assert not g.bucket.delete("foo.txt")
- def test_timeout_disabled(self):
- g.bucket.timeout = 10.0
- with g.bucket.timeout_disabled():
- eq_(g.bucket.timeout, None)
- eq_(g.bucket.timeout, 10.0)
+ def test_delete_other_error(self):
+ g.bucket.add_resp("/foo.txt", g.H("application/xml"),
+ "<wat />", status="403 What's Up?")
+ try:
+ g.bucket.delete("foo.txt")
+ except simples3.S3Error, e:
+ eq_(e.extra.get("key"), "foo.txt")
+ eq_(e.code, 403)
+ else:
+ assert False, "did not raise exception"
+
+class CopyTests(S3BucketTestCase):
+ def test_copy_metadata(self):
+ g.bucket.add_resp("/bar", g.H("application/xml"), "<ok />")
+ g.bucket.copy("foo/bar", "bar", acl="public")
+ req = g.bucket.mock_requests[-1]
+ eq_(req.get_method(), "PUT")
+ eq_(req.headers["X-amz-copy-source"], "foo/bar")
+ eq_(req.headers["X-amz-metadata-directive"], "COPY")
+
+ def test_copy_replace_metadata(self):
+ g.bucket.add_resp("/bar", g.H("application/xml"), "<ok />")
+ g.bucket.copy("foo/bar", "bar", metadata={}, acl="public")
+ req = g.bucket.mock_requests[-1]
+ eq_(req.headers["X-amz-metadata-directive"], "REPLACE")
+
+class ListDirTests(S3BucketTestCase):
+ def test_listdir(self):
+ xml = """
+<?xml version="1.0" encoding="UTF-8"?>
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01">
+ <Name>bucket</Name>
+ <Prefix/>
+ <Marker/>
+ <MaxKeys>1000</MaxKeys>
+ <IsTruncated>false</IsTruncated>
+ <Contents>
+ <Key>my-image.jpg</Key>
+ <LastModified>2009-10-12T17:50:30.000Z</LastModified>
+ <ETag>&quot;fba9dede5f27731c9771645a39863328&quot;</ETag>
+ <Size>434234</Size>
+ </Contents>
+ <Contents>
+ <Key>my-third-image.jpg</Key>
+ <LastModified>2009-10-12T17:50:30.000Z</LastModified>
+ <ETag>&quot;1b2cf535f27731c974343645a3985328&quot;</ETag>
+ <Size>64994</Size>
+ </Contents>
+</ListBucketResult>
+""".lstrip()
+ g.bucket.add_resp("/", g.H("application/xml"), xml)
+ reftups = (
+ ('my-image.jpg', datetime.datetime(2009, 10, 12, 17, 50, 30),
+ '"fba9dede5f27731c9771645a39863328"', 434234),
+ ('my-third-image.jpg', datetime.datetime(2009, 10, 12, 17, 50, 30),
+ '"1b2cf535f27731c974343645a3985328"', 64994))
+ next_reftup = iter(reftups).next
+ for tup in g.bucket.listdir():
+ eq_(len(tup), 4)
+ eq_(tup, next_reftup())
+ key, mtype, etag, size = tup
+
+class ModifyBucketTests(S3BucketTestCase):
+ def test_bucket_put(self):
+ g.bucket.add_resp("/", g.H("application/xml"), "<ok />")
+ g.bucket.put_bucket(acl="private")
+ req = g.bucket.mock_requests[-1]
+ eq_(req.get_method(), "PUT")
+ eq_(req.headers["X-amz-acl"], "private")
+
+ def test_bucket_put_conf(self):
+ g.bucket.add_resp("/", g.H("application/xml"), "<ok />")
+ g.bucket.put_bucket("<etc>etc</etc>", acl="public")
+ req = g.bucket.mock_requests[-1]
+ eq_(req.get_method(), "PUT")
+ eq_(req.headers["X-amz-acl"], "public")
+
+ def test_bucket_delete(self):
+ g.bucket.add_resp("/", g.H("application/xml"), "<ok />")
+ g.bucket.delete_bucket()
+ req = g.bucket.mock_requests[-1]
+ eq_(req.get_method(), "DELETE")
View
15 tests/test_urls.py
@@ -1,11 +1,12 @@
import unittest
from tests import g
+from nose.tools import eq_
class URLGenerationTests(unittest.TestCase):
def test_make_url(self):
- self.assertEquals('http://johnsmith.s3.amazonaws.com/file.txt',
+ eq_('http://johnsmith.s3.amazonaws.com/file.txt',
g.bucket.make_url('file.txt'))
- self.assertEquals('http://johnsmith.s3.amazonaws.com/my%20key',
+ eq_('http://johnsmith.s3.amazonaws.com/my%20key',
g.bucket.make_url('my key'))
def test_make_url_authed(self):
@@ -14,5 +15,11 @@ def test_make_url_authed(self):
ou = ("http://johnsmith.s3.amazonaws.com/photos/puppy.jpg"
"?AWSAccessKeyId=0PN5J17HBGZHT7JJ3X82&Expires=1175139620&"
"Signature=rucSbH0yNEcP9oM2XNlouVI3BH4%3D")
- gu = g.bucket.make_url_authed("photos/puppy.jpg", expire=1175139620)
- self.assertEquals(ou, gu)
+ eq_(ou, g.bucket.make_url_authed("photos/puppy.jpg", expire=1175139620))
+
+ def test_deprecated_url_for(self):
+ eq_(g.bucket.make_url_authed("photos/puppy.jpg", expire=1175139620),
+ g.bucket.url_for("photos/puppy.jpg", authenticated=True,
+ expire=1175139620))
+ eq_(g.bucket.make_url("photos/puppy.jpg"),
+ g.bucket.url_for("photos/puppy.jpg"))

0 comments on commit e4a0edb

Please sign in to comment.