diff --git a/AUTHORS.rst b/AUTHORS.rst new file mode 100644 index 000000000..2f3843f07 --- /dev/null +++ b/AUTHORS.rst @@ -0,0 +1,4 @@ +PynamoDB is written and maintained by Jharrod LaFon and numerous contributors: + +* Craig Bruce +* Adam Chainz \ No newline at end of file diff --git a/README.rst b/README.rst index ed64050cc..c47b53897 100644 --- a/README.rst +++ b/README.rst @@ -2,30 +2,116 @@ PynamoDB ======== +.. image:: https://pypip.in/v/pynamodb/badge.png + :target: https://pypi.python.org/pypi/pynamodb/ + :alt: Latest Version .. image:: https://travis-ci.org/jlafon/PynamoDB.png?branch=devel - :target: https://travis-ci.org/jlafon/PynamoDB + :target: https://travis-ci.org/jlafon/PynamoDB .. image:: https://coveralls.io/repos/jlafon/PynamoDB/badge.png?branch=devel - :target: https://coveralls.io/r/jlafon/PynamoDB + :target: https://coveralls.io/r/jlafon/PynamoDB +.. image:: https://pypip.in/wheel/pynamodb/badge.png + :target: https://pypi.python.org/pypi/pynamodb/ +.. image:: https://pypip.in/license/pynamodb/badge.png + :target: https://pypi.python.org/pypi/pynamodb/ -A Pythonic interface for DynamoDB +A Pythonic interface for Amazon's `DynamoDB `_ that supports +Python 2 and 3. -A rich API that is compatible with Python 2 and Python 3. +DynamoDB is a great NoSQL service provided by Amazon, but the API is verbose. +PynamoDB presents you with a simple, elegant API. See documentation at http://pynamodb.readthedocs.org/ +Basic Usage +^^^^^^^^^^^ + +Create a model that describes your DynamoDB table. + +.. code-block:: python + + from pynamodb.models import Model + from pynamodb.attributes import UnicodeAttribute + + class UserModel(Model): + """ + A DynamoDB User + """ + table_name = 'dynamodb-user' + email = UnicodeAttribute(null=True) + first_name = UnicodeAttribute(range_key=True) + last_name = UnicodeAttribute(hash_key=True) + +Now, search your table for all users with a last name of 'Smith' and whose +first name begins with 'J': + +.. 
code-block:: python + + for user in UserModel.query('Smith', first_name__begins_with='J'): + print(user.first_name) + +Create a new user: + +.. code-block:: python + + user = UserModel('John', 'Denver') + user.save() + +Advanced Usage +^^^^^^^^^^^^^^ + +Want to use indexes? No problem: + +.. code-block:: python + + from pynamodb.models import Model + from pynamodb.indexes import GlobalSecondaryIndex, AllProjection + from pynamodb.attributes import NumberAttribute, UnicodeAttribute + + class ViewIndex(GlobalSecondaryIndex): + read_capacity_units = 2 + write_capacity_units = 1 + projection = AllProjection() + view = NumberAttribute(default=0, hash_key=True) + + class TestModel(Model): + table_name = 'TestModel' + forum = UnicodeAttribute(hash_key=True) + thread = UnicodeAttribute(range_key=True) + view = NumberAttribute(default=0) + view_index = ViewIndex() + +Now query the index for all items with 0 views: + +.. code-block:: python + + for item in TestModel.view_index.query(0): + print("Item queried from index: {0}".format(item)) + +It's really that simple. + Installation:: $ pip install pynamodb +or install the development version:: + + $ pip install git+https://github.com/jlafon/PynamoDB#egg=pynamodb + Features ======== -* Python 2 support * Python 3 support -* Fully tested +* Python 2 support +* An ORM-like interface with query and scan filters * Includes the entire DynamoDB API * Supports both unicode and binary DynamoDB attributes * Support for global secondary indexes, local secondary indexes, and batch operations * Provides iterators for working with queries, scans, that are automatically paginated * Automatic pagination for bulk operations * Complex queries + + +.. image:: https://d2weczhvl823v0.cloudfront.net/jlafon/pynamodb/trend.png + :alt: Bitdeli badge + :target: https://bitdeli.com/free + diff --git a/docs/api.rst b/docs/api.rst index 45f25748f..2975c8190 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -21,7 +21,7 @@ DynamoDB.
I quickly realized that my go to library, `dynamodb-mapper `__, which itself won't support them. In fact, boto doesn't support -Python 3 either. +Python 3 either. If you want to know more, `I blogged about it `__. Installation ^^^^^^^^^^^^ @@ -30,6 +30,11 @@ Installation $ pip install pynamodb +.. note:: + + PynamoDB is still under development. More advanced features are available with the development version + of PynamoDB. You can install it directly from GitHub with pip: `pip install git+https://github.com/jlafon/PynamoDB#egg=pynamodb` + Getting Started ^^^^^^^^^^^^^^^ @@ -48,7 +53,7 @@ inherit from ``pynamodb.attributes.Attribute``. The most common attributes have Here is an example, using the same table structure as shown in `Amazon's DynamoDB Thread example `__. -:: +.. code-block:: python from pynamodb.models import Model from pynamodb.attributes import ( @@ -68,32 +73,157 @@ Here is an example, using the same table structure as shown in `Amazon's DynamoD The ``table_name`` class attribute is required, and tells the model which DynamoDB table to use. The ``forum_name`` attribute is specified as the hash key for this table with the ``hash_key`` argument. Specifying that an attribute is a range key is done -with the ``range_key`` attribute. +with the ``range_key`` attribute. You can specify a default value for any field, and `default` can even be a function. + +PynamoDB comes with several built in attribute types for convenience, which include the following: + +* UnicodeAttribute +* UnicodeSetAttribute +* NumberAttribute +* NumberSetAttribute +* BinaryAttribute +* BinarySetAttribute +* UTCDateTimeAttribute +* BooleanAttribute +* JSONAttribute + +All of these built in attributes handle serializing and deserializng themselves, in both Python 2 and Python 3. Creating the table ------------------ If your table doesn't already exist, you will have to create it. 
This can be done with easily: - >>> Thread.create_table(read_capacity_units=1, write_capacity_units=1, wait=True) +.. code-block:: python + + >>> if not Thread.exists(): + Thread.create_table(read_capacity_units=1, write_capacity_units=1, wait=True) The ``wait`` argument tells PynamoDB to wait until the table is ready for use before returning. -Connection ----------- -The `Connection` API provides a Pythonic wrapper over the Amazon DynamoDB API. All operations -are supported, and it provides a primitive starting point for the other two APIs. +Using the Model +^^^^^^^^^^^^^^^ + +Now that you've defined a model (referring to the example above), you can start interacting with +your DynamoDB table. You can create a new `Thread` item by calling the `Thread` constructor. + +Creating Items +-------------- +.. code-block:: python + + >>> thread_item = Thread('forum_name', 'forum_subject') + +The first two arguments are automatically assigned to the item's hash and range keys. You can +specify attributes during construction as well: + +.. code-block:: python + + >>> thread_item = Thread('forum_name', 'forum_subject', replies=10) + +The item won't be added to your DynamoDB table until you call save: + +.. code-block:: python + + >>> thread_item.save() + +If you want to retrieve an item that already exists in your table, you can do that with `get`: + +.. code-block:: python + + >>> thread_item = Thread.get('forum_name', 'forum_subject') + +If the item doesn't exist, `None` will be returned. + +Updating Items +-------------- + +You can update an item with the latest data from your table: + +.. code-block:: python + + >>> thread_item.refresh() + +Updates to table items are supported too, even atomic updates. Here is an example of +atomically updating the view count of an item: + +.. code-block:: python + + >>> thread_item.update_item('views', 1, action='add') + +Batch Operations +^^^^^^^^^^^^^^^^ + +Batch operations are supported using context managers, and iterators. 
The DynamoDB API has limits for each batch operation +that it supports, but PynamoDB removes the need implement your own grouping or pagination. Instead, it handles +pagination for you automatically. + +Batch Writes +------------- + +Here is an example using a context manager for a bulk write operation: + +.. code-block:: python + + with Thread.batch_write() as batch: + items = [TestModel('forum-{0}'.format(x), 'thread-{0}'.format(x)) for x in range(1000)] + for item in items: + batch.save(item) + +Batch Gets +------------- + +Here is an example using an iterator for retrieving items in bulk: + +.. code-block:: python + + item_keys = [('forum-{0}'.format(x), 'thread-{0}'.format(x)) for x in range(1000)] + for item in Thread.batch_get(item_keys): + print(item) + +Query Filters +------------- + +You can query items from your table using a simple syntax, similar to other Python ORMs: + +.. code-block:: python + + for item in Thread.query('ForumName', thread__begins_with='mygreatprefix'): + print("Query returned item {0}".format(item)) + +Query filters are translated from an ORM like syntax to DynamoDB API calls, and use +the following syntax: `attribute__operator=value`, where `attribute` is the name of an attribute +and `operator` can be one of the following: + + * eq + * le + * lt + * ge + * gt + * begins_with + * between + +Scan Filters +------------ -TableConnection ---------------- +Scan filters have the same syntax as Query filters, but support different operations (a consequence of the +DynamoDB API - not PynamoDB). The supported operators are: -The `TabelConnection` API is a small convenience layer built on the `Connection` that provides - all of the DynamoDB API for a given table. 
+ * eq + * ne + * le + * lt + * gt + * not_null + * null + * contains + * not_contains + * begins_with + * between -Model ------ +You can even specify multiple filters: -This is where the fun begins, with bulk operations, query filters, context managers, and automatic -attribute binding. The `Model` class provides very high level features for interacting with DynamoDB. +.. code-block:: python + >>> for item in Thread.scan(forum__begins_with='Prefix', views__gt=10): + print(item) diff --git a/docs/awsaccess.rst b/docs/awsaccess.rst new file mode 100644 index 000000000..aaf4b41fe --- /dev/null +++ b/docs/awsaccess.rst @@ -0,0 +1,13 @@ +AWS Access +========== + +PynamoDB uses botocore to interact with the DynamoDB API. Thus, similar methods can be used to provide your AWS +credentials. For local development the use of environment variables such as `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` +is probably preferable. You can of course use IAM users, as recommended by AWS. In addition +`EC2 roles `_ will work as well and +would be recommended when running on EC2. + +As for the permissions granted via IAM, many tasks can be carried out by PynamoDB. So you should construct your +policies as required, see the +`DynamoDB `_ docs for more +information. \ No newline at end of file diff --git a/docs/contributing.rst b/docs/contributing.rst index bae3af617..779b222c3 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -1,5 +1,7 @@ Contributing ============ -Pull requests are welcome, please write tests to accompany your changes or at least run the test suite -with `tox` before submitting. Once on GitHub Travis-ci will run the test suite as well. \ No newline at end of file +Pull requests are welcome, forking from the devel branch. Please write tests to accompany your changes or at least run +the test suite with `tox` before submitting. Once on GitHub Travis-ci will run the test suite as well. + +Don't forget to add yourself to `AUTHORS.rst `_. 
\ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 58538285f..1f32278d0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -29,6 +29,7 @@ Topics api indexes low_level + awsaccess contributing Indices and tables diff --git a/docs/indexes.rst b/docs/indexes.rst index 04ac12c93..31a22bd5c 100644 --- a/docs/indexes.rst +++ b/docs/indexes.rst @@ -5,12 +5,16 @@ DynamoDB supports two types of indexes: global secondary indexes, and local seco Indexes can make accessing your data more efficient, and should be used when appropriate. See `the documentation for more information `__. -Defining an index -^^^^^^^^^^^^^^^^^ +Global Secondary Indexes +^^^^^^^^^^^^^^^^^^^^^^^^ Indexes are defined as classes, just like models. Here is a simple index class: -:: +.. code-block:: python + + from pynamodb.indexes import GlobalSecondaryIndex, AllProjection + from pynamodb.attributes import NumberAttribute + class ViewIndex(GlobalSecondaryIndex): """ @@ -37,7 +41,13 @@ projection classes. * ``IncludeProjection(attributes)``: Only the specified ``attributes`` are projected. We still need to attach the index to the model in order for us to use it. You define it as -a class attribute on the model, as in this example:: +a class attribute on the model, as in this example: + +.. code-block:: python + + from pynamodb.models import Model + from pynamodb.attributes import UnicodeAttribute + class TestModel(Model): """ @@ -49,11 +59,38 @@ a class attribute on the model, as in this example:: view_index = ViewIndex() view = NumberAttribute(default=0) + +Local Secondary Indexes +^^^^^^^^^^^^^^^^^^^^^^^ + +Local secondary indexes are defined just like global ones, but they inherit from ``LocalSecondaryIndex`` instead: + +.. 
code-block:: python + + from pynamodb.indexes import LocalSecondaryIndex, AllProjection + from pynamodb.attributes import NumberAttribute + + + class ViewIndex(LocalSecondaryIndex): + """ + This class represents a local secondary index + """ + # All attributes are projected + projection = AllProjection() + forum = UnicodeAttribute(hash_key=True) + view = NumberAttribute(range_key=True) + + +You must specify the same hash key on the local secondary index and the model. The range key can be any attribute. + + Querying an index ^^^^^^^^^^^^^^^^^^ Index queries use the same syntax as model queries. Continuing our example, we can query -the ``view_index`` simply by calling ``query``:: +the ``view_index`` global secondary index simply by calling ``query``: + +.. code-block:: python for item in TestModel.view_index.query(1): print("Item queried from index: {0}".format(item)) @@ -61,3 +98,11 @@ the ``view_index`` simply by calling ``query``:: This example queries items from the table using the global secondary index, called ``view_index``, using a hash key value of 1 for the index. This would return all ``TestModel`` items that have a ``view`` attribute of value 1. + +Local secondary index queries have a similar syntax. They require a hash key, and can include conditions on the +range key of the index. Here is an example that queries the index for values of ``view`` greater than zero: + +.. code-block:: python + + for item in TestModel.view_index.query('foo', view__gt=0): + print("Item queried from index: {0}".format(item.view)) diff --git a/docs/low_level.rst b/docs/low_level.rst index a292f3cde..61bc56fbf 100644 --- a/docs/low_level.rst +++ b/docs/low_level.rst @@ -8,7 +8,9 @@ written on top of it. Creating a connection ^^^^^^^^^^^^^^^^^^^^^ -Creating a connection is simple:: +Creating a connection is simple: + +.. 
code-block:: python from pynamodb.connection import Connection @@ -16,13 +18,13 @@ Creating a connection is simple:: You can specify a different DynamoDB url: -:: +.. code-block:: python conn = Connection(host='http://alternative-domain/') By default, PynamoDB will connect to the us-east-1 region, but you can specify a different one. -:: +.. code-block:: python conn = Connection(region='us-west-1') @@ -30,23 +32,29 @@ By default, PynamoDB will connect to the us-east-1 region, but you can specify a Modifying tables ^^^^^^^^^^^^^^^^ -You can easily list tables:: +You can easily list tables: + +.. code-block:: python >>> conn.list_tables() {u'TableNames': [u'Thread']} -or delete a table:: +or delete a table: + +.. code-block:: python >>> conn.delete_table('Thread') -If you want to change the capacity of a table, that can be done as well:: +If you want to change the capacity of a table, that can be done as well: + +.. code-block:: python >>> conn.update_table('Thread', read_capacity_units=20, write_capacity_units=20) You can create tables as well, although the syntax is verbose. You should really use the model API instead, but here is a low level example to demonstrate the point: -:: +.. code-block:: python kwargs = { 'write_capacity_units': 1, @@ -78,15 +86,21 @@ but here is a low level example to demonstrate the point: Modifying items ^^^^^^^^^^^^^^^ -The low level API can perform item operationst too, such as getting an item:: +The low level API can perform item operationst too, such as getting an item: + +.. code-block:: python conn.get_item('table_name', 'hash_key', 'range_key') -You can put items as well, specifying the keys and any other attributes:: +You can put items as well, specifying the keys and any other attributes: + +.. code-block:: python conn.put_item('table_name', 'hash_key', 'range_key', attributes={'key': 'value'}) -Deleting an item has similar syntax:: +Deleting an item has similar syntax: + +.. 
code-block:: python conn.delete_item('table_name', 'hash_key', 'range_key') diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 2936ccf35..4cff8e7b3 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -47,9 +47,9 @@ Attributes can be accessed and set normally: >>> user.email 'foo-bar -Did another process update the user? We can update the user with data from DynamoDB:: +Did another process update the user? We can refresh the user with data from DynamoDB:: - >>> user.update() + >>> user.refresh() Ready to delete the user? diff --git a/examples/indexes.py b/examples/indexes.py index 06c4229d0..00309bcec 100644 --- a/examples/indexes.py +++ b/examples/indexes.py @@ -1,9 +1,9 @@ """ Examples using DynamoDB indexes """ -from datetime import datetime -from pynamodb.models import Model, GlobalSecondaryIndex, AllProjection -from pynamodb.attributes import UTCDateTimeAttribute, UnicodeAttribute, NumberAttribute +from pynamodb.models import Model +from pynamodb.indexes import GlobalSecondaryIndex, AllProjection +from pynamodb.attributes import UnicodeAttribute, NumberAttribute class ViewIndex(GlobalSecondaryIndex): diff --git a/examples/model.py b/examples/model.py index fa8f0c4a2..50de6cb89 100644 --- a/examples/model.py +++ b/examples/model.py @@ -1,5 +1,5 @@ """ -An using Amazon's Thread example for motivation +An example using Amazon's Thread example for motivation http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/SampleTablesAndData.html """ @@ -57,4 +57,4 @@ class Thread(Model): # Query for item in Thread.query('forum-1', subject__begins_with='subject'): - print(item) \ No newline at end of file + print(item) diff --git a/examples/table_connection.py b/examples/table_connection.py index 904fa296d..f1b5b71ea 100644 --- a/examples/table_connection.py +++ b/examples/table_connection.py @@ -16,4 +16,4 @@ table.put_item('hash-key', 'range-key', attributes={'name': 'value'}) # Delete an item -table.delete_item('hash-key', 'range-key') \ No 
newline at end of file +table.delete_item('hash-key', 'range-key') diff --git a/pynamodb/__init__.py b/pynamodb/__init__.py index 2706e0c7a..70a32f96e 100644 --- a/pynamodb/__init__.py +++ b/pynamodb/__init__.py @@ -7,4 +7,4 @@ """ __author__ = 'Jharrod LaFon' __license__ = 'MIT' -__version__ = '0.1.4' +__version__ = '0.1.9dev' diff --git a/pynamodb/attributes.py b/pynamodb/attributes.py index e29b7d0b8..e8402a355 100644 --- a/pynamodb/attributes.py +++ b/pynamodb/attributes.py @@ -3,7 +3,6 @@ """ import six import json -import copy from base64 import b64encode, b64decode from delorean import Delorean, parse from pynamodb.constants import ( @@ -11,11 +10,6 @@ DEFAULT_ENCODING ) -class ValueAttribute(object): - """ - - """ - pass class Attribute(object): """ @@ -156,7 +150,66 @@ def serialize(self, value): """ Returns a unicode string """ - return six.u(value) + if value is None or not len(value): + return None + elif isinstance(value, six.text_type): + return value + else: + return six.u(value) + + +class JSONAttribute(Attribute): + """ + A JSON Attribute + + Encodes JSON to unicode internally + """ + def __init__(self, **kwargs): + kwargs.setdefault('attr_type', STRING) + super(JSONAttribute, self).__init__(**kwargs) + + def serialize(self, value): + """ + Serializes JSON to unicode + """ + if value is None: + return None + encoded = json.dumps(value) + return six.u(encoded) + + def deserialize(self, value): + """ + Deserializes JSON + """ + return json.loads(value) + + +class BooleanAttribute(Attribute): + """ + A class for boolean attributes + + This attribute type uses a number attribute to save space + """ + def __init__(self, **kwargs): + kwargs.setdefault('attr_type', NUMBER) + super(BooleanAttribute, self).__init__(**kwargs) + + def serialize(self, value): + """ + Encodes True as 1, False as 0 + """ + if value is None: + return None + elif value: + return json.dumps(1) + else: + return json.dumps(0) + + def deserialize(self, value): + """ + Encode + """ + 
return bool(json.loads(value)) class NumberSetAttribute(SetMixin, Attribute): diff --git a/pynamodb/connection/base.py b/pynamodb/connection/base.py index 5905c16d5..648643c69 100644 --- a/pynamodb/connection/base.py +++ b/pynamodb/connection/base.py @@ -3,7 +3,7 @@ """ import six from botocore.session import get_session - +import logging from .util import pythonic from .exceptions import TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError from ..types import HASH, RANGE @@ -17,14 +17,20 @@ INDEX_NAME, KEY_SCHEMA, ATTR_NAME, ATTR_TYPE, TABLE_KEY, EXPECTED, KEY_TYPE, GET_ITEM, UPDATE, PUT_ITEM, HTTP_OK, SELECT, ACTION, EXISTS, VALUE, LIMIT, QUERY, SCAN, ITEM, LOCAL_SECONDARY_INDEXES, KEYS, KEY, EQ, SEGMENT, TOTAL_SEGMENTS, CREATE_TABLE, PROVISIONED_THROUGHPUT, READ_CAPACITY_UNITS, - WRITE_CAPACITY_UNITS, GLOBAL_SECONDARY_INDEXES, PROJECTION, EXCLUSIVE_START_TABLE_NAME, - DELETE_TABLE, UPDATE_TABLE, LIST_TABLES, GLOBAL_SECONDARY_INDEX_UPDATES, HTTP_BAD_REQUEST) + WRITE_CAPACITY_UNITS, GLOBAL_SECONDARY_INDEXES, PROJECTION, EXCLUSIVE_START_TABLE_NAME, TOTAL, + DELETE_TABLE, UPDATE_TABLE, LIST_TABLES, GLOBAL_SECONDARY_INDEX_UPDATES, HTTP_BAD_REQUEST, + CONSUMED_CAPACITY, CAPACITY_UNITS + ) + +log = logging.getLogger(__name__) +log.addHandler(logging.NullHandler()) class MetaTable(object): """ A pythonic wrapper around table metadata """ + def __init__(self, data): self.data = data self._range_keyname = None @@ -74,7 +80,6 @@ def get_index_hash_keyname(self, index_name): if schema_key.get(KEY_TYPE) == HASH: return schema_key.get(ATTR_NAME) - def get_item_attribute_map(self, attributes, item_key=ITEM, pythonic_key=True): """ Builds up a dynamodb compatible AttributeValue map @@ -144,19 +149,27 @@ def get_exclusive_start_key_map(self, exclusive_start_key): """ Builds the exclusive start key attribute map """ - return { - pythonic(EXCLUSIVE_START_KEY): { - self.hash_keyname: { - self.get_attribute_type(self.hash_keyname): exclusive_start_key + if 
isinstance(exclusive_start_key, dict) and self.hash_keyname in exclusive_start_key: + # This is useful when paginating results, as the LastEvaluatedKey returned is already + # structured properly + return { + pythonic(EXCLUSIVE_START_KEY): exclusive_start_key + } + else: + return { + pythonic(EXCLUSIVE_START_KEY): { + self.hash_keyname: { + self.get_attribute_type(self.hash_keyname): exclusive_start_key + } + } + } - } class Connection(object): """ A higher level abstraction over botocore """ + def __init__(self, region=None, host=None): self._endpoint = None self._session = None @@ -171,6 +184,56 @@ def __init__(self, region=None, host=None): def __repr__(self): return six.u("Connection<{0}>".format(self.endpoint.host)) + def _log_debug(self, operation, kwargs): + """ + Sends a debug message to the logger + """ + log.debug("Calling {0} with arguments {1}".format( + operation, + kwargs + )) + + def _log_debug_response(self, operation, response): + """ + Sends a debug message to the logger about a response + """ + log.debug("{0} response: {1}".format(operation, response)) + + def _log_error(self, operation, response): + """ + Sends an error message to the logger + """ + log.error("{0} failed with status: {1}, message: {2}".format( + operation, + response.status_code, + response.content) + ) + + def dispatch(self, operation_name, operation_kwargs): + """ + Dispatches `operation_name` with arguments `operation_kwargs` + """ + if operation_name not in [DESCRIBE_TABLE, LIST_TABLES, UPDATE_TABLE, DELETE_TABLE, CREATE_TABLE]: + if pythonic(RETURN_CONSUMED_CAPACITY) not in operation_kwargs: + operation_kwargs.update(self.get_consumed_capacity_map(TOTAL)) + self._log_debug(operation_name, operation_kwargs) + response, data = self.service.get_operation(operation_name).call(self.endpoint, **operation_kwargs) + if not response.ok: + self._log_error(operation_name, response) + if data and CONSUMED_CAPACITY in data: + capacity = data.get(CONSUMED_CAPACITY) + if isinstance(capacity,
dict) and CAPACITY_UNITS in capacity: + capacity = capacity.get(CAPACITY_UNITS) + log.debug( + "{0} {1} consumed {2} units".format( + data.get(TABLE_NAME, ''), + operation_name, + capacity + ) + ) + self._log_debug_response(operation_name, response) + return response, data + @property def session(self): """ @@ -209,7 +272,7 @@ def get_meta_table(self, table_name, refresh=False): operation_kwargs = { pythonic(TABLE_NAME): table_name } - response, data = self.service.get_operation(DESCRIBE_TABLE).call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(DESCRIBE_TABLE, operation_kwargs) if not response.ok: if response.status_code == HTTP_BAD_REQUEST: return None @@ -225,12 +288,10 @@ def create_table(self, read_capacity_units=None, write_capacity_units=None, global_secondary_indexes=None, - local_secondary_indexes=None, - ): + local_secondary_indexes=None): """ Performs the CreateTable operation """ - operation = self.service.get_operation(CREATE_TABLE) operation_kwargs = { pythonic(TABLE_NAME): table_name, pythonic(PROVISIONED_THROUGHPUT): { @@ -278,7 +339,7 @@ def create_table(self, PROJECTION: index.get(pythonic(PROJECTION)), }) operation_kwargs[pythonic(LOCAL_SECONDARY_INDEXES)] = local_secondary_indexes_list - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(CREATE_TABLE, operation_kwargs) if response.status_code != HTTP_OK: raise TableError("Failed to create table: {0}".format(response.content)) return data @@ -287,11 +348,10 @@ def delete_table(self, table_name): """ Performs the DeleteTable operation """ - operation = self.service.get_operation(DELETE_TABLE) operation_kwargs = { pythonic(TABLE_NAME): table_name } - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(DELETE_TABLE, operation_kwargs) if response.status_code != HTTP_OK: raise TableError("Failed to delete table: {0}".format(response.content)) @@ -303,7 +363,6 @@ def
update_table(self, """ Performs the UpdateTable operation """ - operation = self.service.get_operation(UPDATE_TABLE) operation_kwargs = { pythonic(TABLE_NAME): table_name } @@ -327,7 +386,7 @@ def update_table(self, } }) operation_kwargs[pythonic(GLOBAL_SECONDARY_INDEX_UPDATES)] = global_secondary_indexes_list - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(UPDATE_TABLE, operation_kwargs) if not response.ok: raise TableError("Failed to update table: {0}".format(response.content)) @@ -335,17 +394,16 @@ def list_tables(self, exclusive_start_table_name=None, limit=None): """ Performs the ListTables operation """ - operation = self.service.get_operation(LIST_TABLES) operation_kwargs = {} if exclusive_start_table_name: operation_kwargs.update({ pythonic(EXCLUSIVE_START_TABLE_NAME): exclusive_start_table_name }) - if limit: + if limit is not None: operation_kwargs.update({ pythonic(LIMIT): limit }) - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(LIST_TABLES, operation_kwargs) if not response.ok: raise TableError("Unable to list tables: {0}".format(response.content)) return data @@ -434,7 +492,6 @@ def delete_item(self, """ Performs the DeleteItem operation and returns the result """ - operation = self.service.get_operation(DELETE_ITEM) operation_kwargs = {pythonic(TABLE_NAME): table_name} operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key)) @@ -446,8 +503,7 @@ def delete_item(self, operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity)) if return_item_collection_metrics: operation_kwargs.update(self.get_item_collection_map(return_item_collection_metrics)) - response, data = operation.call(self.endpoint, **operation_kwargs) - + response, data = self.dispatch(DELETE_ITEM, operation_kwargs) if not response.ok: raise DeleteError("Failed to delete item: {0}".format(response.content)) return data @@ -460,18 +516,16 
@@ def update_item(self, expected=None, return_consumed_capacity=None, return_item_collection_metrics=None, - return_values=None - ): + return_values=None): """ Performs the UpdateItem operation """ - operation = self.service.get_operation(UPDATE_ITEM) operation_kwargs = {pythonic(TABLE_NAME): table_name} operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key)) if expected: operation_kwargs.update(self.get_expected_map(table_name, expected)) if return_consumed_capacity: - operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity)) + operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity)) if return_item_collection_metrics: operation_kwargs.update(self.get_item_collection_map(return_item_collection_metrics)) if return_values: @@ -492,8 +546,7 @@ def update_item(self, attr_type: update.get(VALUE) } } - response, data = operation.call(self.endpoint, **operation_kwargs) - + response, data = self.dispatch(UPDATE_ITEM, operation_kwargs) if not response.ok: raise UpdateError("Failed to update item: {0}".format(response.content)) return data @@ -510,7 +563,6 @@ def put_item(self, """ Performs the PutItem operation and returns the result """ - operation = self.service.get_operation(PUT_ITEM) operation_kwargs = {pythonic(TABLE_NAME): table_name} operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key, key=ITEM)) if attributes: @@ -524,8 +576,7 @@ def put_item(self, operation_kwargs.update(self.get_return_values_map(return_values)) if expected: operation_kwargs.update(self.get_expected_map(table_name, expected)) - - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(PUT_ITEM, operation_kwargs) if not response.ok: raise PutError("Failed to put item: {0}".format(response.content)) return data @@ -541,7 +592,6 @@ def batch_write_item(self, """ if put_items is None and delete_items is None: raise ValueError("Either put_items or 
delete_items must be specified") - operation = self.service.get_operation(BATCH_WRITE_ITEM) operation_kwargs = { pythonic(REQUEST_ITEMS): { table_name: [] @@ -564,7 +614,7 @@ def batch_write_item(self, DELETE_REQUEST: self.get_item_attribute_map(table_name, item, item_key=KEY, pythonic_key=False) }) operation_kwargs[pythonic(REQUEST_ITEMS)][table_name] = delete_items_list + put_items_list - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(BATCH_WRITE_ITEM, operation_kwargs) if not response.ok: raise PutError("Failed to batch write items: {0}".format(response.content)) return data @@ -578,7 +628,6 @@ def batch_get_item(self, """ Performs the batch get item operation """ - operation = self.service.get_operation(BATCH_GET_ITEM) operation_kwargs = { pythonic(REQUEST_ITEMS): { table_name: {} @@ -600,7 +649,7 @@ def batch_get_item(self, self.get_item_attribute_map(table_name, key)[pythonic(ITEM)] ) operation_kwargs[pythonic(REQUEST_ITEMS)][table_name].update(keys_map) - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(BATCH_GET_ITEM, operation_kwargs) if not response.ok: raise GetError("Failed to batch get items: {0}".format(response.content)) return data @@ -614,14 +663,13 @@ def get_item(self, """ Performs the GetItem operation and returns the result """ - operation = self.service.get_operation(GET_ITEM) operation_kwargs = {} if attributes_to_get is not None: operation_kwargs[pythonic(ATTRS_TO_GET)] = attributes_to_get operation_kwargs[pythonic(CONSISTENT_READ)] = consistent_read operation_kwargs[pythonic(TABLE_NAME)] = table_name operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key)) - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(GET_ITEM, operation_kwargs) if not response.ok: raise GetError("Failed to get item: {0}".format(response.content)) return data @@ -638,17 +686,16 @@ def 
scan(self, """ Performs the scan operation """ - operation = self.service.get_operation(SCAN) operation_kwargs = {pythonic(TABLE_NAME): table_name} if attributes_to_get is not None: operation_kwargs[pythonic(ATTRS_TO_GET)] = attributes_to_get - if limit: + if limit is not None: operation_kwargs[pythonic(LIMIT)] = limit if return_consumed_capacity: operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity)) if exclusive_start_key: operation_kwargs.update(self.get_exclusive_start_key_map(table_name, exclusive_start_key)) - if segment: + if segment is not None: operation_kwargs[pythonic(SEGMENT)] = segment if total_segments: operation_kwargs[pythonic(TOTAL_SEGMENTS)] = total_segments @@ -663,7 +710,7 @@ def scan(self, ATTR_VALUE_LIST: [{attr_type: value for value in condition.get(ATTR_VALUE_LIST)}], COMPARISON_OPERATOR: operator } - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(SCAN, operation_kwargs) if not response.ok: raise ScanError("Failed to scan table: {0}".format(response.content)) return data @@ -679,12 +726,10 @@ def query(self, limit=None, return_consumed_capacity=None, scan_index_forward=None, - select=None - ): + select=None): """ Performs the Query operation and returns the result """ - operation = self.service.get_operation(QUERY) operation_kwargs = {pythonic(TABLE_NAME): table_name} if attributes_to_get: operation_kwargs[pythonic(ATTRS_TO_GET)] = attributes_to_get @@ -694,7 +739,7 @@ def query(self, operation_kwargs.update(self.get_exclusive_start_key_map(table_name, exclusive_start_key)) if index_name: operation_kwargs[pythonic(INDEX_NAME)] = index_name - if limit: + if limit is not None: operation_kwargs[pythonic(LIMIT)] = limit if return_consumed_capacity: operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity)) @@ -712,9 +757,11 @@ def query(self, hash_keyname = self.get_meta_table(table_name).hash_keyname operation_kwargs[pythonic(KEY_CONDITIONS)] 
= { hash_keyname: { - ATTR_VALUE_LIST: [{ - self.get_attribute_type(table_name, hash_keyname): hash_key, - }], + ATTR_VALUE_LIST: [ + { + self.get_attribute_type(table_name, hash_keyname): hash_key, + } + ], COMPARISON_OPERATOR: EQ }, } @@ -730,7 +777,7 @@ def query(self, COMPARISON_OPERATOR: operator } - response, data = operation.call(self.endpoint, **operation_kwargs) + response, data = self.dispatch(QUERY, operation_kwargs) if not response.ok: raise QueryError("Failed to query items: {0}".format(response.content)) return data diff --git a/pynamodb/constants.py b/pynamodb/constants.py index c3318b96c..196480b4d 100644 --- a/pynamodb/constants.py +++ b/pynamodb/constants.py @@ -22,12 +22,12 @@ RETURN_ITEM_COLL_METRICS = 'ReturnItemCollectionMetrics' EXCLUSIVE_START_TABLE_NAME = 'ExclusiveStartTableName' RETURN_CONSUMED_CAPACITY = 'ReturnConsumedCapacity' -EXCLUSIVE_START_KEY = 'ExclusiveStartKey' COMPARISON_OPERATOR = 'ComparisonOperator' SCAN_INDEX_FORWARD = 'ScanIndexForward' ATTR_DEFINITIONS = 'AttributeDefinitions' ATTR_VALUE_LIST = 'AttributeValueList' TABLE_DESCRIPTION = 'TableDescription' +UNPROCESSED_KEYS = 'UnprocessedKeys' CONSISTENT_READ = 'ConsistentRead' DELETE_REQUEST = 'DeleteRequest' RETURN_VALUES = 'ReturnValues' @@ -112,6 +112,8 @@ # These are constants used in the KeyConditions parameter # See: http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#DDB-Query-request-KeyConditions +EXCLUSIVE_START_KEY = 'ExclusiveStartKey' +LAST_EVALUATED_KEY = 'LastEvaluatedKey' BEGINS_WITH = 'BEGINS_WITH' BETWEEN = 'BETWEEN' EQ = 'EQ' @@ -123,6 +125,15 @@ IN = 'IN' KEY_CONDITIONS = 'KeyConditions' COMPARISON_OPERATOR_VALUES = [EQ, LE, LT, GE, GT, BEGINS_WITH, BETWEEN] +QUERY_OPERATOR_MAP = { + 'eq': EQ, + 'le': LE, + 'lt': LT, + 'ge': GE, + 'gt': GT, + 'begins_with': BEGINS_WITH, + 'between': BETWEEN +} # These are the valid select values for the Scan operation # See: 
http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html#DDB-Scan-request-Select @@ -135,6 +146,20 @@ SPECIFIC_ATTRIBUTES = 'SPECIFIC_ATTRIBUTES' COUNT = 'COUNT' SELECT_VALUES = [ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, SPECIFIC_ATTRIBUTES, COUNT] +SCAN_OPERATOR_MAP = { + 'eq': EQ, + 'ne': NE, + 'le': LE, + 'lt': LT, + 'ge': GE, + 'not_null': NOT_NULL, + 'null': NULL, + 'contains': CONTAINS, + 'not_contains': NOT_CONTAINS, + 'begins_with': BEGINS_WITH, + 'in': IN, + 'between': BETWEEN +} # These are the valid comparison operators for the Scan operation # See: http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html#DDB-Scan-request-ScanFilter @@ -149,6 +174,8 @@ # These are the valid ReturnConsumedCapacity values used in multiple operations # See: http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html#DDB-BatchGetItem-request-ReturnConsumedCapacity +CONSUMED_CAPACITY = 'ConsumedCapacity' +CAPACITY_UNITS = 'CapacityUnits' INDEXES = 'INDEXES' TOTAL = 'TOTAL' NONE = 'NONE' @@ -173,3 +200,5 @@ DELETE = 'DELETE' ADD = 'ADD' ATTR_UPDATE_ACTIONS = [PUT, DELETE, ADD] +BATCH_GET_PAGE_LIMIT = 100 +BATCH_WRITE_PAGE_LIMIT = 25 diff --git a/pynamodb/indexes.py b/pynamodb/indexes.py index b932b1d64..e190dff65 100644 --- a/pynamodb/indexes.py +++ b/pynamodb/indexes.py @@ -161,4 +161,4 @@ class AllProjection(Projection): """ An ALL projection """ - projection_type = ALL \ No newline at end of file + projection_type = ALL diff --git a/pynamodb/models.py b/pynamodb/models.py index dc4d8267e..6603f7c6b 100644 --- a/pynamodb/models.py +++ b/pynamodb/models.py @@ -9,6 +9,7 @@ import six import copy from six import with_metaclass +from .throttle import NoThrottle from .attributes import Attribute from .connection.base import MetaTable from .connection.table import TableConnection @@ -19,10 +20,13 @@ ATTR_TYPE_MAP, ATTR_DEFINITIONS, ATTR_NAME, ATTR_TYPE, KEY_SCHEMA, KEY_TYPE, ITEM, ITEMS, READ_CAPACITY_UNITS,
WRITE_CAPACITY_UNITS, RANGE_KEY, ATTRIBUTES, PUT, DELETE, RESPONSES, GLOBAL_SECONDARY_INDEX, - INDEX_NAME, PROVISIONED_THROUGHPUT, PROJECTION, - GLOBAL_SECONDARY_INDEXES, LOCAL_SECONDARY_INDEXES, - PROJECTION_TYPE, NON_KEY_ATTRIBUTES, EQ, LE, LT, GT, GE, BEGINS_WITH, BETWEEN, - COMPARISON_OPERATOR, ATTR_VALUE_LIST, TABLE_STATUS, ACTIVE) + INDEX_NAME, PROVISIONED_THROUGHPUT, PROJECTION, ATTR_UPDATES, ALL_NEW, + GLOBAL_SECONDARY_INDEXES, LOCAL_SECONDARY_INDEXES, ACTION, VALUE, KEYS, + PROJECTION_TYPE, NON_KEY_ATTRIBUTES, COMPARISON_OPERATOR, ATTR_VALUE_LIST, + TABLE_STATUS, ACTIVE, RETURN_VALUES, BATCH_GET_PAGE_LIMIT, UNPROCESSED_KEYS, + PUT_REQUEST, DELETE_REQUEST, LAST_EVALUATED_KEY, QUERY_OPERATOR_MAP, + SCAN_OPERATOR_MAP, CONSUMED_CAPACITY, BATCH_WRITE_PAGE_LIMIT, TABLE_NAME, + CAPACITY_UNITS) class ModelContextManager(object): @@ -33,7 +37,7 @@ class ModelContextManager(object): def __init__(self, model, auto_commit=True): self.model = model self.auto_commit = auto_commit - self.max_operations = 25 + self.max_operations = BATCH_WRITE_PAGE_LIMIT self.pending_operations = [] def __enter__(self): @@ -80,10 +84,32 @@ def commit(self): elif item['action'] == DELETE: delete_items.append(item['item'].get_keys()) self.pending_operations = [] - return self.model.get_connection().batch_write_item( + if not len(put_items) and not len(delete_items): + return + self.model.throttle.throttle() + data = self.model.get_connection().batch_write_item( put_items=put_items, delete_items=delete_items ) + self.model.add_throttle_record(data.get(CONSUMED_CAPACITY, None)) + if not data: + return + unprocessed_keys = data.get(UNPROCESSED_KEYS, {}).get(self.model.table_name) + while unprocessed_keys: + put_items = [] + delete_items = [] + for key in unprocessed_keys: + if PUT_REQUEST in key: + put_items.append(key.get(PUT_REQUEST)) + elif DELETE_REQUEST in key: + delete_items.append(key.get(DELETE_REQUEST)) + self.model.throttle.throttle() + data = 
self.model.get_connection().batch_write_item( + put_items=put_items, + delete_items=delete_items + ) + self.model.add_throttle_record(data.get(CONSUMED_CAPACITY)) + unprocessed_keys = data.get(UNPROCESSED_KEYS, {}).get(self.model.table_name) class MetaModel(type): @@ -102,6 +128,7 @@ def __init__(cls, name, bases, attrs): elif issubclass(attr_obj.__class__, (Attribute, )): attr_obj.attr_name = attr_name + class Model(with_metaclass(MetaModel)): """ Defines a `PynamoDB` Model @@ -117,6 +144,7 @@ class Model(with_metaclass(MetaModel)): indexes = None connection = None index_classes = None + throttle = NoThrottle() def __init__(self, hash_key=None, range_key=None, **attrs): """ @@ -132,8 +160,17 @@ def __init__(self, hash_key=None, range_key=None, **attrs): setattr(self, self.meta().range_keyname, range_key) self.set_attributes(**attrs) - def __getattribute__(self, item): - return object.__getattribute__(self, item) + @classmethod + def add_throttle_record(cls, records): + """ + Pulls out the table name and capacity units from `records` and + puts it in `self.throttle` + """ + if records: + for record in records: + if record.get(TABLE_NAME) == cls.table_name: + cls.throttle.add_record(record.get(CAPACITY_UNITS)) + break @classmethod def batch_get(cls, items): @@ -143,7 +180,17 @@ def batch_get(cls, items): hash_keyname = cls.meta().hash_keyname range_keyname = cls.meta().range_keyname keys_to_get = [] - for item in items: + while items: + if len(keys_to_get) == BATCH_GET_PAGE_LIMIT: + while keys_to_get: + page, unprocessed_keys = cls._batch_get_page(keys_to_get) + for batch_item in page: + yield cls.from_raw_data(batch_item) + if unprocessed_keys: + keys_to_get = unprocessed_keys + else: + keys_to_get = [] + item = items.pop() if range_keyname: hash_key, range_key = cls.serialize_keys(item[0], item[1]) keys_to_get.append({ @@ -156,11 +203,28 @@ def batch_get(cls, items): hash_keyname: hash_key }) + while keys_to_get: + page, unprocessed_keys = 
cls._batch_get_page(keys_to_get) + for batch_item in page: + yield cls.from_raw_data(batch_item) + if unprocessed_keys: + keys_to_get = unprocessed_keys + else: + keys_to_get = [] + + @classmethod + def _batch_get_page(cls, keys_to_get): + """ + Returns a single page from BatchGetItem + Also returns any unprocessed items + """ data = cls.get_connection().batch_get_item( keys_to_get - ).get(RESPONSES).get(cls.table_name) - for item in data: - yield cls.from_raw_data(item) + ) + cls.throttle.add_record(data.get(CONSUMED_CAPACITY)) + item_data = data.get(RESPONSES).get(cls.table_name) + unprocessed_items = data.get(UNPROCESSED_KEYS).get(cls.table_name, {}).get(KEYS, None) + return item_data, unprocessed_items @classmethod def batch_write(cls, auto_commit=True): @@ -222,15 +286,42 @@ def delete(self): """ Deletes this object from dynamodb """ - args, kwargs = self._get_save_args(attributes=False) + args, kwargs = self._get_save_args(attributes=False, null_check=False) return self.get_connection().delete_item(*args, **kwargs) + def update_item(self, attribute, value, action=None): + args, kwargs = self._get_save_args() + for attr_name, attr_cls in self.get_attributes().items(): + if attr_name == attribute: + value = attr_cls.serialize(value) + break + del(kwargs[pythonic(ATTRIBUTES)]) + kwargs[pythonic(ATTR_UPDATES)] = { + attribute: { + ACTION: action.upper(), + VALUE: value + } + } + kwargs[pythonic(RETURN_VALUES)] = ALL_NEW + data = self.get_connection().update_item( + *args, + **kwargs + ) + self.throttle.add_record(data.get(CONSUMED_CAPACITY)) + for name, value in data.get(ATTRIBUTES).items(): + attr = self.get_attributes().get(name, None) + if attr: + setattr(self, name, attr.deserialize(value.get(ATTR_TYPE_MAP[attr.attr_type]))) + return data + def save(self): """ Save this object to dynamodb """ args, kwargs = self._get_save_args() - return self.get_connection().put_item(*args, **kwargs) + data = self.get_connection().put_item(*args, **kwargs) + 
self.throttle.add_record(data.get(CONSUMED_CAPACITY)) + return data def get_keys(self): """ @@ -247,12 +338,12 @@ def get_keys(self): } return attrs - def _get_save_args(self, attributes=True): + def _get_save_args(self, attributes=True, null_check=True): """ Gets the proper *args, **kwargs for saving and retrieving this object """ kwargs = {} - serialized = self.serialize() + serialized = self.serialize(null_check=null_check) hash_key = serialized.get(HASH) range_key = serialized.get(RANGE, None) args = (hash_key, ) @@ -262,13 +353,14 @@ def _get_save_args(self, attributes=True): kwargs[pythonic(ATTRIBUTES)] = serialized[pythonic(ATTRIBUTES)] return args, kwargs - def update(self, consistent_read=False): + def refresh(self, consistent_read=False): """ Retrieves this object's data from dynamodb and syncs this local object """ args, kwargs = self._get_save_args(attributes=False) kwargs.setdefault('consistent_read', consistent_read) attrs = self.get_connection().get_item(*args, **kwargs) + self.throttle.add_record(attrs.get(CONSUMED_CAPACITY)) self.deserialize(attrs.get(ITEM, {})) def deserialize(self, attrs): @@ -283,7 +375,7 @@ def deserialize(self, attrs): if value: setattr(self, name, attr_instance.deserialize(value)) - def serialize(self, attr_map=False): + def serialize(self, attr_map=False, null_check=True): """ Serializes a value for use with DynamoDB """ @@ -294,20 +386,23 @@ def serialize(self, attr_map=False): if value is None: if attr.null: continue - else: + elif null_check: raise ValueError("Attribute '{0}' cannot be None".format(name)) + serialized = attr.serialize(value) + if serialized is None: + continue if attr_map: attrs[attributes][name] = { - ATTR_TYPE_MAP[attr.attr_type]: attr.serialize(value) + ATTR_TYPE_MAP[attr.attr_type]: serialized } else: if attr.is_hash_key: - attrs[HASH] = attr.serialize(value) + attrs[HASH] = serialized elif attr.is_range_key: - attrs[RANGE] = attr.serialize(value) + attrs[RANGE] = serialized else: 
attrs[attributes][name] = { - ATTR_TYPE_MAP[attr.attr_type]: attr.serialize(value) + ATTR_TYPE_MAP[attr.attr_type]: serialized } return attrs @@ -356,8 +451,10 @@ def get(cls, range_key=range_key, consistent_read=consistent_read ) - if data: - return cls.from_raw_data(data.get(ITEM)) + cls.throttle.add_record(data.get(CONSUMED_CAPACITY)) + item_data = data.get(ITEM) + if item_data: + return cls.from_raw_data(item_data) else: return None @@ -465,30 +562,11 @@ def schema(cls): return schema @classmethod - def query(cls, - hash_key, - consistent_read=False, - index_name=None, - scan_index_forward=None, - **filters): + def _build_filters(cls, operator_map, filters): """ - Provides a high level query API + Builds an appropriate condition map """ - operators = { - 'eq': EQ, - 'le': LE, - 'lt': LT, - 'ge': GE, - 'gt': GT, - 'begins_with': BEGINS_WITH, - 'between': BETWEEN - } key_conditions = {} - cls.get_indexes() - if index_name: - hash_key = cls.index_classes[index_name].hash_key_attribute().serialize(hash_key) - else: - hash_key = cls.serialize_keys(hash_key, None)[0] attribute_classes = cls.get_attributes() for query, value in filters.items(): attribute = None @@ -499,13 +577,32 @@ def query(cls, if not isinstance(value, list): value = [value] value = [attribute_class.serialize(val) for val in value] - elif token in operators: + elif token in operator_map: key_conditions[attribute] = { - COMPARISON_OPERATOR: operators.get(token), + COMPARISON_OPERATOR: operator_map.get(token), ATTR_VALUE_LIST: value } else: raise ValueError("Could not parse filter: {0}".format(query)) + return key_conditions + + @classmethod + def query(cls, + hash_key, + consistent_read=False, + index_name=None, + scan_index_forward=None, + **filters): + """ + Provides a high level query API + """ + key_conditions = {} + cls.get_indexes() + if index_name: + hash_key = cls.index_classes[index_name].hash_key_attribute().serialize(hash_key) + else: + hash_key = cls.serialize_keys(hash_key, None)[0] + 
key_conditions = cls._build_filters(QUERY_OPERATOR_MAP, filters) data = cls.get_connection().query( hash_key, index_name=index_name, @@ -513,17 +610,55 @@ def query(cls, scan_index_forward=scan_index_forward, key_conditions=key_conditions ) + cls.throttle.add_record(data.get(CONSUMED_CAPACITY)) + last_evaluated_key = data.get(LAST_EVALUATED_KEY, None) for item in data.get(ITEMS): yield cls.from_raw_data(item) + while last_evaluated_key: + data = cls.get_connection().query( + hash_key, + exclusive_start_key=last_evaluated_key, + index_name=index_name, + consistent_read=consistent_read, + scan_index_forward=scan_index_forward, + key_conditions=key_conditions + ) + cls.throttle.add_record(data.get(CONSUMED_CAPACITY)) + for item in data.get(ITEMS): + yield cls.from_raw_data(item) + last_evaluated_key = data.get(LAST_EVALUATED_KEY, None) @classmethod - def scan(cls, segment=None, total_segments=None): + def scan(cls, + segment=None, + total_segments=None, + limit=None, + **filters): """ Iterates through all items in the table """ - data = cls.get_connection().scan(segment=segment, total_segments=total_segments) + scan_filter = cls._build_filters(SCAN_OPERATOR_MAP, filters) + data = cls.get_connection().scan( + segment=segment, + limit=limit, + scan_filter=scan_filter, + total_segments=total_segments + ) + last_evaluated_key = data.get(LAST_EVALUATED_KEY, None) + cls.throttle.add_record(data.get(CONSUMED_CAPACITY)) for item in data.get(ITEMS): yield cls.from_raw_data(item) + while last_evaluated_key: + data = cls.get_connection().scan( + exclusive_start_key=last_evaluated_key, + limit=limit, + scan_filter=scan_filter, + segment=segment, + total_segments=total_segments + ) + for item in data.get(ITEMS): + yield cls.from_raw_data(item) + last_evaluated_key = data.get(LAST_EVALUATED_KEY, None) @classmethod def exists(cls): @@ -564,5 +699,3 @@ def create_table(cls, wait=False, read_capacity_units=None, write_capacity_units time.sleep(2) else: raise ValueError("No TableStatus 
returned for table") - - diff --git a/pynamodb/tests/integration/model_integration.py b/pynamodb/tests/integration/model_integration.py index 7d04b6cf4..a738516d3 100644 --- a/pynamodb/tests/integration/model_integration.py +++ b/pynamodb/tests/integration/model_integration.py @@ -50,46 +50,47 @@ class TestModel(Model): print("Creating table") TestModel.create_table(read_capacity_units=1, write_capacity_units=1) -pprint.pprint(TestModel.schema()) -obj = TestModel('foo', 'bar') -obj.save() -obj2 = TestModel('foo2', 'bar2') -print(obj2, obj) -print(obj.epoch.strftime(DATETIME_FORMAT), obj2.epoch.strftime(DATETIME_FORMAT)) -obj3 = TestModel('setitem', 'setrange', scores={1, 2.1}) -obj3.save() -obj3.update() - -with TestModel.batch_write() as batch: - items = [TestModel('hash-{0}'.format(x), '{0}'.format(x)) for x in range(10)] - for item in items: - batch.save(item) - -item_keys = [('hash-{0}'.format(x), 'thread-{0}'.format(x)) for x in range(10)] - -for item in TestModel.batch_get(item_keys): - print(item) - -for item in TestModel.query('setitem', thread__begins_with='set'): - print("Query Item {0}".format(item)) - -with TestModel.batch_write() as batch: - items = [TestModel('hash-{0}'.format(x), '{0}'.format(x)) for x in range(10)] - for item in items: - print("Batch delete") - batch.delete(item) - -for item in TestModel.scan(): - print("Scanned item: {0}".format(item)) - -tstamp = datetime.now() +# pprint.pprint(TestModel.schema()) +# obj = TestModel('foo', 'bar') +# obj.save() +# obj2 = TestModel('foo2', 'bar2') +# print(obj2, obj) +# print(obj.epoch.strftime(DATETIME_FORMAT), obj2.epoch.strftime(DATETIME_FORMAT)) +# obj3 = TestModel('setitem', 'setrange', scores={1, 2.1}) +# obj3.save() +# obj3.refresh() +# +# with TestModel.batch_write() as batch: +# items = [TestModel('hash-{0}'.format(x), '{0}'.format(x)) for x in range(10)] +# for item in items: +# batch.save(item) +# +# item_keys = [('hash-{0}'.format(x), 'thread-{0}'.format(x)) for x in range(10)] +# +# for 
item in TestModel.batch_get(item_keys): +# print(item) +# +# for item in TestModel.query('setitem', thread__begins_with='set'): +# print("Query Item {0}".format(item)) +# +# with TestModel.batch_write() as batch: +# items = [TestModel('hash-{0}'.format(x), '{0}'.format(x)) for x in range(10)] +# for item in items: +# print("Batch delete") +# batch.delete(item) +# +# for item in TestModel.scan(): +# print("Scanned item: {0}".format(item)) +# +# tstamp = datetime.now() query_obj = TestModel('query_forum', 'query_thread') query_obj.forum = 'foo' -query_obj.epoch = tstamp +print(query_obj.view) query_obj.save() - -for item in TestModel.epoch_index.query(tstamp): - print("Item queried from index: {0}".format(item)) - -for item in TestModel.view_index.query('foo', view__gt=0): - print("Item queried from index: {0}".format(item.view)) +query_obj.update_item('view', 1, action='add') +print(query_obj.view) +# for item in TestModel.epoch_index.query(tstamp): +# print("Item queried from index: {0}".format(item)) +# +# for item in TestModel.view_index.query('foo', view__gt=0): +# print("Item queried from index: {0}".format(item.view)) diff --git a/pynamodb/tests/test_attributes.py b/pynamodb/tests/test_attributes.py index 3d1e65038..d32715e2b 100644 --- a/pynamodb/tests/test_attributes.py +++ b/pynamodb/tests/test_attributes.py @@ -10,7 +10,9 @@ from pynamodb.constants import UTC, DATETIME_FORMAT from pynamodb.attributes import ( BinarySetAttribute, BinaryAttribute, NumberSetAttribute, NumberAttribute, - UnicodeAttribute, UnicodeSetAttribute, UTCDateTimeAttribute, DEFAULT_ENCODING) + UnicodeAttribute, UnicodeSetAttribute, UTCDateTimeAttribute, BooleanAttribute, + JSONAttribute, DEFAULT_ENCODING, NUMBER, STRING, STRING_SET, NUMBER_SET, BINARY_SET, + BINARY) class UTCDateTimeAttributeTestCase(TestCase): @@ -23,7 +25,7 @@ def test_utc_datetime_attribute(self): """ attr = UTCDateTimeAttribute() self.assertIsNotNone(attr) - + self.assertEqual(attr.attr_type, STRING) tstamp = 
datetime.now() attr = UTCDateTimeAttribute(default=tstamp) self.assertEqual(attr.default, tstamp) @@ -58,6 +60,7 @@ def test_binary_attribute(self): """ attr = BinaryAttribute() self.assertIsNotNone(attr) + self.assertEqual(attr.attr_type, BINARY) attr = BinaryAttribute(default=b'foo') self.assertEqual(attr.default, b'foo') @@ -92,6 +95,7 @@ def test_binary_set_serialize(self): BinarySetAttribute.serialize """ attr = BinarySetAttribute() + self.assertEqual(attr.attr_type, BINARY_SET) self.assertEqual( attr.serialize({b'foo', b'bar'}), [b64encode(val).decode(DEFAULT_ENCODING) for val in sorted({b'foo', b'bar'})]) @@ -138,6 +142,7 @@ def test_number_attribute(self): """ attr = NumberAttribute() self.assertIsNotNone(attr) + self.assertEqual(attr.attr_type, NUMBER) attr = NumberAttribute(default=1) self.assertEqual(attr.default, 1) @@ -163,6 +168,7 @@ def test_number_set_deserialize(self): NumberSetAttribute.deserialize """ attr = NumberSetAttribute() + self.assertEqual(attr.attr_type, NUMBER_SET) self.assertEqual(attr.deserialize([json.dumps(val) for val in sorted({1, 2})]), {1, 2}) def test_number_set_serialize(self): @@ -183,7 +189,6 @@ def test_number_set_attribute(self): attr = NumberSetAttribute(default={1, 2}) self.assertEqual(attr.default, {1, 2}) - class UnicodeAttributeTestCase(TestCase): """ Tests unicode attributes @@ -194,6 +199,7 @@ def test_unicode_attribute(self): """ attr = UnicodeAttribute() self.assertIsNotNone(attr) + self.assertEqual(attr.attr_type, STRING) attr = UnicodeAttribute(default=six.u('foo')) self.assertEqual(attr.default, six.u('foo')) @@ -204,6 +210,9 @@ def test_unicode_serialize(self): """ attr = UnicodeAttribute() self.assertEqual(attr.serialize('foo'), six.u('foo')) + self.assertEqual(attr.serialize(u'foo'), six.u('foo')) + self.assertEqual(attr.serialize(u''), None) + self.assertEqual(attr.serialize(None), None) def test_unicode_deserialize(self): """ @@ -211,12 +220,14 @@ def test_unicode_deserialize(self): """ attr = 
UnicodeAttribute() self.assertEqual(attr.deserialize('foo'), six.u('foo')) + self.assertEqual(attr.deserialize(u'foo'), six.u('foo')) def test_unicode_set_serialize(self): """ UnicodeSetAttribute.serialize """ attr = UnicodeSetAttribute() + self.assertEqual(attr.attr_type, STRING_SET) self.assertEqual(attr.deserialize(None), None) self.assertEqual( attr.serialize({six.u('foo'), six.u('bar')}), @@ -238,6 +249,74 @@ def test_unicode_set_attribute(self): """ attr = UnicodeSetAttribute() self.assertIsNotNone(attr) - + self.assertEqual(attr.attr_type, STRING_SET) attr = UnicodeSetAttribute(default={six.u('foo'), six.u('bar')}) self.assertEqual(attr.default, {six.u('foo'), six.u('bar')}) + + +class BooleanAttributeTestCase(TestCase): + """ + Tests boolean attributes + """ + def test_boolean_attribute(self): + """ + BooleanAttribute.default + """ + attr = BooleanAttribute() + self.assertIsNotNone(attr) + + self.assertEqual(attr.attr_type, NUMBER) + attr = BooleanAttribute(default=True) + self.assertEqual(attr.default, True) + + def test_boolean_serialize(self): + """ + BooleanAttribute.serialize + """ + attr = BooleanAttribute() + self.assertEqual(attr.serialize(True), json.dumps(1)) + self.assertEqual(attr.serialize(False), json.dumps(0)) + self.assertEqual(attr.serialize(None), None) + + def test_boolean_deserialize(self): + """ + BooleanAttribute.deserialize + """ + attr = BooleanAttribute() + self.assertEqual(attr.deserialize('1'), True) + self.assertEqual(attr.deserialize('0'), False) + + +class JSONAttributeTestCase(TestCase): + """ + Tests json attributes + """ + def test_json_attribute(self): + """ + JSONAttribute.default + """ + attr = JSONAttribute() + self.assertIsNotNone(attr) + + self.assertEqual(attr.attr_type, STRING) + attr = JSONAttribute(default={}) + self.assertEqual(attr.default, {}) + + def test_json_serialize(self): + """ + JSONAttribute.serialize + """ + attr = JSONAttribute() + item = {'foo': 'bar', 'bool': True, 'number': 3.141} + 
self.assertEqual(attr.serialize(item), six.u(json.dumps(item))) + self.assertEqual(attr.serialize({}), six.u('{}')) + self.assertEqual(attr.serialize(None), None) + + def test_json_deserialize(self): + """ + JSONAttribute.deserialize + """ + attr = JSONAttribute() + item = {'foo': 'bar', 'bool': True, 'number': 3.141} + encoded = six.u(json.dumps(item)) + self.assertEqual(attr.deserialize(encoded), item) diff --git a/pynamodb/tests/test_base_connection.py b/pynamodb/tests/test_base_connection.py index 6cf331fdc..32be6f820 100644 --- a/pynamodb/tests/test_base_connection.py +++ b/pynamodb/tests/test_base_connection.py @@ -339,6 +339,7 @@ def test_delete_item(self): "Amazon DynamoDB", "How do I update multiple items?") params = { + 'return_consumed_capacity': 'TOTAL', 'key': { 'ForumName': { 'S': 'Amazon DynamoDB' @@ -359,6 +360,7 @@ def test_delete_item(self): return_values='ALL_NEW' ) params = { + 'return_consumed_capacity': 'TOTAL', 'key': { 'ForumName': { 'S': 'Amazon DynamoDB' @@ -436,7 +438,8 @@ def test_delete_item(self): } }, 'table_name': self.test_table_name, - 'return_item_collection_metrics': 'SIZE' + 'return_item_collection_metrics': 'SIZE', + 'return_consumed_capacity': 'TOTAL' } self.assertEqual(req.call_args[1], params) @@ -472,6 +475,7 @@ def test_delete_item(self): } }, 'table_name': self.test_table_name, + 'return_consumed_capacity': 'TOTAL', 'return_item_collection_metrics': 'SIZE' } self.assertEqual(req.call_args[1], params) @@ -510,6 +514,7 @@ def test_get_item(self): attributes_to_get=['ForumName'] ) params = { + 'return_consumed_capacity': 'TOTAL', 'attributes_to_get': ['ForumName'], 'key': { 'ForumName': { @@ -636,6 +641,7 @@ def test_update_item(self): 'Action': 'PUT' } }, + 'return_consumed_capacity': 'TOTAL', 'table_name': 'ci-table' } self.assertEqual(req.call_args[1], params) @@ -696,6 +702,7 @@ def test_put_item(self): attributes={'ForumName': 'foo-value'} ) params = {'table_name': self.test_table_name, + 'return_consumed_capacity': 
'TOTAL', 'item': {'ForumName': {'S': 'foo-value'}, 'Subject': {'S': 'foo-range-key'}}} self.assertEqual(req.call_args[1], params) @@ -707,8 +714,18 @@ def test_put_item(self): range_key='foo-range-key', attributes={'ForumName': 'foo-value'} ) - params = {'item': {'ForumName': {'S': 'foo-value'}, 'Subject': {'S': 'foo-range-key'}}, - 'table_name': self.test_table_name} + params = { + 'return_consumed_capacity': 'TOTAL', + 'item': { + 'ForumName': { + 'S': 'foo-value' + }, + 'Subject': { + 'S': 'foo-range-key' + } + }, + 'table_name': self.test_table_name + } self.assertEqual(req.call_args[1], params) with patch(PATCH_METHOD) as req: @@ -720,8 +737,26 @@ def test_put_item(self): attributes={'foo': {'S': 'bar'}}, expected={'Forum': {'Exists': False}} ) - params = {'table_name': self.test_table_name, 'expected': {'Forum': {'Exists': False}}, - 'item': {'ForumName': {'S': 'item1-hash'}, 'foo': {'S': 'bar'}, 'Subject': {'S': 'item1-range'}}} + params = { + 'return_consumed_capacity': 'TOTAL', + 'table_name': self.test_table_name, + 'expected': { + 'Forum': { + 'Exists': False + } + }, + 'item': { + 'ForumName': { + 'S': 'item1-hash' + }, + 'foo': { + 'S': 'bar' + }, + 'Subject': { + 'S': 'item1-range' + } + } + } self.assertEqual(req.call_args[1], params) with patch(PATCH_METHOD) as req: @@ -733,8 +768,28 @@ def test_put_item(self): attributes={'foo': {'S': 'bar'}}, expected={'ForumName': {'Value': 'item1-hash'}} ) - params = {'table_name': self.test_table_name, 'expected': {'ForumName': {'Value': {'S': 'item1-hash'}}}, - 'item': {'ForumName': {'S': 'item1-hash'}, 'foo': {'S': 'bar'}, 'Subject': {'S': 'item1-range'}}} + params = { + 'table_name': self.test_table_name, + 'expected': { + 'ForumName': { + 'Value': { + 'S': 'item1-hash' + } + } + }, + 'return_consumed_capacity': 'TOTAL', + 'item': { + 'ForumName': { + 'S': 'item1-hash' + }, + 'foo': { + 'S': 'bar' + }, + 'Subject': { + 'S': 'item1-range' + } + } + } self.assertEqual(req.call_args[1], params) def 
test_batch_write_item(self): @@ -791,17 +846,23 @@ def test_batch_write_item(self): table_name, put_items=items ) - params = {'request_items': { - 'Thread': [{'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}}}]}} + params = { + 'return_consumed_capacity': 'TOTAL', + 'request_items': { + 'Thread': [ + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}}}, + 
{'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}}} + ] + } + } self.assertEqual(req.call_args[1], params) with patch(PATCH_METHOD) as req: req.return_value = HttpBadRequest(), {} @@ -818,17 +879,23 @@ def test_batch_write_item(self): table_name, delete_items=items ) - params = {'request_items': { - 'Thread': [{'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}}}, - {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}}}]}} + params = { + 'return_consumed_capacity': 'TOTAL', + 'request_items': { + 'Thread': [ + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 
'FooForum'}, 'Subject': {'S': 'thread-6'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}}}, + {'DeleteRequest': {'Key': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}}} + ] + } + } self.assertEqual(req.call_args[1], params) with patch(PATCH_METHOD) as req: @@ -924,17 +991,25 @@ def test_batch_get_item(self): table_name, items ) - params = {'request_items': {'Thread': { - 'Keys': [{'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}]}}} + params = { + 'return_consumed_capacity': 'TOTAL', + 'request_items': { + 'Thread': { + 'Keys': [ + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}} + ] + } + } + } 
self.assertEqual(req.call_args[1], params) def test_query(self): @@ -1012,6 +1087,7 @@ def test_query(self): key_conditions={'ForumName': {'ComparisonOperator': 'BEGINS_WITH', 'AttributeValueList': ['thread']}} ) params = { + 'return_consumed_capacity': 'TOTAL', 'key_conditions': { 'ForumName': { 'ComparisonOperator': 'BEGINS_WITH', 'AttributeValueList': [{ @@ -1036,6 +1112,7 @@ def test_query(self): ) params = { 'limit': 1, + 'return_consumed_capacity': 'TOTAL', 'consistent_read': True, 'exclusive_start_key': { 'ForumName': { @@ -1064,6 +1141,7 @@ def test_query(self): exclusive_start_key="FooForum" ) params = { + 'return_consumed_capacity': 'TOTAL', 'exclusive_start_key': { 'ForumName': { 'S': 'FooForum' @@ -1087,9 +1165,26 @@ def test_scan(self): """ conn = Connection() table_name = 'Thread' + with patch(PATCH_METHOD) as req: req.return_value = HttpOK(), DESCRIBE_TABLE_DATA conn.describe_table(table_name) + + with patch(PATCH_METHOD) as req: + req.return_value = HttpOK(), {} + conn.scan( + table_name, + segment=0, + total_segments=22, + ) + params = { + 'return_consumed_capacity': 'TOTAL', + 'table_name': table_name, + 'segment': 0, + 'total_segments': 22, + } + self.assertDictEqual(req.call_args[1], params) + with patch(PATCH_METHOD) as req: req.return_value = HttpOK(), {} conn.scan( @@ -1121,7 +1216,10 @@ def test_scan(self): conn.scan( table_name, ) - params = {'table_name': table_name} + params = { + 'return_consumed_capacity': 'TOTAL', + 'table_name': table_name + } self.assertEqual(req.call_args[1], params) kwargs = { @@ -1169,6 +1267,7 @@ def test_scan(self): **kwargs ) params = { + 'return_consumed_capacity': 'TOTAL', 'table_name': table_name, 'scan_filter': { 'ForumName': { diff --git a/pynamodb/tests/test_model.py b/pynamodb/tests/test_model.py index a0ddbfb30..54d5d2a30 100644 --- a/pynamodb/tests/test_model.py +++ b/pynamodb/tests/test_model.py @@ -3,10 +3,13 @@ """ import copy import six +from pynamodb.throttle import Throttle +from 
pynamodb.connection.util import pythonic from pynamodb.connection.exceptions import TableError from pynamodb.types import RANGE from pynamodb.constants import ( - ITEM, STRING_SHORT, ALL, KEYS_ONLY, INCLUDE + ITEM, STRING_SHORT, ALL, KEYS_ONLY, INCLUDE, REQUEST_ITEMS, UNPROCESSED_KEYS, + RESPONSES, KEYS, ITEMS, LAST_EVALUATED_KEY, EXCLUSIVE_START_KEY ) from pynamodb.models import Model from pynamodb.indexes import ( @@ -32,6 +35,7 @@ PATCH_METHOD = 'botocore.operation.Operation.call' + class EmailIndex(GlobalSecondaryIndex): """ A global secondary index for email addresses @@ -91,6 +95,14 @@ class SimpleUserModel(Model): aliases = UnicodeSetAttribute() icons = BinarySetAttribute() +class ThrottledUserModel(Model): + """ + A testing model + """ + table_name = 'UserModel' + user_name = UnicodeAttribute(hash_key=True) + user_id = UnicodeAttribute(range_key=True) + throttle = Throttle('50') class UserModel(Model): """ @@ -208,9 +220,9 @@ def test_model_attrs(self): self.assertRaises(ValueError, UserModel.from_raw_data, None) - def test_update(self): + def test_refresh(self): """ - Model.update + Model.refresh """ with patch(PATCH_METHOD) as req: req.return_value = HttpOK(), MODEL_TABLE_DATA @@ -218,7 +230,7 @@ def test_update(self): with patch(PATCH_METHOD) as req: req.return_value = HttpOK(GET_MODEL_ITEM_DATA), GET_MODEL_ITEM_DATA - item.update() + item.refresh() self.assertEqual( item.user_name, GET_MODEL_ITEM_DATA.get(ITEM).get('user_name').get(STRING_SHORT)) @@ -243,6 +255,7 @@ def test_delete(self): 'S': 'foo' } }, + 'return_consumed_capacity': 'TOTAL', 'table_name': 'UserModel' } args = req.call_args[1] @@ -257,7 +270,7 @@ def test_save(self): item = UserModel('foo', 'bar') with patch(PATCH_METHOD) as req: - req.return_value = HttpOK(), None + req.return_value = HttpOK({}), {} item.save() args = req.call_args[1] params = { @@ -275,6 +288,7 @@ def test_save(self): 'S': u'foo' }, }, + 'return_consumed_capacity': 'TOTAL', 'table_name': 'UserModel' } @@ -311,7 +325,7 
@@ def test_query(self): items.append(item) req.return_value = HttpOK({'Items': items}), {'Items': items} queried = [] - for item in UserModel.query('foo', user_id__gt='id-1'): + for item in UserModel.query('foo', user_id__gt='id-1', user_id__le='id-2'): queried.append(item.serialize()) self.assertTrue(len(queried) == len(items)) @@ -387,6 +401,30 @@ def test_query(self): queried.append(item.serialize()) self.assertTrue(len(queried) == len(items)) + def fake_query(*args, **kwargs): + start_key = kwargs.get(pythonic(EXCLUSIVE_START_KEY), None) + if start_key: + idx = 0 + for item in BATCH_GET_ITEMS.get(RESPONSES).get(UserModel.table_name): + idx += 1 + if item == start_key: + break + items = BATCH_GET_ITEMS.get(RESPONSES).get(UserModel.table_name)[idx:idx+1] + else: + items = BATCH_GET_ITEMS.get(RESPONSES).get(UserModel.table_name)[:1] + data = { + ITEMS: items, + LAST_EVALUATED_KEY: items[-1] if len(items) else None + } + return HttpOK(data), data + + FakeQuery = MagicMock() + FakeQuery.side_effect = fake_query + + with patch(PATCH_METHOD, new=FakeQuery) as req: + for item in UserModel.query('foo'): + self.assertIsNotNone(item) + def test_scan(self): """ Model.scan @@ -418,6 +456,7 @@ def fake_dynamodb(*args, **kwargs): if kwargs == {'table_name': UserModel.table_name}: return HttpOK(MODEL_TABLE_DATA), MODEL_TABLE_DATA elif kwargs == { + 'return_consumed_capacity': 'TOTAL', 'table_name': 'UserModel', 'key': {'user_name': {'S': 'foo'}, 'user_id': {'S': 'bar'}}, 'consistent_read': False}: @@ -443,6 +482,7 @@ def fake_dynamodb(*args, **kwargs): 'S': 'foo' } }, + 'return_consumed_capacity': 'TOTAL', 'table_name': 'UserModel' } self.assertEqual(req.call_args[1], params) @@ -503,6 +543,31 @@ def test_batch_get(self): args['request_items']['UserModel']['Keys'], ) + def fake_batch_get(*args, **kwargs): + if pythonic(REQUEST_ITEMS) in kwargs: + item = kwargs.get(pythonic(REQUEST_ITEMS)).get(UserModel.table_name).get(KEYS)[0] + items = 
kwargs.get(pythonic(REQUEST_ITEMS)).get(UserModel.table_name).get(KEYS)[1:] + response = { + UNPROCESSED_KEYS: { + UserModel.table_name: { + KEYS: items + } + }, + RESPONSES: { + UserModel.table_name: [item] + } + } + return HttpOK(response), response + return HttpOK({}), {} + + batch_get_mock = MagicMock() + batch_get_mock.side_effect = fake_batch_get + + with patch(PATCH_METHOD, new=batch_get_mock) as req: + item_keys = [('hash-{0}'.format(x), '{0}'.format(x)) for x in range(1000)] + for item in UserModel.batch_get(item_keys): + self.assertIsNotNone(item) + def test_batch_write(self): """ Model.batch_write @@ -512,7 +577,7 @@ def test_batch_write(self): UserModel('foo', 'bar') with patch(PATCH_METHOD) as req: - req.return_value = HttpOK(), None + req.return_value = HttpOK({}), {} with UserModel.batch_write(auto_commit=False) as batch: items = [UserModel('hash-{0}'.format(x), '{0}'.format(x)) for x in range(25)] @@ -536,15 +601,37 @@ def test_batch_write(self): for item in items: batch.save(item) + def fake_unprocessed_keys(*args, **kwargs): + if pythonic(REQUEST_ITEMS) in kwargs: + items = kwargs.get(pythonic(REQUEST_ITEMS)).get(UserModel.table_name)[1:] + unprocessed = { + UNPROCESSED_KEYS: { + UserModel.table_name: items + } + } + return HttpOK(unprocessed), unprocessed + return HttpOK({}), {} + + batch_write_mock = MagicMock() + batch_write_mock.side_effect = fake_unprocessed_keys + + with patch(PATCH_METHOD, new=batch_write_mock) as req: + items = [UserModel('hash-{0}'.format(x), '{0}'.format(x)) for x in range(500)] + for item in items: + batch.save(item) + def test_global_index(self): """ Models.GlobalSecondaryIndex """ + self.assertIsNotNone(IndexedModel.email_index.hash_key_attribute()) + with patch(PATCH_METHOD) as req: req.return_value = HttpOK(), MODEL_TABLE_DATA IndexedModel('foo', 'bar') scope_args = {'count': 0} + def fake_dynamodb(obj, **kwargs): if kwargs == {'table_name': UserModel.table_name}: if scope_args['count'] == 0: @@ -589,6 +676,7 @@ def 
test_local_index(self): LocalIndexedModel('foo', 'bar') scope_args = {'count': 0} + def fake_dynamodb(obj, **kwargs): if kwargs == {'table_name': UserModel.table_name}: if scope_args['count'] == 0: @@ -652,4 +740,17 @@ def test_projections(self): class BadIndex(Index): pass - self.assertRaises(ValueError, BadIndex) \ No newline at end of file + self.assertRaises(ValueError, BadIndex) + + def test_throttle(self): + """ + Throttle.add_record + """ + throt = Throttle(30) + throt.add_record(None) + for i in range(10): + throt.add_record(1) + throt.throttle() + for i in range(2): + throt.add_record(50) + throt.throttle() diff --git a/pynamodb/tests/test_table_connection.py b/pynamodb/tests/test_table_connection.py index 8360f33ff..d1952379f 100644 --- a/pynamodb/tests/test_table_connection.py +++ b/pynamodb/tests/test_table_connection.py @@ -197,6 +197,7 @@ def test_delete_item(self): "Amazon DynamoDB", "How do I update multiple items?") params = { + 'return_consumed_capacity': 'TOTAL', 'key': { 'ForumName': { 'S': 'Amazon DynamoDB' @@ -249,6 +250,7 @@ def test_update_item(self): 'Action': 'PUT' } }, + 'return_consumed_capacity': 'TOTAL', 'table_name': 'ci-table' } self.assertEqual(req.call_args[1], params) @@ -283,8 +285,11 @@ def test_put_item(self): range_key='foo-range-key', attributes={'ForumName': 'foo-value'} ) - params = {'table_name': self.test_table_name, - 'item': {'ForumName': {'S': 'foo-value'}, 'Subject': {'S': 'foo-range-key'}}} + params = { + 'return_consumed_capacity': 'TOTAL', + 'table_name': self.test_table_name, + 'item': {'ForumName': {'S': 'foo-value'}, 'Subject': {'S': 'foo-range-key'}} + } self.assertEqual(req.call_args[1], params) with patch(PATCH_METHOD) as req: @@ -294,8 +299,18 @@ def test_put_item(self): range_key='foo-range-key', attributes={'ForumName': 'foo-value'} ) - params = {'item': {'ForumName': {'S': 'foo-value'}, 'Subject': {'S': 'foo-range-key'}}, - 'table_name': self.test_table_name} + params = { + 'return_consumed_capacity': 
'TOTAL', + 'item': { + 'ForumName': { + 'S': 'foo-value' + }, + 'Subject': { + 'S': 'foo-range-key' + } + }, + 'table_name': self.test_table_name + } self.assertEqual(req.call_args[1], params) def test_batch_write_item(self): @@ -316,20 +331,22 @@ def test_batch_write_item(self): conn.batch_write_item( put_items=items ) - params = {'request_items': { - self.test_table_name: [ - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}}}, - {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}}} - ] - } + params = { + 'return_consumed_capacity': 'TOTAL', + 'request_items': { + self.test_table_name: [ + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}}}, + {'PutRequest': {'Item': {'ForumName': 
{'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}}}, + {'PutRequest': {'Item': {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}}} + ] + } } self.assertEqual(req.call_args[1], params) @@ -352,17 +369,25 @@ def test_batch_get_item(self): conn.batch_get_item( items ) - params = {'request_items': {self.test_table_name: { - 'Keys': [{'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}, - {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}}]}}} + params = { + 'return_consumed_capacity': 'TOTAL', + 'request_items': { + self.test_table_name: { + 'Keys': [ + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-0'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-1'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-2'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-3'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-4'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-5'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-6'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-7'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-8'}}, + {'ForumName': {'S': 'FooForum'}, 'Subject': {'S': 'thread-9'}} + ] + } + } + } 
self.assertEqual(req.call_args[1], params) def test_query(self): @@ -380,6 +405,7 @@ def test_query(self): key_conditions={'ForumName': {'ComparisonOperator': 'BEGINS_WITH', 'AttributeValueList': ['thread']}} ) params = { + 'return_consumed_capacity': 'TOTAL', 'key_conditions': { 'ForumName': { 'ComparisonOperator': 'BEGINS_WITH', 'AttributeValueList': [{ @@ -402,5 +428,8 @@ def test_scan(self): with patch(PATCH_METHOD) as req: req.return_value = HttpOK(), {} conn.scan() - params = {'table_name': self.test_table_name} + params = { + 'return_consumed_capacity': 'TOTAL', + 'table_name': self.test_table_name + } self.assertEqual(req.call_args[1], params) diff --git a/pynamodb/throttle.py b/pynamodb/throttle.py new file mode 100644 index 000000000..d24ac5682 --- /dev/null +++ b/pynamodb/throttle.py @@ -0,0 +1,87 @@ +""" +PynamoDB Throttling +""" +import time +import logging + +log = logging.getLogger(__name__) + + +class ThrottleBase(object): + """ + A class to provide a throttling API to the user + """ + def __init__(self, capacity, window=1200, initial_sleep=None): + self.capacity = float(capacity) + self.window = window + self.records = [] + self.sleep_interval = initial_sleep if initial_sleep else 0.1 + + def add_record(self, record): + """ + Adds a ConsumedCapacity record to the dataset over `window` + """ + if record is None: + return + self._slice_records() + self.records.append({"time": time.time(), "record": record}) + + def _slice_records(self): + idx = 0 + now = time.time() + for record in self.records: + if now - record['time'] < self.window: + break + else: + idx += 1 + self.records = self.records[idx:] + + def throttle(self): + """ + Sleeps for the appropriate period of time, based on the current data + """ + return + + +class NoThrottle(ThrottleBase): + """ + The default throttle class, does nothing + """ + def __init__(self): + pass + + def add_record(self, record): + pass + + +class Throttle(ThrottleBase): + """ + The default throttling + + This class 
will aggressively throttle API calls if the throughput for a given window is over + the desired capacity. + + If the throughput is under the desired capacity, then API throttling will be reduced cautiously. + """ + def throttle(self): + """ + This uses a method similar to additive increase, multiplicative decrease + + # http://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease + """ + if not len(self.records) >= 2: + return + throughput = sum([value['record'] for value in self.records]) / float(time.time() - self.records[0]['time']) + + # Over capacity + if throughput > self.capacity: + self.sleep_interval *= 2 + # Under capacity + elif throughput < (.9 * self.capacity) and self.sleep_interval > 0.1: + self.sleep_interval -= self.sleep_interval * .10 + log.debug("Sleeping for {0}s, current throughput is {1} and desired throughput is {2}".format( + self.sleep_interval, + throughput, + self.capacity + )) + time.sleep(self.sleep_interval) diff --git a/requirements.txt b/requirements.txt index ab7139d0a..cfc7154aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,5 +9,5 @@ python-dateutil==2.2 pytz==2013.9 requests==2.2.1 six==1.5.2 -tox==1.6.1 +tox==1.7.0 python-coveralls==2.4.2 \ No newline at end of file diff --git a/setup.py b/setup.py index 45ffd1e1e..01f9b3b5c 100644 --- a/setup.py +++ b/setup.py @@ -6,13 +6,16 @@ if sys.argv[-1] == 'publish': os.system('python setup.py sdist upload') os.system('python setup.py bdist_wheel upload') + print("Now tag me :)") + print(" git tag -a {0} -m 'version {0}'".format(__import__('pynamodb').__version__)) + print(" git push --tags") sys.exit() setup( name='pynamodb', version=__import__('pynamodb').__version__, packages=find_packages(), - url='http://jlafon.io/pynamodb', + url='http://jlafon.io/pynamodb.html', author='Jharrod LaFon', author_email='jlafon@eyesopen.com', description='A Pythonic Interface to DynamoDB', @@ -32,5 +35,6 @@ 'Operating System :: OS Independent', 'Programming Language :: Python :: 
2.7', 'Programming Language :: Python :: 3.3', + 'License :: OSI Approved :: MIT License', ], )