Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dax support #814

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/dax.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
.. _dax:

Use DAX
====================

Amazon DynamoDB Accelerator (DAX) is a write-through caching service that is designed to simplify the process of adding a cache to DynamoDB tables.


.. note::

'query' and 'scan' requests will not hit DAX due to serious consistent issues.

Because DAX operates separately from DynamoDB, it is important that you understand the consistency models of both DAX and DynamoDB to ensure that your applications behave as you expect.
See
`the documentation for more information <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAX.consistency.html>`__.


.. code-block:: python

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute


class Thread(Model):
class Meta:
table_name = "Thread"
dax_read_endpoints = ['xxxx:8111']
dax_write_endpoints = ['xxxx:8111']

forum_name = UnicodeAttribute(hash_key=True)

2 changes: 2 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Features
* Batch operations with automatic pagination
* Iterators for working with Query and Scan operations
* `Fully tested <https://coveralls.io/r/pynamodb/PynamoDB>`_
* Dax support

Topics
======
Expand Down Expand Up @@ -47,6 +48,7 @@ Topics
contributing
release_notes
versioning
dax

API docs
========
Expand Down
41 changes: 40 additions & 1 deletion pynamodb/connection/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@
TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError, TableDoesNotExist,
VerboseClientError,
TransactGetError, TransactWriteError)
from amazondax.DaxError import DaxClientError
from pynamodb.expressions.condition import Condition
from pynamodb.expressions.operand import Path
from pynamodb.expressions.projection import create_projection_expression
from pynamodb.expressions.update import Action, Update
from pynamodb.settings import get_settings_value
from pynamodb.signals import pre_dynamodb_send, post_dynamodb_send
from pynamodb.types import HASH, RANGE
from pynamodb.connection.dax import OP_READ, OP_WRITE, DaxClient
from pynamodb.util import snake_to_camel_case

BOTOCORE_EXCEPTIONS = (BotoCoreError, ClientError)
Expand Down Expand Up @@ -249,7 +251,9 @@ def __init__(self,
max_retry_attempts: Optional[int] = None,
base_backoff_ms: Optional[int] = None,
max_pool_connections: Optional[int] = None,
extra_headers: Optional[Mapping[str, str]] = None):
extra_headers: Optional[Mapping[str, str]] = None,
dax_write_endpoints: Optional[List[str]] = None,
dax_read_endpoints: Optional[List[str]] = None):
self._tables: Dict[str, MetaTable] = {}
self.host = host
self._local = local()
Expand Down Expand Up @@ -288,6 +292,16 @@ def __init__(self,
self._extra_headers = extra_headers
else:
self._extra_headers = get_settings_value('extra_headers')
if dax_write_endpoints is not None:
self.dax_write_endpoints = dax_write_endpoints
else:
self.dax_write_endpoints = get_settings_value('dax_write_endpoints')
if dax_read_endpoints is not None:
self.dax_read_endpoints = dax_read_endpoints
else:
self.dax_read_endpoints = get_settings_value('dax_read_endpoints')
self._dax_write_client: Optional[DaxClient] = None
self._dax_read_client: Optional[DaxClient] = None

def __repr__(self) -> str:
return "Connection<{}>".format(self.client.meta.endpoint_url)
Expand Down Expand Up @@ -360,6 +374,13 @@ def _make_api_call(self, operation_name, operation_kwargs):
1. It's faster to avoid using botocore's response parsing
2. It provides a place to monkey patch HTTP requests for unit testing
"""
try:
if operation_name in OP_WRITE and self.dax_write_endpoints:
return self.dax_write_client.dispatch(operation_name, operation_kwargs)
elif operation_name in OP_READ and self.dax_read_endpoints:
return self.dax_read_client.dispatch(operation_name, operation_kwargs)
except DaxClientError:
raise
operation_model = self.client._service_model.operation_model(operation_name)
request_dict = self.client._convert_to_request_dict(
operation_kwargs,
Expand Down Expand Up @@ -532,6 +553,24 @@ def client(self):
self._client = self.session.create_client(SERVICE_NAME, self.region, endpoint_url=self.host, config=config)
return self._client

@property
def dax_write_client(self):
if self._dax_write_client is None:
self._dax_write_client = DaxClient(
endpoints=self.dax_write_endpoints,
region_name=self.region
)
return self._dax_write_client

@property
def dax_read_client(self):
if self._dax_read_client is None:
self._dax_read_client = DaxClient(
endpoints=self.dax_read_endpoints,
region_name=self.region
)
return self._dax_read_client

def get_meta_table(self, table_name: str, refresh: bool = False):
"""
Returns a MetaTable
Expand Down
37 changes: 37 additions & 0 deletions pynamodb/connection/dax.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from amazondax import AmazonDaxClient


OP_WRITE = {
'PutItem': 'put_item',
'DeleteItem': 'delete_item',
'UpdateItem': 'update_item',
'BatchWriteItem': 'batch_write_item',
'TransactWriteItems': 'transact_write_items',

}

OP_READ = {
'GetItem': 'get_item',
'BatchGetItem': 'batch_get_item',
# query and scan has a serious consistency issue
# https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAX.consistency.html#DAX.consistency.query-cache
# 'Query': 'query',
# 'Scan': 'scan',
'TransactGetItems': 'transact_get_items',
}

OP_NAME_TO_METHOD = OP_WRITE.copy()
OP_NAME_TO_METHOD.update(OP_READ)


class DaxClient(object):

def __init__(self, endpoints, region_name):
self.connection = AmazonDaxClient(
endpoints=endpoints,
region_name=region_name
)

def dispatch(self, operation_name, kwargs):
method = getattr(self.connection, OP_NAME_TO_METHOD[operation_name])
return method(**kwargs)
12 changes: 10 additions & 2 deletions pynamodb/connection/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""

from typing import Any, Dict, Mapping, Optional, Sequence
from typing import Any, Dict, Mapping, Optional, Sequence, List

from pynamodb.connection.base import Connection, MetaTable
from pynamodb.constants import DEFAULT_BILLING_MODE, KEY
Expand All @@ -30,18 +30,26 @@ def __init__(
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
dax_write_endpoints: Optional[List[str]] = None,
dax_read_endpoints: Optional[List[str]] = None
) -> None:
self._hash_keyname = None
self._range_keyname = None
self.table_name = table_name
if not dax_read_endpoints:
dax_read_endpoints = []
if not dax_write_endpoints:
dax_write_endpoints = []
self.connection = Connection(region=region,
host=host,
connect_timeout_seconds=connect_timeout_seconds,
read_timeout_seconds=read_timeout_seconds,
max_retry_attempts=max_retry_attempts,
base_backoff_ms=base_backoff_ms,
max_pool_connections=max_pool_connections,
extra_headers=extra_headers)
extra_headers=extra_headers,
dax_write_endpoints=dax_write_endpoints,
dax_read_endpoints=dax_read_endpoints)

if aws_access_key_id and aws_secret_access_key:
self.connection.session.set_credentials(aws_access_key_id,
Expand Down
10 changes: 9 additions & 1 deletion pynamodb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ class MetaModel(AttributeContainerMeta):
aws_session_token: Optional[str]
billing_mode: Optional[str]
stream_view_type: Optional[str]
dax_write_endpoints: Optional[List[str]]
dax_read_endpoints: Optional[List[str]]

"""
Model meta class
Expand Down Expand Up @@ -242,6 +244,10 @@ def __init__(self, name: str, bases: Any, attrs: Dict[str, Any]) -> None:
setattr(attr_obj, 'aws_secret_access_key', None)
if not hasattr(attr_obj, 'aws_session_token'):
setattr(attr_obj, 'aws_session_token', None)
if not hasattr(attr_obj, 'dax_write_endpoints'):
setattr(attr_obj, 'dax_write_endpoints', get_settings_value('dax_write_endpoints'))
if not hasattr(attr_obj, 'dax_read_endpoints'):
setattr(attr_obj, 'dax_read_endpoints', get_settings_value('dax_read_endpoints'))
elif isinstance(attr_obj, Index):
attr_obj.Meta.model = cls
if not hasattr(attr_obj.Meta, "index_name"):
Expand Down Expand Up @@ -1108,7 +1114,9 @@ def _get_connection(cls) -> TableConnection:
extra_headers=cls.Meta.extra_headers,
aws_access_key_id=cls.Meta.aws_access_key_id,
aws_secret_access_key=cls.Meta.aws_secret_access_key,
aws_session_token=cls.Meta.aws_session_token)
aws_session_token=cls.Meta.aws_session_token,
dax_write_endpoints=cls.Meta.dax_write_endpoints,
dax_read_endpoints=cls.Meta.dax_read_endpoints)
return cls._connection

def _deserialize(self, attrs):
Expand Down
2 changes: 2 additions & 0 deletions pynamodb/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
'region': 'us-east-1',
'max_pool_connections': 10,
'extra_headers': None,
'dax_write_endpoints': [],
'dax_read_endpoints': []
}

OVERRIDE_SETTINGS_PATH = getenv('PYNAMODB_CONFIG', '/etc/pynamodb/global_default_settings.py')
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pytest>=5
pytest-env
pytest-mock
amazon-dax-client>=1.1.7

# Due to https://github.com/boto/botocore/issues/1872. Remove after botocore fixes.
python-dateutil==2.8.0
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
install_requires = [
'botocore>=1.12.54',
'python-dateutil>=2.1,<3.0.0',
'amazon-dax-client>=1.1.7'
]

setup(
Expand Down