-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
560 additions
and
500 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# coding: utf8 | ||
|
||
# Copyright 2014-2015 Vincent Jacques <vincent@vincent-jacques.net> | ||
|
||
import LowVoltage.policies as _pol | ||
# @todo Do we need a Connection class to use as a base for all connection decorators? It couls ease documenting the return type of make_connection. It could allow isinstance in clients. It could be more intuitive for clients. But with duck-typing it's not striclty necessary. | ||
from .signing import SigningConnection | ||
from .retrying import RetryingConnection | ||
from .completing import CompletingConnection | ||
from .waiting import WaitingConnection | ||
|
||
|
||
# @todo Consider using a builder pattern... as everywhere else in LowVoltage | ||
def make_connection( | ||
region, | ||
credentials, | ||
endpoint=None, | ||
retry_policy=_pol.ExponentialBackoffRetryPolicy(1, 2, 5), | ||
complete_batches=True, | ||
wait_for_tables=True, | ||
): | ||
"""Create a connection, using all decorators (RetryingConnection, CompletingConnection, WaitingConnection on top of a SigningConnection)""" | ||
# @todo Maybe allow injection of the Requests session to tweek low-level parameters (connection timeout, etc.)? | ||
|
||
if endpoint is None: | ||
endpoint = "https://dynamodb.{}.amazonaws.com/".format(region) | ||
connection = SigningConnection(region, credentials, endpoint) | ||
if retry_policy is not None: | ||
connection = RetryingConnection(connection, retry_policy) | ||
if complete_batches: | ||
connection = CompletingConnection(connection) | ||
if wait_for_tables: | ||
connection = WaitingConnection(connection) | ||
return connection |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
# coding: utf8 | ||
|
||
# Copyright 2014-2015 Vincent Jacques <vincent@vincent-jacques.net> | ||
|
||
import unittest | ||
|
||
import MockMockMock | ||
|
||
import LowVoltage | ||
from .signing import SigningConnection | ||
from .retrying import RetryingConnection | ||
import LowVoltage.exceptions as _exn | ||
import LowVoltage.policies as _pol | ||
|
||
|
||
class CompletingConnection(object): | ||
"""Connection decorator completing batch actions (UnprocessedKeys and UnprocessedItems)""" | ||
|
||
def __init__(self, connection): | ||
self.__connection = connection | ||
|
||
def request(self, action): | ||
r = self.__connection.request(action) | ||
if action.is_completable: | ||
next_action = action.get_completion_action(r) | ||
while next_action is not None: | ||
next_response = self.__connection.request(next_action) | ||
action.complete_response(r, next_response) | ||
next_action = action.get_completion_action(r) | ||
return r | ||
|
||
|
||
class CompletingConnectionUnitTests(unittest.TestCase): | ||
def setUp(self): | ||
self.mocks = MockMockMock.Engine() | ||
self.base_connection = self.mocks.create("base_connection") | ||
self.action = self.mocks.create("action") | ||
self.connection = CompletingConnection(self.base_connection.object) | ||
|
||
def tearDown(self): | ||
self.mocks.tearDown() | ||
|
||
def test_dont_complete_uncompletable_action(self): | ||
r = object() | ||
self.base_connection.expect.request(self.action.object).andReturn(r) | ||
self.action.expect.is_completable.andReturn(False) | ||
|
||
self.assertIs( | ||
self.connection.request(self.action.object), | ||
r | ||
) | ||
|
||
def test_try_to_complete_action(self): | ||
r = object() | ||
self.base_connection.expect.request(self.action.object).andReturn(r) | ||
self.action.expect.is_completable.andReturn(True) | ||
self.action.expect.get_completion_action(r).andReturn(None) | ||
|
||
self.assertIs( | ||
self.connection.request(self.action.object), | ||
r | ||
) | ||
|
||
def test_complete_action_once(self): | ||
r1 = object() | ||
self.base_connection.expect.request(self.action.object).andReturn(r1) | ||
self.action.expect.is_completable.andReturn(True) | ||
a2 = object() | ||
self.action.expect.get_completion_action(r1).andReturn(a2) | ||
r2 = object() | ||
self.base_connection.expect.request(a2).andReturn(r2) | ||
self.action.expect.complete_response(r1, r2) | ||
self.action.expect.get_completion_action(r1).andReturn(None) | ||
|
||
self.assertIs( | ||
self.connection.request(self.action.object), | ||
r1 | ||
) | ||
|
||
def test_complete_several_times(self): | ||
r1 = object() | ||
self.base_connection.expect.request(self.action.object).andReturn(r1) | ||
self.action.expect.is_completable.andReturn(True) | ||
a2 = object() | ||
self.action.expect.get_completion_action(r1).andReturn(a2) | ||
r2 = object() | ||
self.base_connection.expect.request(a2).andReturn(r2) | ||
self.action.expect.complete_response(r1, r2) | ||
a3 = object() | ||
self.action.expect.get_completion_action(r1).andReturn(a3) | ||
r3 = object() | ||
self.base_connection.expect.request(a3).andReturn(r3) | ||
self.action.expect.complete_response(r1, r3) | ||
a4 = object() | ||
self.action.expect.get_completion_action(r1).andReturn(a4) | ||
r4 = object() | ||
self.base_connection.expect.request(a4).andReturn(r4) | ||
self.action.expect.complete_response(r1, r4) | ||
self.action.expect.get_completion_action(r1).andReturn(None) | ||
|
||
self.assertIs( | ||
self.connection.request(self.action.object), | ||
r1 | ||
) | ||
|
||
|
||
class CompletingConnectionLocalIntegTests(unittest.TestCase): | ||
@classmethod | ||
def setUpClass(cls): | ||
cls.base_connection = RetryingConnection(SigningConnection("us-west-2", _pol.StaticCredentials("DummyKey", "DummySecret"), "http://localhost:65432/"), _pol.ExponentialBackoffRetryPolicy(1, 2, 5)) | ||
cls.connection = CompletingConnection(cls.base_connection) | ||
|
||
def setUp(self): | ||
self.connection.request( | ||
LowVoltage.CreateTable("Aaa").hash_key("h", LowVoltage.STRING).provisioned_throughput(1, 2) | ||
) | ||
|
||
def tearDown(self): | ||
self.connection.request(LowVoltage.DeleteTable("Aaa")) | ||
|
||
def test_complete_batch_get(self): | ||
for i in range(100): | ||
self.connection.request(LowVoltage.PutItem("Aaa", {"h": unicode(i), "xs": "x" * 300000})) | ||
|
||
batch_get = LowVoltage.BatchGetItem().table("Aaa").keys({"h": unicode(i)} for i in range(100)) | ||
|
||
r = self.base_connection.request(batch_get) | ||
self.assertEqual(len(r.unprocessed_keys["Aaa"]["Keys"]), 45) | ||
self.assertEqual(len(r.responses["Aaa"]), 55) | ||
|
||
r = self.connection.request(batch_get) | ||
self.assertEqual(r.unprocessed_keys, {}) | ||
self.assertEqual(len(r.responses["Aaa"]), 100) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
# coding: utf8 | ||
|
||
# Copyright 2014-2015 Vincent Jacques <vincent@vincent-jacques.net> | ||
|
||
import time | ||
import unittest | ||
|
||
import MockMockMock | ||
|
||
from LowVoltage.actions.action import Action | ||
from .signing import SigningConnection | ||
import LowVoltage.exceptions as _exn | ||
import LowVoltage.policies as _pol | ||
|
||
|
||
class RetryingConnection(object): | ||
"""Connection decorator retrying failed requests (due to network, server and throtling errors)""" | ||
|
||
def __init__(self, connection, retry_policy): | ||
self.__connection = connection | ||
self.__retry_policy = retry_policy | ||
|
||
def request(self, action): | ||
errors = 0 | ||
while True: | ||
try: | ||
return self.__connection.request(action) | ||
except _exn.Error as e: | ||
errors += 1 | ||
delay = self.__retry_policy.get_retry_delay_on_exception(action, e, errors) | ||
if delay is None: | ||
raise | ||
else: | ||
time.sleep(delay) | ||
|
||
|
||
class RetryingConnectionUnitTests(unittest.TestCase): | ||
def setUp(self): | ||
self.mocks = MockMockMock.Engine() | ||
self.policy = self.mocks.create("policy") | ||
self.basic_connection = self.mocks.create("connection") | ||
self.connection = RetryingConnection(self.basic_connection.object, self.policy.object) | ||
self.action = object() | ||
self.response = object() | ||
|
||
def tearDown(self): | ||
self.mocks.tearDown() | ||
|
||
def test_unknown_exception_is_passed_through(self): | ||
exception = Exception() | ||
self.basic_connection.expect.request(self.action).andRaise(exception) | ||
with self.assertRaises(Exception) as catcher: | ||
self.connection.request(self.action) | ||
self.assertIs(catcher.exception, exception) | ||
|
||
def test_known_error_is_retried_until_success(self): | ||
exception = _exn.Error() | ||
self.basic_connection.expect.request(self.action).andRaise(exception) | ||
self.policy.expect.get_retry_delay_on_exception(self.action, exception, 1).andReturn(0) | ||
self.basic_connection.expect.request(self.action).andRaise(exception) | ||
self.policy.expect.get_retry_delay_on_exception(self.action, exception, 2).andReturn(0) | ||
self.basic_connection.expect.request(self.action).andRaise(exception) | ||
self.policy.expect.get_retry_delay_on_exception(self.action, exception, 3).andReturn(0) | ||
self.basic_connection.expect.request(self.action).andReturn(self.response) | ||
self.assertIs(self.connection.request(self.action), self.response) | ||
|
||
def test_known_error_is_retried_then_raised(self): | ||
exception = _exn.Error() | ||
self.basic_connection.expect.request(self.action).andRaise(exception) | ||
self.policy.expect.get_retry_delay_on_exception(self.action, exception, 1).andReturn(0) | ||
self.basic_connection.expect.request(self.action).andRaise(exception) | ||
self.policy.expect.get_retry_delay_on_exception(self.action, exception, 2).andReturn(0) | ||
self.basic_connection.expect.request(self.action).andRaise(exception) | ||
self.policy.expect.get_retry_delay_on_exception(self.action, exception, 3).andReturn(None) | ||
with self.assertRaises(_exn.Error) as catcher: | ||
self.connection.request(self.action) | ||
self.assertIs(catcher.exception, exception) | ||
|
||
|
||
class RetryingConnectionLocalIntegTests(unittest.TestCase): | ||
class TestAction(Action): | ||
class Result(object): | ||
def __init__(self, **kwds): | ||
self.kwds = kwds | ||
|
||
def __init__(self, name, payload={}): | ||
Action.__init__(self, name) | ||
self.__payload = payload | ||
|
||
def build(self): | ||
return self.__payload | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.connection = RetryingConnection(SigningConnection("us-west-2", _pol.StaticCredentials("DummyKey", "DummySecret"), "http://localhost:65432/"), _pol.ExponentialBackoffRetryPolicy(1, 2, 5)) | ||
|
||
def test_request(self): | ||
r = self.connection.request(self.TestAction("ListTables")) | ||
self.assertIsInstance(r, self.TestAction.Result) | ||
self.assertEqual(r.kwds, {"TableNames": []}) | ||
|
||
def test_client_error(self): | ||
with self.assertRaises(_exn.InvalidAction): | ||
self.connection.request(self.TestAction("UnexistingAction")) | ||
|
||
def test_network_error(self): | ||
connection = RetryingConnection(SigningConnection("us-west-2", _pol.StaticCredentials("DummyKey", "DummySecret"), "http://localhost:65555/"), _pol.ExponentialBackoffRetryPolicy(0, 1, 4)) | ||
with self.assertRaises(_exn.NetworkError): | ||
connection.request(self.TestAction("ListTables")) | ||
|
||
def test_unexisting_table(self): | ||
with self.assertRaises(_exn.ResourceNotFoundException): | ||
self.connection.request(self.TestAction("GetItem", {"TableName": "Bbb"})) |
Oops, something went wrong.